日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 >

心跳实现_真强啊!建议每一位Java程序员都读读Dubbo心跳设计的源码...

發(fā)布時間:2023/11/27 80 豆豆
生活随笔 收集整理的這篇文章主要介紹了 心跳实现_真强啊!建议每一位Java程序员都读读Dubbo心跳设计的源码... 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

# 前言

談到RPC肯定繞不開TCP通信,而主流的RPC框架都依賴于Netty等通信框架,這時候我們還要考慮是使用長連接還是短連接:

  • 短連接:每次通信結(jié)束后關(guān)閉連接,下次通信需要重新創(chuàng)建連接;優(yōu)點(diǎn)就是無需管理連接,無需保活連接;

  • 長連接:每次通信結(jié)束不關(guān)閉連接,連接可以復(fù)用,保證了性能;缺點(diǎn)就是連接需要統(tǒng)一管理,并且需要?;?#xff1b;

主流的RPC框架都會追求性能選擇使用長連接,所以如何?;钸B接就是一個重要的話題,也是本文的主題,下面會重點(diǎn)介紹一些?;畈呗?#xff1b;

# 為什么需要保活

上面介紹的長連接、短連接并不是TCP提供的功能,所以長連接是需要應(yīng)用端自己來實現(xiàn)的,包括:連接的統(tǒng)一管理,如何保活等;如何保活之前我們了解一下為什么需要保活?

主要原因是網(wǎng)絡(luò)不是100%可靠的,我們創(chuàng)建好的連接可能由于網(wǎng)絡(luò)原因?qū)е逻B接已經(jīng)不可用了,如果連接一直有消息往來,那么系統(tǒng)馬上可以感知到連接斷開;

但是我們系統(tǒng)可能長時間沒有消息來往,導(dǎo)致系統(tǒng)不能及時感知到連接不可用,也就是不能及時處理重連或者釋放連接;常見的?;畈呗允褂眯奶鴻C(jī)制由應(yīng)用層來實現(xiàn),還有網(wǎng)絡(luò)層提供的TCP Keepalive?;钐綔y機(jī)制;

# TCP Keepalive機(jī)制

TCP Keepalive是操作系統(tǒng)實現(xiàn)的功能,并不是TCP協(xié)議的一部分,需要在操作系統(tǒng)下進(jìn)行相關(guān)配置,開啟此功能后,如果連接在一段時間內(nèi)沒有數(shù)據(jù)往來,TCP將發(fā)送Keepalive探針來確認(rèn)連接的可用性,Keepalive幾個內(nèi)核參數(shù)配置:

  • tcp_keepalive_time:連接多長時間沒有數(shù)據(jù)往來發(fā)送探針請求,默認(rèn)為7200s(2h);

  • tcp_keepalive_probes:探測失敗重試的次數(shù)默認(rèn)為10次;

  • tcp_keepalive_intvl:重試的間隔時間默認(rèn)75s;

以上參數(shù)可以修改到/etc/sysctl.conf文件中;是否使用Keepalive用來保活就夠了,其實還不夠,Keepalive只是在網(wǎng)絡(luò)層就行?;?#xff0c;如果網(wǎng)絡(luò)本身沒有問題,但是系統(tǒng)由于其他原因已經(jīng)不可用了,這時候Keepalive并不能發(fā)現(xiàn);所以往往還需要結(jié)合心跳機(jī)制來一起使用;

# 心跳機(jī)制

何為心跳機(jī)制,簡單來講就是客戶端啟動一個定時器用來定時發(fā)送請求,服務(wù)端接到請求進(jìn)行響應(yīng),如果多次沒有接受到響應(yīng),那么客戶端認(rèn)為連接已經(jīng)斷開,可以斷開半打開的連接或者進(jìn)行重連處理;下面以Dubbo為例來看看是如何具體實施的;

Dubbo2.6.X

在HeaderExchangeClient中啟動了定時器ScheduledThreadPoolExecutor來定期執(zhí)行心跳請求:關(guān)注公眾號Java面試那些事兒,回復(fù)關(guān)鍵字面試,獲取最新面試題

ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, ????new?NamedThreadFactory("dubbo-remoting-client-heartbeat",?true));
在實例化HeaderExchangeClient時啟動心跳定時器:
private void startHeartbeatTimer() {    stopHeartbeatTimer();            if (heartbeat > 0) {        heartbeatTimer = scheduled.scheduleWithFixedDelay(            new HeartBeatTask(new HeartBeatTask.ChannelProvider() {                                        @Override                public CollectiongetChannels() {                                                return Collections.singletonList(HeaderExchangeClient.this);            }        }, heartbeat, heartbeatTimeout),        heartbeat, heartbeat, TimeUnit.MILLISECONDS);    }}

heartbeat默認(rèn)為60秒,heartbeatTimeout默認(rèn)為heartbeat*3,可以理解至少出現(xiàn)三次心跳請求還未收到回復(fù)才會任務(wù)連接已經(jīng)斷開;HeartBeatTask為執(zhí)行心跳的任務(wù):

public void run() {    long now = System.currentTimeMillis();            for (Channel channel : channelProvider.getChannels()) {                    if (channel.isClosed()) {                            continue;        }                    Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY\_READ\_TIMESTAMP);                    Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY\_WRITE\_TIMESTAMP);                    if ((lastRead != null && now - lastRead > heartbeat)                    || (lastWrite != null && now - lastWrite > heartbeat)) {                            // 發(fā)送心跳        }                    if (lastRead != null && now - lastRead > heartbeatTimeout) {                            if (channel instanceof Client) {                    ((Client) channel).reconnect();            } else {                channel.close();            }        }    }}

因為Dubbo雙端都會發(fā)送心跳請求,所以可以發(fā)現(xiàn)有兩個時間點(diǎn)分別是:

lastRead和lastWrite;當(dāng)然時間和最后讀取,最后寫的時間間隔大于heartbeat就會發(fā)送心跳請求;

如果多次心跳未返回結(jié)果,也就是最后讀取消息時間大于heartbeatTimeout會判定當(dāng)前是Client還是Server,如果是Client會發(fā)起reconnect,Server會關(guān)閉連接,這樣的考慮是合理的,客戶端調(diào)用是強(qiáng)依賴可用連接的,而服務(wù)端可以等待客戶端重新建立連接;

以上只是介紹的Client,同樣Server端也有相同的心跳處理,在可以查看HeaderExchangeServer;

Dubbo2.7.0

Dubbo2.7.0的心跳機(jī)制在2.6.X的基礎(chǔ)上得到了加強(qiáng),同樣在HeaderExchangeClient中使用HashedWheelTimer開啟心跳檢測,這是Netty提供的一個時間輪定時器,在任務(wù)非常多,并且任務(wù)執(zhí)行時間很短的情況下,HashedWheelTimer比Schedule性能更好,特別適合心跳檢測;

HashedWheelTimer heartbeatTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-client-heartbeat", true), tickDuration,????????????????????TimeUnit.MILLISECONDS,?Constants.TICKS\_PER\_WHEEL);
分別啟動了兩個定時任務(wù):startHeartBeatTask和startReconnectTask:
private void startHeartbeatTimer() {    AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);            long heartbeatTick = calculateLeastDuration(heartbeat);            long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout);    HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);    ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout);          // init task and start timer.    heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);    heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);}

HeartbeatTimerTask:用來定時發(fā)送心跳請求,心跳間隔時間默認(rèn)為60秒;這里重新計算了時間,其實就是在原來的基礎(chǔ)上除以3,其實就是縮短了檢測間隔時間,增大了及時發(fā)現(xiàn)死鏈的概率;分別看一下兩個任務(wù):

protected void doTask(Channel channel) {    Long lastRead = lastRead(channel);    Long lastWrite = lastWrite(channel);        if ((lastRead != null && now() - lastRead > heartbeat)            || (lastWrite != null && now() - lastWrite > heartbeat)) {        Request req = new Request();        req.setVersion(Version.getProtocolVersion());        req.setTwoWay(true);        req.setEvent(Request.HEARTBEAT_EVENT);        channel.send(req);    }}

同上檢測最后讀寫時間和heartbeat的大小,注:普通請求和心跳請求都會更新讀寫時間

protected void doTask(Channel channel) {    Long lastRead = lastRead(channel);    Long now = now();        if (lastRead != null && now - lastRead > heartbeatTimeout) {            if (channel instanceof Client) {            ((Client) channel).reconnect();        } else {            channel.close();        }    }}

同樣的在超時的情況下,Client重連,Server關(guān)閉連接;同樣Server端也有相同的心跳處理,在可以查看HeaderExchangeServer;

Dubbo2.7.1-X

在Dubbo2.7.1之后,借助了Netty提供的IdleStateHandler來實現(xiàn)心跳機(jī)制服務(wù):關(guān)注公眾號Java面試那些事兒,回復(fù)關(guān)鍵字面試,獲取最新面試題

public IdleStateHandler(        long readerIdleTime, long writerIdleTime, long allIdleTime,        TimeUnit unit) {    this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);}
  • readerIdleTime:讀超時時間;

  • writerIdleTime:寫超時時間;

  • allIdleTime:所有類型的超時時間;

根據(jù)設(shè)置的超時時間,循環(huán)檢查讀寫事件多久沒有發(fā)生了,在pipeline中加入IdleSateHandler之后,可以在此pipeline的任意Handler的userEventTriggered方法之中檢測IdleStateEvent事件;下面看看具體Client和Server端添加的IdleStateHandler:

Client端

protected void initChannel(Channel ch) throws Exception {            final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);            int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());    ch.pipeline().addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))                .addLast("handler", nettyClientHandler);}

Client端在NettyClient中添加了IdleStateHandler,指定了讀寫超時時間默認(rèn)為60秒;60秒內(nèi)沒有讀寫事件發(fā)生,會觸發(fā)IdleStateEvent事件在NettyClientHandler處理:

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {        if (evt instanceof IdleStateEvent) {            try {            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);            Request req = new Request();            req.setVersion(Version.getProtocolVersion());            req.setTwoWay(true);            req.setEvent(Request.HEARTBEAT_EVENT);            channel.send(req);        } finally {            NettyChannel.removeChannelIfDisconnected(ctx.channel());        }   } else {                 super.userEventTriggered(ctx, evt);   }}

可以發(fā)現(xiàn)接收到IdleStateEvent事件發(fā)送了心跳請求;至于Client端如何處理重連,同樣在HeaderExchangeClient中使用HashedWheelTimer定時器啟動了兩個任務(wù):心跳任務(wù)和重連任務(wù),感覺這里已經(jīng)不需要心跳任務(wù)了,至于重連任務(wù)其實也可以放到userEventTriggered中處理;

Server端

protected void initChannel(NioSocketChannel ch) throws Exception {        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);    ch.pipeline().addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))                .addLast("handler", nettyServerHandler);}
Server端指定的超時時間默認(rèn)為60*3秒,在NettyServerHandler中處理
userEventTriggered
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {        if (evt instanceof IdleStateEvent) {        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);            try {            channel.close();        } finally {            NettyChannel.removeChannelIfDisconnected(ctx.channel());        }    }            super.userEventTriggered(ctx, evt);}

Server端在指定的超時時間內(nèi)沒有發(fā)生讀寫,會直接關(guān)閉連接;相比之前現(xiàn)在只有Client發(fā)送心跳,單向發(fā)送心跳;

同樣的在HeaderExchangeServer中并沒有啟動多個認(rèn)為,僅僅啟動了一個CloseTimerTask,用來檢測超時時間關(guān)閉連接;感覺這個任務(wù)是不是也可以不需要了,IdleStateHandler已經(jīng)實現(xiàn)了此功能;

綜上:在使用IdleStateHandler的情況下來同時在HeaderExchangeClient啟動心跳+重連機(jī)制,HeaderExchangeServer啟動了關(guān)閉連接機(jī)制;主要是因為IdleStateHandler是Netty框架特有了,而Dubbo是支持多種底層通訊框架的包括Mina,Grizzy等,應(yīng)該是為了兼容此類框架存在的;

# 總結(jié)

本文首先介紹了RPC中引入的長連接方式,繼而引出長連接的保活機(jī)制,為什么需要保活?然后分別介紹了網(wǎng)絡(luò)層?;顧C(jī)制TCP Keepalive機(jī)制,應(yīng)用層心跳機(jī)制;最后已Dubbo為例看各個版本中對心跳機(jī)制的進(jìn)化。

作者:ksfzhaohui317

來源:https://segmentfault.com/a/1190000022591346

?往期推薦?

?

  • 字節(jié)一面,被連問MySQL索引,臉都問綠了...
  • 分布式定時任務(wù)xxl-job的常用姿勢都集齊了,So Easy!
  • 牛逼!華為 Java 編程軍規(guī)

?

點(diǎn)擊

總結(jié)

以上是生活随笔為你收集整理的心跳实现_真强啊!建议每一位Java程序员都读读Dubbo心跳设计的源码...的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。