c# – 在调用OnMessage()后自动处理BrokeredMessage
我正在尝试从Azure Service Bus排队项目,以便我可以批量处理它们.我知道Azure Service Bus有一个ReceiveBatch()但由于以下原因似乎有问题:
>我一次最多只能获得256条消息,甚至可以根据消息大小随机获取. 我决定只使用比浪费偷看更便宜的消息监听器,并且会给我更多的控制权.
基于我的计时器要求,似乎阻塞集合不是一个好选项,所以我试图使用ConcurrentBag. var batchingQueue = new ConcurrentBag<BrokeredMessage>(); myQueueClient.OnMessage((m) => { Console.WriteLine("Queueing message"); batchingQueue.Add(m); }); while (true) { var sw = WaitableStopwatch.StartNew(); BrokeredMessage msg; while (batchingQueue.TryTake(out msg)) // <== Object is already disposed { ...do this until I have a thousand ready to be written to DB in batch Console.WriteLine("Completing message"); msg.Complete(); // <== ERRORS HERE } sw.Wait(MINIMUM_DELAY); }
我认为这必须是OnMessage的一些自动行为,我没有看到任何方式对消息做任何事情,除了立即处理它我不想做. 解决方法
这对BlockingCollection来说非常容易.
var batchingQueue = new BlockingCollection<BrokeredMessage>(); myQueueClient.OnMessage((m) => { Console.WriteLine("Queueing message"); batchingQueue.Add(m); }); 和你的消费者线程: foreach (var msg in batchingQueue.GetConsumingEnumerable()) { Console.WriteLine("Completing message"); msg.Complete(); } GetConsumingEnumerable返回一个迭代器,该迭代器使用队列中的项目,直到设置了IsCompleted属性并且队列为空.如果队列为空但IsCompleted为False,则会执行非忙等待下一个项目. 要取消使用者线程(即关闭程序),您将停止向队列添加内容并让主线程调用batchingQueue.CompleteAdding.使用者将清空队列,看到IsCompleted属性为True,然后退出. 在这里使用BlockingCollection比ConcurrentBag或ConcurrentQueue更好,因为BlockingCollection接口更容易使用.特别是,使用GetConsumingEnumerable可以使您不必担心检查计数或进行忙等待(轮询循环).它只是有效. 另请注意,ConcurrentBag有一些相当奇怪的删除行为.特别是,删除项目的顺序根据哪个线程删除项目而不同.创建包的线程以与其他线程不同的顺序删除项目.有关详细信息,请参阅Using the ConcurrentBag Collection. 您还没有说明为什么要在输入中批量处理项目.除非有一个压倒一切的性能原因,否则使用该批处理逻辑使代码复杂化似乎并不是一个特别好的主意. 如果你想对数据库进行批量写入,那么我建议使用一个简单的List< T>缓冲项目.如果必须在将项目写入数据库之前对其进行处理,请使用上面显示的技术来处理它们.然后,而是直接写入数据库,将项目添加到列表中.当列表获得1,000个项目或经过一定时间后,分配新列表并启动任务以将旧列表写入数据库.像这样: // at class scope // Flush every 5 minutes. private readonly TimeSpan FlushDelay = TimeSpan.FromMinutes(5); private const int MaxBufferItems = 1000; // Create a timer for the buffer flush. System.Threading.Timer _flushTimer = new System.Threading.Timer(TimedFlush,FlushDelay.TotalMilliseconds,Timeout.Infinite); // A lock for the list. Unless you're getting hundreds of thousands // of items per second,this will not be a performance problem. object _listLock = new Object(); List<BrokeredMessage> _recordBuffer = new List<BrokeredMessage>(); 然后,在您的消费者中: foreach (var msg in batchingQueue.GetConsumingEnumerable()) { // process the message Console.WriteLine("Completing message"); msg.Complete(); lock (_listLock) { _recordBuffer.Add(msg); if (_recordBuffer.Count >= MaxBufferItems) { // Stop the timer _flushTimer.Change(Timeout.Infinite,Timeout.Infinite); // Save the old list and allocate a new one var myList = _recordBuffer; _recordBuffer = new List<BrokeredMessage>(); // Start a task to write to the database Task.Factory.StartNew(() => FlushBuffer(myList)); // Restart the timer _flushTimer.Change(FlushDelay.TotalMilliseconds,Timeout.Infinite); } } } private void TimedFlush() { bool lockTaken = false; List<BrokeredMessage> myList = null; try { if (Monitor.TryEnter(_listLock,out lockTaken)) { // Save the old list and allocate a new one myList = _recordBuffer; _recordBuffer = new List<BrokeredMessage>(); } } finally { if (lockTaken) { Monitor.Exit(_listLock); } } if (myList != null) { FlushBuffer(myList); } // Restart the timer _flushTimer.Change(FlushDelay.TotalMilliseconds,Timeout.Infinite); } 这里的想法是,您将旧列表排除在外,分配新列表以便继续处理,然后将旧列表的项目写入数据库.锁是为了防止计时器和记录计数器相互踩踏.没有锁定,事情可能会在一段时间内正常工作,然后你会在不可预测的时间发生奇怪的崩溃. 我喜欢这种设计,因为它消除了消费者的轮询.我唯一不喜欢的是消费者必须知道计时器(即它必须停止然后重新启动计时器).稍微考虑一下,我可以消除这个要求.但它的编写方式很好. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |