Reactor、Proactor 模式学习笔记

在事件驱动的架构被越来越广泛地应用的今天,Reactor 模式在采用同步 I/O 的网络服务领域中正变的流行起来,而使用异步 I/O 时,一般会采用 Proactor 模式构建高性能服务。

经典的服务设计

C/S 架构的网络服务应用,Server 端都包含大体一致的处理流程,对每个客户端的请求,Server 都需要读请求内容、解码、进行各种处理、编码、发送响应,这其中的不同点往往是每个步骤的消耗成本,比如 I/O 密集型 的文件服务或者 计算密集型 的应用服务。

image-20191029200301559

实际上,Reactor 模型在 Client 也有意义,这里暂且先从服务端的角度考虑。

为了使 Server 能同时并发的处理多个请求,每当一个客户端与服务器建立连接,我们都可以分配一个线程,用这个线程专门处理对应连接上的相应操作。

image-20191029200241577

ServerSocket ss = new ServerSocket(PORT);
while (!Thread.interrupted()) {
    new Thread(new HandlerContext(ss.accept())).start();
}

这种模型简单,但也有问题,由于线程切换消耗过多和内存占用大的缘故,连接数一旦多起来,并发性能会显著降低,早期 Tomcat 的 BIO 模型就是这种方式。

理想情况最好是线程模型能根据 CPU 资源而不是客户端数目来进行配置,最大程度的减少线程上下文的切换。

可扩展的服务设计

首先,为了满足可用性和性能指标,我们需要实现可扩展性,方式一般是“分而治之”。

分治思想

  • 将事务拆成多个小任务,并且每个任务都是 非阻塞
  • 任务的执行通过事件驱动(比如以 I/O 事件作为触发条件)

事件驱动

使用事件驱动设计应用程序有以下优势:

  1. 更少的资源占用
  2. 更小的开销(如线程切换、锁开销等)
  3. 由于需要手动绑定处理逻辑到事件上,有变更慢的可能性
  4. 更难编程

Java AWT 就是典型的事件驱动设计。

image-20191030014918666

引入 Reactor

Reactor 架构是一种 I/O 多路复用模式的运用,它对客户端的事件进行多路分离和派发,“反转”了程序控制流(事件驱动遵循的“好莱坞原则”),开发者只需在重用 reactor 的多路分离功能和派发机制的基础之上实现 Event Handlers 即可。

具体到实际应用,我们会在系统中加入 reactor 组件,组件起着 同步等待事件发生和多路分离事件并将其关联到事件处理器上 的作用。比如在使用 java.nio 时,利用 JDK 提供的 多路复用器(Selector)轮询多个通道,选中发生某些事件的 channel(SelectionKey),将其 dispatch 到对事件感兴趣的 Handler 上,由 Handler 对事件进行 非阻塞 的处理。

由此可见,Reactor 模式的关键点就是 非阻塞事件驱动事件分离

单线程版本

首先我们来实现事件驱动和事件分离。我们利用 Selector 实现多路复用,判断事件类型以分离事件到 Acceptor 或不同的 Handler 去处理。

image-20191030014944905

Reactor

@Slf4j
public class Reactor implements Runnable {
    private int port;
    private Selector selector;

    public Reactor(int port) {
        this.port = port;
    }

    @Override
    public void run() {
        init();
        Thread me = Thread.currentThread();
        while (!me.isInterrupted()) {
            try {
                int selected = selector.select();
                if (selected > 0) {
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();
                        dispatch(key);
                    }
                }
            } catch (Exception e) {
                log.error("reactor failed.", e);
            }
        }
    }

    private void init() {
        try {
            selector = Selector.open();
            ServerSocketChannel server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(port));
            server.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            log.error("init failed.", e);
        }
    }

    private void dispatch(SelectionKey key) {
        if (key.isAcceptable()) {
            Acceptor.onAccept(selector, key);
        } else if (key.isReadable()) {
            ReadHandler.onReadable(selector, key);
        } else if (key.isWritable()) {
            WriteHandler.onWritable(selector, key);
        } else if (!key.isValid()) {
            try {
                key.channel().close();
            } catch (Throwable ignored) {
            }
            key.cancel();
        } else {
            log.warn("never?");
        }
    }
}

Acceptor

@Slf4j
public class Acceptor {
    public static void onAccept(Selector selector, SelectionKey key) {
        try {
            SocketChannel channel = null;
            while (channel == null) {
                channel = ((ServerSocketChannel) key.channel()).accept();
            }
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ);
            log.debug("connected");
        } catch (Throwable e) {
            log.error("accept error.", e);
        }
    }
}

Handlers

@Slf4j
public class ReadHandler {
    public static void onReadable(Selector selector, SelectionKey key) {
        try {
            SocketChannel channel = (SocketChannel) key.channel();
            byte[] recvBytes = new byte[4];
            ByteBuffer recvBuf = ByteBuffer.wrap(recvBytes);
            int read = 0;
            while (read < 4) {
                if (read == -1) {
                    log.debug("channel closed.");
                    key.cancel();
                    return;
                }
                try {
                    read = read + channel.read(recvBuf);
                } catch (IOException e) {
                    if (e.getMessage().contains("Connection reset by peer")) {
                        log.debug("channel closed.");
                        channel.close();
                        key.cancel();
                        return;
                    }
                }
            }
            channel.register(selector, SelectionKey.OP_WRITE);
            log.debug("request <== {}", new String(recvBytes, StandardCharsets.UTF_8));
        } catch (Throwable e) {
            log.error("read error.", e);
        }
    }
}
@Slf4j
public class WriteHandler {
    public static void onWritable(Selector selector, SelectionKey key) {
        try {
            SocketChannel channel = (SocketChannel) key.channel();
            String resp = String.valueOf(System.currentTimeMillis());
            ByteBuffer sendBuf = ByteBuffer.wrap(resp.getBytes(StandardCharsets.UTF_8));
            int write = channel.write(sendBuf);
            while (write < 13) {
                write = write + channel.write(sendBuf);
            }
            channel.register(selector, SelectionKey.OP_READ);
            log.debug("response ==> {}", resp);
        } catch (Throwable e) {
            log.error("write error.", e);
        }
    }
}

多线程版本

实现 Reactor,除了上面实现的事件驱动和分离之外,还要考虑利用多核,采用多线程实现非阻塞。

我们将在 Handler 和 Reactor 两个方面考虑多线程扩展。

扩展 Handler

目的:职责分离,Reactor 应该只管 I/O 工作,触发 Handler 后无须等待 Handler 完成工作。

做法:添加 Handler 线程池处理 decode、compute、encode 的工作。

image-20191030014656217

Reactor
@Slf4j
public class Reactor implements Runnable {
    private int port;
    private Selector selector;
  
    public static Semaphore semaphore = new Semaphore(1);

    public Reactor(int port) {
        this.port = port;
    }

    @Override
    public void run() {
        init();
        Thread me = Thread.currentThread();
        while (!me.isInterrupted()) {
            try {
                int selected = selector.select();
                if (selected > 0) {
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();
                        dispatch(key);
                    }
                }
            } catch (Exception e) {
                log.error("reactor failed.", e);
            }
        }
    }

    private void init() {
        try {
            selector = Selector.open();
            ServerSocketChannel server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(port));
            server.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            log.error("init failed.", e);
        }
    }

    private void dispatch(SelectionKey key) {
        if (key.isAcceptable()) {
            Acceptor.onAccept(selector, key);
        } else if (key.isReadable()) {
            ReadHandler.onReadable(selector, key);
        } else if (key.isWritable()) {
            WriteHandler.onWritable(key);
        } else if (!key.isValid()) {
            try {
                key.channel().close();
            } catch (Throwable ignored) {
            }
            key.cancel();
        } else {
            log.warn("never?");
        }
    }
}

selector.select 和 channel.register 会争抢同一把锁(SelectorImpl#publicKeys),理论上,Selector 被 wakeup 后,如果 SubReactor 执行循环的线程更快,先到下一次循环,又被 select 阻塞住,那么 channel 的 register 动作还是执行不下去。

故这里加了一个 Semaphore 是防止异步读写时 wakeup Selector 后,线程 A 还未及时 register 和更改 interest ops 的情况下,线程 B 的 Selector 又进入了 select 阻塞,导致线程 A 死锁。

Acceptor
@Slf4j
public class Acceptor {
    public static void onAccept(Selector selector, SelectionKey key) {
        try {
            SocketChannel channel = null;
            while (channel == null) {
                channel = ((ServerSocketChannel) key.channel()).accept();
            }
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ);
            log.debug("connected");
        } catch (Throwable e) {
            log.error("accept error.", e);
        }
    }
}
Handlers
@Slf4j
public class ReadHandler {
    public static ExecutorService pool = Executors.newFixedThreadPool(4);

    public static void onReadable(Selector selector, SelectionKey key) {
        try {
            // 防止 Epoll LT 模式和异步处理导致的重复 onReadable 调用
            key.interestOps(0);
            SocketChannel channel = (SocketChannel) key.channel();
            byte[] recvBytes = new byte[4];
            ByteBuffer recvBuf = ByteBuffer.wrap(recvBytes);
            int read = 0;
            while (read < 4) {
                if (read == -1) {
                    log.debug("channel closed.");
                    key.cancel();
                    return;
                }
                try {
                    read = read + channel.read(recvBuf);
                } catch (IOException e) {
                    if (e.getMessage().contains("Connection reset by peer")) {
                        log.debug("channel closed.");
                        channel.close();
                        key.cancel();
                        return;
                    }
                }
            }
            key.interestOps(SelectionKey.OP_READ);
            pool.submit(new DecodeHandler(selector, key, recvBytes));
        } catch (Throwable e) {
            log.error("read error.", e);
        }
    }
}
public class DecodeHandler implements Runnable {
    private final Selector selector;
    private final SelectionKey key;
    private final byte[] recvBytes;

    public DecodeHandler(Selector selector, SelectionKey key, byte[] recvBytes) {
        this.selector = selector;
        this.key = key;
        this.recvBytes = recvBytes;
    }

    @Override
    public void run() {
        String decodeResult = new String(recvBytes, StandardCharsets.UTF_8);
        ReadHandler.pool.submit(new BizHandler(selector, key, decodeResult));
    }
}
@Slf4j
public class BizHandler implements Runnable {
    private final Selector selector;
    private final SelectionKey key;
    private final String decodeResult;

    public BizHandler(Selector selector, SelectionKey key, String decodeResult) {
        this.selector = selector;
        this.key = key;
        this.decodeResult = decodeResult;
    }

    @Override
    public void run() {
        log.debug("request <== {}", decodeResult);
        String resp = String.valueOf(System.currentTimeMillis());
        ReadHandler.pool.submit(new EncodeHandler(selector, key, resp));
    }
}
@Slf4j
public class EncodeHandler implements Runnable {
    private final Selector selector;
    private final SelectionKey key;
    private final String resp;

    public EncodeHandler(Selector selector, SelectionKey key, String resp) {
        this.selector = selector;
        this.key = key;
        this.resp = resp;
    }

    @Override
    public void run() {
        ByteBuffer encoded = ByteBuffer.wrap(resp.getBytes(StandardCharsets.UTF_8));
        try {
            //noinspection StatementWithEmptyBody
            while (key.attachment() != null) {
            }
            key.attach(encoded);
            Reactor.semaphore.acquire();
            // wakeup 正阻塞在 select() 的 Selector
            selector.wakeup();
            key.interestOps(SelectionKey.OP_WRITE);
            Reactor.semaphore.release();
        } catch (Throwable e) {
            log.error("register error.", e);
        }
    }
}
@Slf4j
public class WriteHandler {
    public static void onWritable(SelectionKey key) {
        try {
            SocketChannel channel;
            ByteBuffer sendBuf;
            try {
                key.interestOps(0);
                channel = (SocketChannel) key.channel();
                sendBuf = (ByteBuffer) key.attachment();
            } finally {
                key.attach(null);
            }
            sendBuf.mark();
            int write = channel.write(sendBuf);
            while (write < 13) {
                write = write + channel.write(sendBuf);
            }
            key.interestOps(SelectionKey.OP_READ);
            log.debug("response ==> {}", new String(((ByteBuffer) sendBuf.reset()).array(), StandardCharsets.UTF_8));
        } catch (Throwable e) {
            log.error("write error.", e);
        }
    }
}

扩展 Reactor

目的:职责更加分离。

做法:将 Reactor 分为 mainReactor 和 subReactor,每个 Reactor 线程都有自己的 Selector、线程池、dispatch 循环。mainReactor 可以更饱和地进行连接监听与建连,subReactor 可以实现匹配 CPU 及 I/O 速率的负载均衡。

image-20191030014849507

MainReactor
@Slf4j
public class MainReactor implements Runnable {
    private int port;
    private Selector selector;
    private Selector subSelector;

    public MainReactor(int port, Selector subSelector) {
        this.port = port;
        this.subSelector = subSelector;
    }

    @Override
    public void run() {
        init();
        Thread me = Thread.currentThread();
        while (!me.isInterrupted()) {
            try {
                int selected = selector.select();
                if (selected > 0) {
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();
                        dispatch(key);
                    }
                }
            } catch (Exception e) {
                log.error("reactor failed.", e);
            }
        }
    }

    private void init() {
        try {
            selector = Selector.open();
            ServerSocketChannel server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(port));
            server.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            log.error("init failed.", e);
        }
    }

    private void dispatch(SelectionKey key) {
        if (key.isAcceptable()) {
            Acceptor.onAccept(subSelector, key);
        } else if (!key.isValid()) {
            try {
                key.channel().close();
            } catch (Throwable ignored) {
            }
            key.cancel();
        } else {
            log.warn("never?");
        }
    }
}
SubReactor
@Slf4j
public class SubReactor implements Runnable {
    private Selector subSelector;

    public static Semaphore semaphore = new Semaphore(1);

    public SubReactor(Selector subSelector) {
        this.subSelector = subSelector;
    }

    @Override
    public void run() {
        Thread me = Thread.currentThread();
        while (!me.isInterrupted()) {
            try {
                int selected = subSelector.select();
                if (selected > 0) {
                    Iterator<SelectionKey> iter = subSelector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();
                        dispatch(key);
                    }
                } else {
                    semaphore.acquire();
                    semaphore.release();
                }
            } catch (Exception e) {
                log.error("reactor failed.", e);
            }
        }
    }

    private void dispatch(SelectionKey key) {
        if (key.isReadable()) {
            ReadHandler.onReadable(subSelector, key);
        } else if (key.isWritable()) {
            WriteHandler.onWritable(key);
        } else if (!key.isValid()) {
            try {
                key.channel().close();
            } catch (Throwable ignored) {
            }
            key.cancel();
        } else {
            log.warn("never?");
        }
    }
}
Acceptor
@Slf4j
public class Acceptor {
    public static void onAccept(Selector subSelector, SelectionKey key) {
        try {
            SocketChannel channel = null;
            while (channel == null) {
                channel = ((ServerSocketChannel) key.channel()).accept();
            }
            channel.configureBlocking(false);
            SubReactor.semaphore.acquire();
            subSelector.wakeup();
            channel.register(subSelector, SelectionKey.OP_READ);
            SubReactor.semaphore.release();
            log.debug("connected");
        } catch (Throwable e) {
            log.error("accept error.", e);
        }
    }
}
Handlers
@Slf4j
public class ReadHandler {
    public static ExecutorService pool = Executors.newFixedThreadPool(4);

    public static void onReadable(Selector subSelector, SelectionKey key) {
        try {
            // 防止 Epoll LT 模式和异步处理导致的重复 onReadable 调用
            key.interestOps(0);
            SocketChannel channel = (SocketChannel) key.channel();
            byte[] recvBytes = new byte[4];
            ByteBuffer recvBuf = ByteBuffer.wrap(recvBytes);
            int read = 0;
            while (read < 4) {
                if (read == -1) {
                    log.debug("channel closed.");
                    key.cancel();
                    return;
                }
                try {
                    read = read + channel.read(recvBuf);
                } catch (IOException e) {
                    if (e.getMessage().contains("Connection reset by peer")) {
                        log.debug("channel closed.");
                        channel.close();
                        key.cancel();
                        return;
                    }
                }
            }
            key.interestOps(SelectionKey.OP_READ);
            pool.submit(new DecodeHandler(subSelector, key, recvBytes));
        } catch (Throwable e) {
            log.error("read error.", e);
        }
    }
}
public class DecodeHandler implements Runnable {
    private final Selector subSelector;
    private final SelectionKey key;
    private final byte[] recvBytes;

