Doug Lea 的 《Scalable I/O in Java》 对 java.nio 实现非阻塞 I/O 以及 Reactor 模型进行了介绍,本篇简单记录一下学习过程。

经典的服务设计

网络服务系统大多包含同样的流程: 1

这其中的不同点往往是每个步骤的性质和消耗成本,一般可以将服务划分为两大类,即 I/O 密集型计算密集型

经典的 C/S 系统设计中,每当一个客户端与服务器建立连接,服务器都要分配一个线程作为 Handler 去执行上述流程:

2

实现时,循环可能长这样:

class Server implements Runnable {
    public void run() {
        try {
        ServerSocket ss = new ServerSocket(PORT);
        while (!Thread.interrupted())
            // 或线程池
            new Thread(new Handler(ss.accept())).start();
        } catch (IOException ex) { /* ... */ }
    }
    static class Handler implements Runnable {
        final Socket socket;
        Handler(Socket s) { socket = s; }
        public void run() {
            try {
                byte[] input = new byte[MAX_INPUT];
                socket.getInputStream().read(input);
                byte[] output = process(input);
                socket.getOutputStream().write(output);
            } catch (IOException ex) { /* ... */ }
        }
        private byte[] process(byte[] cmd) { /* ... */ }
    }
}

可扩展的服务设计

目的

实现可扩展性主要还是为了满足可用性和性能指标:

  • 低延迟
  • 高吞吐量
  • 动态伸缩

实现的方式一般是“分而治之”。

分治思想

  • 将事务拆成多个小任务,并且每个任务都是 非阻塞
  • 任务的执行通过事件驱动(比如以 I/O 事件作为触发条件)

事件驱动

优势

  1. 更少的资源占用
  2. 更小的开销(如线程切换、锁开销等)
  3. 由于需要手动绑定处理逻辑到事件上,有变更慢的可能性
  4. 更难编程

关于 ④,有几个关键点:

  • 事务必须分解为足够简单的非阻塞操作
  • 必须能跟踪逻辑代码状态(线程切换了)

例子

Java AWT 是典型的事件驱动应用,比如,点击按钮时的情况可能如下:

3

事件驱动 I/O 使用类似的思想,但是设计上有所不同。

Reactor 模型

Reactor 模式通过响应 I/O 事件,将事件分发给相应的 Handler(Handler 绑定事件),在 Handler 执行非阻塞操作。

单线程版本

4

以 java.nio API 实现单线程版本的响应式模型:

@Slf4j
public class Reactor implements Runnable {
    private int port;
    private Selector selector;
    private ServerSocketChannel server;
    private Reactor(Builder builder) throws Exception {
        this.selector = builder.selector;
        this.port = builder.port;
        this.server = builder.server;
        init();
    }
    private void init() throws Exception {
        this.server.configureBlocking(false);
        this.server.bind(new InetSocketAddress(port));
        // ServerSocketChannel 绑定 Selector,并注册 OP_ACCEPT 状态,绑定事件处理单元为一个 Acceptor 实例
        this.server.register(this.selector, SelectionKey.OP_ACCEPT)
                .attach(new Acceptor(server, selector));
    }
    public static class Builder {
        private int port;
        private Selector selector;
        private ServerSocketChannel server;
        public Builder() {
            try {
                selector = Selector.open();
                server = ServerSocketChannel.open();
            } catch (Exception ignored) {
            }
        }
        public Builder port(int port) {
            this.port = port;
            return this;
        }
        public Reactor build() throws Exception {
            return new Reactor(this);
        }
    }
    public static Builder builder() {
        return new Builder();
    }
    @Override
    public void run() {
        log.debug("selector thread started.");
        while (!Thread.currentThread().isInterrupted()) {
            try {
                this.selector.select();
                Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
                Iterator<SelectionKey> iter = selectedKeys.iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    dispatch(key);
                }
                selectedKeys.clear();
            } catch (Exception e) {
                Thread.currentThread().interrupt();
            }
        }
        log.debug("selector thread stopped.");
    }
    private void dispatch(SelectionKey key) {
        Object attachment = key.attachment();
        if (attachment != null) {
            // 避免 new 多个 Handler,这里没有使用 if-else 判断 Ops 的写法
            ((Runnable) attachment).run();
        }
    }
}
@Slf4j
public class Acceptor implements Runnable {
    private final ServerSocketChannel server;
    private final Selector selector;
    public Acceptor(ServerSocketChannel server, Selector selector) {
        this.server = server;
        this.selector = selector;
    }
    @Override
    public void run() {
        try {
            SocketChannel curChannel = server.accept();
            if (curChannel != null) {
                new Worker(curChannel, selector);
            }
        } catch (Exception e) {
            log.error("", e);
        }
    }
}
@Slf4j
public class Worker implements Runnable {
    private final SocketChannel curChannel;
    private volatile int state;
    private SelectionKey key;
    private ByteBuffer inBuf = ByteBuffer.allocate(1024);
    private ByteBuffer outBuf;
    public static final int READING = 1 << 0;
    public static final int WRITING = 1 << 1;
    public Worker(SocketChannel curChannel, Selector selector) {
        this.curChannel = curChannel;
        try {
            state = READING;
            curChannel.configureBlocking(false);
            key = curChannel.register(selector, SelectionKey.OP_READ);
            key.attach(this);
            selector.wakeup();
        } catch (Exception e) {
            log.error("", e);
        }
    }
    @Override
    public void run() {
        try {
            if (state == READING) {
                read();
            } else if (state == WRITING) {
                write();
            }
        } catch (Exception e) {
            log.error("", e);
        }
    }
    private void read() {
        try {
            int readLen = curChannel.read(inBuf);
            if (readIsComplete() &amp;&amp; readLen > 0) {
                process();
                state = WRITING;
                key.interestOps(SelectionKey.OP_WRITE);
            }
        } catch (Exception e) {
            log.error("", e);
        }
    }
    private void process() {
        // 业务处理 Request 写 Response
        log.debug("request <== {}", new String(inBuf.array(), StandardCharsets.UTF_8));
        String res = "this is a response from server.";
        outBuf = ByteBuffer.wrap(res.getBytes(StandardCharsets.UTF_8));
        log.debug("response ==> {}", res);
    }
    private boolean readIsComplete() {
        // ...
        return true;
    }
    private boolean writeIsComplete() {
        // ...
        return true;
    }
    private void write() {
        try {
            curChannel.write(outBuf);
            if (writeIsComplete()) {
                // 短链接
                key.cancel();
            }
        } catch (Exception e) {
            log.error("", e);
        }
    }
}
public class Bootstrap {
    public static void main(String[] args) throws Exception {
        new CommonThreadFactory("BootstrapThread")
                .newThread(
                        Reactor.builder()
                                .port(8083)
                                .build())
                .start();
    }
}
public class CommonDaemonThreadFactory extends CommonThreadFactory {
    public CommonDaemonThreadFactory(String name) {
        super(name);
    }
    @Override
    protected void setDaemon(Thread t) {
        t.setDaemon(true);
    }
}
public class CommonThreadFactory implements ThreadFactory {
    private AtomicInteger incr;
    private String name;
    public CommonThreadFactory(String name) {
        this.incr = new AtomicInteger(0);
        this.name = name;
    }
    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, name + "-" + incr.incrementAndGet());
        setDaemon(t);
        return t;
    }
    protected void setDaemon(Thread t) {
    }
}

多线程版本

考虑到利用多核,可以在 Worker 和 Reactor 两个方面考虑多线程扩展。

扩展 Worker

职责分离,Reactor 应该只管 I/O 工作,触发 Worker 后无须等待 Worker 完成工作。

Reactor 添加 workerThreads,Acceptor 持有 Worker 线程池:

private final ExecutorService pool;
public Acceptor(ServerSocketChannel server, Selector selector, int workerThreads) {
    this.server = server;
    this.selector = selector;
    this.pool = new ThreadPoolExecutor(
            workerThreads,
            workerThreads,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(800),
            new CommonDaemonThreadFactory("WorkerThread"));
}

将 process 过程抽取到 Processor 中,提交 Worker 线程池处理:

@Slf4j
public class Processor implements Runnable {
    private final Worker worker;
    public Processor(Worker worker) {
        this.worker = worker;
    }
    @Override
    public void run() {
        process();
        worker.setState(Worker.WRITING);
    }
    private void process() {
        // 业务处理 Request 写 Response
        log.debug("request <== {}", new String(worker.getInBuf().array(), StandardCharsets.UTF_8));
        String res = "this is a response from server.";
        worker.setOutBuf(ByteBuffer.wrap(res.getBytes(StandardCharsets.UTF_8)));
        log.debug("response ==> {}", res);
    }
}

Worker 新增一个状态:

public static final int PROCESSING = 1 << 2;
@Override
public void run() {
    try {
        if (state == READING) {
            read();
        } else if (state == WRITING) {
            write();
        }
    } catch (Exception e) {
        log.error("", e);
    }
}
private void read() {
    try {
        int readLen = curChannel.read(inBuf);
        if (readIsComplete() &amp;&amp; readLen > 0) {
            // process();
            // state = WRITING;
            state = PROCESSING;
            pool.execute(new Processor(this));
        }
    } catch (Exception e) {
        log.error("", e);
    }
}

扩展 Reactor

将 Reactor 分为 mainReactor 和 subReactor,每个 Reactor 线程都有自己的 Selector、线程池、dispatch 循环。mainReactor 可以更饱和地进行连接监听与建连,subReactor 可以实现匹配 CPU 及 I/O 速率的负载均衡:

5

代码略。