Spark Sql提供了丰富的内置函数供猿友们使用,辣为何还要用户自定义函数呢?实际的业务场景可能很复杂,内置函数hold不住,所以spark sql提供了可扩展的内置函数接口:哥们,你的业务太变态了,我满足不了你,自己按照我的规范去定义一个sql函数,该怎么折腾就怎么折腾!
这里还是先以Scala实现一个简单的hello world级别的小样为例,来体验udf与udaf的使用好了。
将如下数组:
val bigData = Array("Spark","Hadoop","Flink","Spark","Hadoop","Flink", "Spark","Hadoop","Flink","Spark","Hadoop","Flink")
中的字符分组聚合并计算出每个字符的长度及字符出现的个数。正常结果
如下:
+------+-----+------+ | name|count|length| +------+-----+------+ | Spark| 4| 5| | Flink| 4| 5| |Hadoop| 4| 6| +------+-----+------+
注:‘spark’ 这个字符的长度为5 ,共出现了4次。
package com.hand.datasafe import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Row, SQLContext, SparkSession} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StructField import org.apache.spark.sql.types.StringType /** * Spark SQL UDAF:user defined aggregation function * UDF: 函数的输入是一条具体的数据记录,实现上讲就是普通的scala函数-只不过需要注册 * UDAF:用户自定义的聚合函数,函数本身作用于数据集合,能够在具体操作的基础上进行自定义操作 */ object SparkSQLUDF { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("datasafe").master("local").getOrCreate() val bigData = Array("Spark", "Hadoop", "Flink", "Spark", "Hadoop", "Flink", "Spark", "Hadoop", "Flink", "Spark", "Hadoop", "Flink") val bigDataRDD = spark.sparkContext.parallelize(bigData) val bigDataRowRDD: RDD[Row] = bigDataRDD.map(line => Row(line)) val structType = StructType(Array(StructField("name", StringType, true))) val bigDataDF = spark.createDataFrame(bigDataRowRDD, structType) bigDataDF.printSchema() bigDataDF.createTempView("bigDataTable") /* * 通过saprk注册UDF,在scala2.1.x版本UDF函数最多可以接受22个输入参数 */ spark.udf.register("computeLength", (input: String) => input.length) spark.sql("select name,computeLength(name) as length from bigDataTable").show //while(true){} spark.udf.register("wordCount", new MyUDAF) spark.sql("select name,wordCount(name) as count,computeLength(name) as length from bigDataTable group by name ").show spark.sql("select name,wordCount(name) as count,computeLength(name) as length from bigDataTable group by name ").printSchema() } }
package com.hand.datasafe import org.apache.spark.sql.Row import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types._ /** * 用户自定义函数 */ class MyUDAF extends UserDefinedAggregateFunction { /** * 指定具体的输入数据的类型 * 自段名称随意:Users can choose names to identify the input arguments - 这里可以是“name”,或者其他任意串 */ override def inputSchema:StructType = StructType(Array(StructField("name",StringType,true))) /** * 在进行聚合操作的时候所要处理的数据的中间结果类型 */ override def bufferSchema:StructType = StructType(Array(StructField("count",IntegerType,true))) /** * 返回类型 */ override def dataType:DataType = IntegerType /** * whether given the same input, * always return the same output * true: yes */ override def deterministic:Boolean = true /** * Initializes the given aggregation buffer */ override def initialize(buffer:MutableAggregationBuffer):Unit = {buffer(0)=0} /** * 在进行聚合的时候,每当有新的值进来,对分组后的聚合如何进行计算 * 本地的聚合操作,相当于Hadoop MapReduce模型中的Combiner */ override def update(buffer:MutableAggregationBuffer,input:Row):Unit={ buffer(0) = buffer.getInt(0)+1 } /** * 最后在分布式节点进行local reduce完成后需要进行全局级别的merge操作 */ override def merge(buffer1:MutableAggregationBuffer,buffer2:Row):Unit={ buffer1(0) = buffer1.getInt(0)+buffer2.getInt(0) } /** * 返回UDAF最后的计算结果 */ override def evaluate(buffer:Row):Any = buffer.getInt(0) }
呼叫spark大神升级udaf实现
为了自己实现一个sql聚合函数,我需要继承UserDefinedAggregateFunction并实现8个抽象方法!8个方法啊!what’s a disaster ! 然而,要想在sql中完成符合特定业务场景的聚合类(a = aggregation)功能,就得udaf。
怎么理解MutableAggregationBuffer呢?就是存储中间结果的,聚合就意味着多条记录的累加等操作。
本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。