本篇谈谈对 Unix 五种 I/O 模型的理解和对比,以及 JDK 提供的相应实现。
相关知识
TCP 网络编程
以服务端为例:
socket()
初始化套接字,比如我们指定 IPv4 为网络域(PF_INET / AF_INET),使用提供序列化的、可靠的、双向连接的字节流的 TCP 协议,创建套接字,返回一个 TCP Socket 文件描述符。可以通过
fcntl()
文件控制函数将这个文件描述符设置为非阻塞模式(O_NONBLOCK)。bind()
套接字与端口绑定,绑定后,IP 地址、端口、协议类型都设定好了。
listen()
初始化服务可连接队列,设置客户端连接到队列长度 back-log(Linux 2.2 之后,指已建立连接但尚在等待
accept
调用的连接队列长度,而尚未连接的 syn 队列由tcp_max_syn_backlog
设置)。accept()
接受一个客户端的连接请求,返回新的文件描述符,老的表示正在监听的 socket,新的表示与某个客户端的连接,之后的数据读写通过新文件描述符完成。
accept 函数可能发生错误返回错误码,这里比较关注的有 EMFILE(已到当前进程允许打开的最大文件描述符数量)和 EAGAIN / EWOULDBLOCK(socket 使用了非阻塞模式,而现在调用 accept 是阻塞式的,且当下没有可接受的连接,在 VxWorks 和 Windows 上,EAGAIN 的名字叫做 EWOULDBLOCK)。后者之后还会有遇到。
非阻塞模式下,EAGAIN / EWOULDBLOCK 是正常的,下次循环继续相关调用即可,它并不是一个“错误”。另外,当内核执行一个系统调用(比如
fork()
)时,若发现没有足够的资源(比如虚拟内存),也可能会返回 EAGAIN 提示应用程序再试着调用一次,也许下次就能成功,因为整个环境是动态的。
Linux I/O 函数
基本的,有 read()
和 write()
可以向文件描述符中读写数据,网络 I/O 则常用 recv()
和 send()
等:
recv()
从 socket 中接收数据放到缓冲区,数据接收方式可以设置为 MSG_DONTWAIT 以实现非阻塞读操作(临时的,不需要设置 socket 为非阻塞,若设置了 socket 为非阻塞则没必要在设置),此时可能返回 EAGAIN / EWOULDBLOCK,同理,表示阻塞调用
recv()
没收到数据。send()
发送数据,将缓冲区数据按照某种方式通过 socket 发送出去,方式和
recv()
一样可以是 MSG_DONTWAIT,也可能收到 EAGAIN / EWOULDBLOCK,道理相同。
另外还有
readv()
、writev()
支持从(向)多个缓冲区接收(写入)数据,以及支持更复杂的高级操作的recvmsg()
和sendmsg()
,还有 UDP 使用的recvfrom
和sendto()
,这里不再赘述,目的都一样,就是为了从 socket 读写数据。
I/O 模型
阻塞式 I/O 模型
图中以数据报套接字作为例子,在 I/O 模型上,虽然 TCP 的
recv
同理,但是更复杂(考虑 low-water-mark 等)。
可以看到,进程在整个过程都是 阻塞的,只要不是被中断,那么 recvfrom()
返回意味着数据已经到达 socket read buffer 并且被拷贝到用户空间的应用进程的缓冲区中。
显然,阻塞式 I/O 是同步的。
非阻塞 I/O 模型
设置参数可以让 recvfrom
函数工作在非阻塞模型下,同样,若 socket 本身已经被设置为非阻塞模式则不用。
设置非阻塞模式使得当前没有数据可读时不让进程进入睡眠,而是立即返回错误,下次循环再接着调用 recvfrom()
。这里的“循环”我们称之为轮询(polling),进程持续轮询以查看某个操作是否就绪的方式很浪费 CPU。使用非阻塞 I/O 模型的进程在数据准备好前不会被阻塞,只是浪费时间在不断的轮询之中。
由于使用 recvfrom
,非阻塞 I/O 模型也是同步的。
I/O 复用模型
阻塞式 I/O 阻塞在某一个 socket 是否发生读写事件上,非阻塞式 I/O 在轮询中检查事件就绪状态,而 I/O 复用(I/O multiplexing)调用了 select()
或 poll()
,阻塞在对多个 socket 的轮询上,从而支持了单线程轮询多个 socket。
我们也可以使用多线程模型,在每个线程内使用阻塞式 I/O,同样可以达到类似效果。“多线程阻塞 I/O” 这种方式的工作流程简单明了,容易理解,但也有相应弊端,比如在海量连接的场景下,多个线程间的上下文切换开销将非常大,效率是没有多路复用高的。
I/O 多路复用本身并不会比阻塞式 I/O 快,看起来没有显著优势,甚至有可能开销更大(关乎两个系统调用,先 select()
后 recvfrom()
),但 I/O 复用模式可以在单线程内等待多个 socket 就绪,在高并发海量连接下能够很大程度地减少服务器的并发线程数(或者说同样的线程池能支持更多的并发),提高 CPU 执行效率。
I/O 复用优化了多个 socket 通道的轮询效率,进程在数据准备好前不会被阻塞,但仍旧使用 recvfrom()
拷贝数据,I/O 过程仍然是同步的。
信号驱动式 I/O 模型
信号驱动式 I/O(Signal-Driven I/O)需要使用 sigaction()
系统调用去配置一个信号处理函数(回调函数),调用后立即返回(非阻塞),之后系统内核等待数据准备好,然后发送 SIGIO 信号通知进程,随后进程执行 回调函数 中的 recvfrom()
,将数据从内核空间复制到用户空间。
该模式遵循好莱坞原则,优势明显,进程在数据准备好前不会被阻塞,但数据拷贝使用 recvfrom()
,故仍然是同步的。
异步 I/O 模型
异步 I/O 模型(Asynchronous I/O)使用 aio_read()
系统调用,同样需要注册一个回调函数,但与信号驱动式 I/O 的不同点在于,异步 I/O 中,信号发出时数据已经从内核拷贝到了用户空间,我们的回调函数可以直接使用数据了,全过程进程都不阻塞,是真正的异步 I/O。
对比
上文已经提到,由于真正执行 I/O 操作是通过 recvfrom()
系统调用,除异步 I/O 外的前四种模式都是同步 I/O,只有异步 I/O 模型才是真正的异步,也符合 POSIX 定义的异步 I/O。
POSIX 定义描述:POSIX 异步 I/O (AIO) 接口允许程序启动一个或多个异步执行(即在后台执行)的 I/O 操作,而程序可以被多种方式通知到,这种方式可以是信号、实例化线程,甚至也可以不通知。
同步与异步,阻塞与非阻塞
这里的区别更多的讲的是“角度的不同”。
“同步”与“异步”是站在 被调用者 的角度上说的,被调用者是系统内核,所以内核函数的执行方式和函数返回时机的不同,是“同步”与“异步”的区别所在。
“阻塞”与“非阻塞”是站在 调用者 的角度上说的,这里的调用者是应用程序,如果调用者“死脑筋”,I/O 不返回,线程就不走,那么就说这个 I/O 是阻塞式的,故“阻塞”与“非阻塞”的区别在于调用方在结果返回之前的行为方式。
Java Demo
阻塞 I/O
Server
public class BioTimeServer {
public static void main(String[] args) throws Exception {
ServerSocket ss = new ServerSocket(8081);
log.debug("started.");
Socket socket = ss.accept();
log.debug("connected");
InputStream is = socket.getInputStream();
OutputStream os = socket.getOutputStream();
// "ping" in utf-8
byte[] recvBytes = new byte[4];
int read = IOUtils.read(is, recvBytes);
while (read < 4) {
read = read + IOUtils.read(is, recvBytes, read, 4 - read);
}
String req = new String(recvBytes, StandardCharsets.UTF_8);
log.debug("request <== {}", req);
String resp = String.valueOf(System.currentTimeMillis());
os.write(resp.getBytes(StandardCharsets.UTF_8));
os.flush();
log.debug("response ==> {}", resp);
Thread.sleep(1000L);
if (!ss.isClosed()) {
ss.close();
}
}
}
Server 端无拆包逻辑,固定读 4 字节。
Client
public class BioPingClient {
public static void main(String[] args) throws Exception {
Socket socket = new Socket("127.0.0.1", 8081);
log.debug("connected");
InputStream is = socket.getInputStream();
OutputStream os = socket.getOutputStream();
String req1 = "pi";
os.write(req1.getBytes(StandardCharsets.UTF_8));
os.flush();
Thread.sleep(1000L);
String req2 = "ng";
os.write(req2.getBytes(StandardCharsets.UTF_8));
os.flush();
log.debug("request ==> {}", req1 + req2);
byte[] recvBytes = new byte[13];
int read = IOUtils.read(is, recvBytes);
while (read < 13) {
read = read + IOUtils.read(is, recvBytes, read, 13 - read);
}
String resp = new String(recvBytes, StandardCharsets.UTF_8);
log.debug("response <== {}", resp);
if (!socket.isClosed()) {
socket.close();
}
}
}
Run
Server:
Client:
非阻塞 I/O 与多路复用
这里简单模拟一下三次请求。
Server
public class NioTimeServer {
public static void main(String[] args) throws Exception {
Selector selector;
ServerSocketChannel server;
try {
// 多路复用器
selector = Selector.open();
server = ServerSocketChannel.open();
// 设置 accept() 为非阻塞
server.configureBlocking(false);
server.bind(new InetSocketAddress(8081));
} catch (Throwable e) {
log.error("failed to bind.", e);
return;
}
log.debug("started.");
server.register(selector, SelectionKey.OP_ACCEPT);
while (!Thread.currentThread().isInterrupted()) {
int selected = selector.select();
if (selected > 0) {
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
handle(selector, key);
}
}
}
}
private static void handle(Selector selector, SelectionKey key) {
if (key.isAcceptable()) {
try {
SocketChannel channel = null;
while (channel == null) {
channel = ((ServerSocketChannel) key.channel()).accept();
}
// 设置 read()/write() 为非阻塞
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
log.debug("connected");
} catch (Throwable e) {
log.error("accept error.", e);
}
} else if (!key.isValid()) {
try {
key.channel().close();
key.cancel();
} catch (Throwable ignored) {
}
} else if (key.isReadable()) {
try {
SocketChannel channel = (SocketChannel) key.channel();
byte[] recvBytes = new byte[4];
ByteBuffer recvBuf = ByteBuffer.wrap(recvBytes);
int read = 0;
while (read < 4) {
if (read == -1) {
log.debug("channel closed.");
key.cancel();
return;
}
try {
read = read + channel.read(recvBuf);
} catch (IOException e) {
if (e.getMessage().contains("Connection reset by peer")) {
log.debug("channel closed.");
channel.close();
key.cancel();
return;
}
}
}
key.interestOps(SelectionKey.OP_WRITE);
log.debug("request <== {}", new String(recvBytes, StandardCharsets.UTF_8));
} catch (Throwable e) {
log.error("read error.", e);
}
} else if (key.isWritable()) {
try {
SocketChannel channel = (SocketChannel) key.channel();
String resp = String.valueOf(System.currentTimeMillis());
ByteBuffer sendBuf = ByteBuffer.wrap(resp.getBytes(StandardCharsets.UTF_8));
int write = channel.write(sendBuf);
while (write < 13) {
write = write + channel.write(sendBuf);
}
key.interestOps(SelectionKey.OP_READ);
log.debug("response ==> {}", resp);
} catch (Throwable e) {
log.error("write error.", e);
}
}
}
}
server.configureBlocking()
不影响select()
,也就是说select()
还是阻塞的,不会变成selectNow()
,且在 Linux 中,socket 的 O_NONBLOCK 不会继承,所以建连后生成的 socket(SocketChannel)还要设置一下 O_NONBLOCK。
Client
public class NioPingClient {
private static volatile AtomicInteger count = new AtomicInteger();
public static void main(String[] args) throws Exception {
Selector selector;
SocketChannel client;
try {
selector = Selector.open();
client = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8081));
client.configureBlocking(false);
} catch (Throwable e) {
log.error("connect failed.", e);
return;
}
log.debug("connected.");
client.register(selector, SelectionKey.OP_WRITE);
while (!Thread.currentThread().isInterrupted()) {
int selected = selector.select();
if (selected > 0) {
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
handle(selector, key);
}
}
if (count.get() > 3) {
break;
}
}
}
private static void handle(Selector selector, SelectionKey key) {
SocketChannel channel = (SocketChannel) key.channel();
if (key.isWritable()) {
try {
count.incrementAndGet();
String req = "ping";
ByteBuffer sendBuf = ByteBuffer.wrap(req.getBytes(StandardCharsets.UTF_8));
int write = 0;
while (write < 4) {
write = write + channel.write(sendBuf);
}
key.interestOps(SelectionKey.OP_READ);
log.debug("request ==> {}", req);
} catch (Throwable e) {
log.error("write error.", e);
}
} else if (key.isReadable()) {
try {
byte[] recvBytes = new byte[13];
ByteBuffer recvBuf = ByteBuffer.wrap(recvBytes);
int read = 0;
while (read < 13) {
read = read + channel.read(recvBuf);
}
key.interestOps(SelectionKey.OP_WRITE);
log.debug("response <== {}", new String(recvBytes, StandardCharsets.UTF_8));
if (count.get() >= 3) {
count.incrementAndGet();
channel.close();
key.cancel();
log.debug("channel closed.");
}
} catch (Throwable e) {
log.error("read error.", e);
}
}
}
}
Run
Server:
Client:
异步 I/O
Server
public class AioTimeServer {
public static void main(String[] args) throws Exception {
AsynchronousChannelGroup group =
AsynchronousChannelGroup.withFixedThreadPool(4, r -> new Thread(r, "I/O Thread"));
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group);
server.bind(new InetSocketAddress(8081));
log.debug("started.");
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
public void completed(AsynchronousSocketChannel channel, Void att) {
server.accept(null, this);
log.debug("connected");
ByteBuffer recvBuf = ByteBuffer.wrap(new byte[4]);
channel.read(recvBuf, null, new CompletionHandler<Integer, Object>() {
public void completed(Integer result, Object attachment) {
if (result == -1) {
try {
channel.close();
} catch (Throwable e) {
log.error("close failed.", e);
}
log.debug("channel closed.");
return;
}
if (result < 4) {
channel.read(recvBuf, null, this);
return;
}
byte[] content = recvBuf.array();
log.debug("request <== {}", new String(content, StandardCharsets.UTF_8));
// 异步业务处理
// pool.submit(new BizTask(Arrays.copyOf(content, content.length)));
recvBuf.clear();
channel.read(recvBuf, null, this);
String resp = String.valueOf(System.currentTimeMillis());
ByteBuffer sendBuf = ByteBuffer.wrap(resp.getBytes(StandardCharsets.UTF_8));
channel.write(sendBuf, null, new CompletionHandler<Integer, Object>() {
public void completed(Integer result, Object attachment) {
if (result == -1) {
try {
channel.close();
} catch (Throwable e) {
log.error("close failed.", e);
}
log.debug("channel closed.");
return;
}
if (result < 13) {
channel.write(sendBuf, null, this);
return;
}
log.debug("response ==> {}", new String(sendBuf.array(), StandardCharsets.UTF_8));
sendBuf.clear();
}
public void failed(Throwable exc, Object attachment) {
log.error("write error.", exc);
}
});
}
public void failed(Throwable exc, Object attachment) {
log.error("read error.", exc);
}
});
}
public void failed(Throwable exc, Void att) {
log.error("accept error.", exc);
}
});
}
}
Client
public class AioPingClient {
public static void main(String[] args) throws Exception {
AsynchronousChannelGroup group =
AsynchronousChannelGroup.withFixedThreadPool(4, r -> new Thread(r, "I/O Thread"));
AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group);
channel.connect(new InetSocketAddress("127.0.0.1", 8081), null, new CompletionHandler<Void, Object>() {
public void completed(Void result, Object attachment) {
log.debug("connected");
String req = "ping";
ByteBuffer sendBuf = ByteBuffer.wrap(req.getBytes(StandardCharsets.UTF_8));
channel.write(sendBuf, null, new CompletionHandler<Integer, Void>() {
public void completed(Integer result, Void attachment) {
if (result == -1) {
try {
channel.close();
} catch (Throwable e) {
log.error("close failed.", e);
}
log.debug("channel closed.");
return;
}
if (result < 4) {
channel.write(sendBuf, null, this);
return;
}
log.debug("request ==> {}", req);
byte[] recvBytes = new byte[13];
ByteBuffer recvBuf = ByteBuffer.wrap(recvBytes);
channel.read(recvBuf, null, new CompletionHandler<Integer, Object>() {
public void completed(Integer result, Object attachment) {
if (result == -1) {
try {
channel.close();
} catch (Throwable e) {
log.error("close failed.", e);
}
log.debug("channel closed.");
return;
}
if (result < 13) {
channel.read(recvBuf, null, this);
return;
}
log.debug("response <== {}", new String(recvBytes, StandardCharsets.UTF_8));
new Thread(() -> {
try {
channel.close();
group.shutdown();
group.awaitTermination(10, TimeUnit.SECONDS);
} catch (Throwable e) {
log.error("close failed.", e);
}
}).start();
}
public void failed(Throwable exc, Object attachment) {
log.error("read error.", exc);
}
});
}
public void failed(Throwable exc, Void attachment) {
log.error("write error.", exc);
}
});
}
public void failed(Throwable exc, Object attachment) {
log.error("connect error.", exc);
}
});
}
}
Run
Server:
Client:
与 Reactor、Proactor 模式的联系
Java 基于多路复用 I/O 的编程模型似乎是对 Reactor 模式的实现,而异步 I/O 的编程模型则是 Proactor 的实现,我们随后会进一步学习 Reactor 和 Proactor 模式。