C#等待生产者/消费者中的多个事件
我正在使用
producer/consumer pattern实现数据链路层.数据链路层有自己的线程和状态机,通过线路传输数据链路协议(以太网,RS-232 ……).物理层的接口表示为System.IO.Stream.另一个线程将消息写入数据链接对象并从中读取消息.
数据链接对象具有空闲状态,必须等待以下四种情况之一: >收到一个字节 我很难找到最好的方法来实现这一点,而无需将通信分成读/写线程(从而大大增加了复杂性).以下是我如何获得4分中的3分: // Read a byte from 'stream'. Timeout after 10 sec. Monitor the cancellation token. stream.ReadTimeout = 10000; await stream.ReadAsync(buf,1,cts.Token); 要么 BlockingCollection<byte[]> SendQueue = new ...; ... // Check for a message from network layer. Timeout after 10 seconds. // Monitor cancellation token. SendQueue.TryTake(out msg,10000,cts.Token); 我该怎么做才能阻止线程,等待所有四个条件?欢迎所有建议.我没有设置任何架构或数据结构. 编辑:********感谢大家的帮助.这是我的解决方案******** 首先,我认为没有生产者/消费者队列的异步实现.所以我实现了类似于this stackoverflow post的东西. 我需要一个外部和内部取消源来分别停止使用者线程并取消中间任务,similar to this article. byte[] buf = new byte[1]; using (CancellationTokenSource internalTokenSource = new CancellationTokenSource()) { CancellationToken internalToken = internalTokenSource.Token; CancellationToken stopToken = stopTokenSource.Token; using (CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(stopToken,internalToken)) { CancellationToken ct = linkedCts.Token; Task<int> readTask = m_stream.ReadAsync(buf,ct); Task<byte[]> msgTask = m_sendQueue.DequeueAsync(ct); Task keepAliveTask = Task.Delay(m_keepAliveTime,ct); // Wait for at least one task to complete await Task.WhenAny(readTask,msgTask,keepAliveTask); // Next cancel the other tasks internalTokenSource.Cancel(); try { await Task.WhenAll(readTask,keepAliveTask); } catch (OperationCanceledException e) { if (e.CancellationToken == stopToken) throw; } if (msgTask.IsCompleted) // Send the network layer message else if (readTask.IsCompleted) // Process the byte from the physical layer else Contract.Assert(keepAliveTask.IsCompleted); // Send a keep alive message } } 解决方法
在这种情况下,我只会使用取消令牌取消.像保持活动计时器这样的重复超时更好地表示为计时器.
所以,我会把它建模为三个可取消的任务.首先,取消令牌:
CancellationToken token = ...; 然后,三个并发操作:
var readByteTask = stream.ReadAsync(buf,token);
var keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10),token);
这个有点棘手.您当前的代码使用BlockingCollection< T>,它不是异步兼容的.我建议切换到TPL Dataflow’s BufferBlock<byte[]> SendQueue = new ...; ... var messageTask = SendQueue.ReceiveAsync(token); 然后,您可以使用Task.WhenAny确定完成了哪些任务: var completedTask = await Task.WhenAny(readByteTask,keepAliveTimerTask,messageTask); 现在,您可以通过将completedTask与其他人进行比较并等待它们来检索结果: if (completedTask == readByteTask) { // Throw an exception if there was a read error or cancellation. await readByteTask; var byte = buf[0]; ... // Continue reading readByteTask = stream.ReadAsync(buf,token); } else if (completedTask == keepAliveTimerTask) { // Throw an exception if there was a cancellation. await keepAliveTimerTask; ... // Restart keepalive timer. keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10),token); } else if (completedTask == messageTask) { // Throw an exception if there was a cancellation (or the SendQueue was marked as completed) byte[] message = await messageTask; ... // Continue reading messageTask = SendQueue.ReceiveAsync(token); } (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |