
Data Processing: Spark Worker Workflow and a Source Code Walkthrough of Driver Launch

Published: 2020-12-14 04:52:48 | Category: Big Data | Source: compiled from the web

How the Spark Worker works, with a source code walkthrough: the Worker workflow and the code path that launches a Driver.

Reposted from: https://jingyan.baidu.com/article/f96699bbdeafbd894f3c1b7a.html

Method / Steps

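  1. Driver and Executor registration in the Worker

    The Worker's core job is to manage the memory, CPU, and other resources of its machine, and to launch a Driver or an Executor when the Master instructs it to.

    How a Driver is launched

    How an Executor is launched

    If a Driver or an Executor dies, the Master can allocate resources for it again through schedule.

    At runtime the Worker is a process of its own, and it communicates over RPC by extending ThreadSafeRpcEndpoint:

    extends ThreadSafeRpcEndpoint with Logging {

    The Master sends messages to the Worker over RPC, and the Worker receives them in its receive method.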

    case LaunchDriver(driverId, driverDesc) => {
      logInfo(s"Asked to launch driver $driverId")
      val driver = new DriverRunner(
        conf,
        driverId,
        workDir, // working directory
        sparkHome,
        driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
        self,
        workerUri,
        securityMgr)
      drivers(driverId) = driver
      // Start the DriverRunner.
      driver.start()
      coresUsed += driverDesc.cores
      memoryUsed += driverDesc.mem
    }

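    The Worker tracks each DriverRunner by its driver ID. Internally, DriverRunner starts a separate thread, and that thread launches the Driver process and holds its Process handle.

    The drivers map goes from driver ID to DriverRunner:

    val drivers = new HashMap[String, DriverRunner]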
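The bookkeeping in the LaunchDriver handler can be tried out in isolation. The following is a minimal sketch, not Spark's code: `LaunchDriverMsg`, `DriverSpec`, `StubDriverRunner`, and `WorkerBookkeeping` are hypothetical stand-ins for Spark's internal types, and the stub runner only records that it was started instead of spawning a thread.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for Spark's internal message and description types.
final case class DriverSpec(cores: Int, mem: Int)
final case class LaunchDriverMsg(driverId: String, driverDesc: DriverSpec)

// Stub runner: the real DriverRunner spawns a thread in start(); this one only sets a flag.
final class StubDriverRunner(val driverId: String, val desc: DriverSpec) {
  @volatile var started = false
  def start(): Unit = { started = true }
}

final class WorkerBookkeeping {
  val drivers = new mutable.HashMap[String, StubDriverRunner]
  var coresUsed = 0
  var memoryUsed = 0

  // Same order as the handler above: register the runner, start it, then account for resources.
  def receive(message: Any): Unit = message match {
    case LaunchDriverMsg(driverId, driverDesc) =>
      val runner = new StubDriverRunner(driverId, driverDesc)
      drivers(driverId) = runner
      runner.start()
      coresUsed += driverDesc.cores
      memoryUsed += driverDesc.mem
    case _ => () // every other message is ignored in this sketch
  }
}
```

Sending `LaunchDriverMsg("driver-1", DriverSpec(2, 1024))` to a fresh `WorkerBookkeeping` leaves one started runner in `drivers` and raises `coresUsed` and `memoryUsed` by the requested amounts.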

  2. DriverRunner

    DriverRunner manages the execution of a single Driver, including restarting it automatically when it fails. This applies to standalone mode: when a Driver running in cluster deploy mode fails and supervise is true, the Worker that launched the Driver is responsible for restarting it.

    /**
     * Manages the execution of one driver, including automatically restarting the driver on failure.
     * This is currently only used in standalone cluster deploy mode.
     */

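  3. Start the thread that creates the Driver's working directory

    start() spawns a thread that runs and manages the Driver. The thread's first action is to create the Driver's working directory:

    /** Starts a thread to run and manage the driver. */
    private[worker] def start() = {
      new Thread("DriverRunner for " + driverId) {
        override def run() {
          try {
            val driverDir = createWorkingDirectory()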

  4. Create the Driver's working directory

    /**
     * Creates the working directory for this driver.
     * Will throw an exception if there are errors preparing the directory.
     */
    private def createWorkingDirectory(): File = {
      // Create the Driver's working directory.
      val driverDir = new File(workDir, driverId)
      if (!driverDir.exists() && !driverDir.mkdirs()) {
        throw new IOException("Failed to create directory " + driverDir)
      }
      driverDir
    }
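The thread start and the directory creation can be combined into one small, self-contained sketch. `DirPreparingRunner` is a hypothetical class, not part of Spark. Unlike the start() shown above, it returns the Thread so that a caller can join it, and it stores the outcome in fields. Both are assumptions made so the sketch is easy to test.

```scala
import java.io.{File, IOException}
import java.nio.file.Files

// Hypothetical runner that only prepares the working directory on its own thread.
final class DirPreparingRunner(workDir: File, driverId: String) {
  @volatile var driverDir: Option[File] = None
  @volatile var failure: Option[Exception] = None

  // Creates <workDir>/<driverId>; an already existing directory is accepted.
  private def createWorkingDirectory(): File = {
    val dir = new File(workDir, driverId)
    if (!dir.exists() && !dir.mkdirs()) {
      throw new IOException("Failed to create directory " + dir)
    }
    dir
  }

  // Starts a named thread, mirroring the "DriverRunner for <id>" naming in the excerpt.
  def start(): Thread = {
    val thread = new Thread("DriverRunner for " + driverId) {
      override def run(): Unit = {
        try {
          driverDir = Some(createWorkingDirectory())
        } catch {
          case e: Exception => failure = Some(e)
        }
      }
    }
    thread.start()
    thread
  }
}
```

Naming the thread after the driver ID makes it easy to find in a thread dump when several Drivers run on one Worker.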

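  5. Fetch the packaged application JAR

    The application code is packaged as a JAR, which the thread downloads into the working directory:

    val localJarFilename = downloadUserJar(driverDir)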

  6. Download the JAR

    downloadUserJar downloads the JAR file and returns its local path. The application is typically packaged as a JAR and uploaded to HDFS, so that every machine can download it from there.

    /**
     * Download the user jar into the supplied directory and return its local path.
     * Will throw an exception if there are errors downloading the jar.
     */
    private def downloadUserJar(driverDir: File): String = {
      val jarPath = new Path(driverDesc.jarUrl)
      // Hadoop configuration used to fetch the JAR, for example from HDFS.
      val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
      val destPath = new File(driverDir.getAbsolutePath, jarPath.getName)
      val jarFileName = jarPath.getName
      val localJarFile = new File(driverDir, jarFileName)
      val localJarFilename = localJarFile.getAbsolutePath
      if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
        logInfo(s"Copying user jar $jarPath to $destPath")
        Utils.fetchFile(
          driverDesc.jarUrl,
          driverDir,
          conf,
          securityManager,
          hadoopConf,
          System.currentTimeMillis(),
          useCache = false)
      }
      if (!localJarFile.exists()) { // Verify copy succeeded
        throw new Exception(s"Did not see expected jar $jarFileName in $driverDir")
      }
      localJarFilename
    }
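The control flow of downloadUserJar (skip the copy if the file is already there, then verify that it exists) does not depend on HDFS. Below is a minimal sketch of that pattern using a plain local copy via `java.nio.file.Files.copy` in place of `Utils.fetchFile`. `JarFetchSketch` and `fetchJar` are hypothetical names, and `Path` here is `java.nio.file.Path`, not Hadoop's `Path`.

```scala
import java.io.File
import java.nio.file.{Files, Path}

object JarFetchSketch {
  // Copies `source` into `driverDir` unless a file with that name already exists,
  // then verifies the result and returns its absolute path.
  def fetchJar(source: Path, driverDir: File): String = {
    val jarFileName = source.getFileName.toString
    val localJarFile = new File(driverDir, jarFileName)
    if (!localJarFile.exists()) { // may already exist from an earlier fetch
      Files.copy(source, localJarFile.toPath)
    }
    if (!localJarFile.exists()) { // verify the copy succeeded
      throw new Exception(s"Did not see expected jar $jarFileName in $driverDir")
    }
    localJarFile.getAbsolutePath
  }
}
```

Calling `fetchJar` twice with the same arguments returns the same path, and the second call performs no copy.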

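  7. Substitute placeholder variables

    Some arguments start out as placeholders because their values are not known until the Driver is actually launched, so they are filled in at run time.

    def substituteVariables(argument: String): String = argument match {
      case "{{WORKER_URL}}" => workerUrl
      case "{{USER_JAR}}" => localJarFilename // already downloaded in the previous step
      case other => other
    }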
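To see the substitution applied to a whole argument list, here is a minimal sketch. `SubstitutionSketch` and `resolveArguments` are hypothetical names, and the worker URL, JAR path, and main class in the usage note are made-up sample values.

```scala
object SubstitutionSketch {
  // Same matching rule as above, with the two values passed in explicitly.
  def substituteVariables(workerUrl: String, localJarFilename: String)(argument: String): String =
    argument match {
      case "{{WORKER_URL}}" => workerUrl
      case "{{USER_JAR}}"   => localJarFilename
      case other            => other
    }

  // Applies the substitution to every argument of a command line.
  def resolveArguments(args: Seq[String], workerUrl: String, localJar: String): Seq[String] =
    args.map(arg => substituteVariables(workerUrl, localJar)(arg))
}
```

For example, `resolveArguments(Seq("{{WORKER_URL}}", "{{USER_JAR}}", "com.example.Main"), "spark://Worker@host-a:7078", "/work/driver-1/app.jar")` replaces the first two entries and leaves the class name untouched. The match is on the whole argument, so a placeholder embedded in a longer string such as `--jar={{USER_JAR}}` is not substituted.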

  8. Build the launch command

    The command mainly defines the entry point of the new process, that is, the class it runs.

    // TODO: If we add ability to submit multiple jars they should also be added here
    // driverDesc.command specifies which class to run at startup.
      val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
        driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
      // Launch the Driver.
      launchDriver(builder, driverDir, driverDesc.supervise)
    }

  9. launchDriver

    launchDriver redirects the process's stdout and stderr to files under baseDir, so earlier runs can be inspected through these logs.

    private def launchDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean) {
      builder.directory(baseDir)
      def initialize(process: Process): Unit = {
        // Redirect stdout and stderr to files
        val stdout = new File(baseDir, "stdout")
        CommandUtils.redirectStream(process.getInputStream, stdout)
        val stderr = new File(baseDir, "stderr")
        // Format the command for the log header.
        val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
        val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
        Files.append(header, stderr, UTF_8)
        CommandUtils.redirectStream(process.getErrorStream, stderr)
      }
      runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
    }
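The header that launchDriver writes at the top of the stderr file can be reproduced without starting a process. The sketch below uses `java.nio.file.Files.write` with `APPEND` in place of Guava's `Files.append`. `LaunchHeaderSketch` and its method names are hypothetical.

```scala
import java.io.File
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, StandardOpenOption}

object LaunchHeaderSketch {
  // Quotes every argument and joins them with spaces: "a" "b" "c"
  def formatCommand(command: Seq[String]): String =
    command.mkString("\"", "\" \"", "\"")

  // The command line, a rule of 40 '=' characters, then a blank line.
  def header(command: Seq[String]): String =
    "Launch Command: %s\n%s\n\n".format(formatCommand(command), "=" * 40)

  // Appends the header to <baseDir>/stderr, creating the file if needed.
  def writeHeader(baseDir: File, command: Seq[String]): File = {
    val stderr = new File(baseDir, "stderr")
    Files.write(stderr.toPath, header(command).getBytes(UTF_8),
      StandardOpenOption.CREATE, StandardOpenOption.APPEND)
    stderr
  }
}
```

Writing the header before the error stream is redirected means the exact launch command sits at the top of the stderr log, which helps when a Driver fails right at startup.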

  10. The ProcessBuilderLike companion object

    private[deploy] object ProcessBuilderLike {
      // apply wraps a ProcessBuilder: start and command delegate to the wrapped builder.
      def apply(processBuilder: ProcessBuilder): ProcessBuilderLike = new ProcessBuilderLike {
        override def start(): Process = processBuilder.start()
        override def command: Seq[String] = processBuilder.command().asScala
      }
    }

  11. The ProcessBuilderLike trait

    // Needed because ProcessBuilder is a final class and cannot be mocked
    private[deploy] trait ProcessBuilderLike {
      def start(): Process
      def command: Seq[String]
    }
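The comment on the trait says it exists so that tests can substitute a fake. A minimal sketch of such a fake follows. The trait is redeclared here (without `private[deploy]`) only to keep the example self-contained. `FakeProcess` and `FakeProcessBuilder` are hypothetical test helpers, not Spark classes.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}

// Redeclared from the excerpt above so the sketch compiles on its own.
trait ProcessBuilderLike {
  def start(): Process
  def command: Seq[String]
}

// A java.lang.Process that never runs anything and "exits" immediately with a fixed code.
final class FakeProcess(code: Int) extends Process {
  override def getOutputStream(): OutputStream = new ByteArrayOutputStream()
  override def getInputStream(): InputStream = new ByteArrayInputStream(new Array[Byte](0))
  override def getErrorStream(): InputStream = new ByteArrayInputStream(new Array[Byte](0))
  override def waitFor(): Int = code
  override def exitValue(): Int = code
  override def destroy(): Unit = ()
}

// Counts how often start() is called, so a test can assert on retry behaviour.
final class FakeProcessBuilder(exitCode: Int) extends ProcessBuilderLike {
  var starts = 0
  override def start(): Process = { starts += 1; new FakeProcess(exitCode) }
  override def command: Seq[String] = Seq("fake-command")
}
```

Code written against `ProcessBuilderLike` can then be exercised with `new FakeProcessBuilder(1)` to simulate a Driver that always fails, without spawning a real JVM.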

  12. runCommandWithRetry

    // Takes the ProcessBuilderLike interface rather than a concrete ProcessBuilder.
    def runCommandWithRetry(
        command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Unit = {
      // Time to wait between submission retries.
      var waitSeconds = 1
      // A run of this many seconds resets the exponential back-off.
      val successfulRunDuration = 5
      var keepTrying = !killed
      while (keepTrying) {
        logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))
        synchronized {
          if (killed) { return }
          // Call start() on the ProcessBuilderLike.
          process = Some(command.start())
          initialize(process.get)
        }
        val processStart = clock.getTimeMillis()
        // Block until the Driver process exits, then read its exit code.
        val exitCode = process.get.waitFor()
        if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
          waitSeconds = 1
        }
        if (supervise && exitCode != 0 && !killed) {
          logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
          sleeper.sleep(waitSeconds)
          waitSeconds = waitSeconds * 2 // exponential back-off
        }
        keepTrying = supervise && exitCode != 0 && !killed
        finalExitCode = Some(exitCode)
      }
    }
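The supervise loop can be reduced to its back-off logic. The sketch below is a simplified version under stated assumptions: the process is abstracted to a function returning an exit code, the clock and the sleeper are passed in as functions, and the `killed` flag is left out. `RetrySketch` and `runWithRetry` are hypothetical names.

```scala
object RetrySketch {
  // Runs `runOnce` until it exits with 0 or supervision is off; returns the last exit code.
  // `sleep` receives the wait in seconds, `nowMillis` supplies the current time.
  def runWithRetry(
      runOnce: () => Int,
      supervise: Boolean,
      sleep: Int => Unit,
      nowMillis: () => Long,
      successfulRunDuration: Int = 5): Int = {
    var waitSeconds = 1
    var keepTrying = true
    var finalExitCode = -1
    while (keepTrying) {
      val started = nowMillis()
      val exitCode = runOnce()
      // A run longer than successfulRunDuration seconds resets the back-off.
      if (nowMillis() - started > successfulRunDuration * 1000L) {
        waitSeconds = 1
      }
      if (supervise && exitCode != 0) {
        sleep(waitSeconds)
        waitSeconds *= 2 // exponential back-off
      }
      keepTrying = supervise && exitCode != 0
      finalExitCode = exitCode
    }
    finalExitCode
  }
}
```

With supervise enabled and a command that fails three times before succeeding, where every run finishes faster than `successfulRunDuration`, the recorded sleeps are 1, 2, and 4 seconds. Without supervise the command runs exactly once, whatever its exit code.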

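  13. Report the state change

    When the Driver's state changes, DriverRunner sends a DriverStateChanged message to the Worker that owns it:

    worker.send(DriverStateChanged(driverId, state, finalException))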

  14. Worker side

    case driverStateChanged @ DriverStateChanged(driverId, state, exception) => {
      // Handle the Driver state change.
      handleDriverStateChanged(driverStateChanged)
    }
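The body of handleDriverStateChanged is not shown above. The sketch below is one plausible shape for it, under two assumptions that the excerpt does not confirm: the Worker forwards the event to the Master, and it releases the cores and memory that were reserved when the Driver was launched. All names inside `DriverStateSketch` are hypothetical.

```scala
import scala.collection.mutable

object DriverStateSketch {
  sealed trait State
  case object Finished extends State
  case object Failed extends State
  case object Killed extends State

  final case class DriverInfo(cores: Int, mem: Int)
  final case class DriverStateChanged(driverId: String, state: State, exception: Option[Exception])

  final class WorkerState {
    val drivers = new mutable.HashMap[String, DriverInfo]
    val forwardedToMaster = new mutable.ArrayBuffer[DriverStateChanged]
    var coresUsed = 0
    var memoryUsed = 0

    def handleDriverStateChanged(change: DriverStateChanged): Unit = {
      // Assumption: the Master is informed of every driver state change.
      forwardedToMaster += change
      // Assumption: resources reserved at launch are released once the driver is done.
      drivers.remove(change.driverId).foreach { info =>
        coresUsed -= info.cores
        memoryUsed -= info.mem
      }
    }
  }
}
```

Releasing the resources in the same handler keeps `coresUsed` and `memoryUsed` consistent with the contents of `drivers`.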

  15. On the Master side, handling an Executor state change also sends a message to the Driver

    case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
      val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
      execOption match {
        case Some(exec) => {
          val appInfo = idToApp(appId)
          val oldState = exec.state
          exec.state = state
          if (state == ExecutorState.RUNNING) {
            assert(oldState == ExecutorState.LAUNCHING,
              s"executor $execId state transfer from $oldState to RUNNING is illegal")
            appInfo.resetRetryCount()
          }
          // Tell the Driver that the Executor's state has changed.
          exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus))
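The state check in this handler can be exercised on its own. The following is a minimal sketch with hypothetical types: `ExecState`, `ExecutorInfo`, and `AppInfo` stand in for Spark's classes, the Driver is modelled as an inbox buffer, and resetting the retry count is assumed to mean setting it to zero.

```scala
import scala.collection.mutable

object ExecutorStateSketch {
  sealed trait ExecState
  case object Launching extends ExecState
  case object Running extends ExecState
  case object Exited extends ExecState

  final class ExecutorInfo(val id: Int) { var state: ExecState = Launching }

  final class AppInfo {
    val executors = new mutable.HashMap[Int, ExecutorInfo]
    var retryCount = 3 // arbitrary non-zero value so a reset is observable
    val driverInbox = new mutable.ArrayBuffer[(Int, ExecState)] // stands in for driver.send
  }

  // Returns false when the executor is unknown, true once the change has been applied.
  def onExecutorStateChanged(app: AppInfo, execId: Int, state: ExecState): Boolean =
    app.executors.get(execId) match {
      case Some(exec) =>
        val oldState = exec.state
        exec.state = state
        if (state == Running) {
          // RUNNING is only legal directly after LAUNCHING.
          assert(oldState == Launching,
            s"executor $execId state transfer from $oldState to RUNNING is illegal")
          app.retryCount = 0 // assumption: this is what resetting the retry count means
        }
        app.driverInbox += ((execId, state)) // notify the Driver of the new state
        true
      case None => false
    }
}
```

A LAUNCHING executor that reports RUNNING passes the check, resets the retry count, and produces one notification for the Driver. An unknown executor ID is ignored.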

(Editor: 李大同)
