前段时间搞了一阵spark scala,处理一个APP大半年的日志。本意是将日志格式化,挖掘其中有用的信息,尽管后来变成了数据统计。但这段时间确实学习了spark scala,知道了这么一个大数据处理工具。本文将一些基本用法记下来。
个人体会,spark是相对于MapReduce更高层次的抽象。使用MapReduce时,需要将每个任务拆分成Map和Reduce过程,在处理连续任务时,整个流程比较复杂。我在初次使用spark时,产生了如同使用matlab那般的想法,操作对象是数据集,有两种操作:转换(transformation)和动作(action)。spark完全封装了数据的分布、调度问题,而且充分利用内存的特性,通过把中间数据直接保存在内存中来提高性能。程序员只需要关注如何将任务拆分成两种变换即可。
val lines = sc.textFile("file:///path_to_local/file")
val lines = sc.textFile("hdfs:///path_to_hdfs/file")
rdd.saveAsTextFile("hdfs://")
val parquetFile = sqlContext.read.parquet("people.parquet")
df.write.save("temp.parquet")
val df = sqlContext.read.json("path to json file")
val df = sqlContext.read.format("json").load("path to file")
df.write.format("json").save("path to save")
val lines = sc.textFile("data.txt") //读文件,得到以行字符串为单位的RDD
val lineLengths = lines.map(s => s.length) //转换,将字符串元素映射为其长度
val totalLength = lineLengths.reduce((a, b) => a + b) //动作,将所有元素加起来
val textFile = sc.textFile("hdfs://...") //读取hdfs文件,转换为以行为单位的文本集合
val counts = textFile.flatMap(line => line.split(" ")) //转换,将行字符串转换为单词,组成新的RDD
.map(word => (word, 1)) //转换,将单词转换为词频统计
.reduceByKey(_ + _) //转换,根据key值进行归约
counts.saveAsTextFile("hdfs://...") //保存
object test {
def fetch_tag(line : String) : Int = {
try {
val regex = "tag:([0-9]+)".r
val regex(tag) = line
tag.toInt
}
catch {
case ex: Exception => 0
}
}
}
lines.map(test.fetch_tag(_)).reduce(_+_)
本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。