加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > asp.Net > 正文

具有依赖关系的并行操作执行

发布时间:2020-12-16 09:11:44 所属栏目:asp.Net 来源:网络整理
导读:今天看到看到一篇 MSDN 文章《Parallelizing Operations With Dependencies 》 , 作者是微软 Parallel Computing Platform 团队的一个开发经理。文中提供出一种用于并行执行一组具有依赖关系的操作的解决方案,这不由得想起我在一年之前写的一个具有相同的功

今天看到看到一篇MSDN文章《Parallelizing Operations With Dependencies,作者是微软Parallel Computing Platform团队的一个开发经理。文中提供出一种用于并行执行一组具有依赖关系的操作的解决方案,这不由得想起我在一年之前写的一个具有相同的功能的组件。于是翻箱倒柜找了出来,进行了一些加工,与大家分享一下。

一、问题分析

我们知道,较之串行化的操作,并行计算将多个任务同时执行,从而充分利用了资源,提高了应用的整体性能。对于多个互不相干的操作,我们可以直接按照异步的方式执行就可以。但是,我们遇到的很多情况下是,部分操作之间具有相互依赖的关系,一个操作需要在其他依赖的操作执行完成后方可执行。 以下图为例,每一个圆圈代表要执行的操作,操作之间的肩头代表它们之间的依赖关系。

我们需要一个组件,帮助我们完成这样的工作:将相应的操作和依赖关系直接添加到一个容器中,我们的组件能够自动分析操作之间的依赖关系,在执行的时候根据依赖编排执行顺序。

二、采用并行操作执行器

使用我所提供的这样一个并行操作执行器(ParallelExecutor),可以帮我们解决这个问题。首先对操作本身进行抽象,用以下三个属性来描述一个并行计算场景中的操作:

  • ?Operation ID: 操作的唯一标识,字符类型

  • ?Action:操作具体执行的功能,使用Action代理表示

  • ?Depedencies:依赖操作列表

在使用ParallelExecutor对操作进行并行执行之前,我们需要通过ParallelExecutor的两个AddOperation方法添加需要执行的操作。AddOperation定义如下。其中dependencies代表以来操作ID数组,返回值为当前创建的操作ID

   1: public class ParallelExecutor
   3:  
   5:     {
   7:     }
   9:     string[] dependencies)
  11:         //省略实现
  13: }
3: Action<string> action = id=> {Console.WriteLine(id);};
   5:     var executor = new ParallelExecutor();
   7:     var a2 = executor.AddOperation("A2",1)">"A2"));
   9:  
  11:     var b2 = executor.AddOperation("B2",1)">"B2"),1)">string[] { a3 }); 
  13:     var c1 = executor.AddOperation("C1",1)">"C1"),1)">string[] { b1,b2 });
  15:  
  17: Console.Read();
  19:? 

由于是操作的并行执行,线程调度的不确定性使每次输出的结果各有不同。但是无论如何,需要满足上图中展现的依赖关系。下面是其中一种执行结果,可以看出这是合理的执行顺序。

2: B2
   4: A2
   6: B1
ActionAction类型,操作具体是实现的功能

  • DependenciesOperation数组,依赖的操作

  • StatusOperation枚举,操作当前的状态

  • ExecutionContextExecutionContext类型,用于传递线程执行的上下文


  • 2: {
    string ID
       6:  
       8:     { get; private set; } 
      11:     { get;   13:     public OperationStatus Status
      17:     { get;   18:  
      20:     {
      22:         {
      24:         } 
      26:         null == action)
      28:             "action");
      30:         this.Status = OperationStatus.Created;
      32:         this.Action = action;
      34:     } 
      36:       37:         : this(id,action)
      39:         null == dependencies)
      41:             "dependencies");
      43:  
      45:     }     
      47:? 

    操作事件

    当前操作执行的状态通过OperationStatus表示,四个枚举值分别表示被创建、正在运行、运行结束和失败(抛出异常)。

    3: Created,
       5:     Completed,1)" id="lnum6">   6: Failed
    event EventHandler<OperationEventArgs> OperationStarted;
       6:     event EventHandler<OperationEventArgs> OperationCompleted;     
       8:? 

    OperationStartedOperationCompleted事件对应的参数类型为OperationEventArgs直接继承自EventArgs,并定义了一个Operation属性代表对应的对象。

    public OperationEventArgs(Operation operation)
       5:         null == operation)
       7:             "operation");
      10:         this.Operation = operation;
    public Operation Operation
      15: }
    private List<Operation> _registeredParentOps = new List<Operation>();     
       8:     {
    foreach (var op in operation.Dependencies)
      12:             if (op._registeredParentOps.Contains(operation))
      14:                 continue;
      16:             RegisterCompletedEvents(op);
      18:                 {
      20:                     if (operation._remainingDependencies <= 0)
      22:                         operation.DoExecute();
      24:                 };
      26:         }            
      28:  
      30:     {
      32:    }
       2: {   
    this.Status != OperationStatus.Created)
       8:             return;
      10:  
      12:         {
      14:         } 
      17:         try
      19:             this.ExecutionContext)
      21:                 ExecutionContext.Run(this.ExecutionContext.CreateCopy(),state => this.Action(),1)">null);
    else
      25:                 this.Action();
      27:  
      29:             this.OperationCompleted)
      31:                 this.OperationCompleted(  32:             }
      34:         catch (Exception ex)
      36:             this.Status = OperationStatus.Failed;
      38:             {
      40:             }
      42:     } 
      45:     void Execute()
      47:         this.Dependencies.Length == 0)
      49:             this.DoExecute();
      51:  
      53:         {
      55:             ThreadPool.UnsafeQueueUserWorkItem(state => op.Execute(),1)" id="lnum56">  56:         }        
      58:     } 
      60:? 

    ParallelExecutor

    提供操作的添加和整体执行。添加操作实现在两个重载的AddOperation方法中,逻辑并不复杂。当执行方法对所有的操作进行并行执行的时候,需要调用方法对每个操作进行初始化。然后异步调用每个操作的方法即可。

    this.Operations = new Dictionary<string,Operation>();
       7:  
       9:     { get; private set; }
      13:         this.Operations.Values)
      15:             operation.Initialize();
      17:  
      19:         {
      21:             ThreadPool.UnsafeQueueUserWorkItem(state => op.Execute(),1)" id="lnum22">  22:         }
      24:  
      27:         ValidateOperation(id,action);
      29:         return id;
      31:  
      37:         }
    this.Operations.ContainsKey(id))
      42:         }
      44:  
      49:         {
      51:         }
      53:         in dependencies)
      55:             if (!this.Operations.ContainsKey(op))
      57:                 "The operation whose ID is "{0}" does not exist!",op));
      59:         }
      61:         var operation =   62:                this.Operations.Values.
      64:  
      66:         return id;
      68: }