加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

java – 使用带水印的附加输出模式时的结构化流异常

发布时间:2020-12-15 05:14:32 所属栏目:Java 来源:网络整理
导读:尽管我正在使用Watermark(),但是当我运行我的spark工作时,我收到以下错误消息: Exception in thread “main” org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/
尽管我正在使用Watermark(),但是当我运行我的spark工作时,我收到以下错误消息:

Exception in thread “main” org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;

从我在programming guide中看到的内容,这与预期用法(和示例代码)完全匹配.有谁知道什么可能是错的?

提前致谢!

相关代码(Java 8,Spark 2.2.0):

StructType logSchema = new StructType()
        .add("timestamp",TimestampType)
        .add("key",IntegerType)
        .add("val",IntegerType);

Dataset<Row> kafka = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers",brokers)
        .option("subscribe",topics)
        .load();

Dataset<Row> parsed = kafka
        .select(from_json(col("value").cast("string"),logSchema).alias("parsed_value"))
        .select("parsed_value.*");

Dataset<Row> tenSecondCounts = parsed
        .withWatermark("timestamp","10 minutes")
        .groupBy(
            parsed.col("key"),window(parsed.col("timestamp"),"1 day"))
        .count();

StreamingQuery query = tenSecondCounts
        .writeStream()
        .trigger(Trigger.ProcessingTime("10 seconds"))
        .outputMode("append")
        .format("console")
        .option("truncate",false)
        .start();

解决方法

问题出在parsed.col中.用col替换它将解决问题.我建议总是使用col函数而不是Dataset.col.

当col返回未解析的列时,Dataset.col返回已解析的列.

parsed.withWatermark(“timestamp”,“10 minutes”)将创建一个新数据集,其中包含具有相同名称的新列.水印信息附加在新数据集中的timestamp列,而不是parsed.col(“timestamp”),因此groupBy中的列没有水印.

当您使用未解析的列时,Spark会为您找出正确的列.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读