bigchaindb source code analysis (2): pipeline
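bigchaindb source code analysis (1) looked at how bigchaindb parses its command-line arguments and configuration file, and how it then starts the logging publisher and subscriber. This part analyzes bigchaindb's pipeline mechanism. As mentioned earlier, the command-line entry point eventually runs this start() function:
def start():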
    events_queue = setup_events_queue()
    # start the processes
    logger.info('Starting block')
    block.start()
    logger.info('Starting voter')
    vote.start()
    logger.info('Starting stale transaction monitor')
    stale.start()
    logger.info('Starting election')
    election.start(events_queue=events_queue)
    # start the web api
    app_server = server.create_server(bigchaindb.config['server'])
    p_webapi = mp.Process(name='webapi', target=app_server.run)
    p_webapi.start()
    logger.info('WebSocket server started')
    p_websocket_server = mp.Process(name='ws', target=websocket_server.start, args=(events_queue,))
    p_websocket_server.start()
    # start message
    logger.info(BANNER.format(bigchaindb.config['server']['bind']))
Of these, block.start, vote.start and election.start are defined as follows:
# pipeline/block.py
def start():
    pipeline = create_pipeline()
    pipeline.setup(indata=get_changefeed())
    pipeline.start()
    return pipeline
# pipeline/vote.py
def start():
    pipeline = create_pipeline()
    pipeline.setup(indata=get_changefeed())
    pipeline.start()
    return pipeline
# pipeline/election.py
def start(events_queue=None):
    pipeline = create_pipeline(events_queue=events_queue)
    pipeline.setup(indata=get_changefeed())
    pipeline.start()
    return pipeline
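All three start functions follow the same three steps: create the pipeline, call setup with the changefeed as indata, then start it. As a sketch of that shared shape, the steps could be pulled into one helper. start_pipeline and RecordingPipeline below are hypothetical names used only for illustration, not part of bigchaindb; the stand-in merely records the calls made on it.

```python
# Hypothetical stand-in that records the calls made on it, so the
# create -> setup(indata) -> start sequence becomes visible.
class RecordingPipeline:
    def __init__(self, **kwargs):
        self.calls = [("create", kwargs)]

    def setup(self, indata=None, outdata=None):
        self.calls.append(("setup", indata))

    def start(self):
        self.calls.append(("start", None))


def start_pipeline(create_pipeline, get_changefeed, **kwargs):
    """Hypothetical helper capturing the pattern shared by
    block.start, vote.start and election.start."""
    pipeline = create_pipeline(**kwargs)      # build the stages
    pipeline.setup(indata=get_changefeed())   # prepend the data source
    pipeline.start()                          # launch every node
    return pipeline


# Usage: mimic election.start(events_queue=...) with fake parts.
p = start_pipeline(RecordingPipeline, lambda: "changefeed", events_queue="q")
print([name for name, _ in p.calls])  # ['create', 'setup', 'start']
```

Keeping the three steps separate, as bigchaindb does, lets each module pass its own arguments (only election needs events_queue).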
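Taking block.start as an example, the function that creates the pipeline is shown below. The Pipeline constructor takes a list whose first element is a queue; each of the remaining elements wraps a member function of the BlockPipeline class in a Node. Judging by the names alone for now, the pipeline presumably processes incoming data in the order of this list.
def create_pipeline():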
    block_pipeline = BlockPipeline()
    pipeline = Pipeline([
        Pipe(maxsize=1000),
        Node(block_pipeline.filter_tx),
        Node(block_pipeline.validate_tx, fraction_of_cores=1),
        Node(block_pipeline.create, timeout=1),
        Node(block_pipeline.write),
        Node(block_pipeline.delete_tx),
    ])
    return pipeline
def Pipe(maxsize=0):
    return queues.Queue(maxsize, ctx=mp.get_context())
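Pipe is just a multiprocessing queue created in the default context. Because the first one is bounded (maxsize=1000), a producer that fills it blocks on put until a consumer catches up; presumably this keeps the changefeed from running far ahead of the later stages. The sketch below reuses the Pipe definition above unchanged and shows the bounded behaviour with put_nowait, which raises queue.Full instead of blocking.

```python
import multiprocessing as mp
import queue
from multiprocessing import queues


def Pipe(maxsize=0):
    # Same definition as above: a multiprocessing Queue bound to the
    # default start-method context; maxsize=0 means unbounded.
    return queues.Queue(maxsize, ctx=mp.get_context())


q = Pipe(maxsize=2)
q.put("tx1")
q.put("tx2")
try:
    q.put_nowait("tx3")      # the queue already holds maxsize items
    overflow = False
except queue.Full:
    overflow = True          # a plain put() would block here instead
received = [q.get(), q.get()]
print(overflow, received)    # True ['tx1', 'tx2']
```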
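The Node and Pipeline classes both come from a separate package.
Node
Let's look at the Node class first. Its code, abridged, is as follows:
import multiprocessing as mp
def pass_through(val):
    return val
class Node:
    def __init__(self, target=None, inqueue=None, outqueue=None, name=None,
                 timeout=None, number_of_processes=None, fraction_of_cores=None):
        self.target = target if target else pass_through
        ...
        # one worker process per slot, each running the node forever
        self.processes = [mp.Process(target=self.safe_run_forever)
                          for i in range(self.number_of_processes)]
    def run(self):
        args = ()  # no inqueue: call target without arguments
        if self.inqueue:
            args = self.inqueue.get(timeout=self.timeout)
        result = self.target(*args)
        # a None result is dropped; anything else goes to the next stage
        if result is not None and self.outqueue:
            self.outqueue.put(result)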
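To see what run does in isolation, here is a minimal single-threaded stand-in. MiniNode is a hypothetical class, not the real Node: run is called by hand instead of looping in worker processes, and it wraps non-tuple results into 1-tuples (an assumption, since the excerpt above does not show how the real class packages results) so that `self.target(*args)` in the next stage receives the value as one argument. It shows the two behaviours that matter for the pipeline: results flow from inqueue to outqueue, and a target returning None drops the item.

```python
import queue


def pass_through(val):
    return val


class MiniNode:
    """Hypothetical single-threaded stand-in for Node: run() is called
    by hand instead of being looped in worker processes."""

    def __init__(self, target=None, inqueue=None, outqueue=None, timeout=None):
        self.target = target if target else pass_through
        self.inqueue = inqueue
        self.outqueue = outqueue
        self.timeout = timeout

    def run(self):
        args = ()
        if self.inqueue:
            args = self.inqueue.get(timeout=self.timeout)
        result = self.target(*args)
        if result is not None and self.outqueue:
            # Assumption: wrap single values so the next stage's *args
            # unpacking passes them as one argument.
            if not isinstance(result, tuple):
                result = (result,)
            self.outqueue.put(result)


q_in, q_mid, q_out = queue.Queue(), queue.Queue(), queue.Queue()
keep_even = MiniNode(lambda x: x if x % 2 == 0 else None, q_in, q_mid)
square = MiniNode(lambda x: x * x, q_mid, q_out)

for n in (1, 2, 3, 4):
    q_in.put((n,))               # inputs are argument tuples
for _ in range(4):
    keep_even.run()              # odd numbers return None and are dropped
for _ in range(q_mid.qsize()):
    square.run()
results = [q_out.get()[0] for _ in range(q_out.qsize())]
print(results)                   # [4, 16]
```

This None-means-drop convention is what lets a stage such as filter_tx act as a filter without any extra machinery.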
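Besides run, Node also exposes methods such as start and terminate.
Pipeline
Now that we know Node is essentially a multiprocess program for executing its target, let's analyze the Pipeline class. Its code is as follows:
class Pipeline: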
    def __init__(self, items):
        self.items = items
        self.setup()
    def setup(self, indata=None, outdata=None):
        items_copy = self.items[:]  # work on a copy; self.items is never modified
        if indata:
            items_copy.insert(0, indata)  # data source goes in front
        if outdata:
            items_copy.append(outdata)  # data sink goes at the end
        self.nodes = [item for item in items_copy if isinstance(item, Node)]
        self.connect(items_copy, False)
    def connect(self, rest, pipe=None):
        # walk the list recursively, wiring each Node's inqueue/outqueue
        if not rest:
            return pipe
        head, *tail = rest
        if isinstance(head, queues.Queue):
            if pipe:
                raise ValueError('Cannot have two or more pipes next'
                                 ' to each other.')
            # an explicit queue becomes the inqueue of the next Node
            return self.connect(tail, pipe=head)
        elif isinstance(head, Node):
            if pipe is None:
                pipe = Pipe()  # no explicit queue before this Node: create one
            if pipe is not False:
                head.inqueue = pipe  # False marks the first item: no inqueue
            head.outqueue = self.connect(tail)
            return head.inqueue
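To check how connect wires things up without starting any processes, the sketch below runs the same recursion against lightweight stand-ins. Assumptions: Pipe returns a plain queue.Queue instead of a multiprocessing queue, Node keeps only the inqueue/outqueue attributes, and connect is a free function rather than a method. It shows that the explicit queue at the head becomes the first node's inqueue, neighbouring nodes share one queue, the last node has no outqueue, and two adjacent queues are rejected.

```python
import queue


def Pipe(maxsize=0):
    # Assumption: plain queue.Queue, enough to inspect the wiring.
    return queue.Queue(maxsize)


class Node:
    # Minimal stand-in: only the attributes connect() touches.
    def __init__(self, name):
        self.name = name
        self.inqueue = None
        self.outqueue = None


def connect(rest, pipe=None):
    """Same recursion as Pipeline.connect, as a free function."""
    if not rest:
        return pipe
    head, *tail = rest
    if isinstance(head, queue.Queue):
        if pipe:
            raise ValueError('Cannot have two or more pipes next to each other.')
        return connect(tail, pipe=head)
    elif isinstance(head, Node):
        if pipe is None:
            pipe = Pipe()            # no explicit queue: create one
        if pipe is not False:
            head.inqueue = pipe      # False marks the head of the chain
        head.outqueue = connect(tail)
        return head.inqueue


first = Pipe(maxsize=1000)
a, b, c = Node('filter_tx'), Node('validate_tx'), Node('create')
connect([first, a, b, c], False)

print(a.inqueue is first)        # True: the explicit Pipe feeds the first node
print(a.outqueue is b.inqueue)   # True: neighbours share one queue
print(b.outqueue is c.inqueue)   # True
print(c.outqueue)                # None: the last node has no output

try:
    connect([Pipe(), Pipe(), Node('x')], False)
    adjacent_ok = True
except ValueError:
    adjacent_ok = False
print(adjacent_ok)               # False
```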
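Back to start: its first statement builds the chained structure described above. For block.py, every item entering the pipeline passes through the BlockPipeline methods filter_tx, validate_tx, create, write and delete_tx in that order.
# pipeline/block.py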
def start():
    pipeline = create_pipeline()
    pipeline.setup(indata=get_changefeed())
    pipeline.start()
    return pipeline
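After building the pipeline, start calls setup a second time, now passing an indata. Looking back at setup: when indata is given, setup puts it at the very front of the list, just before the Pipe(maxsize=1000) queue. connect then makes that queue both the outqueue of indata and the inqueue of the first node, so whatever indata produces lands in the first node's inqueue. The relevant part of setup:
def setup(self, indata=None, outdata=None):
    items_copy = self.items[:]
    if indata:
        items_copy.insert(0, indata)
    if outdata:
        items_copy.append(outdata)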
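One detail worth spelling out: setup has now run twice, once from Pipeline.__init__ and once from start. Because it always starts from a fresh copy of self.items, the second call simply rebuilds the wiring rather than stacking a second source on top of the first. The sketch below isolates that list handling; MiniPipeline is a hypothetical stand-in, and plain strings take the place of the real Node and queue objects.

```python
class MiniPipeline:
    """Hypothetical stand-in that keeps only the list handling of
    Pipeline.setup; connect() is replaced by recording the layout."""

    def __init__(self, items):
        self.items = items
        self.setup()                      # first call: no indata yet

    def setup(self, indata=None, outdata=None):
        items_copy = self.items[:]        # copy, so self.items never changes
        if indata:
            items_copy.insert(0, indata)  # source goes in front of the Pipe
        if outdata:
            items_copy.append(outdata)
        self.layout = items_copy          # real code: self.connect(items_copy, False)


p = MiniPipeline(['Pipe(maxsize=1000)', 'filter_tx', 'validate_tx'])
p.setup(indata='changefeed')
p.setup(indata='changefeed')              # calling again does not stack sources
print(p.layout)  # ['changefeed', 'Pipe(maxsize=1000)', 'filter_tx', 'validate_tx']
print(p.items)   # ['Pipe(maxsize=1000)', 'filter_tx', 'validate_tx']
```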
With the pipeline rebuilt, start then calls pipeline.start(), which starts every node:
def start(self):
    for node in self.nodes:
        node.start()
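Pipeline.start only starts each node; from then on every node loops on its own inqueue, so the chain keeps working for as long as indata feeds the first queue. The sketch below imitates that with threads instead of processes. stage, run_pipeline and the STOP sentinel are hypothetical additions that give the demo a clean shutdown, whereas the real nodes keep looping until terminate() is called.

```python
import queue
import threading

STOP = object()  # hypothetical sentinel; the real nodes run until terminated


def stage(target, inqueue, outqueue):
    """Run one pipeline stage in a thread: read, apply target, forward."""
    while True:
        item = inqueue.get()
        if item is STOP:
            if outqueue is not None:
                outqueue.put(STOP)    # propagate shutdown downstream
            return
        result = target(item)
        if result is not None and outqueue is not None:
            outqueue.put(result)      # None means "drop this item"


def run_pipeline(source, targets):
    # first queue bounded like Pipe(maxsize=1000); the rest unbounded
    pipes = [queue.Queue(maxsize=1000)] + [queue.Queue() for _ in targets]
    threads = [
        threading.Thread(target=stage, args=(t, pipes[i], pipes[i + 1]))
        for i, t in enumerate(targets)
    ]
    for th in threads:                # like Pipeline.start(): start every node
        th.start()
    for item in source:               # the "indata" keeps feeding the first queue
        pipes[0].put(item)
    pipes[0].put(STOP)
    for th in threads:
        th.join()
    out = []
    while True:
        item = pipes[-1].get()
        if item is STOP:
            return out
        out.append(item)


results = run_pipeline(range(6), [lambda x: x if x % 2 else None, lambda x: x * 10])
print(results)  # [10, 30, 50]
```

With one thread per stage and FIFO queues, items leave in the order they entered; several workers per stage (as fraction_of_cores allows for validate_tx) would no longer guarantee that order.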
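So as long as indata keeps putting data into the queue, the pipeline keeps processing it stage by stage. Now let's look back at the processes bigchaindb starts at the very beginning:
def start():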
    events_queue = setup_events_queue()
    # start the processes
    logger.info('Starting block')
    block.start()
    logger.info('Starting voter')
    vote.start()
    logger.info('Starting stale transaction monitor')
    stale.start()
    logger.info('Starting election')
    election.start(events_queue=events_queue)
    # start the web api
    app_server = server.create_server(bigchaindb.config['server'])
    p_webapi = mp.Process(name='webapi', target=app_server.run)
    p_webapi.start()
    logger.info('WebSocket server started')
    p_websocket_server = mp.Process(name='ws', target=websocket_server.start, args=(events_queue,))
    p_websocket_server.start()
    # start message
    logger.info(BANNER.format(bigchaindb.config['server']['bind']))
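block, vote and election share essentially the same structure: each waits for its own changefeed to produce data and then processes it, pipeline-style, with the member functions of its corresponding class. stale is somewhat different, but not by much. Apart from these, start also launches a web API; that will be left for next time.
(Editor: 李大同)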