在hive中,有时候一些内置的函数,和普通的查询操作已经满足不了我们要查询的要求,这时候可以自己写一些自定义函数来处理。自定义函数(user defined function =UDF)
由于hive本身是用java语言开发,所以udf必须用java来写才可以。
Hive中有三种UDF
1. 普通udf(UDF)
操作单个数据行,且产生一个数据作为输出。例如(数学函数,字符串函数)
2. 聚合udf (UDAF)
接受多个数据行,并产生一个数据行作为输出。例如(COUNT,MAX函数等)
3. 表生成UDF(UDTF)
接受一个数据行,然后返回产生多个数据行(一个表作为输出)。比如lateral view(据说是一个将行转成列的函数)。
编写UDF
编写UDF必须满足一下:
1. 必须是org.apache.hadoop.hive.ql.exec.UDF的子类
2. 必须实现evaluate函数。
1. strip UDF
java code
package com.hcr.hadoop.hive;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
public class Strip extends UDF {
public Text evaluate(String str) {
return str == null ? null : new Text(StringUtils.strip(str));
}
public Text evaluate(String str,String chrStr) {
return str == null ? null : new Text(StringUtils.strip(str,chrStr));
}
}
写完代码打成jar包hcr.jar
hive> add jar/root/hcr/tmp/hcr.jar;
Added /root/hcr/tmp/hcr.jar to class path
Added resource: /root/hcr/tmp/hcr.jar
声明strip函数
create temporary function strip as 'com.hcr.hadoop.hive.Strip';
select year,strip(name,'ruishen') from records2;
原数据
1990 ruishenh0
1992 ruishenh2
1991 ruishenh1
1993 ruishenh3
1994 ruishenh4
1995 ruishenh5
1996 ruishenh6
1997 ruishenh7
1998 ruishenh8
1990 0
1992 2
1991 1
1993 3
1994 4
1995 5
1996 6
1997 7
1998 8
编写UDAF
编写UDAF必须满足一下:
1. 必须是org.apache.hadoop.hive.ql.exec.UDAF的子类
2. 且包含一个或多个嵌套的,实现了org.apache.hadoop.hive.ql.exec.UDAFEvaluator的静态类
3. 如果是计算函数必须包含如下5个函数
a) Init()
初始化计算函数和内部数据结构状态等。
b) Iterate()
每一个新值调用聚集计算时都会调用这个函数。计算函数要聚集计算的结果更新其内部状态,iterate函数接受的参数和hive中被调用函数的参数是对应的。
c) terminatePartial()
这个就是取计算到当前(局部)的时候的数据对象函数。(比如1-10。计算5的时候要调用一下这个函数查看一下当前的内部结构对象也就是1-5的聚合结果)
d) merge()
在hive决定要合并一个部分聚集值和另一个部分聚集值是会调用merge()方法,该方法接受一个对象输入,这个对象的类型必须和terminatePartial()返回的一致。
e) terminate()
hive需要最终聚集结果时会调用terminate方法,计算函数需要把状态作为一个值返回。
实例UDAF(一)
package com.hcr.hadoop.hive;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.io.IntWritable;
public class Maxinum extends UDAF {
public static class MaxinumIntUDAFEvaluator implements UDAFEvaluator {
private IntWritable result;
@Override
public void init() {
result = null;
}
public boolean iterate(IntWritable value) {
if (value == null) {
return true;
}
if (result == null) {
result = new IntWritable(value.get());
} else {
System.out.println("result:"+result+",value:"+value);
result.set(Math.max(result.get(), value.get()));
}
return true;
}
public IntWritable terminatePartial() {
return result;
}
public boolean merge(IntWritable value) {
System.out.println("merge-result:"+result);
return iterate(value);
}
public IntWritable terminate() {
System.out.println("terminate-result:"+result);
return result;
}
}
}
hive> add jar /root/hcr/tmp/hcr.jar;
Added /root/hcr/tmp/hcr.jar to class path
Added resource: /root/hcr/tmp/hcr.jar
hive> create temporary function maxinum as 'com.hcr.hadoop.hive.Maxinum';
OK
Time taken: 0.192 seconds
hive> select maxinum(temperature) fromrecords;
58
实例UDAF(二)
package com.hcr.hadoop.hive;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
public class Mean extends UDAF {
public static class MeanDoubleUDAFEvaluator implements UDAFEvaluator {
public static class PartialResult {
double sum;
long count;
}
private PartialResult partial;
@Override
public void init() {
partial = null;
}
public boolean iterate(DoubleWritable value) {
if (value == null) {
return true;
}
if (partial == null) {
partial = new PartialResult();
}
partial.sum += value.get();
partial.count++;
return true;
}
public PartialResult terminatePartial() {
return partial;
}
public boolean merge(PartialResult value) {
if (value == null) {
return true;
}
if (partial == null) {
partial = new PartialResult();
}
partial.sum += value.sum;
partial.count += value.count;
return true;
}
public DoubleWritable terminate() {
if (partial == null)
return null;
return new DoubleWritable(partial.sum / partial.count);
}
}
}
hive> add jar /root/hcr/tmp/hcr.jar;
Added /root/hcr/tmp/hcr.jar to class path
Added resource: /root/hcr/tmp/hcr.jar
hive> create temporary function mean as'com.hcr.hadoop.hive.Mean';
OK
Time taken: 0.198 seconds
hive>ALTER TABLE records CHANGE COLUMN temperature temperature double;
hive> select mean(temperature) from records;
OK
43.916666666666664
Time taken: 20.769 seconds
hive>
关于很多系统内置的UDAF函数可以在下边这个类中查看注册的UDF和UDAF等
org.apache.hadoop.hive.ql.exec.FunctionRegistry
参照hive内部的写法
先写一个resolver类,然后便写evaluator(GenericUDAFEvaluator的子类)类
参靠:
http://www.cnblogs.com/ggjucheng/archive/2013/02/01/2888051.html
本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。