c# – 如何聚合来自异步生成器的数据并将其写入文件?
我正在学习C#中的异步/等待模式.目前我正在尝试解决这样的问题:
>有一个生产者(硬件设备)每秒生成1000个数据包.我需要将此数据记录到文件中. 到目前为止,我写了类似下面的内容.它有效,但我不确定这是否是解决问题的最佳方法.有任何意见或建议吗?在消费者需要汇总从生产者处收到的数据时,采用这种生产者/消费者问题的最佳做法是什么? static async Task TestLogger(Device device,int seconds) { const int bufLength = 1000; bool firstIteration = true; Task writerTask = null; using (var writer = new StreamWriter("test.log"))) { do { var buffer = new byte[bufLength][]; for (int i = 0; i < bufLength; i++) { buffer[i] = await device.ReadAsync(); } if (!firstIteration) { if (!writerTask.IsCompleted) throw new Exception("Write Time Out!"); } writerTask = Task.Run(() => { foreach (var b in buffer) writer.WriteLine(ToHexString(b)); }); firstIteration = false; } while (--seconds > 0); } } 解决方法
您可以使用以下想法,前提是flush的条件是数据包的数量(最多1000个).我没有测试它.它利用了
this question中Stephen Cleary的
AsyncProducerConsumerQueue<T> 特色.
AsyncProducerConsumerQueue<byte[]> _queue; Stream _stream; // producer async Task ReceiveAsync(CancellationToken token) { while (true) { var list = new List<byte>(); while (true) { token.ThrowIfCancellationRequested(token); var packet = await _device.ReadAsync(token); list.Add(packet); if (list.Count == 1000) break; } // push next batch await _queue.EnqueueAsync(list.ToArray(),token); } } // consumer async Task LogAsync(CancellationToken token) { Task previousFlush = Task.FromResult(0); CancellationTokenSource cts = null; while (true) { token.ThrowIfCancellationRequested(token); // get next batch var nextBatch = await _queue.DequeueAsync(token); if (!previousFlush.IsCompleted) { cts.Cancel(); // cancel the previous flush if not ready throw new Exception("failed to flush on time."); } await previousFlush; // it's completed,observe for any errors // start flushing cts = CancellationTokenSource.CreateLinkedTokenSource(token); previousFlush = _stream.WriteAsync(nextBatch,nextBatch.Count,cts.Token); } } 如果您不想让记录器失败,而是希望取消刷新并继续下一批,则可以对此代码进行最小的更改. 回应@ l3arnon评论:
>“数据包不是字节,它是字节[]” – 数据包是一个字节,这从OP的代码中很明显:buffer [i] = await device.ReadAsync().然后,一批数据包是byte []. var hexBytes = Encoding.ASCII.GetBytes(ToHexString(nextBatch) + Environment.NewLine); _stream.WriteAsync(hexBytes,hexBytes.Length,token); >“AsyncProducerConsumerQueue的稳健性和测试性都不如.Net的TPL数据流” – 我不认为这是一个确定的事实.但是,如果OP关心它,他可以使用常规的BlockingCollection,它不会阻塞生产者线程.在等待下一批时阻止使用者线程是可以的,因为写入是并行完成的.与此相反,您的TPL Dataflow版本带有一个冗余CPU和锁密集操作:使用logAction.Post(数据包)将数据从生产者管道移动到写入器pipleline,逐字节.我的代码不这样做.>“在抛出导致该行冗余的异常之后,您等待previousFlush出错.” – 这条线不是多余的.也许,你错过了这一点:当previousFlush.IsFaulted或previousFlush.IsCancelled也为true时,previousFlush.IsCompleted可能为true.因此,等待previousFlush与之相关,以观察已完成任务上的任何错误(例如,写入失败),否则将丢失. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |