加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 百科 > 正文

multiple reactors(35 -1 )

发布时间:2020-12-15 05:26:52 所属栏目:百科 来源:网络整理
导读:muduo库如何支持多线程 EventLoopThread(IO线程类) EventLoopThreadPool(IO线程池类) IO线程池的功能是开启若干个IO线程,并让这些IO线程处于事件循环的状态 下面的这些代码可能和前面给出的源代码有些不一样,阅读的同学请注意了 EventLoopThreadPool头






  • muduo库如何支持多线程
    EventLoopThread(IO线程类)
    EventLoopThreadPool(IO线程池类)
    IO线程池的功能是开启若干个IO线程,并让这些IO线程处于事件循环的状态


下面的这些代码可能和前面给出的源代码有些不一样,阅读的同学请注意了



EventLoopThreadPool头文件

eventloopthreadpool.h

// Copyright 2010,Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is an internal header file,you should not include this.

#ifndef MUDUO_NET_EVENTLOOPTHREADPOOL_H
#define MUDUO_NET_EVENTLOOPTHREADPOOL_H

#include <muduo/base/Condition.h>
#include <muduo/base/Mutex.h>

#include <vector>
#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/ptr_container/ptr_vector.hpp>

namespace muduo
{

namespace net
{

class EventLoop;
class EventLoopThread;

class EventLoopThreadPool : boost::noncopyable
{
 public:
  typedef boost::function<void(EventLoop*)> ThreadInitCallback;

  EventLoopThreadPool(EventLoop* baseLoop);
  ~EventLoopThreadPool();
  void setThreadNum(int numThreads) { numThreads_ = numThreads; }
  void start(const ThreadInitCallback& cb = ThreadInitCallback());
  EventLoop* getNextLoop();

 private:

  EventLoop* baseLoop_; // 与Acceptor所属EventLoop相同
  bool started_;        /*是否启动*/
  int numThreads_;      // 线程数
  int next_;            // 新连接到来,所选择的EventLoop对象下标
  boost::ptr_vector<EventLoopThread> threads_;        // IO线程列表
  std::vector<EventLoop*> loops_;                 // EventLoop列表
};

}
}

#endif  // MUDUO_NET_EVENTLOOPTHREADPOOL_H

EventLoopThreadPool源文件

eventloopthreadpool.cc


// Copyright 2010,Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)

#include <muduo/net/EventLoopThreadPool.h>

#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThread.h>

#include <boost/bind.hpp>

using namespace muduo;
using namespace muduo::net;


EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop)
  : baseLoop_(baseLoop),// 与Acceptor所属EventLoop相同
    started_(false),numThreads_(0),next_(0)
{
}

EventLoopThreadPool::~EventLoopThreadPool()
{
  // Don't delete loop,it's stack variable
}
/*每个线程初始化时的回调函数cb*/
void EventLoopThreadPool::start(const ThreadInitCallback& cb)
{
  assert(!started_);
  baseLoop_->assertInLoopThread();

  started_ = true;

  for (int i = 0; i < numThreads_; ++i)
  {
    EventLoopThread* t = new EventLoopThread(cb);
    threads_.push_back(t);
    loops_.push_back(t->startLoop());    // 启动EventLoopThread线程,在进入事件循环之前,会调用cb
  }
  if (numThreads_ == 0 && cb)
  {
    // 只有一个EventLoop,在这个EventLoop进入事件循环之前,调用cb
    cb(baseLoop_);
  }
}

EventLoop* EventLoopThreadPool::getNextLoop()
{
  baseLoop_->assertInLoopThread();
  EventLoop* loop = baseLoop_;

  // 如果loops_为空,则loop指向baseLoop_
  // 如果不为空,按照round-robin(RR,轮叫)的调度方式选择一个EventLoop
  if (!loops_.empty())
  {
    // round-robin
    loop = loops_[next_];
    ++next_;
    if (implicit_cast<size_t>(next_) >= loops_.size())
    {
      next_ = 0;
    }
  }
  return loop;
}




TcpServer头文件

TcpServer.h


// Copyright 2010,Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file,it must only include public header files.

#ifndef MUDUO_NET_TCPSERVER_H
#define MUDUO_NET_TCPSERVER_H

#include <muduo/base/Types.h>
#include <muduo/net/TcpConnection.h>

#include <map>
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>

namespace muduo
{
namespace net
{

class Acceptor;
class EventLoop;
class EventLoopThreadPool;

///
/// TCP server,supports single-threaded and thread-pool models.
///
/// This is an interface class,so don't expose too much details.
class TcpServer : boost::noncopyable
{
 public:
  typedef boost::function<void(EventLoop*)> ThreadInitCallback;

  //TcpServer(EventLoop* loop,const InetAddress& listenAddr);
  TcpServer(EventLoop* loop,const InetAddress& listenAddr,const string& nameArg);
  ~TcpServer();  // force out-line dtor,for scoped_ptr members.

  const string& hostport() const { return hostport_; }
  const string& name() const { return name_; }

  /// Set the number of threads for handling input.
  ///
  /// Always accepts new connection in loop's thread.
  /// Must be called before @c start
  /// @param numThreads
  /// - 0 means all I/O in loop's thread,no thread will created.
  ///   this is the default value.
  /// - 1 means all I/O in another thread.
  /// - N means a thread pool with N threads,new connections
  ///   are assigned on a round-robin basis.
  void setThreadNum(int numThreads);
  void setThreadInitCallback(const ThreadInitCallback& cb)
  { threadInitCallback_ = cb; }

  /// Starts the server if it's not listenning.
  ///
  /// It's harmless to call it multiple times.
  /// Thread safe.
  void start();

  /// Set connection callback.
  /// Not thread safe.
  // 设置连接到来或者连接关闭回调函数
  void setConnectionCallback(const ConnectionCallback& cb)
  { connectionCallback_ = cb; }

  /// Set message callback.
  /// Not thread safe.
  // 设置消息到来回调函数
  void setMessageCallback(const MessageCallback& cb)
  { messageCallback_ = cb; }


 private:
  /// Not thread safe,but in loop
  void newConnection(int sockfd,const InetAddress& peerAddr);
  /// Thread safe.
  void removeConnection(const TcpConnectionPtr& conn);
  /// Not thread safe,but in loop
  void removeConnectionInLoop(const TcpConnectionPtr& conn);

  typedef std::map<string,TcpConnectionPtr> ConnectionMap;

  EventLoop* loop_;  // the acceptor loop
  const string hostport_;       // 服务端口
  const string name_;           // 服务名
  boost::scoped_ptr<Acceptor> acceptor_; // avoid revealing Acceptor
  boost::scoped_ptr<EventLoopThreadPool> threadPool_; //线程池
  ConnectionCallback connectionCallback_;
  MessageCallback messageCallback_;
  ThreadInitCallback threadInitCallback_;   // IO线程池中的线程在进入事件循环前,会回调用此函数
  bool started_;
  // always in loop thread
  int nextConnId_;              // 下一个连接ID
  ConnectionMap connections_;   // 连接列表
};

}
}

#endif  // MUDUO_NET_TCPSERVER_H


TcpServer源文件

TcpServer.cc


// Copyright 2010,Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)

#include <muduo/net/TcpServer.h>

#include <muduo/base/Logging.h>
#include <muduo/net/Acceptor.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThreadPool.h>
#include <muduo/net/SocketsOps.h>

#include <boost/bind.hpp>

#include <stdio.h>  // snprintf

using namespace muduo;
using namespace muduo::net;

TcpServer::TcpServer(EventLoop* loop,/*这是main reactor*/
                     const InetAddress& listenAddr,const string& nameArg)
  : loop_(CHECK_NOTNULL(loop)),hostport_(listenAddr.toIpPort()),name_(nameArg),acceptor_(new Acceptor(loop,listenAddr)),threadPool_(new EventLoopThreadPool(loop)),/*connectionCallback_(defaultConnectionCallback),messageCallback_(defaultMessageCallback),*/
    started_(false),nextConnId_(1)
{
  // Acceptor::handleRead函数中会回调用TcpServer::newConnection
  // _1对应的是socket文件描述符,_2对应的是对等方的地址(InetAddress)
  acceptor_->setNewConnectionCallback(
      boost::bind(&TcpServer::newConnection,this,_1,_2));
}

TcpServer::~TcpServer()
{
  loop_->assertInLoopThread();
  LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] destructing";

  for (ConnectionMap::iterator it(connections_.begin());
      it != connections_.end(); ++it)
  {
    TcpConnectionPtr conn = it->second;
    it->second.reset();      // 释放当前所控制的对象,引用计数减一
    conn->getLoop()->runInLoop(
      boost::bind(&TcpConnection::connectDestroyed,conn));
    conn.reset();           // 释放当前所控制的对象,引用计数减一
  }
}

void TcpServer::setThreadNum(int numThreads)
{
  /*numThreads不包含main reactor thread*/
  assert(0 <= numThreads);
  threadPool_->setThreadNum(numThreads);
}

