scala – Spark Streaming 1.6.0中的Checkpointing / WAL的可靠
描述
我们在Scala中有一个Spark Streaming 1.5.2应用程序,从Kinesis Stream读取JSON事件,进行一些转换/聚合,并将结果写入不同的S3前缀.当前批处理间隔为60秒.我们有3000-7000事件/秒.我们正在使用检查点来保护我们不会丢失聚合. 它一直运行良好一段时间,从异常恢复甚至集群重新启动.我们最近重新编译了Spark Streaming 1.6.0的代码,只改变了build.sbt文件中的库依赖关系.在Spark 1.6.0集群中运行代码几个小时后,我们注意到了以下几点: > 1.6.0的“投入率”和“处理时间”的波动性大幅增加(见下面的截图). 在做一些挖掘之后,我认为第二个问题看起来与这个Pull Request相关.PR的初始目标:“当使用S3作为WAL的目录时,写入需要太长时间.当多个接收器将AddBlock事件发送到ReceiverTracker时,驱动程序会很容易地瓶颈.该PR在ReceivedBlockTracker中添加事件批处理,以使接收器不被驱动程序阻止太久. 我们在Spark 1.5.2中的S3中检查点,并且没有性能/可靠性问题.我们已经在S3和本地NAS中的Spark 1.6.0中测试了检查点,在这两种情况下,我们都收到这种异常.它看起来像是检查一个批次需要5秒多时间,这个异常出现,我们检查了该批次的事件是否永久丢失. 问题 > Spark Streaming 1.6.0中预期的“输入率”和“处理时间”波动是否增加,是否有任何已知的改进方式? 1)为了保证检查点的写入所有文件需要不到5秒的时间.根据我的经验,你不能保证用S3,即使是小批量.对于本地NAS,这取决于谁负责基础架构(与云提供商有困难). 2)增加spark.streaming.driver.writeAheadLog.batchingTimeout属性值. Spark 1.5.2统计 – 屏幕截图 Spark 1.6.0 Statistics – 截图 全堆栈跟踪 16/01/19 03:25:03 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(0,Some(3521),Some(SequenceNumberRanges(SequenceNumberRange(StreamEventsPRD,shardId-000000000003,49558087746891612304997255299934807015508295035511636018,49558087746891612304997255303224294170679701088606617650),SequenceNumberRange(StreamEventsPRD,shardId-000000000004,49558087949939897337618579003482122196174788079896232002,49558087949939897337618579006984380295598368799020023874),shardId-000000000001,49558087735072217349776025034858012188384702720257294354,49558087735072217349776025038332464993957147037082320914),shardId-000000000009,49558088270111696152922722880993488801473174525649617042,49558088270111696152922722884455852348849472550727581842),shardId-000000000000,49558087841379869711171505550483827793283335010434154498,49558087841379869711171505554030816148032657077741551618),shardId-000000000002,49558087853556076589569225785774419228345486684446523426,49558087853556076589569225789389107428993227916817989666))),BlockManagerBasedStoreResult(input-0-1453142312126,Some(3521)))) to the WriteAheadLog. java.util.concurrent.TimeoutException: Futures timed out after [5000 milliseconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:81) at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:232) at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.addBlock(ReceivedBlockTracker.scala:87) at org.apache.spark.streaming.scheduler.ReceiverTracker.org$apache$spark$streaming$scheduler$ReceiverTracker$$addBlock(ReceiverTracker.scala:321) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1$$anonfun$run$1.apply$mcV$sp(ReceiverTracker.scala:500) at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1230) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1.run(ReceiverTracker.scala:498) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 源代码提取 ... // Function to create a new StreamingContext and set it up def setupContext(): StreamingContext = { ... // Create a StreamingContext val ssc = new StreamingContext(sc,Seconds(batchIntervalSeconds)) // Create a Kinesis DStream val data = KinesisUtils.createStream(ssc,kinesisAppName,kinesisStreamName,kinesisEndpointUrl,RegionUtils.getRegionByEndpoint(kinesisEndpointUrl).getName(),InitialPositionInStream.LATEST,Seconds(kinesisCheckpointIntervalSeconds),StorageLevel.MEMORY_AND_DISK_SER_2,awsAccessKeyId,awsSecretKey) ... ssc.checkpoint(checkpointDir) ssc } // Get or create a streaming context. val ssc = StreamingContext.getActiveOrCreate(checkpointDir,setupContext) ssc.start() ssc.awaitTermination() 解决方法
关于
zero323关于发布我的评论作为答案的建议:
增加spark.streaming.driver.writeAheadLog.batchingTimeout解决了检查点超时问题.我们在确保我们有空间的时候做到了.我们已经测试了一段时间了.所以我只建议经过仔细的考虑增加它. 细节 我们在$SPARK_HOME / conf / spark-defaults.conf中使用了这两个设置: spark.streaming.driver.writeAheadLog.allowBatching为true 最初,我们只将spark.streaming.driver.writeAheadLog.allowBatching设置为true. 在更改之前,我们已经在测试环境中重现了问题中提到的问题(“… ReceivedBlockTracker:在写入记录时抛出异常”).它发生在每隔几个小时.改变之后,问题消失了.我们跑了好几天才开始生产. 我们发现getBatchingTimeout() method of the WriteAheadLogUtils类的默认值为5000ms,如下所示: def getBatchingTimeout(conf: SparkConf): Long = { conf.getLong(DRIVER_WAL_BATCHING_TIMEOUT_CONF_KEY,defaultValue = 5000) } (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |