c# – 与BlockingCollection集成时,Parallel.ForEach停滞不前
我根据
this question中的代码采用了并行/消费者的实现
class ParallelConsumer<T> : IDisposable { private readonly int _maxParallel; private readonly Action<T> _action; private readonly TaskFactory _factory = new TaskFactory(); private CancellationTokenSource _tokenSource; private readonly BlockingCollection<T> _entries = new BlockingCollection<T>(); private Task _task; public ParallelConsumer(int maxParallel,Action<T> action) { _maxParallel = maxParallel; _action = action; } public void Start() { try { _tokenSource = new CancellationTokenSource(); _task = _factory.StartNew( () => { Parallel.ForEach( _entries.GetConsumingEnumerable(),new ParallelOptions { MaxDegreeOfParallelism = _maxParallel,CancellationToken = _tokenSource.Token },(item,loopState) => { Log("Taking" + item); if (!_tokenSource.IsCancellationRequested) { _action(item); Log("Finished" + item); } else { Log("Not Taking" + item); _entries.CompleteAdding(); loopState.Stop(); } }); },_tokenSource.Token); } catch (OperationCanceledException oce) { System.Diagnostics.Debug.WriteLine(oce); } } private void Log(string message) { Console.WriteLine(message); } public void Stop() { Dispose(); } public void Enqueue(T entry) { Log("Enqueuing" + entry); _entries.Add(entry); } public void Dispose() { if (_task == null) { return; } _tokenSource.Cancel(); while (!_task.IsCanceled) { } _task.Dispose(); _tokenSource.Dispose(); _task = null; } } 这是一个测试代码 class Program { static void Main(string[] args) { TestRepeatedEnqueue(100,1); } private static void TestRepeatedEnqueue(int itemCount,int parallelCount) { bool[] flags = new bool[itemCount]; var consumer = new ParallelConsumer<int>(parallelCount,(i) => { flags[i] = true; } ); consumer.Start(); for (int i = 0; i < itemCount; i++) { consumer.Enqueue(i); } Thread.Sleep(1000); Debug.Assert(flags.All(b => b == true)); } } 测试总是失败 – 它总是停留在测试的100个项目的第93项.知道我的代码的哪一部分引起了这个问题,以及如何解决它? 解决方法
正如您所发现的,您不能将Parallel.Foreach()与BlockingCollection.GetConsumingEnumerable()一起使用.
有关解释,请参阅此博客文章: http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx 该博客还提供了一个名为GetConsumingPartitioner()的方法的源代码,您可以使用它来解决问题. 摘自博客:
(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |