scala – SparkAppHandle listener not being invoked
I am trying to submit a Spark 2.3 job to a Kubernetes cluster from Scala using the Play framework.

I have also tried a simple Scala program without the Play framework. The job gets submitted to the k8s cluster, but stateChanged and infoChanged are never invoked. I also want to be able to get handle.getAppId.

I am using spark-submit to submit the job, as described here:

$ bin/spark-submit \
    --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
    --deploy-mode cluster \
    --name spark-pi \
    --class org.apache.spark.examples.SparkPi \
    --conf spark.executor.instances=5 \
    --conf spark.kubernetes.container.image=<spark-image> \
    local:///path/to/examples.jar

Here is the code for the job:

def index = Action {
  try {
    val spark = new SparkLauncher()
      .setMaster("my k8 apiserver host")
      .setVerbose(true)
      .addSparkArg("--verbose")
      .setMainClass("myClass")
      .setAppResource("hdfs://server/inputs/my.jar")
      .setConf("spark.app.name", "myapp")
      .setConf("spark.executor.instances", "5")
      .setConf("spark.kubernetes.container.image", "mydockerimage")
      .setDeployMode("cluster")
      .startApplication(new SparkAppHandle.Listener() {
        def infoChanged(handle: SparkAppHandle): Unit = {
          System.out.println("Spark App Id [" + handle.getAppId + "] Info Changed. State [" + handle.getState + "]")
        }
        def stateChanged(handle: SparkAppHandle): Unit = {
          System.out.println("Spark App Id [" + handle.getAppId + "] State Changed. State [" + handle.getState + "]")
          if (handle.getState.toString == "FINISHED") System.exit(0)
        }
      })
    Ok(spark.getState().toString())
  } catch {
    case NonFatal(e) =>
      println("failed with exception: " + e)
  }
  Ok
}

Answer
Spark Launcher architecture overview
SparkLauncher lets you run the spark-submit command programmatically. It runs as a separate child thread inside the JVM. You need to wait in the client's main function until the driver has started in K8s and you have received the listener callbacks. Otherwise, the JVM main thread exits, killing the client without reporting anything.

 -----------------------                       -----------------------
|       User App        |    spark-submit     |       Spark App       |
|                       | ------------------> |                       |
|      ------------     |                     |    -------------      |
|     |            |    |        hello        |   |             |     |
|     | L. Server  |<-----------------------------| L. Backend  |     |
|     |            |    |                     |   |             |     |
|      ------------     |                     |    -------------      |
|           |           |                      -----------------------
|           v           |                                 ^
|      ------------     |                                 |
|     |            |    |     <per-app channel>           |
|     | App Handle |<-------------------------------------
|     |            |    |
|      ------------     |
 -----------------------

Solution

I added a j.u.c.CountDownLatch implementation that prevents the main thread from exiting until appState.isFinal is reached.

object SparkLauncher {
  def main(args: Array[String]) {
    import java.util.concurrent.CountDownLatch
    import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

    val countDownLatch = new CountDownLatch(1)
    val launcher = new SparkLauncher()
      .setMaster("k8s://http://127.0.0.1:8001")
      .setAppResource("local:/{PATH}/spark-examples_2.11-2.3.0.jar")
      .setConf("spark.app.name", "spark-pi")
      .setMainClass("org.apache.spark.examples.SparkPi")
      .setConf("spark.executor.instances", "5")
      .setConf("spark.kubernetes.container.image", "spark:spark-docker")
      .setConf("spark.kubernetes.driver.pod.name", "spark-pi-driver")
      .setDeployMode("cluster")
      .startApplication(new SparkAppHandle.Listener() {
        def infoChanged(handle: SparkAppHandle): Unit = {}
        def stateChanged(handle: SparkAppHandle): Unit = {
          val appState = handle.getState()
          println(s"Spark App Id [${handle.getAppId}] State Changed. State [${handle.getState}]")
          // Release the latch once the app reaches a final state, so that
          // main returns only after the Spark driver has exited.
          if (appState != null && appState.isFinal) {
            countDownLatch.countDown()
          }
        }
      })
    countDownLatch.await()
  }
}
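Since the question launches the job from a Play action, it may also help to see a non-blocking variant: in a Play server the JVM stays alive anyway, so instead of blocking the request thread with a latch, the final state can be surfaced through a Promise completed from the listener callback. The following is a minimal sketch, assuming Play 2.6+ with dependency-injected controllers; the controller and action names are hypothetical, and the launcher settings simply reuse the values from the answer above:

import javax.inject.Inject
import scala.concurrent.{ExecutionContext, Promise}
import play.api.mvc.{AbstractController, ControllerComponents}
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

// Hypothetical controller; routing and DI wiring follow standard Play conventions.
class SparkJobController @Inject()(cc: ControllerComponents)(implicit ec: ExecutionContext)
    extends AbstractController(cc) {

  def submit = Action.async {
    // Completed from the listener callback once the application reaches a final state.
    val finalState = Promise[SparkAppHandle.State]()

    new SparkLauncher()
      .setMaster("k8s://http://127.0.0.1:8001") // illustrative; use your API server address
      .setAppResource("local:/{PATH}/spark-examples_2.11-2.3.0.jar")
      .setMainClass("org.apache.spark.examples.SparkPi")
      .setConf("spark.app.name", "spark-pi")
      .setConf("spark.kubernetes.container.image", "spark:spark-docker")
      .setDeployMode("cluster")
      .startApplication(new SparkAppHandle.Listener() {
        override def infoChanged(handle: SparkAppHandle): Unit = ()
        override def stateChanged(handle: SparkAppHandle): Unit = {
          val state = handle.getState
          println(s"Spark App Id [${handle.getAppId}] State Changed. State [$state]")
          if (state != null && state.isFinal) finalState.trySuccess(state)
        }
      })

    // The Play server JVM keeps running, so the launcher's listener thread stays
    // alive; the HTTP response completes when the job reaches a final state.
    // In practice you would add a timeout rather than wait indefinitely.
    finalState.future.map(state => Ok(state.toString))
  }
}

The same callbacks fire here as in the latch version; the only difference is how the caller waits for them.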