1200字范文,内容丰富有趣,写作的好帮手!
1200字范文 > c++ socket线程池_Netty(3)——Reactor线程模型

c++ socket线程池_Netty(3)——Reactor线程模型

时间:2019-03-18 23:16:36

相关推荐

c++ socket线程池_Netty(3)——Reactor线程模型

Reactor模式是什么:

反应器设计模式(Reactor pattern)是一种为处理并发服务请求,并将请求提交到一个或者多个服务处理程序的事件设计模式。当客户端请求抵达后,服务处理程序使用多路分配策略,由一个非阻塞的线程来接收所有的请求,然后派发这些请求至相关的工作线程进行处理。

为何要用Reactor:

常见的网络服务中,如果每一个客户端都维持一个与登陆服务器的连接。那么服务器将维护多个和客户端的连接以出来和客户端的contnect 、read、write ,特别是对于长链接的服务,有多少个c端,就需要在s端维护同等的IO连接。这对服务器来说是一个很大的开销。

1、BIO比如我们采用BIO的方式来维护和客户端的连接:

// 主线程维护连接 public void run() {try {while (true) {Socket socket = serverSocket.accept(); //提交线程池处理 executorService.submit(new Handler(socket));}} catch (Exception e) {e.printStackTrace();} } // 处理读写服务 class Handler implements Runnable {public void run() {try {//获取Socket的输入流,接收数据 BufferedReader buf = new BufferedReader(new InputStreamReader(socket.getInputStream())); String readData = buf.readLine(); while (readData != null) {readData = buf.readLine(); System.out.println(readData); }} catch (Exception e) {e.printStackTrace();}} }

很明显,为了避免资源耗尽,我们采用线程池的方式来处理读写服务。但是这么做依然有很明显的弊端:

同步阻塞IO,读写阻塞,线程等待时间过长

在制定线程策略的时候,只能根据CPU的数目来限定可用线程资源,不能根据连接并发数目来制定,也就是连接有限制。否则很难保证对客户端请求的高效和公平。

多线程之间的上下文切换,造成线程使用效率并不高,并且不易扩展

状态数据以及其他需要保持一致的数据,需要采用并发同步控制

2、NIO,那么可以有其他方式来更好的处理么,我们可以采用NIO来处理,NIO中支持的基本机制:

public NIOServer(int port) throws Exception {selector = Selector.open();serverSocket = ServerSocketChannel.open();serverSocket.socket().bind(new InetSocketAddress(port));serverSocket.configureBlocking(false);serverSocket.register(selector, SelectionKey.OP_ACCEPT); } @Override public void run() {while (!Thread.interrupted()) {try {//阻塞等待事件 selector.select(); // 事件列表 Set selected = selector.selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) {it.remove(); //分发事件 dispatch((SelectionKey) (it.next())); }} catch (Exception e) {}} } private void dispatch(SelectionKey key) throws Exception {if (key.isAcceptable()) {register(key);//新链接建立,注册} else if (key.isReadable()) {read(key);//读事件处理} else if (key.isWritable()) {wirete(key);//写事件处理} } private void register(SelectionKey key) throws Exception {ServerSocketChannel server = (ServerSocketChannel) key .channel();// 获得和客户端连接的通道SocketChannel channel = server.accept();channel.configureBlocking(false);//客户端通道注册到selector 上channel.register(this.selector, SelectionKey.OP_READ); }

我们可以看到上述的NIO例子已经差不多拥有reactor的影子了

基于事件驱动-> selector(支持对多个socketChannel的监听)

统一的事件分派中心-> dispatch

事件处理服务-> read & write

3、Reactor

首先我们基于Reactor Pattern 处理模式中,定义以下三种角色:

Reactor将I/O事件分派给对应的Handler

Acceptor处理客户端新连接,并分派请求到处理器链中

Handlers执行非阻塞读/写 任务

1、单Reactor单线程模型

/** * 等待事件到来,分发事件处理 */ class Reactor implements Runnable {private Reactor() throws Exception {SelectionKey sk = serverSocket.register(selector,SelectionKey.OP_ACCEPT);// attach Acceptor 处理新连接sk.attach(new Acceptor());}public void run() {try {while (!Thread.interrupted()) {selector.select(); Set selected = selector.selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) {it.remove(); //分发事件处理 dispatch((SelectionKey) (it.next())); } }} catch (IOException ex) {//do something}}void dispatch(SelectionKey k) {// 若是连接事件获取是acceptor// 若是IO读写事件获取是handlerRunnable runnable = (Runnable) (k.attachment());if (runnable != null) {runnable.run();}} } /** * 连接事件就绪,处理连接事件 */ class Acceptor implements Runnable {@Overridepublic void run() {try {SocketChannel c = serverSocket.accept(); if (c != null) {// 注册读写 new Handler(c, selector); }} catch (Exception e) {}} }

2、单Reactor多线程模型

/** * 多线程处理读写业务逻辑 */ class MultiThreadHandler implements Runnable {public static final int READING = 0, WRITING = 1;int state;final SocketChannel socket;final SelectionKey sk;//多线程处理业务逻辑ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());public MultiThreadHandler(SocketChannel socket, Selector sl) throws Exception {this.state = READING;this.socket = socket;sk = socket.register(selector, SelectionKey.OP_READ);sk.attach(this);socket.configureBlocking(false);}@Overridepublic void run() {if (state == READING) {read();} else if (state == WRITING) {write();}}private void read() {//任务异步处理executorService.submit(() -> process());//下一步处理写事件sk.interestOps(SelectionKey.OP_WRITE);this.state = WRITING;}private void write() {//任务异步处理executorService.submit(() -> process());//下一步处理读事件sk.interestOps(SelectionKey.OP_READ);this.state = READING;}/** * task 业务处理 */public void process() {//do IO ,task,queue something} }

3、多Reactor多线程模型

/** * 多work 连接事件Acceptor,处理连接事件 */ class MultiWorkThreadAcceptor implements Runnable {// cpu线程数相同多work线程int workCount =Runtime.getRuntime().availableProcessors();SubReactor[] workThreadHandlers = new SubReactor[workCount];volatile int nextHandler = 0;public MultiWorkThreadAcceptor() {this.init();}public void init() {nextHandler = 0;for (int i = 0; i < workThreadHandlers.length; i++) {try {workThreadHandlers[i] = new SubReactor(); } catch (Exception e) {}}}@Overridepublic void run() {try {SocketChannel c = serverSocket.accept(); if (c != null) {// 注册读写 synchronized (c) {// 顺序获取SubReactor,然后注册channel SubReactor work = workThreadHandlers[nextHandler]; work.registerChannel(c); nextHandler++; if (nextHandler >= workThreadHandlers.length) {nextHandler = 0; } } }} catch (Exception e) {}} } /** * 多work线程处理读写业务逻辑 */ class SubReactor implements Runnable {final Selector mySelector;//多线程处理业务逻辑int workCount =Runtime.getRuntime().availableProcessors();ExecutorService executorService = Executors.newFixedThreadPool(workCount);public SubReactor() throws Exception {// 每个SubReactor 一个selector this.mySelector = SelectorProvider.provider().openSelector();}/** * 注册chanel * * @param sc * @throws Exception */public void registerChannel(SocketChannel sc) throws Exception {sc.register(mySelector, SelectionKey.OP_READ | SelectionKey.OP_CONNECT);}@Overridepublic void run() {while (true) {try {//每个SubReactor 自己做事件分派处理读写事件 selector.select(); Set keys = selector.selectedKeys(); Iterator iterator = keys.iterator(); while (iterator.hasNext()) {SelectionKey key = iterator.next(); iterator.remove(); if (key.isReadable()) {read(); } else if (key.isWritable()) {write(); } } } catch (Exception e) {}}}private void read() {//任务异步处理executorService.submit(() -> process());}private void write() {//任务异步处理executorService.submit(() -> process());}/** * task 业务处理 */public void process() {//do IO ,task,queue something} }

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。