Selector
概念
Selector是Java NIO中的一个组件,用于检查一个或多个NIO Channel的状态是否处于可读、可写。
如此可以实现单线程管理多个channels,也就是可以管理多个网络链接。
作用
用单线程处理多个channels的好处是我需要更少的线程来处理channel。
实际上,你甚至可以用一个线程来处理所有的channels。从操作系统的角度来看,切换线程开销是比较昂贵的,并且每个线程都需要占用系统资源,因此暂用线程越少越好。
需要留意的是,现代操作系统和CPU在多任务处理上已经变得越来越好,所以多线程带来的影响也越来越小。
如果一个CPU是多核的,如果不执行多任务反而是浪费了机器的性能。不过这些设计讨论是另外的话题了。
简而言之,通过Selector我们可以实现单线程操作多个channel。
这有一幅示意图,描述了单线程处理三个channel的情况:
简介
创建
Selector selector = Selector.open();
注册Channel到Selector上
为了同Selector挂了Channel,我们必须先把Channel注册到Selector上:
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
- 非阻塞的 Channel
Channel必须是非阻塞的。所以FileChannel不适用Selector,因为FileChannel不能切换为非阻塞模式。Socket channel可以正常使用。
- 第二个参数
注意register的第二个参数,这个参数是一个“关注集合”,代表我们关注的channel状态,有四种基础类型可供监听:
-
Connect
-
Accept
-
Read
-
Write
一个channel触发了一个事件也可视作该事件处于就绪状态。
因此当channel与server连接成功后,那么就是“连接就绪”状态。
server channel接收请求连接时处于“可连接就绪”状态。
channel有数据可读时处于“读就绪”状态。
channel可以进行数据写入时处于“写就绪”状态。
SelectionKey
上述的四种就绪状态用SelectionKey中的常量表示如下:
-
SelectionKey.OP_CONNECT
-
SelectionKey.OP_ACCEPT
-
SelectionKey.OP_READ
-
SelectionKey.OP_WRITE
如果对多个事件感兴趣可利用位的或运算结合多个常量,比如:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
SelectionKey’s
在上一小节中,我们利用register方法把Channel注册到了Selectors上,这个方法的返回值是SelectionKeys,这个返回的对象包含了一些比较有价值的属性:
-
The interest set
-
The ready set
-
The Channel
-
The Selector
-
An attached object (optional)
这5个属性都代表什么含义呢?下面会一一介绍。
Interest Set
这个“关注集合”实际上就是我们希望处理的事件的集合,它的值就是注册时传入的参数,我们可以用按为与运算把每个事件取出来:
int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
Ready Set
“就绪集合”中的值是当前channel处于就绪的值,一般来说在调用了select方法后都会需要用到就绪状态,select的介绍在胡须文章中继续展开。
int readySet = selectionKey.readyOps();
从“就绪集合”中取值的操作类似月“关注集合”的操作,当然还有更简单的方法,SelectionKey提供了一系列返回值为boolean的的方法:
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
Channel + Selector
从SelectionKey操作Channel和Selector非常简单:
Channel channel = selectionKey.channel();
Selector selector = selectionKey.selector();
Attaching Objects
我们可以给一个SelectionKey附加一个Object,这样做一方面可以方便我们识别某个特定的channel,同时也增加了channel相关的附加信息。
例如,可以把用于channel的buffer附加到SelectionKey上:
selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();
附加对象的操作也可以在register的时候就执行:
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
从Selector中选择channel
一旦我们向Selector注册了一个或多个channel后,就可以调用select来获取channel。select方法会返回所有处于就绪状态的channel。
select
select方法具体如下:
int select()
int select(long timeout)
int selectNow()
select() 方法在返回channel之前处于阻塞状态。 select(long timeout)和select做的事一样,不过他的阻塞有一个超时限制。
selectNow() 不会阻塞,根据当前状态立刻返回合适的channel。
select() 方法的返回值是一个int整形,代表有多少channel处于就绪了。
也就是自上一次select后有多少channel进入就绪。
举例来说,假设第一次调用select时正好有一个channel就绪,那么返回值是1,并且对这个channel做任何处理,接着再次调用select,此时恰好又有一个新的channel就绪,那么返回值还是1,现在我们一共有两个channel处于就绪,但是在每次调用select时只有一个channel是就绪的。
selectedKeys()
在调用select并返回了有channel就绪之后,可以通过选中的key集合来获取channel,这个操作通过调用selectedKeys()方法:
Set<SelectionKey> selectedKeys = selector.selectedKeys();
还记得在register时的操作吧,我们register后的返回值就是SelectionKey实例,也就是我们现在通过selectedKeys()方法所返回的SelectionKey。
wakeUp()
由于调用select而被阻塞的线程,可以通过调用Selector.wakeup()来唤醒即便此时已然没有channel处于就绪状态。
具体操作是,在另外一个线程调用wakeup,被阻塞与select方法的线程就会立刻返回。
close()
当操作Selector完毕后,需要调用close方法。
close的调用会关闭Selector并使相关的SelectionKey都无效。channel本身不管被关闭。
完整案例
- SelectorTest.java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class SelectorTest {
public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", 4000));
Selector selector = Selector.open();
socketChannel.configureBlocking(false);
SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
// 返回 channel 之前一直等待
int readyChannels = selector.select();
System.out.println("readyChannels: " + readyChannels);
if (readyChannels == 0) {
return;
}
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey selectionKey = keyIterator.next();
if (selectionKey.isAcceptable()) {
System.out.println("a connection was accepted by a ServerSocketChannel.");
} else if (selectionKey.isConnectable()) {
System.out.println("a connection was established with a remote server.");
} else if (selectionKey.isReadable()) {
// a channel is ready for reading
System.out.println("a channel is ready for reading");
} else if (selectionKey.isWritable()) {
System.out.println("a channel is ready for writing");
}
keyIterator.remove();
}
}
}
- 测试日志
不知道为何,可能要等待 10S 左右
readyChannels: 1
a channel is ready for reading