1 基本概念
Reactor模型是一種事件驅動(Event-Driven)的設計模式,主要用于高效處理高并發、I/O密集型場景(如網絡、服務器、分布式等)。其核心思想就是集中管理事件,將I/O操作與業務邏輯解耦,避免傳統多線程模型中線程切換的開銷,從而提升系統的吞吐量和響應速度。
核心目標:
在高并發場景下,傳統的 “一連接一線程” 模型會因線程創建 / 銷毀、上下文切換的開銷過大而效率低下。Reactor 模型通過以下方式解決這一問題:
- 用單個或少量線程監聽多個 I/O 事件(如網絡連接、數據讀寫),避免線程資源浪費;
- 僅當事件觸發(如客戶端發送數據)時才執行對應處理邏輯,實現 “事件就緒才處理” 的高效調度。
2 核心組件
?Reactor 模型的運行依賴四個關鍵組件,它們協同完成事件的檢測、分發與處理:
1、事件源
產生事件的源頭,通常是I/O相關的資源,例如:
網絡套接字(Socket):客戶端連接、數據發送/接收等事件的源頭? ? ? ??
文件描述符(FD):文件讀寫、異常等事件的源頭
2、事件多路分發器(Event Demultiplexer)
又稱 “I/O 多路復用器”,是 Reactor 模型的 “感知器官”。
作用:持續監聽多個事件源的事件(如 “可讀”“可寫”“異常”),當事件觸發時標記為 “就緒”;
底層依賴:操作系統提供的 I/O 多路復用系統調用,如 Unix/Linux 的select/poll/epoll,或 BSD 的kqueue。
3、反應器(Reactor)
模型的 “核心調度者”,是事件處理的中樞。
作用:從事件多路分發器獲取 “就緒事件”,根據事件類型和關聯的事件源,分發給對應的事件處理器;
本質:通過 “事件注冊 - 事件監聽 - 事件分發” 的邏輯,實現對所有事件的集中管理。
4 事件處理器(Handler)
負責具體業務邏輯的 “執行者”。
作用:定義事件處理的回調方法(如handleRead處理可讀事件、handleWrite處理可寫事件),由 Reactor 觸發執行;
特點:僅關注業務邏輯(如解析請求、生成響應),不關心事件的檢測與分發。
3 單Reactor單線程模型
3.1 概念
? ? ? ? 在單Reactor單線程模型中,他們的作用以及實現邏輯,首先客戶端訪問服務端,在服務端這邊首先是使用Reactor監聽accept事件和read事件,當有連接過來,就交給acceptor處理accept事件,當觸發read事件,同時accept或把read事件交給handler處理。所有動作都是由一個線程完成的。
特點:單線程Reactor模型編程簡單,比較適用于每個請求都可以快速完成的場景,但是不能發揮出多核CPU的優勢,在一般情況下,不會使用單Reactor單線程模型。
3.2 原理圖
3.3 代碼實現
3.3.1 入口
入口: 啟動Reactor線程
package com.bonnie.netty.reactor.single;import java.io.IOException;/*** 單Reactor單線程模型*/
public class Main {public static void main(String[] args) throws IOException {new Thread(new Reactor(8080, "Main-Thread")).start();}}
3.3.2 Reactor
1、啟動服務端ServerSocketChannel
2、監聽accept事件
3、監聽read事件
package com.bonnie.netty.reactor.single;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;/*** 模擬Reactor的單線程模型*/
public class Reactor implements Runnable {Selector selector;ServerSocketChannel serverSocketChannel;public Reactor(int port, String threadName) throws IOException {selector = Selector.open();serverSocketChannel = ServerSocketChannel.open();// 綁定端口serverSocketChannel.bind(new InetSocketAddress(port));// 設置成非阻塞serverSocketChannel.configureBlocking(Boolean.FALSE);// 注冊OP_ACCEPT,事件,會調用Acceptor.run方法serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(selector, serverSocketChannel));}@Overridepublic void run() {while (!Thread.interrupted()) {try {// 阻塞selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {// 我們之前說的分發事件就是這個地方分發了, 此處可能是accept事件,也可能是read事件dispatcher(iterator.next());// 分發完之后要刪除key,防止重復keyiterator.remove();}} catch (IOException e) {throw new RuntimeException(e);}}}private void dispatcher(SelectionKey key) {// 然后在這里通過key獲取這個attachment,執行他的run方法,記住,這里并沒有開啟線程,所有叫做單線程Reactor單線程模型Runnable runnable = (Runnable)key.attachment();if (runnable!=null) {runnable.run();}}}
?3.3.3?Acceptor
1、處理accept請求
2、把read事件轉發給handler處理
package com.bonnie.netty.reactor.single;import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;/*** 接收連接請求,并注冊handle到selector*/
public class Acceptor implements Runnable{Selector selector;ServerSocketChannel serverSocketChannel;public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {this.selector = selector;this.serverSocketChannel = serverSocketChannel;}@Overridepublic void run() {try {SocketChannel socketChannel = serverSocketChannel.accept();System.out.println(socketChannel.getRemoteAddress() + " 收到連接!!!");// 設置成非阻塞socketChannel.configureBlocking(Boolean.FALSE);// 注冊事件,交由Handler處理socketChannel.register(selector, SelectionKey.OP_READ, new Handler(socketChannel));} catch (IOException e) {throw new RuntimeException(e);}}}
?3.3.4?Handler
處理read事件
package com.bonnie.netty.reactor.single;import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;/*** 接收連接請求,并注冊handle到selector*/
public class Acceptor implements Runnable{Selector selector;ServerSocketChannel serverSocketChannel;public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {this.selector = selector;this.serverSocketChannel = serverSocketChannel;}@Overridepublic void run() {try {SocketChannel socketChannel = serverSocketChannel.accept();System.out.println(socketChannel.getRemoteAddress() + " 收到連接!!!");// 設置成非阻塞socketChannel.configureBlocking(Boolean.FALSE);// 注冊事件,交由Handler處理socketChannel.register(selector, SelectionKey.OP_READ, new Handler(socketChannel));} catch (IOException e) {throw new RuntimeException(e);}}}
3.3.5 碼云位置
git地址:?https://gitee.com/huyanqiu6666/netty.git? ? 分支:?250724-reactor
4 單Reactor多線程模型
4.1 概念
解決單Reactor單線程模型的不足,使用多線程處理handler提升處理能力,增加吞吐量。
4.2 原理圖
4.3 代碼實現
4.3.1 入口
package com.bonnie.netty.reactor.mult;import java.io.IOException;/*** 單reactor多線程模型:處理handle的時候是線程池*/
public class MultMain {public static void main(String[] args) throws IOException {new Thread(new MultReactor(8080, "Main-Thread")).start();}}
4.3.2 MultReactor
package com.bonnie.netty.reactor.mult;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;/*** 模擬單Reactor多線程模型* 1、監聽accept事件* 2、監聽read事件*/
public class MultReactor implements Runnable {Selector selector;ServerSocketChannel serverSocketChannel;public MultReactor(int port, String threadName) throws IOException {selector = Selector.open();serverSocketChannel = ServerSocketChannel.open();// 綁定端口serverSocketChannel.bind(new InetSocketAddress(port));// 設置成非阻塞serverSocketChannel.configureBlocking(Boolean.FALSE);// 注冊OP_ACCEPT,事件,會調用Acceptor.run方法serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, new MultAcceptor(selector, serverSocketChannel));}@Overridepublic void run() {while (!Thread.interrupted()) {try {// 阻塞selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {// 我們之前說的分發事件就是這個地方分發了, 此處可能是accept事件,也可能是read事件dispatcher(iterator.next());// 分發完之后要刪除key,防止重復keyiterator.remove();}} catch (IOException e) {throw new RuntimeException(e);}}}private void dispatcher(SelectionKey key) {// 然后在這里通過key獲取這個attachment,執行他的run方法,記住,這里并沒有開啟線程,所有叫做單線程Reactor單線程模型Runnable runnable = (Runnable)key.attachment();if (runnable!=null) {runnable.run();}}}
4.3.3 MultAcceptor
package com.bonnie.netty.reactor.mult;import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;/*** 接收連接請求,并注冊handle到selector* 1、處理accept事件* 2、read事件轉發給handler*/
public class MultAcceptor implements Runnable{Selector selector;ServerSocketChannel serverSocketChannel;public MultAcceptor(Selector selector, ServerSocketChannel serverSocketChannel) {this.selector = selector;this.serverSocketChannel = serverSocketChannel;}@Overridepublic void run() {try {SocketChannel socketChannel = serverSocketChannel.accept();System.out.println(socketChannel.getRemoteAddress() + " 收到連接!!!");// 設置成非阻塞socketChannel.configureBlocking(Boolean.FALSE);// 注冊事件,交由Handler處理socketChannel.register(selector, SelectionKey.OP_READ, new MultHandler(socketChannel));} catch (IOException e) {throw new RuntimeException(e);}}}
4.3.4 MultHandler
package com.bonnie.netty.reactor.mult;import org.apache.commons.lang3.StringUtils;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;/*** Handler將read事件給線程池處理*/
public class MultHandler implements Runnable {private SocketChannel socketChannel;public MultHandler(SocketChannel socketChannel) {this.socketChannel = socketChannel;}private Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);@Overridepublic void run() {// 放到線程池中處理executor.execute(new ReadHandle(socketChannel));}private class ReadHandle implements Runnable{private SocketChannel socketChannel;public ReadHandle(SocketChannel socketChannel) {this.socketChannel = socketChannel;}@Overridepublic void run() {System.out.println("線程名稱:" + Thread.currentThread().getName());// 定義一個ByteBuffer的數據結構ByteBuffer byteBuffer = ByteBuffer.allocate(1024);int len=0, total=0;String msg = StringUtils.EMPTY;try {do {len = socketChannel.read(byteBuffer);if (len > 0) {total += len;msg += new String(byteBuffer.array());}System.out.println(socketChannel.getRemoteAddress() + "客戶端的消息已收到," + msg);} while (len>byteBuffer.capacity());} catch (IOException e) {throw new RuntimeException(e);}}}}
4.3.5 碼云位置
git地址:?https://gitee.com/huyanqiu6666/netty.git? ? 分支:?250724-reactor
5 主從Reactor模型
5.1 概念
5.2 原理圖
5.3 代碼實現
5.3.1 入口
package com.bonnie.netty.reactor.main;import java.io.IOException;/*** 主從Reactor多線程模型*/
public class MainMain {public static void main(String[] args) throws IOException {new Thread(new MainReactor(8080), "Main-Thread").start();}}
5.3.2?MainReactor
package com.bonnie.netty.reactor.main;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;/*** 構建Selector、ServerSocketChannel綁定端口,設置成非阻塞* 注冊accept事件*/
public class MainReactor implements Runnable {private final Selector selector;private final ServerSocketChannel serverSocketChannel;public MainReactor(int port) throws IOException {// 主Reactor負責監聽accept事件selector = Selector.open();serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress(port));serverSocketChannel.configureBlocking(Boolean.FALSE);// 添加attachment為acceptorserverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, new MainAcceptor(serverSocketChannel));}@Overridepublic void run() {while (!Thread.interrupted()) {try {// 等待客戶端的連接到來selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {// 當有連接過來的時候就會轉發任務dispatch(iterator.next());iterator.remove();}} catch (IOException e) {throw new RuntimeException(e);}}}private void dispatch(SelectionKey key) {// 可能拿到的對象有兩個 Acceptor HandlerRunnable runnable = (Runnable)key.attachment();if (runnable!=null) {runnable.run();}}
}
5.3.3?SubReactor
package com.bonnie.netty.reactor.main;import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;/*** 子Reactor*/
public class SubReactor implements Runnable{private Selector selector;public SubReactor(Selector selector) {this.selector = selector;}@Overridepublic void run() {while (true) {try {// 所有的子Reactor阻塞selector.select();System.out.println("selector:"+selector.toString()+"thread:"+Thread.currentThread().getName());Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {dispacher(iterator.next());iterator.remove();}} catch (IOException e) {throw new RuntimeException(e);}}}private void dispacher(SelectionKey selectionKey) {// 此處會調用workHandler里面的方法Runnable runnable = (Runnable) selectionKey.attachment();if (runnable!=null) {runnable.run();}}
}
5.3.4?MainAcceptor
package com.bonnie.netty.reactor.main;import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;/*** 處理MainAcceptor請求*/
public class MainAcceptor implements Runnable{private ServerSocketChannel serverSocketChannel;private final Integer core = Runtime.getRuntime().availableProcessors() * 2;private Integer index = 0;private Selector[] selectors = new Selector[core];private SubReactor[] subReactors = new SubReactor[core];private Thread[] threads = new Thread[core];/*** 構造方法* 1、初始化多個SubReactor* 2、初始化多個Selector* 3、每個SubReactor都有一個Selector* 4、創建線程包裝SubReactor* 5、啟動線程,也就是調用每一個SubReactor的run方法*/public MainAcceptor(ServerSocketChannel serverSocketChannel) throws IOException {this.serverSocketChannel = serverSocketChannel;for (int i=0; i<core; i++) {selectors[i] = Selector.open();subReactors[i] = new SubReactor(selectors[i]);threads[i] = new Thread(subReactors[i]);// 一初始化就工作起來threads[i].start();}}@Overridepublic void run() {try {System.out.println("acceptor thread: " + Thread.currentThread().getName());// 此處就會接收連接的socketChannelSocketChannel socketChannel = serverSocketChannel.accept();System.out.println("有客戶端上來了:"+socketChannel.getRemoteAddress());socketChannel.configureBlocking(Boolean.FALSE);// 立即喚醒第一個阻塞的selectorselectors[index].wakeup();// 然后注冊Read事件到該selectorsocketChannel.register(selectors[index], SelectionKey.OP_READ, new WorkHandler(socketChannel));index = (++index) % core;} catch (IOException e) {throw new RuntimeException(e);}}
}
5.3.5?WorkHandler
package com.bonnie.netty.reactor.main;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;/*** SubReactor把事件交給WorkHandler去執行*/
public class WorkHandler implements Runnable{private SocketChannel socketChannel;public WorkHandler(SocketChannel socketChannel) {this.socketChannel = socketChannel;}@Overridepublic void run() {try {System.out.println("WorkHandler thread:" + Thread.currentThread().getName());ByteBuffer buffer = ByteBuffer.allocate(1024);// 數據讀取到socketChannel中socketChannel.read(buffer);String msg = new String(buffer.array(), StandardCharsets.UTF_8);System.out.println(socketChannel.getRemoteAddress() + "發來了消息:" + msg);// 給客戶端會寫消息socketChannel.read(ByteBuffer.wrap("你的消息已收到".getBytes(StandardCharsets.UTF_8)));} catch (IOException e) {throw new RuntimeException(e);}}
}
5.3.6 碼云位置
git地址:?https://gitee.com/huyanqiu6666/netty.git? ? 分支:?250724-reactor