Lesson 67: Study Notes on a Comprehensive Hands-On Spark SQL Case
1 Spark SQL case analysis
2 Implementing the case in Java and Scala
This lesson practices the Join operation in Spark SQL directly through hands-on work:
First, the code in Java:
package SparkSQLByJava;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

public class SparkSQLwithJoin {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkSQLwithJoin");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        // Create a DataFrame from the JSON file data source
        DataFrame peoplesDF = sqlContext.read().json("D:\\DT-IMF\\testdata\\peoples.json");
        // Register the JSON-based DataFrame as a temporary table
        peoplesDF.registerTempTable("peopleScores");
        // Query the people whose score is greater than 90
        DataFrame excellentScoresDF = sqlContext.sql("select name, score from peopleScores where score > 90");
        // Convert the DataFrame to an RDD and, via a map, collect the names of everyone scoring above 90
        List<String> excellentScoresNameList = excellentScoresDF.javaRDD().map(new Function<Row, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public String call(Row row) throws Exception {
                return row.getAs("name");
            }
        }).collect();
        // Dynamically assemble JSON strings
        List<String> peopleInformations = new ArrayList<String>();
        peopleInformations.add("{\"name\":\"Michael\", \"age\":20}");
        peopleInformations.add("{\"name\":\"Andy\", \"age\":17}");
        peopleInformations.add("{\"name\":\"Justin\", \"age\":19}");
        // Build a DataFrame from an RDD whose records are JSON strings
        JavaRDD<String> peopleInformationsRDD = sc.parallelize(peopleInformations);
        DataFrame peopleInformationsDF = sqlContext.read().json(peopleInformationsRDD);
        // Register it as a temporary table
        peopleInformationsDF.registerTempTable("peopleInformations");
        // Splice the collected names into an IN clause
        String sqlText = "select name, age from peopleInformations where name in (";
        for (int i = 0; i < excellentScoresNameList.size(); i++) {
            sqlText += "'" + excellentScoresNameList.get(i) + "'";
            if (i < excellentScoresNameList.size() - 1) {
                sqlText += ",";
            }
        }
        sqlText += ")";
        DataFrame excellentNameAgeDF = sqlContext.sql(sqlText);
        // Key both result sets by name and join them, yielding (name, (score, age))
        JavaPairRDD<String, Tuple2<Integer, Integer>> resultRDD = excellentScoresDF.javaRDD().mapToPair(
            new PairFunction<Row, String, Integer>() {
                private static final long serialVersionUID = 1L;
                @Override
                public Tuple2<String, Integer> call(Row row) throws Exception {
                    return new Tuple2<String, Integer>(row.getAs("name"), (int) row.getLong(1));
                }
            }).join(excellentNameAgeDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
                private static final long serialVersionUID = 1L;
                @Override
                public Tuple2<String, Integer> call(Row row) throws Exception {
                    return new Tuple2<String, Integer>(row.getAs("name"), (int) row.getLong(1));
                }
            }));
        // Re-assemble each joined pair as a Row of (name, age, score)
        JavaRDD<Row> resultRowRDD = resultRDD.map(new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Row call(Tuple2<String, Tuple2<Integer, Integer>> tuple) throws Exception {
                return RowFactory.create(tuple._1, tuple._2._2, tuple._2._1);
            }
        });
        List<StructField> structFields = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));
        // Build the StructType that describes the metadata of the final DataFrame
        StructType structType = DataTypes.createStructType(structFields);
        DataFrame personsDF = sqlContext.createDataFrame(resultRowRDD, structType);
        personsDF.show();
        personsDF.write().format("json").save("D:\\DT-IMF\\testdata\\peopleresult");
    }
}
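The input file peoples.json is not reproduced in these notes; judging from the console output below, a file consistent with the run would look roughly like this (Justin's exact score is an assumption, any value not above 90 fits the result):
{"name":"Michael", "score":98}
{"name":"Andy", "score":95}
{"name":"Justin", "score":68}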
Console output when running in Eclipse:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/04/08 00:01:27 INFO SparkContext: Running Spark version 1.6.0
16/04/08 00:01:29 INFO SecurityManager: Changing view acls to: think
16/04/08 00:01:29 INFO SecurityManager: Changing modify acls to: think
16/04/08 00:01:29 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(think); users with modify permissions: Set(think)
16/04/08 00:01:32 INFO Utils: Successfully started service 'sparkDriver' on port 52189.
16/04/08 00:01:33 INFO Slf4jLogger: Slf4jLogger started
16/04/08 00:01:33 INFO Remoting: Starting remoting
16/04/08 00:01:34 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.56.1:52202]
16/04/08 00:01:34 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 52202.
16/04/08 00:01:34 INFO SparkEnv: Registering MapOutputTracker
16/04/08 00:01:34 INFO SparkEnv: Registering BlockManagerMaster
16/04/08 00:01:34 INFO DiskBlockManager: Created local directory at C:\Users\think\AppData\Local\Temp\blockmgr-0efb49ea-2c12-4819-8543-efcad6cbe9ee
16/04/08 00:01:34 INFO MemoryStore: MemoryStore started with capacity 1773.8 MB
16/04/08 00:01:35 INFO SparkEnv: Registering OutputCommitCoordinator
16/04/08 00:01:36 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/04/08 00:01:36 INFO SparkUI: Started SparkUI at http://192.168.56.1:4040
16/04/08 00:01:36 INFO Executor: Starting executor ID driver on host localhost
16/04/08 00:01:36 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 52209.
16/04/08 00:01:36 INFO NettyBlockTransferService: Server created on 52209
16/04/08 00:01:36 INFO BlockManagerMaster: Trying to register BlockManager
16/04/08 00:01:36 INFO BlockManagerMasterEndpoint: Registering block manager localhost:52209 with 1773.8 MB RAM, BlockManagerId(driver, localhost, 52209)
16/04/08 00:01:36 INFO BlockManagerMaster: Registered BlockManager
16/04/08 00:01:40 WARN : Your hostname, think-PC resolves to a loopback/non-reachable address: fe80:0:0:0:d401:a5b5:2103:6d13%eth8, but we couldn't find any external IP address!
16/04/08 00:01:41 INFO JSONRelation: Listing file:/D:/DT-IMF/testdata/peoples.json on driver
16/04/08 00:01:42 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 208.9 KB, free 208.9 KB)
16/04/08 00:01:43 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 19.4 KB, free 228.3 KB)
16/04/08 00:01:43 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:52209 (size: 19.4 KB, free: 1773.7 MB)
16/04/08 00:01:43 INFO SparkContext: Created broadcast 0 from json at SparkSQLwithJoin.java:28
16/04/08 00:01:43 INFO FileInputFormat: Total input paths to process : 1
16/04/08 00:01:43 INFO SparkContext: Starting job: json at SparkSQLwithJoin.java:28
16/04/08 00:01:43 INFO DAGScheduler: Got job 0 (json at SparkSQLwithJoin.java:28) with 1 output partitions
16/04/08 00:01:43 INFO DAGScheduler: Final stage: ResultStage 0 (json at SparkSQLwithJoin.java:28)
16/04/08 00:01:43 INFO DAGScheduler: Parents of final stage: List()
16/04/08 00:01:43 INFO DAGScheduler: Missing parents: List()
16/04/08 00:01:43 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[3] at json at SparkSQLwithJoin.java:28), which has no missing parents
16/04/08 00:01:44 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.3 KB, free 232.7 KB)
16/04/08 00:01:44 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.4 KB, free 235.1 KB)
16/04/08 00:01:44 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:52209 (size: 2.4 KB, free: 1773.7 MB)
16/04/08 00:01:44 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/04/08 00:01:44 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[3] at json at SparkSQLwithJoin.java:28)
16/04/08 00:01:44 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/04/08 00:01:44 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2139 bytes)
16/04/08 00:01:44 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/04/08 00:01:44 INFO HadoopRDD: Input split: file:/D:/DT-IMF/testdata/peoples.json:0+91
16/04/08 00:01:44 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
16/04/08 00:01:44 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
16/04/08 00:01:44 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
16/04/08 00:01:44 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
16/04/08 00:01:44 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
16/04/08 00:01:47 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2847 bytes result sent to driver
16/04/08 00:01:47 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 3109 ms on localhost (1/1)
16/04/08 00:01:47 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/04/08 00:01:47 INFO DAGScheduler: ResultStage 0 (json at SparkSQLwithJoin.java:28) finished in 3.246 s
16/04/08 00:01:47 INFO DAGScheduler: Job 0 finished: json at SparkSQLwithJoin.java:28, took 3.609467 s
16/04/08 00:01:49 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 61.8 KB, free 296.9 KB)
16/04/08 00:01:50 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 19.3 KB, free 316.2 KB)
16/04/08 00:01:50 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:52209 (size: 19.3 KB, free: 1773.7 MB)
16/04/08 00:01:50 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:52209 in memory (size: 2.4 KB, free: 1773.7 MB)
16/04/08 00:01:50 INFO SparkContext: Created broadcast 2 from javaRDD at SparkSQLwithJoin.java:38
16/04/08 00:01:50 INFO ContextCleaner: Cleaned accumulator 1
16/04/08 00:01:50 INFO BlockManagerInfo: Removed broadcast_0_piece0 on localhost:52209 in memory (size: 19.4 KB, free: 1773.7 MB)
16/04/08 00:01:50 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 208.9 KB, free 290.1 KB)
16/04/08 00:01:50 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 19.4 KB, free 309.5 KB)
16/04/08 00:01:50 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:52209 (size: 19.4 KB, free: 1773.7 MB)
16/04/08 00:01:50 INFO SparkContext: Created broadcast 3 from javaRDD at SparkSQLwithJoin.java:38
16/04/08 00:01:50 INFO FileInputFormat: Total input paths to process : 1
16/04/08 00:01:50 INFO SparkContext: Starting job: collect at SparkSQLwithJoin.java:45
16/04/08 00:01:50 INFO DAGScheduler: Got job 1 (collect at SparkSQLwithJoin.java:45) with 1 output partitions
16/04/08 00:01:50 INFO DAGScheduler: Final stage: ResultStage 1 (collect at SparkSQLwithJoin.java:45)
16/04/08 00:01:50 INFO DAGScheduler: Parents of final stage: List()
16/04/08 00:01:50 INFO DAGScheduler: Missing parents: List()
16/04/08 00:01:50 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[10] at map at SparkSQLwithJoin.java:38), which has no missing parents
16/04/08 00:01:50 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 8.2 KB, free 317.7 KB)
16/04/08 00:01:50 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 4.5 KB, free 322.1 KB)
16/04/08 00:01:50 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on localhost:52209 (size: 4.5 KB, free: 1773.7 MB)
16/04/08 00:01:50 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1006
16/04/08 00:01:50 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[10] at map at SparkSQLwithJoin.java:38)
16/04/08 00:01:50 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
16/04/08 00:01:50 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,PROCESS_LOCAL, 2139 bytes)
16/04/08 00:01:50 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
16/04/08 00:01:50 INFO HadoopRDD: Input split: file:/D:/DT-IMF/testdata/peoples.json:0+91
16/04/08 00:01:52 INFO BlockManagerInfo: Removed broadcast_2_piece0 on localhost:52209 in memory (size: 19.3 KB, free: 1773.7 MB)
16/04/08 00:01:52 INFO GenerateUnsafeProjection: Code generated in 1170.2809 ms
16/04/08 00:01:52 INFO GeneratePredicate: Code generated in 21.119108 ms
16/04/08 00:01:52 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 2275 bytes result sent to driver
16/04/08 00:01:52 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 1461 ms on localhost (1/1)
16/04/08 00:01:52 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/04/08 00:01:52 INFO DAGScheduler: ResultStage 1 (collect at SparkSQLwithJoin.java:45) finished in 1.465 s
16/04/08 00:01:52 INFO DAGScheduler: Job 1 finished: collect at SparkSQLwithJoin.java:45, took 1.506153 s
16/04/08 00:01:52 INFO SparkContext: Starting job: json at SparkSQLwithJoin.java:57
16/04/08 00:01:52 INFO DAGScheduler: Got job 2 (json at SparkSQLwithJoin.java:57) with 1 output partitions
16/04/08 00:01:52 INFO DAGScheduler: Final stage: ResultStage 2 (json at SparkSQLwithJoin.java:57)
16/04/08 00:01:52 INFO DAGScheduler: Parents of final stage: List()
16/04/08 00:01:52 INFO DAGScheduler: Missing parents: List()
16/04/08 00:01:52 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[13] at json at SparkSQLwithJoin.java:57), which has no missing parents
16/04/08 00:01:52 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 3.5 KB, free 244.5 KB)
16/04/08 00:01:52 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 2.1 KB, free 246.6 KB)
16/04/08 00:01:52 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on localhost:52209 (size: 2.1 KB, free: 1773.7 MB)
16/04/08 00:01:52 INFO SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1006
16/04/08 00:01:52 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[13] at json at SparkSQLwithJoin.java:57)
16/04/08 00:01:52 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
16/04/08 00:01:52 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, partition 0,PROCESS_LOCAL, 2169 bytes)
16/04/08 00:01:52 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
16/04/08 00:01:52 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 1716 bytes result sent to driver
16/04/08 00:01:52 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 80 ms on localhost (1/1)
16/04/08 00:01:52 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
16/04/08 00:01:52 INFO DAGScheduler: ResultStage 2 (json at SparkSQLwithJoin.java:57) finished in 0.088 s
16/04/08 00:01:52 INFO DAGScheduler: Job 2 finished: json at SparkSQLwithJoin.java:57, took 0.163387 s
16/04/08 00:01:53 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 208.5 KB, free 455.1 KB)
16/04/08 00:01:53 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 19.3 KB, free 474.4 KB)
16/04/08 00:01:53 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on localhost:52209 (size: 19.3 KB, free: 1773.7 MB)
16/04/08 00:01:53 INFO SparkContext: Created broadcast 6 from javaRDD at SparkSQLwithJoin.java:82
16/04/08 00:01:53 INFO SparkContext: Starting job: show at SparkSQLwithJoin.java:111
16/04/08 00:01:53 INFO DAGScheduler: Registering RDD 14 (mapToPair at SparkSQLwithJoin.java:73)
16/04/08 00:01:53 INFO DAGScheduler: Registering RDD 19 (mapToPair at SparkSQLwithJoin.java:82)
16/04/08 00:01:53 INFO DAGScheduler: Got job 3 (show at SparkSQLwithJoin.java:111) with 1 output partitions
16/04/08 00:01:53 INFO DAGScheduler: Final stage: ResultStage 5 (show at SparkSQLwithJoin.java:111)
16/04/08 00:01:53 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 3, ShuffleMapStage 4)
16/04/08 00:01:53 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 3, ShuffleMapStage 4)
16/04/08 00:01:53 INFO DAGScheduler: Submitting ShuffleMapStage 3 (MapPartitionsRDD[14] at mapToPair at SparkSQLwithJoin.java:73), which has no missing parents
16/04/08 00:01:53 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated size 8.4 KB, free 482.8 KB)
16/04/08 00:01:53 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 4.6 KB, free 487.4 KB)
16/04/08 00:01:53 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on localhost:52209 (size: 4.6 KB, free: 1773.7 MB)
16/04/08 00:01:53 INFO SparkContext: Created broadcast 7 from broadcast at DAGScheduler.scala:1006
16/04/08 00:01:53 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 3 (MapPartitionsRDD[14] at mapToPair at SparkSQLwithJoin.java:73)
16/04/08 00:01:53 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
16/04/08 00:01:53 INFO DAGScheduler: Submitting ShuffleMapStage 4 (MapPartitionsRDD[19] at mapToPair at SparkSQLwithJoin.java:82), which has no missing parents
16/04/08 00:01:53 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, localhost, partition 0,PROCESS_LOCAL, 2128 bytes)
16/04/08 00:01:53 INFO Executor: Running task 0.0 in stage 3.0 (TID 3)
16/04/08 00:01:53 INFO MemoryStore: Block broadcast_8 stored as values in memory (estimated size 7.6 KB, free 495.1 KB)
16/04/08 00:01:53 INFO MemoryStore: Block broadcast_8_piece0 stored as bytes in memory (estimated size 4.2 KB, free 499.3 KB)
16/04/08 00:01:53 INFO BlockManagerInfo: Added broadcast_8_piece0 in memory on localhost:52209 (size: 4.2 KB, free: 1773.7 MB)
16/04/08 00:01:53 INFO HadoopRDD: Input split: file:/D:/DT-IMF/testdata/peoples.json:0+91
16/04/08 00:01:53 INFO SparkContext: Created broadcast 8 from broadcast at DAGScheduler.scala:1006
16/04/08 00:01:53 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 4 (MapPartitionsRDD[19] at mapToPair at SparkSQLwithJoin.java:82)
16/04/08 00:01:53 INFO TaskSchedulerImpl: Adding task set 4.0 with 1 tasks
16/04/08 00:01:53 INFO Executor: Finished task 0.0 in stage 3.0 (TID 3). 2467 bytes result sent to driver
16/04/08 00:01:53 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 4, localhost, partition 0,PROCESS_LOCAL, 2158 bytes)
16/04/08 00:01:53 INFO Executor: Running task 0.0 in stage 4.0 (TID 4)
16/04/08 00:01:53 INFO DAGScheduler: ShuffleMapStage 3 (mapToPair at SparkSQLwithJoin.java:73) finished in 0.216 s
16/04/08 00:01:53 INFO DAGScheduler: looking for newly runnable stages
16/04/08 00:01:53 INFO DAGScheduler: running: Set(ShuffleMapStage 4)
16/04/08 00:01:53 INFO DAGScheduler: waiting: Set(ResultStage 5)
16/04/08 00:01:53 INFO DAGScheduler: failed: Set()
16/04/08 00:01:53 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 3) in 213 ms on localhost (1/1)
16/04/08 00:01:53 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
16/04/08 00:01:53 INFO GeneratePredicate: Code generated in 36.313835 ms
16/04/08 00:01:53 INFO Executor: Finished task 0.0 in stage 4.0 (TID 4). 1372 bytes result sent to driver
16/04/08 00:01:53 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 4) in 107 ms on localhost (1/1)
16/04/08 00:01:53 INFO DAGScheduler: ShuffleMapStage 4 (mapToPair at SparkSQLwithJoin.java:82) finished in 0.235 s
16/04/08 00:01:53 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
16/04/08 00:01:53 INFO DAGScheduler: looking for newly runnable stages
16/04/08 00:01:53 INFO DAGScheduler: running: Set()
16/04/08 00:01:53 INFO DAGScheduler: waiting: Set(ResultStage 5)
16/04/08 00:01:53 INFO DAGScheduler: failed: Set()
16/04/08 00:01:53 INFO DAGScheduler: Submitting ResultStage 5 (MapPartitionsRDD[25] at show at SparkSQLwithJoin.java:111), which has no missing parents
16/04/08 00:01:54 INFO MemoryStore: Block broadcast_9 stored as values in memory (estimated size 5.5 KB, free 504.8 KB)
16/04/08 00:01:54 INFO MemoryStore: Block broadcast_9_piece0 stored as bytes in memory (estimated size 2.8 KB, free 507.6 KB)
16/04/08 00:01:54 INFO BlockManagerInfo: Added broadcast_9_piece0 in memory on localhost:52209 (size: 2.8 KB, free: 1773.7 MB)
16/04/08 00:01:54 INFO SparkContext: Created broadcast 9 from broadcast at DAGScheduler.scala:1006
16/04/08 00:01:54 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 5 (MapPartitionsRDD[25] at show at SparkSQLwithJoin.java:111)
16/04/08 00:01:54 INFO TaskSchedulerImpl: Adding task set 5.0 with 1 tasks
16/04/08 00:01:54 INFO TaskSetManager: Starting task 0.0 in stage 5.0 (TID 5, localhost, partition 0,PROCESS_LOCAL, 1967 bytes)
16/04/08 00:01:54 INFO Executor: Running task 0.0 in stage 5.0 (TID 5)
16/04/08 00:01:54 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
16/04/08 00:01:54 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 17 ms
16/04/08 00:01:54 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
16/04/08 00:01:54 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
16/04/08 00:01:54 INFO Executor: Finished task 0.0 in stage 5.0 (TID 5). 1608 bytes result sent to driver
16/04/08 00:01:54 INFO TaskSetManager: Finished task 0.0 in stage 5.0 (TID 5) in 198 ms on localhost (1/1)
16/04/08 00:01:54 INFO TaskSchedulerImpl: Removed TaskSet 5.0, whose tasks have all completed, from pool
16/04/08 00:01:54 INFO DAGScheduler: ResultStage 5 (show at SparkSQLwithJoin.java:111) finished in 0.198 s
16/04/08 00:01:54 INFO DAGScheduler: Job 3 finished: show at SparkSQLwithJoin.java:111, took 0.597677 s
+-------+---+-----+
| name|age|score|
+-------+---+-----+
|Michael| 20| 98|
| Andy| 17| 95|
+-------+---+-----+
16/04/08 00:01:54 INFO DefaultWriterContainer: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
16/04/08 00:01:54 INFO SparkContext: Starting job: save at SparkSQLwithJoin.java:113
16/04/08 00:01:54 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 143 bytes
16/04/08 00:01:54 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 143 bytes
16/04/08 00:01:54 INFO DAGScheduler: Got job 4 (save at SparkSQLwithJoin.java:113) with 1 output partitions
16/04/08 00:01:54 INFO DAGScheduler: Final stage: ResultStage 8 (save at SparkSQLwithJoin.java:113)
16/04/08 00:01:54 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 6, ShuffleMapStage 7)
16/04/08 00:01:54 INFO DAGScheduler: Missing parents: List()
16/04/08 00:01:54 INFO DAGScheduler: Submitting ResultStage 8 (MapPartitionsRDD[24] at createDataFrame at SparkSQLwithJoin.java:109), which has no missing parents
16/04/08 00:01:54 INFO MemoryStore: Block broadcast_10 stored as values in memory (estimated size 66.7 KB, free 574.3 KB)
16/04/08 00:01:54 INFO MemoryStore: Block broadcast_10_piece0 stored as bytes in memory (estimated size 23.5 KB, free 597.8 KB)
16/04/08 00:01:54 INFO BlockManagerInfo: Added broadcast_10_piece0 in memory on localhost:52209 (size: 23.5 KB, free: 1773.7 MB)
16/04/08 00:01:54 INFO SparkContext: Created broadcast 10 from broadcast at DAGScheduler.scala:1006
16/04/08 00:01:54 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 8 (MapPartitionsRDD[24] at createDataFrame at SparkSQLwithJoin.java:109)
16/04/08 00:01:54 INFO TaskSchedulerImpl: Adding task set 8.0 with 1 tasks
16/04/08 00:01:54 INFO TaskSetManager: Starting task 0.0 in stage 8.0 (TID 6, localhost, partition 0,PROCESS_LOCAL, 1967 bytes)
16/04/08 00:01:54 INFO Executor: Running task 0.0 in stage 8.0 (TID 6)
16/04/08 00:01:54 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
16/04/08 00:01:54 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 2 ms
16/04/08 00:01:54 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
16/04/08 00:01:54 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
16/04/08 00:01:54 INFO DefaultWriterContainer: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
16/04/08 00:01:55 INFO FileOutputCommitter: Saved output of task 'attempt_201604080001_0008_m_000000_0' to file:/D:/DT-IMF/testdata/peopleresult/_temporary/0/task_201604080001_0008_m_000000
16/04/08 00:01:55 INFO SparkHadoopMapRedUtil: attempt_201604080001_0008_m_000000_0: Committed
16/04/08 00:01:55 INFO Executor: Finished task 0.0 in stage 8.0 (TID 6). 1165 bytes result sent to driver
16/04/08 00:01:55 INFO TaskSetManager: Finished task 0.0 in stage 8.0 (TID 6) in 523 ms on localhost (1/1)
16/04/08 00:01:55 INFO DAGScheduler: ResultStage 8 (save at SparkSQLwithJoin.java:113) finished in 0.523 s
16/04/08 00:01:55 INFO TaskSchedulerImpl: Removed TaskSet 8.0, whose tasks have all completed, from pool
16/04/08 00:01:55 INFO DAGScheduler: Job 4 finished: save at SparkSQLwithJoin.java:113, took 0.617657 s
16/04/08 00:01:55 INFO DefaultWriterContainer: Job job_201604080001_0000 committed.
16/04/08 00:01:55 INFO JSONRelation: Listing file:/D:/DT-IMF/testdata/peopleresult on driver
16/04/08 00:01:55 INFO JSONRelation: Listing file:/D:/DT-IMF/testdata/peopleresult on driver
16/04/08 00:01:55 INFO SparkContext: Invoking stop() from shutdown hook
16/04/08 00:01:55 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4040
16/04/08 00:01:55 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/04/08 00:01:55 INFO MemoryStore: MemoryStore cleared
16/04/08 00:01:55 INFO BlockManager: BlockManager stopped
16/04/08 00:01:55 INFO BlockManagerMaster: BlockManagerMaster stopped
16/04/08 00:01:55 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/04/08 00:01:55 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/04/08 00:01:55 INFO SparkContext: Successfully stopped SparkContext
16/04/08 00:01:55 INFO ShutdownHookManager: Shutdown hook called
16/04/08 00:01:55 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/04/08 00:01:55 INFO ShutdownHookManager: Deleting directory C:\Users\think\AppData\Local\Temp\spark-2762f4bc-6ffa-4744-86d7-c6a9b2a66082
When the run finishes, the directory D:\DT-IMF\testdata\peopleresult is generated automatically, and the result is written inside it:
{"name":"Michael","age":20,"score":98}
{"name":"Andy","age":17,"score":95}
Next, the program rewritten in Scala:
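The listing below is an equivalent Scala sketch of the Java program above (the package name SparkSQLByScala and the exact phrasing are my reconstruction; the logic, input paths, and temp-table names mirror the Java version):
package SparkSQLByScala

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object SparkSQLwithJoin {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("SparkSQLwithJoin")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Create a DataFrame from the JSON file data source
    val peoplesDF = sqlContext.read.json("D:\\DT-IMF\\testdata\\peoples.json")
    // Register the JSON-based DataFrame as a temporary table
    peoplesDF.registerTempTable("peopleScores")
    // Query the people whose score is greater than 90
    val excellentScoresDF = sqlContext.sql("select name, score from peopleScores where score > 90")
    // Collect the names of everyone scoring above 90
    val excellentScoresNameList = excellentScoresDF.rdd.map(_.getAs[String]("name")).collect()
    // Dynamically assemble JSON strings and build a DataFrame from them
    val peopleInformations = List(
      """{"name":"Michael", "age":20}""",
      """{"name":"Andy", "age":17}""",
      """{"name":"Justin", "age":19}""")
    val peopleInformationsDF = sqlContext.read.json(sc.parallelize(peopleInformations))
    // Register it as a temporary table
    peopleInformationsDF.registerTempTable("peopleInformations")
    // Splice the collected names into an IN clause
    val sqlText = "select name, age from peopleInformations where name in (" +
      excellentScoresNameList.map(name => s"'$name'").mkString(",") + ")"
    val excellentNameAgeDF = sqlContext.sql(sqlText)
    // Key both result sets by name and join them, yielding (name, (score, age))
    val resultRDD = excellentScoresDF.rdd
      .map(row => (row.getAs[String]("name"), row.getLong(1).toInt))
      .join(excellentNameAgeDF.rdd.map(row => (row.getAs[String]("name"), row.getLong(1).toInt)))
    // Re-assemble each joined pair as a Row of (name, age, score)
    val resultRowRDD = resultRDD.map { case (name, (score, age)) => Row(name, age, score) }
    // Build the StructType that describes the metadata of the final DataFrame
    val structType = StructType(Array(
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("score", IntegerType, true)))
    val personsDF = sqlContext.createDataFrame(resultRowRDD, structType)
    personsDF.show()
    personsDF.write.format("json").save("D:\\DT-IMF\\testdata\\peopleresult")
  }
}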
Console output from the run:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/04/08 00:18:50 INFO SparkContext: Running Spark version 1.6.0
16/04/08 00:18:52 INFO SecurityManager: Changing view acls to: think
16/04/08 00:18:52 INFO SecurityManager: Changing modify acls to: think
16/04/08 00:18:52 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(think); users with modify permissions: Set(think)
16/04/08 00:18:55 INFO Utils: Successfully started service 'sparkDriver' on port 52453.
16/04/08 00:18:57 INFO Slf4jLogger: Slf4jLogger started
16/04/08 00:18:57 INFO Remoting: Starting remoting
16/04/08 00:18:57 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.56.1:52466]
16/04/08 00:18:57 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 52466.
16/04/08 00:18:57 INFO SparkEnv: Registering MapOutputTracker
16/04/08 00:18:58 INFO SparkEnv: Registering BlockManagerMaster
16/04/08 00:18:58 INFO DiskBlockManager: Created local directory at C:\Users\think\AppData\Local\Temp\blockmgr-e100d6ba-bb71-4beb-84f3-96b6801e4813
16/04/08 00:18:58 INFO MemoryStore: MemoryStore started with capacity 1773.8 MB
16/04/08 00:18:58 INFO SparkEnv: Registering OutputCommitCoordinator
16/04/08 00:18:59 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/04/08 00:18:59 INFO SparkUI: Started SparkUI at http://192.168.56.1:4040
16/04/08 00:19:00 INFO Executor: Starting executor ID driver on host localhost
16/04/08 00:19:00 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 52473.
16/04/08 00:19:00 INFO NettyBlockTransferService: Server created on 52473
16/04/08 00:19:00 INFO BlockManagerMaster: Trying to register BlockManager
16/04/08 00:19:00 INFO BlockManagerMasterEndpoint: Registering block manager localhost:52473 with 1773.8 MB RAM, BlockManagerId(driver, localhost, 52473)
16/04/08 00:19:00 INFO BlockManagerMaster: Registered BlockManager
16/04/08 00:19:04 WARN : Your hostname, think-PC resolves to a loopback/non-reachable address: fe80:0:0:0:d401:a5b5:2103:6d13%eth8, but we couldn't find any external IP address!
16/04/08 00:19:05 INFO JSONRelation: Listing file:/D:/DT-IMF/testdata/peoples.json on driver
16/04/08 00:19:07 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 208.9 KB, free 208.9 KB)
16/04/08 00:19:07 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 19.4 KB, free 228.3 KB)
16/04/08 00:19:07 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:52473 (size: 19.4 KB, free: 1773.7 MB)
16/04/08 00:19:07 INFO SparkContext: Created broadcast 0 from json at SparkSQLwithJoin.java:28
16/04/08 00:19:08 INFO FileInputFormat: Total input paths to process : 1
16/04/08 00:19:08 INFO SparkContext: Starting job: json at SparkSQLwithJoin.java:28
16/04/08 00:19:08 INFO DAGScheduler: Got job 0 (json at SparkSQLwithJoin.java:28) with 1 output partitions
16/04/08 00:19:08 INFO DAGScheduler: Final stage: ResultStage 0 (json at SparkSQLwithJoin.java:28)
16/04/08 00:19:08 INFO DAGScheduler: Parents of final stage: List()
16/04/08 00:19:08 INFO DAGScheduler: Missing parents: List()
16/04/08 00:19:08 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[3] at json at SparkSQLwithJoin.java:28), which has no missing parents
16/04/08 00:19:08 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.3 KB, free 232.7 KB)
16/04/08 00:19:08 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.4 KB, free 235.1 KB)
16/04/08 00:19:08 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:52473 (size: 2.4 KB, free: 1773.7 MB)
16/04/08 00:19:08 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/04/08 00:19:08 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[3] at json at SparkSQLwithJoin.java:28)
16/04/08 00:19:08 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/04/08 00:19:08 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2139 bytes)
16/04/08 00:19:08 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/04/08 00:19:09 INFO HadoopRDD: Input split: file:/D:/DT-IMF/testdata/peoples.json:0+91
16/04/08 00:19:09 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
16/04/08 00:19:09 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
16/04/08 00:19:09 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
16/04/08 00:19:09 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
16/04/08 00:19:09 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
16/04/08 00:19:11 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2847 bytes result sent to driver
16/04/08 00:19:12 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 3208 ms on localhost (1/1)
16/04/08 00:19:12 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/04/08 00:19:12 INFO DAGScheduler: ResultStage 0 (json at SparkSQLwithJoin.java:28) finished in 3.313 s
16/04/08 00:19:12 INFO DAGScheduler: Job 0 finished: json at SparkSQLwithJoin.java:28, took 3.652331 s
16/04/08 00:19:14 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 61.8 KB, free 296.9 KB)
16/04/08 00:19:14 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 19.3 KB, free 316.2 KB)
16/04/08 00:19:14 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:52473 (size: 19.3 KB, free: 1773.7 MB)
16/04/08 00:19:14 INFO SparkContext: Created broadcast 2 from javaRDD at SparkSQLwithJoin.java:38
16/04/08 00:19:14 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:52473 in memory (size: 2.4 KB, free: 1773.7 MB)
16/04/08 00:19:14 INFO ContextCleaner: Cleaned accumulator 1
16/04/08 00:19:14 INFO BlockManagerInfo: Removed broadcast_0_piece0 on localhost:52473 in memory (size: 19.4 KB, free: 1773.7 MB)
16/04/08 00:19:14 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 208.9 KB, free 290.1 KB)
16/04/08 00:19:14 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 19.4 KB, free 309.5 KB)
16/04/08 00:19:14 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:52473 (size: 19.4 KB, free: 1773.7 MB)
16/04/08 00:19:14 INFO SparkContext: Created broadcast 3 from javaRDD at SparkSQLwithJoin.java:38
16/04/08 00:19:15 INFO FileInputFormat: Total input paths to process : 1
16/04/08 00:19:15 INFO SparkContext: Starting job: collect at SparkSQLwithJoin.java:45
16/04/08 00:19:15 INFO DAGScheduler: Got job 1 (collect at SparkSQLwithJoin.java:45) with 1 output partitions
16/04/08 00:19:15 INFO DAGScheduler: Final stage: ResultStage 1 (collect at SparkSQLwithJoin.java:45)
16/04/08 00:19:15 INFO DAGScheduler: Parents of final stage: List()
16/04/08 00:19:15 INFO DAGScheduler: Missing parents: List()
16/04/08 00:19:15 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[10] at map at SparkSQLwithJoin.java:38), which has no missing parents
16/04/08 00:19:15 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 8.2 KB, free 317.7 KB)
16/04/08 00:19:15 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 4.5 KB, free 322.1 KB)
16/04/08 00:19:15 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on localhost:52473 (size: 4.5 KB, free: 1773.7 MB)
16/04/08 00:19:15 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1006
16/04/08 00:19:15 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[10] at map at SparkSQLwithJoin.java:38)
16/04/08 00:19:15 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
16/04/08 00:19:15 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,PROCESS_LOCAL, 2139 bytes)
16/04/08 00:19:15 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
16/04/08 00:19:15 INFO HadoopRDD: Input split: file:/D:/DT-IMF/testdata/peoples.json:0+91
16/04/08 00:19:16 INFO BlockManagerInfo: Removed broadcast_2_piece0 on localhost:52473 in memory (size: 19.3 KB, free: 1773.7 MB)
16/04/08 00:19:16 INFO GenerateUnsafeProjection: Code generated in 986.327423 ms
16/04/08 00:19:16 INFO GeneratePredicate: Code generated in 18.72276 ms
16/04/08 00:19:16 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 2275 bytes result sent to driver
16/04/08 00:19:16 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 1228 ms on localhost (1/1)
16/04/08 00:19:16 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/04/08 00:19:16 INFO DAGScheduler: ResultStage 1 (collect at SparkSQLwithJoin.java:45) finished in 1.233 s
16/04/08 00:19:16 INFO DAGScheduler: Job 1 finished: collect at SparkSQLwithJoin.java:45, took 1.278198 s
16/04/08 00:19:16 INFO SparkContext: Starting job: json at SparkSQLwithJoin.java:57
16/04/08 00:19:16 INFO DAGScheduler: Got job 2 (json at SparkSQLwithJoin.java:57) with 1 output partitions
16/04/08 00:19:16 INFO DAGScheduler: Final stage: ResultStage 2 (json at SparkSQLwithJoin.java:57)
16/04/08 00:19:16 INFO DAGScheduler: Parents of final stage: List()
16/04/08 00:19:16 INFO DAGScheduler: Missing parents: List()
16/04/08 00:19:16 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[13] at json at SparkSQLwithJoin.java:57), which has no missing parents
16/04/08 00:19:16 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 3.5 KB, free 244.5 KB)
16/04/08 00:19:16 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 2.1 KB, free 246.6 KB)
16/04/08 00:19:16 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on localhost:52473 (size: 2.1 KB, free: 1773.7 MB)
16/04/08 00:19:16 INFO SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1006
16/04/08 00:19:16 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[13] at json at SparkSQLwithJoin.java:57)
16/04/08 00:19:16 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
16/04/08 00:19:16 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, partition 0,PROCESS_LOCAL, 2169 bytes)
16/04/08 00:19:16 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
16/04/08 00:19:16 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 1716 bytes result sent to driver
16/04/08 00:19:16 INFO DAGScheduler: ResultStage 2 (json at SparkSQLwithJoin.java:57) finished in 0.119 s
16/04/08 00:19:16 INFO DAGScheduler: Job 2 finished: json at SparkSQLwithJoin.java:57, took 0.163232 s
16/04/08 00:19:16 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 118 ms on localhost (1/1)
16/04/08 00:19:16 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
16/04/08 00:19:17 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 208.5 KB, free 455.1 KB)
16/04/08 00:19:17 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 19.3 KB, free 474.4 KB)
16/04/08 00:19:17 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on localhost:52473 (size: 19.3 KB, free: 1773.7 MB)
16/04/08 00:19:17 INFO SparkContext: Created broadcast 6 from javaRDD at SparkSQLwithJoin.java:82
16/04/08 00:19:17 INFO SparkContext: Starting job: show at SparkSQLwithJoin.java:111
16/04/08 00:19:17 INFO DAGScheduler: Registering RDD 14 (mapToPair at SparkSQLwithJoin.java:73)
16/04/08 00:19:17 INFO DAGScheduler: Registering RDD 19 (mapToPair at SparkSQLwithJoin.java:82)
16/04/08 00:19:17 INFO DAGScheduler: Got job 3 (show at SparkSQLwithJoin.java:111) with 1 output partitions
16/04/08 00:19:17 INFO DAGScheduler: Final stage: ResultStage 5 (show at SparkSQLwithJoin.java:111)
16/04/08 00:19:17 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 3, ShuffleMapStage 4)
16/04/08 00:19:17 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 3, ShuffleMapStage 4)
16/04/08 00:19:17 INFO DAGScheduler: Submitting ShuffleMapStage 3 (MapPartitionsRDD[14] at mapToPair at SparkSQLwithJoin.java:73), which has no missing parents
16/04/08 00:19:17 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated size 8.4 KB, free 482.8 KB)
16/04/08 00:19:17 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 4.6 KB, free 487.4 KB)
16/04/08 00:19:17 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on localhost:52473 (size: 4.6 KB, free: 1773.7 MB)
16/04/08 00:19:17 INFO SparkContext: Created broadcast 7 from broadcast at DAGScheduler.scala:1006
16/04/08 00:19:17 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 3 (MapPartitionsRDD[14] at mapToPair at SparkSQLwithJoin.java:73)
16/04/08 00:19:17 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
16/04/08 00:19:17 INFO DAGScheduler: Submitting ShuffleMapStage 4 (MapPartitionsRDD[19] at mapToPair at SparkSQLwithJoin.java:82), which has no missing parents
16/04/08 00:19:17 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, localhost, partition 0,PROCESS_LOCAL, 2128 bytes)
16/04/08 00:19:17 INFO Executor: Running task 0.0 in stage 3.0 (TID 3)
16/04/08 00:19:17 INFO MemoryStore: Block broadcast_8 stored as values in memory (estimated size 7.6 KB, free 495.1 KB)
16/04/08 00:19:17 INFO MemoryStore: Block broadcast_8_piece0 stored as bytes in memory (estimated size 4.2 KB, free 499.3 KB)
16/04/08 00:19:17 INFO BlockManagerInfo: Added broadcast_8_piece0 in memory on localhost:52473 (size: 4.2 KB, free: 1773.7 MB)
16/04/08 00:19:17 INFO SparkContext: Created broadcast 8 from broadcast at DAGScheduler.scala:1006
16/04/08 00:19:17 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 4 (MapPartitionsRDD[19] at mapToPair at SparkSQLwithJoin.java:82)
16/04/08 00:19:17 INFO TaskSchedulerImpl: Adding task set 4.0 with 1 tasks
16/04/08 00:19:17 INFO HadoopRDD: Input split: file:/D:/DT-IMF/testdata/peoples.json:0+91
16/04/08 00:19:17 INFO Executor: Finished task 0.0 in stage 3.0 (TID 3). 2467 bytes result sent to driver
16/04/08 00:19:17 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 4, localhost, partition 0,PROCESS_LOCAL, 2158 bytes)
16/04/08 00:19:17 INFO Executor: Running task 0.0 in stage 4.0 (TID 4)
16/04/08 00:19:17 INFO DAGScheduler: ShuffleMapStage 3 (mapToPair at SparkSQLwithJoin.java:73) finished in 0.169 s
16/04/08 00:19:17 INFO DAGScheduler: looking for newly runnable stages
16/04/08 00:19:17 INFO DAGScheduler: running: Set(ShuffleMapStage 4)
16/04/08 00:19:17 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 3) in 164 ms on localhost (1/1)
16/04/08 00:19:17 INFO DAGScheduler: waiting: Set(ResultStage 5)
16/04/08 00:19:17 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
16/04/08 00:19:17 INFO DAGScheduler: failed: Set()
16/04/08 00:19:18 INFO GeneratePredicate: Code generated in 46.831888 ms
16/04/08 00:19:18 INFO Executor: Finished task 0.0 in stage 4.0 (TID 4). 1372 bytes result sent to driver
16/04/08 00:19:18 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 4) in 142 ms on localhost (1/1)
16/04/08 00:19:18 INFO DAGScheduler: ShuffleMapStage 4 (mapToPair at SparkSQLwithJoin.java:82) finished in 0.252 s
16/04/08 00:19:18 INFO DAGScheduler: looking for newly runnable stages
16/04/08 00:19:18 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
16/04/08 00:19:18 INFO DAGScheduler: running: Set()
16/04/08 00:19:18 INFO DAGScheduler: waiting: Set(ResultStage 5)
16/04/08 00:19:18 INFO DAGScheduler: failed: Set()
16/04/08 00:19:18 INFO DAGScheduler: Submitting ResultStage 5 (MapPartitionsRDD[25] at show at SparkSQLwithJoin.java:111), which has no missing parents
16/04/08 00:19:18 INFO MemoryStore: Block broadcast_9 stored as values in memory (estimated size 5.5 KB, free 504.8 KB)
16/04/08 00:19:18 INFO MemoryStore: Block broadcast_9_piece0 stored as bytes in memory (estimated size 2.8 KB, free 507.6 KB)
16/04/08 00:19:18 INFO BlockManagerInfo: Added broadcast_9_piece0 in memory on localhost:52473 (size: 2.8 KB, free: 1773.7 MB)
16/04/08 00:19:18 INFO SparkContext: Created broadcast 9 from broadcast at DAGScheduler.scala:1006
16/04/08 00:19:18 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 5 (MapPartitionsRDD[25] at show at SparkSQLwithJoin.java:111)
16/04/08 00:19:18 INFO TaskSchedulerImpl: Adding task set 5.0 with 1 tasks
16/04/08 00:19:18 INFO TaskSetManager: Starting task 0.0 in stage 5.0 (TID 5, localhost, partition 0,PROCESS_LOCAL, 1967 bytes)
16/04/08 00:19:18 INFO Executor: Running task 0.0 in stage 5.0 (TID 5)
16/04/08 00:19:18 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
16/04/08 00:19:18 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 23 ms
16/04/08 00:19:18 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
16/04/08 00:19:18 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
16/04/08 00:19:18 INFO Executor: Finished task 0.0 in stage 5.0 (TID 5). 1608 bytes result sent to driver
16/04/08 00:19:18 INFO TaskSetManager: Finished task 0.0 in stage 5.0 (TID 5) in 233 ms on localhost (1/1)
16/04/08 00:19:18 INFO DAGScheduler: ResultStage 5 (show at SparkSQLwithJoin.java:111) finished in 0.234 s
16/04/08 00:19:18 INFO TaskSchedulerImpl: Removed TaskSet 5.0, whose tasks have all completed, from pool
16/04/08 00:19:18 INFO DAGScheduler: Job 3 finished: show at SparkSQLwithJoin.java:111, took 0.672277 s
+-------+---+-----+
| name|age|score|
+-------+---+-----+
|Michael| 20| 98|
| Andy| 17| 95|
+-------+---+-----+
16/04/08 00:19:18 INFO DefaultWriterContainer: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
16/04/08 00:19:18 INFO SparkContext: Starting job: save at SparkSQLwithJoin.java:113
16/04/08 00:19:18 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 143 bytes
16/04/08 00:19:18 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 143 bytes
16/04/08 00:19:18 INFO DAGScheduler: Got job 4 (save at SparkSQLwithJoin.java:113) with 1 output partitions
16/04/08 00:19:18 INFO DAGScheduler: Final stage: ResultStage 8 (save at SparkSQLwithJoin.java:113)
16/04/08 00:19:18 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 6, ShuffleMapStage 7)
16/04/08 00:19:18 INFO DAGScheduler: Missing parents: List()
16/04/08 00:19:18 INFO DAGScheduler: Submitting ResultStage 8 (MapPartitionsRDD[24] at createDataFrame at SparkSQLwithJoin.java:109), which has no missing parents
16/04/08 00:19:18 INFO MemoryStore: Block broadcast_10 stored as values in memory (estimated size 66.7 KB, free 574.3 KB)
16/04/08 00:19:18 INFO MemoryStore: Block broadcast_10_piece0 stored as bytes in memory (estimated size 23.5 KB, free 597.8 KB)
16/04/08 00:19:18 INFO BlockManagerInfo: Added broadcast_10_piece0 in memory on localhost:52473 (size: 23.5 KB, free: 1773.7 MB)
16/04/08 00:19:18 INFO SparkContext: Created broadcast 10 from broadcast at DAGScheduler.scala:1006
16/04/08 00:19:18 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 8 (MapPartitionsRDD[24] at createDataFrame at SparkSQLwithJoin.java:109)
16/04/08 00:19:18 INFO TaskSchedulerImpl: Adding task set 8.0 with 1 tasks
16/04/08 00:19:18 INFO TaskSetManager: Starting task 0.0 in stage 8.0 (TID 6, localhost, partition 0,PROCESS_LOCAL, 1967 bytes)
16/04/08 00:19:18 INFO Executor: Running task 0.0 in stage 8.0 (TID 6)
16/04/08 00:19:19 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
16/04/08 00:19:19 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
16/04/08 00:19:19 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
16/04/08 00:19:19 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
16/04/08 00:19:19 INFO DefaultWriterContainer: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
16/04/08 00:19:19 INFO FileOutputCommitter: Saved output of task 'attempt_201604080019_0008_m_000000_0' to file:/D:/DT-IMF/testdata/peopleresult/_temporary/0/task_201604080019_0008_m_000000
16/04/08 00:19:19 INFO SparkHadoopMapRedUtil: attempt_201604080019_0008_m_000000_0: Committed
16/04/08 00:19:19 INFO Executor: Finished task 0.0 in stage 8.0 (TID 6). 1165 bytes result sent to driver
16/04/08 00:19:19 INFO DAGScheduler: ResultStage 8 (save at SparkSQLwithJoin.java:113) finished in 0.528 s
16/04/08 00:19:19 INFO TaskSetManager: Finished task 0.0 in stage 8.0 (TID 6) in 526 ms on localhost (1/1)
16/04/08 00:19:19 INFO TaskSchedulerImpl: Removed TaskSet 8.0, whose tasks have all completed, from pool
16/04/08 00:19:19 INFO DAGScheduler: Job 4 finished: save at SparkSQLwithJoin.java:113, took 0.653766 s
16/04/08 00:19:19 INFO DefaultWriterContainer: Job job_201604080019_0000 committed.
16/04/08 00:19:19 INFO JSONRelation: Listing file:/D:/DT-IMF/testdata/peopleresult on driver
16/04/08 00:19:19 INFO JSONRelation: Listing file:/D:/DT-IMF/testdata/peopleresult on driver
16/04/08 00:19:19 INFO SparkContext: Invoking stop() from shutdown hook
16/04/08 00:19:19 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4040
16/04/08 00:19:19 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/04/08 00:19:19 INFO MemoryStore: MemoryStore cleared
16/04/08 00:19:19 INFO BlockManager: BlockManager stopped
16/04/08 00:19:19 INFO BlockManagerMaster: BlockManagerMaster stopped
16/04/08 00:19:19 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/04/08 00:19:19 INFO SparkContext: Successfully stopped SparkContext
16/04/08 00:19:19 INFO ShutdownHookManager: Shutdown hook called
16/04/08 00:19:19 INFO ShutdownHookManager: Deleting directory C:\Users\think\AppData\Local\Temp\spark-8402f870-65c3-45cc-9deb-5b14f8f6ae38
16/04/08 00:19:19 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
The output matches the Java version's result:
{"name":"Michael","age":20,"score":98}
{"name":"Andy","age":17,"score":95}
The above are study notes from Lesson 67 of Mr. Wang Jialin's DT Big Data Dream Factory course "IMF Legendary Action".
Mr. Wang Jialin is a China evangelist for Spark, Flink, Docker, and Android technologies; president and chief expert of the Spark Asia-Pacific Research Institute; founder of DT Big Data Dream Factory; a source-code-level expert in Android software/hardware integration; an English pronunciation magician; and a fitness enthusiast.
WeChat official account: DT_Spark
Phone: 18610086859
QQ: 1740415547
WeChat: 18610086859
Sina Weibo: ilovepains