delphi – TThreadedQueue不能多个消费者?
试图在单个生产者多消费者方案中使用TThreadedQueue(Generics.Collections)。 (Delphi-XE)。
想法是将对象推入队列,并让几个工作线程排空队列。 它不能按预期工作,但。 如果PopItem的调用序列化了一个临界区,一切都很好。 当然TThreadedQueue应该能够处理多个消费者,所以我缺少的东西或者这是一个纯的bug在TThreadedQueue? 这里是一个简单的例子来产生错误。 program TestThreadedQueue; {$APPTYPE CONSOLE} uses // FastMM4 in '......FastMM4FastMM4.pas',Windows,Messages,Classes,SysUtils,SyncObjs,Generics.Collections; type TThreadTaskMsg = class(TObject) private threadID : integer; threadMsg : string; public Constructor Create( ID : integer; const msg : string); end; type TThreadReader = class(TThread) private fPopQueue : TThreadedQueue<TObject>; fSync : TCriticalSection; fMsg : TThreadTaskMsg; fException : Exception; procedure DoSync; procedure DoHandleException; public Constructor Create( popQueue : TThreadedQueue<TObject>; sync : TCriticalSection); procedure Execute; override; end; Constructor TThreadReader.Create( popQueue : TThreadedQueue<TObject>; sync : TCriticalSection); begin fPopQueue:= popQueue; fMsg:= nil; fSync:= sync; Self.FreeOnTerminate:= FALSE; fException:= nil; Inherited Create( FALSE); end; procedure TThreadReader.DoSync ; begin WriteLn(fMsg.threadMsg + ' ' + IntToStr(fMsg.threadId)); end; procedure TThreadReader.DoHandleException; begin WriteLn('Exception ->' + fException.Message); end; procedure TThreadReader.Execute; var signal : TWaitResult; begin NameThreadForDebugging('QueuePop worker'); while not Terminated do begin try {- Calling PopItem can return empty without waittime !? Let other threads in by sleeping. } Sleep(20); {- Serializing calls to PopItem works } if Assigned(fSync) then fSync.Enter; try signal:= fPopQueue.PopItem( TObject(fMsg)); finally if Assigned(fSync) then fSync.Release; end; if (signal = wrSignaled) then begin try if Assigned(fMsg) then begin fMsg.threadMsg:= '<Thread id :' +IntToStr( Self.threadId) + '>'; fMsg.Free; // We are just dumping the message in this test //Synchronize( Self.DoSync); //PostMessage( fParentForm.Handle,WM_TestQueue_Message,Cardinal(fMsg),0); end; except on E:Exception do begin end; end; end; except FException:= Exception(ExceptObject); try if not (FException is EAbort) then begin {Synchronize(} DoHandleException; //); end; finally FException:= nil; end; end; end; end; Constructor TThreadTaskMsg.Create( ID : Integer; Const msg : string); begin Inherited Create; threadID:= ID; threadMsg:= msg; end; var fSync : TCriticalSection; fThreadQueue : TThreadedQueue<TObject>; fReaderArr : array[1..4] of TThreadReader; i : integer; begin try IsMultiThread:= TRUE; fSync:= TCriticalSection.Create; fThreadQueue:= TThreadedQueue<TObject>.Create(1024,1,100); try {- Calling without fSync throws exceptions when two or more threads calls PopItem at the same time } WriteLn('Creating worker threads ...'); for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,Nil); {- Calling with fSync works ! } //for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,fSync); WriteLn('Init done. Pushing items ...'); for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,'')); ReadLn; finally for i:= 1 to 4 do fReaderArr[i].Free; fThreadQueue.Free; fSync.Free; end; except on E: Exception do begin Writeln(E.ClassName,': ',E.Message); ReadLn; end; end; end. 更新:导致TThreadedQueue崩溃的TMonitor中的错误是在Delphi XE2中修复的。 更新2:上面的测试强调队列在空状态。 Darian Miller发现,在满状态下强调队列,仍然可以再现XE2中的错误。错误再次出现在TMonitor中。有关更多信息,请参阅下面的答案。还有一个链接到QC101114。 更新3: 解决方法
嗯,很难确定没有大量的测试,但它肯定看起来像是一个错误,无论是在TThreadedQueue或在TMonitor。无论哪种方式,它在RTL而不是你的代码。您应该将此文件作为QC报告,并使用上面的示例作为“如何再现”代码。
(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |