Unix I/O 模型浅析

本篇谈谈对 Unix 五种 I/O 模型的理解和对比,以及 JDK 提供的相应实现。

相关知识

TCP 网络编程

image-20191102224024597

以服务端为例:

  • socket()

    初始化套接字,比如我们指定 IPv4 为网络域(PF_INET / AF_INET),使用提供序列化的、可靠的、双向连接的字节流的 TCP 协议,创建套接字,返回一个 TCP Socket 文件描述符。可以通过 fcntl() 文件控制函数将这个文件描述符设置为非阻塞模式(O_NONBLOCK)。

  • bind()

    套接字与端口绑定,绑定后,IP 地址、端口、协议类型都设定好了。

  • listen()

    初始化服务可连接队列,设置客户端连接到队列长度 back-log(Linux 2.2 之后,指已建立连接但尚在等待 accept 调用的连接队列长度,而尚未连接到 syn 队列由 tcp_max_syn_backlog 设置)。

  • accept()

    接受一个客户端的连接请求,返回新的文件描述符,老的表示正在监听的 socket,新的表示与某个客户端的连接,之后的数据读写通过新文件描述符完成。

    accept 函数可能发生错误返回错误码,这里比较关注的有 EMFILE(已到当前进程允许打开的最大文件描述符数量)和 EAGAIN / EWOULDBLOCK(socket 使用了非阻塞模式,而现在调用 accept 是阻塞式的,且当下没有可接受的连接,在 VxWorks 和 Windows 上,EAGAIN 的名字叫做 EWOULDBLOCK)。后者之后还会有遇到。

非阻塞模式下,EAGAIN / EWOULDBLOCK 是正常的,下次循环继续相关调用即可,它并不是一个“错误”。另外,当内核执行一个系统调用(比如 fork())时,若发现没有足够的资源(比如虚拟内存),也可能会返回 EAGAIN 提示应用程序再试着调用一次,也许下次就能成功,因为整个环境是动态的。

Linux I/O 函数

基本的,有 read()write() 可以向文件描述符中读写数据,网络 I/O 则常用 recv()send() 等:

  • recv()

    从 socket 中接收数据放到缓冲区,数据接收方式可以设置为 MSG_DONTWAIT 以实现非阻塞读操作(临时的,不需要设置 socket 为非阻塞,若设置了 socket 为非阻塞则没必要在设置),此时可能返回 EAGAIN / EWOULDBLOCK,同理,表示阻塞调用 recv() 没收到数据。

  • send()

    发送数据,将缓冲区数据按照某种方式通过 socket 发送出去,方式和 recv() 一样可以是 MSG_DONTWAIT,也可能收到 EAGAIN / EWOULDBLOCK,道理相同。

另外还有 readv()writev() 支持从(向)多个缓冲区接收(写入)数据,以及支持更复杂的高级操作的 recvmsg()sendmsg() ,还有 UDP 使用的 recvfromsendto(),这里不再赘述,目的都一样,就是为了从 socket 读写数据。

I/O 模型

阻塞式 I/O 模型

image-20191103002214592

图中以数据报套接字作为例子,在 I/O 模型上,虽然 TCP 的 recv 同理,但是更复杂(考虑 low-water-mark 等)。

可以看到,进程在整个过程都是 阻塞的,只要不是被中断,那么 recvfrom() 返回意味着数据已经到达 socket read buffer 并且被拷贝到用户空间的应用进程的缓冲区中。

显然,阻塞式 I/O 是同步的。

非阻塞 I/O 模型

image-20191103002832098

设置参数可以让 recvfrom 函数工作在非阻塞模型下,同样,若 socket 本身已经被设置为非阻塞模式则不用。

设置非阻塞模式使得当前没有数据可读时不让进程进入睡眠,而是立即返回错误,下次循环再接着调用 recvfrom()。这里的“循环”我们称之为轮询(polling),进程持续轮询以查看某个操作是否就绪的方式很浪费 CPU。使用非阻塞 I/O 模型的进程在数据准备好前不会被阻塞,只是浪费时间在不断的轮询之中。

由于使用 recvfrom ,非阻塞 I/O 模型也是同步的。

I/O 复用模型

image-20191103020956842

阻塞式 I/O 阻塞在单 socket 的读写上,非阻塞式 I/O 在轮询中循环,而 I/O 复用(I/O multiplexing)调用了 select()poll(),阻塞在对多个 socket 的轮询上,从而支持了单线程轮询多个 socket。

我们也可以使用多线程模型,在每个线程内使用阻塞式 I/O,同样可以达到类似效果。“多线程阻塞 I/O” 这种方式的工作流程简单明了,容易理解,但也有相应弊端,比如在海量连接的场景下,多个线程间的上下文切换开销将非常大,效率是没有多路复用高的。

I/O 多路复用本身并不会比阻塞式 I/O 快,看起来没有显著优势,甚至有可能开销更大(关乎两个系统调用,先 select()recvfrom()),但 I/O 复用模式可以在单线程内等待多个 socket 就绪,在高并发海量连接下能够很大程度地减少服务器的并发线程数(或者说同样的线程池能支持更多的并发),提高 CPU 执行效率。

I/O 复用优化了多个 socket 通道的轮询效率,进程在数据准备好前不会被阻塞,但仍旧使用 recvfrom() 拷贝数据,I/O 过程仍然是同步的。

信号驱动式 I/O 模型

image-20191103010911150

信号驱动式 I/O(Signal-Driven I/O)需要使用 sigaction() 系统调用去配置一个信号处理函数(回调函数),调用后立即返回(非阻塞),之后系统内核等待数据准备好,然后发送 SIGIO 信号通知进程,随后进程执行 回调函数 中的 recvfrom(),将数据从内核空间复制到用户空间。

该模式遵循好莱坞原则,优势明显,进程在数据准备好前不会被阻塞,但数据拷贝使用 recvfrom(),故仍然是同步的。

异步 I/O 模型

image-20191103011410315

异步 I/O 模型(Asynchronous I/O)使用 aio_read() 系统调用,同样需要注册一个回调函数,但与信号驱动式 I/O 的不同点在于,异步 I/O 中,信号发出时数据已经从内核拷贝到了用户空间,我们的回调函数可以直接使用数据了,全过程进程都不阻塞,是真正的异步 I/O。

对比

image-20191103013755950

上文已经提到,由于真正执行 I/O 操作是通过 recvfrom() 系统调用,除异步 I/O 外的前四种模式都是同步 I/O,只有异步 I/O 模型才是真正的异步,也符合 POSIX 定义的异步 I/O。

POSIX 定义描述:POSIX 异步 I/O (AIO) 接口允许程序启动一个或多个异步执行(即在后台执行)的 I/O 操作,而程序可以被多种方式通知到,这种方式可以是信号、实例化线程,甚至也可以不通知。

同步与异步,阻塞与非阻塞

这里的区别更多的讲的是“角度的不同”。

“同步”与“异步”是站在 被调用者 的角度上说的,被调用者是系统内核,所以内核函数的执行方式和函数返回时机的不同,是“同步”与“异步”的区别所在。

“阻塞”与“非阻塞”是站在 调用者 的角度上说的,这里的调用者是应用程序,如果调用者“死脑筋”,I/O 不返回,线程就不走,那么就说这个 I/O 是阻塞式的,故“阻塞”与“非阻塞”的区别在于调用方在结果返回之前的行为方式。

Java Demo

阻塞 I/O

Server

@Slf4j
public class BioTimeServer {
public static void main(String[] args) throws Exception {
ServerSocket ss = new ServerSocket(8081);
log.debug("started.");
Socket socket = ss.accept();
log.debug("connected");
InputStream is = socket.getInputStream();
OutputStream os = socket.getOutputStream();
// "ping" in utf-8
byte[] recvBytes = new byte[4];
int read = IOUtils.read(is, recvBytes);
while (read < 4) {
read = read + IOUtils.read(is, recvBytes, read, 4 - read);
}
String req = new String(recvBytes, StandardCharsets.UTF_8);
log.debug("request <== {}", req);
String resp = String.valueOf(System.currentTimeMillis());
os.write(resp.getBytes(StandardCharsets.UTF_8));
os.flush();
log.debug("response ==> {}", resp);
Thread.sleep(1000L);
if (!ss.isClosed()) {
ss.close();
}
}
}

Server 端无拆包逻辑,固定读 4 字节。

Client

@Slf4j
public class BioPingClient {
public static void main(String[] args) throws Exception {
Socket socket = new Socket("127.0.0.1", 8081);
log.debug("connected");
InputStream is = socket.getInputStream();
OutputStream os = socket.getOutputStream();
String req1 = "pi";
os.write(req1.getBytes(StandardCharsets.UTF_8));
os.flush();
Thread.sleep(1000L);
String req2 = "ng";
os.write(req2.getBytes(StandardCharsets.UTF_8));
os.flush();
log.debug("request ==> {}", req1 + req2);
byte[] recvBytes = new byte[13];
int read = IOUtils.read(is, recvBytes);
while (read < 13) {
read = read + IOUtils.read(is, recvBytes, read, 13 - read);
}
String resp = new String(recvBytes, StandardCharsets.UTF_8);
log.debug("response <== {}", resp);
if (!socket.isClosed()) {
socket.close();
}
}
}

Run

Server:

image-20191107235720354

Client:

image-20191107235734957

非阻塞 I/O 与多路复用

这里简单模拟一下三次请求。

Server

@Slf4j
public class NioTimeServer {
public static void main(String[] args) throws Exception {
Selector selector;
ServerSocketChannel server;
try {
// 多路复用器
selector = Selector.open();
server = ServerSocketChannel.open();
// 设置 accept() 为非阻塞
server.configureBlocking(false);
server.bind(new InetSocketAddress(8081));
} catch (Throwable e) {
log.error("failed to bind.", e);
return;
}
log.debug("started.");
server.register(selector, SelectionKey.OP_ACCEPT);
while (!Thread.currentThread().isInterrupted()) {
int selected = selector.select();
if (selected > 0) {
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
handle(selector, key);
}
}
}
}

private static void handle(Selector selector, SelectionKey key) {
if (key.isAcceptable()) {
try {
SocketChannel channel = null;
while (channel == null) {
channel = ((ServerSocketChannel) key.channel()).accept();
}
// 设置 read()/write() 为非阻塞
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
log.debug("connected");
} catch (Throwable e) {
log.error("accept error.", e);
}
} else if (!key.isValid()) {
try {
key.channel().close();
key.cancel();
} catch (Throwable ignored) {
}
} else if (key.isReadable()) {
try {
SocketChannel channel = (SocketChannel) key.channel();
byte[] recvBytes = new byte[4];
ByteBuffer recvBuf = ByteBuffer.wrap(recvBytes);
int read = 0;
while (read < 4) {
if (read == -1) {
log.debug("channel closed.");
key.cancel();
return;
}
try {
read = read + channel.read(recvBuf);
} catch (IOException e) {
if (e.getMessage().contains("Connection reset by peer")) {
log.debug("channel closed.");
channel.close();
key.cancel();
return;
}
}
}
key.interestOps(SelectionKey.OP_WRITE);
log.debug("request <== {}", new String(recvBytes, StandardCharsets.UTF_8));
} catch (Throwable e) {
log.error("read error.", e);
}
} else if (key.isWritable()) {
try {
SocketChannel channel = (SocketChannel) key.channel();
String resp = String.valueOf(System.currentTimeMillis());
ByteBuffer sendBuf = ByteBuffer.wrap(resp.getBytes(StandardCharsets.UTF_8));
int write = channel.write(sendBuf);
while (write < 13) {
write = write + channel.write(sendBuf);
}
key.interestOps(SelectionKey.OP_READ);
log.debug("response ==> {}", resp);
} catch (Throwable e) {
log.error("write error.", e);
}
}
}
}

server.configureBlocking() 不影响 select(),也就是说 select() 还是阻塞的,不会变成 selectNow(),且在 Linux 中,socket 的 O_NONBLOCK 不会继承,所以建连后生成的 socket(SocketChannel)还要设置一下 O_NONBLOCK。

Client

@Slf4j
public class NioPingClient {
private static volatile AtomicInteger count = new AtomicInteger();

public static void main(String[] args) throws Exception {
Selector selector;
SocketChannel client;
try {
selector = Selector.open();
client = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8081));
client.configureBlocking(false);
} catch (Throwable e) {
log.error("connect failed.", e);
return;
}
log.debug("connected.");
client.register(selector, SelectionKey.OP_WRITE);
while (!Thread.currentThread().isInterrupted()) {
int selected = selector.select();
if (selected > 0) {
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
handle(selector, key);
}
}
if (count.get() > 3) {
break;
}
}
}

private static void handle(Selector selector, SelectionKey key) {
SocketChannel channel = (SocketChannel) key.channel();
if (key.isWritable()) {
try {
count.incrementAndGet();
String req = "ping";
ByteBuffer sendBuf = ByteBuffer.wrap(req.getBytes(StandardCharsets.UTF_8));
int write = 0;
while (write < 4) {
write = write + channel.write(sendBuf);
}
key.interestOps(SelectionKey.OP_READ);
log.debug("request ==> {}", req);
} catch (Throwable e) {
log.error("write error.", e);
}
} else if (key.isReadable()) {
try {
byte[] recvBytes = new byte[13];
ByteBuffer recvBuf = ByteBuffer.wrap(recvBytes);
int read = 0;
while (read < 13) {
read = read + channel.read(recvBuf);
}
key.interestOps(SelectionKey.OP_WRITE);
log.debug("response <== {}", new String(recvBytes, StandardCharsets.UTF_8));
if (count.get() >= 3) {
count.incrementAndGet();
channel.close();
key.cancel();
log.debug("channel closed.");
}
} catch (Throwable e) {
log.error("read error.", e);
}
}
}
}

Run

Server:

image-20191108013427701

Client:

image-20191108013437170

异步 I/O

Server

@Slf4j
public class AioTimeServer {
public static void main(String[] args) throws Exception {
AsynchronousChannelGroup group =
AsynchronousChannelGroup.withFixedThreadPool(4, r -> new Thread(r, "I/O Thread"));
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group);
server.bind(new InetSocketAddress(8081));
log.debug("started.");
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
@Override
public void completed(AsynchronousSocketChannel channel, Void att) {
server.accept(null, this);
log.debug("connected");
ByteBuffer recvBuf = ByteBuffer.wrap(new byte[4]);
channel.read(recvBuf, null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
if (result == -1) {
try {
channel.close();
} catch (Throwable e) {
log.error("close failed.", e);
}
log.debug("channel closed.");
return;
}
if (result < 4) {
channel.read(recvBuf, null, this);
return;
}
byte[] content = recvBuf.array();
log.debug("request <== {}", new String(content, StandardCharsets.UTF_8));
// 异步业务处理
// pool.submit(new BizTask(Arrays.copyOf(content, content.length)));
recvBuf.clear();
channel.read(recvBuf, null, this);

String resp = String.valueOf(System.currentTimeMillis());
ByteBuffer sendBuf = ByteBuffer.wrap(resp.getBytes(StandardCharsets.UTF_8));
channel.write(sendBuf, null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
if (result == -1) {
try {
channel.close();
} catch (Throwable e) {
log.error("close failed.", e);
}
log.debug("channel closed.");
return;
}
if (result < 13) {
channel.write(sendBuf, null, this);
return;
}
log.debug("response ==> {}", new String(sendBuf.array(), StandardCharsets.UTF_8));
sendBuf.clear();
}

@Override
public void failed(Throwable exc, Object attachment) {
log.error("write error.", exc);
}
});
}

@Override
public void failed(Throwable exc, Object attachment) {
log.error("read error.", exc);
}
});
}

@Override
public void failed(Throwable exc, Void att) {
log.error("accept error.", exc);
}
});
}
}

Client

@Slf4j
public class AioPingClient {
public static void main(String[] args) throws Exception {
AsynchronousChannelGroup group =
AsynchronousChannelGroup.withFixedThreadPool(4, r -> new Thread(r, "I/O Thread"));
AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group);
channel.connect(new InetSocketAddress("127.0.0.1", 8081), null, new CompletionHandler<Void, Object>() {
@Override
public void completed(Void result, Object attachment) {
log.debug("connected");
String req = "ping";
ByteBuffer sendBuf = ByteBuffer.wrap(req.getBytes(StandardCharsets.UTF_8));
channel.write(sendBuf, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
if (result == -1) {
try {
channel.close();
} catch (Throwable e) {
log.error("close failed.", e);
}
log.debug("channel closed.");
return;
}
if (result < 4) {
channel.write(sendBuf, null, this);
return;
}
log.debug("request ==> {}", req);

byte[] recvBytes = new byte[13];
ByteBuffer recvBuf = ByteBuffer.wrap(recvBytes);
channel.read(recvBuf, null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
if (result == -1) {
try {
channel.close();
} catch (Throwable e) {
log.error("close failed.", e);
}
log.debug("channel closed.");
return;
}
if (result < 13) {
channel.read(recvBuf, null, this);
return;
}
log.debug("response <== {}", new String(recvBytes, StandardCharsets.UTF_8));
new Thread(() -> {
try {
channel.close();
group.shutdown();
group.awaitTermination(10, TimeUnit.SECONDS);
} catch (Throwable e) {
log.error("close failed.", e);
}
}).start();
}

@Override
public void failed(Throwable exc, Object attachment) {
log.error("read error.", exc);
}
});
}

@Override
public void failed(Throwable exc, Void attachment) {
log.error("write error.", exc);
}
});
}

@Override
public void failed(Throwable exc, Object attachment) {
log.error("connect error.", exc);
}
});
}
}

Run

Server:

image-20191108115242800

Client:

image-20191108115234816

与 Reactor、Proactor 模式的联系

Java 基于多路复用 I/O 的编程模型似乎是对 Reactor 模式的实现,而异步 I/O 的编程模型则是 Proactor 的实现,我们随后会进一步学习 Reactor 和 Proactor 模式。