spark scala基础知识汇总


前段时间搞了一阵spark scala,处理一个APP大半年的日志。本意是将日志格式化,挖掘其中有用的信息,尽管后来变成了数据统计。但这段时间确实学习了spark scala,知道了这么一个大数据处理工具。本文将一些基本用法记下来。

个人体会,spark是相对于MapReduce更高层次的抽象。使用MapReduce时,需要将每个任务拆分成Map和Reduce过程,在处理连续任务时,整个流程比较复杂。我在初次使用spark时,产生了如同使用matlab那般的想法,操作对象是数据集,有两种操作:转换(transformation)和动作(action)。spark完全封装了数据的分布、调度问题,而且充分利用内存的特性,通过把中间数据直接保存在内存中来提高性能。程序员只需要关注如何将任务拆分成两种变换即可。

读写文件 

val lines = sc.textFile("file:///path_to_local/file")
val lines = sc.textFile("hdfs:///path_to_hdfs/file")
rdd.saveAsTextFile("hdfs://")

  
spark可以直接读写文本文件,无论本地或者HDFS,无论是单个文本还是目录,还可以对目录进行正则匹配。得到的结果是以行为单位的文本集合,上述lines就是文本的行的集合。
如果是parquet格式文件,可以用下面的办法,得到一个DataFrame,同样可以识别本地及hdfs文件,也可以识别目录及正则。
val parquetFile = sqlContext.read.parquet("people.parquet")
df.write.save("temp.parquet")
如果是JSON格式文件,其读写办法如下:
val df = sqlContext.read.json("path to json file")
val df = sqlContext.read.format("json").load("path to file")
df.write.format("json").save("path to save")

spark操作:转换和动作

spark里面一个核心的概念是RDD,简而言之,就是一个可以操作的数据集合。如果你有matlab编程的经验,你会比较习惯spark的编程思维,即对集合进行运算。spark支持两种RDD操作,转换和动作。转换是对集合每个元素进行某种运算,得到一个新的集合。动作是对集合所有元素进行相同运算,最后聚合成一个元素。
下面是一个用spark统计字符数的例子例子:
val lines = sc.textFile("data.txt")            //读文件,得到以行字符串为单位的RDD
val lineLengths = lines.map(s => s.length) //转换,将字符串元素映射为其长度
val totalLength = lineLengths.reduce((a, b) => a + b) //动作,将所有元素加起来

最常用的转换操作有两个:map和filter,map(func)是将func应用到所有元素,得到一个新的RDD。filter是将func返回为true的元素过滤出来,组成一个新的RDD。一些比较常用的转换如下:

map(func)  返回一个新的分布式数据集,将数据源的每一个元素传递给函数 func 映射组成。
filter(func)  返回一个新的数据集,从数据源中选中一些元素通过函数 func 返回 true。
flatMap(func)  类似于 map,但是每个输入项能被映射成多个输出项(所以 func 必须返回一个 Seq,而不是单个 item)。
union(otherDataset)   两个RDD求并集
intersection(otherDataset)   两个RDD求交集
groupByKey()  作用于(K,V)的数据集,依据K对值进行归并,返回一个(K, Iterable)
reduceByKey(func) 作用于(K,V)的数据集,依据K对值使用func进行归约,返回一个(K,V)数据集
sortByKey([asending])  返回一个依据K进行排序的数据集

最常用的动作就是reduce,将数据集归约为一个结果。一些比较常用的动作如下:
reduce(func)  按照func函数对数据集进行归约,func接受两个参数,返回一个结果,须满足结合律和交换律,以便于分布式计算。
count() 返回数据集的元素个数
first() 返回第一个元素
take(n)  以数组形式返回集合的前n个元素
saveAsTextFile(path) 将数据集保存为文本文件

一个单词统计的例子

val textFile = sc.textFile("hdfs://...")                //读取hdfs文件,转换为以行为单位的文本集合
val counts = textFile.flatMap(line => line.split(" ")) //转换,将行字符串转换为单词,组成新的RDD
.map(word => (word, 1)) //转换,将单词转换为词频统计
.reduceByKey(_ + _) //转换,根据key值进行归约
counts.saveAsTextFile("hdfs://...") //保存
在上面的代码中,下划线表示临时的变量。

自定义转换函数

spark基本语法可以支持很多常用的转换操作。但是在实际的业务场景中,对转换的需求是比较复杂的。这就需要自定义转换函数。举个例子,后台日志中,以tag:123的形式记录了某个含义的字段。现在,要把这个数值全部提取出来。写法如下:
object test {
def fetch_tag(line : String) : Int = {
try {
val regex = "tag:([0-9]+)".r
val regex(tag) = line
tag.toInt
}
catch {
case ex: Exception => 0
}
}
}
lines.map(test.fetch_tag(_)).reduce(_+_)

通过test.fetch_tag函数将字符串中的要求值正则出来,最后返回一个值的数据集,然后进行归约。

通过以上基本功能的组合,就能写出处理复杂业务的工具了。

智能推荐

注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
© 2014-2019 ITdaan.com 粤ICP备14056181号  

赞助商广告