加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 百科 > 正文

c# – 任务排序和重新进行

发布时间:2020-12-15 06:17:42 所属栏目:百科 来源:网络整理
导读:我有以下情况,我认为可能是很常见的: 有一个任务(一个UI命令处理程序)可以同步或异步地完成. 命令可能会比处理得更快. 如果命令已经有待处理的任务,那么新的命令处理程序任务应该被排队并顺序处理. 每个新任务的结果可能取决于上一个任务的结果. 应该注意取
我有以下情况,我认为可能是很常见的:

>有一个任务(一个UI命令处理程序)可以同步或异步地完成.
>命令可能会比处理得更快.
>如果命令已经有待处理的任务,那么新的命令处理程序任务应该被排队并顺序处理.
>每个新任务的结果可能取决于上一个任务的结果.

应该注意取消,但为了简单起见,我想把它放在这个问题的范围之外.此外,线程安全(并发)不是必需的,但是必须支持重新入侵.

以下是我正在尝试实现的一个基本示例(作为控制台应用程序,为简单起见):

using System;
using System.Threading.Tasks;

namespace ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var asyncOp = new AsyncOp<int>();

            Func<int,Task<int>> handleAsync = async (arg) =>
            {
                Console.WriteLine("this task arg: " + arg);

                //await Task.Delay(arg); // make it async

                return await Task.FromResult(arg); // sync
            };

            Console.WriteLine("Test #1...");
            asyncOp.RunAsync(() => handleAsync(1000));
            asyncOp.RunAsync(() => handleAsync(900));
            asyncOp.RunAsync(() => handleAsync(800));
            asyncOp.CurrentTask.Wait();

            Console.WriteLine("nPress any key to continue to test #2...");
            Console.ReadLine();

            asyncOp.RunAsync(() =>
            {
                asyncOp.RunAsync(() => handleAsync(200));
                return handleAsync(100);
            });

            asyncOp.CurrentTask.Wait();
            Console.WriteLine("nPress any key to exit...");
            Console.ReadLine();
        }

        // AsyncOp
        class AsyncOp<T>
        {
            Task<T> _pending = Task.FromResult(default(T));

            public Task<T> CurrentTask { get { return _pending; } }

            public Task<T> RunAsync(Func<Task<T>> handler)
            {
                var pending = _pending;
                Func<Task<T>> wrapper = async () =>
                {
                    // await the prev task
                    var prevResult = await pending;
                    Console.WriteLine("nprev task result:  " + prevResult);
                    // start and await the handler
                    return await handler();
                };

                _pending = wrapper();
                return _pending;
            }
        }

    }
}

输出:

Test #1...

prev task result:  0
this task arg: 1000

prev task result:  1000
this task arg: 900

prev task result:  900
this task arg: 800

Press any key to continue to test #2...


prev task result:  800

prev task result:  800
this task arg: 200
this task arg: 100

Press any key to exit...

它按照要求工作,直到在第2测试中引入重入:

asyncOp.RunAsync(() =>
{
    asyncOp.RunAsync(() => handleAsync(200));
    return handleAsync(100);
});

期望的输出应该是100,200,而不是200,100,因为已经有一个待处理的外部任务为100.这显然是因为内部任务同步执行,破坏逻辑var pending = _pending; / * … * / _pending = wrapper()作为外部任务.

如何使测试#2工作?

一个解决方案是使用Task.Factory.StartNew(…,TaskScheduler.FromCurrentSynchronizationContext())为每个任务强制执行异步,但是我不想对可能在内部同步的命令处理程序强加异步执行.,我不想依赖任何特定同步上下文的行为(即依赖该Task.Factory.StartNew应该在创建的任务实际启动之前返回).

在现实生活的项目中,我对AsyncOp在上面负责,但是无法控制命令处理程序(即在handleAsync内部的任何内容).

解决方法

我几乎忘记了可以手动构建一个任务,而不需要启动或调度它.然后,“Task.Factory.StartNew” vs “new Task(…).Start”把我放回轨道.我认为这是少数情况之一,Task<TResult>构造函数可能实际上是有用的,以及嵌套任务(Task< Task&T)>和 Task.Unwrap()
// AsyncOp
class AsyncOp<T>
{
    Task<T> _pending = Task.FromResult(default(T));

    public Task<T> CurrentTask { get { return _pending; } }

    public Task<T> RunAsync(Func<Task<T>> handler,bool useSynchronizationContext = false)
    {
        var pending = _pending;
        Func<Task<T>> wrapper = async () =>
        {
            // await the prev task
            var prevResult = await pending;
            Console.WriteLine("nprev task result:  " + prevResult);
            // start and await the handler
            return await handler();
        };

        var task = new Task<Task<T>>(wrapper);
        var inner = task.Unwrap();
        _pending = inner;

        task.RunSynchronously(useSynchronizationContext ?
            TaskScheduler.FromCurrentSynchronizationContext() :
            TaskScheduler.Current);

        return inner;
    }
}

输出:

Test #1...

prev task result:  0
this task arg: 1000

prev task result:  1000
this task arg: 900

prev task result:  900
this task arg: 800

Press any key to continue to test #2...


prev task result:  800
this task arg: 100

prev task result:  100
this task arg: 200

如果需要,通过添加一个锁来保护_pending,使AsyncOp线程安全现在变得非常简单.

更新,以下是该模式的最新版本,它使用TaskCompletionSource并且是线程安全的:

/// <summary>
/// AsyncOperation
/// By Noseratio - https://stackoverflow.com/a/21427264
/// </summary>
/// <typeparam name="T">Task result type</typeparam>
class AsyncOperation<T>
{
    readonly object _lock = new Object();
    Task<T> _currentTask = null;
    CancellationTokenSource _currentCts = null;

    // a client of this class (e.g. a ViewModel) has an option 
    // to handle TaskSucceeded or TaskFailed,if needed
    public event EventHandler<TaskEventArgs> TaskSucceeded = null;
    public event EventHandler<TaskEventArgs> TaskFailing = null;

    public Task<T> CurrentTask
    {
        get
        {
            lock (_lock)
                return _currentTask;
        }
    }

    public bool IsCurrent(Task task)
    {
        lock (_lock)
            return task == _currentTask;
    }

    public bool IsPending
    {
        get
        {
            lock (_lock)
                return _currentTask != null && !_currentTask.IsCompleted;
        }
    }

    public bool IsCancellationRequested
    {
        get
        {
            lock (_lock)
                return _currentCts != null && _currentCts.IsCancellationRequested;
        }
    }

    public void Cancel()
    {
        lock (_lock)
        {
            if (_currentTask != null && !_currentTask.IsCompleted)
                _currentCts.Cancel();
        }
    }

    /// <summary>
    /// Start the task routine and observe the result of the previous task routine
    /// </summary>
    /// <param name="routine"></param>
    /// <param name="token"></param>
    /// <param name="cancelPrevious"></param>
    /// <param name="throwImmediately"></param>
    public Task<T> StartAsync(
        Func<CancellationToken,Task<T>> routine,CancellationToken token,bool cancelPrevious = true,bool throwImmediately = true)
    {
        Task<T> previousTask = null; // pending instance
        CancellationTokenSource previousCts = null; // pending instance CTS

        CancellationTokenSource thisCts = CancellationTokenSource.CreateLinkedTokenSource(token);
        TaskCompletionSource<T> thisTcs = new TaskCompletionSource<T>(); // this task
        CancellationToken thisToken; // this task's cancellation Token
        Task<T> routineTask = null; // as returned by routine

        lock (_lock)
        {
            // remember the _currentTask as previousTask
            previousTask = _currentTask;
            previousCts = _currentCts;

            thisToken = thisCts.Token;

            // set the new _currentTask 
            _currentTask = thisTcs.Task;
            _currentCts = thisCts;
        }

        Action startAsync = async () =>
        {
            // because startAsync is "async void" method,// any exception not handled inside it
            // will be immediately thrown on the current synchronization context,// more details: https://stackoverflow.com/a/22395161/1768303

            // run and await this task
            try
            {
                // await the previous task instance
                if (previousTask != null)
                {
                    if (cancelPrevious)
                        previousCts.Cancel();
                    try
                    {
                        await previousTask;
                    }
                    catch (OperationCanceledException)
                    {
                        // ignore previous cancellations
                    }
                }

                thisToken.ThrowIfCancellationRequested();

                routineTask = routine(thisToken);
                await routineTask;
            }
            catch (Exception ex)
            {
                // ignore cancellation
                if (ex is OperationCanceledException)
                {
                    System.Diagnostics.Debug.Print("Task cancelled,id={0}",thisTcs.Task.Id);
                    thisTcs.SetCanceled();
                    return;
                }

                // fire TaskFailing
                System.Diagnostics.Debug.Print("Task failing,thisTcs.Task.Id);
                if (this.TaskFailing != null)
                {
                    var args = new TaskEventArgs(thisTcs.Task,ex);
                    this.TaskFailing(this,args);
                    if (args.Handled)
                    {
                        // exception handled
                        // make thisTcs cancelled rather than faulted 
                        thisTcs.SetCanceled();
                        return;
                    }
                }

                // exception unhandled
                thisTcs.SetException(ex);
                if (throwImmediately)
                    throw; // rethrow on the current synchronization context

                // exception should be observed via CurrentTask.Exception
                return;
            }

            // success,fire TaskSucceeded
            System.Diagnostics.Debug.Print("Task succeded,thisTcs.Task.Id);
            thisTcs.SetResult(routineTask.Result);
            if (this.TaskSucceeded != null)
                this.TaskSucceeded(this,new TaskEventArgs(thisTcs.Task));
        };

        startAsync();
        return thisTcs.Task;
    }

    // StartAsync with CancellationToken.None
    public Task<T> StartAsync(
        Func<CancellationToken,bool throwImmediately = true)
    {
        return StartAsync(routine,CancellationToken.None,cancelPrevious: true,throwImmediately: true);
    }

    /// <summary>
    /// TaskEventArgs
    /// </summary>
    public class TaskEventArgs : EventArgs
    {
        public Task<T> Task { get; private set; }
        public Exception Exception { get; private set; }
        public bool Handled { get; set; }

        public TaskEventArgs(Task<T> task,Exception exception = null)
        {
            this.Task = task;
            this.Exception = exception;
        }
    }
}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读