scala – Custom Receiver在Spark Streaming中停止工作
发布时间:2020-12-16 09:07:10 所属栏目:安全 来源:网络整理
导读:我正在尝试使用自定义接收器编写Spark Streaming应用程序.我应该通过提供具有预定义间隔的随机值来模拟实时输入数据. (简化)接收器如下所示,使用下面的Spark Streaming应用程序代码: class SparkStreamingReceiver extends Actor with ActorHelper { privat
我正在尝试使用自定义接收器编写Spark Streaming应用程序.我应该通过提供具有预定义间隔的随机值来模拟实时输入数据. (简化)接收器如下所示,使用下面的Spark Streaming应用程序代码:
class SparkStreamingReceiver extends Actor with ActorHelper { private val random = new Random() override def preStart = { context.system.scheduler.schedule(500 milliseconds,1000 milliseconds)({ self ! ("string",random.nextGaussian()) }) } override def receive = { case data: (String,Double) => { store[(String,Double)](data) } } } val conf: SparkConf = new SparkConf() conf.setAppName("Spark Streaming App") .setMaster("local") val ssc: StreamingContext = new StreamingContext(conf,Seconds(2)) val randomValues: ReceiverInputDStream[(String,Double)] = ssc.actorStream[(String,Double)](Props(new SparkStreamingReceiver()),"Receiver") randomValues.saveAsTextFiles("<<OUTPUT_PATH>>/randomValues") 运行此代码,我看到接收器正在工作(存储项目,收到单个日志条目).但是,saveAsTextFiles永远不会输出值. 我可以通过将主服务器更改为使用两个线程(local [2])来解决问题,但如果我注册了我的接收器的另一个实例(我打算这样做),它会重新出现.更具体地说,我需要至少有一个线程超过我注册的自定义接收器的数量以获得任何输出. 在我看来好像工作线程被接收器停止了. 任何人都可以解释这种效果,并可能解释如何修复我的代码? 解决方法
每个接收器使用一个计算槽.因此2个接收器将需要2个计算插槽.如果所有计算槽都由接收器占用,那么就没有剩余的槽来处理数据.这就是为什么带有1个接收器的“本地”模式和带有2个接收器的“本地[2]”停止处理的原因.
(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |