加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

junit – 如何从程序中停止flink流式传输作业

发布时间:2020-12-15 02:58:58 所属栏目:Java 来源:网络整理
导读:我正在尝试为Flink流作业创建一个JUnit测试,该作业将数据写入kafka主题并分别使用FlinkKafkaProducer09和FlinkKafkaConsumer09从相同的kafka主题读取数据.我正在传递产品中的测试数据: DataStreamString stream = env.fromElements("tom","jerry","bill");
我正在尝试为Flink流作业创建一个JUnit测试,该作业将数据写入kafka主题并分别使用FlinkKafkaProducer09和FlinkKafkaConsumer09从相同的kafka主题读取数据.我正在传递产品中的测试数据:
DataStream<String> stream = env.fromElements("tom","jerry","bill");

并检查相同的数据是否来自消费者:

List<String> expected = Arrays.asList("tom","bill");
List<String> result =  resultSink.getResult();
assertEquals(expected,result);

使用TestListResultSink.

我可以通过打印流来查看来自消费者的数据.但无法获得Junit测试结果,因为消费者即使在消息完成后也会继续运行.所以它没有来测试部分.

在Flink或FlinkKafkaConsumer09中以任何方式停止进程或运行特定时间吗?

解决方法

潜在的问题是流媒体程序通常不是有限的并且无限期地运行.

至少在目前,最好的方法是在流中插入一条特殊的控制消息,让源正确终止(只需通过离开读取循环停止读取更多数据).这样Flink就会告诉所有下游操作符,他们可以在消耗完所有数据后停止运营.

或者,您可以在源中引发特殊异常(例如,在一段时间之后),以便您可以区分“正确”终止与故障情况(通过检查错误原因).在源代码中抛出异常将使程序失败.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读