// 该函数多次调用是无害的
// 该函数可以跨线程调用
void TcpServer::start()
{
  if (!started_)
  {
    started_ = true;
    /*启动线程池*/
    threadPool_->start(threadInitCallback_);
  }

  if (!acceptor_->listenning())
  {
    // get_pointer返回原生指针
    loop_->runInLoop(
        boost::bind(&Acceptor::listen,get_pointer(acceptor_)));
  }
}

void TcpServer::newConnection(int sockfd,const InetAddress& peerAddr)
{
  loop_->assertInLoopThread();
  // 按照轮叫的方式选择一个EventLoop
  EventLoop* ioLoop = threadPool_->getNextLoop();
  char buf[32];
  snprintf(buf,sizeof buf,":%s#%d",hostport_.c_str(),nextConnId_);
  ++nextConnId_;
  string connName = name_ + buf;

  LOG_INFO << "TcpServer::newConnection [" << name_
           << "] - new connection [" << connName
           << "] from " << peerAddr.toIpPort();
  InetAddress localAddr(sockets::getLocalAddr(sockfd));
  // FIXME poll with zero timeout to double confirm the new connection
  // FIXME use make_shared if necessary
  /*TcpConnectionPtr conn(new TcpConnection(loop_,connName,sockfd,localAddr,peerAddr));*/

  TcpConnectionPtr conn(new TcpConnection(ioLoop,peerAddr));

  LOG_TRACE << "[1] usecount=" << conn.use_count();
  connections_[connName] = conn;
  LOG_TRACE << "[2] usecount=" << conn.use_count();
  conn->setConnectionCallback(connectionCallback_);
  conn->setMessageCallback(messageCallback_);

  conn->setCloseCallback(
      boost::bind(&TcpServer::removeConnection,_1));

  // conn->connectEstablished(); 这个表示直接在当前线程中调用
  ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished,conn));
  LOG_TRACE << "[5] usecount=" << conn.use_count();

}

void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
    /*
  loop_->assertInLoopThread();
  LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
           << "] - connection " << conn->name();


  LOG_TRACE << "[8] usecount=" << conn.use_count();
  size_t n = connections_.erase(conn->name());
  LOG_TRACE << "[9] usecount=" << conn.use_count();

  (void)n;
  assert(n == 1);

  loop_->queueInLoop(
      boost::bind(&TcpConnection::connectDestroyed,conn));
  LOG_TRACE << "[10] usecount=" << conn.use_count();
  */

  loop_->runInLoop(boost::bind(&TcpServer::removeConnectionInLoop,conn));

}

void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
  loop_->assertInLoopThread();
  LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
           << "] - connection " << conn->name();


  LOG_TRACE << "[8] usecount=" << conn.use_count();
  size_t n = connections_.erase(conn->name());
  LOG_TRACE << "[9] usecount=" << conn.use_count();

  (void)n;
  assert(n == 1);

  EventLoop* ioLoop = conn->getLoop();
  ioLoop->queueInLoop(
      boost::bind(&TcpConnection::connectDestroyed,conn));

  //loop_->queueInLoop(
  //    boost::bind(&TcpConnection::connectDestroyed,conn));
  LOG_TRACE << "[10] usecount=" << conn.use_count();



}



测试程序


#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>

#include <boost/bind.hpp>

#include <stdio.h>

using namespace muduo;
using namespace muduo::net;

class TestServer
{
 public:
  TestServer(EventLoop* loop,int numThreads)
    : loop_(loop),server_(loop,listenAddr,"TestServer"),numThreads_(numThreads)
  {
    server_.setConnectionCallback(
        boost::bind(&TestServer::onConnection,_1));
    server_.setMessageCallback(
        boost::bind(&TestServer::onMessage,_2,_3));
    server_.setThreadNum(numThreads);
  }

  void start()
  {
      server_.start();
  }

 private:
  void onConnection(const TcpConnectionPtr& conn)
  {
    if (conn->connected())
    {
      printf("onConnection(): new connection [%s] from %sn",conn->name().c_str(),conn->peerAddress().toIpPort().c_str());
    }
    else
    {
      printf("onConnection(): connection [%s] is downn",conn->name().c_str());
    }
  }

  void onMessage(const TcpConnectionPtr& conn,const char* data,ssize_t len)
  {
    printf("onMessage(): received %zd bytes from connection [%s]n",len,conn->name().c_str());
  }

  EventLoop* loop_;
  TcpServer server_;
  int numThreads_;
};


int main()
{
  printf("main(): pid = %dn",getpid());

  InetAddress listenAddr(8888);
  EventLoop loop;

  TestServer server(&loop,4);
  server.start();

  loop.loop();
}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读