具有依赖关系的并行操作执行
今天看到看到一篇MSDN文章《Parallelizing Operations With Dependencies》,作者是微软Parallel Computing Platform团队的一个开发经理。文中提供出一种用于并行执行一组具有依赖关系的操作的解决方案,这不由得想起我在一年之前写的一个具有相同的功能的组件。于是翻箱倒柜找了出来,进行了一些加工,与大家分享一下。 一、问题分析 我们知道,较之串行化的操作,并行计算将多个任务同时执行,从而充分利用了资源,提高了应用的整体性能。对于多个互不相干的操作,我们可以直接按照异步的方式执行就可以。但是,我们遇到的很多情况下是,部分操作之间具有相互依赖的关系,一个操作需要在其他依赖的操作执行完成后方可执行。 以下图为例,每一个圆圈代表要执行的操作,操作之间的肩头代表它们之间的依赖关系。 我们需要一个组件,帮助我们完成这样的工作:将相应的操作和依赖关系直接添加到一个容器中,我们的组件能够自动分析操作之间的依赖关系,在执行的时候根据依赖编排执行顺序。 二、采用并行操作执行器 使用我所提供的这样一个并行操作执行器(ParallelExecutor),可以帮我们解决这个问题。首先对操作本身进行抽象,用以下三个属性来描述一个并行计算场景中的操作:
在使用ParallelExecutor对操作进行并行执行之前,我们需要通过ParallelExecutor的两个AddOperation方法添加需要执行的操作。AddOperation定义如下。其中dependencies代表以来操作ID数组,返回值为当前创建的操作ID。 1: public class ParallelExecutor 3: 5: { 7: } 9: string[] dependencies)
11: //省略实现
13: }
由于是操作的并行执行,线程调度的不确定性使每次输出的结果各有不同。但是无论如何,需要满足上图中展现的依赖关系。下面是其中一种执行结果,可以看出这是合理的执行顺序。 2: B2
4: A2 6: B1 Action:Action类型,操作具体是实现的功能 2: {
string ID 6: 8: { get; private set; }
11: { get; 13: public OperationStatus Status 17: { get; 18:
20: { 22: { 24: } 26: null == action)
28: "action");
30: this.Status = OperationStatus.Created;
32: this.Action = action;
34: } 36: 37: : this(id,action) 39: null == dependencies)
41: "dependencies");
43: 45: } 47:? 操作事件 当前操作执行的状态通过OperationStatus表示,四个枚举值分别表示被创建、正在运行、运行结束和失败(抛出异常)。 3: Created,
5: Completed,1)" id="lnum6"> 6: Failed event EventHandler<OperationEventArgs> OperationStarted; 6: event EventHandler<OperationEventArgs> OperationCompleted;
8:? OperationStarted和OperationCompleted事件对应的参数类型为OperationEventArgs。直接继承自EventArgs,并定义了一个Operation属性代表对应的对象。 public OperationEventArgs(Operation operation)
5: null == operation)
7: "operation");
10: this.Operation = operation;
public Operation Operation 15: } private List<Operation> _registeredParentOps = new List<Operation>();
8: { foreach (var op in operation.Dependencies)
12: if (op._registeredParentOps.Contains(operation))
14: continue;
16: RegisterCompletedEvents(op); 18: { 20: if (operation._remainingDependencies <= 0)
22: operation.DoExecute(); 24: }; 26: } 28: 30: { 32: } 2: { this.Status != OperationStatus.Created) 8: return;
10: 12: { 14: } 17: try
19: this.ExecutionContext)
21: ExecutionContext.Run(this.ExecutionContext.CreateCopy(),state => this.Action(),1)">null); else 25: this.Action();
27: 29: this.OperationCompleted)
31: this.OperationCompleted( 32: } 34: catch (Exception ex)
36: this.Status = OperationStatus.Failed;
38: { 40: } 42: } 45: void Execute()
47: this.Dependencies.Length == 0)
49: this.DoExecute();
51: 53: { 55: ThreadPool.UnsafeQueueUserWorkItem(state => op.Execute(),1)" id="lnum56"> 56: } 58: } 60:? ParallelExecutor 提供操作的添加和整体执行。添加操作实现在两个重载的AddOperation方法中,逻辑并不复杂。当执行方法对所有的操作进行并行执行的时候,需要调用方法对每个操作进行初始化。然后异步调用每个操作的方法即可。 this.Operations = new Dictionary<string,Operation>();
7: 9: { get; private set; }
13: this.Operations.Values)
15: operation.Initialize(); 17: 19: { 21: ThreadPool.UnsafeQueueUserWorkItem(state => op.Execute(),1)" id="lnum22"> 22: } 24: 27: ValidateOperation(id,action); 29: return id;
31: 37: } this.Operations.ContainsKey(id)) 42: } 44: 49: { 51: } 53: in dependencies)
55: if (!this.Operations.ContainsKey(op)) 57: "The operation whose ID is "{0}" does not exist!",op));
59: } 61: var operation = 62: this.Operations.Values. 64: 66: return id;
68: } 相关内容
推荐文章
站长推荐
热点阅读
|