multiple reactors(35 -1 )
发布时间:2020-12-15 05:26:52 所属栏目:百科 来源:网络整理
导读:muduo库如何支持多线程 EventLoopThread(IO线程类) EventLoopThreadPool(IO线程池类) IO线程池的功能是开启若干个IO线程,并让这些IO线程处于事件循环的状态 下面的这些代码可能和前面给出的源代码有些不一样,阅读的同学请注意了 EventLoopThreadPool头
下面的这些代码可能和前面给出的源代码有些不一样,阅读的同学请注意了 EventLoopThreadPool头文件eventloopthreadpool.h // Copyright 2010,Shuo Chen. All rights reserved. // http://code.google.com/p/muduo/ // // Use of this source code is governed by a BSD-style license // that can be found in the License file. // Author: Shuo Chen (chenshuo at chenshuo dot com) // // This is an internal header file,you should not include this. #ifndef MUDUO_NET_EVENTLOOPTHREADPOOL_H #define MUDUO_NET_EVENTLOOPTHREADPOOL_H #include <muduo/base/Condition.h> #include <muduo/base/Mutex.h> #include <vector> #include <boost/function.hpp> #include <boost/noncopyable.hpp> #include <boost/ptr_container/ptr_vector.hpp> namespace muduo { namespace net { class EventLoop; class EventLoopThread; class EventLoopThreadPool : boost::noncopyable { public: typedef boost::function<void(EventLoop*)> ThreadInitCallback; EventLoopThreadPool(EventLoop* baseLoop); ~EventLoopThreadPool(); void setThreadNum(int numThreads) { numThreads_ = numThreads; } void start(const ThreadInitCallback& cb = ThreadInitCallback()); EventLoop* getNextLoop(); private: EventLoop* baseLoop_; // 与Acceptor所属EventLoop相同 bool started_; /*是否启动*/ int numThreads_; // 线程数 int next_; // 新连接到来,所选择的EventLoop对象下标 boost::ptr_vector<EventLoopThread> threads_; // IO线程列表 std::vector<EventLoop*> loops_; // EventLoop列表 }; } } #endif // MUDUO_NET_EVENTLOOPTHREADPOOL_H EventLoopThreadPool源文件eventloopthreadpool.cc
// Copyright 2010,Shuo Chen. All rights reserved. // http://code.google.com/p/muduo/ // // Use of this source code is governed by a BSD-style license // that can be found in the License file. // Author: Shuo Chen (chenshuo at chenshuo dot com) #include <muduo/net/EventLoopThreadPool.h> #include <muduo/net/EventLoop.h> #include <muduo/net/EventLoopThread.h> #include <boost/bind.hpp> using namespace muduo; using namespace muduo::net; EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop) : baseLoop_(baseLoop),// 与Acceptor所属EventLoop相同 started_(false),numThreads_(0),next_(0) { } EventLoopThreadPool::~EventLoopThreadPool() { // Don't delete loop,it's stack variable } /*每个线程初始化时的回调函数cb*/ void EventLoopThreadPool::start(const ThreadInitCallback& cb) { assert(!started_); baseLoop_->assertInLoopThread(); started_ = true; for (int i = 0; i < numThreads_; ++i) { EventLoopThread* t = new EventLoopThread(cb); threads_.push_back(t); loops_.push_back(t->startLoop()); // 启动EventLoopThread线程,在进入事件循环之前,会调用cb } if (numThreads_ == 0 && cb) { // 只有一个EventLoop,在这个EventLoop进入事件循环之前,调用cb cb(baseLoop_); } } EventLoop* EventLoopThreadPool::getNextLoop() { baseLoop_->assertInLoopThread(); EventLoop* loop = baseLoop_; // 如果loops_为空,则loop指向baseLoop_ // 如果不为空,按照round-robin(RR,轮叫)的调度方式选择一个EventLoop if (!loops_.empty()) { // round-robin loop = loops_[next_]; ++next_; if (implicit_cast<size_t>(next_) >= loops_.size()) { next_ = 0; } } return loop; }
TcpServer头文件TcpServer.h
// Copyright 2010,Shuo Chen. All rights reserved. // http://code.google.com/p/muduo/ // // Use of this source code is governed by a BSD-style license // that can be found in the License file. // Author: Shuo Chen (chenshuo at chenshuo dot com) // // This is a public header file,it must only include public header files. #ifndef MUDUO_NET_TCPSERVER_H #define MUDUO_NET_TCPSERVER_H #include <muduo/base/Types.h> #include <muduo/net/TcpConnection.h> #include <map> #include <boost/noncopyable.hpp> #include <boost/scoped_ptr.hpp> namespace muduo { namespace net { class Acceptor; class EventLoop; class EventLoopThreadPool; /// /// TCP server,supports single-threaded and thread-pool models. /// /// This is an interface class,so don't expose too much details. class TcpServer : boost::noncopyable { public: typedef boost::function<void(EventLoop*)> ThreadInitCallback; //TcpServer(EventLoop* loop,const InetAddress& listenAddr); TcpServer(EventLoop* loop,const InetAddress& listenAddr,const string& nameArg); ~TcpServer(); // force out-line dtor,for scoped_ptr members. const string& hostport() const { return hostport_; } const string& name() const { return name_; } /// Set the number of threads for handling input. /// /// Always accepts new connection in loop's thread. /// Must be called before @c start /// @param numThreads /// - 0 means all I/O in loop's thread,no thread will created. /// this is the default value. /// - 1 means all I/O in another thread. /// - N means a thread pool with N threads,new connections /// are assigned on a round-robin basis. void setThreadNum(int numThreads); void setThreadInitCallback(const ThreadInitCallback& cb) { threadInitCallback_ = cb; } /// Starts the server if it's not listenning. /// /// It's harmless to call it multiple times. /// Thread safe. void start(); /// Set connection callback. /// Not thread safe. // 设置连接到来或者连接关闭回调函数 void setConnectionCallback(const ConnectionCallback& cb) { connectionCallback_ = cb; } /// Set message callback. 
/// Not thread safe. // 设置消息到来回调函数 void setMessageCallback(const MessageCallback& cb) { messageCallback_ = cb; } private: /// Not thread safe,but in loop void newConnection(int sockfd,const InetAddress& peerAddr); /// Thread safe. void removeConnection(const TcpConnectionPtr& conn); /// Not thread safe,but in loop void removeConnectionInLoop(const TcpConnectionPtr& conn); typedef std::map<string,TcpConnectionPtr> ConnectionMap; EventLoop* loop_; // the acceptor loop const string hostport_; // 服务端口 const string name_; // 服务名 boost::scoped_ptr<Acceptor> acceptor_; // avoid revealing Acceptor boost::scoped_ptr<EventLoopThreadPool> threadPool_; //线程池 ConnectionCallback connectionCallback_; MessageCallback messageCallback_; ThreadInitCallback threadInitCallback_; // IO线程池中的线程在进入事件循环前,会回调用此函数 bool started_; // always in loop thread int nextConnId_; // 下一个连接ID ConnectionMap connections_; // 连接列表 }; } } #endif // MUDUO_NET_TCPSERVER_H
TcpServer源文件TcpServer.cc
// Copyright 2010,Shuo Chen. All rights reserved. // http://code.google.com/p/muduo/ // // Use of this source code is governed by a BSD-style license // that can be found in the License file. // Author: Shuo Chen (chenshuo at chenshuo dot com) #include <muduo/net/TcpServer.h> #include <muduo/base/Logging.h> #include <muduo/net/Acceptor.h> #include <muduo/net/EventLoop.h> #include <muduo/net/EventLoopThreadPool.h> #include <muduo/net/SocketsOps.h> #include <boost/bind.hpp> #include <stdio.h> // snprintf using namespace muduo; using namespace muduo::net; TcpServer::TcpServer(EventLoop* loop,/*这是main reactor*/ const InetAddress& listenAddr,const string& nameArg) : loop_(CHECK_NOTNULL(loop)),hostport_(listenAddr.toIpPort()),name_(nameArg),acceptor_(new Acceptor(loop,listenAddr)),threadPool_(new EventLoopThreadPool(loop)),/*connectionCallback_(defaultConnectionCallback),messageCallback_(defaultMessageCallback),*/ started_(false),nextConnId_(1) { // Acceptor::handleRead函数中会回调用TcpServer::newConnection // _1对应的是socket文件描述符,_2对应的是对等方的地址(InetAddress) acceptor_->setNewConnectionCallback( boost::bind(&TcpServer::newConnection,this,_1,_2)); } TcpServer::~TcpServer() { loop_->assertInLoopThread(); LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] destructing"; for (ConnectionMap::iterator it(connections_.begin()); it != connections_.end(); ++it) { TcpConnectionPtr conn = it->second; it->second.reset(); // 释放当前所控制的对象,引用计数减一 conn->getLoop()->runInLoop( boost::bind(&TcpConnection::connectDestroyed,conn)); conn.reset(); // 释放当前所控制的对象,引用计数减一 } } void TcpServer::setThreadNum(int numThreads) { /*numThreads不包含main reactor thread*/ assert(0 <= numThreads); threadPool_->setThreadNum(numThreads); } // 该函数多次调用是无害的 // 该函数可以跨线程调用 void TcpServer::start() { if (!started_) { started_ = true; /*启动线程池*/ threadPool_->start(threadInitCallback_); } if (!acceptor_->listenning()) { // get_pointer返回原生指针 loop_->runInLoop( boost::bind(&Acceptor::listen,get_pointer(acceptor_))); } } void 
TcpServer::newConnection(int sockfd,const InetAddress& peerAddr) { loop_->assertInLoopThread(); // 按照轮叫的方式选择一个EventLoop EventLoop* ioLoop = threadPool_->getNextLoop(); char buf[32]; snprintf(buf,sizeof buf,":%s#%d",hostport_.c_str(),nextConnId_); ++nextConnId_; string connName = name_ + buf; LOG_INFO << "TcpServer::newConnection [" << name_ << "] - new connection [" << connName << "] from " << peerAddr.toIpPort(); InetAddress localAddr(sockets::getLocalAddr(sockfd)); // FIXME poll with zero timeout to double confirm the new connection // FIXME use make_shared if necessary /*TcpConnectionPtr conn(new TcpConnection(loop_,connName,sockfd,localAddr,peerAddr));*/ TcpConnectionPtr conn(new TcpConnection(ioLoop,peerAddr)); LOG_TRACE << "[1] usecount=" << conn.use_count(); connections_[connName] = conn; LOG_TRACE << "[2] usecount=" << conn.use_count(); conn->setConnectionCallback(connectionCallback_); conn->setMessageCallback(messageCallback_); conn->setCloseCallback( boost::bind(&TcpServer::removeConnection,_1)); // conn->connectEstablished(); 这个表示直接在当前线程中调用 ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished,conn)); LOG_TRACE << "[5] usecount=" << conn.use_count(); } void TcpServer::removeConnection(const TcpConnectionPtr& conn) { /* loop_->assertInLoopThread(); LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_ << "] - connection " << conn->name(); LOG_TRACE << "[8] usecount=" << conn.use_count(); size_t n = connections_.erase(conn->name()); LOG_TRACE << "[9] usecount=" << conn.use_count(); (void)n; assert(n == 1); loop_->queueInLoop( boost::bind(&TcpConnection::connectDestroyed,conn)); LOG_TRACE << "[10] usecount=" << conn.use_count(); */ loop_->runInLoop(boost::bind(&TcpServer::removeConnectionInLoop,conn)); } void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn) { loop_->assertInLoopThread(); LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_ << "] - connection " << conn->name(); LOG_TRACE << "[8] usecount=" << 
conn.use_count(); size_t n = connections_.erase(conn->name()); LOG_TRACE << "[9] usecount=" << conn.use_count(); (void)n; assert(n == 1); EventLoop* ioLoop = conn->getLoop(); ioLoop->queueInLoop( boost::bind(&TcpConnection::connectDestroyed,conn)); //loop_->queueInLoop( // boost::bind(&TcpConnection::connectDestroyed,conn)); LOG_TRACE << "[10] usecount=" << conn.use_count(); } 测试程序
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>

#include <boost/bind.hpp>

#include <stdio.h>
#include <unistd.h>  // getpid

using namespace muduo;
using namespace muduo::net;

// Minimal server used to exercise TcpServer with an I/O thread pool: it only
// prints a line when a connection goes up or down and when data arrives.
class TestServer
{
 public:
  // loop       - the main (acceptor) loop
  // listenAddr - address the embedded TcpServer listens on
  // numThreads - number of I/O threads handed to TcpServer::setThreadNum
  TestServer(EventLoop* loop,
             const InetAddress& listenAddr,
             int numThreads)
    : loop_(loop),
      server_(loop, listenAddr, "TestServer"),
      numThreads_(numThreads)
  {
    // Member functions must be bound together with `this`.
    server_.setConnectionCallback(
        boost::bind(&TestServer::onConnection, this, _1));
    server_.setMessageCallback(
        boost::bind(&TestServer::onMessage, this, _1, _2, _3));
    server_.setThreadNum(numThreads);
  }

  void start()
  {
    server_.start();
  }

 private:
  // Connection callback: reports whether the connection came up or went down.
  void onConnection(const TcpConnectionPtr& conn)
  {
    if (conn->connected())
    {
      printf("onConnection(): new connection [%s] from %s\n",
             conn->name().c_str(),
             conn->peerAddress().toIpPort().c_str());
    }
    else
    {
      printf("onConnection(): connection [%s] is down\n",
             conn->name().c_str());
    }
  }

  // Message callback: only reports how many bytes arrived; data is unused.
  void onMessage(const TcpConnectionPtr& conn,
                 const char* data,
                 ssize_t len)
  {
    printf("onMessage(): received %zd bytes from connection [%s]\n",
           len, conn->name().c_str());
  }

  EventLoop* loop_;
  TcpServer server_;
  int numThreads_;
};

// Listens on port 8888 with four I/O threads and runs the main loop.
int main()
{
  printf("main(): pid = %d\n", getpid());

  InetAddress listenAddr(8888);
  EventLoop loop;

  TestServer server(&loop, listenAddr, 4);
  server.start();

  loop.loop();
}
(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |