加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 百科 > 正文

c# – 创建一个使用async / await的IObservable以原始顺序返回已

发布时间:2020-12-15 04:07:43 所属栏目:百科 来源:网络整理
导读:假设您有一个包含100个URL的列表,并且您想要下载它们,解析响应并通过IObservable推送结果: public IObservableImageSource GetImages(IEnumerablestring urls){ return urls .ToObservable() .Select(async url = { var bytes = await this.DownloadImage(u
假设您有一个包含100个URL的列表,并且您想要下载它们,解析响应并通过IObservable推送结果:
public IObservable<ImageSource> GetImages(IEnumerable<string> urls)
{
    return urls
        .ToObservable()
        .Select(async url =>
        {
            var bytes = await this.DownloadImage(url);
            var image = await this.ParseImage(bytes);
            return image;
        });
}

我有一些问题.

一个是同时敲击服务器100个请求是不礼貌的 – 理想情况下,你会在给定时刻限制为6个请求.但是,如果我添加一个Buffer调用,由于Select中的异步lambda,所有内容仍然会同时触发.

此外,结果将以与URL的输入序列不同的顺序返回,这是不好的,因为图像是将在UI上显示的动画的一部分.

我已经尝试了各种各样的东西,我有一个有效的解决方案,但感觉很复杂:

public IObservable<ImageSource> GetImages(IEnumerable<string> urls)
{
    var semaphore = new SemaphoreSlim(6);

    return Observable.Create<ImageSource>(async observable =>
    {
        var tasks = urls
            .Select(async url =>
            {
                await semaphore.WaitAsync();
                var bytes = await this.DownloadImage(url);
                var image = await this.ParseImage(url);
            })
            .ToList();

        foreach (var task in tasks)
        {
            observable.OnNext(await task);
        }

        observable.OnCompleted();
    });
}

它工作,但现在我正在做Observable.Create而不仅仅是IObservable.Select,我必须弄乱信号量.此外,在UI上运行的其他动画在运行时停止(它们基本上只是DispatcherTimer实例),所以我认为我一定做错了.

解决方法

尝试一下:
urls.ToObservable()
    .Select(url => Observable.FromAsync(async () => {
        var bytes = await this.DownloadImage(url);
        var image = await this.ParseImage(bytes);
        return image;        
    }))
    .Merge(6 /*at a time*/);

我们在这里做什么?

对于每个URL,我们创建一个Cold Observable(即一个根本不会做任何事情,直到某人调用Subscribe). FromAsync返回一个Observable,当您订阅它时,它会运行您提供的异步块.因此,我们选择将URL作为一个对象来完成我们的工作,但前提是我们稍后会问它.

然后,我们的结果是IObservable< IObservable< Image>> – 未来结果流.我们想要将该流展平为一个结果流,因此我们使用Merge(int).合并操作符将一次订阅n个项目,当他们回来时,我们将订阅更多.即使url list非常大,Merge正在缓冲的项目也只是一个URL和一个Func对象(即要做什么的描述),所以相对较小.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读