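Spark version: 1.3+
Sequence diagram: the Worker launches an Executor, which then registers with the Driver:
1. LaunchExecutor: the Master sends a message telling the Worker to launch an Executor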
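2. Worker new()
This is the message the Master sends to the Worker to make it launch an Executor. LaunchExecutor is a case class that wraps the information about the Executor to be launched
new ExecutorRunner: create an ExecutorRunner, pass all the parameters into it, and later use it to launch the Executor
Register the executor ID -> ExecutorRunner mapping by putting it into a map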
executors(appId + "/" + execId) = manager
3. Worker start()
Call the ExecutorRunner's start method to launch the Executor as a Java child process
manager.start()
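Steps 2 and 3 can be sketched without Spark as follows. This is a minimal illustration, not Spark's code: `RunnerStub` and `ExecutorRegistrySketch` are hypothetical names standing in for ExecutorRunner and the Worker, and the stub only records that `start()` was called.

```scala
import scala.collection.mutable

// Hypothetical stand-in for ExecutorRunner; it only tracks whether start() ran.
final class RunnerStub(val appId: String, val execId: Int) {
  @volatile var started = false
  def start(): Unit = { started = true }
}

object ExecutorRegistrySketch {
  // Keyed by "appId/execId", mirroring executors(appId + "/" + execId) = manager
  val executors = mutable.HashMap.empty[String, RunnerStub]

  def launch(appId: String, execId: Int): RunnerStub = {
    val manager = new RunnerStub(appId, execId)
    executors(appId + "/" + execId) = manager // register first
    manager.start()                           // then start
    manager
  }
}
```

Keeping the runner in a map keyed by application and executor ID lets the Worker find it again later, for example when it needs to stop that Executor.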
4. ExecutorRunner new()
First create a thread object. That thread is what will launch the Java child process
workerThread = new Thread("ExecutorRunner for " + fullId) {
override def run() { fetchAndRunExecutor() }
}
5. ExecutorRunner
Start the worker thread
workerThread.start()
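The split between creating the thread and starting it can be shown in isolation. The sketch below is an assumption-laden simplification: `ThreadedRunnerSketch` and `fetchAndRun` are hypothetical names, and the body only records which thread ran it instead of launching a process.

```scala
import java.util.concurrent.CountDownLatch

// Hypothetical, simplified runner: the constructor only prepares the thread,
// and start() is what actually kicks off the work.
final class ThreadedRunnerSketch(fullId: String) {
  val finished = new CountDownLatch(1)
  @volatile var ranOn: String = ""

  private val workerThread = new Thread("ExecutorRunner for " + fullId) {
    override def run(): Unit = fetchAndRun()
  }

  // Placeholder for fetchAndRunExecutor(); records the name of the running thread.
  private def fetchAndRun(): Unit = {
    ranOn = Thread.currentThread().getName
    finished.countDown()
  }

  def start(): Unit = workerThread.start()
}
```

Running the launch on a dedicated thread means the caller of `start()` returns immediately instead of blocking for the lifetime of the child process.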
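6. ExecutorRunner
A process can die unexpectedly before its state has been saved properly, so cleanup code needs to run when the JVM shuts down. A shutdown hook is registered to kill the Executor child process in that case
shutdownHook = new Thread() {
override def run() {
killProcess(Some("Worker shutting down"))
}
}
Runtime.getRuntime.addShutdownHook(shutdownHook)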
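The shutdown-hook pattern on its own looks roughly like this. It is a sketch under stated assumptions: `ShutdownHookSketch`, `cleanup`, `install` and `uninstall` are hypothetical names, and `cleanup` only sets a flag where real code would destroy the child process.

```scala
object ShutdownHookSketch {
  @volatile var cleaned = false

  // Stand-in for killProcess(...); real code would destroy the child process here.
  def cleanup(reason: String): Unit = { cleaned = true }

  val hook: Thread = new Thread("cleanup-hook") {
    override def run(): Unit = cleanup("Worker shutting down")
  }

  // The JVM runs registered hooks when it begins shutting down.
  def install(): Unit = Runtime.getRuntime.addShutdownHook(hook)

  // Returns true if the hook was registered and has now been removed.
  def uninstall(): Boolean = Runtime.getRuntime.removeShutdownHook(hook)
}
```

Removing the hook once the child process has exited normally keeps the JVM from holding on to hooks it no longer needs.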
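7. ExecutorRunner fetchAndRunExecutor()
The worker thread calls this method to launch the Java child process
8. ExecutorRunner fetchAndRunExecutor()
Collect the launch parameters (command, memory, Spark home, variable substitution) into a ProcessBuilder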
val builder = CommandUtils.buildProcessBuilder(appDesc.command, memory,
sparkHome.getAbsolutePath, substituteVariables)
9. ExecutorRunner
Start the child process, whose entry point is the main method of CoarseGrainedExecutorBackend
process = builder.start()
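Steps 8 and 9 amount to assembling a command line and handing it to a `java.lang.ProcessBuilder`. The sketch below does not use Spark's CommandUtils; `LaunchCommandSketch`, `substitute` and `build` are hypothetical helpers, and the `{{NAME}}` placeholder names are illustrative. It only assembles the builder and leaves the call to `start()` to the caller.

```scala
object LaunchCommandSketch {
  // Replace {{NAME}}-style placeholders in one argument (names are illustrative).
  def substitute(arg: String, vars: Map[String, String]): String =
    vars.foldLeft(arg) { case (s, (k, v)) => s.replace("{{" + k + "}}", v) }

  // Assemble, but do not start, a builder for a child JVM.
  def build(mainClass: String, memoryMb: Int, args: Seq[String],
            vars: Map[String, String]): ProcessBuilder = {
    val javaBin = System.getProperty("java.home") + "/bin/java"
    val cmd = Seq(javaBin, "-Xmx" + memoryMb + "M", mainClass) ++
      args.map(substitute(_, vars))
    new ProcessBuilder(cmd: _*)
  }
}
```

Calling `start()` on the returned builder would fork the child JVM and return a `Process` handle that can later be waited on or destroyed.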
10. CoarseGrainedExecutorBackend main()
The entry point of the Executor process
11. CoarseGrainedExecutorBackend run()
Call the run method
run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
12. CoarseGrainedExecutorBackend preStart() (a lifecycle method of CoarseGrainedExecutorBackend)
Establish a connection to the Driver
driver = context.actorSelection(driverUrl)
13. CoarseGrainedSchedulerBackend RegisterExecutor
The Executor sends this message to the DriverActor to register itself
case RegisterExecutor(executorId, hostPort, cores, logUrls) ...
14. DriverActor -> CoarseGrainedExecutorBackend
The message the DriverActor sends back to the Executor, telling it that registration succeeded
case RegisteredExecutor => ...
15. DriverActor CoarseGrainedSchedulerBackend
Check whether there are tasks to submit (DriverActor -> Executor)
makeOffers()
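The registration handshake in steps 13 to 15 can be modelled without Akka as plain message passing. Everything below is a simplified assumption: the message types carry fewer fields than the real ones, `DriverSketch` is a hypothetical in-memory stand-in for the driver side, and the duplicate-ID reply is included only for illustration.

```scala
import scala.collection.mutable

// Simplified message types; the real ones carry more fields.
sealed trait Msg
final case class RegisterExecutor(executorId: String, hostPort: String, cores: Int) extends Msg
case object RegisteredExecutor extends Msg
final case class RegisterExecutorFailed(reason: String) extends Msg

// Hypothetical in-memory stand-in for the driver side; no Akka involved.
final class DriverSketch {
  val freeCores = mutable.HashMap.empty[String, Int]
  var offersMade = 0

  // Handle one incoming message and return the reply for the sender.
  def receive(msg: Msg): Msg = msg match {
    case RegisterExecutor(id, _, _) if freeCores.contains(id) =>
      RegisterExecutorFailed("Duplicate executor ID: " + id)
    case RegisterExecutor(id, _, cores) =>
      freeCores(id) = cores // remember the new executor's resources
      makeOffers()          // new resources may allow pending tasks to run
      RegisteredExecutor
    case other =>
      RegisterExecutorFailed("Unexpected message: " + other)
  }

  private def makeOffers(): Unit = { offersMade += 1 }
}
```

The point of calling `makeOffers()` right after a successful registration is that a newly registered Executor brings free cores, so tasks that were waiting may now be schedulable.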
16. CoarseGrainedExecutorBackend new()
Create an Executor instance, which runs the application's tasks
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
17. Executor
Initialize the thread pool
val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
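A daemon cached thread pool of this kind can be built with the JDK alone. The sketch below is not Spark's Utils implementation: `DaemonPoolSketch` is a hypothetical name and the `<prefix>-<n>` thread naming scheme is an assumption made for illustration.

```scala
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

object DaemonPoolSketch {
  // Cached pool whose threads are daemons named "<prefix>-<n>".
  def newDaemonCachedThreadPool(prefix: String): ExecutorService = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, prefix + "-" + counter.getAndIncrement())
        t.setDaemon(true) // daemon threads do not keep the JVM alive
        t
      }
    }
    Executors.newCachedThreadPool(factory)
  }
}
```

A cached pool grows on demand and reuses idle threads, which suits a workload where the number of concurrently running tasks varies over time.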
18. Executor
The Executor sends periodic heartbeats to the Driver to report that it is still alive
startDriverHeartbeater()
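A periodic heartbeat can be sketched as a daemon thread that calls a callback in a loop. This is an illustration under assumptions, not Spark's implementation: `HeartbeaterSketch` is a hypothetical class, `send` stands in for whatever actually delivers the heartbeat, and the interval is supplied by the caller.

```scala
// Hypothetical heartbeater: calls `send` every `intervalMs` until stopped.
final class HeartbeaterSketch(intervalMs: Long, send: () => Unit) {
  @volatile private var stopped = false

  private val thread = new Thread("driver-heartbeater") {
    override def run(): Unit = {
      while (!stopped) {
        send()
        try Thread.sleep(intervalMs)
        catch { case _: InterruptedException => stopped = true }
      }
    }
  }
  thread.setDaemon(true) // do not keep the JVM alive just for heartbeats

  def start(): Unit = thread.start()

  // Ask the loop to exit and wake it up if it is sleeping.
  def stop(): Unit = { stopped = true; thread.interrupt() }
}
```

A caller would construct it with an interval and a callback, call `start()` once, and call `stop()` during shutdown.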