Java NIO 源码解析

本篇为 Java 原生 NIO 编程的源码浅析。

Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Selector selector;
ServerSocketChannel server;
try {
selector = Selector.open();
server = ServerSocketChannel.open();
server.configureBlocking(false);
server.bind(new InetSocketAddress(8081));
} catch (Throwable e) {
// ...
}
server.register(selector, SelectionKey.OP_ACCEPT);
while (!Thread.currentThread().isInterrupted()) {
if (selector.select() > 0) {
Iterator iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
handle(selector, key);
}
}
}

源码分析

Selector 实例化

open() 方法通过 Provider 机制创建 Selector:

1
2
3
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}

provider() 中,采取了三种策略获取 SelectorProvider:

1
2
3
4
5
6
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
  • loadProviderFromProperty:尝试查找 Key 为 “java.nio.channels.spi.SelectorProvider” 的系统属性值;

  • loadProviderAsService:尝试通过 JDK 的 SPI 扩展机制加载 java.nio.channels.spi.SelectorProvider 实现;

  • DefaultSelectorProvider:不同平台的 JRE 有不同实现:

    image-20191201143846996

本文 JDK 源码来源为 OpenJDK 8。

若不通过系统属性和 SPI 文件特别指明,那么我们在 Linux 下(共用 solaris 目录)使用的就是 EPollSelectorProvider,调用其 openSelector() 方法拿到的 Selector 为 EPollSelectorImpl 实例:

1
2
3
4
5
if (osname.equals("SunOS"))
return createProvider("sun.nio.ch.DevPollSelectorProvider");
if (osname.equals("Linux"))
return createProvider("sun.nio.ch.EPollSelectorProvider");
return new sun.nio.ch.PollSelectorProvider();
1
2
3
4
5
6
7
8
9
public class EPollSelectorProvider extends SelectorProviderImpl {
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}

public Channel inheritedChannel() throws IOException {
return InheritedChannel.getChannel();
}
}

EPollSelectorImpl 继承体系如下:

EPollSelectorImpl

SelectorImpl 基本具有了一个 Selector 的框架,并将 doSelect()implRegister()wakeup() 等方法留给子类实现。EPollSelectorImpl 最主要的作用就是持有一个 epoll 组件 EPollArrayWrapper,后者负责 epoll 的一系列系统调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
EPollSelectorImpl(SelectorProvider sp) throws IOException {
...
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
try {
pollWrapper = new EPollArrayWrapper();
pollWrapper.initInterrupt(fd0, fd1);
...
} catch (Throwable t) {
...
}
}
1
2
3
4
5
6
7
8
9
JNIEXPORT jlong JNICALL
Java_sun_nio_ch_IOUtil_makePipe(JNIEnv *env, jobject this, jboolean blocking)
{
int fd[2];
pipe(fd);
configureBlocking(fd[0], JNI_FALSE);
configureBlocking(fd[1], JNI_FALSE);
return ((jlong) fd[0] << 32) | (jlong) fd[1];
}
1
2
3
4
5
void initInterrupt(int fd0, int fd1) {
outgoingInterruptFD = fd1;
incomingInterruptFD = fd0;
epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
}
1
2
3
public void interrupt() {
interrupt(outgoingInterruptFD);
}
1
2
3
4
5
6
7
8
9
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_interrupt(JNIEnv *env, jobject this, jint fd)
{
int fakebuf[1];
fakebuf[0] = 1;
if (write(fd, fakebuf, 1) < 0) {
JNU_ThrowIOExceptionWithLastError(env,"write to interrupt fd failed");
}
}

这里使用了 IOUtil 工具类的 native 方法 makePipe(),该方法通过 POSIX API pipe() 构造了一个非阻塞的进程间通道,这里是为了响应 wakeup() 的中断,具体做法是 initInterrupt() 将 fd0 交给 epoll,然后在调用 wakeup() 时使用 fd1 去写一个 buffer 导致 fd0 这个读通道读到这个 input,从而使 select 返回(”Self-Pipe Trick”)。

EPollArrayWrapper 的部分属性:

1
2
3
4
5
6
7
8
9
10
11
private final int epfd;

private final AllocatedNativeObject pollArray;

private final long pollArrayAddress;

private int outgoingInterruptFD;

private int incomingInterruptFD;

private int interruptedIndex;
1
2
3
4
5
6
7
8
9
10
EPollArrayWrapper() throws IOException {
epfd = epollCreate();

int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();

if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
eventsHigh = new HashMap<>();
}

epollCreate() 方法触发 epoll_create() 系统调用,详见 Epoll 简介

NUM_EPOLLEVENTS 取 OPEN_MAX 和 8192 的较小者。

OPEN_MAX 为 getrlimit() 系统调用得到的 Linux 限制的单个进程能打开的最大文件描述符数量,可以在 shell 中通过 ulimit -n 命令得到。

SIZE_EPOLLEVENT 则为 epoll_wait() 系统调用中填充的结构体 epoll_event 的大小。

NUM_EPOLLEVENTS 与 SIZE_EPOLLEVENT 二者相乘即为 event 数组内存空间大小,之后以这个大小开辟堆外内存(AllocatedNativeObject 通过 Unsafe.allocateMemory),将内存地址赋给 pollArrayAddress。

这里有个优化,当 OPEN_MAX 过于大时(文件描述符超过 64k 或者系统属性 sun.nio.ch.maxUpdateArraySize 定义的值),采用 HashMap(fd -> 事件)而非 byte[] 数组去存储事件。

使用 byte[] 数组存储事件时 fd 为数组下标,速度快,而如果 fd 过多,可能产生空间浪费,所以改用 HashMap。

1
2
3
4
5
void initInterrupt(int fd0, int fd1) {
outgoingInterruptFD = fd1;
incomingInterruptFD = fd0;
epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
}

存储 fd0 和 fd1 用于中断,触发 epoll_ctl() 系统调用,将 fd0 添加到 epoll 结构中,监听 连接到达 事件,由于只配置了 EPOLLIN,故使用的触发模式为默认的 LT 模式。

ServerSocketChannel 实例化、创建 socket

ServerSocketChannel.open() 使用相同的机制拿到 EPollSelectorProvider,调用继承自父类 SelectorProviderImpl 的 openServerSocketChannel() 方法获取 ServerSocketChannel 类型实例:

1
2
3
public ServerSocketChannel openServerSocketChannel() throws IOException {
return new ServerSocketChannelImpl(this);
}

ServerSocketChannelImpl 继承体系如下:

image-20191201150601830

ServerSocketChannelImpl 具备进行 accept 连接的能力,并且可以被 Selector “select”,具有 interestOps 和 readyOps,也即订阅感兴趣的事件的能力。

1
2
3
4
5
6
ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
super(sp);
this.fd = Net.serverSocket(true);
this.fdVal = IOUtil.fdVal(fd);
this.state = ST_INUSE;
}

父类构造器仅作 provider 的持有,通过 Net 类调用 native 方法,我们创建了一个基于可靠连接的 socket,默认打开了 SO_REUSEADDR。我们使用 socket0() 返回的文件描述符构造了一个 Java 的 FileDescriptor 对象返回:

1
2
3
4
5
6
static FileDescriptor serverSocket(boolean stream) {
return IOUtil.newFD(socket0(isIPv6Available(), stream, true, fastLoopback));
}

private static native int socket0(boolean preferIPv6, boolean stream, boolean reuse,
boolean fastLoopback);

其中,fastLoopback 由系统属性 jdk.net.useFastTcpLoopback 控制,是 Windows 特有的一项优化,可以在内核 IP 解析前拦截本地环回地址的调用,加快本地调用速度,这个值在 Linux 的实现中已被忽略。socket0() 方法的主要代码就是通过系统调用 socket(domain, type, 0) 打开一个 socket,设置地址重用,并且在系统支持且 java.net.preferIPv4Stack 系统属性未开启时使用 IPv4/IPv6 双协议栈。

设置 ServerSocketChannel 为非阻塞

ServerSocketChannelImpl 通过 IOUtil 的 native 方法调用 POSIX 标准的 fcntl() 函数完成 socket 的非阻塞的设置:

1
2
3
protected void implConfigureBlocking(boolean block) throws IOException {
IOUtil.configureBlocking(fd, block);
}
1
2
3
4
5
6
7
JNIEXPORT void JNICALL
Java_sun_nio_ch_IOUtil_configureBlocking(JNIEnv *env, jclass clazz,
jobject fdo, jboolean blocking)
{
if (configureBlocking(fdval(env, fdo), blocking) < 0)
JNU_ThrowIOExceptionWithLastError(env, "Configure blocking failed");
}
1
2
3
4
5
6
7
8
static int
configureBlocking(int fd, jboolean blocking)
{
int flags = fcntl(fd, F_GETFL);
int newflags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);

return (flags == newflags) ? 0 : fcntl(fd, F_SETFL, newflags);
}

ServerSocketChannel 绑定端口实现监听

1
2
3
4
// 绑定 socket 到端口
Net.bind(fd, isa.getAddress(), isa.getPort());
// 初始化 backlog(默认 50)
Net.listen(fd, backlog < 1 ? 50 : backlog);

bind() 方法调用 bind0() native 方法,listen 本身是 native 方法,二者都是去调相关的系统调用(bind 和 listen)。

将 ServerSocketChannel 注册到多路复用器

1
server.register(selector, SelectionKey.OP_ACCEPT);

reigster 实现在 AbstractSelectableChannel 中:

1
2
3
4
5
6
7
8
SelectionKey k = findKey(sel);
if (k != null) {
k.interestOps(ops);
}
if (k == null) {
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}

AbstractSelectableChannel 持有一个 SelectionKey[] keys 用于存放已经产生的 SelectionKey(将当前 channel 注册到 selector 时产生)。

如果 keys 里没有这个 selector,则将 channel 自己注册到 selector 上,并且将返回的“身份证” SelectionKey 存到 keys 中。如果有则更新 interestOps 订阅。

EPollSelectorImpl 使用 SelectorImpl 中实现的 register() 方法:

1
2
3
4
5
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
synchronized (publicKeys) {
implRegister(k);
}
k.interestOps(ops);

注意这里会获取 publickKeys 对象锁,并且仅有 implRegister 在锁内,而 Selector.select() 会尝试获取同样的锁。关于这个锁的争抢问题,我们在《Reactor、Proactor 模式学习笔记》中有提到。

构造一个 SelectionKey 实例,设置 interestOps。

implRegister() 由 EPollSelectorImpl 自己实现:

1
2
3
4
5
6
7
protected void implRegister(SelectionKeyImpl ski) {
SelChImpl ch = ski.channel;
int fd = Integer.valueOf(ch.getFDVal());
fdToKey.put(fd, ski);
pollWrapper.add(fd);
keys.add(ski);
}

fdToKey.put(fd, ski) 建立 “ServerSocketChannelImpl 文件描述符 -> SelectionKey” 的映射关系存到一个 Map 中,方便查找当前 Selector 下某个 Channel 对应的 Key。

pollWrapper.add(fd) 将 Channel 的文件描述符添加到 poll 组件中。

keys.add(ski) 将 SelectionKey 存到一个 HashSet 中。

Selector.select()

SelectorImpl 实现了 select 的一些检查,并且获取了多个锁(包括前面提到的 publicKeys 对象锁),多路复用的核心流程 doSelect() 则留给了子类实现,EPollSelectorImpl 进行一次 select 的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private int lockAndDoSelect(long timeout) throws IOException {
synchronized (this) {
synchronized (publicKeys) {
synchronized (publicSelectedKeys) {
return doSelect(timeout);
}
}
}
}

protected int doSelect(long timeout) throws IOException {
processDeregisterQueue();
try {
begin();
pollWrapper.poll(timeout);
} finally {
end();
}
processDeregisterQueue();
int numKeysUpdated = updateSelectedKeys();
// 清理中断
// ...
return numKeysUpdated;
}

begin():线程被打断时触发 wakeup() 方法。

end():取消 begin 的设置。

processDeregisterQueue():清理上一次 select 后被 cancel 的 key 的资源。

最终返回更新过的 SelectionKey(即发生过事件的 Channel)的数量 updateSelectedKeys。

显然,关键方法为 poll()

1
2
3
4
5
6
7
8
9
10
11
12
13
int poll(long timeout) throws IOException {
updateRegistrations();
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
// 处理中断
for (int i=0; i
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}

updateRegistrations() 方法中使用 epoll_ctl() 系统调用更新内核 epoll 结构中等待事件发生的 fd 列表的感兴趣事件。poll 的核心是 epollWait() 方法,也就是 epoll_wait() 系统调用,不再赘述。