Lesson 67: SparkSQL Comprehensive Case Study in Practice (Study Notes)



1. SparkSQL case analysis

2. Implementing the case in Java and Scala

 

This lesson works hands-on through a Join operation in SparkSQL:

First, the Java implementation:

 

package SparkSQLByJava;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

public class SparkSQLwithJoin {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkSQLwithJoin");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // Create a DataFrame from the JSON file data source.
        DataFrame peoplesDF = sqlContext.read().json("D:\\DT-IMF\\testdata\\peoples.json");
        // Register the JSON-backed DataFrame as a temporary table.
        peoplesDF.registerTempTable("peopleScores");
        // Query the people whose score is above 90.
        DataFrame excellentScoresDF = sqlContext.sql("select name, score from peopleScores where score > 90");

        // Convert the DataFrame to an RDD and collect the names of everyone scoring above 90.
        List<String> excellentScoresNameList = excellentScoresDF.javaRDD().map(new Function<Row, String>() {
            @Override
            public String call(Row row) throws Exception {
                return row.getAs("name");
            }
        }).collect();

        // Assemble JSON strings dynamically.
        List<String> peopleInformations = new ArrayList<String>();
        peopleInformations.add("{\"name\":\"Michael\", \"age\":20}");
        peopleInformations.add("{\"name\":\"Andy\", \"age\":17}");
        peopleInformations.add("{\"name\":\"Justin\", \"age\":19}");

        // Build a DataFrame from an RDD whose elements are JSON strings.
        JavaRDD<String> peopleInformationsRDD = sc.parallelize(peopleInformations);
        DataFrame peopleInformationsDF = sqlContext.read().json(peopleInformationsRDD);
        // Register it as a temporary table as well.
        peopleInformationsDF.registerTempTable("peopleInformations");

        // Splice the collected names into an IN clause.
        String sqlText = "select name, age from peopleInformations where name in (";
        for (int i = 0; i < excellentScoresNameList.size(); i++) {
            sqlText += "'" + excellentScoresNameList.get(i) + "'";
            if (i < excellentScoresNameList.size() - 1) {
                sqlText += ",";
            }
        }
        sqlText += ")";
        DataFrame excellentNameAgeDF = sqlContext.sql(sqlText);

        // Join (name -> score) with (name -> age).
        JavaPairRDD<String, Tuple2<Integer, Integer>> resultRDD = excellentScoresDF.javaRDD()
                .mapToPair(new PairFunction<Row, String, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(Row row) throws Exception {
                        return new Tuple2<String, Integer>(row.getAs("name"), (int) row.getLong(1));
                    }
                })
                .join(excellentNameAgeDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(Row row) throws Exception {
                        return new Tuple2<String, Integer>(row.getAs("name"), (int) row.getLong(1));
                    }
                }));

        // Rebuild each joined pair as a Row of (name, age, score).
        JavaRDD<Row> resultRowRDD = resultRDD.map(new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() {
            @Override
            public Row call(Tuple2<String, Tuple2<Integer, Integer>> tuple) throws Exception {
                return RowFactory.create(tuple._1, tuple._2._2, tuple._2._1);
            }
        });

        List<StructField> structFields = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));
        // Build the StructType that describes the metadata of the final DataFrame.
        StructType structType = DataTypes.createStructType(structFields);

        DataFrame personsDF = sqlContext.createDataFrame(resultRowRDD, structType);
        personsDF.show();
        personsDF.write().format("json").save("D:\\DT-IMF\\testdata\\peopleresult");
    }
}
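A side note on the approach: collecting the qualifying names to the driver and splicing them into an IN clause works for a small demo, but it is brittle (quoting issues, and the data round-trips through the driver). Under Spark 1.6 the same result can be obtained directly with the DataFrame join API. A minimal sketch, reusing excellentScoresDF and peopleInformationsDF from the listing above (this is not part of the lesson's code):

// Join the two DataFrames on name instead of building an IN clause by hand.
DataFrame joinedDF = excellentScoresDF.join(
        peopleInformationsDF,
        excellentScoresDF.col("name").equalTo(peopleInformationsDF.col("name")))
    .select(excellentScoresDF.col("name"),
            peopleInformationsDF.col("age"),
            excellentScoresDF.col("score"));
joinedDF.show();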

 

 

Console output when run in Eclipse:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

16/04/08 00:01:27 INFO SparkContext: Running Spark version 1.6.0

16/04/08 00:01:29 INFO SecurityManager: Changing view acls to: think

16/04/08 00:01:29 INFO SecurityManager: Changing modify acls to: think

16/04/08 00:01:29 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(think); users with modify permissions: Set(think)

16/04/08 00:01:32 INFO Utils: Successfully started service 'sparkDriver' on port 52189.

16/04/08 00:01:33 INFO Slf4jLogger: Slf4jLogger started

16/04/08 00:01:33 INFO Remoting: Starting remoting

16/04/08 00:01:34 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.56.1:52202]

16/04/08 00:01:34 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 52202.

16/04/08 00:01:34 INFO SparkEnv: Registering MapOutputTracker

16/04/08 00:01:34 INFO SparkEnv: Registering BlockManagerMaster

16/04/08 00:01:34 INFO DiskBlockManager: Created local directory at C:\Users\think\AppData\Local\Temp\blockmgr-0efb49ea-2c12-4819-8543-efcad6cbe9ee

16/04/08 00:01:34 INFO MemoryStore: MemoryStore started with capacity 1773.8 MB

16/04/08 00:01:35 INFO SparkEnv: Registering OutputCommitCoordinator

16/04/08 00:01:36 INFO Utils: Successfully started service 'SparkUI' on port 4040.

16/04/08 00:01:36 INFO SparkUI: Started SparkUI at http://192.168.56.1:4040

16/04/08 00:01:36 INFO Executor: Starting executor ID driver on host localhost

16/04/08 00:01:36 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 52209.

16/04/08 00:01:36 INFO NettyBlockTransferService: Server created on 52209

16/04/08 00:01:36 INFO BlockManagerMaster: Trying to register BlockManager

16/04/08 00:01:36 INFO BlockManagerMasterEndpoint: Registering block manager localhost:52209 with 1773.8 MB RAM, BlockManagerId(driver, localhost, 52209)

16/04/08 00:01:36 INFO BlockManagerMaster: Registered BlockManager

16/04/08 00:01:40 WARN : Your hostname, think-PC resolves to a loopback/non-reachable address: fe80:0:0:0:d401:a5b5:2103:6d13%eth8, but we couldn't find any external IP address!

16/04/08 00:01:41 INFO JSONRelation: Listing file:/D:/DT-IMF/testdata/peoples.json on driver

16/04/08 00:01:42 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 208.9 KB, free 208.9 KB)

16/04/08 00:01:43 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 19.4 KB, free 228.3 KB)

16/04/08 00:01:43 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:52209 (size: 19.4 KB, free: 1773.7 MB)

16/04/08 00:01:43 INFO SparkContext: Created broadcast 0 from json at SparkSQLwithJoin.java:28

16/04/08 00:01:43 INFO FileInputFormat: Total input paths to process : 1

16/04/08 00:01:43 INFO SparkContext: Starting job: json at SparkSQLwithJoin.java:28

16/04/08 00:01:43 INFO DAGScheduler: Got job 0 (json at SparkSQLwithJoin.java:28) with 1 output partitions

16/04/08 00:01:43 INFO DAGScheduler: Final stage: ResultStage 0 (json at SparkSQLwithJoin.java:28)

16/04/08 00:01:43 INFO DAGScheduler: Parents of final stage: List()

16/04/08 00:01:43 INFO DAGScheduler: Missing parents: List()

16/04/08 00:01:43 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[3] at json at SparkSQLwithJoin.java:28), which has no missing parents

16/04/08 00:01:44 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.3 KB, free 232.7 KB)

16/04/08 00:01:44 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.4 KB, free 235.1 KB)

16/04/08 00:01:44 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:52209 (size: 2.4 KB, free: 1773.7 MB)

16/04/08 00:01:44 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006

16/04/08 00:01:44 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[3] at json at SparkSQLwithJoin.java:28)

16/04/08 00:01:44 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks

16/04/08 00:01:44 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2139 bytes)

16/04/08 00:01:44 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)

16/04/08 00:01:44 INFO HadoopRDD: Input split: file:/D:/DT-IMF/testdata/peoples.json:0+91

16/04/08 00:01:44 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id

16/04/08 00:01:44 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id

16/04/08 00:01:44 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap

16/04/08 00:01:44 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition

16/04/08 00:01:44 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id

16/04/08 00:01:47 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2847 bytes result sent to driver

16/04/08 00:01:47 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 3109 ms on localhost (1/1)

16/04/08 00:01:47 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool

16/04/08 00:01:47 INFO DAGScheduler: ResultStage 0 (json at SparkSQLwithJoin.java:28) finished in 3.246 s

16/04/08 00:01:47 INFO DAGScheduler: Job 0 finished: json at SparkSQLwithJoin.java:28, took 3.609467 s

16/04/08 00:01:49 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 61.8 KB, free 296.9 KB)

16/04/08 00:01:50 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 19.3 KB, free 316.2 KB)

16/04/08 00:01:50 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:52209 (size: 19.3 KB, free: 1773.7 MB)

16/04/08 00:01:50 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:52209 in memory (size: 2.4 KB, free: 1773.7 MB)

16/04/08 00:01:50 INFO SparkContext: Created broadcast 2 from javaRDD at SparkSQLwithJoin.java:38

16/04/08 00:01:50 INFO ContextCleaner: Cleaned accumulator 1

16/04/08 00:01:50 INFO BlockManagerInfo: Removed broadcast_0_piece0 on localhost:52209 in memory (size: 19.4 KB, free: 1773.7 MB)

16/04/08 00:01:50 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 208.9 KB, free 290.1 KB)

16/04/08 00:01:50 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 19.4 KB, free 309.5 KB)

16/04/08 00:01:50 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:52209 (size: 19.4 KB, free: 1773.7 MB)

16/04/08 00:01:50 INFO SparkContext: Created broadcast 3 from javaRDD at SparkSQLwithJoin.java:38

16/04/08 00:01:50 INFO FileInputFormat: Total input paths to process : 1

16/04/08 00:01:50 INFO SparkContext: Starting job: collect at SparkSQLwithJoin.java:45

16/04/08 00:01:50 INFO DAGScheduler: Got job 1 (collect at SparkSQLwithJoin.java:45) with 1 output partitions

16/04/08 00:01:50 INFO DAGScheduler: Final stage: ResultStage 1 (collect at SparkSQLwithJoin.java:45)

16/04/08 00:01:50 INFO DAGScheduler: Parents of final stage: List()

16/04/08 00:01:50 INFO DAGScheduler: Missing parents: List()

16/04/08 00:01:50 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[10] at map at SparkSQLwithJoin.java:38), which has no missing parents

16/04/08 00:01:50 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 8.2 KB, free 317.7 KB)

16/04/08 00:01:50 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 4.5 KB, free 322.1 KB)

16/04/08 00:01:50 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on localhost:52209 (size: 4.5 KB, free: 1773.7 MB)

16/04/08 00:01:50 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1006

16/04/08 00:01:50 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[10] at map at SparkSQLwithJoin.java:38)

16/04/08 00:01:50 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks

16/04/08 00:01:50 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,PROCESS_LOCAL, 2139 bytes)

16/04/08 00:01:50 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)

16/04/08 00:01:50 INFO HadoopRDD: Input split: file:/D:/DT-IMF/testdata/peoples.json:0+91

16/04/08 00:01:52 INFO BlockManagerInfo: Removed broadcast_2_piece0 on localhost:52209 in memory (size: 19.3 KB, free: 1773.7 MB)

16/04/08 00:01:52 INFO GenerateUnsafeProjection: Code generated in 1170.2809 ms

16/04/08 00:01:52 INFO GeneratePredicate: Code generated in 21.119108 ms

16/04/08 00:01:52 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 2275 bytes result sent to driver

16/04/08 00:01:52 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 1461 ms on localhost (1/1)

16/04/08 00:01:52 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool

16/04/08 00:01:52 INFO DAGScheduler: ResultStage 1 (collect at SparkSQLwithJoin.java:45) finished in 1.465 s

16/04/08 00:01:52 INFO DAGScheduler: Job 1 finished: collect at SparkSQLwithJoin.java:45, took 1.506153 s

16/04/08 00:01:52 INFO SparkContext: Starting job: json at SparkSQLwithJoin.java:57

16/04/08 00:01:52 INFO DAGScheduler: Got job 2 (json at SparkSQLwithJoin.java:57) with 1 output partitions

16/04/08 00:01:52 INFO DAGScheduler: Final stage: ResultStage 2 (json at SparkSQLwithJoin.java:57)

16/04/08 00:01:52 INFO DAGScheduler: Parents of final stage: List()

16/04/08 00:01:52 INFO DAGScheduler: Missing parents: List()

16/04/08 00:01:52 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[13] at json at SparkSQLwithJoin.java:57), which has no missing parents

16/04/08 00:01:52 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 3.5 KB, free 244.5 KB)

16/04/08 00:01:52 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 2.1 KB, free 246.6 KB)

16/04/08 00:01:52 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on localhost:52209 (size: 2.1 KB, free: 1773.7 MB)

16/04/08 00:01:52 INFO SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1006

16/04/08 00:01:52 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[13] at json at SparkSQLwithJoin.java:57)

16/04/08 00:01:52 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks

16/04/08 00:01:52 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, partition 0,PROCESS_LOCAL, 2169 bytes)

16/04/08 00:01:52 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)

16/04/08 00:01:52 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 1716 bytes result sent to driver

16/04/08 00:01:52 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 80 ms on localhost (1/1)

16/04/08 00:01:52 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool

16/04/08 00:01:52 INFO DAGScheduler: ResultStage 2 (json at SparkSQLwithJoin.java:57) finished in 0.088 s

16/04/08 00:01:52 INFO DAGScheduler: Job 2 finished: json at SparkSQLwithJoin.java:57, took 0.163387 s

16/04/08 00:01:53 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 208.5 KB, free 455.1 KB)

16/04/08 00:01:53 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 19.3 KB, free 474.4 KB)

16/04/08 00:01:53 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on localhost:52209 (size: 19.3 KB, free: 1773.7 MB)

16/04/08 00:01:53 INFO SparkContext: Created broadcast 6 from javaRDD at SparkSQLwithJoin.java:82

16/04/08 00:01:53 INFO SparkContext: Starting job: show at SparkSQLwithJoin.java:111

16/04/08 00:01:53 INFO DAGScheduler: Registering RDD 14 (mapToPair at SparkSQLwithJoin.java:73)

16/04/08 00:01:53 INFO DAGScheduler: Registering RDD 19 (mapToPair at SparkSQLwithJoin.java:82)

16/04/08 00:01:53 INFO DAGScheduler: Got job 3 (show at SparkSQLwithJoin.java:111) with 1 output partitions

16/04/08 00:01:53 INFO DAGScheduler: Final stage: ResultStage 5 (show at SparkSQLwithJoin.java:111)

16/04/08 00:01:53 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 3, ShuffleMapStage 4)

16/04/08 00:01:53 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 3, ShuffleMapStage 4)

16/04/08 00:01:53 INFO DAGScheduler: Submitting ShuffleMapStage 3 (MapPartitionsRDD[14] at mapToPair at SparkSQLwithJoin.java:73), which has no missing parents

16/04/08 00:01:53 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated size 8.4 KB, free 482.8 KB)

16/04/08 00:01:53 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 4.6 KB, free 487.4 KB)

16/04/08 00:01:53 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on localhost:52209 (size: 4.6 KB, free: 1773.7 MB)

16/04/08 00:01:53 INFO SparkContext: Created broadcast 7 from broadcast at DAGScheduler.scala:1006

16/04/08 00:01:53 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 3 (MapPartitionsRDD[14] at mapToPair at SparkSQLwithJoin.java:73)

16/04/08 00:01:53 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks

16/04/08 00:01:53 INFO DAGScheduler: Submitting ShuffleMapStage 4 (MapPartitionsRDD[19] at mapToPair at SparkSQLwithJoin.java:82), which has no missing parents

16/04/08 00:01:53 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, localhost, partition 0,PROCESS_LOCAL, 2128 bytes)

16/04/08 00:01:53 INFO Executor: Running task 0.0 in stage 3.0 (TID 3)

16/04/08 00:01:53 INFO MemoryStore: Block broadcast_8 stored as values in memory (estimated size 7.6 KB, free 495.1 KB)

16/04/08 00:01:53 INFO MemoryStore: Block broadcast_8_piece0 stored as bytes in memory (estimated size 4.2 KB, free 499.3 KB)

16/04/08 00:01:53 INFO BlockManagerInfo: Added broadcast_8_piece0 in memory on localhost:52209 (size: 4.2 KB, free: 1773.7 MB)

16/04/08 00:01:53 INFO HadoopRDD: Input split: file:/D:/DT-IMF/testdata/peoples.json:0+91

16/04/08 00:01:53 INFO SparkContext: Created broadcast 8 from broadcast at DAGScheduler.scala:1006

16/04/08 00:01:53 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 4 (MapPartitionsRDD[19] at mapToPair at SparkSQLwithJoin.java:82)

16/04/08 00:01:53 INFO TaskSchedulerImpl: Adding task set 4.0 with 1 tasks

16/04/08 00:01:53 INFO Executor: Finished task 0.0 in stage 3.0 (TID 3). 2467 bytes result sent to driver

16/04/08 00:01:53 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 4, localhost, partition 0,PROCESS_LOCAL, 2158 bytes)

16/04/08 00:01:53 INFO Executor: Running task 0.0 in stage 4.0 (TID 4)

16/04/08 00:01:53 INFO DAGScheduler: ShuffleMapStage 3 (mapToPair at SparkSQLwithJoin.java:73) finished in 0.216 s

16/04/08 00:01:53 INFO DAGScheduler: looking for newly runnable stages

16/04/08 00:01:53 INFO DAGScheduler: running: Set(ShuffleMapStage 4)

16/04/08 00:01:53 INFO DAGScheduler: waiting: Set(ResultStage 5)

16/04/08 00:01:53 INFO DAGScheduler: failed: Set()

16/04/08 00:01:53 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 3) in 213 ms on localhost (1/1)

16/04/08 00:01:53 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool

16/04/08 00:01:53 INFO GeneratePredicate: Code generated in 36.313835 ms

16/04/08 00:01:53 INFO Executor: Finished task 0.0 in stage 4.0 (TID 4). 1372 bytes result sent to driver

16/04/08 00:01:53 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 4) in 107 ms on localhost (1/1)

16/04/08 00:01:53 INFO DAGScheduler: ShuffleMapStage 4 (mapToPair at SparkSQLwithJoin.java:82) finished in 0.235 s

16/04/08 00:01:53 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool

16/04/08 00:01:53 INFO DAGScheduler: looking for newly runnable stages

16/04/08 00:01:53 INFO DAGScheduler: running: Set()

16/04/08 00:01:53 INFO DAGScheduler: waiting: Set(ResultStage 5)

16/04/08 00:01:53 INFO DAGScheduler: failed: Set()

16/04/08 00:01:53 INFO DAGScheduler: Submitting ResultStage 5 (MapPartitionsRDD[25] at show at SparkSQLwithJoin.java:111), which has no missing parents

16/04/08 00:01:54 INFO MemoryStore: Block broadcast_9 stored as values in memory (estimated size 5.5 KB, free 504.8 KB)

16/04/08 00:01:54 INFO MemoryStore: Block broadcast_9_piece0 stored as bytes in memory (estimated size 2.8 KB, free 507.6 KB)

16/04/08 00:01:54 INFO BlockManagerInfo: Added broadcast_9_piece0 in memory on localhost:52209 (size: 2.8 KB, free: 1773.7 MB)

16/04/08 00:01:54 INFO SparkContext: Created broadcast 9 from broadcast at DAGScheduler.scala:1006

16/04/08 00:01:54 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 5 (MapPartitionsRDD[25] at show at SparkSQLwithJoin.java:111)

16/04/08 00:01:54 INFO TaskSchedulerImpl: Adding task set 5.0 with 1 tasks

16/04/08 00:01:54 INFO TaskSetManager: Starting task 0.0 in stage 5.0 (TID 5, localhost, partition 0,PROCESS_LOCAL, 1967 bytes)

16/04/08 00:01:54 INFO Executor: Running task 0.0 in stage 5.0 (TID 5)

16/04/08 00:01:54 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks

16/04/08 00:01:54 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 17 ms

16/04/08 00:01:54 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks

16/04/08 00:01:54 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms

16/04/08 00:01:54 INFO Executor: Finished task 0.0 in stage 5.0 (TID 5). 1608 bytes result sent to driver

16/04/08 00:01:54 INFO TaskSetManager: Finished task 0.0 in stage 5.0 (TID 5) in 198 ms on localhost (1/1)

16/04/08 00:01:54 INFO TaskSchedulerImpl: Removed TaskSet 5.0, whose tasks have all completed, from pool

16/04/08 00:01:54 INFO DAGScheduler: ResultStage 5 (show at SparkSQLwithJoin.java:111) finished in 0.198 s

16/04/08 00:01:54 INFO DAGScheduler: Job 3 finished: show at SparkSQLwithJoin.java:111, took 0.597677 s

+-------+---+-----+

|   name|age|score|

+-------+---+-----+

|Michael| 20|   98|

|   Andy| 17|   95|

+-------+---+-----+

 

16/04/08 00:01:54 INFO DefaultWriterContainer: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

16/04/08 00:01:54 INFO SparkContext: Starting job: save at SparkSQLwithJoin.java:113

16/04/08 00:01:54 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 143 bytes

16/04/08 00:01:54 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 143 bytes

16/04/08 00:01:54 INFO DAGScheduler: Got job 4 (save at SparkSQLwithJoin.java:113) with 1 output partitions

16/04/08 00:01:54 INFO DAGScheduler: Final stage: ResultStage 8 (save at SparkSQLwithJoin.java:113)

16/04/08 00:01:54 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 6, ShuffleMapStage 7)

16/04/08 00:01:54 INFO DAGScheduler: Missing parents: List()

16/04/08 00:01:54 INFO DAGScheduler: Submitting ResultStage 8 (MapPartitionsRDD[24] at createDataFrame at SparkSQLwithJoin.java:109), which has no missing parents

16/04/08 00:01:54 INFO MemoryStore: Block broadcast_10 stored as values in memory (estimated size 66.7 KB, free 574.3 KB)

16/04/08 00:01:54 INFO MemoryStore: Block broadcast_10_piece0 stored as bytes in memory (estimated size 23.5 KB, free 597.8 KB)

16/04/08 00:01:54 INFO BlockManagerInfo: Added broadcast_10_piece0 in memory on localhost:52209 (size: 23.5 KB, free: 1773.7 MB)

16/04/08 00:01:54 INFO SparkContext: Created broadcast 10 from broadcast at DAGScheduler.scala:1006

16/04/08 00:01:54 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 8 (MapPartitionsRDD[24] at createDataFrame at SparkSQLwithJoin.java:109)

16/04/08 00:01:54 INFO TaskSchedulerImpl: Adding task set 8.0 with 1 tasks

16/04/08 00:01:54 INFO TaskSetManager: Starting task 0.0 in stage 8.0 (TID 6, localhost, partition 0,PROCESS_LOCAL, 1967 bytes)

16/04/08 00:01:54 INFO Executor: Running task 0.0 in stage 8.0 (TID 6)

16/04/08 00:01:54 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks

16/04/08 00:01:54 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 2 ms

16/04/08 00:01:54 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks

16/04/08 00:01:54 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms

16/04/08 00:01:54 INFO DefaultWriterContainer: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

16/04/08 00:01:55 INFO FileOutputCommitter: Saved output of task 'attempt_201604080001_0008_m_000000_0' to file:/D:/DT-IMF/testdata/peopleresult/_temporary/0/task_201604080001_0008_m_000000

16/04/08 00:01:55 INFO SparkHadoopMapRedUtil: attempt_201604080001_0008_m_000000_0: Committed

16/04/08 00:01:55 INFO Executor: Finished task 0.0 in stage 8.0 (TID 6). 1165 bytes result sent to driver

16/04/08 00:01:55 INFO TaskSetManager: Finished task 0.0 in stage 8.0 (TID 6) in 523 ms on localhost (1/1)

16/04/08 00:01:55 INFO DAGScheduler: ResultStage 8 (save at SparkSQLwithJoin.java:113) finished in 0.523 s

16/04/08 00:01:55 INFO TaskSchedulerImpl: Removed TaskSet 8.0, whose tasks have all completed, from pool

16/04/08 00:01:55 INFO DAGScheduler: Job 4 finished: save at SparkSQLwithJoin.java:113, took 0.617657 s

16/04/08 00:01:55 INFO DefaultWriterContainer: Job job_201604080001_0000 committed.

16/04/08 00:01:55 INFO JSONRelation: Listing file:/D:/DT-IMF/testdata/peopleresult on driver

16/04/08 00:01:55 INFO JSONRelation: Listing file:/D:/DT-IMF/testdata/peopleresult on driver

16/04/08 00:01:55 INFO SparkContext: Invoking stop() from shutdown hook

16/04/08 00:01:55 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4040

16/04/08 00:01:55 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!

16/04/08 00:01:55 INFO MemoryStore: MemoryStore cleared

16/04/08 00:01:55 INFO BlockManager: BlockManager stopped

16/04/08 00:01:55 INFO BlockManagerMaster: BlockManagerMaster stopped

16/04/08 00:01:55 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

16/04/08 00:01:55 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.

16/04/08 00:01:55 INFO SparkContext: Successfully stopped SparkContext

16/04/08 00:01:55 INFO ShutdownHookManager: Shutdown hook called

16/04/08 00:01:55 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.

16/04/08 00:01:55 INFO ShutdownHookManager: Deleting directory C:\Users\think\AppData\Local\Temp\spark-2762f4bc-6ffa-4744-86d7-c6a9b2a66082

 

 

When the run finishes, the directory D:\DT-IMF\testdata\peopleresult is created automatically, and the result is written into it:

{"name":"Michael","age":20,"score":98}

{"name":"Andy","age":17,"score":95}

 

 

 

Now let's rewrite the program in Scala:
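(The original post repeated the Java listing verbatim at this point. Below is a sketch of an equivalent Scala program for Spark 1.6, reconstructed from the Java version above; treat it as a guide rather than the lesson's original code.)

package SparkSQLByScala

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object SparkSQLwithJoin {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("SparkSQLwithJoin")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Create a DataFrame from the JSON file and register it as a temporary table.
    val peoplesDF = sqlContext.read.json("D:\\DT-IMF\\testdata\\peoples.json")
    peoplesDF.registerTempTable("peopleScores")

    // People whose score is above 90.
    val excellentScoresDF = sqlContext.sql("select name, score from peopleScores where score > 90")
    val excellentScoresNameList = excellentScoresDF.rdd.map(_.getAs[String]("name")).collect()

    // Build a second DataFrame from an in-memory RDD of JSON strings.
    val peopleInformations = Seq(
      """{"name":"Michael", "age":20}""",
      """{"name":"Andy", "age":17}""",
      """{"name":"Justin", "age":19}""")
    val peopleInformationsDF = sqlContext.read.json(sc.parallelize(peopleInformations))
    peopleInformationsDF.registerTempTable("peopleInformations")

    // Splice the collected names into an IN clause.
    val sqlText = "select name, age from peopleInformations where name in (" +
      excellentScoresNameList.map(name => s"'$name'").mkString(",") + ")"
    val excellentNameAgeDF = sqlContext.sql(sqlText)

    // Join (name -> score) with (name -> age), then rebuild rows as (name, age, score).
    val resultRDD = excellentScoresDF.rdd
      .map(row => (row.getAs[String]("name"), row.getLong(1).toInt))
      .join(excellentNameAgeDF.rdd.map(row => (row.getAs[String]("name"), row.getLong(1).toInt)))
    val resultRowRDD = resultRDD.map { case (name, (score, age)) => Row(name, age, score) }

    // Describe the metadata of the final DataFrame.
    val structType = StructType(Array(
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("score", IntegerType, true)))

    val personsDF = sqlContext.createDataFrame(resultRowRDD, structType)
    personsDF.show()
    personsDF.write.format("json").save("D:\\DT-IMF\\testdata\\peopleresult")
  }
}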


 

Console output from the run:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

16/04/08 00:18:50 INFO SparkContext: Running Spark version 1.6.0

16/04/08 00:18:52 INFO SecurityManager: Changing view acls to: think

16/04/08 00:18:52 INFO SecurityManager: Changing modify acls to: think

16/04/08 00:18:52 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(think); users with modify permissions: Set(think)

16/04/08 00:18:55 INFO Utils: Successfully started service 'sparkDriver' on port 52453.

16/04/08 00:18:57 INFO Slf4jLogger: Slf4jLogger started

16/04/08 00:18:57 INFO Remoting: Starting remoting

16/04/08 00:18:57 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.56.1:52466]

16/04/08 00:18:57 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 52466.

16/04/08 00:18:57 INFO SparkEnv: Registering MapOutputTracker

16/04/08 00:18:58 INFO SparkEnv: Registering BlockManagerMaster

16/04/08 00:18:58 INFO DiskBlockManager: Created local directory at C:\Users\think\AppData\Local\Temp\blockmgr-e100d6ba-bb71-4beb-84f3-96b6801e4813

16/04/08 00:18:58 INFO MemoryStore: MemoryStore started with capacity 1773.8 MB

16/04/08 00:18:58 INFO SparkEnv: Registering OutputCommitCoordinator

16/04/08 00:18:59 INFO Utils: Successfully started service 'SparkUI' on port 4040.

16/04/08 00:18:59 INFO SparkUI: Started SparkUI at http://192.168.56.1:4040

16/04/08 00:19:00 INFO Executor: Starting executor ID driver on host localhost

16/04/08 00:19:00 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 52473.

16/04/08 00:19:00 INFO NettyBlockTransferService: Server created on 52473

16/04/08 00:19:00 INFO BlockManagerMaster: Trying to register BlockManager

16/04/08 00:19:00 INFO BlockManagerMasterEndpoint: Registering block manager localhost:52473 with 1773.8 MB RAM, BlockManagerId(driver, localhost, 52473)

16/04/08 00:19:00 INFO BlockManagerMaster: Registered BlockManager

16/04/08 00:19:04 WARN : Your hostname, think-PC resolves to a loopback/non-reachable address: fe80:0:0:0:d401:a5b5:2103:6d13%eth8, but we couldn't find any external IP address!

16/04/08 00:19:05 INFO JSONRelation: Listing file:/D:/DT-IMF/testdata/peoples.json on driver

16/04/08 00:19:07 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 208.9 KB, free 208.9 KB)

16/04/08 00:19:07 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 19.4 KB, free 228.3 KB)

16/04/08 00:19:07 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:52473 (size: 19.4 KB, free: 1773.7 MB)

16/04/08 00:19:07 INFO SparkContext: Created broadcast 0 from json at SparkSQLwithJoin.java:28

16/04/08 00:19:08 INFO FileInputFormat: Total input paths to process : 1

16/04/08 00:19:08 INFO SparkContext: Starting job: json at SparkSQLwithJoin.java:28

16/04/08 00:19:08 INFO DAGScheduler: Got job 0 (json at SparkSQLwithJoin.java:28) with 1 output partitions

16/04/08 00:19:08 INFO DAGScheduler: Final stage: ResultStage 0 (json at SparkSQLwithJoin.java:28)

16/04/08 00:19:08 INFO DAGScheduler: Parents of final stage: List()

16/04/08 00:19:08 INFO DAGScheduler: Missing parents: List()

16/04/08 00:19:08 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[3] at json at SparkSQLwithJoin.java:28), which has no missing parents

16/04/08 00:19:08 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.3 KB, free 232.7 KB)

16/04/08 00:19:08 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.4 KB, free 235.1 KB)

16/04/08 00:19:08 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:52473 (size: 2.4 KB, free: 1773.7 MB)

16/04/08 00:19:08 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006

16/04/08 00:19:08 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[3] at json at SparkSQLwithJoin.java:28)

16/04/08 00:19:08 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks

16/04/08 00:19:08 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2139 bytes)

16/04/08 00:19:08 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)

16/04/08 00:19:09 INFO HadoopRDD: Input split: file:/D:/DT-IMF/testdata/peoples.json:0+91

16/04/08 00:19:09 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id

16/04/08 00:19:09 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id

16/04/08 00:19:09 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap

16/04/08 00:19:09 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition

16/04/08 00:19:09 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id

16/04/08 00:19:11 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2847 bytes result sent to driver

16/04/08 00:19:12 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 3208 ms on localhost (1/1)

16/04/08 00:19:12 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool

16/04/08 00:19:12 INFO DAGScheduler: ResultStage 0 (json at SparkSQLwithJoin.java:28) finished in 3.313 s

16/04/08 00:19:12 INFO DAGScheduler: Job 0 finished: json at SparkSQLwithJoin.java:28, took 3.652331 s

16/04/08 00:19:14 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 61.8 KB, free 296.9 KB)

16/04/08 00:19:14 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 19.3 KB, free 316.2 KB)

16/04/08 00:19:14 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:52473 (size: 19.3 KB, free: 1773.7 MB)

16/04/08 00:19:14 INFO SparkContext: Created broadcast 2 from javaRDD at SparkSQLwithJoin.java:38

16/04/08 00:19:14 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:52473 in memory (size: 2.4 KB, free: 1773.7 MB)

16/04/08 00:19:14 INFO ContextCleaner: Cleaned accumulator 1

16/04/08 00:19:14 INFO BlockManagerInfo: Removed broadcast_0_piece0 on localhost:52473 in memory (size: 19.4 KB, free: 1773.7 MB)

16/04/08 00:19:14 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 208.9 KB, free 290.1 KB)

16/04/08 00:19:14 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 19.4 KB, free 309.5 KB)

16/04/08 00:19:14 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:52473 (size: 19.4 KB, free: 1773.7 MB)

16/04/08 00:19:14 INFO SparkContext: Created broadcast 3 from javaRDD at SparkSQLwithJoin.java:38

16/04/08 00:19:15 INFO FileInputFormat: Total input paths to process : 1

16/04/08 00:19:15 INFO SparkContext: Starting job: collect at SparkSQLwithJoin.java:45

16/04/08 00:19:15 INFO DAGScheduler: Got job 1 (collect at SparkSQLwithJoin.java:45) with 1 output partitions

16/04/08 00:19:15 INFO DAGScheduler: Final stage: ResultStage 1 (collect at SparkSQLwithJoin.java:45)

16/04/08 00:19:15 INFO DAGScheduler: Parents of final stage: List()

16/04/08 00:19:15 INFO DAGScheduler: Missing parents: List()

16/04/08 00:19:15 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[10] at map at SparkSQLwithJoin.java:38), which has no missing parents

16/04/08 00:19:15 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 8.2 KB, free 317.7 KB)

16/04/08 00:19:15 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 4.5 KB, free 322.1 KB)

16/04/08 00:19:15 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on localhost:52473 (size: 4.5 KB, free: 1773.7 MB)

16/04/08 00:19:15 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1006

16/04/08 00:19:15 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[10] at map at SparkSQLwithJoin.java:38)

16/04/08 00:19:15 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks

16/04/08 00:19:15 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,PROCESS_LOCAL, 2139 bytes)

16/04/08 00:19:15 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)

16/04/08 00:19:15 INFO HadoopRDD: Input split: file:/D:/DT-IMF/testdata/peoples.json:0+91

16/04/08 00:19:16 INFO BlockManagerInfo: Removed broadcast_2_piece0 on localhost:52473 in memory (size: 19.3 KB, free: 1773.7 MB)

16/04/08 00:19:16 INFO GenerateUnsafeProjection: Code generated in 986.327423 ms

16/04/08 00:19:16 INFO GeneratePredicate: Code generated in 18.72276 ms

16/04/08 00:19:16 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 2275 bytes result sent to driver

16/04/08 00:19:16 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 1228 ms on localhost (1/1)

16/04/08 00:19:16 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool

16/04/08 00:19:16 INFO DAGScheduler: ResultStage 1 (collect at SparkSQLwithJoin.java:45) finished in 1.233 s

16/04/08 00:19:16 INFO DAGScheduler: Job 1 finished: collect at SparkSQLwithJoin.java:45, took 1.278198 s

16/04/08 00:19:16 INFO SparkContext: Starting job: json at SparkSQLwithJoin.java:57

16/04/08 00:19:16 INFO DAGScheduler: Got job 2 (json at SparkSQLwithJoin.java:57) with 1 output partitions

16/04/08 00:19:16 INFO DAGScheduler: Final stage: ResultStage 2 (json at SparkSQLwithJoin.java:57)

16/04/08 00:19:16 INFO DAGScheduler: Parents of final stage: List()

16/04/08 00:19:16 INFO DAGScheduler: Missing parents: List()

16/04/08 00:19:16 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[13] at json at SparkSQLwithJoin.java:57), which has no missing parents

16/04/08 00:19:16 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 3.5 KB, free 244.5 KB)

16/04/08 00:19:16 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 2.1 KB, free 246.6 KB)

16/04/08 00:19:16 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on localhost:52473 (size: 2.1 KB, free: 1773.7 MB)

16/04/08 00:19:16 INFO SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1006

16/04/08 00:19:16 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[13] at json at SparkSQLwithJoin.java:57)

16/04/08 00:19:16 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks

16/04/08 00:19:16 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, partition 0,PROCESS_LOCAL, 2169 bytes)

16/04/08 00:19:16 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)

16/04/08 00:19:16 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 1716 bytes result sent to driver

16/04/08 00:19:16 INFO DAGScheduler: ResultStage 2 (json at SparkSQLwithJoin.java:57) finished in 0.119 s

16/04/08 00:19:16 INFO DAGScheduler: Job 2 finished: json at SparkSQLwithJoin.java:57, took 0.163232 s

16/04/08 00:19:16 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 118 ms on localhost (1/1)

16/04/08 00:19:16 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool

16/04/08 00:19:17 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 208.5 KB, free 455.1 KB)

16/04/08 00:19:17 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 19.3 KB, free 474.4 KB)

16/04/08 00:19:17 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on localhost:52473 (size: 19.3 KB, free: 1773.7 MB)

16/04/08 00:19:17 INFO SparkContext: Created broadcast 6 from javaRDD at SparkSQLwithJoin.java:82

16/04/08 00:19:17 INFO SparkContext: Starting job: show at SparkSQLwithJoin.java:111

16/04/08 00:19:17 INFO DAGScheduler: Registering RDD 14 (mapToPair at SparkSQLwithJoin.java:73)

16/04/08 00:19:17 INFO DAGScheduler: Registering RDD 19 (mapToPair at SparkSQLwithJoin.java:82)

16/04/08 00:19:17 INFO DAGScheduler: Got job 3 (show at SparkSQLwithJoin.java:111) with 1 output partitions

16/04/08 00:19:17 INFO DAGScheduler: Final stage: ResultStage 5 (show at SparkSQLwithJoin.java:111)

16/04/08 00:19:17 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 3, ShuffleMapStage 4)

16/04/08 00:19:17 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 3, ShuffleMapStage 4)

16/04/08 00:19:17 INFO DAGScheduler: Submitting ShuffleMapStage 3 (MapPartitionsRDD[14] at mapToPair at SparkSQLwithJoin.java:73), which has no missing parents

16/04/08 00:19:17 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated size 8.4 KB, free 482.8 KB)

16/04/08 00:19:17 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 4.6 KB, free 487.4 KB)

16/04/08 00:19:17 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on localhost:52473 (size: 4.6 KB, free: 1773.7 MB)

16/04/08 00:19:17 INFO SparkContext: Created broadcast 7 from broadcast at DAGScheduler.scala:1006

16/04/08 00:19:17 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 3 (MapPartitionsRDD[14] at mapToPair at SparkSQLwithJoin.java:73)

16/04/08 00:19:17 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks

16/04/08 00:19:17 INFO DAGScheduler: Submitting ShuffleMapStage 4 (MapPartitionsRDD[19] at mapToPair at SparkSQLwithJoin.java:82), which has no missing parents

16/04/08 00:19:17 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, localhost, partition 0,PROCESS_LOCAL, 2128 bytes)

16/04/08 00:19:17 INFO Executor: Running task 0.0 in stage 3.0 (TID 3)

16/04/08 00:19:17 INFO MemoryStore: Block broadcast_8 stored as values in memory (estimated size 7.6 KB, free 495.1 KB)

16/04/08 00:19:17 INFO MemoryStore: Block broadcast_8_piece0 stored as bytes in memory (estimated size 4.2 KB, free 499.3 KB)

16/04/08 00:19:17 INFO BlockManagerInfo: Added broadcast_8_piece0 in memory on localhost:52473 (size: 4.2 KB, free: 1773.7 MB)

16/04/08 00:19:17 INFO SparkContext: Created broadcast 8 from broadcast at DAGScheduler.scala:1006

16/04/08 00:19:17 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 4 (MapPartitionsRDD[19] at mapToPair at SparkSQLwithJoin.java:82)

16/04/08 00:19:17 INFO TaskSchedulerImpl: Adding task set 4.0 with 1 tasks

16/04/08 00:19:17 INFO HadoopRDD: Input split: file:/D:/DT-IMF/testdata/peoples.json:0+91

16/04/08 00:19:17 INFO Executor: Finished task 0.0 in stage 3.0 (TID 3). 2467 bytes result sent to driver

16/04/08 00:19:17 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 4, localhost, partition 0,PROCESS_LOCAL, 2158 bytes)

16/04/08 00:19:17 INFO Executor: Running task 0.0 in stage 4.0 (TID 4)

16/04/08 00:19:17 INFO DAGScheduler: ShuffleMapStage 3 (mapToPair at SparkSQLwithJoin.java:73) finished in 0.169 s

16/04/08 00:19:17 INFO DAGScheduler: looking for newly runnable stages

16/04/08 00:19:17 INFO DAGScheduler: running: Set(ShuffleMapStage 4)

16/04/08 00:19:17 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 3) in 164 ms on localhost (1/1)

16/04/08 00:19:17 INFO DAGScheduler: waiting: Set(ResultStage 5)

16/04/08 00:19:17 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool

16/04/08 00:19:17 INFO DAGScheduler: failed: Set()

16/04/08 00:19:18 INFO GeneratePredicate: Code generated in 46.831888 ms

16/04/08 00:19:18 INFO Executor: Finished task 0.0 in stage 4.0 (TID 4). 1372 bytes result sent to driver

16/04/08 00:19:18 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 4) in 142 ms on localhost (1/1)

16/04/08 00:19:18 INFO DAGScheduler: ShuffleMapStage 4 (mapToPair at SparkSQLwithJoin.java:82) finished in 0.252 s

16/04/08 00:19:18 INFO DAGScheduler: looking for newly runnable stages

16/04/08 00:19:18 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool

16/04/08 00:19:18 INFO DAGScheduler: running: Set()

16/04/08 00:19:18 INFO DAGScheduler: waiting: Set(ResultStage 5)

16/04/08 00:19:18 INFO DAGScheduler: failed: Set()

16/04/08 00:19:18 INFO DAGScheduler: Submitting ResultStage 5 (MapPartitionsRDD[25] at show at SparkSQLwithJoin.java:111), which has no missing parents

16/04/08 00:19:18 INFO MemoryStore: Block broadcast_9 stored as values in memory (estimated size 5.5 KB, free 504.8 KB)

16/04/08 00:19:18 INFO MemoryStore: Block broadcast_9_piece0 stored as bytes in memory (estimated size 2.8 KB, free 507.6 KB)

16/04/08 00:19:18 INFO BlockManagerInfo: Added broadcast_9_piece0 in memory on localhost:52473 (size: 2.8 KB, free: 1773.7 MB)

16/04/08 00:19:18 INFO SparkContext: Created broadcast 9 from broadcast at DAGScheduler.scala:1006

16/04/08 00:19:18 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 5 (MapPartitionsRDD[25] at show at SparkSQLwithJoin.java:111)

16/04/08 00:19:18 INFO TaskSchedulerImpl: Adding task set 5.0 with 1 tasks

16/04/08 00:19:18 INFO TaskSetManager: Starting task 0.0 in stage 5.0 (TID 5, localhost, partition 0,PROCESS_LOCAL, 1967 bytes)

16/04/08 00:19:18 INFO Executor: Running task 0.0 in stage 5.0 (TID 5)

16/04/08 00:19:18 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks

16/04/08 00:19:18 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 23 ms

16/04/08 00:19:18 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks

16/04/08 00:19:18 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms

16/04/08 00:19:18 INFO Executor: Finished task 0.0 in stage 5.0 (TID 5). 1608 bytes result sent to driver

16/04/08 00:19:18 INFO TaskSetManager: Finished task 0.0 in stage 5.0 (TID 5) in 233 ms on localhost (1/1)

16/04/08 00:19:18 INFO DAGScheduler: ResultStage 5 (show at SparkSQLwithJoin.java:111) finished in 0.234 s

16/04/08 00:19:18 INFO TaskSchedulerImpl: Removed TaskSet 5.0, whose tasks have all completed, from pool

16/04/08 00:19:18 INFO DAGScheduler: Job 3 finished: show at SparkSQLwithJoin.java:111, took 0.672277 s

+-------+---+-----+

|   name|age|score|

+-------+---+-----+

|Michael| 20|   98|

|   Andy| 17|   95|

+-------+---+-----+

 

16/04/08 00:19:18 INFO DefaultWriterContainer: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

16/04/08 00:19:18 INFO SparkContext: Starting job: save at SparkSQLwithJoin.java:113

16/04/08 00:19:18 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 143 bytes

16/04/08 00:19:18 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 143 bytes

16/04/08 00:19:18 INFO DAGScheduler: Got job 4 (save at SparkSQLwithJoin.java:113) with 1 output partitions

16/04/08 00:19:18 INFO DAGScheduler: Final stage: ResultStage 8 (save at SparkSQLwithJoin.java:113)

16/04/08 00:19:18 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 6, ShuffleMapStage 7)

16/04/08 00:19:18 INFO DAGScheduler: Missing parents: List()

16/04/08 00:19:18 INFO DAGScheduler: Submitting ResultStage 8 (MapPartitionsRDD[24] at createDataFrame at SparkSQLwithJoin.java:109), which has no missing parents

16/04/08 00:19:18 INFO MemoryStore: Block broadcast_10 stored as values in memory (estimated size 66.7 KB, free 574.3 KB)

16/04/08 00:19:18 INFO MemoryStore: Block broadcast_10_piece0 stored as bytes in memory (estimated size 23.5 KB, free 597.8 KB)

16/04/08 00:19:18 INFO BlockManagerInfo: Added broadcast_10_piece0 in memory on localhost:52473 (size: 23.5 KB, free: 1773.7 MB)

16/04/08 00:19:18 INFO SparkContext: Created broadcast 10 from broadcast at DAGScheduler.scala:1006

16/04/08 00:19:18 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 8 (MapPartitionsRDD[24] at createDataFrame at SparkSQLwithJoin.java:109)

16/04/08 00:19:18 INFO TaskSchedulerImpl: Adding task set 8.0 with 1 tasks

16/04/08 00:19:18 INFO TaskSetManager: Starting task 0.0 in stage 8.0 (TID 6, localhost, partition 0,PROCESS_LOCAL, 1967 bytes)

16/04/08 00:19:18 INFO Executor: Running task 0.0 in stage 8.0 (TID 6)

16/04/08 00:19:19 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks

16/04/08 00:19:19 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms

16/04/08 00:19:19 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks

16/04/08 00:19:19 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms

16/04/08 00:19:19 INFO DefaultWriterContainer: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

16/04/08 00:19:19 INFO FileOutputCommitter: Saved output of task 'attempt_201604080019_0008_m_000000_0' to file:/D:/DT-IMF/testdata/peopleresult/_temporary/0/task_201604080019_0008_m_000000

16/04/08 00:19:19 INFO SparkHadoopMapRedUtil: attempt_201604080019_0008_m_000000_0: Committed

16/04/08 00:19:19 INFO Executor: Finished task 0.0 in stage 8.0 (TID 6). 1165 bytes result sent to driver

16/04/08 00:19:19 INFO DAGScheduler: ResultStage 8 (save at SparkSQLwithJoin.java:113) finished in 0.528 s

16/04/08 00:19:19 INFO TaskSetManager: Finished task 0.0 in stage 8.0 (TID 6) in 526 ms on localhost (1/1)

16/04/08 00:19:19 INFO TaskSchedulerImpl: Removed TaskSet 8.0, whose tasks have all completed, from pool

16/04/08 00:19:19 INFO DAGScheduler: Job 4 finished: save at SparkSQLwithJoin.java:113, took 0.653766 s

16/04/08 00:19:19 INFO DefaultWriterContainer: Job job_201604080019_0000 committed.

16/04/08 00:19:19 INFO JSONRelation: Listing file:/D:/DT-IMF/testdata/peopleresult on driver

16/04/08 00:19:19 INFO JSONRelation: Listing file:/D:/DT-IMF/testdata/peopleresult on driver

16/04/08 00:19:19 INFO SparkContext: Invoking stop() from shutdown hook

16/04/08 00:19:19 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4040

16/04/08 00:19:19 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!

16/04/08 00:19:19 INFO MemoryStore: MemoryStore cleared

16/04/08 00:19:19 INFO BlockManager: BlockManager stopped

16/04/08 00:19:19 INFO BlockManagerMaster: BlockManagerMaster stopped

16/04/08 00:19:19 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

16/04/08 00:19:19 INFO SparkContext: Successfully stopped SparkContext

16/04/08 00:19:19 INFO ShutdownHookManager: Shutdown hook called

16/04/08 00:19:19 INFO ShutdownHookManager: Deleting directory C:\Users\think\AppData\Local\Temp\spark-8402f870-65c3-45cc-9deb-5b14f8f6ae38

16/04/08 00:19:19 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.

 

 

The output is identical to that of the Java version:

{"name":"Michael","age":20,"score":98}

{"name":"Andy","age":17,"score":95}

 



The above are study notes from Lesson 67 of Wang Jialin's DT大数据梦工厂 (DT Big Data Dream Factory) course "IMF传奇行动".
Wang Jialin is the China evangelist for Spark, Flink, Docker, and Android technologies; president and chief expert of the Spark Asia-Pacific Research Institute; founder of DT大数据梦工厂; a source-code-level expert on Android software/hardware integration; an "English pronunciation magician"; and a fitness enthusiast.

WeChat public account: DT_Spark

Email: 18610086859@126.com

Phone: 18610086859

QQ: 1740415547

WeChat: 18610086859

Sina Weibo: ilovepains

