netty reactor线程模型分析
生活随笔
收集整理的這篇文章主要介紹了
netty reactor线程模型分析
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
netty4線程模型
ServerBootstrap http示例
// Configure the server.EventLoopGroup bossGroup = new EpollEventLoopGroup(1);EventLoopGroup workerGroup = new EpollEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.channel(EpollServerSocketChannel.class);b.option(ChannelOption.SO_BACKLOG, 1024);b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);b.group(bossGroup, workerGroup)// .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new HttpHelloWorldServerInitializer(sslCtx));Channel ch = b.bind(PORT).sync().channel(); /* System.err.println("Open your web browser and navigate to " +(SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');*/ch.closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}綁定過程:
private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister();final Channel channel = regFuture.channel();if (regFuture.cause() != null) {return regFuture;}if (regFuture.isDone()) {// At this point we know that the registration was complete and successful.ChannelPromise promise = channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;} else {// Registration future is almost always fulfilled already, but just in case it's not.final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) {// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an// IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause);} else {// Registration was successful, so set the correct executor to use.// See https://github.com/netty/netty/issues/2586promise.executor = channel.eventLoop();}doBind0(regFuture, channel, localAddress, promise);}});return promise;}}初始化過程:
final ChannelFuture initAndRegister() {final Channel channel = channelFactory().newChannel();try {init(channel);} catch (Throwable t) {channel.unsafe().closeForcibly();// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}ChannelFuture regFuture = group().register(channel);if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}// If we are here and the promise is not failed, it's one of the following cases:// 1) If we attempted registration from the event loop, the registration has been completed at this point.// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.// 2) If we attempted registration from the other thread, the registration request has been successfully// added to the event loop's task queue for later execution.// i.e. It's safe to attempt bind() or connect() now:// because bind() or connect() will be executed *after* the scheduled registration task is executed// because register(), bind(), and connect() are all bound to the same thread.return regFuture;}ServerBootStrap的初始化過程:
@Overridevoid init(Channel channel) throws Exception {final Map<ChannelOption<?>, Object> options = options();synchronized (options) {channel.config().setOptions(options);}final Map<AttributeKey<?>, Object> attrs = attrs();synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();channel.attr(key).set(e.getValue());}}ChannelPipeline p = channel.pipeline();final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));}synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));}p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(Channel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = handler();if (handler != null) {pipeline.addLast(handler);} pipeline.addLast(new ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}接收器ServerBootstrapAcceptor
@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);for (Entry<ChannelOption<?>, Object> e: childOptions) {try {if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {logger.warn("Unknown channel option: " + e);}} catch (Throwable t) {logger.warn("Failed to set a channel option: " + child, t);}}for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}try {childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}}ThreadPerChannelEventLoopGroup實現注冊
@Overridepublic ChannelFuture register(Channel channel) {if (channel == null) {throw new NullPointerException("channel");}try { EventLoop l = nextChild();return l.register(channel, new DefaultChannelPromise(channel, l));} catch (Throwable t) {return new FailedChannelFuture(channel, GlobalEventExecutor.INSTANCE, t);}}獲取子eventLoop
private EventLoop nextChild() throws Exception {if (shuttingDown) {throw new RejectedExecutionException("shutting down");}EventLoop loop = idleChildren.poll();if (loop == null) {if (maxChannels > 0 && activeChildren.size() >= maxChannels) {throw tooManyChannels;}loop = newChild(childArgs);loop.terminationFuture().addListener(childTerminationListener);}activeChildren.add(loop);return loop;}產生新子eventLoop(SingleThreadEventExecutor.java)
/*** Create a new instance** @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it* @param executor the {@link Executor} which will be used for executing* @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the* executor thread*/protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {super(parent);if (executor == null) {throw new NullPointerException("executor");}this.addTaskWakesUp = addTaskWakesUp;this.executor = executor;taskQueue = newTaskQueue();}其執行方法(SingleThreadEventExecutor.java):
@Overridepublic void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}boolean inEventLoop = inEventLoop();if (inEventLoop) {addTask(task);} else { startThread();addTask(task);if (isShutdown() && removeTask(task)) {reject();}}if (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);}}啟動處理線程(SingleThreadEventExecutor.java):
private void startThread() {if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {doStartThread();}}}private void doStartThread() {assert thread == null;executor.execute(new Runnable() {@Overridepublic void run() {thread = Thread.currentThread();if (interrupted) {thread.interrupt();}boolean success = false;updateLastExecutionTime();try {SingleThreadEventExecutor.this.run();success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {for (;;) {int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {break;}}// Check if confirmShutdown() was called at the end of the loop.if (success && gracefulShutdownStartTime == 0) {logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +"before run() implementation terminates.");}try {// Run all remaining tasks and shutdown hooks.for (;;) {if (confirmShutdown()) {break;}}} finally {try {cleanup();} finally {STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);threadLock.release();if (!taskQueue.isEmpty()) {logger.warn("An event executor terminated with " +"non-empty task queue (" + taskQueue.size() + ')');}terminationFuture.setSuccess(null);}}}}});}其中的run方法由其子類(DefaultEventLoop,EpollEventLoop,NioEventLoop,ThreadPerChannelEventLoop)各種實現,以NioEventLoop為例:
@Overrideprotected void run() {for (;;) {boolean oldWakenUp = wakenUp.getAndSet(false);try {if (hasTasks()) {selectNow();} else {select(oldWakenUp);// 'wakenUp.compareAndSet(false, true)' is always evaluated// before calling 'selector.wakeup()' to reduce the wake-up// overhead. (Selector.wakeup() is an expensive operation.)//// However, there is a race condition in this approach.// The race condition is triggered when 'wakenUp' is set to// true too early.//// 'wakenUp' is set to true too early if:// 1) Selector is waken up between 'wakenUp.set(false)' and// 'selector.select(...)'. (BAD)// 2) Selector is waken up between 'selector.select(...)' and// 'if (wakenUp.get()) { ... }'. (OK)//// In the first case, 'wakenUp' is set to true and the// following 'selector.select(...)' will wake up immediately.// Until 'wakenUp' is set to false again in the next round,// 'wakenUp.compareAndSet(false, true)' will fail, and therefore// any attempt to wake up the Selector will fail, too, causing// the following 'selector.select(...)' call to block// unnecessarily.//// To fix this problem, we wake up the selector again if wakenUp// is true immediately after selector.select(...).// It is inefficient in that it wakes up the selector for both// the first case (BAD - wake-up required) and the second case// (OK - no wake-up required).if (wakenUp.get()) {selector.wakeup();}}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;if (ioRatio == 100) {processSelectedKeys(); runAllTasks();} else {final long ioStartTime = System.nanoTime();processSelectedKeys();final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}if (isShuttingDown()) {closeAll();if (confirmShutdown()) {break;}}} catch (Throwable t) {logger.warn("Unexpected exception in the selector loop.", t);// Prevent possible consecutive immediate failures that lead to// excessive CPU consumption.try {Thread.sleep(1000);} catch (InterruptedException e) {// Ignore. }}}}運行所有任務(SingleThreadEventExecutor.java)
/*** Poll all tasks from the task queue and run them via {@link Runnable#run()} method. This method stops running* the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.*/protected boolean runAllTasks(long timeoutNanos) {fetchFromScheduledTaskQueue();Runnable task = pollTask();if (task == null) {return false;}final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;long runTasks = 0;long lastExecutionTime;for (;;) {try {task.run();} catch (Throwable t) {logger.warn("A task raised an exception.", t);}runTasks ++;// Check timeout every 64 tasks because nanoTime() is relatively expensive.// XXX: Hard-coded value - will make it configurable if it is really a problem.if ((runTasks & 0x3F) == 0) {lastExecutionTime = ScheduledFutureTask.nanoTime();if (lastExecutionTime >= deadline) {break;}}task = pollTask();if (task == null) {lastExecutionTime = ScheduledFutureTask.nanoTime();break;}}this.lastExecutionTime = lastExecutionTime;return true;}小結
本文從一個簡單的示例程序,一步步分析netty4的線程模型,從ServerBootstrapAcceptor到SingleThreadEventExecutor的源碼,環環相扣,可以根據上面的分析鏈理解
一個請求過來后,netty的處理流程。
?
轉載于:https://www.cnblogs.com/davidwang456/p/5118802.html
總結
以上是生活随笔為你收集整理的netty reactor线程模型分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: netty检测系统工具PlatformD
- 下一篇: 1号店11.11:从应用架构落地点谈高可