日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 >

真强啊!建议每一位Java程序员都读读Dubbo心跳设计的源码...

發(fā)布時間:2025/3/21 62 豆豆
生活随笔 收集整理的這篇文章主要介紹了 真强啊!建议每一位Java程序员都读读Dubbo心跳设计的源码... 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

# 前言

談到RPC肯定繞不開TCP通信,而主流的RPC框架都依賴于Netty等通信框架,這時候我們還要考慮是使用長連接還是短連接:

  • 短連接:每次通信結(jié)束后關(guān)閉連接,下次通信需要重新創(chuàng)建連接;優(yōu)點就是無需管理連接,無需保活連接;
  • 長連接:每次通信結(jié)束不關(guān)閉連接,連接可以復(fù)用,保證了性能;缺點就是連接需要統(tǒng)一管理,并且需要?;?#xff1b;

主流的RPC框架都會追求性能選擇使用長連接,所以如何?;钸B接就是一個重要的話題,也是本文的主題,下面會重點介紹一些保活策略;

# 為什么需要保活

上面介紹的長連接、短連接并不是TCP提供的功能,所以長連接是需要應(yīng)用端自己來實現(xiàn)的,包括:連接的統(tǒng)一管理,如何?;畹?#xff1b;如何?;钪拔覀兞私庖幌聻槭裁葱枰;?#xff1f;

主要原因是網(wǎng)絡(luò)不是100%可靠的,我們創(chuàng)建好的連接可能由于網(wǎng)絡(luò)原因?qū)е逻B接已經(jīng)不可用了,如果連接一直有消息往來,那么系統(tǒng)馬上可以感知到連接斷開;

但是我們系統(tǒng)可能長時間沒有消息來往,導(dǎo)致系統(tǒng)不能及時感知到連接不可用,也就是不能及時處理重連或者釋放連接;常見的保活策略使用心跳機(jī)制由應(yīng)用層來實現(xiàn),還有網(wǎng)絡(luò)層提供的TCP Keepalive保活探測機(jī)制;

# TCP Keepalive機(jī)制

TCP Keepalive是操作系統(tǒng)實現(xiàn)的功能,并不是TCP協(xié)議的一部分,需要在操作系統(tǒng)下進(jìn)行相關(guān)配置,開啟此功能后,如果連接在一段時間內(nèi)沒有數(shù)據(jù)往來,TCP將發(fā)送Keepalive探針來確認(rèn)連接的可用性,Keepalive幾個內(nèi)核參數(shù)配置:

  • tcp_keepalive_time:連接多長時間沒有數(shù)據(jù)往來發(fā)送探針請求,默認(rèn)為7200s(2h);
  • tcp_keepalive_probes:探測失敗重試的次數(shù)默認(rèn)為10次;
  • tcp_keepalive_intvl:重試的間隔時間默認(rèn)75s;

以上參數(shù)可以修改到/etc/sysctl.conf文件中;是否使用Keepalive用來保活就夠了,其實還不夠,Keepalive只是在網(wǎng)絡(luò)層就行?;?#xff0c;如果網(wǎng)絡(luò)本身沒有問題,但是系統(tǒng)由于其他原因已經(jīng)不可用了,這時候Keepalive并不能發(fā)現(xiàn);所以往往還需要結(jié)合心跳機(jī)制來一起使用;

# 心跳機(jī)制

何為心跳機(jī)制,簡單來講就是客戶端啟動一個定時器用來定時發(fā)送請求,服務(wù)端接到請求進(jìn)行響應(yīng),如果多次沒有接受到響應(yīng),那么客戶端認(rèn)為連接已經(jīng)斷開,可以斷開半打開的連接或者進(jìn)行重連處理;下面以Dubbo為例來看看是如何具體實施的;

Dubbo2.6.X

在HeaderExchangeClient中啟動了定時器ScheduledThreadPoolExecutor來定期執(zhí)行心跳請求

ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));

在實例化HeaderExchangeClient時啟動心跳定時器:

private void startHeartbeatTimer() {stopHeartbeatTimer(); if (heartbeat > 0) {heartbeatTimer = scheduled.scheduleWithFixedDelay(new HeartBeatTask(new HeartBeatTask.ChannelProvider() { @Overridepublic Collection<Channel> getChannels() { return Collections.<Channel>singletonList(HeaderExchangeClient.this);}}, heartbeat, heartbeatTimeout),heartbeat, heartbeat, TimeUnit.MILLISECONDS);} }

heartbeat默認(rèn)為60秒,heartbeatTimeout默認(rèn)為heartbeat*3,可以理解至少出現(xiàn)三次心跳請求還未收到回復(fù)才會任務(wù)連接已經(jīng)斷開;HeartBeatTask為執(zhí)行心跳的任務(wù):

public void run() {long now = System.currentTimeMillis(); for (Channel channel : channelProvider.getChannels()) { if (channel.isClosed()) { continue;} Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY\_READ\_TIMESTAMP); Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY\_WRITE\_TIMESTAMP); if ((lastRead != null && now - lastRead > heartbeat)|| (lastWrite != null && now - lastWrite > heartbeat)) { // 發(fā)送心跳} if (lastRead != null && now - lastRead > heartbeatTimeout) { if (channel instanceof Client) {((Client) channel).reconnect();} else {channel.close();}}} }

因為Dubbo雙端都會發(fā)送心跳請求,所以可以發(fā)現(xiàn)有兩個時間點分別是:

lastRead和lastWrite;當(dāng)然時間和最后讀取,最后寫的時間間隔大于heartbeat就會發(fā)送心跳請求;

如果多次心跳未返回結(jié)果,也就是最后讀取消息時間大于heartbeatTimeout會判定當(dāng)前是Client還是Server,如果是Client會發(fā)起reconnect,Server會關(guān)閉連接,這樣的考慮是合理的,客戶端調(diào)用是強(qiáng)依賴可用連接的,而服務(wù)端可以等待客戶端重新建立連接;

以上只是介紹的Client,同樣Server端也有相同的心跳處理,在可以查看HeaderExchangeServer;

Dubbo2.7.0

Dubbo2.7.0的心跳機(jī)制在2.6.X的基礎(chǔ)上得到了加強(qiáng),同樣在HeaderExchangeClient中使用HashedWheelTimer開啟心跳檢測,這是Netty提供的一個時間輪定時器,在任務(wù)非常多,并且任務(wù)執(zhí)行時間很短的情況下,HashedWheelTimer比Schedule性能更好,特別適合心跳檢測;

HashedWheelTimer heartbeatTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-client-heartbeat", true), tickDuration,TimeUnit.MILLISECONDS, Constants.TICKS\_PER\_WHEEL);

分別啟動了兩個定時任務(wù):startHeartBeatTask和startReconnectTask:

private void startHeartbeatTimer() {AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this); long heartbeatTick = calculateLeastDuration(heartbeat); long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout);HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout);   // init task and start timer.heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS); }

HeartbeatTimerTask:用來定時發(fā)送心跳請求,心跳間隔時間默認(rèn)為60秒;這里重新計算了時間,其實就是在原來的基礎(chǔ)上除以3,其實就是縮短了檢測間隔時間,增大了及時發(fā)現(xiàn)死鏈的概率;分別看一下兩個任務(wù):

protected void doTask(Channel channel) {Long lastRead = lastRead(channel);Long lastWrite = lastWrite(channel); if ((lastRead != null && now() - lastRead > heartbeat)|| (lastWrite != null && now() - lastWrite > heartbeat)) {Request req = new Request();req.setVersion(Version.getProtocolVersion());req.setTwoWay(true);req.setEvent(Request.HEARTBEAT_EVENT);channel.send(req);} }

同上檢測最后讀寫時間和heartbeat的大小,注:普通請求和心跳請求都會更新讀寫時間

protected void doTask(Channel channel) {Long lastRead = lastRead(channel);Long now = now(); if (lastRead != null && now - lastRead > heartbeatTimeout) { if (channel instanceof Client) {((Client) channel).reconnect();} else {channel.close();}} }

同樣的在超時的情況下,Client重連,Server關(guān)閉連接;同樣Server端也有相同的心跳處理,在可以查看HeaderExchangeServer;

Dubbo2.7.1-X

在Dubbo2.7.1之后,借助了Netty提供的IdleStateHandler來實現(xiàn)心跳機(jī)制服務(wù)

public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime,TimeUnit unit) {this(false, readerIdleTime, writerIdleTime, allIdleTime, unit); }
  • readerIdleTime:讀超時時間;
  • writerIdleTime:寫超時時間;
  • allIdleTime:所有類型的超時時間;

根據(jù)設(shè)置的超時時間,循環(huán)檢查讀寫事件多久沒有發(fā)生了,在pipeline中加入IdleSateHandler之后,可以在此pipeline的任意Handler的userEventTriggered方法之中檢測IdleStateEvent事件;下面看看具體Client和Server端添加的IdleStateHandler:

Client端

protected void initChannel(Channel ch) throws Exception { final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this); int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());ch.pipeline().addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS)).addLast("handler", nettyClientHandler); }

Client端在NettyClient中添加了IdleStateHandler,指定了讀寫超時時間默認(rèn)為60秒;60秒內(nèi)沒有讀寫事件發(fā)生,會觸發(fā)IdleStateEvent事件在NettyClientHandler處理:

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { try {NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);Request req = new Request();req.setVersion(Version.getProtocolVersion());req.setTwoWay(true);req.setEvent(Request.HEARTBEAT_EVENT);channel.send(req);} finally {NettyChannel.removeChannelIfDisconnected(ctx.channel());}} else { super.userEventTriggered(ctx, evt);} }

可以發(fā)現(xiàn)接收到IdleStateEvent事件發(fā)送了心跳請求;至于Client端如何處理重連,同樣在HeaderExchangeClient中使用HashedWheelTimer定時器啟動了兩個任務(wù):心跳任務(wù)和重連任務(wù),感覺這里已經(jīng)不需要心跳任務(wù)了,至于重連任務(wù)其實也可以放到userEventTriggered中處理;

Server端

protected void initChannel(NioSocketChannel ch) throws Exception { int idleTimeout = UrlUtils.getIdleTimeout(getUrl()); final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);ch.pipeline().addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS)).addLast("handler", nettyServerHandler); }

Server端指定的超時時間默認(rèn)為60*3秒,在NettyServerHandler中處理

userEventTriggered

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) {NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); try {channel.close();} finally {NettyChannel.removeChannelIfDisconnected(ctx.channel());}} super.userEventTriggered(ctx, evt); }

Server端在指定的超時時間內(nèi)沒有發(fā)生讀寫,會直接關(guān)閉連接;相比之前現(xiàn)在只有Client發(fā)送心跳,單向發(fā)送心跳;

同樣的在HeaderExchangeServer中并沒有啟動多個認(rèn)為,僅僅啟動了一個CloseTimerTask,用來檢測超時時間關(guān)閉連接;感覺這個任務(wù)是不是也可以不需要了,IdleStateHandler已經(jīng)實現(xiàn)了此功能;

綜上:在使用IdleStateHandler的情況下來同時在HeaderExchangeClient啟動心跳+重連機(jī)制,HeaderExchangeServer啟動了關(guān)閉連接機(jī)制;主要是因為IdleStateHandler是Netty框架特有了,而Dubbo是支持多種底層通訊框架的包括Mina,Grizzy等,應(yīng)該是為了兼容此類框架存在的;?

# 總結(jié)

本文首先介紹了RPC中引入的長連接方式,繼而引出長連接的保活機(jī)制,為什么需要保活?然后分別介紹了網(wǎng)絡(luò)層?;顧C(jī)制TCP Keepalive機(jī)制,應(yīng)用層心跳機(jī)制;最后已Dubbo為例看各個版本中對心跳機(jī)制的進(jìn)化。

《新程序員》:云原生和全面數(shù)字化實踐50位技術(shù)專家共同創(chuàng)作,文字、視頻、音頻交互閱讀

總結(jié)

以上是生活随笔為你收集整理的真强啊!建议每一位Java程序员都读读Dubbo心跳设计的源码...的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。