博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Muduo 多线程模型对比
阅读量:5918 次
发布时间:2019-06-19

本文共 5388 字,大约阅读时间需要 17 分钟。

  本文主要对比Muduo多线程模型方案8 和方案9 。

  方案8:reactor + thread pool ,有一个线程来充当reactor 接受连接分发事件,将要处理的事件分配给thread pool中的线程,由thread pool 来完成事件处理。实例代码见:examples/sudoku/server_threadpool.cc

  这里截取关键部分代码进行说明。

class
 SudokuServer
{
 
public 
:
  SudokuServer(EventLoop* loop, 
const
 InetAddress& listenAddr, 
int
 numThreads)
    : loop_(loop),
      server_(loop, listenAddr, 
"SudokuServer"
),
      numThreads_(numThreads),
      startTime_(Timestamp::now())
  {
    server_.setConnectionCallback(
        boost::bind(&SudokuServer::onConnection, 
this
, _1));
    server_.setMessageCallback(
        boost::bind(&SudokuServer::onMessage, 
this
, _1, _2, _3));
  }
 
  
void 
start()
  {
    LOG_INFO << 
"starting "
 << numThreads_ << 
" threads."
;
    threadPool_.start(numThreads_); 
// 注意这里,threadPool 的类型是: ThreadPool,且位置在start 里面
    server_.start();
  }
 
 
private 
:
  
void 
onConnection(
const 
TcpConnectionPtr& conn)
  {
    LOG_TRACE << conn->peerAddress().toIpPort() << 
" -> "
        << conn->localAddress().toIpPort() << 
" is "
        << (conn->connected() ? 
"UP"
 : 
"DOWN"
);
  }
 
  
void 
onMessage(
const 
TcpConnectionPtr& conn, Buffer* buf, Timestamp)
  {
...
        
if
 (!processRequest(conn, request)) // 封装计算任务执行方法
        {
          conn->send( 
"Bad Request!\r\n"
);
          conn->shutdown();
          
break
;
        }
      }
...
    }
  }
 
  
bool 
processRequest(
const 
TcpConnectionPtr& conn, 
const 
string& request)
  {
...
 
    
if
 (puzzle.size() == implicit_cast<size_t>(kCells))
    {
      threadPool_.run(boost::bind(&solve, conn, puzzle, id));
// 将计算任务转移到 threadPool 线程
    }
    
else
    {
      goodRequest = 
false
;
    }
    
return
 goodRequest;
  }
 
  
static 
void 
solve(
const 
TcpConnectionPtr& conn,
                    
const
 string& puzzle,
                    
const
 string& id)
  {
    LOG_DEBUG << conn->name();
    string result = solveSudoku(puzzle); // solveSudou 是一个pure function, 是可重入的 
    
if
 (id.empty())
    {
      conn->send(result+ 
"\r\n"
);
    }
    
else
    {
      conn->send(id+
 ":"
+result+
 "\r\n"
);
    }
  }
 
  EventLoop* loop_;
  TcpServer server_;
  ThreadPool threadPool_; 
// 注意类型,方案8, reactor + threadpool
  
int 
numThreads_;
  Timestamp startTime_;
};
 
void
 ThreadPool::start( 
int
 numThreads)  // 创建 thread pool,具体thread 调度这里暂时不分析
{
  assert(threads_.empty());
  running_ = 
true
;
  threads_.reserve(numThreads);
  
for 
(
int 
i = 0; i < numThreads; ++i)
  {
    
char
 id[32];
    snprintf(id, 
sizeof
 id, 
"%d"
, i);
    threads_.push_back(
 new
 muduo::Thread(
          boost::bind(&ThreadPool::runInThread, 
this
), name_+id));
    threads_[i].start();
  }
}
 
方案9:main-reactor + subreactors, one loop per thread, 有一个主线程来扮演main-reactor 专门语句 accept 连接,其它线程负责读写文件描述符(socket)
 
class 
SudokuServer
{
 
public 
:
  SudokuServer(EventLoop* loop, 
const 
InetAddress& listenAddr, 
int 
numThreads)
    : loop_(loop),
      server_(loop, listenAddr, 
"SudokuServer"
),
      numThreads_(numThreads),
      startTime_(Timestamp::now())
  {
    server_.setConnectionCallback(
        boost::bind(&SudokuServer::onConnection, 
this
, _1));
    server_.setMessageCallback(
        boost::bind(&SudokuServer::onMessage, 
this
, _1, _2, _3));
    server_.setThreadNum(numThreads); 
// 设置 EventLoopThreadPool里面的thread数量
  }
 
  
void 
start()
  {
    LOG_INFO << 
"starting " 
<< numThreads_ << 
" threads."
;
    server_.start();
  }
 
 
private 
:
  
void 
onConnection(
const 
TcpConnectionPtr& conn)
  {
    LOG_TRACE << conn->peerAddress().toIpPort() << 
" -> "
        << conn->localAddress().toIpPort() << 
" is "
        << (conn->connected() ? 
"UP" 
: 
"DOWN"
);
  }
 
  
void 
onMessage(
const 
TcpConnectionPtr& conn, Buffer* buf, Timestamp)
  {
...
        
if 
(!processRequest(conn, request)) 
//准备计算
        {
          conn->send( 
"Bad Request!\r\n"
);
          conn->shutdown();
          
break
;
        }
...
    }
  }
 
  
bool 
processRequest(
const 
TcpConnectionPtr& conn, 
const 
string& request)
  {
...
    
if 
(puzzle.size() == implicit_cast<size_t>(kCells))
    {
      LOG_DEBUG << conn->name();
      string result = solveSudoku(puzzle); 
// 计算在当前线程完成
      
if 
(id.empty())
      {
        conn->send(result+ 
"\r\n"
);
      }
...
  }
  // 注意这里没有类型为ThreadPool的 threadPool_成员,整个类使用Muduo默认线程模型的EventLoopThreadPool,TcpServer 聚合了EventLoopThreadPool
  EventLoop* loop_;
  TcpServer server_;
  
int 
numThreads_;
  Timestamp startTime_;
};
 
 
void 
TcpServer::setThreadNum( 
int 
numThreads)
{
  assert(0 <= numThreads);
  threadPool_->setThreadNum(numThreads); 
// 设置了 EventLoopThreadPool 里面的线程个数,为后面的threadPool_->start()服务
}
 
void 
TcpServer::start()
{
  
if 
(!started_)
  {
    started_ = 
true
;
    threadPool_->start(threadInitCallback_); 
// TcpServer 中的 threadPool 类型是 EventLoopThreadPool
  }
 
  
if 
(!acceptor_->listenning())
  {
    loop_->runInLoop(
        boost::bind(&Acceptor::listen, get_pointer(acceptor_)));
  }
}
 
void 
EventLoopThreadPool::start( 
const 
ThreadInitCallback& cb) // 开启线程的方式是使用EventLoopThread,这个类将EventLoop 和 Thread 封装在一起实现 one loop per thread
{
  assert(!started_);
  baseLoop_->assertInLoopThread();
 
  started_ = 
true
;
 
  
for 
(
int 
i = 0; i < numThreads_; ++i)
  {
    EventLoopThread* t = 
new 
EventLoopThread(cb); // 设置线程的 callback
    threads_.push_back(t); 
    loops_.push_back(t->startLoop()); // 保存loop方便管理和分配任务,任务分配其实是通过EventLoop::runInLoop() 来进行的
  }
  
if 
(numThreads_ == 0 && cb)
  {
    cb(baseLoop_);
  }
}
 
  总结一下,这里所谓的Reactor就是持有Poller的结构(稍微有点狭隘,这里先就这样理解),Poller负责事件监听和分发。持有EventLoop的结构就持有Poller。
  对于方案8只有一个类持有EventLoop,也就是只创建了一个EventLoop,这个Loop就是reactor,其它的Thread 是通过ThreadPool来实现的,因此只有reactor所在的线程来完成I/O,其它线程用于完成计算任务,所以说这个模型适合于计算密集型而不是I/O密集型。
  对于方案9,存在多个Reactor,其中main reactor 持有Acceptor,专门用于监听三个半事件中的连接建立,消息到达和连接断开以及消息发送事件都让sub reactor来完成。由于main reactor 只关心连接建立事件,能够适应高并发的IO请求,多个subreactor的存在也能兼顾I/O与计算,因此被认为是一个比较好的方案。
   后面还会深入学习Muduo网络库相关的内容,包括Reactor结构的简化,线程池的实现,现代C++的编写方式,使用C++11进行重写等。现在看来C++11 thread library 提供的接口基本可以替换 posix thread library,虽然底层也许是通过posix thread实现的,毕竟Linux内核针对NPTL进行过修改。C++11 提供了 thread_local 来描述 线程局部存储,但是没有pthread_key_create() 提供 destructor那样的功能,或者遇到需要使用TLS的地方转过来使用posix 提供的接口。
 
Muduo 多线程 线程池 reactor

转载地址:http://dmfvx.baihongyu.com/

你可能感兴趣的文章
python设计模式(二):建造者模式
查看>>
Spring Cloud 2.x系列之整合rocketMQ
查看>>
MVVM模式下关闭窗口的实现
查看>>
使用最新版本MySQL8.0.12报错记录
查看>>
ASP.NET Core 2 学习笔记(二)生命周期
查看>>
centos搭建virtualenv环境
查看>>
ECMAScript 2018 语言规范正式发布,改进正则表达式
查看>>
Mysql查询一段时间记录
查看>>
Java虚拟机内存模型
查看>>
超好用的Unix/Linux 命令技巧 大神为你详细解读
查看>>
基于Bootstrap的步骤引导html页面
查看>>
消息中间件选型分析
查看>>
详解socket,包含TCP,Http讲解(转载)
查看>>
如何构建ASP.NET MVC4&JQuery&AJax&JSon示例
查看>>
【Android 学习】之ListView使用大全
查看>>
JSP无法加载静态资源:Failed to load resource: the server responded with a status of 404 ()
查看>>
Debian9 安装好后 update升级 失败 解决办法
查看>>
UDP10040 和 setsockopt设置大全
查看>>
ElasticSearch Tune for search speed Translation
查看>>
报表工具实现单据套打
查看>>