第13课 spark内核架构解密学习笔记


第13课 spark内核架构解密学习笔记  2016.01.16


内容:
1.通过手动绘图的方式解密spark内核架构
2.通过案例验证spark内核架构
3.spark架构思考


第一阶段:彻底精通spark
第二阶段:价值千万超大型spark项目:包含所有spark知识点,编码,测试




driver是运行程序时具有main方法并创建了spark context的环境对象。如果要看一段程序是否是Driver的话,就要看其内部是否有运行Application的main函数/方法,并且一定会创建SparkContext。sparkContext是通往集群的唯一入口,也是开发者使用Spark集群各种功能的唯一通道,SparkContext是整个程序运行调度的核心。注意:是程序运行调度的核心而不是资源调度的核心。SparkContext里有高层调度器和底层调度器。高层调度器就是把整个作业划分成几个阶段,底层调度器是每个阶段里的任务该如何处理;
SchedulerBackend是管理整个集群中为当前程序分配的计算资源(Executor)。
这里讲的Standalone模式。原因:
1)这是Spark自带的。
2)效率比在Yarn和Mesos上高很多,也不需要其他的内容
3)只要Spark一个框架就可以了。


SparkContext里有高层调度器、低层调度器、SchedulerBackend
SparkContext在创建这些对象的同时会向Master注册当前程序,注册OK后会分配资源,根据Action触发的JOB,JOB里有RDD,从后往前推,如果有宽依赖的话,就划分成不同的Stage。stage划分完成后提交给底层调度器TaskScheduler。
一个Stage内部都是计算轮回完全一样,只是计算的数据不同而已。
TaskScheduler拿到任务的集合,就会根据数据的本地性把任务发到Executor执行。
Executor在出问题时会向Driver汇报。运行完后SparkContext会关闭。当然创建的对象也都会关闭。
Driver是应用程序运行调度的核心,因为它负责了整个作业的调度,并且会向Master申请资源来完成具体作业的工作过程。
应用程序就是用户编写的Spark程序及打包后的依赖,包含driver功能的代码和分布在集群中多个节点上的Executor的代码。
应用程序有两个层面,application=driver+executor
Driver是驱动Executor工作的,Executor是具体处理数据分片的,内部是线程池并发地处理数据分片。
应用程序是Driver和Executors的模式,每个应用程序都有Executor代码。
Executor部分代码其实就是main方法中new SparkConf,SparkConf进行配置然后创建SparkContext。这些就是driver部分的代码。
Driver部分的代码:SparkConf+SparkContext
val conf = new SparkConf()
conf.setAppName(“...”)
conf.setMaster(“local”)
cal sc = new SparkContext(conf)


SparkContext创建的过程中做了很多内容,包括DAG Scheduler、TaskScheduler、SchedulerBackend、SparkEnv
textFile flatMap map reduceByKey:这些代码都是Transformation级别的,都会产生RDD,既是RDD操作又会产生RDD,这些代码就是具体的业务实现,就是Executor中具体执行的代码。最后都会被Action触发执行,都是在Worker中的Executor中处理的。
一个应用程序默认只有一个DAG Scheduler。
sparkContext/Spark  


driver是以sparkContext为核心的,可以理解为driver就是sparkContext




◆ Executor是运行在Worker节点上的为当前应用程序而开启的一个进程里面的处理对象,这个对象负责了Task的运行,通过线程池中的线程并发执行和线程复用的方式,线程池中的每一个线程可以运行一个任务,任务完成后回收到线程池中进行线程复用。
hadoop的mr的jvm不能复用
JVM是重量级的,而Spark在一个节点上在默认情况下只为当前程序开启一个JVM。在JVM中是以线程池的方式,通过线程来处理Task任务。
Executor就是进程里的对象。
默认情况下Driver运行在当前提交的机器上。
一个worker默认分为当前的应用程序只开启一个executor,当然可以配置为多个。
Executor靠线程池中的线程来运行Task时,Task肯定要从磁盘或内存中读写数据,每个APPlication都有自己独立的一批executor。
※ 问题:一个worker里executor是多点好还是少点好?
=>看情况。如果只分了一个Executor,占据了大量CPU core,但资源闲置,造成资源浪费。
由于CPU Cores的个数是有限的,如果只开启一个Executor,当任务比较大时内存易OOM。这时最好分成几个不同的Executor。
◆ clustermanager是集群获取资源的外部服务。Spark最开始时没有Yarn模式,也没有Standalone模式,最开始的资源管理服务是mesos。
Spark程序的运行不依赖于ClusterManager。
Spark应用程序向clustermanager注册成功后,Master就提前直接分配好资源,程序运行过程中不需要ClusterManager参与。
ClusterManager是可插拨的。
这种资源分配方式是粗粒度的资源分配方式。
◆ worker就是集群中任何可以运行APP具体操作代码的节点。worker上不会运行程序代码,worker是管理当前节点内存CPU等资源使用状况的,它会接收Master分配资源的指令,并通过executor runner启动一个新进程,进程内有Executor。
worker是工头,clustermanager是项目经理,管理很多worker,Worker下面有很多资源。
Worker管理当前Node的计算资源(主要是CPU和内存),并接受Master的指令来分配具体的计算资源Executor(在新的进程中分配)
ExecutorRunner 是管理新分配的进程。监控Executor运行状况(代理模式)。实质上就是在ExecutorRunner中创建出Executor进程的。
※ worker会不会向master汇报当前node的资源信息?
=> 不会!!!
worker会不断向Master发的心跳,但内容只有worker ID。是用来判断Worker是否活着。
那master怎么知道各节点的资源信息?
=> 分配资源的时候就已经知道了。应用程序在向Master注册时,注册成功后master就会分配资源,分配时就会记录资源,所有的资源都是Master分配的,所以Master当然知道各节点的资源信息了。
只有当Worker出现故障时才会向Master汇报资源情况。
◆ 作业:JOB
JOB就是包含了一系列Task的并行计算。JOB一般由应用程序的Action操作触发,比如saveAsTextFile。
JOB里面是一系列的RDD及作用在RDD的各种operation操作
collect就是一个Action,会触发一个作业。
wordCountOrdered就是一系列的RDD及对RDD的操作。
包括map、flatMap、TextFile,每一步都会至少产生一个RDD。TextFile就会产生hadoopRDD 和MapPartitionsRDD。
JOB都是由Action触发的,触发时前面有一系列的RDD。
action不会产生RDD,只会导致RunJOB。
action前是RDD,是transformation级别的,是lazy级别的执行方式。
如果后面的RDD对前面的RDD进行回溯时是窄依赖的话,就会在内存中进行迭代,这是Spark快的一个很重要的原因。
spark快不仅是因为基于内存。调度,容错才是Spark的精髓的基本点。
窄依赖有一个Range级别,即依赖固定个数的父RDD。所谓固定个数是说不会随着数据规模的扩大而改变。
依赖构成了DAG。如果是宽依赖DAG Scheduler就会划分Stage,Stage内部是基于内存迭代的,当然也可以基于磁盘迭代。
stage内部计算逻辑完全一样,只是计算的数据不同。
任务(Task)就是计算一个数据分片的,
数据分片:例如从HDFS上读取数据时默认数据分片就是128MB。
※ 一个数据分片是否精准地等于一个Block的大小(默认128MB)?
=> 一般情况下都不等于,因为最后一个分片会跨两个Block。


下面实际运行一个程序:
1)启动Spark集群:  start-all.sh
启动historyserver:  start-historyserver.sh
2)在浏览器中打开:
master:50070 (HDFS)
master:8080 (计算框架的资源管理器)
master:18080 (historyserver)
可以看到以前运行过的JOB详情:


这里有两个stage是因为reduceByKey产生了shuffle,所以划分成了两个Stage。
Stage内部textFile、flatMap、Map默认都是基于内存迭代(内存不够时可会基于磁盘迭代)
Collect操作触发作业,作业从后往前推,Stage内部都有一系列的任务。
点击Stage名,就可以看到Stage的详细情况,这里可以看到有88个并行任务。
A)在DAG Visuallization里可以清楚地看到运行的代码。
B)在Tasks里可以看到Task分布在不同的节点上的Executor中。
C)在Aggregated Metrics By Executor中可以看到有4个Executor。这是因为默认情况下每个Worker为当前应用程序只分配一个Executor。Spark集群有4个Worker,所以有4个Executor。在Adress中可以看到每个Executor位于哪个Worker上。在Total Tasks中可以看到运行了哪些任务。同时可以看到Failed Tasks/Successed Tasks。
★ 在Stage中看不到SparkContext和SparkConf。因为SparkContext和SparkConf是Driver。
★ 一个application里可以有多个JOB。因为一个应用程序中可以有不同的Action。
一般一个Action操作就会对应一个JOB。
特殊情况下:
checkpoint也可以导致JOB; 排序时进行Range范围的划分也会触发JOB


Spark内核架构图
★ 默认情况下有一台机器专门用来提交spark程序,这台机器一般一定与spark cluster在同一个网络环境中(因为Driver要驱动Application运行,要指挥worker工作,要频繁与Executor交互通信),其配置和普通worker一致(因为要频繁发送接收任务,需要较大的CPU MEM),
spark运行有两种模式cluster/client,默认是client模式。因为在client模式下可以看到更多的交互信息,即运行过程的信息。
指定deploy_mode为cluster模式的话,master会确定Worker中的一台机器作为Driver,Mastter分配的第一个Executor其实就是Driver级别的Executor。
※ 千万不要在eclipse/IDEA中开发好就把当机提交。
=> 会出现Task丢失、序列化故障、超时等问题。
为什么不能够在IDE集成开发环境中直接发布Spark程序到Spark集群中
第一点:内存和Cores的限制,默认情况下Spark程序的Driver会在提交Spark程序的机器上,所以如果在IDE中提交程序的话,那IDE机器就必须非常强大
第二点:Driver要指挥Workers的运行并频繁的发生通信,如果开发环境IDE和Spark集群不在同样一个网络下,就会出现任务丢失,运行缓慢等多种不必要的问题;
第三点:这是不安全的!


提交Spark程序用的机器上application(各种依赖外部资源,例如*.SO库,file),使用spark submit运行程序(可以配置运行时的各种参数,例如memory,cores)
※ 实际生产环境下会不会手动提交submit?
实际生产环境下用自动化脚本配置和提交程序。当然当前机要装spark,但这里安装的Spark不属于集群(不安装Spark的话就无法运行Spark-submit)。
★ driver(核心是sparkContext),通过driver执行应用程序。
spark1.6.x内部实现是通过rpc,rpc底层是akka.
※ 那为什么还要做rpc? => 以后可能不用akka。可能是netty?。
※ driver需要像master一样做HA吗
=> client模式运行的话,Driver无法做HA。
cluster模式在提交spark-submit时可以指定--supervise,如果是supervise的话,当前Driver故障时会自动再启动。 


★ 编写程序时先是创建sparkConf->然后是sparkContext
在程序开始时就要实例化sparkContext,
sparkContext要做三件事:
对sparkContext本身来讲,最重要的是创建DAG Scheduler/ task scheduler/ scheduler backend
Task Scheduler是负责一个作业内部运行的。
SchedulerBackend是管理计算资源的。
sparkContext在实例化的过程中一定要注册当前程序给master,master接受注册,如果没有问题,master会为当前程序分配AppID并分配计算资源。


★ spark Cluster(核心:masters)
master接收用户提交的程序,并发指令给worker为当前程序分配计算资源。每个worker所在节点默认为当前程序分配一个executor,在excutor中通过线程池并发执行,
※ 程序还未执行master怎么知道分配多少资源?
1,spark-env.sh和spark-defaults.sh
2,spark-submit提供的参数
3,程序中SparkConf配置的参数


master通知worker按照要求启动executor,默认启动一个Executor。
WorkerNode上Worker进程通过一个Proxy为ExecutorRunner的对象实例来远程启动ExecutorBackend进行。
Executor里有线程池。
实际在工作时Executor接收一个Task会通过TaskRunner来封装task,然后从ThreadPool中获取一条线程执行task,执行完后线程被回收复用
TaskRunner在封装时(具体运行时)会把代码反序列化执行具体内容。


★ 一般情况当通过action触发job时sparkContext会通过DAGscheduler来把job中的RDD构成的DAG划分成不同的stage(宽依赖),每个stage内部是一系列业务逻辑完全相同 但处理数据不同的tasks,构成了task set(任务集合)。
Task Set底层交给task scheduler。
task scheduler和schedulerbackend负责具体任务的运行(遵循数据本地性)
数据本地性是在什么时候确定的?就是说Task运行在哪台机器上是什么时候确定的?
=>DAG Scheduler划分Stage时确定的。
TaskScheduler会把每个Stage内部的一系列的Task发送给Executor。TaskScheduler就是根据数据本地性来确定把哪些任务发给哪个Executor的。
大数据一句精髓的话就是数据不动代码动。
触发JOB前已经通过SchedulerBackend获得了Executor了。
执行时运行的Task有两种类型:
1) 最后一个stage中的task称为ResultTask,产生job的结果,其它前面的stage中的task都是shuffleMapTask,为下一阶段的stage做准备,相当于MapReduce的Mapper
2)整个spark程序的运行,就是DAGScheduler把JOB分成不同的stage,提交taskset给taskScheduler,进而提交给executor执行(符合数据本地性),每个task会RDD中的一个partition,partition来具体执行我们定义的一系列同一个stage内部的函数,以此类推直到整个程序运行完成!!!
Spark是对MR的更精致和高效的实现。

Spark内核架构图


以上内容是王家林老师DT大数据梦工厂《 IMF传奇行动》第13课的学习笔记。
王家林老师是SparkFlinkDockerAndroid技术中国区布道师。Spark亚太研究院院长和首席专家,DT大数据梦工厂创始人,Android软硬整合源码级专家,英语发音魔术师,健身狂热爱好者。

微信公众账号:DT_Spark

联系邮箱18610086859@126.com 

电话:18610086859

QQ:1740415547

微信号:18610086859  

新浪微博:ilovepains



注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
© 2014-2019 ITdaan.com 粤ICP备14056181号