Spark version: 1.3+

Sequence diagram: a Worker launches an Executor, which then registers with the Driver:

1.launchExecutor: the Master sends a message telling the Worker to launch an Executor

2.Worker new()

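This is the message the Master sends to the Worker to make it launch an Executor. LaunchExecutor is a case class that wraps the information about the Executor to be launched.

new ExecutorRunner: creates an ExecutorRunner, puts all the parameters into it, and later uses it to launch the Executor.

The mapping from executor ID to its ExecutorRunner (manager) is registered in a map: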

executors(appId + "/" + execId) = manager
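
The bookkeeping in step 2 can be sketched without Spark on the classpath. This is a minimal illustration only: LaunchExecutorMsg, RunnerSketch and WorkerSketch are hypothetical stand-ins with far fewer fields than the real LaunchExecutor and ExecutorRunner.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for the LaunchExecutor message.
final case class LaunchExecutorMsg(appId: String, execId: Int, cores: Int, memoryMb: Int)

// Hypothetical stand-in for ExecutorRunner; it only records that start() was called.
final class RunnerSketch(val fullId: String, val cores: Int, val memoryMb: Int) {
  @volatile var started = false
  def start(): Unit = { started = true } // the real runner would spawn a child JVM here
}

object WorkerSketch {
  // Keyed by "appId/execId", mirroring executors(appId + "/" + execId) = manager
  val executors = mutable.HashMap.empty[String, RunnerSketch]

  def handle(msg: LaunchExecutorMsg): RunnerSketch = {
    val fullId = msg.appId + "/" + msg.execId
    val manager = new RunnerSketch(fullId, msg.cores, msg.memoryMb)
    executors(fullId) = manager // remember the runner so it can be looked up later
    manager.start()
    manager
  }
}
```

Keying the map by the combined application and executor ID lets one Worker track executors from several applications without collisions.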

3.Worker start()

Calls the start method of ExecutorRunner to launch the Executor as a Java child process

manager.start() 

4.ExecutorRunner new()

First a thread object is created; that thread is then used to launch a Java child process

workerThread = new Thread("ExecutorRunner for " + fullId) {

      override def run() { fetchAndRunExecutor() }

    }

5.ExecutorRunner 

workerThread.start()
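
Steps 4 and 5 amount to wrapping the launch logic in a named thread and starting it, so the caller is not blocked while the child process runs. A minimal sketch, assuming a stub in place of fetchAndRunExecutor() (WorkerThreadSketch and fetchAndRunExecutorStub are made-up names):

```scala
object WorkerThreadSketch {
  @volatile var ran = false

  // Stand-in for fetchAndRunExecutor(); the real method launches a child JVM.
  def fetchAndRunExecutorStub(): Unit = { ran = true }

  def startRunner(fullId: String): Thread = {
    val workerThread = new Thread("ExecutorRunner for " + fullId) {
      override def run(): Unit = fetchAndRunExecutorStub()
    }
    workerThread.start() // returns immediately; the body runs on the new thread
    workerThread
  }
}
```

Naming the thread after the executor's full ID makes it easy to find in a thread dump.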

6.ExecutorRunner 

Processes often die unexpectedly before some of their state has been saved properly, so cleanup code needs to run when the JVM shuts down: Runtime.getRuntime.addShutdownHook(shutdownHook)

shutdownHook = new Thread() {

      override def run() {

        killProcess(Some("Worker shutting down"))

      }

    }
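
The hook mechanism can be tried in isolation. The sketch below registers a hook and removes it again; cleanup is a hypothetical placeholder for killProcess, and ShutdownHookSketch is not a Spark class.

```scala
object ShutdownHookSketch {
  @volatile var cleaned = false

  // Hypothetical cleanup; the original calls killProcess(Some("Worker shutting down")).
  def cleanup(reason: String): Unit = { cleaned = true }

  // A hook is an unstarted Thread; the JVM starts it during shutdown.
  val shutdownHook: Thread = new Thread() {
    override def run(): Unit = cleanup("Worker shutting down")
  }

  def install(): Unit = Runtime.getRuntime.addShutdownHook(shutdownHook)

  // Returns true if the hook was registered and has now been removed.
  def uninstall(): Boolean = Runtime.getRuntime.removeShutdownHook(shutdownHook)
}
```

Removing the hook once the child process has exited normally avoids running cleanup for a process that no longer exists.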

7.ExecutorRunner fetchAndRunExecutor()

The thread object calls this method to launch the Java child process

8.ExecutorRunner fetchAndRunExecutor()

Stores the launch parameters (command, memory, Spark home) in a ProcessBuilder

val builder = CommandUtils.buildProcessBuilder(appDesc.command, memory,

        sparkHome.getAbsolutePath, substituteVariables)

9.ExecutorRunner 

Starts a child process whose entry point is the main method of CoarseGrainedExecutorBackend

process = builder.start()
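
Steps 8 and 9 follow the usual java.lang.ProcessBuilder pattern: assemble the command first, then call start(). The sketch below launches the current JVM's own java binary as the child; ChildProcessSketch, javaBinary and runChild are illustrative names, and the binary path assumes a Unix-like JDK layout.

```scala
import java.io.File
import scala.io.Source

object ChildProcessSketch {
  // Path of the java binary that runs the current JVM (assumes <java.home>/bin/java exists).
  def javaBinary: String =
    new File(new File(System.getProperty("java.home"), "bin"), "java").getPath

  // Builds the command first, then starts it, like builder.start() in step 9.
  def runChild(args: Seq[String]): (Int, String) = {
    val builder = new ProcessBuilder((javaBinary +: args): _*)
    builder.redirectErrorStream(true) // merge stderr into stdout
    val process = builder.start()
    // Drain the output before waiting so the child cannot block on a full pipe.
    val output = Source.fromInputStream(process.getInputStream).mkString
    (process.waitFor(), output)
  }
}
```

For example, ChildProcessSketch.runChild(Seq("-version")) returns the child's exit code together with whatever it printed.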

10.CoarseGrainedExecutorBackend main()

The entry point of the Executor process

11.CoarseGrainedExecutorBackend run()

Calls the run method

run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)

12.CoarseGrainedExecutorBackend preStart(), a lifecycle method of CoarseGrainedExecutorBackend

Establishes a connection to the Driver

driver = context.actorSelection(driverUrl)

13.CoarseGrainedSchedulerBackend RegisterExecutor

The Executor sends a registration message to the DriverActor

 case RegisterExecutor(executorId, hostPort, cores, logUrls) ...

14.DriverActor CoarseGrainedExecutorBackend

The message the DriverActor sends back to the Executor, telling it that registration succeeded

case RegisteredExecutor => ...
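
The handshake in steps 13 and 14 is a request/reply exchange handled by pattern matching on message types. The sketch below models it with plain method calls instead of Akka actors. The messages are reduced stand-ins with fewer fields than the real ones, and RegistrationRejected, DriverSketch and ExecutorSideSketch are names invented for this illustration.

```scala
import scala.collection.mutable

sealed trait Msg
final case class RegisterExecutor(executorId: String, hostPort: String, cores: Int) extends Msg
case object RegisteredExecutor extends Msg
final case class RegistrationRejected(reason: String) extends Msg // hypothetical failure reply

object DriverSketch {
  private val executorCores = mutable.HashMap.empty[String, Int]

  // Driver side: accept a new executor ID, reject one that is already known.
  def handle(msg: RegisterExecutor): Msg =
    if (executorCores.contains(msg.executorId)) {
      RegistrationRejected("Duplicate executor ID: " + msg.executorId)
    } else {
      executorCores(msg.executorId) = msg.cores
      RegisteredExecutor
    }
}

object ExecutorSideSketch {
  // Executor side: react to the driver's reply.
  def onReply(reply: Msg): String = reply match {
    case RegisteredExecutor           => "registered"
    case RegistrationRejected(reason) => "failed: " + reason
    case other                        => "unexpected: " + other
  }
}
```

Only after the positive reply does the executor side go on to create the Executor instance, which matches the ordering of the later steps.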

15.DriverActor CoarseGrainedSchedulerBackend

Checks whether there are tasks that need to be submitted (DriverActor -> Executor)

makeOffers()

16.CoarseGrainedExecutorBackend new()

Creates an Executor instance, which runs the business logic

executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

17.Executor 

Initializes the thread pool

val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
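
A daemon cached thread pool can be built from the JDK alone. The sketch below is one plausible way to get the behaviour the name suggests, and is not necessarily how Spark's Utils.newDaemonCachedThreadPool is implemented. DaemonPoolSketch and the "-N" thread-name suffix are assumptions.

```scala
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

object DaemonPoolSketch {
  def newDaemonCachedThreadPool(prefix: String): ExecutorService = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, prefix + "-" + counter.getAndIncrement())
        t.setDaemon(true) // daemon threads do not keep the JVM alive
        t
      }
    }
    // A cached pool creates threads on demand and reuses idle ones.
    Executors.newCachedThreadPool(factory)
  }
}
```

Daemon threads suit task execution here because a still-running task should not prevent the executor JVM from exiting.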

18.Executor

The Executor sends heartbeats to the DriverActor to report that it is still alive

startDriverHeartbeater()
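
A periodic heartbeat can be sketched with a ScheduledExecutorService. This is a swapped-in technique chosen for brevity and makes no claim about how startDriverHeartbeater() works internally. sendHeartbeat is a hypothetical stand-in that only counts calls instead of messaging the driver.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object HeartbeatSketch {
  val sent = new AtomicInteger(0)

  // Hypothetical stand-in for sending a heartbeat message to the driver.
  def sendHeartbeat(): Unit = { sent.incrementAndGet() }

  // Fires the first heartbeat immediately, then one every intervalMs milliseconds.
  def start(intervalMs: Long): ScheduledExecutorService = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val task = new Runnable { override def run(): Unit = sendHeartbeat() }
    scheduler.scheduleAtFixedRate(task, 0L, intervalMs, TimeUnit.MILLISECONDS)
    scheduler
  }
}
```

The caller keeps the returned scheduler and calls shutdownNow() on it when the executor stops, so the heartbeat thread does not outlive it.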
