scala – 为什么Spark Streaming应用程序使用sbt run工作正常但
我在
Scala中有一个Spark应用程序,它每隔10秒从Kafka获取记录并将它们保存为文件.这是SBT项目,我使用sbt run命令运行我的应用程序.一切正常,直到我在Tomcat上部署我的应用程序.我设法用
this plugin生成WAR文件,但看起来我的应用程序在Tomcat上部署时没有做任何事情.
这是我的代码: object SparkConsumer { def main (args: Array[String]) { val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver") val ssc = new StreamingContext(conf,Seconds(10)) val kafkaParams = Map[String,Object]( "bootstrap.servers" -> "localhost:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "group_id","auto.offset.reset" -> "latest","enable.auto.commit" -> (false: java.lang.Boolean) ) val topics = Array("mytopic") val stream = KafkaUtils.createDirectStream[String,String]( ssc,PreferConsistent,Subscribe[String,String](topics,kafkaParams) ) stream.map(record => (record.key,record.value)).print val arr = new ArrayBuffer[String](); val lines = stream.map(record => (record.key,record.value)); stream.foreachRDD { rdd => if (rdd.count() > 0 ) { val date = System.currentTimeMillis() rdd.saveAsTextFile ("/tmp/sparkout/mytopic/" + date.toString) rdd.foreach { record => println("t=" + record.topic + " m=" + record.toString()) } } println("Stream had " + rdd.count() + " messages") val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd.foreachPartition { iter => val o: OffsetRange = offsetRanges(TaskContext.get.partitionId) println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") println(o) } } stream.saveAsTextFiles("/tmp/output") ssc.start() ssc.awaitTermination() } } 奇怪的是,当通过sbt run命令运行时,应用程序完全正常.它正确地从Kafka读取记录并将它们作为文件保存在所需目录中.我不知道发生了什么.我尝试使用log4j启用日志记录,但在Tomcat上它甚至没有记录任何内容.我一直在寻找答案,但没有找到解决方案. 总结一下 我的Scala Spark应用程序(这是SBT项目)应该从Kafka读取记录并每隔10秒将它们保存为文件.它通过sbt run命令运行时有效,但在Tomcat上部署时则不行. 附加信息: > Scala 2.12 问:问题是什么? 解决方法
tl; dr独立应用程序SparkConsumer在Tomcat上运行正常,Tomcat本身也是如此.
读到这个问题我感到非常惊讶,因为你的代码不是我期望在Tomcat上工作的东西.抱歉. Tomcat是一个servlet容器,因此需要Web应用程序中的servlet. 即使您设法创建WAR并将其部署到Tomcat,您也没有从此Web应用程序“触发”任何内容来启动Spark Streaming应用程序(main方法中的代码). Spark Streaming应用程序在使用sbt run执行时可以正常工作,因为这是sbt run的目标,即在sbt管理的项目中执行独立应用程序. 鉴于您的sbt项目中只有一个独立的应用程序,sbt run已设法找到SparkConsumer并执行其主入口方法.这里不足为奇. 但它不适用于Tomcat.您必须将应用程序公开为POST或GET端点,并使用HTTP客户端(浏览器或命令行工具,如curl,wget或httpie)来执行它. Spark不支持Scala 2.12所以…你是如何设法使用Spark的Scala版本的?不可能! (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |