日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

真强啊!建议每一位Java程序员都读读Dubbo心跳设计的源码...

發布時間:2025/3/21 java 54 豆豆
生活随笔 收集整理的這篇文章主要介紹了 真强啊!建议每一位Java程序员都读读Dubbo心跳设计的源码... 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

# 前言

談到RPC肯定繞不開TCP通信,而主流的RPC框架都依賴于Netty等通信框架,這時候我們還要考慮是使用長連接還是短連接:

  • 短連接:每次通信結束后關閉連接,下次通信需要重新創建連接;優點就是無需管理連接,無需?;钸B接;
  • 長連接:每次通信結束不關閉連接,連接可以復用,保證了性能;缺點就是連接需要統一管理,并且需要?;?#xff1b;

主流的RPC框架都會追求性能選擇使用長連接,所以如何保活連接就是一個重要的話題,也是本文的主題,下面會重點介紹一些?;畈呗?#xff1b;

# 為什么需要?;?/strong>

上面介紹的長連接、短連接并不是TCP提供的功能,所以長連接是需要應用端自己來實現的,包括:連接的統一管理,如何保活等;如何保活之前我們了解一下為什么需要保活?

主要原因是網絡不是100%可靠的,我們創建好的連接可能由于網絡原因導致連接已經不可用了,如果連接一直有消息往來,那么系統馬上可以感知到連接斷開;

但是我們系統可能長時間沒有消息來往,導致系統不能及時感知到連接不可用,也就是不能及時處理重連或者釋放連接;常見的?;畈呗允褂眯奶鴻C制由應用層來實現,還有網絡層提供的TCP Keepalive?;钐綔y機制;

# TCP Keepalive機制

TCP Keepalive是操作系統實現的功能,并不是TCP協議的一部分,需要在操作系統下進行相關配置,開啟此功能后,如果連接在一段時間內沒有數據往來,TCP將發送Keepalive探針來確認連接的可用性,Keepalive幾個內核參數配置:

  • tcp_keepalive_time:連接多長時間沒有數據往來發送探針請求,默認為7200s(2h);
  • tcp_keepalive_probes:探測失敗重試的次數默認為10次;
  • tcp_keepalive_intvl:重試的間隔時間默認75s;

以上參數可以修改到/etc/sysctl.conf文件中;是否使用Keepalive用來?;罹蛪蛄?#xff0c;其實還不夠,Keepalive只是在網絡層就行?;?#xff0c;如果網絡本身沒有問題,但是系統由于其他原因已經不可用了,這時候Keepalive并不能發現;所以往往還需要結合心跳機制來一起使用;

# 心跳機制

何為心跳機制,簡單來講就是客戶端啟動一個定時器用來定時發送請求,服務端接到請求進行響應,如果多次沒有接受到響應,那么客戶端認為連接已經斷開,可以斷開半打開的連接或者進行重連處理;下面以Dubbo為例來看看是如何具體實施的;

Dubbo2.6.X

在HeaderExchangeClient中啟動了定時器ScheduledThreadPoolExecutor來定期執行心跳請求

ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));

在實例化HeaderExchangeClient時啟動心跳定時器:

private void startHeartbeatTimer() {stopHeartbeatTimer(); if (heartbeat > 0) {heartbeatTimer = scheduled.scheduleWithFixedDelay(new HeartBeatTask(new HeartBeatTask.ChannelProvider() { @Overridepublic Collection<Channel> getChannels() { return Collections.<Channel>singletonList(HeaderExchangeClient.this);}}, heartbeat, heartbeatTimeout),heartbeat, heartbeat, TimeUnit.MILLISECONDS);} }

heartbeat默認為60秒,heartbeatTimeout默認為heartbeat*3,可以理解至少出現三次心跳請求還未收到回復才會任務連接已經斷開;HeartBeatTask為執行心跳的任務:

public void run() {long now = System.currentTimeMillis(); for (Channel channel : channelProvider.getChannels()) { if (channel.isClosed()) { continue;} Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY\_READ\_TIMESTAMP); Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY\_WRITE\_TIMESTAMP); if ((lastRead != null && now - lastRead > heartbeat)|| (lastWrite != null && now - lastWrite > heartbeat)) { // 發送心跳} if (lastRead != null && now - lastRead > heartbeatTimeout) { if (channel instanceof Client) {((Client) channel).reconnect();} else {channel.close();}}} }

因為Dubbo雙端都會發送心跳請求,所以可以發現有兩個時間點分別是:

lastRead和lastWrite;當然時間和最后讀取,最后寫的時間間隔大于heartbeat就會發送心跳請求;

如果多次心跳未返回結果,也就是最后讀取消息時間大于heartbeatTimeout會判定當前是Client還是Server,如果是Client會發起reconnect,Server會關閉連接,這樣的考慮是合理的,客戶端調用是強依賴可用連接的,而服務端可以等待客戶端重新建立連接;

以上只是介紹的Client,同樣Server端也有相同的心跳處理,在可以查看HeaderExchangeServer;

Dubbo2.7.0

Dubbo2.7.0的心跳機制在2.6.X的基礎上得到了加強,同樣在HeaderExchangeClient中使用HashedWheelTimer開啟心跳檢測,這是Netty提供的一個時間輪定時器,在任務非常多,并且任務執行時間很短的情況下,HashedWheelTimer比Schedule性能更好,特別適合心跳檢測;

HashedWheelTimer heartbeatTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-client-heartbeat", true), tickDuration,TimeUnit.MILLISECONDS, Constants.TICKS\_PER\_WHEEL);

分別啟動了兩個定時任務:startHeartBeatTask和startReconnectTask:

private void startHeartbeatTimer() {AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this); long heartbeatTick = calculateLeastDuration(heartbeat); long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout);HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout);   // init task and start timer.heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS); }

HeartbeatTimerTask:用來定時發送心跳請求,心跳間隔時間默認為60秒;這里重新計算了時間,其實就是在原來的基礎上除以3,其實就是縮短了檢測間隔時間,增大了及時發現死鏈的概率;分別看一下兩個任務:

protected void doTask(Channel channel) {Long lastRead = lastRead(channel);Long lastWrite = lastWrite(channel); if ((lastRead != null && now() - lastRead > heartbeat)|| (lastWrite != null && now() - lastWrite > heartbeat)) {Request req = new Request();req.setVersion(Version.getProtocolVersion());req.setTwoWay(true);req.setEvent(Request.HEARTBEAT_EVENT);channel.send(req);} }

同上檢測最后讀寫時間和heartbeat的大小,注:普通請求和心跳請求都會更新讀寫時間

protected void doTask(Channel channel) {Long lastRead = lastRead(channel);Long now = now(); if (lastRead != null && now - lastRead > heartbeatTimeout) { if (channel instanceof Client) {((Client) channel).reconnect();} else {channel.close();}} }

同樣的在超時的情況下,Client重連,Server關閉連接;同樣Server端也有相同的心跳處理,在可以查看HeaderExchangeServer;

Dubbo2.7.1-X

在Dubbo2.7.1之后,借助了Netty提供的IdleStateHandler來實現心跳機制服務

public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime,TimeUnit unit) {this(false, readerIdleTime, writerIdleTime, allIdleTime, unit); }
  • readerIdleTime:讀超時時間;
  • writerIdleTime:寫超時時間;
  • allIdleTime:所有類型的超時時間;

根據設置的超時時間,循環檢查讀寫事件多久沒有發生了,在pipeline中加入IdleSateHandler之后,可以在此pipeline的任意Handler的userEventTriggered方法之中檢測IdleStateEvent事件;下面看看具體Client和Server端添加的IdleStateHandler:

Client端

protected void initChannel(Channel ch) throws Exception { final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this); int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());ch.pipeline().addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS)).addLast("handler", nettyClientHandler); }

Client端在NettyClient中添加了IdleStateHandler,指定了讀寫超時時間默認為60秒;60秒內沒有讀寫事件發生,會觸發IdleStateEvent事件在NettyClientHandler處理:

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { try {NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);Request req = new Request();req.setVersion(Version.getProtocolVersion());req.setTwoWay(true);req.setEvent(Request.HEARTBEAT_EVENT);channel.send(req);} finally {NettyChannel.removeChannelIfDisconnected(ctx.channel());}} else { super.userEventTriggered(ctx, evt);} }

可以發現接收到IdleStateEvent事件發送了心跳請求;至于Client端如何處理重連,同樣在HeaderExchangeClient中使用HashedWheelTimer定時器啟動了兩個任務:心跳任務和重連任務,感覺這里已經不需要心跳任務了,至于重連任務其實也可以放到userEventTriggered中處理;

Server端

protected void initChannel(NioSocketChannel ch) throws Exception { int idleTimeout = UrlUtils.getIdleTimeout(getUrl()); final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);ch.pipeline().addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS)).addLast("handler", nettyServerHandler); }

Server端指定的超時時間默認為60*3秒,在NettyServerHandler中處理

userEventTriggered

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) {NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); try {channel.close();} finally {NettyChannel.removeChannelIfDisconnected(ctx.channel());}} super.userEventTriggered(ctx, evt); }

Server端在指定的超時時間內沒有發生讀寫,會直接關閉連接;相比之前現在只有Client發送心跳,單向發送心跳;

同樣的在HeaderExchangeServer中并沒有啟動多個認為,僅僅啟動了一個CloseTimerTask,用來檢測超時時間關閉連接;感覺這個任務是不是也可以不需要了,IdleStateHandler已經實現了此功能;

綜上:在使用IdleStateHandler的情況下來同時在HeaderExchangeClient啟動心跳+重連機制,HeaderExchangeServer啟動了關閉連接機制;主要是因為IdleStateHandler是Netty框架特有了,而Dubbo是支持多種底層通訊框架的包括Mina,Grizzy等,應該是為了兼容此類框架存在的;?

# 總結

本文首先介紹了RPC中引入的長連接方式,繼而引出長連接的?;顧C制,為什么需要?;?#xff1f;然后分別介紹了網絡層保活機制TCP Keepalive機制,應用層心跳機制;最后已Dubbo為例看各個版本中對心跳機制的進化。

《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀

總結

以上是生活随笔為你收集整理的真强啊!建议每一位Java程序员都读读Dubbo心跳设计的源码...的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。