    public DecodeHandler(Selector subSelector, SelectionKey key, byte[] recvBytes) {
        this.subSelector = subSelector;
        this.key = key;
        this.recvBytes = recvBytes;
    }

    @Override
    public void run() {
        String decodeResult = new String(recvBytes, StandardCharsets.UTF_8);
        ReadHandler.pool.submit(new BizHandler(subSelector, key, decodeResult));
    }
}
@Slf4j
public class BizHandler implements Runnable {
    private final Selector subSelector;
    private final SelectionKey key;
    private final String decodeResult;

    public BizHandler(Selector subSelector, SelectionKey key, String decodeResult) {
        this.subSelector = subSelector;
        this.key = key;
        this.decodeResult = decodeResult;
    }

    @Override
    public void run() {
        log.debug("request <== {}", decodeResult);
        String resp = String.valueOf(System.currentTimeMillis());
        ReadHandler.pool.submit(new EncodeHandler(subSelector, key, resp));
    }
}
@Slf4j
public class EncodeHandler implements Runnable {
    private final Selector subSelector;
    private final SelectionKey key;
    private final String resp;

    public EncodeHandler(Selector subSelector, SelectionKey key, String resp) {
        this.subSelector = subSelector;
        this.key = key;
        this.resp = resp;
    }

    @Override
    public void run() {
        ByteBuffer encoded = ByteBuffer.wrap(resp.getBytes(StandardCharsets.UTF_8));
        try {
            //noinspection StatementWithEmptyBody
            while (key.attachment() != null) {
            }
            key.attach(encoded);
            SubReactor.semaphore.acquire();
            // wakeup 正阻塞在 select() 的 Selector
            subSelector.wakeup();
            key.interestOps(SelectionKey.OP_WRITE);
            SubReactor.semaphore.release();
        } catch (Throwable e) {
            log.error("register error.", e);
        }
    }
}
@Slf4j
public class WriteHandler {
    public static void onWritable(SelectionKey key) {
        try {
            SocketChannel channel;
            ByteBuffer sendBuf;
            try {
                key.interestOps(0);
                channel = (SocketChannel) key.channel();
                sendBuf = (ByteBuffer) key.attachment();
            } finally {
                key.attach(null);
            }
            sendBuf.mark();
            int write = channel.write(sendBuf);
            while (write < 13) {
                write = write + channel.write(sendBuf);
            }
            key.interestOps(SelectionKey.OP_READ);
            log.debug("response ==> {}", new String(((ByteBuffer) sendBuf.reset()).array(), StandardCharsets.UTF_8));
        } catch (Throwable e) {
            log.error("write error.", e);
        }
    }
}

这样我们就实现了一个比较完整的 Reactor 服务端模型,这个模型通过在主 reactor 组件上处理建连事件,在从 reactor 组件上实现事件分离,并且使用线程池实现业务处理异步化。

具体可以查看 代码仓库

思考

  1. Reactor 为什么叫这个名字?

    reactor 组件在独立的线程中运行,它将“工作”分配给适当的 Handler 来对 I/O 事件做出 反应(reaction)

  2. 与连接、线程1:1的 BIO 模型相比,基于 Reactor 的非阻塞 I/O 有什么优势?

    实际上,非阻塞 I/O 本身并不会一定比阻塞式 I/O 快,但我们的假设场景是 I/O 密集型服务,这种服务在高并发访问时,非阻塞 I/O 能够减少服务器 瞬间的并发线程数,从而减少内存占用,并且减少 CPU 上下文切换次数,提升 CPU 效率。

  3. Reactor 模式有可以优化的点吗?

    实际上,Selector.select() 的过程和 recv()send() 的 I/O 本身是阻塞的,所以我们可能并没有完全实现非阻塞。我们虽然在 Reactor 模式中通过 reactor 组件对 I/O 多路复用的实现达到了“非阻塞地调用 I/O”,但是 I/O 本身还是同步阻塞住的。我们可以自行封装 Future 或者 Callback 达到全异步。

Proactor 模式

除 Reactor 外,Proactor 模式作为另一种 I/O 多路复用的模型,也能实现可扩展的高性能网络服务。

Reactor 模式的 I/O 模型可能存在优化空间,而 Proactor 模型采用了 异步 I/O 模型,似乎是对 Reactor 模式的改进优化,下面我们通过“Connect”、“Request”两个流程对比 Reactor 来看二者的异同。

Connect

Reactor:

image-20191110113118791

  1. 注册 Acceptor 到 reactor 组件上;
  2. reactor 开启事件循环;
  3. 客户端建连;
  4. reactor 通知 Acceptor 处理建连请求,Acceptor 接受请求,连接建立;
  5. Acceptor 为客户端创建 Handler 以应对之后该连接过来的请求;
  6. Handler 在 reactor 组件上注册该连接,让 reactor 在该连接准备好读数据时通知它;
  7. Handler 初始化完成,开始工作。

Proactor:

image-20191110113135774

  1. Server 指示 Acceptor 去初始化异步接受建连;
  2. Acceptor 将自身作为一个完成处理程序(即回调方),通过 OS 系统调用监听建连完成事件,同时也在 完成事件分离器 Completion Dispatcher 上注册,后者执行回调动作;
  3. 开启“完成事件”循环;
  4. 客户端建连;
  5. 建连成功,OS 通知事件循环;
  6. 事件循环通知 Acceptor;
  7. Acceptor 为客户端创建 Handler 以应对之后该连接过来的请求;
  8. Handler 将自身作为一个完成处理程序(即回调方),通过 OS 系统调用监听读完成事件,同时也在 完成事件分离器 Completion Dispatcher 上注册,后者执行回调动作。

I/O

Reactor:

image-20191110112127587

  1. 客户端发送请求;
  2. reactor 在数据到达时通知 Handler;
  3. 设置 I/O 操作为非阻塞模式读取数据(2、3 步可能需要多次轮询直到读取一个完整的请求);
  4. Handler 解析请求;
  5. Handler 同步地把所请求的数据从文件系统读到内存中;
  6. Handler 在 reactor 组件注册该连接,让 reactor 在“写就绪”时通知它;
  7. 连接可写,事件循环(reactor 组件)通知 Handler;
  8. 设置 I/O 操作为非阻塞模式写数据(7、8 步可能需要多次轮询直到写完一个完整的请求)。

Proactor:

image-20191110112139280

  1. 客户端发送请求;
  2. 完成读操作,OS 通知 完成事件分离器 Completion Dispatcher
  3. 分离器通知 Handler(2、3 步可能进行多次以读取一个完整的请求);
  4. Handler 解析请求;
  5. Handler 同步地把所请求的数据从文件系统读到内存中;
  6. Handler 进行异步写操作,将自身作为一个完成处理程序(即回调方),通过 OS 系统调用监听写完成事件,同时也在 完成事件分离器 Completion Dispatcher 上注册,后者执行回调动作;
  7. 写完成,OS 回调 完成事件分离器 Completion Dispatcher
  8. 分离器通知 Handler(6~8 步可能重复多次以完整的写一个请求)。

reactor 组件即 就绪事件分离器 Initiation Dispatcher,Proactor 对应的则是 完成事件分离器 Completion Dispatcher

对比

很明显,Proactor 在 OS 层面注册了回调,它进行分离操作的时机是 “读(写)完成”,而 Reactor 模式是在 “读(写)就绪” 的情况下进行事件派发的,而产生区别的原因就是两个模式利用的 OS 能力不一样,Reactor 模式下,一般利用 OS 的多路复用能力,比如 epoll,而 Proactor 模式则利用 OS 的异步 I/O 能力,比如 Windows 的 iocp 和 Linux boost 库的 asio。

实际上,相比于 Windows 上开启的 iocp,Linux 本身没有比较好用的 aio 实现, Java 7 的异步 I/O 在 Linux 上还是利用 epoll 自行模拟实现了一个异步 I/O 的回调。