asio reactor模拟Proactor代码分析笔记
今天看了ASIO的介绍,不太明白asio在POSIX上如何用reactor模拟proactor。所以稍微看了下源代码,此文当作笔记。 ASIO Proactor: Proactor design pattern (adapted from [POSA2]) — Asynchronous Operation
— Asynchronous Operation Processor
— Completion Event Queue
— Completion Handler
— Asynchronous Event Demultiplexer
— Proactor
— Initiator
Implementation Using Reactor On many platforms,Asio implements the Proactor design pattern in terms of a Reactor,such as — Asynchronous Operation Processor
— Completion Event Queue
— Asynchronous Event Demultiplexer
1. Initiator使用Asynchronous Operation Processor发起异步I/O操作
2. 保存每个异步I/O操作的参数,包括回调函数的地址,并将其放入Completion Event Queue
3. Proactor调用Asynchronous Event Demultiplexer检测完成事件。
4. 当检测到I/O操作完成事件,从Completion Event Queue中取出对应的异步I/O操作,并且分派到相应的Completion Handler。
5. Completion Handler调用回调函数。
reactor模拟的实现: task_io_service<reactor> io_service_impl。 // Run the event loop until interrupted or no more work. size_t run(boost::system::error_code& ec) { ec = boost::system::error_code(); if (outstanding_work_ == 0) { stop(); return 0; } typename call_stack<task_io_service>::context ctx(this); idle_thread_info this_idle_thread; this_idle_thread.next = 0; boost::asio::detail::mutex::scoped_lock lock(mutex_); size_t n = 0; for (; do_one(lock,&this_idle_thread); lock.lock()) if (n != (std::numeric_limits<size_t>::max)()) ++n; return n; } 1. this_idle_thread是个保存idle线程的链表。 2. 如果没有异步操作要处理,那么就加入到idle线程的链表中去,一直在那边阻塞等下去。 size_t do_one(boost::asio::detail::mutex::scoped_lock& lock,idle_thread_info* this_idle_thread) { ...... else if (this_idle_thread) { // Nothing to run right now,so just wait for work to do. this_idle_thread->next = first_idle_thread_; first_idle_thread_ = this_idle_thread; this_idle_thread->wakeup_event.clear(lock); this_idle_thread->wakeup_event.wait(lock); } else { return 0; } } return 0; } socket_service: 类型: datagram_socket_service raw_socket_service socket_acceptor_service stream_socket_service 当构造socket_service的时候间接会调用init_task(): // Initialise the task,if required. void init_task() { boost::asio::detail::mutex::scoped_lock lock(mutex_); if (!shutdown_ && !task_) { task_ = &use_service<Task>(this->get_io_service()); op_queue_.push(&task_operation_); wake_one_thread_and_unlock(lock); } } 1. 这边的task_operation就是用来表示异步操作即将开始的,而后面会发现op_queue中不但出存放异步操作即将开始的指针,还会存放完成队列指针。 2. 唤醒一个当前的idle线程。 这个时候idle线程就会继续循环,下面我们来看完整的do_one()函数: size_t do_one(boost::asio::detail::mutex::scoped_lock& lock,idle_thread_info* this_idle_thread) { bool polling = !this_idle_thread; bool task_has_run = false; while (!stopped_) { if (!op_queue_.empty()) // 操作队列不为空 { // Prepare to execute first handler from queue. operation* o = op_queue_.front(); op_queue_.pop(); bool more_handlers = (!op_queue_.empty()); if (o == &task_operation_) // 如果是异步操作即将开始,即socket_service创建完成 { task_interrupted_ = more_handlers || polling; // If the task has already run and we're polling then we're done. if (task_has_run && polling) { task_interrupted_ = true; op_queue_.push(&task_operation_); return 0; } task_has_run = true; if (!more_handlers || !wake_one_idle_thread_and_unlock(lock)) lock.unlock(); op_queue<operation> completed_ops; task_cleanup c = { this,&lock,&completed_ops }; // 当所在的block结束的时候,即下面的通过reactor去等待并且执行操作,去将任务加入到完成队列中,还会添加一个task_operation_ (void)c; // Run the task. May throw an exception. Only block if the operation // queue is empty and we're not polling,otherwise we want to return // as soon as possible. task_->run(!more_handlers && !polling,completed_ops); // 通过reactor去等待并且执行操作 } else { if (more_handlers) // 如果操作队列中还有需要处理的任务 wake_one_thread_and_unlock(lock); // 唤醒idle线程 else lock.unlock(); // Ensure the count of outstanding work is decremented on block exit. work_finished_on_block_exit on_exit = { this }; (void)on_exit; // Complete the operation. May throw an exception. o->complete(*this); // deletes the operation object //异步操作完成回调 return 1; } } else if (this_idle_thread) // 加入到idle线程链表中,并且wait阻塞。 { // Nothing to run right now,so just wait for work to do. this_idle_thread->next = first_idle_thread_; first_idle_thread_ = this_idle_thread; this_idle_thread->wakeup_event.clear(lock); this_idle_thread->wakeup_event.wait(lock); } else { return 0; } } return 0; } struct task_cleanup { ~task_cleanup() { 添加完成操作指针到操作队列,并且重新插入task_operation_ lock_->lock(); task_io_service_->task_interrupted_ = true; task_io_service_->op_queue_.push(*ops_); task_io_service_->op_queue_.push(&task_io_service_->task_operation_); } task_io_service* task_io_service_; boost::asio::detail::mutex::scoped_lock* lock_; op_queue<operation>* ops_; };这边还会有很多细节,等到啥时有空的时候来个深入分析。 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |