在事件驱动的架构被越来越广泛地应用的今天,Reactor 模式在采用同步 I/O 的网络服务领域中正变的流行起来,而使用异步 I/O 时,一般会采用 Proactor 模式构建高性能服务。
经典的服务设计
C/S 架构的网络服务应用,Server 端都包含大体一致的处理流程,对每个客户端的请求,Server 都需要读请求内容、解码、进行各种处理、编码、发送响应,这其中的不同点往往是每个步骤的消耗成本,比如 I/O 密集型 的文件服务或者 计算密集型 的应用服务。
实际上,Reactor 模型在 Client 也有意义,这里暂且先从服务端的角度考虑。
为了使 Server 能同时并发的处理多个请求,每当一个客户端与服务器建立连接,我们都可以分配一个线程,用这个线程专门处理对应连接上的相应操作。
ServerSocket ss = new ServerSocket(PORT);
while (!Thread.interrupted()) {
new Thread(new HandlerContext(ss.accept())).start();
}
这种模型简单,但也有问题,由于线程切换消耗过多和内存占用大的缘故,连接数一旦多起来,并发性能会显著降低,早期 Tomcat 的 BIO 模型就是这种方式。
理想情况最好是线程模型能根据 CPU 资源而不是客户端数目来进行配置,最大程度的减少线程上下文的切换。
可扩展的服务设计
首先,为了满足可用性和性能指标,我们需要实现可扩展性,方式一般是“分而治之”。
分治思想
- 将事务拆成多个小任务,并且每个任务都是 非阻塞 的
- 任务的执行通过事件驱动(比如以 I/O 事件作为触发条件)
事件驱动
使用事件驱动设计应用程序有以下优势:
- 更少的资源占用
- 更小的开销(如线程切换、锁开销等)
- 由于需要手动绑定处理逻辑到事件上,有变更慢的可能性
- 更难编程
Java AWT 就是典型的事件驱动设计。
引入 Reactor
Reactor 架构是一种 I/O 多路复用模式的运用,它对客户端的事件进行多路分离和派发,“反转”了程序控制流(事件驱动遵循的“好莱坞原则”),开发者只需在重用 reactor 的多路分离功能和派发机制的基础之上实现 Event Handlers 即可。
具体到实际应用,我们会在系统中加入 reactor 组件,组件起着 同步等待事件发生和多路分离事件并将其关联到事件处理器上 的作用。比如在使用 java.nio 时,利用 JDK 提供的 多路复用器(Selector)轮询多个通道,选中发生某些事件的 channel(SelectionKey),将其 dispatch 到对事件感兴趣的 Handler 上,由 Handler 对事件进行 非阻塞 的处理。
由此可见,Reactor 模式的关键点就是 非阻塞、事件驱动 和 事件分离。
单线程版本
首先我们来实现事件驱动和事件分离。我们利用 Selector 实现多路复用,判断事件类型以分离事件到 Acceptor 或不同的 Handler 去处理。
Reactor
public class Reactor implements Runnable {
private int port;
private Selector selector;
public Reactor(int port) {
this.port = port;
}
public void run() {
init();
Thread me = Thread.currentThread();
while (!me.isInterrupted()) {
try {
int selected = selector.select();
if (selected > 0) {
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
dispatch(key);
}
}
} catch (Exception e) {
log.error("reactor failed.", e);
}
}
}
private void init() {
try {
selector = Selector.open();
ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false);
server.bind(new InetSocketAddress(port));
server.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
log.error("init failed.", e);
}
}
private void dispatch(SelectionKey key) {
if (key.isAcceptable()) {
Acceptor.onAccept(selector, key);
} else if (key.isReadable()) {
ReadHandler.onReadable(selector, key);
} else if (key.isWritable()) {
WriteHandler.onWritable(selector, key);
} else if (!key.isValid()) {
try {
key.channel().close();
} catch (Throwable ignored) {
}
key.cancel();
} else {
log.warn("never?");
}
}
}
Acceptor
public class Acceptor {
public static void onAccept(Selector selector, SelectionKey key) {
try {
SocketChannel channel = null;
while (channel == null) {
channel = ((ServerSocketChannel) key.channel()).accept();
}
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
log.debug("connected");
} catch (Throwable e) {
log.error("accept error.", e);
}
}
}
Handlers
public class ReadHandler {
public static void onReadable(Selector selector, SelectionKey key) {
try {
SocketChannel channel = (SocketChannel) key.channel();
byte[] recvBytes = new byte[4];
ByteBuffer recvBuf = ByteBuffer.wrap(recvBytes);
int read = 0;
while (read < 4) {
if (read == -1) {
log.debug("channel closed.");
key.cancel();
return;
}
try {
read = read + channel.read(recvBuf);
} catch (IOException e) {
if (e.getMessage().contains("Connection reset by peer")) {
log.debug("channel closed.");
channel.close();
key.cancel();
return;
}
}
}
channel.register(selector, SelectionKey.OP_WRITE);
log.debug("request <== {}", new String(recvBytes, StandardCharsets.UTF_8));
} catch (Throwable e) {
log.error("read error.", e);
}
}
}
public class WriteHandler {
public static void onWritable(Selector selector, SelectionKey key) {
try {
SocketChannel channel = (SocketChannel) key.channel();
String resp = String.valueOf(System.currentTimeMillis());
ByteBuffer sendBuf = ByteBuffer.wrap(resp.getBytes(StandardCharsets.UTF_8));
int write = channel.write(sendBuf);
while (write < 13) {
write = write + channel.write(sendBuf);
}
channel.register(selector, SelectionKey.OP_READ);
log.debug("response ==> {}", resp);
} catch (Throwable e) {
log.error("write error.", e);
}
}
}
多线程版本
实现 Reactor,除了上面实现的事件驱动和分离之外,还要考虑利用多核,采用多线程实现非阻塞。
我们将在 Handler 和 Reactor 两个方面考虑多线程扩展。
扩展 Handler
目的:职责分离,Reactor 应该只管 I/O 工作,触发 Handler 后无须等待 Handler 完成工作。
做法:添加 Handler 线程池处理 decode、compute、encode 的工作。
Reactor
public class Reactor implements Runnable {
private int port;
private Selector selector;
public static Semaphore semaphore = new Semaphore(1);
public Reactor(int port) {
this.port = port;
}
public void run() {
init();
Thread me = Thread.currentThread();
while (!me.isInterrupted()) {
try {
int selected = selector.select();
if (selected > 0) {
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
dispatch(key);
}
}
} catch (Exception e) {
log.error("reactor failed.", e);
}
}
}
private void init() {
try {
selector = Selector.open();
ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false);
server.bind(new InetSocketAddress(port));
server.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
log.error("init failed.", e);
}
}
private void dispatch(SelectionKey key) {
if (key.isAcceptable()) {
Acceptor.onAccept(selector, key);
} else if (key.isReadable()) {
ReadHandler.onReadable(selector, key);
} else if (key.isWritable()) {
WriteHandler.onWritable(key);
} else if (!key.isValid()) {
try {
key.channel().close();
} catch (Throwable ignored) {
}
key.cancel();
} else {
log.warn("never?");
}
}
}
selector.select 和 channel.register 会争抢同一把锁(SelectorImpl#publicKeys),理论上,Selector 被 wakeup 后,如果 SubReactor 执行循环的线程更快,先到下一次循环,又被 select 阻塞住,那么 channel 的 register 动作还是执行不下去。
故这里加了一个 Semaphore 是防止异步读写时 wakeup Selector 后,线程 A 还未及时 register 和更改 interest ops 的情况下,线程 B 的 Selector 又进入了 select 阻塞,导致线程 A 死锁。
Acceptor
public class Acceptor {
public static void onAccept(Selector selector, SelectionKey key) {
try {
SocketChannel channel = null;
while (channel == null) {
channel = ((ServerSocketChannel) key.channel()).accept();
}
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
log.debug("connected");
} catch (Throwable e) {
log.error("accept error.", e);
}
}
}
Handlers
public class ReadHandler {
public static ExecutorService pool = Executors.newFixedThreadPool(4);
public static void onReadable(Selector selector, SelectionKey key) {
try {
// 防止 Epoll LT 模式和异步处理导致的重复 onReadable 调用
key.interestOps(0);
SocketChannel channel = (SocketChannel) key.channel();
byte[] recvBytes = new byte[4];
ByteBuffer recvBuf = ByteBuffer.wrap(recvBytes);
int read = 0;
while (read < 4) {
if (read == -1) {
log.debug("channel closed.");
key.cancel();
return;
}
try {
read = read + channel.read(recvBuf);
} catch (IOException e) {
if (e.getMessage().contains("Connection reset by peer")) {
log.debug("channel closed.");
channel.close();
key.cancel();
return;
}
}
}
key.interestOps(SelectionKey.OP_READ);
pool.submit(new DecodeHandler(selector, key, recvBytes));
} catch (Throwable e) {
log.error("read error.", e);
}
}
}
public class DecodeHandler implements Runnable {
private final Selector selector;
private final SelectionKey key;
private final byte[] recvBytes;
public DecodeHandler(Selector selector, SelectionKey key, byte[] recvBytes) {
this.selector = selector;
this.key = key;
this.recvBytes = recvBytes;
}
public void run() {
String decodeResult = new String(recvBytes, StandardCharsets.UTF_8);
ReadHandler.pool.submit(new BizHandler(selector, key, decodeResult));
}
}
public class BizHandler implements Runnable {
private final Selector selector;
private final SelectionKey key;
private final String decodeResult;
public BizHandler(Selector selector, SelectionKey key, String decodeResult) {
this.selector = selector;
this.key = key;
this.decodeResult = decodeResult;
}
public void run() {
log.debug("request <== {}", decodeResult);
String resp = String.valueOf(System.currentTimeMillis());
ReadHandler.pool.submit(new EncodeHandler(selector, key, resp));
}
}
public class EncodeHandler implements Runnable {
private final Selector selector;
private final SelectionKey key;
private final String resp;
public EncodeHandler(Selector selector, SelectionKey key, String resp) {
this.selector = selector;
this.key = key;
this.resp = resp;
}
public void run() {
ByteBuffer encoded = ByteBuffer.wrap(resp.getBytes(StandardCharsets.UTF_8));
try {
//noinspection StatementWithEmptyBody
while (key.attachment() != null) {
}
key.attach(encoded);
Reactor.semaphore.acquire();
// wakeup 正阻塞在 select() 的 Selector
selector.wakeup();
key.interestOps(SelectionKey.OP_WRITE);
Reactor.semaphore.release();
} catch (Throwable e) {
log.error("register error.", e);
}
}
}
public class WriteHandler {
public static void onWritable(SelectionKey key) {
try {
SocketChannel channel;
ByteBuffer sendBuf;
try {
key.interestOps(0);
channel = (SocketChannel) key.channel();
sendBuf = (ByteBuffer) key.attachment();
} finally {
key.attach(null);
}
sendBuf.mark();
int write = channel.write(sendBuf);
while (write < 13) {
write = write + channel.write(sendBuf);
}
key.interestOps(SelectionKey.OP_READ);
log.debug("response ==> {}", new String(((ByteBuffer) sendBuf.reset()).array(), StandardCharsets.UTF_8));
} catch (Throwable e) {
log.error("write error.", e);
}
}
}
扩展 Reactor
目的:职责更加分离。
做法:将 Reactor 分为 mainReactor 和 subReactor,每个 Reactor 线程都有自己的 Selector、线程池、dispatch 循环。mainReactor 可以更饱和地进行连接监听与建连,subReactor 可以实现匹配 CPU 及 I/O 速率的负载均衡。
MainReactor
public class MainReactor implements Runnable {
private int port;
private Selector selector;
private Selector subSelector;
public MainReactor(int port, Selector subSelector) {
this.port = port;
this.subSelector = subSelector;
}
public void run() {
init();
Thread me = Thread.currentThread();
while (!me.isInterrupted()) {
try {
int selected = selector.select();
if (selected > 0) {
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
dispatch(key);
}
}
} catch (Exception e) {
log.error("reactor failed.", e);
}
}
}
private void init() {
try {
selector = Selector.open();
ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false);
server.bind(new InetSocketAddress(port));
server.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
log.error("init failed.", e);
}
}
private void dispatch(SelectionKey key) {
if (key.isAcceptable()) {
Acceptor.onAccept(subSelector, key);
} else if (!key.isValid()) {
try {
key.channel().close();
} catch (Throwable ignored) {
}
key.cancel();
} else {
log.warn("never?");
}
}
}
SubReactor
public class SubReactor implements Runnable {
private Selector subSelector;
public static Semaphore semaphore = new Semaphore(1);
public SubReactor(Selector subSelector) {
this.subSelector = subSelector;
}
public void run() {
Thread me = Thread.currentThread();
while (!me.isInterrupted()) {
try {
int selected = subSelector.select();
if (selected > 0) {
Iterator<SelectionKey> iter = subSelector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
dispatch(key);
}
} else {
semaphore.acquire();
semaphore.release();
}
} catch (Exception e) {
log.error("reactor failed.", e);
}
}
}
private void dispatch(SelectionKey key) {
if (key.isReadable()) {
ReadHandler.onReadable(subSelector, key);
} else if (key.isWritable()) {
WriteHandler.onWritable(key);
} else if (!key.isValid()) {
try {
key.channel().close();
} catch (Throwable ignored) {
}
key.cancel();
} else {
log.warn("never?");
}
}
}
Acceptor
public class Acceptor {
public static void onAccept(Selector subSelector, SelectionKey key) {
try {
SocketChannel channel = null;
while (channel == null) {
channel = ((ServerSocketChannel) key.channel()).accept();
}
channel.configureBlocking(false);
SubReactor.semaphore.acquire();
subSelector.wakeup();
channel.register(subSelector, SelectionKey.OP_READ);
SubReactor.semaphore.release();
log.debug("connected");
} catch (Throwable e) {
log.error("accept error.", e);
}
}
}
Handlers
public class ReadHandler {
public static ExecutorService pool = Executors.newFixedThreadPool(4);
public static void onReadable(Selector subSelector, SelectionKey key) {
try {
// 防止 Epoll LT 模式和异步处理导致的重复 onReadable 调用
key.interestOps(0);
SocketChannel channel = (SocketChannel) key.channel();
byte[] recvBytes = new byte[4];
ByteBuffer recvBuf = ByteBuffer.wrap(recvBytes);
int read = 0;
while (read < 4) {
if (read == -1) {
log.debug("channel closed.");
key.cancel();
return;
}
try {
read = read + channel.read(recvBuf);
} catch (IOException e) {
if (e.getMessage().contains("Connection reset by peer")) {
log.debug("channel closed.");
channel.close();
key.cancel();
return;
}
}
}
key.interestOps(SelectionKey.OP_READ);
pool.submit(new DecodeHandler(subSelector, key, recvBytes));
} catch (Throwable e) {
log.error("read error.", e);
}
}
}
public class DecodeHandler implements Runnable {
private final Selector subSelector;
private final SelectionKey key;
private final byte[] recvBytes;
public DecodeHandler(Selector subSelector, SelectionKey key, byte[] recvBytes) {
this.subSelector = subSelector;
this.key = key;
this.recvBytes = recvBytes;
}
public void run() {
String decodeResult = new String(recvBytes, StandardCharsets.UTF_8);
ReadHandler.pool.submit(new BizHandler(subSelector, key, decodeResult));
}
}
public class BizHandler implements Runnable {
private final Selector subSelector;
private final SelectionKey key;
private final String decodeResult;
public BizHandler(Selector subSelector, SelectionKey key, String decodeResult) {
this.subSelector = subSelector;
this.key = key;
this.decodeResult = decodeResult;
}
public void run() {
log.debug("request <== {}", decodeResult);
String resp = String.valueOf(System.currentTimeMillis());
ReadHandler.pool.submit(new EncodeHandler(subSelector, key, resp));
}
}
public class EncodeHandler implements Runnable {
private final Selector subSelector;
private final SelectionKey key;
private final String resp;
public EncodeHandler(Selector subSelector, SelectionKey key, String resp) {
this.subSelector = subSelector;
this.key = key;
this.resp = resp;
}
public void run() {
ByteBuffer encoded = ByteBuffer.wrap(resp.getBytes(StandardCharsets.UTF_8));
try {
//noinspection StatementWithEmptyBody
while (key.attachment() != null) {
}
key.attach(encoded);
SubReactor.semaphore.acquire();
// wakeup 正阻塞在 select() 的 Selector
subSelector.wakeup();
key.interestOps(SelectionKey.OP_WRITE);
SubReactor.semaphore.release();
} catch (Throwable e) {
log.error("register error.", e);
}
}
}
public class WriteHandler {
public static void onWritable(SelectionKey key) {
try {
SocketChannel channel;
ByteBuffer sendBuf;
try {
key.interestOps(0);
channel = (SocketChannel) key.channel();
sendBuf = (ByteBuffer) key.attachment();
} finally {
key.attach(null);
}
sendBuf.mark();
int write = channel.write(sendBuf);
while (write < 13) {
write = write + channel.write(sendBuf);
}
key.interestOps(SelectionKey.OP_READ);
log.debug("response ==> {}", new String(((ByteBuffer) sendBuf.reset()).array(), StandardCharsets.UTF_8));
} catch (Throwable e) {
log.error("write error.", e);
}
}
}
这样我们就实现了一个比较完整的 Reactor 服务端模型,这个模型通过在主 reactor 组件上处理建连事件,在从 reactor 组件上实现事件分离,并且使用线程池实现业务处理异步化。
思考
Reactor 为什么叫这个名字?
reactor 组件在独立的线程中运行,它将“工作”分配给适当的 Handler 来对 I/O 事件做出 反应(reaction)。
与连接、线程1:1的 BIO 模型相比,基于 Reactor 的非阻塞 I/O 有什么优势?
实际上,非阻塞 I/O 本身并不会一定比阻塞式 I/O 快,但我们的假设场景是 I/O 密集型服务,这种服务在高并发访问时,非阻塞 I/O 能够减少服务器 瞬间的并发线程数,从而减少内存占用,并且减少 CPU 上下文切换次数,提升 CPU 效率。
Reactor 模式有可以优化的点吗?
实际上,
Selector.select()
的过程和recv()
、send()
的 I/O 本身是阻塞的,所以我们可能并没有完全实现非阻塞。我们虽然在 Reactor 模式中通过 reactor 组件对 I/O 多路复用的实现达到了“非阻塞地调用 I/O”,但是 I/O 本身还是同步阻塞住的。我们可以自行封装 Future 或者 Callback 达到全异步。
Proactor 模式
除 Reactor 外,Proactor 模式作为另一种 I/O 多路复用的模型,也能实现可扩展的高性能网络服务。
Reactor 模式的 I/O 模型可能存在优化空间,而 Proactor 模型采用了 异步 I/O 模型,似乎是对 Reactor 模式的改进优化,下面我们通过“Connect”、“Request”两个流程对比 Reactor 来看二者的异同。
Connect
Reactor:
- 注册 Acceptor 到 reactor 组件上;
- reactor 开启事件循环;
- 客户端建连;
- reactor 通知 Acceptor 处理建连请求,Acceptor 接受请求,连接建立;
- Acceptor 为客户端创建 Handler 以应对之后该连接过来的请求;
- Handler 在 reactor 组件上注册该连接,让 reactor 在该连接准备好读数据时通知它;
- Handler 初始化完成,开始工作。
Proactor:
- Server 指示 Acceptor 去初始化异步接受建连;
- Acceptor 将自身作为一个完成处理程序(即回调方),通过 OS 系统调用监听建连完成事件,同时也在 完成事件分离器 Completion Dispatcher 上注册,后者执行回调动作;
- 开启“完成事件”循环;
- 客户端建连;
- 建连成功,OS 通知事件循环;
- 事件循环通知 Acceptor;
- Acceptor 为客户端创建 Handler 以应对之后该连接过来的请求;
- Handler 将自身作为一个完成处理程序(即回调方),通过 OS 系统调用监听读完成事件,同时也在 完成事件分离器 Completion Dispatcher 上注册,后者执行回调动作。
I/O
Reactor:
- 客户端发送请求;
- reactor 在数据到达时通知 Handler;
- 设置 I/O 操作为非阻塞模式读取数据(2、3 步可能需要多次轮询直到读取一个完整的请求);
- Handler 解析请求;
- Handler 同步地把所请求的数据从文件系统读到内存中;
- Handler 在 reactor 组件注册该连接,让 reactor 在“写就绪”时通知它;
- 连接可写,事件循环(reactor 组件)通知 Handler;
- 设置 I/O 操作为非阻塞模式写数据(7、8 步可能需要多次轮询直到写完一个完整的请求)。
Proactor:
- 客户端发送请求;
- 完成读操作,OS 通知 完成事件分离器 Completion Dispatcher;
- 分离器通知 Handler(2、3 步可能进行多次以读取一个完整的请求);
- Handler 解析请求;
- Handler 同步地把所请求的数据从文件系统读到内存中;
- Handler 进行异步写操作,将自身作为一个完成处理程序(即回调方),通过 OS 系统调用监听写完成事件,同时也在 完成事件分离器 Completion Dispatcher 上注册,后者执行回调动作;
- 写完成,OS 回调 完成事件分离器 Completion Dispatcher;
- 分离器通知 Handler(6~8 步可能重复多次以完整的写一个请求)。
reactor 组件即 就绪事件分离器 Initiation Dispatcher,Proactor 对应的则是 完成事件分离器 Completion Dispatcher。
对比
很明显,Proactor 在 OS 层面注册了回调,它进行分离操作的时机是 “读(写)完成”,而 Reactor 模式是在 “读(写)就绪” 的情况下进行事件派发的,而产生区别的原因就是两个模式利用的 OS 能力不一样,Reactor 模式下,一般利用 OS 的多路复用能力,比如 epoll,而 Proactor 模式则利用 OS 的异步 I/O 能力,比如 Windows 的 iocp 和 Linux boost 库的 asio。
实际上,相比于 Windows 上开启的 iocp,Linux 本身没有比较好用的 aio 实现,可以关注一下 io_uring。
Java 7 的异步 I/O 在 Linux 上还是利用 epoll 自行模拟实现了一个异步 I/O 的回调。