Hive可扩展接口UDF


Hive 可扩展接口学习笔记


Hive提供灵活的接口,以使用户能更灵活地处理数据。可扩展接口分为三种:UDF、UDTF、和UDAF

UDF

UDF是最常用到的接口,用来处理字段并返回一个单一的值。

  1. 接口类型:

    UDF提供两种接口:

    1. 简单API org.apache.hadoop.hive.ql.exec.UDF

      UDF接口可以用来读取并返回初级类型。这里说的初级类型指的是hadoop和hive的可写类型:Text, IntWritable, LongWritable,
      DoubleWritable等。

    2. 复杂API org.apache.hadoop.hive.ql.udf.generic.GenericUDF

      GenericUDF接口可以用来处理内嵌的数据结构,像Map, List和Set.

  2. 使用举例(简单API):

    某字段由多组数据组成,数据间由逗号分隔,此UDF返回此字段内所有数据之和

    package com.dokia.hive.MyUDF;
    
    import org.apache.hadoop.hive.ql.exec.UDF;
    import org.apache.hadoop.io.Text;
    
    public class SumComma extends UDF{
        public Text evaluate(final Text s) {
            if (s == null) return null;
            String[] nums = s.toString().split(",");
            long sum = 0;
            for (String num : nums) {
                try {
                    long data = Long.parseLong(num);
                    sum += data;
                } catch (NumberFormatException ex) {}
            }
            return new Text(String.valueOf(sum));
        }
    }
    

    将此类打包成SumComma.jar,并加载到hive中,有三种方法:

    • 临时添加UDF,回话结束后函数自动销毁,每次新回话需要add jar并且create temporary function

      > add jar SumComma.jar;
      > create temporary function sum_comma as 'com.dokia.hive.MyUDF.SumComma';
      

      然后就可以在sql中使用函数sum_comma了。

    • 也可以将上述命令通过文件导入hive,获得的同样是临时函数

      $ hive -i hive_init;
      
    • 自定义UDF注册为hive内置函数,比较危险,而且需要编译hive,不建议

      参考资料: hive利器 自定义UDF+重编译hive

    UDF中需要注意字段为null的情况,因为在数据库中null字段还是很常见的。

  3. 使用举例(复杂API):

    GenericUDF接口的继承需要实现三个方法:

    // 处理字段的方法
    abstract Object evaluate evaluate(GenericUDF.DeferredObject[] arguments);
    // 返回一个字符串,表示函数名称
    abstract String getDisplayString(String[] children);
    // 在处理字段之前检测字段的类型
    abstract ObjectInspector initialize(ObjectInspector[] arguments);
    
  4. Maven依赖

    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>1.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0</version>
    </dependency>   
    

UDTF

UDTF可以输出多行多列

使用UDTF需要实现接口 org.apache.hadoop.hive.ql.udf.generic.GenericUDTF, 并实现其三个方法:

// 判别输入输出格式
abstract StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException;
// 处理输入数据,并输出处理后的结构
abstract void process(Object[] record) throws HiveException;
// 通知UDTF数据已经处理完
abstruct void close() throws HiveException;

Hive sql使用接口:

# 从people表格中获取name, 并将其拆分为name和surname两部分输出
SELECT
    adTable.name,
    adTable.surname
FROM people
    lateral view process_names(name) adTable as name, surname;

UDAF

UDAF可以一次处理一整个列的数据,并进行聚合操作.(类似于sum()和count())

使用UDAF需要实现两个接口:

org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator

前者检查输入参数,确定需要使用的reslover的类型,后者是处理数据逻辑的主要部分,需要实现以下几个方法:

// 确定输入输出数据的类型
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException;
// 存储数据处理结果 (中间结果和最终结果)
abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;
// 重新设置聚合buffer
public void reset(AggregationBuffer agg) throws HiveException;
// 从输入表格中读数据 a typical Mapper
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;
// 处理部分数据
public Object terminalPartial(AggregationBuffer agg) throws HiveException;
// 把部分聚合结果相加
public void merge(AggregationBuffer agg, Object partial) throws HiveException;
// 输出最终结果 the Reducer
public Object terminate(AggregationBuffer agg) throws HiveException;

参考文档:

Apache Hive Customization Tutorial Series

Hive Extension Examples


注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
© 2014-2019 ITdaan.com 粤ICP备14056181号  

赞助商广告