日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > java >内容正文

java

Java实现心跳机制

發(fā)布時(shí)間:2024/2/28 java 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java实现心跳机制 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

轉(zhuǎn)載自?https://www.cnblogs.com/codingexperience/p/5939059.html

一、心跳機(jī)制簡介

???? 在分布式系統(tǒng)中,分布在不同主機(jī)上的節(jié)點(diǎn)需要檢測其他節(jié)點(diǎn)的狀態(tài),如服務(wù)器節(jié)點(diǎn)需要檢測從節(jié)點(diǎn)是否失效。為了檢測對(duì)方節(jié)點(diǎn)的有效性,每隔固定時(shí)間就發(fā)送一個(gè)固定信息給對(duì)方,對(duì)方回復(fù)一個(gè)固定信息,如果長時(shí)間沒有收到對(duì)方的回復(fù),則斷開與對(duì)方的連接。

???? 發(fā)包方既可以是服務(wù)端,也可以是客戶端,這要看具體實(shí)現(xiàn)。因?yàn)槭敲扛艄潭〞r(shí)間發(fā)送一次,類似心跳,所以發(fā)送的固定信息稱為心跳包。心跳包一般為比較小的包,可根據(jù)具體實(shí)現(xiàn)。心跳包主要應(yīng)用于長連接的保持與短線鏈接。

????? 一般而言,應(yīng)該客戶端主動(dòng)向服務(wù)器發(fā)送心跳包,因?yàn)榉?wù)器向客戶端發(fā)送心跳包會(huì)影響服務(wù)器的性能。

二、心跳機(jī)制實(shí)現(xiàn)方式

??? 心跳機(jī)制有兩種實(shí)現(xiàn)方式,一種基于TCP自帶的心跳包,TCP的SO_KEEPALIVE選項(xiàng)可以,系統(tǒng)默認(rèn)的默認(rèn)跳幀頻率為2小時(shí),超過2小時(shí)后,本地的TCP 實(shí)現(xiàn)會(huì)發(fā)送一個(gè)數(shù)據(jù)包給遠(yuǎn)程的 Socket. 如果遠(yuǎn)程Socket 沒有發(fā)回響應(yīng), TCP實(shí)現(xiàn)就會(huì)持續(xù)嘗試 11 分鐘, 直到接收到響應(yīng)為止。 否則就會(huì)自動(dòng)斷開Socket連接。但TCP自帶的心跳包無法檢測比較敏感地知道對(duì)方的狀態(tài),默認(rèn)2小時(shí)的空閑時(shí)間,對(duì)于大多數(shù)的應(yīng)用而言太長了??梢允止ら_啟KeepAlive功能并設(shè)置合理的KeepAlive參數(shù)。

??? 另一種在應(yīng)用層自己進(jìn)行實(shí)現(xiàn),基本步驟如下:

  • Client使用定時(shí)器,不斷發(fā)送心跳;
  • Server收到心跳后,回復(fù)一個(gè)包;
  • Server為每個(gè)Client啟動(dòng)超時(shí)定時(shí)器,如果在指定時(shí)間內(nèi)沒有收到Client的心跳包,則Client失效。
  • 三、Java實(shí)現(xiàn)心跳機(jī)制

    ??? 這里基于Java實(shí)現(xiàn)的簡單RPC框架實(shí)現(xiàn)心跳機(jī)制。Java實(shí)現(xiàn)代碼如下所示:

    ??? 心跳客戶端類:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    public?class?HeartbeatClient implements?Runnable {

    ?

    ????private?String serverIP = "127.0.0.1";

    ????private?int?serverPort = 8089;

    ????private?String nodeID = UUID.randomUUID().toString();

    ????private?boolean?isRunning = true;

    ????//? 最近的心跳時(shí)間

    ????private?long?lastHeartbeat;

    ????// 心跳間隔時(shí)間

    ????private?long?heartBeatInterval = 10?* 1000;

    ?

    ????public?void?run() {

    ????????try?{

    ????????????while?(isRunning) {

    ????????????????HeartbeatHandler handler = RPClient.getRemoteProxyObj(HeartbeatHandler.class, new?InetSocketAddress(serverIP, serverPort));

    ????????????????long?startTime = System.currentTimeMillis();

    ????????????????// 是否達(dá)到發(fā)送心跳的周期時(shí)間

    ????????????????if?(startTime - lastHeartbeat > heartBeatInterval) {

    ????????????????????System.out.println("send a heart beat");

    ????????????????????lastHeartbeat = startTime;

    ?

    ????????????????????HeartbeatEntity entity = new?HeartbeatEntity();

    ????????????????????entity.setTime(startTime);

    ????????????????????entity.setNodeID(nodeID);

    ?

    ????????????????????// 向服務(wù)器發(fā)送心跳,并返回需要執(zhí)行的命令

    ????????????????????Cmder cmds = handler.sendHeartBeat(entity);

    ?

    ????????????????????if?(!processCommand(cmds))

    ????????????????????????continue;

    ????????????????}

    ????????????}

    ????????} catch?(Exception e) {

    ????????????e.printStackTrace();

    ????????}

    ????}

    ?

    ????private?boolean?processCommand(Cmder cmds) {

    ????????// ...

    ????????return?true;

    ????}

    ?

    }

    ? ? ? 心跳包實(shí)體類:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    public?class?HeartbeatEntity?implements?Serializable {

    ?

    ????private?long?time;

    ????private?String nodeID;

    ????private?String error;

    ????private?Map<String, Object> info =?new?HashMap<String, Object>();

    ?

    ????public?String getNodeID() {

    ????????return?nodeID;

    ????}

    ?

    ????public?void?setNodeID(String nodeID) {

    ????????this.nodeID = nodeID;

    ????}

    ?

    ????public?String getError() {

    ????????return?error;

    ????}

    ?

    ????public?void?setError(String error) {

    ????????this.error = error;

    ????}

    ?

    ????public?Map<String, Object> getInfo() {

    ????????return?info;

    ????}

    ?

    ????public?void?setInfo(Map<String, Object> info) {

    ????????this.info = info;

    ????}

    ?

    ????public?long?getTime() {

    ????????return?time;

    ????}

    ?

    ????public?void?setTime(long?time) {

    ????????this.time = time;

    ????}

    }

      服務(wù)器接受心跳包返回的命令對(duì)象類:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    public?class?Cmder implements?Serializable {

    ?

    ????private?String nodeID;

    ????private?String error;

    ????private?Map<String, Object> info = new?HashMap<String, Object>();

    ?

    ????public?String getNodeID() {

    ????????return?nodeID;

    ????}

    ?

    ????public?void?setNodeID(String nodeID) {

    ????????this.nodeID = nodeID;

    ????}

    ?

    ????public?String getError() {

    ????????return?error;

    ????}

    ?

    ????public?void?setError(String error) {

    ????????this.error = error;

    ????}

    ?

    ????public?Map<String, Object> getInfo() {

    ????????return?info;

    ????}

    ?

    ????public?void?setInfo(Map<String, Object> info) {

    ????????this.info = info;

    ????}

    }

      RPC服務(wù)注冊(cè)中心:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    44

    45

    46

    47

    48

    49

    50

    51

    52

    53

    54

    55

    56

    57

    58

    59

    60

    61

    62

    63

    64

    65

    66

    67

    68

    69

    70

    71

    72

    73

    74

    75

    76

    77

    78

    79

    80

    81

    82

    83

    84

    85

    86

    87

    88

    89

    90

    91

    92

    93

    94

    95

    96

    97

    98

    99

    100

    101

    102

    103

    104

    105

    106

    107

    108

    109

    110

    111

    112

    113

    114

    115

    116

    117

    118

    119

    120

    121

    122

    123

    124

    125

    public?class?ServiceCenter {

    ?

    ????private?ExecutorService executor = Executors.newFixedThreadPool(20);

    ?

    ????private?final?ConcurrentHashMap<String, Class> serviceRegistry = new?ConcurrentHashMap<String, Class>();

    ?

    ????private?AtomicBoolean isRunning = new?AtomicBoolean(true);

    ?

    ????// 服務(wù)器監(jiān)聽端口

    ????private?int?port = 8089;

    ?

    ????// 心跳監(jiān)聽器

    ????HeartbeatLinstener linstener;

    ?

    ????// 單例模式

    ????private?static?class?SingleHolder {

    ????????private?static?final?ServiceCenter INSTANCE = new?ServiceCenter();

    ????}

    ?

    ????private?ServiceCenter() {

    ????}

    ?

    ????public?static?ServiceCenter getInstance() {

    ????????return?SingleHolder.INSTANCE;

    ????}

    ?

    ????public?void?register(Class serviceInterface, Class impl) {

    ????????System.out.println("regeist service "?+ serviceInterface.getName());

    ????????serviceRegistry.put(serviceInterface.getName(), impl);

    ????}

    ?

    ????public?void?start() throws?IOException {

    ????????ServerSocket server = new?ServerSocket();

    ????????server.bind(new?InetSocketAddress(port));

    ????????System.out.println("start server");

    ????????linstener = HeartbeatLinstener.getInstance();

    ????????System.out.println("start listen heart beat");

    ????????try?{

    ????????????while?(true) {

    ????????????????// 1.監(jiān)聽客戶端的TCP連接,接到TCP連接后將其封裝成task,由線程池執(zhí)行

    ????????????????executor.execute(new?ServiceTask(server.accept()));

    ????????????}

    ????????} finally?{

    ????????????server.close();

    ????????}

    ????}

    ?

    ????public?void?stop() {

    ????????isRunning.set(false);

    ????????executor.shutdown();

    ????}

    ?

    ?

    ????public?boolean?isRunning() {

    ????????return?isRunning.get();

    ????}

    ?

    ????public?int?getPort() {

    ????????return?port;

    ????}

    ?

    ????public?void?settPort(int?port) {

    ????????this.port = port;

    ????}

    ?

    ????public?ConcurrentHashMap<String, Class> getServiceRegistry() {

    ????????return?serviceRegistry;

    ????}

    ?

    ????private?class?ServiceTask implements?Runnable {

    ????????Socket clent = null;

    ?

    ????????public?ServiceTask(Socket client) {

    ????????????this.clent = client;

    ????????}

    ?

    ????????public?void?run() {

    ????????????ObjectInputStream input = null;

    ????????????ObjectOutputStream output = null;

    ????????????try?{

    ????????????????// 2.將客戶端發(fā)送的碼流反序列化成對(duì)象,反射調(diào)用服務(wù)實(shí)現(xiàn)者,獲取執(zhí)行結(jié)果

    ????????????????input = new?ObjectInputStream(clent.getInputStream());

    ????????????????String serviceName = input.readUTF();

    ????????????????String methodName = input.readUTF();

    ????????????????Class<?>[] parameterTypes = (Class<?>[]) input.readObject();

    ????????????????Object[] arguments = (Object[]) input.readObject();

    ????????????????Class serviceClass = serviceRegistry.get(serviceName);

    ????????????????if?(serviceClass == null) {

    ????????????????????throw?new?ClassNotFoundException(serviceName + " not found");

    ????????????????}

    ????????????????Method method = serviceClass.getMethod(methodName, parameterTypes);

    ????????????????Object result = method.invoke(serviceClass.newInstance(), arguments);

    ?

    ????????????????// 3.將執(zhí)行結(jié)果反序列化,通過socket發(fā)送給客戶端

    ????????????????output = new?ObjectOutputStream(clent.getOutputStream());

    ????????????????output.writeObject(result);

    ????????????} catch?(Exception e) {

    ????????????????e.printStackTrace();

    ????????????} finally?{

    ????????????????if?(output != null) {

    ????????????????????try?{

    ????????????????????????output.close();

    ????????????????????} catch?(IOException e) {

    ????????????????????????e.printStackTrace();

    ????????????????????}

    ????????????????}

    ????????????????if?(input != null) {

    ????????????????????try?{

    ????????????????????????input.close();

    ????????????????????} catch?(IOException e) {

    ????????????????????????e.printStackTrace();

    ????????????????????}

    ????????????????}

    ????????????????if?(clent != null) {

    ????????????????????try?{

    ????????????????????????clent.close();

    ????????????????????} catch?(IOException e) {

    ????????????????????????e.printStackTrace();

    ????????????????????}

    ????????????????}

    ????????????}

    ?

    ????????}

    ????}

    }

      心跳監(jiān)聽類:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    44

    45

    46

    47

    48

    49

    50

    51

    52

    53

    54

    55

    56

    57

    58

    59

    60

    61

    62

    63

    64

    65

    66

    67

    68

    69

    70

    71

    72

    73

    74

    75

    76

    77

    78

    79

    80

    package?com.cang.heartbeat;

    ?

    import?java.io.IOException;

    import?java.io.ObjectInputStream;

    import?java.io.ObjectOutputStream;

    import?java.lang.reflect.Method;

    import?java.net.InetSocketAddress;

    import?java.net.ServerSocket;

    import?java.net.Socket;

    import?java.util.Iterator;

    import?java.util.Map;

    import?java.util.concurrent.ConcurrentHashMap;

    import?java.util.concurrent.ExecutorService;

    import?java.util.concurrent.Executors;

    import?java.util.concurrent.atomic.AtomicBoolean;

    ?

    /**

    ?* 心跳監(jiān)聽保存信息

    ?*

    ?* @author cang

    ?* @create_time 2016-09-28 11:40

    ?*/

    public?class?HeartbeatLinstener {

    ?

    ????private?ExecutorService executor = Executors.newFixedThreadPool(20);

    ?

    ????private?final?ConcurrentHashMap<String, Object> nodes = new?ConcurrentHashMap<String, Object>();

    ????private?final?ConcurrentHashMap<String, Long> nodeStatus = new?ConcurrentHashMap<String, Long>();

    ?

    ????private?long?timeout = 10?* 1000;

    ?

    ????// 服務(wù)器監(jiān)聽端口

    ????private?int?port = 8089;

    ?

    ????// 單例模式

    ????private?static?class?SingleHolder {

    ????????private?static?final?HeartbeatLinstener INSTANCE = new?HeartbeatLinstener();

    ????}

    ?

    ????private?HeartbeatLinstener() {

    ????}

    ?

    ????public?static?HeartbeatLinstener getInstance() {

    ????????return?SingleHolder.INSTANCE;

    ????}

    ?

    ????public?ConcurrentHashMap<String, Object> getNodes() {

    ????????return?nodes;

    ????}

    ?

    ????public?void?registerNode(String nodeId, Object nodeInfo) {

    ????????nodes.put(nodeId, nodeInfo);

    ????????nodeStatus.put(nodeId, System.currentTimeMillis());

    ????}

    ?

    ????public?void?removeNode(String nodeID) {

    ????????if?(nodes.containsKey(nodeID)) {

    ????????????nodes.remove(nodeID);

    ????????}

    ????}

    ?

    ????// 檢測節(jié)點(diǎn)是否有效

    ????public?boolean?checkNodeValid(String key) {

    ????????if?(!nodes.containsKey(key) || !nodeStatus.containsKey(key)) return?false;

    ????????if?((System.currentTimeMillis() - nodeStatus.get(key)) > timeout) return?false;

    ????????return?true;

    ????}

    ?

    ????// 刪除所有失效節(jié)點(diǎn)

    ????public?void?removeInValidNode() {

    ????????Iterator<Map.Entry<String, Long>> it = nodeStatus.entrySet().iterator();

    ????????while?(it.hasNext()) {

    ????????????Map.Entry<String, Long> e = it.next();

    ????????????if?((System.currentTimeMillis() - nodeStatus.get(e.getKey())) > timeout) {

    ????????????????nodes.remove(e.getKey());

    ????????????}

    ????????}

    ????}

    ?

    }

      心跳處理類接口:

    1

    2

    3

    public?interface?HeartbeatHandler {

    ????public?Cmder sendHeartBeat(HeartbeatEntity info);

    }

    ?? 心跳處理實(shí)現(xiàn)類:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    public?class?HeartbeatHandlerImpl implements?HeartbeatHandler {

    ????public?Cmder sendHeartBeat(HeartbeatEntity info) {

    ????????HeartbeatLinstener linstener = HeartbeatLinstener.getInstance();

    ?

    ????????// 添加節(jié)點(diǎn)

    ????????if?(!linstener.checkNodeValid(info.getNodeID())) {

    ????????????linstener.registerNode(info.getNodeID(), info);

    ????????}

    ?

    ????????// 其他操作

    ????????Cmder cmder = new?Cmder();

    ???????cmder.setNodeID(info.getNodeID());

    ????????// ...

    ?

    ????????System.out.println("current all the nodes: ");

    ????????Map<String, Object> nodes = linstener.getNodes();

    ????????for?(Map.Entry e : nodes.entrySet()) {

    ????????????System.out.println(e.getKey() + " : "?+ e.getValue());

    ????????}

    ????????System.out.println("hadle a heartbeat");

    ????????return?cmder;

    ????}

    }

      測試類:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    public?class?HeartbeatTest {

    ?

    ????public?static?void?main(String[] args) {

    ????????new?Thread(new?Runnable() {

    ????????????public?void?run() {

    ????????????????try?{

    ????????????????????ServiceCenter serviceServer = ServiceCenter.getInstance();

    ????????????????????serviceServer.register(HeartbeatHandler.class, HeartbeatHandlerImpl.class);

    ????????????????????serviceServer.start();

    ????????????????} catch?(IOException e) {

    ????????????????????e.printStackTrace();

    ????????????????}

    ????????????}

    ????????}).start();

    ????????Thread client1 = new?Thread(new?HeartbeatClient());

    ????????client1.start();

    ????????Thread client2 = new?Thread(new?HeartbeatClient());

    ????????client2.start();

    ????}

    }

    四、總結(jié)

    ??? 上面的代碼還有很多不足的地方,希望有空能進(jìn)行改善:

  • ?配置為硬編碼;
  • ?命令類Cmder沒有實(shí)際實(shí)現(xiàn),返回的Cmder對(duì)象沒有實(shí)際進(jìn)行處理;
  • ?? 其他小問題就暫時(shí)不管了,希望以后能重寫上面的代碼。

    超強(qiáng)干貨來襲 云風(fēng)專訪:近40年碼齡,通宵達(dá)旦的技術(shù)人生

    總結(jié)

    以上是生活随笔為你收集整理的Java实现心跳机制的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。