Spark standalone mode
--Prerequisites:
JDK (1.7 or later)
Scala (2.10.4)
Hadoop 2.x (at least HDFS)
Spark (standalone deployment)
--Configure the conf files:
[hadoop@node1 conf]$ mv spark-env.sh.template spark-env.sh
[hadoop@node1 conf]$ mv slaves.template slaves
[hadoop@node1 conf]$ mv spark-defaults.conf.template spark-defaults.conf
--Start Hadoop (HDFS)
hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode
--Start Spark (scripts live in $SPARK_HOME/sbin)
$SPARK_HOME/sbin/start-master.sh
$SPARK_HOME/sbin/start-slaves.sh
spark-shell
spark-shell --master spark://node1:7077
The two commands above are equivalent: spark-shell picks up spark.master spark://node1:7077 from spark-defaults.conf.
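To confirm from inside the REPL which master the shell actually connected to, sc.master can be inspected (a minimal check; sc is the SparkContext that spark-shell creates automatically):
scala> sc.master                          // should return spark://node1:7077 when spark-defaults.conf was picked up
scala> sc.getConf.get("spark.master")     // the same value as read into the SparkConf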
--Web UI monitoring
Master / application UI:
http://node1:8080
Driver / job UI:
http://node1:4040
---------------------------------------------------------------
--Appendix: configuration files:
---------------------------------------------------------------
[hadoop@node1 conf]$ vi slaves
node1
[hadoop@node1 conf]$ vi spark-defaults.conf
spark.master spark://node1:7077
[hadoop@node1 conf]$ vi spark-env.sh
# Options for the daemons used in the standalone deploy mode
# - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports for the master
# - SPARK_MASTER_OPTS, to set config properties only for the master (e.g. "-Dx=y")
# - SPARK_WORKER_CORES, to set the number of cores to use on this machine
# - SPARK_WORKER_MEMORY, to set how much total memory workers have to give executors (e.g. 1000m, 2g)
# - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT, to use non-default ports for the worker
# - SPARK_WORKER_INSTANCES, to set the number of worker processes per node
# - SPARK_WORKER_DIR, to set the working directory of worker processes
# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
# - SPARK_DAEMON_MEMORY, to allocate to the master, worker and history server themselves (default: 1g).
# - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
# - SPARK_SHUFFLE_OPTS, to set config properties only for the external shuffle service (e.g. "-Dx=y")
# - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
# - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers
JAVA_HOME=/home/hadoop/jdk1.7.0_67
SCALA_HOME=/home/hadoop/scala-2.10.4
HADOOP_CONF_DIR=/home/hadoop/hadoop-2.7.1/etc/hadoop
SPARK_PID_DIR=/home/hadoop/dirtmp
SPARK_MASTER_IP=node1
SPARK_MASTER_PORT=7077
SPARK_MASTER_WEBUI_PORT=8080
SPARK_WORKER_CORES=1 #number of CPU cores the worker may use
SPARK_WORKER_MEMORY=1024m #total memory for executors; increase in production, default is 1g
SPARK_WORKER_PORT=7078
SPARK_WORKER_WEBUI_PORT=8081
SPARK_WORKER_INSTANCES=1
---------------------------------------------------------------
--Spark syntax test:
---------------------------------------------------------------
--Start/stop single-node Hadoop
[hadoop@node1 ~]$ hadoop-daemon.sh start namenode
[hadoop@node1 ~]$ hadoop-daemon.sh start datanode
[hadoop@node1 ~]$ hadoop-daemon.sh stop namenode
[hadoop@node1 ~]$ hadoop-daemon.sh stop datanode
--Start/stop single-node Spark
[hadoop@node1 sbin]$ start-master.sh
[hadoop@node1 sbin]$ start-slaves.sh
[hadoop@node1 logs]$ stop-master.sh
[hadoop@node1 logs]$ stop-slaves.sh
[hadoop@node1 sbin]$ jps
3526 DataNode
3912 Jps
3699 Master
2534 NameNode
3873 Worker
--Source data
[hadoop@node1 test]$ vi aaa.txt
hello hadoop
hellp spark
hadoop spark
hive hadoop
sqoop hive
[hadoop@node1 ~]$ hadoop fs -ls /test/input
-rw-r--r-- 1 hadoop supergroup 61 2015-12-09 12:42 /test/input/aaa.txt
[hadoop@node1 ~]$ spark-shell
--This nearly exhausts the 2 GB VM's memory
[hadoop@node1 ~]$ free -h
             total       used       free     shared    buffers     cached
Mem:          1.8G       1.6G       241M       248K        29M       318M
-/+ buffers/cache:        1.3G       588M
Swap:         1.9G         0B       1.9G
[hadoop@node1 spark-1.5.2-bin-hadoop2.6]$ spark-shell
...
Spark context available as sc.
SQL context available as sqlContext.
...
scala> val rdd=sc.textFile("hdfs://node1:8020/test/input")
scala> rdd.collect
res2: Array[String] = Array(hello hadoop, hellp spark, hadoop spark, hive hadoop, sqoop hive)
scala> rdd.first
res3: String = hello hadoop
scala> rdd.count
res4: Long = 5
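Besides count and first, a quick filter is a handy sanity check (a small sketch on the same rdd; the expected counts follow from aaa.txt above):
scala> rdd.filter(_.contains("hadoop")).count   // lines containing "hadoop" -> 3
scala> rdd.filter(_.contains("spark")).count    // lines containing "spark"  -> 2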
scala> val wordcount= rdd.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_)
scala> wordcount.take(10)
res5: Array[(String, Int)] = Array((hive,2), (hello,1), (sqoop,1), (spark,2), (hadoop,3), (hellp,1)) --the result is not sorted
scala> val wordsort= wordcount.map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1))
scala> wordsort.take(10)
res6: Array[(String, Int)] = Array((hadoop,3), (hive,2), (spark,2), (hello,1), (sqoop,1), (hellp,1)) --sorted by the count (second column), descending
scala> wordsort.take(1)
res7: Array[(String, Int)] = Array((hadoop,3))
--Step-by-step breakdown of the word count above
scala> val rdd=sc.textFile("hdfs://node1:8020/test/input")
--raw data: one line per element
scala> rdd.collect
res2: Array[String] = Array(hello hadoop, hellp spark, hadoop spark, hive hadoop, sqoop hive)
scala> rdd.map(_.split(" ")).collect
res6: Array[Array[String]] = Array(Array(hello, hadoop), Array(hellp, spark), Array(hadoop, spark), Array(hive, hadoop), Array(sqoop, hive))
--one word per element
scala> rdd.flatMap(_.split(" ")).collect
res8: Array[String] = Array(hello, hadoop, hellp, spark, hadoop, spark, hive, hadoop, sqoop, hive)
--wrap each word with map: key is the word, value is 1
scala> rdd.flatMap(_.split(" ")).map((_,1)).collect
res10: Array[(String, Int)] = Array((hello,1), (hadoop,1), (hellp,1), (spark,1), (hadoop,1), (spark,1), (hive,1), (hadoop,1), (sqoop,1), (hive,1))
--sum the counts with reduceByKey
scala> rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res11: Array[(String, Int)] = Array((hive,2), (hello,1), (sqoop,1), (spark,2), (hadoop,3), (hellp,1))
--sort with sortByKey(true): ascending by key, from h to s
scala> rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortByKey(true).collect
res19: Array[(String, Int)] = Array((hadoop,3), (hello,1), (hellp,1), (hive,2), (spark,2), (sqoop,1))
--Descending order by count: swap (k,v), apply sortByKey(false), then swap (k,v) back: map().sortByKey(false).map()
--x is a tuple such as (hadoop,3); x._1 is the first element, x._2 is the second
scala> rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortByKey(true).map(x=>(x._2,x._1)).collect
res23: Array[(Int, String)] = Array((3,hadoop), (1,hello), (1,hellp), (2,hive), (2,spark), (1,sqoop))
scala> rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortByKey(true).map(x=>(x._2,x._1)).sortByKey(false).collect
res24: Array[(Int, String)] = Array((3,hadoop), (2,hive), (2,spark), (1,hello), (1,hellp), (1,sqoop))
scala> rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortByKey(true).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).collect
res21: Array[(String, Int)] = Array((hadoop,3), (hive,2), (spark,2), (hello,1), (hellp,1), (sqoop,1))
--keep the top 3, like rownum<=3 in SQL
scala> rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortByKey(true).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).take(3)
res26: Array[(String, Int)] = Array((hadoop,3), (hive,2), (spark,2))
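The swap-sort-swap pattern above can also be written with sortBy, which sorts by an arbitrary key function and avoids the manual (k,v) flipping (a sketch on the same rdd; sortBy has been available since Spark 1.x):
scala> rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2, false).take(3)
// same top-3 result: Array((hadoop,3), (hive,2), (spark,2)); the order of equal counts may differ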
--save the result to HDFS
scala> rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortByKey(true).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).saveAsTextFile("hdfs://node1:8020/test/output")
[hadoop@node1 ~]$ hadoop fs -ls /test/output
15/12/10 15:32:19 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 3 items
-rw-r--r-- 1 hadoop supergroup 0 2015-12-10 15:31 /test/output/_SUCCESS
-rw-r--r-- 1 hadoop supergroup 11 2015-12-10 15:31 /test/output/part-00000
-rw-r--r-- 1 hadoop supergroup 49 2015-12-10 15:31 /test/output/part-00001
[hadoop@node1 ~]$ hadoop fs -cat /test/output/part-00000
(hadoop,3)
[hadoop@node1 ~]$ hadoop fs -cat /test/output/part-00001
(hive,2)
(spark,2)
(hello,1)
(hellp,1)
(sqoop,1)
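saveAsTextFile writes one part file per partition. If a single output file is preferred, the RDD can be coalesced to one partition before saving (a sketch; /test/output_single is just an example path):
scala> rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2, false).coalesce(1).saveAsTextFile("hdfs://node1:8020/test/output_single")
// coalesce(1) collapses the result into a single partition, so only use it for small outputs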
--merge the result from HDFS back to the local filesystem
[hadoop@node1 ~]$ hadoop fs -getmerge hdfs://node1:8020/test/output/part-00000 hdfs://node1:8020/test/output/part-00001 /home/hadoop/test/output
[hadoop@node1 test]$ cat output
(hadoop,3)
(hive,2)
(spark,2)
(hello,1)
(hellp,1)
(sqoop,1)
--cache the RDD in memory
scala> val rdd=sc.textFile("hdfs://node1:8020/test/input")
scala> rdd.collect
scala> rdd.cache
res30: rdd.type = MapPartitionsRDD[95] at textFile at <console>:21
scala> rdd.collect --the second collect is much faster than the first
http://node1:4040/storage/
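cache marks the RDD for storage at the default MEMORY_ONLY level; the data is only materialized on the next action, which is why the second collect is faster. To inspect and release the cached data afterwards (a small sketch):
scala> rdd.getStorageLevel    // MEMORY_ONLY after cache
scala> rdd.unpersist()        // drop the cached blocks and free the memory
scala> rdd.getStorageLevel    // back to StorageLevel.NONE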