宜人贷蜂巢API网关技术解密之Netty使用实践
2019獨角獸企業(yè)重金招聘Python工程師標準>>>
一、背景
宜人貸蜂巢團隊,由Michael創(chuàng)立于2013年,通過使用互聯(lián)網(wǎng)科技手段助力金融生態(tài)和諧健康發(fā)展。自成立起一直致力于多維度數(shù)據(jù)閉環(huán)平臺建設(shè)。目前團隊規(guī)模超過百人,涵蓋征信、電商、金融、社交、五險一金和保險等用戶授信數(shù)據(jù)的抓取解析業(yè)務(wù),輔以先進的數(shù)據(jù)分析、挖掘和機器學(xué)習(xí)等技術(shù)對用戶信用級別、欺詐風(fēng)險進行預(yù)測評定,全面對外輸出金融反欺詐、社交圖譜、自動化模型定制等服務(wù)或產(chǎn)品。
目前宜人貸蜂巢基于用戶授權(quán)數(shù)據(jù)實時抓取解析技術(shù),并結(jié)合頂尖大數(shù)據(jù)技術(shù),快速迭代和自主的創(chuàng)新,已形成了強大而領(lǐng)先的聚合和輸出能力。
為了適應(yīng)完成宜人貸蜂巢強大的服務(wù)輸出能力,蜂巢設(shè)計開發(fā)了自己的API網(wǎng)關(guān)系統(tǒng),集中實現(xiàn)了鑒權(quán)、加解密、路由、限流等功能,使各業(yè)務(wù)抓取團隊關(guān)注其核心抓取和分析工作,而API網(wǎng)關(guān)系統(tǒng)更專注于安全、流量、路由等問題,從而更好的保障蜂巢服務(wù)系統(tǒng)的質(zhì)量。今天帶著大家解密API網(wǎng)關(guān)的Netty線程池技術(shù)實踐細節(jié)。
API網(wǎng)關(guān)作為宜人貸蜂巢數(shù)據(jù)開放平臺的統(tǒng)一入口,所有的客戶端及消費端通過統(tǒng)一的API來使用各類抓取服務(wù)。從面向?qū)ο笤O(shè)計的角度看,它與外觀模式類似,包裝各類不同的實現(xiàn)細節(jié),對外表現(xiàn)出統(tǒng)一的調(diào)用形式。
本文首先簡要地介紹API網(wǎng)關(guān)的項目框架,其次對比BIO和NIO的特點,再引入Netty作為項目的基礎(chǔ)框架,然后介紹Netty線程池的原理,最后深入Netty線程池的初始化、ServerBootstrap的初始化與啟動及channel與線程池的綁定過程,讓讀者了解Netty在承載高并發(fā)訪問的設(shè)計路思。
二、項目框架
圖1 - API網(wǎng)關(guān)項目框架
圖中描繪了API網(wǎng)關(guān)系統(tǒng)的處理流程,以及與服務(wù)注冊發(fā)現(xiàn)、日志分析、報警系統(tǒng)、各類爬蟲的關(guān)系。其中API網(wǎng)關(guān)系統(tǒng)接收請求,對請求進行編解碼、鑒權(quán)、限流、加解密,再基于Eureka服務(wù)注冊發(fā)現(xiàn)模塊,將請求發(fā)送到有效的服務(wù)節(jié)點上;網(wǎng)關(guān)及抓取系統(tǒng)的日志,會被收集到elk平臺中,做業(yè)務(wù)分析及報警處理。
三、BIO vs NIO
API網(wǎng)關(guān)承載數(shù)倍于爬蟲的流量,提升服務(wù)器的并發(fā)處理能力、縮短系統(tǒng)的響應(yīng)時間,通信模型的選擇是至關(guān)重要的,是選擇BIO,還是NIO?
Streamvs Buffer & 阻塞 vs 非阻塞
BIO是面向流的,io的讀寫,每次只能處理一個或者多個bytes,如果數(shù)據(jù)沒有讀寫完成,線程將一直等待于此,而不能暫時跳過io或者等待io讀寫完成異步通知,線程滯留在io讀寫上,不能充分利用機器有限的線程資源,造成server的吞吐量較低,見圖2。而NIO與此不同,面向Buffer,線程不需要滯留在io讀寫上,采用操作系統(tǒng)的epoll模式,在io數(shù)據(jù)準備好了,才由線程來處理,見圖3。
圖2 – BIO 從流中讀取數(shù)據(jù)
圖3 – NIO 從Buffer中讀取數(shù)據(jù)
Selectors
NIO的selector使一個線程可以監(jiān)控多個channel的讀寫,多個channel注冊到一個selector上,這個selector可以監(jiān)測到各個channel的數(shù)據(jù)準備情況,從而使用有限的線程資源處理更多的連接,見圖4。所以可以這樣說,NIO極大的提升了服務(wù)器接受并發(fā)請求的能力,而服務(wù)器性能還是要取決于業(yè)務(wù)處理時間和業(yè)務(wù)線程池模型。
圖4 – NIO 單一線程管理多個連接
而BIO采用的是request-per-thread模式,用一個線程負責(zé)接收TCP連接請求,并建立鏈路,然后將請求dispatch給負責(zé)業(yè)務(wù)邏輯處理的線程,見圖5。一旦訪問量過多,就會造成機器的線程資源緊張,造成請求延遲,甚至服務(wù)宕機。
圖5 – BIO 一連接一線程
對比JDK NIO與諸多NIO框架后,鑒于Netty優(yōu)雅的設(shè)計、易用的API、優(yōu)越的性能、安全性支持、API網(wǎng)關(guān)使用Netty作為通信模型,實現(xiàn)了基礎(chǔ)框架的搭建。
四、Netty線程池
考慮到API網(wǎng)關(guān)的高并發(fā)訪問需求,線程池設(shè)計,見圖6。
圖6 – API網(wǎng)關(guān)線程池設(shè)計
Netty的線程池理念有點像ForkJoinPool,不是一個線程大池子并發(fā)等待一條任務(wù)隊列,而是每條線程都有一個任務(wù)隊列。而且Netty的線程,并不只是簡單的阻塞地拉取任務(wù),而是在每個循環(huán)中做三件事情:
-
先SelectKeys()處理NIO的事件
-
然后獲取本線程的定時任務(wù),放到本線程的任務(wù)隊列里
-
最后執(zhí)行其他線程提交給本線程的任務(wù)
每個循環(huán)里處理NIO事件與其他任務(wù)的時間消耗比例,還能通過ioRatio變量來控制,默認是各占50%。可見,Netty的線程根本沒有阻塞等待任務(wù)的清閑日子,所以也不使用有鎖的BlockingQueue來做任務(wù)隊列了,而是使用無鎖的MpscLinkedQueue(Mpsc 是Multiple Producer, Single Consumer的縮寫)
五、NioEventLoopGroup初始化
下面分析下Netty線程池NioEventLoopGroup的設(shè)計與實現(xiàn)細節(jié),NioEventLoopGroup的類層次關(guān)系見圖7
圖7 –NioEvenrLoopGroup類層次關(guān)系
其創(chuàng)建過程——方法調(diào)用,見下圖
圖8 –NioEvenrLoopGroup創(chuàng)建調(diào)用關(guān)系
NioEvenrLoopGroup的創(chuàng)建,具體執(zhí)行過程是執(zhí)行類MultithreadEventExecutorGroup的構(gòu)造方法
/*** Create a new instance.** @param nThreads the number of threads that will be used by this instance.* @param executor the Executor to use, or {@code null} if the default should be used.* @param chooserFactory the {@link EventExecutorChooserFactory} to use.* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call*/protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {if (nThreads <= 0) {throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));}if (executor == null) {executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());}children = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) {boolean success = false;try {children[i] = newChild(executor, args);success = true;} catch (Exception e) { throw new IllegalStateException("failed to create a child event loop", e);} finally {if (!success) {for (int j = 0; j < i; j ++) {children[j].shutdownGracefully();}for (int j = 0; j < i; j ++) {EventExecutor e = children[j];try {while (!e.isTerminated()) {e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);}} catch (InterruptedException interrupted) {// Let the caller handle the interruption.Thread.currentThread().interrupt();break;}}}}}chooser = chooserFactory.newChooser(children);final FutureListener<Object> terminationListener = new FutureListener<Object>() {@Overridepublic void operationComplete(Future<Object> future) throws Exception {if (terminatedChildren.incrementAndGet() == children.length) {terminationFuture.setSuccess(null);}}};for (EventExecutor e: children) {e.terminationFuture().addListener(terminationListener);}Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);Collections.addAll(childrenSet, children);readonlyChildren = Collections.unmodifiableSet(childrenSet);}其中,創(chuàng)建細節(jié)如下:
-
線程池中的線程數(shù)nThreads必須大于0;
-
如果executor為null,創(chuàng)建默認executor,executor用于創(chuàng)建線程(newChild方法使用executor對象);
-
依次創(chuàng)建線程池中的每一個線程即NioEventLoop,如果其中有一個創(chuàng)建失敗,將關(guān)閉之前創(chuàng)建的所有線程;
-
chooser為線程池選擇器,用來選擇下一個EventExecutor,可以理解為,用來選擇一個線程來執(zhí)行task;
chooser的創(chuàng)建細節(jié),如下:
DefaultEventExecutorChooserFactory根據(jù)線程數(shù)創(chuàng)建具體的EventExecutorChooser,線程數(shù)如果等于2^n,可使用按位與替代取模運算,節(jié)省cpu的計算資源,見源碼:
@SuppressWarnings("unchecked")@Overridepublic EventExecutorChooser newChooser(EventExecutor[] executors) {if (isPowerOfTwo(executors.length)) {return new PowerOfTowEventExecutorChooser(executors);} else {return new GenericEventExecutorChooser(executors);}} private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {private final AtomicInteger idx = new AtomicInteger();private final EventExecutor[] executors;PowerOfTowEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}@Overridepublic EventExecutor next() {return executors[idx.getAndIncrement() & executors.length - 1];}}private static final class GenericEventExecutorChooser implements EventExecutorChooser {private final AtomicInteger idx = new AtomicInteger();private final EventExecutor[] executors;GenericEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}@Overridepublic EventExecutor next() {return executors[Math.abs(idx.getAndIncrement() % executors.length)];}}newChild(executor, args)的創(chuàng)建細節(jié),如下
MultithreadEventExecutorGroup的newChild方法是一個抽象方法,故使用NioEventLoopGroup的newChild方法,即調(diào)用NioEventLoop的構(gòu)造函數(shù)。
@Overrideprotected EventLoop newChild(Executor executor, Object... args) throws Exception {return new NioEventLoop(this, executor, (SelectorProvider) args[0],((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);}在這里先看下NioEventLoop的類層次關(guān)系
NioEventLoop的繼承關(guān)系比較復(fù)雜,在AbstractScheduledEventExecutor 中, Netty 實現(xiàn)了 NioEventLoop 的 schedule 功能, 即我們可以通過調(diào)用一個 NioEventLoop 實例的 schedule 方法來運行一些定時任務(wù). 而在 SingleThreadEventLoop 中, 又實現(xiàn)了任務(wù)隊列的功能, 通過它, 我們可以調(diào)用一個NioEventLoop 實例的 execute 方法來向任務(wù)隊列中添加一個 task, 并由 NioEventLoop 進行調(diào)度執(zhí)行.
通常來說, NioEventLoop 肩負著兩種任務(wù), 第一個是作為 IO 線程, 執(zhí)行與 Channel 相關(guān)的 IO 操作, 包括調(diào)用 select 等待就緒的 IO 事件、讀寫數(shù)據(jù)與數(shù)據(jù)的處理等; 而第二個任務(wù)是作為任務(wù)隊列, 執(zhí)行 taskQueue 中的任務(wù), 例如用戶調(diào)用 eventLoop.schedule 提交的定時任務(wù)也是這個線程執(zhí)行的.
具體的構(gòu)造過程,如下
創(chuàng)建任務(wù)隊列tailTasks(內(nèi)部為有界的LinkedBlockingQueue)
創(chuàng)建線程的任務(wù)隊列taskQueue(內(nèi)部為有界的LinkedBlockingQueue),以及任務(wù)過多防止系統(tǒng)宕機的拒絕策略rejectedHandler
其中tailTasks和taskQueue均是任務(wù)隊列,而優(yōu)先級不同,taskQueue的優(yōu)先級高于tailTasks,定時任務(wù)的優(yōu)先級高于taskQueue。
六、ServerBootstrap初始化及啟動
了解了Netty線程池NioEvenrLoopGroup的創(chuàng)建過程后,下面看下API網(wǎng)關(guān)服務(wù)ServerBootstrap的是如何使用線程池引入服務(wù)中,為高并發(fā)訪問服務(wù)的。
API網(wǎng)關(guān)ServerBootstrap初始化及啟動代碼,如下:
serverBootstrap = new ServerBootstrap();bossGroup = new NioEventLoopGroup(config.getBossGroupThreads());workerGroup = new NioEventLoopGroup(config.getWorkerGroupThreads());serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay()).option(ChannelOption.SO_BACKLOG, config.getBacklogSize()).option(ChannelOption.SO_KEEPALIVE, config.isSoKeepAlive())// Memory pooled.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT).childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT).childHandler(channelInitializer);ChannelFuture future = serverBootstrap.bind(config.getPort()).sync();log.info("API-gateway started on port: {}", config.getPort());future.channel().closeFuture().sync();API網(wǎng)關(guān)系統(tǒng)使用netty自帶的線程池,共有三組線程池,分別為bossGroup、workerGroup和executorGroup(使用在channelInitializer中,本文暫不作介紹)。其中,bossGroup用于接收客戶端的TCP連接,workerGroup用于處理I/O、執(zhí)行系統(tǒng)task和定時任務(wù),executorGroup用于處理網(wǎng)關(guān)業(yè)務(wù)加解密、限流、路由,及將請求轉(zhuǎn)發(fā)給后端的抓取服務(wù)等業(yè)務(wù)操作。
七、Channel與線程池的綁定
ServerBootstrap初始化后,通過調(diào)用bind(port)方法啟動Server,bind的調(diào)用鏈如下:
AbstractBootstrap.bind ->AbstractBootstrap.doBind -> AbstractBootstrap.initAndRegister其中,ChannelFuture regFuture = config().group().register(channel);中的group()方法返回bossGroup,而channel在serverBootstrap的初始化過程指定channel為NioServerSocketChannel.class,至此將NioServerSocketChannel與bossGroup綁定到一起,bossGroup負責(zé)客戶端連接的建立。那么NioSocketChannel是如何與workerGroup綁定到一起的?
調(diào)用鏈AbstractBootstrap.initAndRegister -> AbstractBootstrap. init-> ServerBootstrap.init ->ServerBootstrapAcceptor.ServerBootstrapAcceptor ->ServerBootstrapAcceptor.channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);for (Entry<ChannelOption<?>, Object> e: childOptions) {try {if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {logger.warn("Unknown channel option: " + e);}} catch (Throwable t) {logger.warn("Failed to set a channel option: " + child, t);}}for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}try {childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}}其中,childGroup.register(child)就是將NioSocketChannel與workderGroup綁定到一起,那又是什么觸發(fā)了ServerBootstrapAcceptor的channelRead方法?
其實當(dāng)一個 client 連接到 server 時, Java 底層的 NIO ServerSocketChannel 會有一個SelectionKey.OP_ACCEPT 就緒事件, 接著就會調(diào)用到 NioServerSocketChannel.doReadMessages方法
@Overrideprotected int doReadMessages(List<Object> buf) throws Exception {SocketChannel ch = javaChannel().accept();try {if (ch != null) {buf.add(new NioSocketChannel(this, ch));return 1;}} catch (Throwable t) {…}return 0;}javaChannel().accept() 會獲取到客戶端新連接的SocketChannel,實例化為一個 NioSocketChannel, 并且傳入 NioServerSocketChannel 對象(即 this), 由此可知, 我們創(chuàng)建的這個NioSocketChannel 的父 Channel 就是 NioServerSocketChannel 實例 .
接下來就經(jīng)由 Netty 的 ChannelPipeline 機制, 將讀取事件逐級發(fā)送到各個 handler 中, 于是就會觸發(fā)前面我們提到的 ServerBootstrapAcceptor.channelRead 方法啦。
至此,分析了Netty線程池的初始化、ServerBootstrap的啟動及channel與線程池的綁定過程,能夠看出Netty中線程池的優(yōu)雅設(shè)計,使用不同的線程池負責(zé)連接的建立、IO讀寫等,為API網(wǎng)關(guān)項目的高并發(fā)訪問提供了技術(shù)基礎(chǔ)。
八、總結(jié)
至此,對API網(wǎng)關(guān)技術(shù)的Netty實踐分享就到這里,各位如果對中間的各個環(huán)節(jié)有什么疑問和建議,歡迎大家指正,我們一起討論,共同學(xué)習(xí)提高。
參考
http://tutorials.jenkov.com/java-nio/nio-vs-io.html
http://netty.io/wiki/user-guide-for-4.x.html
http://netty.io/
http://www.tuicool.com/articles/mUFnqeM
https://segmentfault.com/a/1190000007403873
https://segmentfault.com/a/1190000007283053
作者:蜂巢團隊
來源:宜信技術(shù)學(xué)院
轉(zhuǎn)載于:https://my.oschina.net/u/4007037/blog/3058640
總結(jié)
以上是生活随笔為你收集整理的宜人贷蜂巢API网关技术解密之Netty使用实践的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: weh shell高大上?一文教你实现
- 下一篇: 清北NOIP训练营集训笔记——图论(提高