如何从多个线程读取/写入tempfile
语境:
操作系统:Windows 8.1 python –version:Python 2.7.8 我正在尝试从/向流读取/写入,并使用线程写入该流,而另一个用于从中读取已写入的新数据.我正在使用tempfile模块,使用一种方法将二进制数据写入其中,另一种方法从中读取. 在下面的代码中,t1运行写入,t2运行读取线程方法. t3& t4退出使t1和t2循环退出. 我读取的预期输出是: READ @0: 0|1|2|3|.........|N --- L Bytes read READ @L: N|N+1|N+2|.......|M --- J Bytes read READ @L+J: M|M+1|M+3|.....|P --- K Bytes read READ @L+J+K: --- 0 Bytes read (nothing was written by write thread) 等等,每当tempfile中有更多数据时,它就会被读取和输出,但同时写入必须由另一个线程继续进行,以便写入从流中接收的数据. 问题: 当我运行它时,我得到的输出会有所不同 > python.exe tmp.py Exception in thread Thread-1: Traceback (most recent call last): [truncated output] File "C:/tmp.py",line 11,in write_temp READ @ 0 : 264261|975|263976|263977|...[truncated output]...|263972|263 self.myf.write(str(self.current_count)+"|") IOError: [Errno 0] Error 另一个输出: > python.exe tmp.py READ @ 0 : 0|289721|289722|...[truncated output]...289718|28971 Exception in thread Thread-1: Traceback (most recent call last): [truncated output] self.myf.write(str(self.current_count)+"|") IOError: [Errno 0] Error 其他输出或多或少是上述输出的变化. 我认为问题是由于文件描述符指针被读取更改,但我认为追加始终写入文件末尾. 源代码 以下是流的实际代码的抽象,流是从子进程stdio读取的二进制数据流,并写入另一个子进程stdio. import threading,tempfile class MultipleThreadTIO: def __init__(self): self.myf = tempfile.TemporaryFile(mode='a+b') self.current_count = 0 self.do_write = True self.do_read = True def write_temp(self): while self.do_write: self.myf.write(str(self.current_count)+"|") self.current_count += 1 def read_temp(self): read_at = 0L while self.do_read: self.myf.seek(read_at) d = self.myf.read() if len(d.strip()) > 0: print "READ @",read_at,": ",self.myf.read() read_at = self.myf.tell() def stop_write(self): self.do_write = False def stop_read(self): self.do_read = False def __del__(self): #self.myf.seek(0) #print ":::DATA CONTENT:::n" #print self.myf.read() #print ":::END DATA CONTENT:::" self.myf.close() mtio = MultipleThreadTIO() t1 = threading.Timer(0.1,mtio.write_temp) t2 = threading.Timer(0.5,mtio.read_temp) t3 = threading.Timer(5,mtio.stop_write) t4 = threading.Timer(3,mtio.stop_read) t1.start() t2.start() t3.start() t4.start() 问题: 问题1:对上述问题的任何解决方法? 问题2:我应该使用queues / os.pipe(/ other?)代替tempfile吗? 问题3:针对这种情况的任何其他更好的方法? 重要提示:解决方案必须跨平台. 解决方法
这是一个使用
Queue 的抽象,我认为如果你使用队列/管道行为会更好.我不确定你在写什么,所以我只是将运行计数器添加到队列中.使用
Event 停止写入队列,并通过发送停止消息来停止从队列读取(但如果想要更精细的控制,可以添加信号):
import threading import Queue class MultipleThreadTIO: def __init__(self): self.queue = Queue.Queue() self.current_count = 0 self.stop_write = threading.Event() def write_temp(self): while not self.stop_write.isSet(): self.queue.put(str(self.current_count)+"|") self.current_count += 1 self.stop() def read_temp(self): while True: msg = self.queue.get() if msg == 'close': break else: print "READ @: " + msg def stop(self): self.queue.put('close') mtio = MultipleThreadTIO() t1 = threading.Timer(0.1,mtio.read_temp) t1.start() t2.start() t3 = threading.Timer(5,mtio.stop_write.set) t3.start() 我不能在Windows上测试,但我认为它应该工作,因为这是非常标准的.它运行在Ubuntu 14.04 x86_64上 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |