加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – Spark Streaming 1.6.0中的Checkpointing / WAL的可靠

发布时间:2020-12-16 09:11:38 所属栏目:安全 来源:网络整理
导读:描述 我们在Scala中有一个Spark Streaming 1.5.2应用程序,从Kinesis Stream读取JSON事件,进行一些转换/聚合,并将结果写入不同的S3前缀.当前批处理间隔为60秒.我们有3000-7000事件/秒.我们正在使用检查点来保护我们不会丢失聚合. 它一直运行良好一段时间,从异
描述

我们在Scala中有一个Spark Streaming 1.5.2应用程序,从Kinesis Stream读取JSON事件,进行一些转换/聚合,并将结果写入不同的S3前缀.当前批处理间隔为60秒.我们有3000-7000事件/秒.我们正在使用检查点来保护我们不会丢失聚合.

它一直运行良好一段时间,从异常恢复甚至集群重新启动.我们最近重新编译了Spark Streaming 1.6.0的代码,只改变了build.sbt文件中的库依赖关系.在Spark 1.6.0集群中运行代码几个小时后,我们注意到了以下几点:

> 1.6.0的“投入率”和“处理时间”的波动性大幅增加(见下面的截图).
>每隔几个小时,写入记录时抛出异常:将BlockAdditionEvent …写入WriteAheadLog. java.util.concurrent.TimeoutException:在[5000毫秒]之后的期限超时(见下面的完整堆栈跟踪)与特定批次(分钟)下降到0个事件/秒.

在做一些挖掘之后,我认为第二个问题看起来与这个Pull Request相关.PR的初始目标:“当使用S3作为WAL的目录时,写入需要太长时间.当多个接收器将AddBlock事件发送到ReceiverTracker时,驱动程序会很容易地瓶颈.该PR在ReceivedBlockTracker中添加事件批处理,以使接收器不被驱动程序阻止太久.

我们在Spark 1.5.2中的S3中检查点,并且没有性能/可靠性问题.我们已经在S3和本地NAS中的Spark 1.6.0中测试了检查点,在这两种情况下,我们都收到这种异常.它看起来像是检查一个批次需要5秒多时间,这个异常出现,我们检查了该批次的事件是否永久丢失.

问题

> Spark Streaming 1.6.0中预期的“输入率”和“处理时间”波动是否增加,是否有任何已知的改进方式?
>你知道除了这些2之外的任何解决方法吗?

1)为了保证检查点的写入所有文件需要不到5秒的时间.根据我的经验,你不能保证用S3,即使是小批量.对于本地NAS,这取决于谁负责基础架构(与云提供商有困难).

2)增加spark.streaming.driver.writeAheadLog.batchingTimeout属性值.
>你会期望在描述的场景中丢失任何事件吗?我会认为,如果批量检查点失败,碎片/接收器序列号不会增加,并在稍后重试.

Spark 1.5.2统计 – 屏幕截图

enter image description here

Spark 1.6.0 Statistics – 截图

enter image description here

全堆栈跟踪

16/01/19 03:25:03 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(0,Some(3521),Some(SequenceNumberRanges(SequenceNumberRange(StreamEventsPRD,shardId-000000000003,49558087746891612304997255299934807015508295035511636018,49558087746891612304997255303224294170679701088606617650),SequenceNumberRange(StreamEventsPRD,shardId-000000000004,49558087949939897337618579003482122196174788079896232002,49558087949939897337618579006984380295598368799020023874),shardId-000000000001,49558087735072217349776025034858012188384702720257294354,49558087735072217349776025038332464993957147037082320914),shardId-000000000009,49558088270111696152922722880993488801473174525649617042,49558088270111696152922722884455852348849472550727581842),shardId-000000000000,49558087841379869711171505550483827793283335010434154498,49558087841379869711171505554030816148032657077741551618),shardId-000000000002,49558087853556076589569225785774419228345486684446523426,49558087853556076589569225789389107428993227916817989666))),BlockManagerBasedStoreResult(input-0-1453142312126,Some(3521)))) to the WriteAheadLog.
java.util.concurrent.TimeoutException: Futures timed out after [5000 milliseconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:81)
    at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:232)
    at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.addBlock(ReceivedBlockTracker.scala:87)
    at org.apache.spark.streaming.scheduler.ReceiverTracker.org$apache$spark$streaming$scheduler$ReceiverTracker$$addBlock(ReceiverTracker.scala:321)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1$$anonfun$run$1.apply$mcV$sp(ReceiverTracker.scala:500)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1230)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1.run(ReceiverTracker.scala:498)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

源代码提取

...
     // Function to create a new StreamingContext and set it up
  def setupContext(): StreamingContext = {
    ...
    // Create a StreamingContext
    val ssc = new StreamingContext(sc,Seconds(batchIntervalSeconds))

    // Create a Kinesis DStream
    val data = KinesisUtils.createStream(ssc,kinesisAppName,kinesisStreamName,kinesisEndpointUrl,RegionUtils.getRegionByEndpoint(kinesisEndpointUrl).getName(),InitialPositionInStream.LATEST,Seconds(kinesisCheckpointIntervalSeconds),StorageLevel.MEMORY_AND_DISK_SER_2,awsAccessKeyId,awsSecretKey)
...
    ssc.checkpoint(checkpointDir)

    ssc
  }


  // Get or create a streaming context.
  val ssc = StreamingContext.getActiveOrCreate(checkpointDir,setupContext)

  ssc.start()
  ssc.awaitTermination()

解决方法

关于 zero323关于发布我的评论作为答案的建议:

增加spark.streaming.driver.writeAheadLog.batchingTimeout解决了检查点超时问题.我们在确保我们有空间的时候做到了.我们已经测试了一段时间了.所以我只建议经过仔细的考虑增加它.

细节

我们在$SPARK_HOME / conf / spark-defaults.conf中使用了这两个设置:

spark.streaming.driver.writeAheadLog.allowBatching为true
???spark.streaming.driver.writeAheadLog.batchingTimeout 15000

最初,我们只将spark.streaming.driver.writeAheadLog.allowBatching设置为true.

在更改之前,我们已经在测试环境中重现了问题中提到的问题(“… ReceivedBlockTracker:在写入记录时抛出异常”).它发生在每隔几个小时.改变之后,问题消失了.我们跑了好几天才开始生产.

我们发现getBatchingTimeout() method of the WriteAheadLogUtils类的默认值为5000ms,如下所示:

def getBatchingTimeout(conf: SparkConf): Long = {
    conf.getLong(DRIVER_WAL_BATCHING_TIMEOUT_CONF_KEY,defaultValue = 5000)
}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读