Java NIO 源码解析

本篇为 Java 原生 NIO 编程的源码浅析。

Demo

Selector selector;
ServerSocketChannel server;
try {
    selector = Selector.open();
    server = ServerSocketChannel.open();
    server.configureBlocking(false);
    server.bind(new InetSocketAddress(8081));
} catch (Throwable e) {
    // ...
}
server.register(selector, SelectionKey.OP_ACCEPT);
while (!Thread.currentThread().isInterrupted()) {
    if (selector.select() > 0) {
        Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
        while (iter.hasNext()) {
            SelectionKey key = iter.next();
            iter.remove();
            handle(selector, key);
        }
    }
}

源码分析

Selector 实例化

open() 方法通过 Provider 机制创建 Selector:

public static Selector open() throws IOException {
    return SelectorProvider.provider().openSelector();
}

provider() 中,采取了三种策略获取 SelectorProvider:

if (loadProviderFromProperty())
    return provider;
if (loadProviderAsService())
    return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
  • loadProviderFromProperty:尝试查找 Key 为 “java.nio.channels.spi.SelectorProvider” 的系统属性值;

  • loadProviderAsService:尝试通过 JDK 的 SPI 扩展机制加载 java.nio.channels.spi.SelectorProvider 实现;

  • DefaultSelectorProvider:不同平台的 JRE 有不同实现:

    image-20191201143846996

本文 JDK 源码来源为 OpenJDK 8。

若不通过系统属性和 SPI 文件特别指明,那么我们在 Linux 下(共用 solaris 目录)使用的就是 EPollSelectorProvider,调用其 openSelector() 方法拿到的 Selector 为 EPollSelectorImpl 实例:

if (osname.equals("SunOS"))
    return createProvider("sun.nio.ch.DevPollSelectorProvider");
if (osname.equals("Linux"))
    return createProvider("sun.nio.ch.EPollSelectorProvider");
return new sun.nio.ch.PollSelectorProvider();
public class EPollSelectorProvider extends SelectorProviderImpl {
    public AbstractSelector openSelector() throws IOException {
        return new EPollSelectorImpl(this);
    }

    public Channel inheritedChannel() throws IOException {
        return InheritedChannel.getChannel();
    }
}

EPollSelectorImpl 继承体系如下:

EPollSelectorImpl

SelectorImpl 基本具有了一个 Selector 的框架,并将 doSelect()implRegister()wakeup() 等方法留给子类实现。EPollSelectorImpl 最主要的作用就是持有一个 epoll 组件 EPollArrayWrapper,后者负责 epoll 的一系列系统调用:

EPollSelectorImpl(SelectorProvider sp) throws IOException {
    ...
    long pipeFds = IOUtil.makePipe(false);
    fd0 = (int) (pipeFds >>> 32);
    fd1 = (int) pipeFds;
    try {
        pollWrapper = new EPollArrayWrapper();
        pollWrapper.initInterrupt(fd0, fd1);
        ...
    } catch (Throwable t) {
        ...
    }
}
JNIEXPORT jlong JNICALL
Java_sun_nio_ch_IOUtil_makePipe(JNIEnv *env, jobject this, jboolean blocking)
{
    int fd[2];
    pipe(fd);
    configureBlocking(fd[0], JNI_FALSE);
    configureBlocking(fd[1], JNI_FALSE);
    return ((jlong) fd[0] << 32) | (jlong) fd[1];
}
void initInterrupt(int fd0, int fd1) {
    outgoingInterruptFD = fd1;
    incomingInterruptFD = fd0;
    epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
}
public void interrupt() {
    interrupt(outgoingInterruptFD);
}
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_interrupt(JNIEnv *env, jobject this, jint fd)
{
    int fakebuf[1];
    fakebuf[0] = 1;
    if (write(fd, fakebuf, 1) < 0) {
        JNU_ThrowIOExceptionWithLastError(env,"write to interrupt fd failed");
    }
}

这里使用了 IOUtil 工具类的 native 方法 makePipe(),该方法通过 POSIX API pipe() 构造了一个非阻塞的进程间通道,这里是为了响应 wakeup() 的中断,具体做法是 initInterrupt() 将 fd0 交给 epoll,然后在调用 wakeup() 时使用 fd1 去写一个 buffer 导致 fd0 这个读通道读到这个 input,从而使 select 返回(”Self-Pipe Trick”)。

EPollArrayWrapper 的部分属性:

private final int epfd;

private final AllocatedNativeObject pollArray;

private final long pollArrayAddress;

private int outgoingInterruptFD;

private int incomingInterruptFD;

private int interruptedIndex;
EPollArrayWrapper() throws IOException {
    epfd = epollCreate();
  
    int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
    pollArray = new AllocatedNativeObject(allocationSize, true);
    pollArrayAddress = pollArray.address();

    if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
        eventsHigh = new HashMap<>();
}

epollCreate() 方法触发 epoll_create() 系统调用,详见 Epoll 简介

NUM_EPOLLEVENTS 取 OPEN_MAX 和 8192 的较小者。

OPEN_MAX 为 getrlimit() 系统调用得到的 Linux 限制的单个进程能打开的最大文件描述符数量,可以在 shell 中通过 ulimit -n 命令得到。

SIZE_EPOLLEVENT 则为 epoll_wait() 系统调用中填充的结构体 epoll_event 的大小。

NUM_EPOLLEVENTS 与 SIZE_EPOLLEVENT 二者相乘即为 event 数组内存空间大小,之后以这个大小开辟堆外内存(AllocatedNativeObject 通过 Unsafe.allocateMemory),将内存地址赋给 pollArrayAddress。

这里有个优化,当 OPEN_MAX 过于大时(文件描述符超过 64k 或者系统属性 sun.nio.ch.maxUpdateArraySize 定义的值),采用 HashMap(fd -> 事件)而非 byte[] 数组去存储事件。

使用 byte[] 数组存储事件时 fd 为数组下标,速度快,而如果 fd 过多,可能产生空间浪费,所以改用 HashMap。

void initInterrupt(int fd0, int fd1) {
    outgoingInterruptFD = fd1;
    incomingInterruptFD = fd0;
    epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
}

存储 fd0 和 fd1 用于中断,触发 epoll_ctl() 系统调用,将 fd0 添加到 epoll 结构中,监听 连接到达 事件,由于只配置了 EPOLLIN,故使用的触发模式为默认的 LT 模式。

ServerSocketChannel 实例化、创建 socket

ServerSocketChannel.open() 使用相同的机制拿到 EPollSelectorProvider,调用继承自父类 SelectorProviderImpl 的 openServerSocketChannel() 方法获取 ServerSocketChannel 类型实例:

public ServerSocketChannel openServerSocketChannel() throws IOException {
    return new ServerSocketChannelImpl(this);
}

ServerSocketChannelImpl 继承体系如下:

image-20191201150601830

ServerSocketChannelImpl 具备进行 accept 连接的能力,并且可以被 Selector “select”,具有 interestOps 和 readyOps,也即订阅感兴趣的事件的能力。

ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
    super(sp);
    this.fd = Net.serverSocket(true);
    this.fdVal = IOUtil.fdVal(fd);
    this.state = ST_INUSE;
}

父类构造器仅作 provider 的持有,通过 Net 类调用 native 方法,我们创建了一个基于可靠连接的 socket,默认打开了 SO_REUSEADDR。我们使用 socket0() 返回的文件描述符构造了一个 Java 的 FileDescriptor 对象返回:

static FileDescriptor serverSocket(boolean stream) {
    return IOUtil.newFD(socket0(isIPv6Available(), stream, true, fastLoopback));
}

private static native int socket0(boolean preferIPv6, boolean stream, boolean reuse,
                                  boolean fastLoopback);

其中,fastLoopback 由系统属性 jdk.net.useFastTcpLoopback 控制,是 Windows 特有的一项优化,可以在内核 IP 解析前拦截本地环回地址的调用,加快本地调用速度,这个值在 Linux 的实现中已被忽略。socket0() 方法的主要代码就是通过系统调用 socket(domain, type, 0) 打开一个 socket,设置地址重用,并且在系统支持且 java.net.preferIPv4Stack 系统属性未开启时使用 IPv4/IPv6 双协议栈。

设置 ServerSocketChannel 为非阻塞

ServerSocketChannelImpl 通过 IOUtil 的 native 方法调用 POSIX 标准的 fcntl() 函数完成 socket 的非阻塞的设置:

protected void implConfigureBlocking(boolean block) throws IOException {
    IOUtil.configureBlocking(fd, block);
}
JNIEXPORT void JNICALL
Java_sun_nio_ch_IOUtil_configureBlocking(JNIEnv *env, jclass clazz,
                                         jobject fdo, jboolean blocking)
{
    if (configureBlocking(fdval(env, fdo), blocking) < 0)
        JNU_ThrowIOExceptionWithLastError(env, "Configure blocking failed");
}
static int
configureBlocking(int fd, jboolean blocking)
{
    int flags = fcntl(fd, F_GETFL);
    int newflags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);

    return (flags == newflags) ? 0 : fcntl(fd, F_SETFL, newflags);
}

ServerSocketChannel 绑定端口实现监听

// 绑定 socket 到端口
Net.bind(fd, isa.getAddress(), isa.getPort());
// 初始化 backlog(默认 50)
Net.listen(fd, backlog < 1 ? 50 : backlog);

bind() 方法调用 bind0() native 方法,listen 本身是 native 方法,二者都是去调相关的系统调用(bind 和 listen)。

将 ServerSocketChannel 注册到多路复用器

server.register(selector, SelectionKey.OP_ACCEPT);

reigster 实现在 AbstractSelectableChannel 中:

SelectionKey k = findKey(sel);
if (k != null) {
    k.interestOps(ops);
}
if (k == null) {
    k = ((AbstractSelector)sel).register(this, ops, att);
    addKey(k);
}

AbstractSelectableChannel 持有一个 SelectionKey[] keys 用于存放已经产生的 SelectionKey(将当前 channel 注册到 selector 时产生)。

如果 keys 里没有这个 selector,则将 channel 自己注册到 selector 上,并且将返回的“身份证” SelectionKey 存到 keys 中。如果有则更新 interestOps 订阅。

EPollSelectorImpl 使用 SelectorImpl 中实现的 register() 方法:

SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
synchronized (publicKeys) {
    implRegister(k);
}
k.interestOps(ops);

注意这里会获取 publickKeys 对象锁,并且仅有 implRegister 在锁内,而 Selector.select() 会尝试获取同样的锁。关于这个锁的争抢问题,我们在《Reactor、Proactor 模式学习笔记》中有提到。

构造一个 SelectionKey 实例,设置 interestOps。

implRegister() 由 EPollSelectorImpl 自己实现:

protected void implRegister(SelectionKeyImpl ski) {
    SelChImpl ch = ski.channel;
    int fd = Integer.valueOf(ch.getFDVal());
    fdToKey.put(fd, ski);
    pollWrapper.add(fd);
    keys.add(ski);
}

fdToKey.put(fd, ski) 建立 “ServerSocketChannelImpl 文件描述符 -> SelectionKey” 的映射关系存到一个 Map 中,方便查找当前 Selector 下某个 Channel 对应的 Key。

pollWrapper.add(fd) 将 Channel 的文件描述符添加到 poll 组件中。

keys.add(ski) 将 SelectionKey 存到一个 HashSet 中。

Selector.select()

SelectorImpl 实现了 select 的一些检查,并且获取了多个锁(包括前面提到的 publicKeys 对象锁),多路复用的核心流程 doSelect() 则留给了子类实现,EPollSelectorImpl 进行一次 select 的实现如下:

private int lockAndDoSelect(long timeout) throws IOException {
    synchronized (this) {
        synchronized (publicKeys) {
            synchronized (publicSelectedKeys) {
                return doSelect(timeout);
            }
        }
    }
}

protected int doSelect(long timeout) throws IOException {
    processDeregisterQueue();
    try {
        begin();
        pollWrapper.poll(timeout);
    } finally {
        end();
    }
    processDeregisterQueue();
    int numKeysUpdated = updateSelectedKeys();
    // 清理中断
    // ...
    return numKeysUpdated;
}

begin():线程被打断时触发 wakeup() 方法。

end():取消 begin 的设置。

processDeregisterQueue():清理上一次 select 后被 cancel 的 key 的资源。

最终返回更新过的 SelectionKey(即发生过事件的 Channel)的数量 updateSelectedKeys。

显然,关键方法为 poll()

int poll(long timeout) throws IOException {
    updateRegistrations();
    updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
    // 处理中断
    for (int i=0; i<updated; i++) {
        if (getDescriptor(i) == incomingInterruptFD) {
            interruptedIndex = i;
            interrupted = true;
            break;
        }
    }
    return updated;
}

updateRegistrations() 方法中使用 epoll_ctl() 系统调用更新内核 epoll 结构中等待事件发生的 fd 列表的感兴趣事件。poll 的核心是 epollWait() 方法,也就是 epoll_wait() 系统调用,不再赘述。