加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 百科 > 正文

c# – 在调用OnMessage()后自动处理BrokeredMessage

发布时间:2020-12-15 23:25:11 所属栏目:百科 来源:网络整理
导读:我正在尝试从Azure Service Bus排队项目,以便我可以批量处理它们.我知道Azure Service Bus有一个ReceiveBatch()但由于以下原因似乎有问题: 我一次最多只能获得256条消息,甚至可以根据消息大小随机获取. 即使我查看有多少消息在等待,我也不知道有多少Request
我正在尝试从Azure Service Bus排队项目,以便我可以批量处理它们.我知道Azure Service Bus有一个ReceiveBatch()但由于以下原因似乎有问题:

>我一次最多只能获得256条消息,甚至可以根据消息大小随机获取.
>即使我查看有多少消息在等待,我也不知道有多少RequestBatch调用,因为我不知道每次调用会给我多少消息.由于消息将继续进入,我不能继续发出请求,直到它为空,因为它永远不会是空的.

我决定只使用比浪费偷看更便宜的消息监听器,并且会给我更多的控制权.

Basically I am trying to let a set number of messages build up and
then process them at once. I use a timer to force a delay but I need
to be able to queue my items as they come in.

基于我的计时器要求,似乎阻塞集合不是一个好选项,所以我试图使用ConcurrentBag.

var batchingQueue = new ConcurrentBag<BrokeredMessage>();
myQueueClient.OnMessage((m) =>
{
    Console.WriteLine("Queueing message");
    batchingQueue.Add(m);
});

while (true)
{
    var sw = WaitableStopwatch.StartNew();
    BrokeredMessage msg;
    while (batchingQueue.TryTake(out msg)) // <== Object is already disposed
    {
        ...do this until I have a thousand ready to be written to DB in batch
        Console.WriteLine("Completing message");
        msg.Complete(); // <== ERRORS HERE
    }

    sw.Wait(MINIMUM_DELAY);
}

However as soon as I access the message outside of the OnMessage
pipeline it shows the BrokeredMessage as already being disposed.

我认为这必须是OnMessage的一些自动行为,我没有看到任何方式对消息做任何事情,除了立即处理它我不想做.

解决方法

这对BlockingCollection来说非常容易.

var batchingQueue = new BlockingCollection<BrokeredMessage>();

myQueueClient.OnMessage((m) =>
{
    Console.WriteLine("Queueing message");
    batchingQueue.Add(m);
});

和你的消费者线程:

foreach (var msg in batchingQueue.GetConsumingEnumerable())
{
    Console.WriteLine("Completing message");
    msg.Complete();
}

GetConsumingEnumerable返回一个迭代器,该迭代器使用队列中的项目,直到设置了IsCompleted属性并且队列为空.如果队列为空但IsCompleted为False,则会执行非忙等待下一个项目.

要取消使用者线程(即关闭程序),您将停止向队列添加内容并让主线程调用batchingQueue.CompleteAdding.使用者将清空队列,看到IsCompleted属性为True,然后退出.

在这里使用BlockingCollection比ConcurrentBag或ConcurrentQueue更好,因为BlockingCollection接口更容易使用.特别是,使用GetConsumingEnumerable可以使您不必担心检查计数或进行忙等待(轮询循环).它只是有效.

另请注意,ConcurrentBag有一些相当奇怪的删除行为.特别是,删除项目的顺序根据哪个线程删除项目而不同.创建包的线程以与其他线程不同的顺序删除项目.有关详细信息,请参阅Using the ConcurrentBag Collection.

您还没有说明为什么要在输入中批量处理项目.除非有一个压倒一切的性能原因,否则使用该批处理逻辑使代码复杂化似乎并不是一个特别好的主意.

如果你想对数据库进行批量写入,那么我建议使用一个简单的List< T>缓冲项目.如果必须在将项目写入数据库之前对其进行处理,请使用上面显示的技术来处理它们.然后,而是直接写入数据库,将项目添加到列表中.当列表获得1,000个项目或经过一定时间后,分配新列表并启动任务以将旧列表写入数据库.像这样:

// at class scope

// Flush every 5 minutes.
private readonly TimeSpan FlushDelay = TimeSpan.FromMinutes(5);
private const int MaxBufferItems = 1000;

// Create a timer for the buffer flush.
System.Threading.Timer _flushTimer = new System.Threading.Timer(TimedFlush,FlushDelay.TotalMilliseconds,Timeout.Infinite);  

// A lock for the list. Unless you're getting hundreds of thousands
// of items per second,this will not be a performance problem.
object _listLock = new Object();

List<BrokeredMessage> _recordBuffer = new List<BrokeredMessage>();

然后,在您的消费者中:

foreach (var msg in batchingQueue.GetConsumingEnumerable())
{
    // process the message
    Console.WriteLine("Completing message");
    msg.Complete();
    lock (_listLock)
    {
        _recordBuffer.Add(msg);
        if (_recordBuffer.Count >= MaxBufferItems)
        {
            // Stop the timer
            _flushTimer.Change(Timeout.Infinite,Timeout.Infinite);

            // Save the old list and allocate a new one
            var myList = _recordBuffer;
            _recordBuffer = new List<BrokeredMessage>();

            // Start a task to write to the database
            Task.Factory.StartNew(() => FlushBuffer(myList));

            // Restart the timer
            _flushTimer.Change(FlushDelay.TotalMilliseconds,Timeout.Infinite);
        }
    }
}

private void TimedFlush()
{
    bool lockTaken = false;
    List<BrokeredMessage> myList = null;

    try
    {
        if (Monitor.TryEnter(_listLock,out lockTaken))
        {
            // Save the old list and allocate a new one
            myList = _recordBuffer;
            _recordBuffer = new List<BrokeredMessage>();
        }
    }
    finally
    {
        if (lockTaken)
        {
            Monitor.Exit(_listLock);
        }
    }

    if (myList != null)
    {
        FlushBuffer(myList);
    }

    // Restart the timer
    _flushTimer.Change(FlushDelay.TotalMilliseconds,Timeout.Infinite);
}

这里的想法是,您将旧列表排除在外,分配新列表以便继续处理,然后将旧列表的项目写入数据库.锁是为了防止计时器和记录计数器相互踩踏.没有锁定,事情可能会在一段时间内正常工作,然后你会在不可预测的时间发生奇怪的崩溃.

我喜欢这种设计,因为它消除了消费者的轮询.我唯一不喜欢的是消费者必须知道计时器(即它必须停止然后重新启动计时器).稍微考虑一下,我可以消除这个要求.但它的编写方式很好.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读