java – Disruptor – 未调用EventHandlers
发布时间:2020-12-14 06:00:22 所属栏目:Java 来源:网络整理
导读:我正在玩 Disruptor框架,并且发现我的事件处理程序没有被调用. 这是我的设置代码: private static final int BUFFER_SIZE = 1024 * 8;private final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();private void initializeDisruptor() {
我正在玩
Disruptor框架,并且发现我的事件处理程序没有被调用.
这是我的设置代码: private static final int BUFFER_SIZE = 1024 * 8; private final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor(); private void initializeDisruptor() { if (disruptor != null) return; disruptor = new Disruptor<TwitterStatusReceivedEvent>(TwitterStatusReceivedEvent.EVENT_FACTORY,EXECUTOR,new SingleThreadedClaimStrategy(BUFFER_SIZE),new SleepingWaitStrategy()); disruptor.handleEventsWith(searchTermMatchingHandler) .then(appendStatusHandler,updatePriceHandler).then(persistUpdatesHandler); this.ringBuffer = disruptor.start(); } 在其他地方,我发布了一些事件.我尝试过以下两种方法: 事件发布方法A: private void handleStatus(final Status status) { long sequence = ringBuffer.next(); TwitterStatusReceivedEvent event = ringBuffer.get(sequence); event.setStatus(status); event.setSearchInstruments(searchInstruments); ringBuffer.publish(sequence); } 在这种情况下,我发现第一个EventHandler被调用,但除此之外从未发生任何事情. 事件发布方法B: private void handleStatus(final Status status) { disruptor.publishEvent(new EventTranslator<TwitterStatusReceivedEvent>() { @Override public TwitterStatusReceivedEvent translateTo( TwitterStatusReceivedEvent event,long sequence) { event.setStatus(status); event.setSearchInstruments(searchInstruments); return event; } }); } 在这种情况下,我发现根本没有调用任何事件处理程序. 我究竟做错了什么? 更新 这是我的EventHandler.我应该如何表示处理已完成? public class SearchTermMatchingEventHandler implements EventHandler<TwitterStatusReceivedEvent> { @Override public void onEvent(TwitterStatusReceivedEvent event,long sequence,boolean endOfBatch) throws Exception { String statusText = event.getStatus().getText(); for (Instrument instrument : event.getSearchInstruments()) { if (statusText.contains(instrument.getSearchTerm())) { event.setMatchedInstrument(instrument); break; } } } } 解决方法
每个事件处理程序都需要在自己的线程中运行,该线程在关闭破坏程序之前不会退出.由于您使用的是单线程执行程序,因此只会执行恰好执行的第一个事件处理程序. (Disruptor类将每个处理程序存储在一个hashmap中,以便处理程序最终运行会有所不同)
如果你切换到cachedThreadPool,你会发现它都开始运行.您不需要对序列号进行任何管理,因为这些都是由Disruptor类为您设置和管理的EventProcessor处理的.只处理你得到的每个事件是完全正确的. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |