本篇为 Java 原生 NIO 编程的源码浅析。
Demo
Selector selector;
ServerSocketChannel server;
try {
selector = Selector.open();
server = ServerSocketChannel.open();
server.configureBlocking(false);
server.bind(new InetSocketAddress(8081));
} catch (Throwable e) {
// ...
}
server.register(selector, SelectionKey.OP_ACCEPT);
while (!Thread.currentThread().isInterrupted()) {
if (selector.select() > 0) {
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
handle(selector, key);
}
}
}
源码分析
Selector 实例化
open()
方法通过 Provider 机制创建 Selector:
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
provider()
中,采取了三种策略获取 SelectorProvider:
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
loadProviderFromProperty:尝试查找 Key 为 “java.nio.channels.spi.SelectorProvider” 的系统属性值;
loadProviderAsService:尝试通过 JDK 的 SPI 扩展机制加载 java.nio.channels.spi.SelectorProvider 实现;
DefaultSelectorProvider:不同平台的 JRE 有不同实现:
本文 JDK 源码来源为 OpenJDK 8。
若不通过系统属性和 SPI 文件特别指明,那么我们在 Linux 下(共用 solaris 目录)使用的就是 EPollSelectorProvider,调用其 openSelector()
方法拿到的 Selector 为 EPollSelectorImpl 实例:
if (osname.equals("SunOS"))
return createProvider("sun.nio.ch.DevPollSelectorProvider");
if (osname.equals("Linux"))
return createProvider("sun.nio.ch.EPollSelectorProvider");
return new sun.nio.ch.PollSelectorProvider();
public class EPollSelectorProvider extends SelectorProviderImpl {
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}
public Channel inheritedChannel() throws IOException {
return InheritedChannel.getChannel();
}
}
EPollSelectorImpl 继承体系如下:
SelectorImpl 基本具有了一个 Selector 的框架,并将 doSelect()
、implRegister()
、wakeup()
等方法留给子类实现。EPollSelectorImpl 最主要的作用就是持有一个 epoll 组件 EPollArrayWrapper,后者负责 epoll 的一系列系统调用:
EPollSelectorImpl(SelectorProvider sp) throws IOException {
...
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
try {
pollWrapper = new EPollArrayWrapper();
pollWrapper.initInterrupt(fd0, fd1);
...
} catch (Throwable t) {
...
}
}
JNIEXPORT jlong JNICALL
Java_sun_nio_ch_IOUtil_makePipe(JNIEnv *env, jobject this, jboolean blocking)
{
int fd[2];
pipe(fd);
configureBlocking(fd[0], JNI_FALSE);
configureBlocking(fd[1], JNI_FALSE);
return ((jlong) fd[0] << 32) | (jlong) fd[1];
}
void initInterrupt(int fd0, int fd1) {
outgoingInterruptFD = fd1;
incomingInterruptFD = fd0;
epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
}
public void interrupt() {
interrupt(outgoingInterruptFD);
}
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_interrupt(JNIEnv *env, jobject this, jint fd)
{
int fakebuf[1];
fakebuf[0] = 1;
if (write(fd, fakebuf, 1) < 0) {
JNU_ThrowIOExceptionWithLastError(env,"write to interrupt fd failed");
}
}
这里使用了 IOUtil 工具类的 native 方法 makePipe()
,该方法通过 POSIX API pipe()
构造了一个非阻塞的进程间通道,这里是为了响应 wakeup()
的中断,具体做法是 initInterrupt()
将 fd0 交给 epoll,然后在调用 wakeup()
时使用 fd1 去写一个 buffer 导致 fd0 这个读通道读到这个 input,从而使 select 返回(”Self-Pipe Trick”)。
EPollArrayWrapper 的部分属性:
private final int epfd;
private final AllocatedNativeObject pollArray;
private final long pollArrayAddress;
private int outgoingInterruptFD;
private int incomingInterruptFD;
private int interruptedIndex;
EPollArrayWrapper() throws IOException {
epfd = epollCreate();
int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
eventsHigh = new HashMap<>();
}
epollCreate()
方法触发 epoll_create()
系统调用,详见 Epoll 简介。
NUM_EPOLLEVENTS 取 OPEN_MAX 和 8192 的较小者。
OPEN_MAX 为
getrlimit()
系统调用得到的 Linux 限制的单个进程能打开的最大文件描述符数量,可以在 shell 中通过ulimit -n
命令得到。
SIZE_EPOLLEVENT 则为 epoll_wait()
系统调用中填充的结构体 epoll_event 的大小。
NUM_EPOLLEVENTS 与 SIZE_EPOLLEVENT 二者相乘即为 event 数组内存空间大小,之后以这个大小开辟堆外内存(AllocatedNativeObject 通过 Unsafe.allocateMemory),将内存地址赋给 pollArrayAddress。
这里有个优化,当 OPEN_MAX 过于大时(文件描述符超过 64k 或者系统属性 sun.nio.ch.maxUpdateArraySize
定义的值),采用 HashMap(fd ->
事件)而非 byte[] 数组去存储事件。
使用 byte[] 数组存储事件时 fd 为数组下标,速度快,而如果 fd 过多,可能产生空间浪费,所以改用 HashMap。
void initInterrupt(int fd0, int fd1) {
outgoingInterruptFD = fd1;
incomingInterruptFD = fd0;
epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
}
存储 fd0 和 fd1 用于中断,触发 epoll_ctl()
系统调用,将 fd0 添加到 epoll 结构中,监听 连接到达 事件,由于只配置了 EPOLLIN,故使用的触发模式为默认的 LT 模式。
ServerSocketChannel 实例化、创建 socket
ServerSocketChannel.open()
使用相同的机制拿到 EPollSelectorProvider,调用继承自父类 SelectorProviderImpl 的 openServerSocketChannel()
方法获取 ServerSocketChannel 类型实例:
public ServerSocketChannel openServerSocketChannel() throws IOException {
return new ServerSocketChannelImpl(this);
}
ServerSocketChannelImpl 继承体系如下:
ServerSocketChannelImpl 具备进行 accept 连接的能力,并且可以被 Selector “select”,具有 interestOps 和 readyOps,也即订阅感兴趣的事件的能力。
ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
super(sp);
this.fd = Net.serverSocket(true);
this.fdVal = IOUtil.fdVal(fd);
this.state = ST_INUSE;
}
父类构造器仅作 provider 的持有,通过 Net 类调用 native 方法,我们创建了一个基于可靠连接的 socket,默认打开了 SO_REUSEADDR。我们使用 socket0()
返回的文件描述符构造了一个 Java 的 FileDescriptor 对象返回:
static FileDescriptor serverSocket(boolean stream) {
return IOUtil.newFD(socket0(isIPv6Available(), stream, true, fastLoopback));
}
private static native int socket0(boolean preferIPv6, boolean stream, boolean reuse,
boolean fastLoopback);
其中,fastLoopback 由系统属性 jdk.net.useFastTcpLoopback
控制,是 Windows 特有的一项优化,可以在内核 IP 解析前拦截本地环回地址的调用,加快本地调用速度,这个值在 Linux 的实现中已被忽略。socket0()
方法的主要代码就是通过系统调用 socket(domain, type, 0)
打开一个 socket,设置地址重用,并且在系统支持且 java.net.preferIPv4Stack
系统属性未开启时使用 IPv4/IPv6 双协议栈。
设置 ServerSocketChannel 为非阻塞
ServerSocketChannelImpl 通过 IOUtil 的 native 方法调用 POSIX 标准的 fcntl()
函数完成 socket 的非阻塞的设置:
protected void implConfigureBlocking(boolean block) throws IOException {
IOUtil.configureBlocking(fd, block);
}
JNIEXPORT void JNICALL
Java_sun_nio_ch_IOUtil_configureBlocking(JNIEnv *env, jclass clazz,
jobject fdo, jboolean blocking)
{
if (configureBlocking(fdval(env, fdo), blocking) < 0)
JNU_ThrowIOExceptionWithLastError(env, "Configure blocking failed");
}
static int
configureBlocking(int fd, jboolean blocking)
{
int flags = fcntl(fd, F_GETFL);
int newflags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);
return (flags == newflags) ? 0 : fcntl(fd, F_SETFL, newflags);
}
ServerSocketChannel 绑定端口实现监听
// 绑定 socket 到端口
Net.bind(fd, isa.getAddress(), isa.getPort());
// 初始化 backlog(默认 50)
Net.listen(fd, backlog < 1 ? 50 : backlog);
bind()
方法调用 bind0()
native 方法,listen 本身是 native 方法,二者都是去调相关的系统调用(bind 和 listen)。
将 ServerSocketChannel 注册到多路复用器
server.register(selector, SelectionKey.OP_ACCEPT);
reigster 实现在 AbstractSelectableChannel 中:
SelectionKey k = findKey(sel);
if (k != null) {
k.interestOps(ops);
}
if (k == null) {
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
AbstractSelectableChannel 持有一个 SelectionKey[] keys
用于存放已经产生的 SelectionKey(将当前 channel 注册到 selector 时产生)。
如果 keys 里没有这个 selector,则将 channel 自己注册到 selector 上,并且将返回的“身份证” SelectionKey 存到 keys 中。如果有则更新 interestOps 订阅。
EPollSelectorImpl 使用 SelectorImpl 中实现的 register()
方法:
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
synchronized (publicKeys) {
implRegister(k);
}
k.interestOps(ops);
注意这里会获取 publickKeys 对象锁,并且仅有 implRegister 在锁内,而 Selector.select() 会尝试获取同样的锁。关于这个锁的争抢问题,我们在《Reactor、Proactor 模式学习笔记》中有提到。
构造一个 SelectionKey 实例,设置 interestOps。
implRegister()
由 EPollSelectorImpl 自己实现:
protected void implRegister(SelectionKeyImpl ski) {
SelChImpl ch = ski.channel;
int fd = Integer.valueOf(ch.getFDVal());
fdToKey.put(fd, ski);
pollWrapper.add(fd);
keys.add(ski);
}
fdToKey.put(fd, ski)
建立 “ServerSocketChannelImpl 文件描述符 ->
SelectionKey” 的映射关系存到一个 Map 中,方便查找当前 Selector 下某个 Channel 对应的 Key。
pollWrapper.add(fd)
将 Channel 的文件描述符添加到 poll 组件中。
keys.add(ski)
将 SelectionKey 存到一个 HashSet 中。
Selector.select()
SelectorImpl 实现了 select 的一些检查,并且获取了多个锁(包括前面提到的 publicKeys 对象锁),多路复用的核心流程 doSelect()
则留给了子类实现,EPollSelectorImpl 进行一次 select 的实现如下:
private int lockAndDoSelect(long timeout) throws IOException {
synchronized (this) {
synchronized (publicKeys) {
synchronized (publicSelectedKeys) {
return doSelect(timeout);
}
}
}
}
protected int doSelect(long timeout) throws IOException {
processDeregisterQueue();
try {
begin();
pollWrapper.poll(timeout);
} finally {
end();
}
processDeregisterQueue();
int numKeysUpdated = updateSelectedKeys();
// 清理中断
// ...
return numKeysUpdated;
}
begin()
:线程被打断时触发 wakeup()
方法。
end()
:取消 begin 的设置。
processDeregisterQueue()
:清理上一次 select 后被 cancel 的 key 的资源。
最终返回更新过的 SelectionKey(即发生过事件的 Channel)的数量 updateSelectedKeys。
显然,关键方法为 poll()
:
int poll(long timeout) throws IOException {
updateRegistrations();
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
// 处理中断
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}
updateRegistrations()
方法中使用 epoll_ctl()
系统调用更新内核 epoll 结构中等待事件发生的 fd 列表的感兴趣事件。poll 的核心是 epollWait()
方法,也就是 epoll_wait()
系统调用,不再赘述。