
C# – Parallel.ForEach stalls when integrated with BlockingCollection

I adopted a parallel consumer implementation based on the code in this question:
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class ParallelConsumer<T> : IDisposable
{
    private readonly int _maxParallel;
    private readonly Action<T> _action;
    private readonly TaskFactory _factory = new TaskFactory();
    private CancellationTokenSource _tokenSource;
    private readonly BlockingCollection<T> _entries = new BlockingCollection<T>();
    private Task _task;

    public ParallelConsumer(int maxParallel, Action<T> action)
    {
        _maxParallel = maxParallel;
        _action = action;
    }

    public void Start()
    {
        try
        {
            _tokenSource = new CancellationTokenSource();
            _task = _factory.StartNew(
                () =>
                {
                    Parallel.ForEach(
                        _entries.GetConsumingEnumerable(),
                        new ParallelOptions { MaxDegreeOfParallelism = _maxParallel, CancellationToken = _tokenSource.Token },
                        (item, loopState) =>
                        {
                            Log("Taking" + item);
                            if (!_tokenSource.IsCancellationRequested)
                            {
                                _action(item);
                                Log("Finished" + item);
                            }
                            else
                            {
                                Log("Not Taking" + item);
                                _entries.CompleteAdding();
                                loopState.Stop();
                            }
                        });
                }, _tokenSource.Token);
        }
        catch (OperationCanceledException oce)
        {
            System.Diagnostics.Debug.WriteLine(oce);
        }
    }

    private void Log(string message)
    {
        Console.WriteLine(message);
    }

    public void Stop()
    {
        Dispose();
    }

    public void Enqueue(T entry)
    {
        Log("Enqueuing" + entry);
        _entries.Add(entry);
    }

    public void Dispose()
    {
        if (_task == null)
        {
            return;
        }

        _tokenSource.Cancel();
        while (!_task.IsCanceled)
        {
        }

        _task.Dispose();
        _tokenSource.Dispose();
        _task = null;
    }
}

Here is the test code:

using System.Diagnostics;
using System.Linq;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        TestRepeatedEnqueue(100, 1);
    }

    private static void TestRepeatedEnqueue(int itemCount, int parallelCount)
    {
        bool[] flags = new bool[itemCount];
        var consumer = new ParallelConsumer<int>(parallelCount,(i) =>
                                              {
                                                  flags[i] = true;
                                              }
            );
        consumer.Start();
        for (int i = 0; i < itemCount; i++)
        {
            consumer.Enqueue(i);
        }
        Thread.Sleep(1000);
        Debug.Assert(flags.All(b => b == true));
    }
}

The test always fails – it always gets stuck at item 93 of the 100 items in the test. Any idea which part of my code is causing the problem, and how to fix it?

Solution

As you have discovered, you cannot use Parallel.ForEach() with BlockingCollection.GetConsumingEnumerable() like this. The default partitioner used by Parallel.ForEach buffers items in chunks, so the last few items sit in a partially filled chunk waiting for input that never arrives, which is why the loop appears to stall.

For an explanation, see this blog post:

http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx

The blog post also provides the source code for a method called GetConsumingPartitioner(), which you can use to solve the problem.
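For convenience, here is a rough reconstruction of that partitioner (a sketch based on the linked post; refer to the post for the authoritative version). It wraps the BlockingCollection in a Partitioner<T> whose dynamic partitions pull straight from the consuming enumerable, so no chunk buffering is layered on top of the collection:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

static class BlockingCollectionExtensions
{
    // Wraps the collection so Parallel.ForEach consumes items one at a time
    public static Partitioner<T> GetConsumingPartitioner<T>(this BlockingCollection<T> collection)
    {
        return new BlockingCollectionPartitioner<T>(collection);
    }

    private class BlockingCollectionPartitioner<T> : Partitioner<T>
    {
        private readonly BlockingCollection<T> _collection;

        internal BlockingCollectionPartitioner(BlockingCollection<T> collection)
        {
            if (collection == null) throw new ArgumentNullException("collection");
            _collection = collection;
        }

        // Lets Parallel.ForEach request partitions on demand instead of up front
        public override bool SupportsDynamicPartitions
        {
            get { return true; }
        }

        public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
        {
            if (partitionCount < 1) throw new ArgumentOutOfRangeException("partitionCount");
            var dynamicPartitioner = GetDynamicPartitions();
            return Enumerable.Range(0, partitionCount)
                             .Select(_ => dynamicPartitioner.GetEnumerator())
                             .ToArray();
        }

        // Every partition pulls directly from the consuming enumerable,
        // which already supports multiple concurrent consumers
        public override IEnumerable<T> GetDynamicPartitions()
        {
            return _collection.GetConsumingEnumerable();
        }
    }
}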

Excerpt from the blog post:

BlockingCollection’s GetConsumingEnumerable implementation is using BlockingCollection’s internal synchronization which already supports multiple consumers concurrently, but ForEach doesn’t know that, and its enumerable-partitioning logic also needs to take a lock while accessing the enumerable.

As such, there’s more synchronization here than is actually necessary, resulting in a potentially non-negligable performance hit.

[Also] the partitioning algorithm employed by default by both Parallel.ForEach and PLINQ use chunking in order to minimize synchronization costs: rather than taking the lock once per element, it’ll take the lock, grab a group of elements (a chunk), and then release the lock.

While this design can help with overall throughput, for scenarios that are focused more on low latency, that chunking can be prohibitive.
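With that extension method in place, the only change needed in Start() is to pass the partitioner instead of the raw enumerable. A sketch of the adjusted call, assuming the GetConsumingPartitioner() extension shown above is in scope:

                    Parallel.ForEach(
                        _entries.GetConsumingPartitioner(),   // instead of _entries.GetConsumingEnumerable()
                        new ParallelOptions { MaxDegreeOfParallelism = _maxParallel, CancellationToken = _tokenSource.Token },
                        (item, loopState) =>
                        {
                            // same loop body as before
                            _action(item);
                        });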
