加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – SparkAppHandle监听器未被调用

发布时间:2020-12-16 18:47:46 所属栏目:安全 来源:网络整理
导读:我正在尝试使用play框架在 scala中的kubernetes集群上提交spark 2.3作业. 我还试过一个简单的scala程序而不使用play框架. 这项工作正在提交给k8集群,但是stateChanged infoChanged没有被调用.我也希望能够获得handle.getAppId. 我正在使用spark submit来提交
我正在尝试使用play框架在 scala中的kubernetes集群上提交spark 2.3作业.

我还试过一个简单的scala程序而不使用play框架.

这项工作正在提交给k8集群,但是stateChanged& infoChanged没有被调用.我也希望能够获得handle.getAppId.

我正在使用spark submit来提交作业,如here所述

$bin/spark-submit 
    --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> 
    --deploy-mode cluster 
    --name spark-pi 
    --class org.apache.spark.examples.SparkPi 
    --conf spark.executor.instances=5 
    --conf spark.kubernetes.container.image=<spark-image> 
    local:///path/to/examples.jar

这是工作的代码:

def index = Action {
    try {
       val spark = new SparkLauncher()
        .setMaster("my k8 apiserver host")
        .setVerbose(true)
        .addSparkArg("--verbose")
        .setMainClass("myClass")
        .setAppResource("hdfs://server/inputs/my.jar")
        .setConf("spark.app.name","myapp")
        .setConf("spark.executor.instances","5")
        .setConf("spark.kubernetes.container.image","mydockerimage")
        .setDeployMode("cluster")
        .startApplication(new SparkAppHandle.Listener(){

          def infoChanged(handle: SparkAppHandle): Unit = {
            System.out.println("Spark App Id [" 
              + handle.getAppId 
              + "] Info Changed.  State [" 
              + handle.getState + "]")
          }

          def stateChanged(handle: SparkAppHandle): Unit = {
            System.out.println("Spark App Id [" 
              + handle.getAppId 
              + "] State Changed. State [" 
              + handle.getState + "]")
            if (handle.getState.toString == "FINISHED") System.exit(0)
          }    
      } )

    Ok(spark.getState().toString())

    } catch {
      case NonFatal(e)=>{
        println("failed with exception: " + e)
      }
    }    
  Ok
}

解决方法

Spark Launcher架构概述

SparkLauncher允许以编程方式运行spark-submit命令.它作为JVM中的单独子线程运行.您需要在客户端主函数中等待,直到驱动程序在K8中启动并且您获得侦听器回调.否则,JVM主线程存在查杀客户端而不报告任何内容.

-----------------------                       -----------------------
|      User App       |     spark-submit      |      Spark App      |
|                     |  -------------------> |                     |
|         ------------|                       |-------------        |
|         |           |        hello          |            |        |
|         | L. Server |<----------------------| L. Backend |        |
|         |           |                       |            |        |
|         -------------                       -----------------------
|               |     |                              ^
|               v     |                              |
|        -------------|                              |
|        |            |      <per-app channel>       |
|        | App Handle |<------------------------------
|        |            |
-----------------------

我添加了一个j.u.c.CountDownLatch实现,它阻止主线程退出,直到达到appState.isFinal.

object SparkLauncher {
  def main(args: Array[String]) {

    import java.util.concurrent.CountDownLatch
    val countDownLatch = new CountDownLatch(1)

    val launcher = new SparkLauncher()
      .setMaster("k8s://http://127.0.0.1:8001")
      .setAppResource("local:/{PATH}/spark-examples_2.11-2.3.0.jar")
      .setConf("spark.app.name","spark-pi")
      .setMainClass("org.apache.spark.examples.SparkPi")
      .setConf("spark.executor.instances","5")
      .setConf("spark.kubernetes.container.image","spark:spark-docker")
      .setConf("spark.kubernetes.driver.pod.name","spark-pi-driver")
      .setDeployMode("cluster")
      .startApplication(new SparkAppHandle.Listener() {
        def infoChanged(handle: SparkAppHandle): Unit = {
        }

        def stateChanged(handle: SparkAppHandle): Unit = {
          val appState = handle.getState()
          println(s"Spark App Id [${handle.getAppId}] State Changed. State [${handle.getState}]")

          if (appState != null && appState.isFinal) {
            countDownLatch.countDown //waiting until spark driver exits
          }
        }
      })

    countDownLatch.await()
  }
}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读