c# – 这是TPL Dataflow的工作吗?
我在不同的任务上运行一个非常典型的生产者/消费者模型.
Task1:从二进制文件读取byte []批次,并为每个字节数组集合启动一个新任务. (操作批量用于内存管理目的). 任务2-n:这些是工作任务,每个操作都是通过字节数组的传入集合(来自Tasks1)进行操作,并对字节数组进行序列化,按照某些条件进行排序,然后存储一个结果对象的集合(每个字节数组并行字典反序列化成这样的对象) 任务(n 1)我选择了一个并发字典,因为这个任务的任务是将并行字典中存储的那些集合按照与从Task1起源的顺序相同的顺序进行合并.我通过传递一个collectionID(对于Task1中的每个新集合都是int类型,并且递增)一直从Task1到此任务来实现.此任务基本检查下一个预期的collectionID是否已经存储在并发字典中,如果是,则将其添加到Final Queue并检查并发字典中的下一个集合. 现在,从我所看到的和我看过的视频来看,TPL Dataflow似乎是这种制作人/消费者模型的完美选择.我似乎无法设计出一个设计,因此我从来没有使用过TPL Dataflow.在吞吐量和延迟方面,这个库甚至可以完成任务?我目前在最终的集合中处理了250万字节的数组,从而处理每秒的对象. TPL数据流可以帮助简化吗?我对以下问题的答案特别感兴趣:TPL数据流可以在任务1中生成工作任务时保留收集批次的顺序,并在工作任务完成工作后重新合并它能优化事情吗?对于整个结构的剖析,我觉得由于旋转而浪费了很多时间,并且涉及太多并发收藏. 任何想法,想法? 解决方法
编辑:原来我错了. TransformBlock确实返回的项目与他们进入的顺序相同,即使它被配置为并行性.因为这样,我原来的答案中的代码完全没用,而是可以使用正常的TransformBlock.
原来的答案: 据我所知,.Net中只有一个并行构造支持按照他们进入的顺序返回处理的项目:PLINQ with 另一方面,TPL Dataflow很适合,但是它并没有一个可以同时支持并行和返回项目的块(TransformBlock支持它们,而不是在同一时间).幸运的是,数据流块的设计考虑到可组合性,因此我们可以构建我们自己的块. 但首先,我们必须弄清楚如何排序结果.使用一个并行字典,就像你所建议的一样,以及一些同步机制,一定会奏效.但我认为有一个更简单的解决方案:使用任务队列.在输出任务中,您将出现一个Task,等待它完成(异步),当它发生时,会发送其结果.当队列为空时,我们仍然需要一些同步,但如果我们选择使用巧妙的队列,我们??可以免费获得同步. 所以,一般的想法是这样的:我们正在写的是一个IPropagatorBlock,有一些输入和一些输出.创建自定义IPropagatorBlock的最简单方法是创建一个处理输入的块,另一个生成结果的块将其视为使用 输入块将必须以正确的顺序处理传入的项目,因此在那里没有并行化.它将创建一个新的任务(实际上是一个 输出块将不得不从队列中取出任务,异步等待它们,然后发送它们.但是由于所有块都嵌入在其中,并且采用代理的块具有异步等待内置,这将非常简单:新的TransformBlock< Task& TOutput>,TOutput>(t => t).该块将同时用作队列和输出块.因此,我们不必处理任何同步. 最后一块拼图实际上是并行处理物品.为此,我们可以使用另一个ActionBlock,这次用MaxDegreeOfParallelism设置.它将采取输入,处理它,并将正确的任务的结果设置在队列中. 放在一起,可能看起来像这样: public static IPropagatorBlock<TInput,TOutput> CreateConcurrentOrderedTransformBlock<TInput,TOutput>( Func<TInput,TOutput> transform) { var queue = new TransformBlock<Task<TOutput>,TOutput>(t => t); var processor = new ActionBlock<Tuple<TInput,Action<TOutput>>>( tuple => tuple.Item2(transform(tuple.Item1)),new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded }); var enqueuer = new ActionBlock<TInput>( async item => { var tcs = new TaskCompletionSource<TOutput>(); await processor.SendAsync( new Tuple<TInput,Action<TOutput>>(item,tcs.SetResult)); await queue.SendAsync(tcs.Task); }); enqueuer.Completion.ContinueWith( _ => { queue.Complete(); processor.Complete(); }); return DataflowBlock.Encapsulate(enqueuer,queue); } 经过这么多的谈话,我认为这是相当少量的代码. 看来你关心性能很多,所以你可能需要微调这个代码.例如,将处理器块的MaxDegreeOfParallelism设置为 此外,如果要限制收到的项目,您可以设置enqueer的BoundedCapacity. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |