加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

Spring Integration Java DSL – 聚合器的配置

发布时间:2020-12-15 01:47:13 所属栏目:大数据 来源:网络整理
导读:我有一个非常简单的集成流程,其中RESTful请求使用发布 订阅通道转发给两个提供者.然后,将两个RESTful服务的结果聚合在一个阵列中.整合流程的草图如下所示: @BeanIntegrationFlow flow() throws Exception { return IntegrationFlows.from("inputChannel") .

我有一个非常简单的集成流程,其中RESTful请求使用发布 – 订阅通道转发给两个提供者.然后,将两个RESTful服务的结果聚合在一个阵列中.整合流程的草图如下所示:

@Bean
IntegrationFlow flow() throws Exception {
    return IntegrationFlows.from("inputChannel")
            .publishSubscribeChannel(s -> s.applySequence(true)
                .subscribe(f -> f
                        .handle(Http.outboundGateway("http://provider1.com/...")
                                .httpMethod(HttpMethod.GET)
                                .expectedResponseType(ItemDTO[].class))
                ).subscribe(f -> f
                        .handle(Http.outboundGateway("http://provider2.com/...")
                                .httpMethod(HttpMethod.GET)
                                .expectedResponseType(ItemDTO[].class)
                        )
                )
            )
            .aggregate()
            .get();
}

但是,在运行我的代码时,结果数组包含仅由一个RESTful服务返回的项.我缺少任何配置步骤吗?

UPDATE

考虑到Artem的评论,以下版本对应于完整的解决方案.

@Bean
IntegrationFlow flow() throws Exception {
    return IntegrationFlows.from("inputChannel-scatter")
            .publishSubscribeChannel(s -> s.applySequence(true)
                .subscribe(f -> f
                        .handle(Http.outboundGateway("http://provider1.com/...")
                                .httpMethod(HttpMethod.GET)
                                .expectedResponseType(ItemDTO[].class))
                        .channel("inputChannel-gather"))
                .subscribe(f -> f
                        .handle(Http.outboundGateway("http://provider2.com/...")
                                .httpMethod(HttpMethod.GET)
                                .expectedResponseType(ItemDTO[].class))
                        .channel("inputChannel-gather")))
            .get();
}

@Bean
IntegrationFlow gatherFlow() {
    return IntegrationFlows.from("inputChannel-gather")
            .aggregate(a -> a.outputProcessor(g ->  new GenericMessage
最佳答案
实际上它不会那样工作.

.aggregate()是该publishSubscribeChannel的第三个订阅者.

你必须切断你的两个流量.像这样:

    @Bean
    public IntegrationFlow publishSubscribeFlow() {
        return flow -> flow
                .publishSubscribeChannel(s -> s
                        .applySequence(true)
                        .subscribe(f -> f
                                .handle((p,h) -> "Hello")
                                .channel("publishSubscribeAggregateFlow.input"))
                        .subscribe(f -> f
                                .handle((p,h) -> "World!")
                                .channel("publishSubscribeAggregateFlow.input"))
                );
    }

    @Bean
    public IntegrationFlow publishSubscribeAggregateFlow() {
        return flow -> flow
                .aggregate(a -> a.outputProcessor(g -> g.getMessages()
                        .stream()
                        .

请注意两个订阅者的.channel(“publishSubscribeAggregateFlow.input”)用法.

说实话,这是任何发布 – 订阅的重点.如果我们要聚合它们,我们必须知道在哪里发送所有订户的结果.

您的用例让我想起了Scatter-Gather EIP模式.

我们还没有在DSL中实现它.
随意提出GH issue问题,我们将尝试在即将发布的1.2版本中处理它.

UPDATE

关于此事的GH问题:https://github.com/spring-projects/spring-integration-java-dsl/issues/75

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读