日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

netty reactor线程模型分析

發布時間:2025/4/5 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 netty reactor线程模型分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

netty4線程模型

ServerBootstrap http示例

// Configure the server.EventLoopGroup bossGroup = new EpollEventLoopGroup(1);EventLoopGroup workerGroup = new EpollEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.channel(EpollServerSocketChannel.class);b.option(ChannelOption.SO_BACKLOG, 1024);b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);b.group(bossGroup, workerGroup)// .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new HttpHelloWorldServerInitializer(sslCtx));Channel ch = b.bind(PORT).sync().channel(); /* System.err.println("Open your web browser and navigate to " +(SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');*/ch.closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}

綁定過程:

private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister();final Channel channel = regFuture.channel();if (regFuture.cause() != null) {return regFuture;}if (regFuture.isDone()) {// At this point we know that the registration was complete and successful.ChannelPromise promise = channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;} else {// Registration future is almost always fulfilled already, but just in case it's not.final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) {// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an// IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause);} else {// Registration was successful, so set the correct executor to use.// See https://github.com/netty/netty/issues/2586promise.executor = channel.eventLoop();}doBind0(regFuture, channel, localAddress, promise);}});return promise;}}

初始化過程:

final ChannelFuture initAndRegister() {final Channel channel = channelFactory().newChannel();try {init(channel);} catch (Throwable t) {channel.unsafe().closeForcibly();// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}ChannelFuture regFuture = group().register(channel);if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}// If we are here and the promise is not failed, it's one of the following cases:// 1) If we attempted registration from the event loop, the registration has been completed at this point.// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.// 2) If we attempted registration from the other thread, the registration request has been successfully// added to the event loop's task queue for later execution.// i.e. It's safe to attempt bind() or connect() now:// because bind() or connect() will be executed *after* the scheduled registration task is executed// because register(), bind(), and connect() are all bound to the same thread.return regFuture;}

ServerBootStrap的初始化過程:

@Overridevoid init(Channel channel) throws Exception {final Map<ChannelOption<?>, Object> options = options();synchronized (options) {channel.config().setOptions(options);}final Map<AttributeKey<?>, Object> attrs = attrs();synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();channel.attr(key).set(e.getValue());}}ChannelPipeline p = channel.pipeline();final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));}synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));}p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(Channel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = handler();if (handler != null) {pipeline.addLast(handler);} pipeline.addLast(new ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}

接收器ServerBootstrapAcceptor

@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);for (Entry<ChannelOption<?>, Object> e: childOptions) {try {if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {logger.warn("Unknown channel option: " + e);}} catch (Throwable t) {logger.warn("Failed to set a channel option: " + child, t);}}for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}try {childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}}

ThreadPerChannelEventLoopGroup實現注冊

@Overridepublic ChannelFuture register(Channel channel) {if (channel == null) {throw new NullPointerException("channel");}try { EventLoop l = nextChild();return l.register(channel, new DefaultChannelPromise(channel, l));} catch (Throwable t) {return new FailedChannelFuture(channel, GlobalEventExecutor.INSTANCE, t);}}

獲取子eventLoop

private EventLoop nextChild() throws Exception {if (shuttingDown) {throw new RejectedExecutionException("shutting down");}EventLoop loop = idleChildren.poll();if (loop == null) {if (maxChannels > 0 && activeChildren.size() >= maxChannels) {throw tooManyChannels;}loop = newChild(childArgs);loop.terminationFuture().addListener(childTerminationListener);}activeChildren.add(loop);return loop;}

產生新子eventLoop(SingleThreadEventExecutor.java)

/*** Create a new instance** @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it* @param executor the {@link Executor} which will be used for executing* @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the* executor thread*/protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {super(parent);if (executor == null) {throw new NullPointerException("executor");}this.addTaskWakesUp = addTaskWakesUp;this.executor = executor;taskQueue = newTaskQueue();}

其執行方法(SingleThreadEventExecutor.java):

@Overridepublic void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}boolean inEventLoop = inEventLoop();if (inEventLoop) {addTask(task);} else { startThread();addTask(task);if (isShutdown() && removeTask(task)) {reject();}}if (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);}}

啟動處理線程(SingleThreadEventExecutor.java):

private void startThread() {if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {doStartThread();}}}private void doStartThread() {assert thread == null;executor.execute(new Runnable() {@Overridepublic void run() {thread = Thread.currentThread();if (interrupted) {thread.interrupt();}boolean success = false;updateLastExecutionTime();try {SingleThreadEventExecutor.this.run();success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {for (;;) {int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {break;}}// Check if confirmShutdown() was called at the end of the loop.if (success && gracefulShutdownStartTime == 0) {logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +"before run() implementation terminates.");}try {// Run all remaining tasks and shutdown hooks.for (;;) {if (confirmShutdown()) {break;}}} finally {try {cleanup();} finally {STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);threadLock.release();if (!taskQueue.isEmpty()) {logger.warn("An event executor terminated with " +"non-empty task queue (" + taskQueue.size() + ')');}terminationFuture.setSuccess(null);}}}}});}

其中的run方法由其子類(DefaultEventLoop,EpollEventLoop,NioEventLoop,ThreadPerChannelEventLoop)各種實現,以NioEventLoop為例:

@Overrideprotected void run() {for (;;) {boolean oldWakenUp = wakenUp.getAndSet(false);try {if (hasTasks()) {selectNow();} else {select(oldWakenUp);// 'wakenUp.compareAndSet(false, true)' is always evaluated// before calling 'selector.wakeup()' to reduce the wake-up// overhead. (Selector.wakeup() is an expensive operation.)//// However, there is a race condition in this approach.// The race condition is triggered when 'wakenUp' is set to// true too early.//// 'wakenUp' is set to true too early if:// 1) Selector is waken up between 'wakenUp.set(false)' and// 'selector.select(...)'. (BAD)// 2) Selector is waken up between 'selector.select(...)' and// 'if (wakenUp.get()) { ... }'. (OK)//// In the first case, 'wakenUp' is set to true and the// following 'selector.select(...)' will wake up immediately.// Until 'wakenUp' is set to false again in the next round,// 'wakenUp.compareAndSet(false, true)' will fail, and therefore// any attempt to wake up the Selector will fail, too, causing// the following 'selector.select(...)' call to block// unnecessarily.//// To fix this problem, we wake up the selector again if wakenUp// is true immediately after selector.select(...).// It is inefficient in that it wakes up the selector for both// the first case (BAD - wake-up required) and the second case// (OK - no wake-up required).if (wakenUp.get()) {selector.wakeup();}}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;if (ioRatio == 100) {processSelectedKeys(); runAllTasks();} else {final long ioStartTime = System.nanoTime();processSelectedKeys();final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}if (isShuttingDown()) {closeAll();if (confirmShutdown()) {break;}}} catch (Throwable t) {logger.warn("Unexpected exception in the selector loop.", t);// Prevent possible consecutive immediate failures that lead to// excessive CPU consumption.try {Thread.sleep(1000);} catch (InterruptedException e) {// Ignore. }}}}

運行所有任務(SingleThreadEventExecutor.java)

/*** Poll all tasks from the task queue and run them via {@link Runnable#run()} method. This method stops running* the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.*/protected boolean runAllTasks(long timeoutNanos) {fetchFromScheduledTaskQueue();Runnable task = pollTask();if (task == null) {return false;}final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;long runTasks = 0;long lastExecutionTime;for (;;) {try {task.run();} catch (Throwable t) {logger.warn("A task raised an exception.", t);}runTasks ++;// Check timeout every 64 tasks because nanoTime() is relatively expensive.// XXX: Hard-coded value - will make it configurable if it is really a problem.if ((runTasks & 0x3F) == 0) {lastExecutionTime = ScheduledFutureTask.nanoTime();if (lastExecutionTime >= deadline) {break;}}task = pollTask();if (task == null) {lastExecutionTime = ScheduledFutureTask.nanoTime();break;}}this.lastExecutionTime = lastExecutionTime;return true;}

小結

  本文從一個簡單的示例程序,一步步分析netty4的線程模型,從ServerBootstrapAcceptor到SingleThreadEventExecutor的源碼,環環相扣,可以根據上面的分析鏈理解

一個請求過來后,netty的處理流程。

?

轉載于:https://www.cnblogs.com/davidwang456/p/5118802.html

總結

以上是生活随笔為你收集整理的netty reactor线程模型分析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 欧美视频在线观看免费 | 97超碰福利 | 日韩欧美第一区 | 美女屁股眼视频网站 | 欧美大片大全 | 手机在线观看免费av | 欧美精品亚洲精品 | wwww在线观看 | 日韩a级片在线观看 | 看了下面会湿的视频 | 亚洲中字 | 亚洲黄片一区二区 | 一区精品在线观看 | 91麻豆国产| 色播五月综合 | 亚洲第二色 | 中文资源在线观看 | 亚洲免费一级片 | 四虎av网址 | 国产在线观看av | 午夜av不卡 | 五月综合久久 | 欧美精品网站 | 天天色棕合合合合合合合 | 福利一二三区 | 精品成人一区二区 | 日本色视| 国产乡下妇女做爰视频 | 成人动作片 | 深夜精品福利 | 一级做a爱片久久毛片 | 亚洲色图欧美 | 日韩午夜在线播放 | 婷婷成人av | 91视频久久 | 国产精品一区二区白浆 | av最新天堂| 第五色婷婷 | 欧美色涩 | 国产综合精品久久久久成人影 | 亚洲精品国产精品国自产网站 | 男人的天堂网av | 亚洲欧美激情在线 | 91亚洲精品乱码久久久久久蜜桃 | 色视频免费在线观看 | av一起看香蕉 | 国产精品腿扒开做爽爽爽挤奶网站 | 日本熟妇一区二区 | 久久国产成人精品 | 成人av番号网 | 麻豆传媒观看 | 最新的黄色网址 | 亚洲一区二区精品在线 | 亚洲精品免费在线 | 久久久情 | 成人片黄网站久久久免费 | 极品少妇xxxx精品少妇偷拍 | av在线不卡观看 | 成年人av| 四虎网站在线 | 亚洲av熟女国产一区二区性色 | 免费福利影院 | 三级麻豆 | 懂色av中文字幕 | 欧美3p视频 | 久久伊人影视 | 国产亚洲精品自拍 | 黄色大片在线免费观看 | 亚洲欧洲自拍 | 免费在线看黄网站 | 午夜亚洲aⅴ无码高潮片苍井空 | 亚洲性色视频 | 毛片一二三区 | 日韩在线国产精品 | 亚洲伦理一区二区三区 | 国产又粗又猛又爽又 | 波多野结衣中文字幕一区二区三区 | 日韩免费av一区 | 国产成人无码www免费视频播放 | 午夜插插插 | 四虎黄网| www.天堂在线| 1769国产精品视频 | 青青草在线免费 | 亚洲欧美自拍另类 | 日日噜噜噜噜人人爽亚洲精品 | 99热这里只有精品久久 | www四虎精品视频免费网站 | 中文字幕一区二区三区夫目前犯 | 狠狠干,狠狠操 | a国产在线| 亚洲成人h | 天堂av电影在线观看 | 2017日日夜夜 | 欧美综合亚洲图片综合区 | 在线播放精品 | aaaaav| 毛片在线不卡 | 日韩精品视频在线播放 |