【Java】NIO中Selector的select方法源码分析

该篇博客的有些内容和在之前介绍过了,在这里再次涉及到的就不详细说了,如果有不理解请看【Java】NIO中Channel的注册源码分析, 【Java】NIO中Selector的创建源码分析

 

Selector的创建在Windows下默认生成WindowsSelectorImpl对象,那么Selector的select方法使用的就是WindowsSelectorImpl的select方法,而在WindowsSelectorImpl下并没有覆盖这个方法,而是由其基类SelectorImpl实现的:

1 public int select() throws IOException {
2     return this.select(0L);
3 }

这个方法调用了另一个重载的方法:

1 public int select(long var1) throws IOException {
2     if (var1 < 0L) {
3         throw new IllegalArgumentException("Negative timeout");
4     } else {
5         return this.lockAndDoSelect(var1 == 0L ? -1L : var1);
6     }
7 }

首先对var1参数的合法性进行判断,无参传入进来的是0,实则交给lockAndDoSelect方法去完成,并且令参数为-1。

private int lockAndDoSelect(long var1) throws IOException {synchronized(this) {if (!this.isOpen()) {throw new ClosedSelectorException();} else {Set var4 = this.publicKeys;int var10000;synchronized(this.publicKeys) {Set var5 = this.publicSelectedKeys;synchronized(this.publicSelectedKeys) {var10000 = this.doSelect(var1);}}return var10000;}}
}

在方法执行时先使用同步块包裹,使用this作为锁;进入同步块先判断当前的Selector对象是否关闭了,因为在初始化时就是开启状态,只有在关闭后isOpen才是false;isOpen是由AbstractSelector实现的:

 1 private AtomicBoolean selectorOpen = new AtomicBoolean(true);
 2 public final boolean isOpen() {
 3     return selectorOpen.get();
 4 }
 5 public final void close() throws IOException {
 6     boolean open = selectorOpen.getAndSet(false);
 7     if (!open)
 8         return;
 9     implCloseSelector();
10 }

可以看到在AbstractSelector中使用了原子化Boolean值表示开启关闭。

回到SelectorImpl的lockAndDoSelect,若是Selector已经关闭则抛出ClosedSelectorException异常,否则分别以publicKeys以及publicSelectedKeys为锁,最终的实现交给抽象方法doSelect完成;

1 protected abstract int doSelect(long var1) throws IOException;

其中publicKeys是供外部访问的SelectionKey集合,publicSelectedKeys是供外部访问并且已经就绪的SelectionKey集合。

因为使用的是WindowsSelectorImpl,所以来看看WindowsSelectorImpl的doSelect实现:

 1 protected int doSelect(long var1) throws IOException {
 2     if (this.channelArray == null) {
 3         throw new ClosedSelectorException();
 4     } else {
 5         this.timeout = var1;
 6         this.processDeregisterQueue();
 7         if (this.interruptTriggered) {
 8             this.resetWakeupSocket();
 9             return 0;
10         } else {
11             this.adjustThreadsCount();
12             this.finishLock.reset();
13             this.startLock.startThreads();
14 
15             try {
16                 this.begin();
17 
18                 try {
19                     this.subSelector.poll();
20                 } catch (IOException var7) {
21                     this.finishLock.setException(var7);
22                 }
23 
24                 if (this.threads.size() > 0) {
25                     this.finishLock.waitForHelperThreads();
26                 }
27             } finally {
28                 this.end();
29             }
30 
31             this.finishLock.checkForException();
32             this.processDeregisterQueue();
33             int var3 = this.updateSelectedKeys();
34             this.resetWakeupSocket();
35             return var3;
36         }
37     }
38 }

首先判断channelArray是否为空,上一篇博客说了channelArray是一个SelectionKeyImpl数组,SelectionKeyImpl负责记录Channel和SelectionKey状态,channelArray是根据连接的Channel数量动态维持的,初始化大小是8。

1 private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[8];

SelectionKeyImpl是SelectionKey的子类,只有当Selector调用close方法时,在回调函数中才会令channelArray=null,所以这还是检测Selector是否关闭了。
接着继续,在前面传入的long类型的参数是-1,在这里令超时时间timeout就等于-1,
接着调用processDeregisterQueue方法来取消准备撤销的集合
所谓的准备撤销的集合是因为SelectionKey对象在调用cancel方法时,会使Selector将其加入cancelledKeys,仅仅如此,真真的取消是在Selector调用selector方法时执行

SelectionKey的cancel方法是在AbstractSelectionKey中实现的:

 1 public final void cancel() {
 2    // Synchronizing "this" to prevent this key from getting canceled
 3    // multiple times by different threads, which might cause race
 4    // condition between selector's select() and channel's close().
 5    synchronized (this) {
 6        if (valid) {
 7            valid = false;
 8            ((AbstractSelector)selector()).cancel(this);
 9        }
10    }
11 }

这个方法在上一篇讲过,可以看到基本上什么都没做,仅仅时调用了与它关联的Selector对象(AbstractSelector)的cancel方法:
AbstractSelector的cancel方法:

1 private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>();
2 
3 void cancel(SelectionKey k) {                      
4     synchronized (cancelledKeys) {
5         cancelledKeys.add(k);
6     }
7 }

cancelledKeys就是所谓的准备撤销的集合,可以看到AbstractSelector的cancel方法仅仅是把此时请求取消的SelectionKey对象加入到cancelledKeys集合中,并没有多余的操作。

回到doSelect方法,processDeregisterQueue这个方法的实现是在SelectorImpl中:

 1 void processDeregisterQueue() throws IOException {
 2     Set var1 = this.cancelledKeys();
 3     synchronized(var1) {
 4         if (!var1.isEmpty()) {
 5             Iterator var3 = var1.iterator();
 6 
 7             while(var3.hasNext()) {
 8                 SelectionKeyImpl var4 = (SelectionKeyImpl)var3.next();
 9 
10                 try {
11                     this.implDereg(var4);
12                 } catch (SocketException var11) {
13                     throw new IOException("Error deregistering key", var11);
14                 } finally {
15                     var3.remove();
16                 }
17             }
18         }
19 
20     }
21 }

这个方法的逻辑比较简单,首先得到准备撤销的集合cancelledKeys,判断是否有请求取消的,若有那么就进行遍历,实际的取消操作主要逻辑交给了抽象方法implDereg执行,最后再从集合中删除这个SelectionKeyImpl对象。

implDereg方法的实现是在WindowsSelectorImpl中:

 1 protected void implDereg(SelectionKeyImpl var1) throws IOException {
 2     int var2 = var1.getIndex();
 3 
 4     assert var2 >= 0;
 5 
 6     Object var3 = this.closeLock;
 7     synchronized(this.closeLock) {
 8         if (var2 != this.totalChannels - 1) {
 9             SelectionKeyImpl var4 = this.channelArray[this.totalChannels - 1];
10             this.channelArray[var2] = var4;
11             var4.setIndex(var2);
12             this.pollWrapper.replaceEntry(this.pollWrapper, this.totalChannels - 1, this.pollWrapper, var2);
13         }
14 
15         var1.setIndex(-1);
16     }
17 
18     this.channelArray[this.totalChannels - 1] = null;
19     --this.totalChannels;
20     if (this.totalChannels != 1 && this.totalChannels % 1024 == 1) {
21         --this.totalChannels;
22         --this.threadsCount;
23     }
24 
25     this.fdMap.remove(var1);
26     this.keys.remove(var1);
27     this.selectedKeys.remove(var1);
28     this.deregister(var1);
29     SelectableChannel var7 = var1.channel();
30     if (!var7.isOpen() && !var7.isRegistered()) {
31         ((SelChImpl)var7).kill();
32     }
33 
34 }

首先获取SelectionKeyImpl的下标Index,这个下标就是其在channelArray中的下标,检验下标的合法性;
在同步块内,首先检验这个SelectionKeyImpl对象是否是数组的最后一个元素,若不是那么就直接用最后一个元素覆盖当前位置的SelectionKeyImpl对象,同时还需要将pollWrapper中最后一个元素对应的Channel描述符和事件响应覆盖到相应位置。无论该SelectionKeyImpl对象是否是最后一个,都将其下标置为-1,防止再次访问。

再完成上述操作后,channelArray中的最后一个元素必然是不需要的,直接置为null,再totalChannels再自减。
接着根据totalChannels的数量来判断是否需要减少轮询线程的个数,这和注册时同理,就不再多说。
然后在fdMap中移除掉该SelectionKeyImpl和Channel的描述符映射(fdMap保存的是Channel的描述符和SelectionKeyImpl的映射关系,在上一篇提到过),keys和selectedKeys中同样也需要移除(keys所有注册了的SelectionKey集合,selectedKeys是所有有事件就绪的SelectionKey集合)。

这些操作仅仅是删除了其在Selector中的映射关系,而真正的Channel的(虽说是SelectionKey的cancel方法,实则是Channel要取消对某一事件的响应)取消操作是在deregister中执行:
deregister方法在AbstractSelector中实现:

1 protected final void deregister(AbstractSelectionKey key) {
2     ((AbstractSelectableChannel)key.channel()).removeKey(key);
3 }

可以看到直接获取SelectionKey对应的channel对象,然后调用AbstractSelectableChannel的removeKey方法:

 1 void removeKey(SelectionKey k) {                  
 2     synchronized (keyLock) {
 3         for (int i = 0; i < keys.length; i++)
 4             if (keys[i] == k) {
 5                 keys[i] = null;
 6                 keyCount--;
 7             }
 8         ((AbstractSelectionKey)k).invalidate();
 9     }
10 }

前面的遍历很简单,通过遍历Channel的所有绑定的SelectionKey,即keys,直接将要取消的置为null,keyCount再自减,最后调用SelectionKey(AbstractSelectionKey)的invalidate方法:

1 void invalidate() {                               
2     valid = false;
3 }

直接设置valid属性为false,表明不可用。

回到implDereg中,最后一步操作,检查Channel的活跃性,若是Channel既没有打开且当且也没有注册了的SelectionKey,那么直接“杀死”该Channel。
而这个kill方法,在不同的Channel中有不同的实现,
SocketChannelImpl中:

 1 public void kill() throws IOException {
 2    Object var1 = this.stateLock;
 3     synchronized(this.stateLock) {
 4         if (this.state != 4) {
 5             if (this.state == -1) {
 6                 this.state = 4;
 7             } else {
 8                 assert !this.isOpen() && !this.isRegistered();
 9 
10                 if (this.readerThread == 0L && this.writerThread == 0L) {
11                     nd.close(this.fd);
12                     this.state = 4;
13                 } else {
14                     this.state = 3;
15                 }
16 
17             }
18         }
19     }
20 }

其中state表示SocketChannelImpl的状态,一共有六种:

1 private static final int ST_UNINITIALIZED = -1;     // 尚未初始化
2 private static final int ST_UNCONNECTED = 0;         // 尚未建立连接
3 private static final int ST_PENDING = 1;              // 未决状态
4 private static final int ST_CONNECTED = 2;             // 连接状态
5 private static final int ST_KILLPENDING = 3;         // KILL的未决状态
6 private static final int ST_KILLED = 4;             // KILL状态
7 private int state = -1;

这样就很清晰,若是SocketChannelImpl尚未初始化直接变为KILL状态,否则检查再次检查Channel的活跃性,若是不活跃就断言为false,直接结束,否则“杀死”。
接下来的判断中的readerThread和writerThread,我在看完SocketChannelImpl后,发现一直都是赋值的0,并不知道会在何时发生修改,而且这两个成员的赋值都是在有数据读、写操作后,若是有知道的朋友想请教一下!
这个就先不讨论了,但是通过它们的赋值都是发生在有数据读、写操作后,那么就可以明白,若是完成了读、写,那么直接变为KILL状态,否则,等待读、写完成,就变为KILL的未决状态。
其中 nd.close(this.fd),nd是Socket描述符,fd是文件描述符,这就是由操作系统来关闭Socket描述符对应的文件描述符。

ServerSocketChannelImpl中kill:

 1 private static final int ST_UNINITIALIZED = -1;      // 尚未初始化
 2 private static final int ST_INUSE = 0;                 // 使用中
 3 private static final int ST_KILLED = 1;             // KILL状态
 4 private int state = -1;
 5 
 6 public void kill() throws IOException {
 7     Object var1 = this.stateLock;
 8     synchronized(this.stateLock) {
 9         if (this.state != 1) {
10             if (this.state == -1) {
11                 this.state = 1;
12             } else {
13                 assert !this.isOpen() && !this.isRegistered();
14 
15                 nd.close(this.fd);
16                 this.state = 1;
17             }
18         }
19     }
20 }

ServerSocketChannelImpl就要简单一点,基本上一样,由于ServerSocketChannel只能注册ACCEPT事件响应,所以就没有判断读、写。

implDereg方法结束,processDeregisterQueue也彻底结束,再回到doSelect方法
接着检验interruptTriggered,表示是否触发中断。
interruptTriggered初始化时就是false,表示未触发中断,而在调用close或者wakeup方法时会触发中断,赋值true;

先看wakeup方法:

 1 public Selector wakeup() {
 2     Object var1 = this.interruptLock;
 3     synchronized(this.interruptLock) {
 4         if (!this.interruptTriggered) {
 5             this.setWakeupSocket();
 6             this.interruptTriggered = true;
 7         }
 8         
 9         return this;
10     }
11 }

可以看到核心是setWakeupSocket方法,当目前没有触发中断调用setWakeupSocket:

1 private void setWakeupSocket() {
2     this.setWakeupSocket0(this.wakeupSinkFd);
3 }
4 private native void setWakeupSocket0(int var1);

在讲Selector的创建时说过,在Selector创建时会产生一对SocketChannel,分别是SourceChannelImpl和SinkChannelImpl,wakeupSinkFd是SinkChannelImpl的描述符。

再来看看setWakeupSocket0的实现:

Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this,jint scoutFd) {/* Write one byte into the pipe */const char byte = 1;send(scoutFd, &byte, 1, 0);
}

虽然是用C写的,但是依旧很清晰,就是通过这个双向通道的sink端向source发送一个字节的数据,这样source端描述符就进入就绪状态,就能被select感知到,Selector便被唤醒。

再来看下close方法,在AbstractSelector中实现的:

1 public final void close() throws IOException {
2     boolean open = selectorOpen.getAndSet(false);
3     if (!open)
4         return;
5     implCloseSelector();
6 }

核心是implCloseSelector,在SelectorImpl中实现:

 1 public void implCloseSelector() throws IOException {
 2     this.wakeup();
 3     synchronized(this) {
 4         Set var2 = this.publicKeys;
 5         synchronized(this.publicKeys) {
 6             Set var3 = this.publicSelectedKeys;
 7             synchronized(this.publicSelectedKeys) {
 8                 this.implClose();
 9             }
10         }
11 
12     }
13 }

一开始就直接调用wakeup方法唤醒,然后调用implClose方法:
implClose是在WindowsSelectorImpl中实现的:

 1 protected void implClose() throws IOException {
 2     Object var1 = this.closeLock;
 3     synchronized(this.closeLock) {
 4         if (this.channelArray != null && this.pollWrapper != null) {
 5             Object var2 = this.interruptLock;
 6             synchronized(this.interruptLock) {
 7                 this.interruptTriggered = true;
 8             }
 9 
10             this.wakeupPipe.sink().close();
11             this.wakeupPipe.source().close();
12 
13             for(int var7 = 1; var7 < this.totalChannels; ++var7) {
14                 if (var7 % 1024 != 0) {
15                     this.deregister(this.channelArray[var7]);
16                     SelectableChannel var3 = this.channelArray[var7].channel();
17                     if (!var3.isOpen() && !var3.isRegistered()) {
18                         ((SelChImpl)var3).kill();
19                     }
20                 }
21             }
22 
23             this.pollWrapper.free();
24             this.pollWrapper = null;
25             this.selectedKeys = null;
26             this.channelArray = null;
27             Iterator var8 = this.threads.iterator();
28 
29             while(var8.hasNext()) {
30                 WindowsSelectorImpl.SelectThread var9 = (WindowsSelectorImpl.SelectThread)var8.next();
31                 var9.makeZombie();
32             }
33 
34             this.startLock.startThreads();
35         }
36 
37     }
38 }

根据channelArray和pollWrapper是否为null来检验是否有必要关闭资源,后面就是对一些资源的关闭,可以看到关闭了我们一开始建立的双向通道,取消了所有注册事件,顺便“杀死”不活跃的Channel,删除所有映射关系,将所有轮询线程从阻塞中唤醒,关于makeZombie和startLock后面给出。

再次回到doSelect上,若是发生了中断,调用resetWakeupSocket方法恢复中断:

1 private void resetWakeupSocket() {
2     Object var1 = this.interruptLock;
3     synchronized(this.interruptLock) {
4         if (this.interruptTriggered) {
5             this.resetWakeupSocket0(this.wakeupSourceFd);
6             this.interruptTriggered = false;
7         }
8     }
9 }

resetWakeupSocket0也是一个native方法,和setWakeupSocket0正好互补,用来读取setWakeupSocket0中发送的数据,再将interruptTriggered设置为false,最后doSelect将会立即返回0,而不会调用poll操作。

在doSelect判断没有触发中断后,首先调用adjustThreadsCount调整轮询线程数量:

 1 private void adjustThreadsCount() {
 2     int var1;
 3     if (this.threadsCount > this.threads.size()) {
 4         for(var1 = this.threads.size(); var1 < this.threadsCount; ++var1) {
 5             WindowsSelectorImpl.SelectThread var2 = new WindowsSelectorImpl.SelectThread(var1);
 6             this.threads.add(var2);
 7             var2.setDaemon(true);
 8             var2.start();
 9         }
10     } else if (this.threadsCount < this.threads.size()) {
11         for(var1 = this.threads.size() - 1; var1 >= this.threadsCount; --var1) {
12             ((WindowsSelectorImpl.SelectThread)this.threads.remove(var1)).makeZombie();
13         }
14     }
15 
16 }

threads是用ArrayList存放的:

1 private final List<WindowsSelectorImpl.SelectThread> threads = new ArrayList();

逻辑比较简单,通过检查threadsCount的数量和threads的大小比较,若是threadsCount大于threads,则产生一个新的轮询线程SelectThread,将其加入threads,并且设置轮询线程是守护线程,直接启动;若是threadsCount小于threads,则移除并唤醒多余的轮询线程;若是threadsCount等于threads什么都不做。

来看一下SelectThread这个轮询线程具体是怎么工作的:

 1 private final class SelectThread extends Thread {
 2     private final int index;
 3     final WindowsSelectorImpl.SubSelector subSelector;
 4     private long lastRun;
 5     private volatile boolean zombie;
 6 
 7     private SelectThread(int var2) {
 8         this.lastRun = 0L;
 9         this.index = var2;
10         this.subSelector = WindowsSelectorImpl.this.new SubSelector(var2);
11         this.lastRun = WindowsSelectorImpl.this.startLock.runsCounter;
12     }
13 
14     void makeZombie() {
15         this.zombie = true;
16     }
17 
18     boolean isZombie() {
19         return this.zombie;
20     }
21 
22     public void run() {
23         for(; !WindowsSelectorImpl.this.startLock.waitForStart(this); WindowsSelectorImpl.this.finishLock.threadFinished()) {
24             try {
25                 this.subSelector.poll(this.index);
26             } catch (IOException var2) {
27                 WindowsSelectorImpl.this.finishLock.setException(var2);
28             }
29         }
30 
31     }
32 }

在构造方法中对几个成员完成初始化,index对应的是其在ArrayList中的下标,lastRun 和startLock有关等会再说,subSelector是真正执行轮询的对象;zombie是一个标志,在startLock中会使用到。
再来看run方法,核心就是调用subSelector的poll方法,而何时调用该方法由startLock来决定。

StartLock的定义:

 1 private final class StartLock {
 2     private long runsCounter;
 3 
 4     private StartLock() {
 5     }
 6 
 7     private synchronized void startThreads() {
 8         ++this.runsCounter;
 9         this.notifyAll();
10     }
11 
12     private synchronized boolean waitForStart(WindowsSelectorImpl.SelectThread var1) {
13         while(this.runsCounter == var1.lastRun) {
14             try {
15                 WindowsSelectorImpl.this.startLock.wait();
16             } catch (InterruptedException var3) {
17                 Thread.currentThread().interrupt();
18             }
19         }
20 
21         if (var1.isZombie()) {
22             return true;
23         } else {
24             var1.lastRun = this.runsCounter;
25             return false;
26         }
27     }
28 }

在startThreads方法中,仅仅是通过synchronized 包裹,使runsCounter自增,然后notifyAll唤醒所有持有StartLock对象锁的阻塞。
在WindowsSelectorImpl中StartLock对象有且只有一份,对于所有SelectThread来说StartLock是公共的
waitForStart方法需要结合SelectThread的run方法来看,首先先检验SelectThread的lastRun成员是否和runsCounter相等,若是相等直接阻塞,等待startThreads方法将其唤醒;若是不相等,说明它的run是在startThreads之后运行的,需要将lastRun更新后再执行。

回到SelectThread中,我们再来看看SubSelector的定义:

 1 private final class SubSelector {
 2     private final int pollArrayIndex;
 3     private final int[] readFds;
 4     private final int[] writeFds;
 5     private final int[] exceptFds;
 6     
 7     private SubSelector() {
 8         this.readFds = new int[1025];
 9         this.writeFds = new int[1025];
10         this.exceptFds = new int[1025];
11         this.pollArrayIndex = 0;
12     }
13     
14     private SubSelector(int var2) {
15         this.readFds = new int[1025];
16         this.writeFds = new int[1025];
17         this.exceptFds = new int[1025];
18         this.pollArrayIndex = (var2 + 1) * 1024;
19     }
20     ......
21 }

其中无参构造是WindowsSelectorImpl使用的,单参构造由SelectThread使用。
之前在讲Channel的注册时说过,每1024个注册了的Channel会开启一个SelectThread轮询,如果是1024个以内,那么直接由WindowsSelectorImpl轮询,不交给SelectThread处理,超过1024则WindowsSelectorImpl和SelectThread一起轮询。

readFds 、writeFds、exceptFds 分别对应读、写、异常描述符 ,在SubSelector构造中初始化大小都是1025,多出来的一个就是前面说过的wakeupSourceFd描述符,用于唤醒,所以是1025。pollArrayIndex 对应其在pollWrapper中的wakeupSourceFd描述符的起始位置。

再来看看poll方法:

1 private int poll() throws IOException {
2     return this.poll0(WindowsSelectorImpl.this.pollWrapper.pollArrayAddress, Math.min(WindowsSelectorImpl.this.totalChannels, 1024), this.readFds, this.writeFds, this.exceptFds, WindowsSelectorImpl.this.timeout);
3 }
4 
5 private int poll(int var1) throws IOException {
6     return this.poll0(WindowsSelectorImpl.this.pollWrapper.pollArrayAddress + (long)(this.pollArrayIndex * PollArrayWrapper.SIZE_POLLFD), Math.min(1024, WindowsSelectorImpl.this.totalChannels - (var1 + 1) * 1024), this.readFds, this.writeFds, this.exceptFds, WindowsSelectorImpl.this.timeout);
7 }
8 
9 private native int poll0(long var1, int var3, int[] var4, int[] var5, int[] var6, long var7);

无参poll方法是WindowsSelectorImpl执行的,单参poll是由SelectThread执行;
最后都调用poll0这个native方法,这个方法是真正的轮询核心,交由操作系统来完成。
其中pollArrayAddress是pollArray在内存空间的起始位置,在poll()中直接定位到最开始,而在poll(int var1)中通过加上pollArrayIndex * PollArrayWrapper.SIZE_POLLFD这个偏移量定位。
PollArrayWrapper.SIZE_POLLFD是8,表示pollWrapper中存放的一对Channel描述符和事件响应共8位,0-3位保存Channel描述符fdVal,4-7位保存事件响应events。
第二个参数表明需要底层轮询的描述符fd个数,最后一个是超时时间,若是底层超时是会结束的。

还是回到doSelect方法,在adjustThreadsCount调整完轮询线程后,调用finishLock的reset方法
finishLock定义如下:

 1 private final class FinishLock {
 2     private int threadsToFinish;
 3     IOException exception;
 4     
 5     private FinishLock() {
 6         this.exception = null;
 7     }
 8     
 9     private void reset() {
10         this.threadsToFinish = WindowsSelectorImpl.this.threads.size();
11     }
12     
13     private synchronized void threadFinished() {
14         if (this.threadsToFinish == WindowsSelectorImpl.this.threads.size()) {
15             WindowsSelectorImpl.this.wakeup();
16         }
17     
18         --this.threadsToFinish;
19         if (this.threadsToFinish == 0) {
20             this.notify();
21         }
22     
23     }
24     ......
25 }

这个和startLock很相似,也是WindowsSelectorImpl持有,有且仅有一份,所有SelectThread共享,reset方法用来记录在当前select方法执行时需要的轮询线程个数,在SelectThread的run方法中执行完poll方法后,会执行threadFinished,首先this.threadsToFinish == WindowsSelectorImpl.this.threads.size()的判断是为帮助唤醒所有处于poll阻塞的轮询。SelectThread执行完毕,就需要让threadsToFinish自减,至于notify的唤醒和后面有关系。

doSelect中执行完finishLock的reset后,就需要调用startLock的startThreads唤醒所有轮询线程工作。接着调用begin方法:
begin方法在AbstractSelector中实现:

 1 private Interruptible interruptor = null;
 2 
 3 protected final void begin() {
 4     if (interruptor == null) {
 5         interruptor = new Interruptible() {
 6                 public void interrupt(Thread ignore) {
 7                     AbstractSelector.this.wakeup();
 8                 }};
 9     }
10     AbstractInterruptibleChannel.blockedOn(interruptor);
11     Thread me = Thread.currentThread();
12     if (me.isInterrupted())
13         interruptor.interrupt(me);
14 }

若是中断器interruptor=null,就创建一个,当当前线程阻塞在I/O操作上并且发生了线程级别的中断时,就会调用wakeup方法唤醒Selector。

doSelect中begin完毕后,调用subSelector的poll方法轮询;若是poll上有事件就绪,那么就不会阻塞,继续往下进行;若poll上没有事件就绪就会等待SelectThread上的事件就绪,通过threadFinished将其唤醒;若是SelectThread上也没有事件就绪就会一直阻塞,除非被外部唤醒,或者调用的是select的单参方法,会阻塞到超时结束。

接着判断是否有轮询线程的工作,调用waitForHelperThreads等待轮询线程的结束:

 1 private synchronized void waitForHelperThreads() {
 2     if (this.threadsToFinish == WindowsSelectorImpl.this.threads.size() {
 3         WindowsSelectorImpl.this.wakeup();
 4     }
 5 
 6     while(this.threadsToFinish != 0) {
 7         try {
 8             WindowsSelectorImpl.this.finishLock.wait();
 9         } catch (InterruptedException var2) {
10             Thread.currentThread().interrupt();
11         }
12     }
13 
14 }

waitForHelperThreads方法就呼应了threadFinished方法,若是threadsToFinish != 0说明还有轮询线程没有结束,就wait阻塞,一直等到threadsToFinish == 0时再将其唤醒。

当所有轮询结束后,调用end方法:

1 protected final void end() {
2     AbstractInterruptibleChannel.blockedOn(null);
3 }

这个方法是处理发生中断,具体就不详细介绍了。

然后调用finishLock的checkForException方法检查异常,这个没啥好说的,然后又调用processDeregisterQueue来取消可能在select轮询时发生的SelectionKeyl的撤销。

接着调用updateSelectedKeys方法:

 1 private long updateCount = 0L;
 2 
 3 private int updateSelectedKeys() {
 4     ++this.updateCount;
 5     byte var1 = 0;
 6     int var4 = var1 + this.subSelector.processSelectedKeys(this.updateCount);
 7 
 8     WindowsSelectorImpl.SelectThread var3;
 9     for(Iterator var2 = this.threads.iterator(); var2.hasNext(); var4 += var3.subSelector.processSelectedKeys(this.updateCount)) {
10         var3 = (WindowsSelectorImpl.SelectThread)var2.next();
11     }
12 
13     return var4;
14 }

updateCount记录更新次数,即select调用次数;然后调用subSelector的processSelectedKeys方法,得到poll返回的就绪的Channel描述符,也就是得到事件就绪的Channel个数,同理也就需要得到所有SelectThread中的。

其中processSelectedKeys方法如下:

1 private int processSelectedKeys(long var1) {
2     byte var3 = 0;
3     int var4 = var3 + this.processFDSet(var1, this.readFds, Net.POLLIN, false);
4     var4 += this.processFDSet(var1, this.writeFds, Net.POLLCONN | Net.POLLOUT, false);
5     var4 += this.processFDSet(var1, this.exceptFds, Net.POLLIN | Net.POLLCONN | Net.POLLOUT, true);
6     return var4;
7 }

分别对读、写、异常都处理了,主要还是调用processFDSet方法:

 1 private int processFDSet(long var1, int[] var3, int var4, boolean var5) {
 2     int var6 = 0;
 3 
 4     for(int var7 = 1; var7 <= var3[0]; ++var7) {
 5         int var8 = var3[var7];
 6         if (var8 == WindowsSelectorImpl.this.wakeupSourceFd) {
 7             synchronized(WindowsSelectorImpl.this.interruptLock) {
 8                 WindowsSelectorImpl.this.interruptTriggered = true;
 9             }
10         } else {
11             WindowsSelectorImpl.MapEntry var9 = WindowsSelectorImpl.this.fdMap.get(var8);
12             if (var9 != null) {
13                 SelectionKeyImpl var10 = var9.ski;
14                 if (!var5 || !(var10.channel() instanceof SocketChannelImpl) || !WindowsSelectorImpl.this.discardUrgentData(var8)) {
15                     if (WindowsSelectorImpl.this.selectedKeys.contains(var10)) {
16                         if (var9.clearedCount != var1) {
17                             if (var10.channel.translateAndSetReadyOps(var4, var10) && var9.updateCount != var1) {
18                                 var9.updateCount = var1;
19                                 ++var6;
20                             }
21                         } else if (var10.channel.translateAndUpdateReadyOps(var4, var10) && var9.updateCount != var1) {
22                             var9.updateCount = var1;
23                             ++var6;
24                         }
25 
26                         var9.clearedCount = var1;
27                     } else {
28                         if (var9.clearedCount != var1) {
29                             var10.channel.translateAndSetReadyOps(var4, var10);
30                             if ((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {
31                                 WindowsSelectorImpl.this.selectedKeys.add(var10);
32                                 var9.updateCount = var1;
33                                 ++var6;
34                             }
35                         } else {
36                             var10.channel.translateAndUpdateReadyOps(var4, var10);
37                             if ((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {
38                                 WindowsSelectorImpl.this.selectedKeys.add(var10);
39                                 var9.updateCount = var1;
40                                 ++var6;
41                             }
42                         }
43 
44                         var9.clearedCount = var1;
45                     }
46                 }
47             }
48         }
49     }
50 
51     return var6;
52 }

这个方法其实就是把poll0方法轮询的描述符结果放入传入的数组中,然后通过遍历这个数组,得到相应的Channel描述符,因为之前通过fdMap保存了Channel的描述符和SelectionKeyImpl的映射关系,那么就可以根据Channel描述符找到对应的SelectionKeyImpl对象,再根据传入的状态值var4来更新Channel的状态,最后将其保存在selectedKeys集合中供外部访问。


Selector的select方法到此全部结束。

转载于:https://www.cnblogs.com/a526583280/p/10890215.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/350509.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

简单小程序代码_小程序该如何运营

小型程序升级有三种最重要的方法&#xff1a;1.借助微信附近小程序功能实现客户扩展大家可能都知道&#xff0c;微信平台具有附近小程序的功能。此功能使用户可以查看周围五公里范围内的所有小程序。正是由于这个原因&#xff0c;一旦商人完成了这个小程序并加入了附近的小程序…

在javafx中界面主题_最小的JavaFX演示文稿(在JavaFX中)

在javafx中界面主题如果您要进行有关JavaFX的演示&#xff0c;那么使用JavaFX本身进行演示非常方便。 这样&#xff0c;您无需离开演示文稿即可轻松显示示例。 这是一个非常简单的示例。 在NetBeans中&#xff0c;设置一个新的JavaFX项目“ New Project”->“ JavaFX”->…

OpenCV cvLine

cvLine 函数原型&#xff1a; void cvLine( CvArr* img, CvPoint pt1, CvPoint pt2, CvScalar color, int thickness1, int line_type8, int shift0 ); img 图像。pt1 线段的第一个端点。pt2 线段的第二个端点。color 线段的颜色。thickness 线段的粗细程度。line_type 线段…

lengthOfLongestSubstring

lengthOflongestSubstring 试例&#xff1a; 输入: "abcabcbb" 输出: 3 解释: 因为无重复字符的最长子串是 "abc"&#xff0c;所以其长度为 3。 /**** param {string} s* 无重复字符的最长子串*/ function lengthOfLongestSubstring(s) {if (typeof s ! s…

编译原理--词法分析C++

一、实验项目要求 1.实验目的 通过设计编制调试一个具体的词法分析程序&#xff0c;加深对词法分析原理的理解。并掌握在对程序设计语言源程序进行扫描过程中将其分解为各类单词的词法分析方法。 编制一个读单词过程&#xff0c;从输入的源程序中&#xff0c;识别出各个具有…

python list转map_Python 进阶之术 Map Filter Reduce

" 本文字数&#xff1a;763 字 || 阅读时间&#xff1a;3分钟"Map Map 会将⼀个函数映射到⼀个输⼊列表的所有元素上。这是它的规范&#xff1a;规范 map(function_to_apply, list_of_inputs) ⼤多数时候&#xff0c;我们要把列表中所有元素⼀个个地传递给⼀个函数&a…

JavaFX动画工具

好的&#xff0c;我想是时候让您讲一个小秘密了。 在过去的大约三个月中&#xff0c;我从事一个私有项目&#xff0c;目标是创建一个工具&#xff0c;使我可以轻松地为Java桌面应用程序创建动画。 JavaFX在API级别上提供了出色的动画制作支持&#xff0c;但对于初学者甚至中级程…

ImageWatch的使用

1.能够实时显示图像和矩阵Mat的内容&#xff0c;跟Matlab很像&#xff0c;方便程序调试&#xff0c;相当好用。 下载地址&#xff1a; https://marketplace.visualstudio.com/items?itemNameVisualCPPTeam.ImageWatch 2.下载好插件ImageWatch&#xff0c;双击ImageWatch.vs…

springboot与任务(邮件任务)

邮件发送需要引入spring-boot-starter-mailSpring Boot 自动配置MailSenderAutoConfiguration定义MailProperties内容&#xff0c;配置在application.yml中自动装配JavaMailSender 测试邮件发送 pom文件配置&#xff1a; <!--邮件发送--><dependency><groupId&g…

具有ESB,API管理和Now ..服务网格的应用程序网络功能。

我最近谈论了微服务模式的演变&#xff0c;以及来自Lyft的Envoy之类的服务代理如何帮助将弹性&#xff0c;服务发现&#xff0c;路由&#xff0c;指标收集等责任推到应用程序下一层。 否则&#xff0c;我们冒着希望并祈祷各种应用程序将正确实现这些关键功能或依赖于特定语言的…

宝塔定时任务执行php源码任务_Linux at命令详解:定时执行任务

要想使用 at 命令&#xff0c;读者需提前安装好 at 软件包&#xff0c;并开启 atd 服务。因此&#xff0c;首先来看看如何安装 at 软件包。在 Linux 系统中&#xff0c;查看 at 软件包是否已安装&#xff0c;可以使用 rpm -q 命令&#xff0c;如下所示&#xff1a;[rootlocalho…

Eigen(1)配置VS2015

1.在Eigen官网下载Eigen&#xff1a; Windows选压缩包zip下载&#xff0c;点击一下即可。下载后解压。 2.配置VS2015 打开vs2013&#xff0c;可以新建一个项目&#xff0c;也可以打开以前的项目&#xff0c;右键项目名&#xff0c;选择属性&#xff0c;在c/c附加目录里加入解压…

尺度不变性是指什么不变_不变性的来龙去脉

尺度不变性是指什么不变因此&#xff0c;在我的第一篇文章中&#xff0c;我谈到了一些构建器模式&#xff0c;并提到了一个非常强大但却被忽视的概念&#xff1a;不变性。 什么是不可变类&#xff1f; 这只是一个其实例无法修改的类。 类属性的每个值都在其声明或其构造函数中…

光模块

一、光模块&#xff1a; 1、光模块&#xff08;opTIcalmodule&#xff09;由光电子器件、功能电路和光接口等组成&#xff0c;光电子器件包括发射和接收两部分。 发射部分是&#xff1a;输入一定码率的电信号经内部的驱动芯片处理后驱动半导体激光器&#xff08;LD&#xff09;…

radmin提示授权码过期_IdentityServer4 客户端授权模式(Client Credentials)

(给DotNet加星标&#xff0c;提升.Net技能)转自&#xff1a;朝闲cnblogs.com/Zing/p/13361386.html前言1、源码(.NET Core 2.2)git地址&#xff1a;https://github.com/yizhaoxian/CoreIdentityServer4Demo.git2、相关章节2.1、《IdentityServer4 (1) 客户端授权模式(Client Cr…

OpenCV访问像素点的灰度值

1.Mat矩阵数值的存储方式 这里以指针的方式访问图像素为例 (1)单通道 定义一个单通道图像&#xff1a; cv::Mat img_1 (320, 640, CV_8UC1, Scalar(0)); 对于单通道M(i,j)即为第i行j列的其灰度值&#xff1b;程序中表示为&#xff1a; img_1.ptr<uchar>(i)[j]; (2)多通道…

asp.net中大文件下载

因为IIS支持的最大文件为int32的最大值位数的文件下载&#xff0c;所以&#xff0c;超过2G的文件无法通过IIS进行下载。 通过网上查找的资料&#xff0c;如下可实现文件的下载&#xff0c;使用filestream进行下载。 public void download(){System.IO.Stream iStream null;byt…

Java的终结器仍然存在

当我第一次学习Java并从C 过渡到Java时&#xff0c;我记得我经常被告知&#xff0c;经常读到它不应该像C 析构函数那样对待Java终结器&#xff0c;也不应该依靠它。 该建议的频率和坚持性对我产生了影响&#xff0c;以至于我无法回忆起我上一次编写finalize&#xff08;&#x…

时隙aloha协议仿真程序_工控ModbusTCP/IP协议仿真环境搭建

01ModbusTCP/IP协议简介Modbus TCP/IP是简单的、中立厂商的用于管理和控制自动化设备的MODBUS系列通讯协议的派生产品。它覆盖了使用TCP/IP协议的“intranet”和“internet”环境中MODBUS报文的用途。协议的最通用用途是为诸如PLC’S&#xff0c;I/O模块&#xff0c;以及连接其…

vector的初始化及常用操作

1.vector的初始化&#xff1a;可以有五种方式,举例说明如下&#xff1a; &#xff08;1&#xff09; vector<int> a(10); //定义了10个整型元素的向量&#xff08;尖括号中为元素类型名&#xff0c;它可以是任何合法的数据类型&#xff09;&#xff0c;但没有给出初值&a…