加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

java – Spring集成 – 用于整理/批处理服务调用的适当模式

发布时间:2020-12-14 19:32:52 所属栏目:Java 来源:网络整理
导读:当有特定事件发生时,我正在调用一个远程服务来加载产品的定价数据.一旦加载,产品定价就会被广播,供另一个消费者在其他地方处理. 我不想在每个事件上调用远程服务,而是将事件批量分组,然后一次性发送. 我基于聚合器拼凑了以下模式.虽然它有效但很多都有气味
当有特定事件发生时,我正在调用一个远程服务来加载产品的定价数据.一旦加载,产品定价就会被广播,供另一个消费者在其他地方处理.

我不想在每个事件上调用远程服务,而是将事件批量分组,然后一次性发送.

我基于聚合器拼凑了以下模式.虽然它有效但很多都有气味 – 尤其是我的SimpleCollat??ingAggregator.我是Spring Integration和EIP的新手,并且怀疑我是在滥用组件.

代码

通过调用以下@Gateway上的方法,我的代码在代码中的其他位置触发:

public interface ProductPricingGateway {    
    @Gateway(requestChannel="product.pricing.outbound.requests")
    public void broadcastPricing(ProductIdentifer productIdentifier);
}

然后将其连接到聚合器,如下所示:

<int:channel id="product.pricing.outbound.requests" />
<int:channel id="product.pricing.outbound.requests.batch" />
<int:aggregator input-channel="product.pricing.outbound.requests"
output-channel="product.pricing.outbound.requests.batch" release-strategy="releaseStrategy"
    ref="collatingAggregator" method="collate"
    correlation-strategy-expression="0"
    expire-groups-upon-completion="true" 
    send-partial-result-on-expiry="true"/>  
<bean id="collatingAggregator" class="com.mangofactory.pricing.SimpleCollatingAggregator" />
<bean id="releaseStrategy" class="org.springframework.integration.aggregator.TimeoutCountSequenceSizeReleaseStrategy">
    <!-- Release when: 10 Messages ... or ... -->
    <constructor-arg index="0" value="10" />
    <!-- ... 5 seconds since first request -->
    <constructor-arg index="1" value="5000" />
</bean>

这是聚合器实现:

public class SimpleCollatingAggregator {

    public List<?> collate(List<?> input)
    {
        return input;
    }

}

最后,这会消耗在以下@ServiceActivator上:

@ServiceActivator(inputChannel="product.pricing.outbound.requests.batch")
public void fetchPricing(List<ProductIdentifer> identifiers)
{
        // omitted
}

注意:在实践中,我也使用@Async,以使调用代码尽可能快地返回.我也有很多问题,我将转向一个单独的问题.

问题1:
鉴于我想要实现的目标,聚合器模式在这里是一个合适的选择吗?感觉就像很多样板 – 是否有更好的方法?

问题2:
我使用固定的排序规则值为0,有效地说:’你如何对这些消息进行分组并不重要,就像他们来的时候一样.’

这是实现这一目标的适当方式吗?

问题3:
SimpleCollat??ingAggregator对我来说简直错了.

我希望它接收我的个人入站ProductIdentifier对象,并将它们分组,然后传递它们.这有效,但它是否合适?是否有更好的方法来实现同样的目标?

解决方法

Q1:是的,但请参阅第3季度以及下面的进一步讨论.
Q2:这是说’不需要相关’的正确方法(但是你需要完成的expire-groups-on,你有).
问题3:在这种情况下,您不需要自定义聚合器,只需使用默认值(删除ref和method属性).

注意聚合器是被动组件;新消息的到来触发了释放;因此,你的发布策略的第二部分只会在新消息到来时启动(它不会在5秒后自发释放该组).

但是,您可以为此目的配置MessageGroupStoreReaper:http://static.springsource.org/spring-integration/reference/html/messaging-routing-chapter.html#aggregator

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读