Hive提供灵活的接口,以使用户能更灵活地处理数据。可扩展接口分为三种:UDF、UDTF、和UDAF
UDF是最常用到的接口,用来处理字段并返回一个单一的值。
接口类型:
UDF提供两种接口:
简单API org.apache.hadoop.hive.ql.exec.UDF
UDF接口可以用来读取并返回初级类型。这里说的初级类型指的是hadoop和hive的可写类型:Text, IntWritable, LongWritable,
DoubleWritable等。
复杂API org.apache.hadoop.hive.ql.udf.generic.GenericUDF
GenericUDF接口可以用来处理内嵌的数据结构,像Map, List和Set.
使用举例(简单API):
某字段由多组数据组成,数据间由逗号分隔,此UDF返回此字段内所有数据之和
package com.dokia.hive.MyUDF;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
public class SumComma extends UDF{
public Text evaluate(final Text s) {
if (s == null) return null;
String[] nums = s.toString().split(",");
long sum = 0;
for (String num : nums) {
try {
long data = Long.parseLong(num);
sum += data;
} catch (NumberFormatException ex) {}
}
return new Text(String.valueOf(sum));
}
}
将此类打包成SumComma.jar,并加载到hive中,有三种方法:
临时添加UDF,回话结束后函数自动销毁,每次新回话需要add jar并且create temporary function
> add jar SumComma.jar;
> create temporary function sum_comma as 'com.dokia.hive.MyUDF.SumComma';
然后就可以在sql中使用函数sum_comma了。
也可以将上述命令通过文件导入hive,获得的同样是临时函数
$ hive -i hive_init;
自定义UDF注册为hive内置函数,比较危险,而且需要编译hive,不建议
参考资料: hive利器 自定义UDF+重编译hive
UDF中需要注意字段为null的情况,因为在数据库中null字段还是很常见的。
使用举例(复杂API):
GenericUDF接口的继承需要实现三个方法:
// 处理字段的方法
abstract Object evaluate evaluate(GenericUDF.DeferredObject[] arguments);
// 返回一个字符串,表示函数名称
abstract String getDisplayString(String[] children);
// 在处理字段之前检测字段的类型
abstract ObjectInspector initialize(ObjectInspector[] arguments);
Maven依赖
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
UDTF可以输出多行多列
使用UDTF需要实现接口 org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
, 并实现其三个方法:
// 判别输入输出格式
abstract StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException;
// 处理输入数据,并输出处理后的结构
abstract void process(Object[] record) throws HiveException;
// 通知UDTF数据已经处理完
abstruct void close() throws HiveException;
Hive sql使用接口:
# 从people表格中获取name, 并将其拆分为name和surname两部分输出
SELECT
adTable.name,
adTable.surname
FROM people
lateral view process_names(name) adTable as name, surname;
UDAF可以一次处理一整个列的数据,并进行聚合操作.(类似于sum()和count())
使用UDAF需要实现两个接口:
org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
前者检查输入参数,确定需要使用的reslover的类型,后者是处理数据逻辑的主要部分,需要实现以下几个方法:
// 确定输入输出数据的类型
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException;
// 存储数据处理结果 (中间结果和最终结果)
abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;
// 重新设置聚合buffer
public void reset(AggregationBuffer agg) throws HiveException;
// 从输入表格中读数据 a typical Mapper
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;
// 处理部分数据
public Object terminalPartial(AggregationBuffer agg) throws HiveException;
// 把部分聚合结果相加
public void merge(AggregationBuffer agg, Object partial) throws HiveException;
// 输出最终结果 the Reducer
public Object terminate(AggregationBuffer agg) throws HiveException;
本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。