Spark Worker原理和源码剖析解密:Worker工作流程图、启动Driver源码解密。
转载自:https://jingyan.baidu.com/article/f96699bbdeafbd894f3c1b7a.html
Spark Worker原理和源码剖析解密:Worker工作流程图、启动Driver源码解密。 转载自:https://jingyan.baidu.com/article/f96699bbdeafbd894f3c1b7a.html 方法/步骤
创建Driver的工作目录: /** Starts a thread to run and manage the driver. */ private[worker] def start() = { new Thread("DriverRunner for " + driverId) { override def run() { try { val driverDir = createWorkingDirectory() 创建Driver的工作目录 /** * Creates the working directory for this driver. * Will throw an exception if there are errors preparing the directory. */ private def createWorkingDirectory(): File = { //创建Driver的工作目录 val driverDir = new File(workDir,driverId) if (!driverDir.exists() && !driverDir.mkdirs()) { throw new IOException("Failed to create directory " + driverDir) } driverDir } 代码打成Jar包 val localJarFilename = downloadUserJar(driverDir) 下载Jar文件,返回Jar在本地的路径,将程序打成JAR包上传到HDFS上,这样每台机器均可以从HDFS上下载。 /** * Download the user jar into the supplied directory and return its local path. * Will throw an exception if there are errors downloading the jar. */ private def downloadUserJar(driverDir: File): String = { // val jarPath = new Path(driverDesc.jarUrl) //从HDFS上获取Jar文件。 val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) val destPath = new File(driverDir.getAbsolutePath,jarPath.getName) val jarFileName = jarPath.getName val localJarFile = new File(driverDir,jarFileName) val localJarFilename = localJarFile.getAbsolutePath if (!localJarFile.exists()) { // May already exist if running multiple workers on one node logInfo(s"Copying user jar $jarPath to $destPath") Utils.fetchFile( driverDesc.jarUrl, driverDir, conf, securityManager, hadoopConf, System.currentTimeMillis(), useCache = false) } if (!localJarFile.exists()) { // Verify copy succeeded throw new Exception(s"Did not see expected jar $jarFileName in $driverDir") } localJarFilename } 有些变量在开始的时候是占位符,因为还没有初始化,所以在实际运行的时候要初始化。 def substituteVariables(argument: String): String = argument match { case "{{WORKER_URL}}" => workerUrl
case "{{USER_JAR}}" => localJarFilename //前面已经下载好了。 case other => other } command主要就是构建进程执行类的入口 // TODO: If we add ability to submit multiple jars they should also be added here // driverDesc.command指定启动的时候运行什么类。 val builder = CommandUtils.buildProcessBuilder(driverDesc.command,securityManager, driverDesc.mem,sparkHome.getAbsolutePath,substituteVariables) //launchDriver launchDriver(builder,driverDir,driverDesc.supervise) } launchDriver的源码如下:将stdout和stderr重定向到了baseDir之下,这样就可以通过log去查看之前的执行情况。 private def launchDriver(builder: ProcessBuilder,baseDir: File,supervise: Boolean) { builder.directory(baseDir) def initialize(process: Process): Unit = { // Redirect stdout and stderr to files val stdout = new File(baseDir,"stdout") CommandUtils.redirectStream(process.getInputStream,stdout) val stderr = new File(baseDir,"stderr") //将command格式化一下 val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"") val header = "Launch Command: %s\n%s\n\n".format(formattedCommand,"=" * 40) Files.append(header,stderr,UTF_8) CommandUtils.redirectStream(process.getErrorStream,stderr) } runCommandWithRetry(ProcessBuilderLike(builder),initialize,supervise) } ProcessBuilderLike静态方法: private[deploy] object ProcessBuilderLike { //apply方法复写了start方法 def apply(processBuilder: ProcessBuilder): ProcessBuilderLike = new ProcessBuilderLike { override def start(): Process = processBuilder.start() override def command: Seq[String] = processBuilder.command().asScala } } ProcessBuilderLike源码如下: // Needed because ProcessBuilder is a final class and cannot be mocked private[deploy] trait ProcessBuilderLike { def start(): Process def command: Seq[String] } 而在runCommandWithRetry方法中: //传入ProcessBuilderLike的接口 def runCommandWithRetry( command: ProcessBuilderLike,initialize: Process => Unit,supervise: Boolean): Unit = { // Time to wait between submission retries.
var waitSeconds = 1 // A run of this many seconds resets the exponential back-off. val successfulRunDuration = 5 var keepTrying = !killed while (keepTrying) { logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\"")) synchronized { if (killed) { return } //调用ProcessBuilderLike的start()方法 process = Some(command.start()) initialize(process.get) } val processStart = clock.getTimeMillis() //然后再调用process.get.waitFor()来完成启动Driver。 val exitCode = process.get.waitFor() if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) { waitSeconds = 1 } if (supervise && exitCode != 0 && !killed) { logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.") sleeper.sleep(waitSeconds) waitSeconds = waitSeconds * 2 // exponential back-off } keepTrying = supervise && exitCode != 0 && !killed finalExitCode = Some(exitCode) } } 如果Driver的状态有变,则会给自己发条消息 worker.send(DriverStateChanged(driverId,state,finalException)) Worker端 case driverStateChanged @ DriverStateChanged(driverId,state,exception) => { //处理Driver State Changed handleDriverStateChanged(driverStateChanged) } Master端处理的时候,还要给Driver发送消息 case ExecutorStateChanged(appId,execId,state,message,exitStatus) => { val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId)) execOption match { case Some(exec) => { val appInfo = idToApp(appId) val oldState = exec.state exec.state = state if (state == ExecutorState.RUNNING) { assert(oldState == ExecutorState.LAUNCHING, s"executor $execId state transfer from $oldState to RUNNING is illegal") appInfo.resetRetryCount() } //给Driver发送消息告诉Driver,Executor状态发生改变了。 exec.application.driver.send(ExecutorUpdated(execId,exitStatus)) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时联系站长删除相关内容! |