c# – TPL架构问题
我目前正在开展一个项目,我们面临着并行处理项目的挑战.到目前为止没什么大不了的;)
现在来问题了.我们有一个ID列表,我们定期(每2秒)为每个ID调用StoredProcedure. 每个项目需要单独检查2秒,因为它们是在运行时添加和删除的. 此外,我们希望配置最大并行度,因为DB不应同时充满300个线程. 正在处理的项目在完成上一次执行之前不应重新安排进行处理.原因是我们想要防止排队很多项目,以防数据库出现延迟. 现在我们正在使用一个自行开发的组件,它有一个主线程,定期检查需要安排处理的项目.一旦它有了列表,它就会删除那些基于自定义IOCP的线程池,然后使用等待句柄等待正在处理的项目.然后下一次迭代开始. IOCP因为它提供的工作窃取. 我想用TPL / .NET 4版本替换这个自定义实现,我想知道你将如何解决它(理想的简单和易读/可维护). 理想情况下,它将成为一个通用组件,可用于需要定期为项目列表完成的所有任务. 欢迎任何投入, 解决方法
我认为你真的不需要为此直接使用TPL
Tasks .对于初学者,我会在
ConcurrentQueue (默认值)附近设置
BlockingCollection ,而在BlockingCollection上没有设置
BoundedCapacity 来存储需要处理的ID.
// Setup the blocking collection somewhere when your process starts up (OnStart for a Windows service) BlockingCollection<string> idsToProcess = new BlockingCollection<string>(); 从那里我将使用 现在,一旦存储过程执行完成,您就说您不想重新安排执行至少两秒钟.没问题,安排一个带有回调的 Parallel.ForEach( idsToProcess.GetConsumingEnumerable(),new ParallelOptions { MaxDegreeOfParallelism = 4 // read this from config },(id) => { // ... execute sproc ... // Need to declare/assign this before the delegate so that we can dispose of it inside Timer timer = null; timer = new Timer( _ => { // Add the id back to the collection so it will be processed again idsToProcess.Add(id); // Cleanup the timer timer.Dispose(); },null,// no state,id wee need is "captured" in the anonymous delegate 2000,// probably should read this from config Timeout.Infinite); } 最后,当进程关闭时,您将调用 // When ready to shutdown you just signal you're done adding idsToProcess.CompleteAdding(); 更新 您在评论中提出了一个有效的问题,即您可能在任何给定点处理大量ID,并担心每个ID的计时器会产生过多的开销.我绝对同意这一点.因此,在您同时处理大量ID的情况下,我将从使用每个ID的定时器更改为使用另一个队列来保存由单个短间隔计时器监视的“休眠”ID.首先,你需要一个ConcurrentQueue来放置睡着的ID: ConcurrentQueue<Tuple<string,DateTime>> sleepingIds = new ConcurrentQueue<Tuple<string,DateTime>>(); 现在,我在这里使用两部分 现在,您还需要设置将监视此队列的计时器: Timer wakeSleepingIdsTimer = new Timer( _ => { DateTime utcNow = DateTime.UtcNow; // Pull all items from the sleeping queue that have been there for at least 2 seconds foreach(string id in sleepingIds.TakeWhile(entry => (utcNow - entry.Item2).TotalSeconds >= 2)) { // Add this id back to the processing queue idsToProcess.Enqueue(id); } },// no state Timeout.Infinite,// no due time 100 // wake up every 100ms,probably should read this from config ); 然后你只需要改变Parallel :: ForEach来执行以下操作,而不是为每个设置一个计时器: (id) => { // ... execute sproc ... sleepingIds.Enqueue(Tuple.Create(id,DateTime.UtcNow)); } (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
- [React Native]去掉WebStorm中黄色警告
- iphone – 两个滚动视图工作同时一键式
- 使用AudioConverter Swift将.m4a文件转换为.aiff
- c# – 如何使用publish_stream发布到Facebook用户的朋友的墙
- c# – 如何将Combobox绑定到ReactiveUI中的命令?
- swift – SafariViewController:如何从URL获取OAuth令牌?
- Oracle RAC 故障处理(二)(+DATA磁盘组故障)
- The Swift Programming Language阅读笔记
- Jsoncpp更新
- ruby-on-rails – Rails has_many:通过“子对象”的sum属性