Unix I/O 模型浅析

本篇谈谈对 Unix 五种 I/O 模型的理解和对比,以及 JDK 提供的相应实现。

相关知识

TCP 网络编程

image-20191102224024597

以服务端为例:

  • socket()

    初始化套接字,比如我们指定 IPv4 为网络域(PF_INET / AF_INET),使用提供序列化的、可靠的、双向连接的字节流的 TCP 协议,创建套接字,返回一个 TCP Socket 文件描述符。可以通过 fcntl() 文件控制函数将这个文件描述符设置为非阻塞模式(O_NONBLOCK)。

  • bind()

    套接字与端口绑定,绑定后,IP 地址、端口、协议类型都设定好了。

  • listen()

    初始化服务可连接队列,设置客户端连接到队列长度 back-log(Linux 2.2 之后,指已建立连接但尚在等待 accept 调用的连接队列长度,而尚未连接到 syn 队列由 tcp_max_syn_backlog 设置)。

  • accept()

    接受一个客户端的连接请求,返回新的文件描述符,老的表示正在监听的 socket,新的表示与某个客户端的连接,之后的数据读写通过新文件描述符完成。

    accept 函数可能发生错误返回错误码,这里比较关注的有 EMFILE(已到当前进程允许打开的最大文件描述符数量)和 EAGAIN / EWOULDBLOCK(socket 使用了非阻塞模式,而现在调用 accept 是阻塞式的,且当下没有可接受的连接,在 VxWorks 和 Windows 上,EAGAIN 的名字叫做 EWOULDBLOCK)。后者之后还会有遇到。

非阻塞模式下,EAGAIN / EWOULDBLOCK 是正常的,下次循环继续相关调用即可,它并不是一个“错误”。另外,当内核执行一个系统调用(比如 fork())时,若发现没有足够的资源(比如虚拟内存),也可能会返回 EAGAIN 提示应用程序再试着调用一次,也许下次就能成功,因为整个环境是动态的。

Linux I/O 函数

基本的,有 read()write() 可以向文件描述符中读写数据,网络 I/O 则常用 recv()send() 等:

  • recv()

    从 socket 中接收数据放到缓冲区,数据接收方式可以设置为 MSG_DONTWAIT 以实现非阻塞读操作(临时的,不需要设置 socket 为非阻塞,若设置了 socket 为非阻塞则没必要在设置),此时可能返回 EAGAIN / EWOULDBLOCK,同理,表示阻塞调用 recv() 没收到数据。

  • send()

    发送数据,将缓冲区数据按照某种方式通过 socket 发送出去,方式和 recv() 一样可以是 MSG_DONTWAIT,也可能收到 EAGAIN / EWOULDBLOCK,道理相同。

另外还有 readv()writev() 支持从(向)多个缓冲区接收(写入)数据,以及支持更复杂的高级操作的 recvmsg()sendmsg() ,还有 UDP 使用的 recvfromsendto(),这里不再赘述,目的都一样,就是为了从 socket 读写数据。

I/O 模型

阻塞式 I/O 模型

image-20191103002214592

图中以数据报套接字作为例子,在 I/O 模型上,虽然 TCP 的 recv 同理,但是更复杂(考虑 low-water-mark 等)。

可以看到,进程在整个过程都是 阻塞的,只要不是被中断,那么 recvfrom() 返回意味着数据已经到达 socket read buffer 并且被拷贝到用户空间的应用进程的缓冲区中。

显然,阻塞式 I/O 是同步的。

非阻塞 I/O 模型

image-20191103002832098

设置参数可以让 recvfrom 函数工作在非阻塞模型下,同样,若 socket 本身已经被设置为非阻塞模式则不用。

设置非阻塞模式使得当前没有数据可读时不让进程进入睡眠,而是立即返回错误,下次循环再接着调用 recvfrom()。这里的“循环”我们称之为轮询(polling),进程持续轮询以查看某个操作是否就绪的方式很浪费 CPU。使用非阻塞 I/O 模型的进程在数据准备好前不会被阻塞,只是浪费时间在不断的轮询之中。

由于使用 recvfrom ,非阻塞 I/O 模型也是同步的。

I/O 复用模型

image-20191103020956842

阻塞式 I/O 阻塞在某一个 socket 是否发生读写事件上,非阻塞式 I/O 在轮询中检查事件就绪状态,而 I/O 复用(I/O multiplexing)调用了 select()poll(),阻塞在对多个 socket 的轮询上,从而支持了单线程轮询多个 socket。

我们也可以使用多线程模型,在每个线程内使用阻塞式 I/O,同样可以达到类似效果。“多线程阻塞 I/O” 这种方式的工作流程简单明了,容易理解,但也有相应弊端,比如在海量连接的场景下,多个线程间的上下文切换开销将非常大,效率是没有多路复用高的。

I/O 多路复用本身并不会比阻塞式 I/O 快,看起来没有显著优势,甚至有可能开销更大(关乎两个系统调用,先 select()recvfrom()),但 I/O 复用模式可以在单线程内等待多个 socket 就绪,在高并发海量连接下能够很大程度地减少服务器的并发线程数(或者说同样的线程池能支持更多的并发),提高 CPU 执行效率。

I/O 复用优化了多个 socket 通道的轮询效率,进程在数据准备好前不会被阻塞,但仍旧使用 recvfrom() 拷贝数据,I/O 过程仍然是同步的。

信号驱动式 I/O 模型

image-20191103010911150

信号驱动式 I/O(Signal-Driven I/O)需要使用 sigaction() 系统调用去配置一个信号处理函数(回调函数),调用后立即返回(非阻塞),之后系统内核等待数据准备好,然后发送 SIGIO 信号通知进程,随后进程执行 回调函数 中的 recvfrom(),将数据从内核空间复制到用户空间。

该模式遵循好莱坞原则,优势明显,进程在数据准备好前不会被阻塞,但数据拷贝使用 recvfrom(),故仍然是同步的。

异步 I/O 模型

image-20191103011410315

异步 I/O 模型(Asynchronous I/O)使用 aio_read() 系统调用,同样需要注册一个回调函数,但与信号驱动式 I/O 的不同点在于,异步 I/O 中,信号发出时数据已经从内核拷贝到了用户空间,我们的回调函数可以直接使用数据了,全过程进程都不阻塞,是真正的异步 I/O。

对比

image-20191103013755950

上文已经提到,由于真正执行 I/O 操作是通过 recvfrom() 系统调用,除异步 I/O 外的前四种模式都是同步 I/O,只有异步 I/O 模型才是真正的异步,也符合 POSIX 定义的异步 I/O。

POSIX 定义描述:POSIX 异步 I/O (AIO) 接口允许程序启动一个或多个异步执行(即在后台执行)的 I/O 操作,而程序可以被多种方式通知到,这种方式可以是信号、实例化线程,甚至也可以不通知。

同步与异步,阻塞与非阻塞

这里的区别更多的讲的是“角度的不同”。

“同步”与“异步”是站在 被调用者 的角度上说的,被调用者是系统内核,所以内核函数的执行方式和函数返回时机的不同,是“同步”与“异步”的区别所在。

“阻塞”与“非阻塞”是站在 调用者 的角度上说的,这里的调用者是应用程序,如果调用者“死脑筋”,I/O 不返回,线程就不走,那么就说这个 I/O 是阻塞式的,故“阻塞”与“非阻塞”的区别在于调用方在结果返回之前的行为方式。

Java Demo

阻塞 I/O

Server

@Slf4j
public class BioTimeServer {
    public static void main(String[] args) throws Exception {
        ServerSocket ss = new ServerSocket(8081);
        log.debug("started.");
        Socket socket = ss.accept();
        log.debug("connected");
        InputStream is = socket.getInputStream();
        OutputStream os = socket.getOutputStream();
        // "ping" in utf-8
        byte[] recvBytes = new byte[4];
        int read = IOUtils.read(is, recvBytes);
        while (read < 4) {
            read = read + IOUtils.read(is, recvBytes, read, 4 - read);
        }
        String req = new String(recvBytes, StandardCharsets.UTF_8);
        log.debug("request <== {}", req);
        String resp = String.valueOf(System.currentTimeMillis());
        os.write(resp.getBytes(StandardCharsets.UTF_8));
        os.flush();
        log.debug("response ==> {}", resp);
        Thread.sleep(1000L);
        if (!ss.isClosed()) {
            ss.close();
        }
    }
}

Server 端无拆包逻辑,固定读 4 字节。

Client

@Slf4j
public class BioPingClient {
    public static void main(String[] args) throws Exception {
        Socket socket = new Socket("127.0.0.1", 8081);
        log.debug("connected");
        InputStream is = socket.getInputStream();
        OutputStream os = socket.getOutputStream();
        String req1 = "pi";
        os.write(req1.getBytes(StandardCharsets.UTF_8));
        os.flush();
        Thread.sleep(1000L);
        String req2 = "ng";
        os.write(req2.getBytes(StandardCharsets.UTF_8));
        os.flush();
        log.debug("request ==> {}", req1 + req2);
        byte[] recvBytes = new byte[13];
        int read = IOUtils.read(is, recvBytes);
        while (read < 13) {
            read = read + IOUtils.read(is, recvBytes, read, 13 - read);
        }
        String resp = new String(recvBytes, StandardCharsets.UTF_8);
        log.debug("response <== {}", resp);
        if (!socket.isClosed()) {
            socket.close();
        }
    }
}

Run

Server:

image-20191107235720354

Client:

image-20191107235734957

非阻塞 I/O 与多路复用

这里简单模拟一下三次请求。

Server

@Slf4j
public class NioTimeServer {
    public static void main(String[] args) throws Exception {
        Selector selector;
        ServerSocketChannel server;
        try {
            // 多路复用器
            selector = Selector.open();
            server = ServerSocketChannel.open();
            // 设置 accept() 为非阻塞
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(8081));
        } catch (Throwable e) {
            log.error("failed to bind.", e);
            return;
        }
        log.debug("started.");
        server.register(selector, SelectionKey.OP_ACCEPT);
        while (!Thread.currentThread().isInterrupted()) {
            int selected = selector.select();
            if (selected > 0) {
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    handle(selector, key);
                }
            }
        }
    }

    private static void handle(Selector selector, SelectionKey key) {
        if (key.isAcceptable()) {
            try {
                SocketChannel channel = null;
                while (channel == null) {
                    channel = ((ServerSocketChannel) key.channel()).accept();
                }
                // 设置 read()/write() 为非阻塞
                channel.configureBlocking(false);
                channel.register(selector, SelectionKey.OP_READ);
                log.debug("connected");
            } catch (Throwable e) {
                log.error("accept error.", e);
            }
        } else if (!key.isValid()) {
            try {
                key.channel().close();
                key.cancel();
            } catch (Throwable ignored) {
            }
        } else if (key.isReadable()) {
            try {
                SocketChannel channel = (SocketChannel) key.channel();
                byte[] recvBytes = new byte[4];
                ByteBuffer recvBuf = ByteBuffer.wrap(recvBytes);
                int read = 0;
                while (read < 4) {
                    if (read == -1) {
                        log.debug("channel closed.");
                        key.cancel();
                        return;
                    }
                    try {
                        read = read + channel.read(recvBuf);
                    } catch (IOException e) {
                        if (e.getMessage().contains("Connection reset by peer")) {
                            log.debug("channel closed.");
                            channel.close();
                            key.cancel();
                            return;
                        }
                    }
                }
                key.interestOps(SelectionKey.OP_WRITE);
                log.debug("request <== {}", new String(recvBytes, StandardCharsets.UTF_8));
            } catch (Throwable e) {
                log.error("read error.", e);
            }
        } else if (key.isWritable()) {
            try {
                SocketChannel channel = (SocketChannel) key.channel();
                String resp = String.valueOf(System.currentTimeMillis());
                ByteBuffer sendBuf = ByteBuffer.wrap(resp.getBytes(StandardCharsets.UTF_8));
                int write = channel.write(sendBuf);
                while (write < 13) {
                    write = write + channel.write(sendBuf);
                }
                key.interestOps(SelectionKey.OP_READ);
                log.debug("response ==> {}", resp);
            } catch (Throwable e) {
                log.error("write error.", e);
            }
        }
    }
}

server.configureBlocking() 不影响 select(),也就是说 select() 还是阻塞的,不会变成 selectNow(),且在 Linux 中,socket 的 O_NONBLOCK 不会继承,所以建连后生成的 socket(SocketChannel)还要设置一下 O_NONBLOCK。

Client

@Slf4j
public class NioPingClient {
    private static volatile AtomicInteger count = new AtomicInteger();

    public static void main(String[] args) throws Exception {
        Selector selector;
        SocketChannel client;
        try {
            selector = Selector.open();
            client = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8081));
            client.configureBlocking(false);
        } catch (Throwable e) {
            log.error("connect failed.", e);
            return;
        }
        log.debug("connected.");
        client.register(selector, SelectionKey.OP_WRITE);
        while (!Thread.currentThread().isInterrupted()) {
            int selected = selector.select();
            if (selected > 0) {
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    handle(selector, key);
                }
            }
            if (count.get() > 3) {
                break;
            }
        }
    }

    private static void handle(Selector selector, SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        if (key.isWritable()) {
            try {
                count.incrementAndGet();
                String req = "ping";
                ByteBuffer sendBuf = ByteBuffer.wrap(req.getBytes(StandardCharsets.UTF_8));
                int write = 0;
                while (write < 4) {
                    write = write + channel.write(sendBuf);
                }
                key.interestOps(SelectionKey.OP_READ);
                log.debug("request ==> {}", req);
            } catch (Throwable e) {
                log.error("write error.", e);
            }
        } else if (key.isReadable()) {
            try {
                byte[] recvBytes = new byte[13];
                ByteBuffer recvBuf = ByteBuffer.wrap(recvBytes);
                int read = 0;
                while (read < 13) {
                    read = read + channel.read(recvBuf);
                }
                key.interestOps(SelectionKey.OP_WRITE);
                log.debug("response <== {}", new String(recvBytes, StandardCharsets.UTF_8));
                if (count.get() >= 3) {
                    count.incrementAndGet();
                    channel.close();
                    key.cancel();
                    log.debug("channel closed.");
                }
            } catch (Throwable e) {
                log.error("read error.", e);
            }
        }
    }
}

Run

Server:

image-20191108013427701

Client:

image-20191108013437170

异步 I/O

Server

@Slf4j
public class AioTimeServer {
    public static void main(String[] args) throws Exception {
        AsynchronousChannelGroup group =
                AsynchronousChannelGroup.withFixedThreadPool(4, r -> new Thread(r, "I/O Thread"));
        AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group);
        server.bind(new InetSocketAddress(8081));
        log.debug("started.");
        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
            @Override
            public void completed(AsynchronousSocketChannel channel, Void att) {
                server.accept(null, this);
                log.debug("connected");
                ByteBuffer recvBuf = ByteBuffer.wrap(new byte[4]);
                channel.read(recvBuf, null, new CompletionHandler<Integer, Object>() {
                    @Override
                    public void completed(Integer result, Object attachment) {
                        if (result == -1) {
                            try {
                                channel.close();
                            } catch (Throwable e) {
                                log.error("close failed.", e);
                            }
                            log.debug("channel closed.");
                            return;
                        }
                        if (result < 4) {
                            channel.read(recvBuf, null, this);
                            return;
                        }
                        byte[] content = recvBuf.array();
                        log.debug("request <== {}", new String(content, StandardCharsets.UTF_8));
                        // 异步业务处理
                        // pool.submit(new BizTask(Arrays.copyOf(content, content.length)));
                        recvBuf.clear();
                        channel.read(recvBuf, null, this);

                        String resp = String.valueOf(System.currentTimeMillis());
                        ByteBuffer sendBuf = ByteBuffer.wrap(resp.getBytes(StandardCharsets.UTF_8));
                        channel.write(sendBuf, null, new CompletionHandler<Integer, Object>() {
                            @Override
                            public void completed(Integer result, Object attachment) {
                                if (result == -1) {
                                    try {
                                        channel.close();
                                    } catch (Throwable e) {
                                        log.error("close failed.", e);
                                    }
                                    log.debug("channel closed.");
                                    return;
                                }
                                if (result < 13) {
                                    channel.write(sendBuf, null, this);
                                    return;
                                }
                                log.debug("response ==> {}", new String(sendBuf.array(), StandardCharsets.UTF_8));
                                sendBuf.clear();
                            }

                            @Override
                            public void failed(Throwable exc, Object attachment) {
                                log.error("write error.", exc);
                            }
                        });
                    }

                    @Override
                    public void failed(Throwable exc, Object attachment) {
                        log.error("read error.", exc);
                    }
                });
            }

            @Override
            public void failed(Throwable exc, Void att) {
                log.error("accept error.", exc);
            }
        });
    }
}

Client

@Slf4j
public class AioPingClient {
    public static void main(String[] args) throws Exception {
        AsynchronousChannelGroup group =
                AsynchronousChannelGroup.withFixedThreadPool(4, r -> new Thread(r, "I/O Thread"));
        AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group);
        channel.connect(new InetSocketAddress("127.0.0.1", 8081), null, new CompletionHandler<Void, Object>() {
            @Override
            public void completed(Void result, Object attachment) {
                log.debug("connected");
                String req = "ping";
                ByteBuffer sendBuf = ByteBuffer.wrap(req.getBytes(StandardCharsets.UTF_8));
                channel.write(sendBuf, null, new CompletionHandler<Integer, Void>() {
                    @Override
                    public void completed(Integer result, Void attachment) {
                        if (result == -1) {
                            try {
                                channel.close();
                            } catch (Throwable e) {
                                log.error("close failed.", e);
                            }
                            log.debug("channel closed.");
                            return;
                        }
                        if (result < 4) {
                            channel.write(sendBuf, null, this);
                            return;
                        }
                        log.debug("request ==> {}", req);

                        byte[] recvBytes = new byte[13];
                        ByteBuffer recvBuf = ByteBuffer.wrap(recvBytes);
                        channel.read(recvBuf, null, new CompletionHandler<Integer, Object>() {
                            @Override
                            public void completed(Integer result, Object attachment) {
                                if (result == -1) {
                                    try {
                                        channel.close();
                                    } catch (Throwable e) {
                                        log.error("close failed.", e);
                                    }
                                    log.debug("channel closed.");
                                    return;
                                }
                                if (result < 13) {
                                    channel.read(recvBuf, null, this);
                                    return;
                                }
                                log.debug("response <== {}", new String(recvBytes, StandardCharsets.UTF_8));
                                new Thread(() -> {
                                    try {
                                        channel.close();
                                        group.shutdown();
                                        group.awaitTermination(10, TimeUnit.SECONDS);
                                    } catch (Throwable e) {
                                        log.error("close failed.", e);
                                    }
                                }).start();
                            }

                            @Override
                            public void failed(Throwable exc, Object attachment) {
                                log.error("read error.", exc);
                            }
                        });
                    }

                    @Override
                    public void failed(Throwable exc, Void attachment) {
                        log.error("write error.", exc);
                    }
                });
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                log.error("connect error.", exc);
            }
        });
    }
}

Run

Server:

image-20191108115242800

Client:

image-20191108115234816

与 Reactor、Proactor 模式的联系

Java 基于多路复用 I/O 的编程模型似乎是对 Reactor 模式的实现,而异步 I/O 的编程模型则是 Proactor 的实现,我们随后会进一步学习 Reactor 和 Proactor 模式。