java – 使用带水印的附加输出模式时的结构化流异常
尽管我正在使用Watermark(),但是当我运行我的spark工作时,我收到以下错误消息:
从我在programming guide中看到的内容,这与预期用法(和示例代码)完全匹配.有谁知道什么可能是错的? 提前致谢! 相关代码(Java 8,Spark 2.2.0): StructType logSchema = new StructType() .add("timestamp",TimestampType) .add("key",IntegerType) .add("val",IntegerType); Dataset<Row> kafka = spark .readStream() .format("kafka") .option("kafka.bootstrap.servers",brokers) .option("subscribe",topics) .load(); Dataset<Row> parsed = kafka .select(from_json(col("value").cast("string"),logSchema).alias("parsed_value")) .select("parsed_value.*"); Dataset<Row> tenSecondCounts = parsed .withWatermark("timestamp","10 minutes") .groupBy( parsed.col("key"),window(parsed.col("timestamp"),"1 day")) .count(); StreamingQuery query = tenSecondCounts .writeStream() .trigger(Trigger.ProcessingTime("10 seconds")) .outputMode("append") .format("console") .option("truncate",false) .start(); 解决方法
问题出在parsed.col中.用col替换它将解决问题.我建议总是使用col函数而不是Dataset.col.
当col返回未解析的列时,Dataset.col返回已解析的列. parsed.withWatermark(“timestamp”,“10 minutes”)将创建一个新数据集,其中包含具有相同名称的新列.水印信息附加在新数据集中的timestamp列,而不是parsed.col(“timestamp”),因此groupBy中的列没有水印. 当您使用未解析的列时,Spark会为您找出正确的列. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |