大纲
1.关于NioEventLoop的问题整理
2.理解Reactor线程模型主要分三部分
3.NioEventLoop的创建
4.NioEventLoop的启动
4.NioEventLoop的启动
(1)启动NioEventLoop的两大入口
(2)判断当前线程是否是NioEventLoop线程
(3)创建一个线程并启动
(4)NioEventLoop的启动总结
(1)启动NioEventLoop的两大入口
入口一:服务端启动,注册服务端Channel到Selector时
入口二:新连接接入,通过chooser绑定一个NioEventLoop时
下面先看入口一:
调用ServerBootstrap的bind()方法其实会调用AbstractBootstrap的doBind()方法,然后会调用AbstractBootstrap的initAndRegister()方法,接着执行config().group().register(channel)注册服务端Channel。最后会逐层深入调用到AbstractChannel.AbstractUnsafe的register()方法,来启动一个NioEventLoop将服务端Channel注册到Selector上。
//Bootstrap sub-class which allows easy bootstrap of ServerChannel
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {......
}//AbstractBootstrap is a helper class that makes it easy to bootstrap a Channel.
//It support method-chaining to provide an easy way to configure the AbstractBootstrap.
//When not used in a ServerBootstrap context, the #bind() methods are useful for connectionless transports such as datagram (UDP).
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {...//Create a new Channel and bind it.public ChannelFuture bind(int inetPort) {//首先根据端口号创建一个InetSocketAddress对象,然后调用重载方法bind()return bind(new InetSocketAddress(inetPort));}//Create a new Channel and bind it.public ChannelFuture bind(SocketAddress localAddress) {//验证服务启动需要的必要参数validate();if (localAddress == null) throw new NullPointerException("localAddress");return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));}private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister();//1.初始化和注册Channelfinal Channel channel = regFuture.channel();...doBind0(regFuture, channel, localAddress, promise);//2.绑定服务端端口...return promise;}final ChannelFuture initAndRegister() {Channel channel = null;...//1.创建服务端Channelchannel = channelFactory.newChannel();//2.初始化服务端Channelinit(channel);...//3.注册服务端Channel,比如通过NioEventLoopGroup的register()方法进行注册ChannelFuture regFuture = config().group().register(channel);...return regFuture;}...
}//Bootstrap sub-class which allows easy bootstrap of ServerChannel
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);...@Overridepublic final ServerBootstrapConfig config() {return config;}...
}public abstract class AbstractBootstrapConfig<B extends AbstractBootstrap<B, C>, C extends Channel> {protected final B bootstrap;...protected AbstractBootstrapConfig(B bootstrap) {this.bootstrap = ObjectUtil.checkNotNull(bootstrap, "bootstrap");}//Returns the configured EventLoopGroup or null if non is configured yet.public final EventLoopGroup group() {//比如返回一个NioEventLoopGroup对象return bootstrap.group();}...
}//MultithreadEventLoopGroup implementations which is used for NIO Selector based Channels.
public class NioEventLoopGroup extends MultithreadEventLoopGroup {......
}//Abstract base class for EventLoopGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {...@Overridepublic ChannelFuture register(Channel channel) {//先通过next()方法获取一个NioEventLoop,然后通过NioEventLoop.register()方法注册服务端Channelreturn next().register(channel);}@Overridepublic EventLoop next() {return (EventLoop) super.next();}...
}//Abstract base class for EventExecutorGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {private final EventExecutorChooserFactory.EventExecutorChooser chooser;...@Overridepublic EventExecutor next() {//通过线程选择器chooser选择一个NioEventLoopreturn chooser.next();} ...
}//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {......
}//Abstract base class for EventLoops that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {...@Overridepublic ChannelFuture register(Channel channel) {return register(new DefaultChannelPromise(channel, this));}@Overridepublic ChannelFuture register(final ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");//调用AbstractUnsafe的register()方法promise.channel().unsafe().register(this, promise);return promise;}...
}//A skeletal Channel implementation.
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {private volatile EventLoop eventLoop;...//Unsafe implementation which sub-classes must extend and use.protected abstract class AbstractUnsafe implements Unsafe {...@Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {...//绑定事件循环器,即绑定一个NioEventLoop到该Channel上AbstractChannel.this.eventLoop = eventLoop;//注册Selector,并启动一个NioEventLoopif (eventLoop.inEventLoop()) {register0(promise);} else {...//通过启动这个NioEventLoop线程来调用register0()方法将这个服务端Channel注册到Selector上//其实执行的是SingleThreadEventExecutor的execute()方法eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});...}}}...
}
(2)判断当前线程是否是NioEventLoop线程
调用NioEventLoop的inEventLoop()方法可以判断当前线程是否是Netty的Reactor线程,也就是NioEventLoop对应的线程实体。NioEventLoop的线程实体被创建之后,会将该线程实体保存到NioEventLoop父类的成员变量thread中。
服务端启动、注册服务端Channel到Selector,执行到AbstractUnsafe.register()方法中的eventLoop.inEventLoop()代码时,会将main方法对应的主线程传递进来与this.thread进行比较。由于this.thread此时并未赋值,所以为空,因此inEventLoop()方法返回false,于是便会执行eventLoop.execute()代码创建一个线程并启动。
//SingleThreadEventLoop implementation which register the Channel's
//to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {......
}//Abstract base class for EventLoops that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {......
}//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {......
}//Abstract base class for EventExecutors that want to support scheduling.
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {......
}//Abstract base class for {@link EventExecutor} implementations.
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {...@Overridepublic boolean inEventLoop() {//注册服务端Channel时是通过主线程进行注册的,Thread.currentThread()对应的就是main线程//调用SingleThreadEventExecutor.inEventLoop()方法return inEventLoop(Thread.currentThread());}...
}//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {private volatile Thread thread;...@Overridepublic boolean inEventLoop(Thread thread) {return thread == this.thread;//此时线程还没创建,this.thread为null}...
}
(3)创建一个线程并启动
AbstractUnsafe.register()方法准备将服务端Channel注册到Selector上时,首先在判断条件中执行eventLoop.inEventLoop()代码发现为false,于是便执行eventLoop.execute()代码创建一个线程并启动它去执行注册任务。而执行eventLoop.execute()代码其实就是调用SingleThreadEventExecutor的execute()方法。
//Abstract base class for EventLoops that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {...@Overridepublic ChannelFuture register(Channel channel) {return register(new DefaultChannelPromise(channel, this));}@Overridepublic ChannelFuture register(final ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");//调用AbstractUnsafe的register()方法,并把NioEventLoop自己当作参数传入promise.channel().unsafe().register(this, promise);return promise;}...
}//A skeletal Channel implementation.
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {private volatile EventLoop eventLoop;...//Unsafe implementation which sub-classes must extend and use.protected abstract class AbstractUnsafe implements Unsafe {...@Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {...//绑定事件循环器,即绑定一个NioEventLoop到该Channel上AbstractChannel.this.eventLoop = eventLoop;//注册Selector,并启动一个NioEventLoopif (eventLoop.inEventLoop()) {register0(promise);} else {...//通过启动这个NioEventLoop线程来调用register0()方法将这个服务端Channel注册到Selector上//其实执行的是SingleThreadEventExecutor的execute()方法eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});...}}}...
}//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {private volatile Thread thread;//创建NioEventLoop时会通过构造方法传入NioEventLoopGroup的线程执行器executorprivate final Executor executor;...@Overridepublic void execute(Runnable task) {if (task == null) throw new NullPointerException("task");boolean inEventLoop = inEventLoop();//判断当前线程是否是Netty的Reactor线程if (inEventLoop) {addTask(task);} else {startThread();addTask(task);if (isShutdown() && removeTask(task)) reject();}if (!addTaskWakesUp && wakesUpForTask(task)) wakeup(inEventLoop);}private void startThread() {//判断Reactor线程有没有被启动;如果没有被启动,则通过CAS调用doStartThread()方法启动线程if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {doStartThread();}}}private void doStartThread() {assert thread == null;//executor.execute()方法会创建出一个FastThreadLocalThread线程来执行Runnable任务//所以在Runnable的run()方法中,Thread.currentThread()指的是这个FastThreadLocalThread线程executor.execute(new Runnable() {@Overridepublic void run() {//Thread.currentThread()指的是FastThreadLocalThread线程thread = Thread.currentThread();...SingleThreadEventExecutor.this.run();//启动线程...}});}//具体的run()方法由子类比如NioEventLoop来实现protected abstract void run();...
}
SingleThreadEventExecutor的execute()方法的说明如下:
一.这个方法也可能会被用户代码使用,如ctx.executor().execute(task)。所以execute()方法里又调用inEventLoop()方法进行了一次外部线程判断,确保执行task任务时不会遇到线程问题。
二.如果当前线程不是Netty的Reactor线程,则调用startThread()方法启动一个Reactor线程。在startThread()方法中首先会判断当前NioEventLoop对应的Reactor线程实体有没有被启动。如果没有被启动,则通过设置CAS成功后调用doStartThread()方法启动线程。
三.执行doStartThread()方法时,会调用NioEventLoop的内部成员变量executor的execute()方法。executor就是线程执行器ThreadPerTaskExecutor,它的作用是每次执行Runnable任务时都会创建一个线程来执行。也就是executor.execute()方法会通过DefaultThreadFactory的newThread()方法,创建出一个FastThreadLocalThread线程来执行Runnable任务。
四.doStartThread()方法的Runnable任务会由一个FastThreadLocalThread线程来执行。在Runnable任务的run()方法里,会保存ThreadPerTaskExecutor创建出来的FastThreadLocalThread对象到SingleThreadEventExecutor的成员变量thread中,然后调用SingleThreadEventExecutor的run()方法。
//Abstract base class for EventExecutorGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {private final EventExecutor[] children;private final EventExecutorChooserFactory.EventExecutorChooser chooser;... //Create a new instance.protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {if (nThreads <= 0) throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));//1.创建ThreadPerTaskExecutor线程执行器if (executor == null) executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());//2.创建NioEventLoopchildren = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) {...//创建每一个NioEventLoop时,都会调用newChild()方法给每一个NioEventLoop配置一些核心参数//传入线程执行器executor去创建NioEventLoopchildren[i] = newChild(executor, args);}//3.创建线程选择器chooser = chooserFactory.newChooser(children);...}protected ThreadFactory newDefaultThreadFactory() {//getClass()是获取该方法所属的对象类型,也就是NioEventLoopGroup类型//因为是通过NioEventLoopGroup的构造方法层层调用到这里的return new DefaultThreadFactory(getClass());}...
}public final class ThreadPerTaskExecutor implements Executor {private final ThreadFactory threadFactory;public ThreadPerTaskExecutor(ThreadFactory threadFactory) {if (threadFactory == null) throw new NullPointerException("threadFactory");this.threadFactory = threadFactory;}@Overridepublic void execute(Runnable command) {//调用DefaultThreadFactory的newThread()方法执行Runnable任务threadFactory.newThread(command).start();}
}//A ThreadFactory implementation with a simple naming rule.
public class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger poolId = new AtomicInteger();private final AtomicInteger nextId = new AtomicInteger();private final boolean daemon;private final int priority;protected final ThreadGroup threadGroup;...public DefaultThreadFactory(Class<?> poolType) {this(poolType, false, Thread.NORM_PRIORITY);}public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {//toPoolName()方法会把NioEventLoopGroup的首字母变成小写this(toPoolName(poolType), daemon, priority);}public DefaultThreadFactory(String poolName, boolean daemon, int priority) {this(poolName, daemon, priority, System.getSecurityManager() == null ? Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());}public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {...//prefix用来标记线程名字的前缀prefix = poolName + '-' + poolId.incrementAndGet() + '-';this.daemon = daemon;this.priority = priority;this.threadGroup = threadGroup;}@Overridepublic Thread newThread(Runnable r) {Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());if (t.isDaemon()) {if (!daemon) t.setDaemon(false);} else {if (daemon) t.setDaemon(true);}if (t.getPriority() != priority) t.setPriority(priority);return t;}protected Thread newThread(Runnable r, String name) {return new FastThreadLocalThread(threadGroup, r, name);}...
}
NioEventLoop是如何与一个线程实体绑定的?NioEventLoop会通过线程执行器ThreadPerTaskExecutor创建一个FastThreadLocalThread,然后再将该FastThreadLocalThread线程保存到其成员变量中,从而实现与一个线程实体进行绑定。
(4)NioEventLoop的启动总结
一.在注册服务端Channel的过程中,主线程最终会调用AbstractUnsafe的register()方法。该方法首先会将一个NioEventLoop绑定到这个服务端Channel上,然后把实际注册Selector的逻辑封装成一个Runnable任务,接着调用NioEventLoop的execute()方法来执行这个Runnable任务。
二.NioEventLoop的execute()方法其实就是其父类SingleThreadEventExecutor的execute()方法,它会先判断当前调用execute()方法的线程是不是Netty的Reactor线程,如果不是就调用startThread()方法来创建一个Reactor线程。
三.startThread()方法会通过线程执行器ThreadPerTaskExecutor的execute()方法来创建一个线程。这个线程是一个FastThreadLocalThread线程,这个线程需要执行如下逻辑:把线程保存到NioEventLoop的成员变量thread中,然后调用NioEventLoop的run()方法来启动NioEventLoop。
NioEventLoop的启动流程如下:
bind() -> initAndRegister() -> config().group().register() -> eventloop.execute() //入口startThread() -> doStartThread() //创建线程ThreadPerTaskExecutor.execute() //线程执行器创建FastThreadLocalThread线程thread = Thread.currentThread() //保存FastThreadLocalThread线程到NioEventLoop的成员变量中NioEventLoop.run() //启动NioEventLoop
NioEventLoop的启动流程说明如下:
首先bind()方法会将具体绑定端口的操作封装成一个Runnable任务,然后调用NioEventLoop的execute()方法,接着Netty会判断调用execute()方法的线程是否是NIO线程,如果发现不是就会调用startThread()方法开始创建线程。
创建线程是通过线程执行器ThreadPerTaskExecutor来创建的。线程执行器的作用是每执行一个任务都会创建一个线程,而且创建出来的线程就是NioEventLoop底层的一个FastThreadLocalThread线程。
创建完FastThreadLocalThread线程后会执行一个Runnable任务,该Runnable任务首先会将这个线程保存到NioEventLoop对象。保存的目的是为了判断后续对NioEventLoop的相关执行线程是否为本身。如果不是就将封装好的一个任务放入TaskQueue中进行串行执行,实现线程安全。该Runnable任务然后会调用NioEventLoop的run()方法,从而启动NioEventLoop。NioEventLoop的run()方法是驱动Netty运转的核心方法。