scala – 结构化流 – Foreach接收器
发布时间:2020-12-16 18:34:48 所属栏目:安全 来源:网络整理
导读:我基本上是从Kafka源代码中读取,并将每条消息转发给我的foreach处理器(感谢Jacek的简单示例页面). 如果这实际上有效,我将在这里实际执行流程方法中的一些业务逻辑,但是,这不起作用.我相信println因为在执行程序上运行而无法工作,并且无法将这些日志恢复到驱
我基本上是从Kafka源代码中读取,并将每条消息转发给我的foreach处理器(感谢Jacek的简单示例页面).
如果这实际上有效,我将在这里实际执行流程方法中的一些业务逻辑,但是,这不起作用.我相信println因为在执行程序上运行而无法工作,并且无法将这些日志恢复到驱动程序.但是,这个插入临时表应该至少可以工作,并告诉我消息实际上被消耗并处理到接收器. 我在这里错过了什么? 真的在寻找第二组眼睛来检查我的努力: val stream = spark .readStream .format("kafka") .option("kafka.bootstrap.servers",Util.getProperty("kafka10.broker")) .option("subscribe",src_topic) .load() val rec = stream.selectExpr("CAST(value AS STRING) as txnJson").as[(String)] val df = stream.selectExpr("cast (value as string) as json") val writer = new ForeachWriter[Row] { val scon = new SConConnection override def open(partitionId: Long,version: Long) = { true } override def process(value: Row) = { println("++++++++++++++++++++++++++++++++++++" + value.get(0)) scon.executeUpdate("insert into rs_kafka10(miscCol) values("+value.get(0)+")") } override def close(errorOrNull: Throwable) = { scon.closeConnection } } val yy = df.writeStream .queryName("ForEachQuery") .foreach(writer) .outputMode("append") .start() yy.awaitTermination() 解决方法
感谢Harald和其他人的评论,我发现了一些事情,这使我实现了正常的处理行为 –
>使用本地模式测试代码,纱线不是调试的最大帮助 希望它能帮助别人. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |