Java实现心跳机制
轉(zhuǎn)載自?https://www.cnblogs.com/codingexperience/p/5939059.html
一、心跳機(jī)制簡介
???? 在分布式系統(tǒng)中,分布在不同主機(jī)上的節(jié)點(diǎn)需要檢測其他節(jié)點(diǎn)的狀態(tài),如服務(wù)器節(jié)點(diǎn)需要檢測從節(jié)點(diǎn)是否失效。為了檢測對(duì)方節(jié)點(diǎn)的有效性,每隔固定時(shí)間就發(fā)送一個(gè)固定信息給對(duì)方,對(duì)方回復(fù)一個(gè)固定信息,如果長時(shí)間沒有收到對(duì)方的回復(fù),則斷開與對(duì)方的連接。
???? 發(fā)包方既可以是服務(wù)端,也可以是客戶端,這要看具體實(shí)現(xiàn)。因?yàn)槭敲扛艄潭〞r(shí)間發(fā)送一次,類似心跳,所以發(fā)送的固定信息稱為心跳包。心跳包一般為比較小的包,可根據(jù)具體實(shí)現(xiàn)。心跳包主要應(yīng)用于長連接的保持與短線鏈接。
????? 一般而言,應(yīng)該客戶端主動(dòng)向服務(wù)器發(fā)送心跳包,因?yàn)榉?wù)器向客戶端發(fā)送心跳包會(huì)影響服務(wù)器的性能。
二、心跳機(jī)制實(shí)現(xiàn)方式
??? 心跳機(jī)制有兩種實(shí)現(xiàn)方式,一種基于TCP自帶的心跳包,TCP的SO_KEEPALIVE選項(xiàng)可以,系統(tǒng)默認(rèn)的默認(rèn)跳幀頻率為2小時(shí),超過2小時(shí)后,本地的TCP 實(shí)現(xiàn)會(huì)發(fā)送一個(gè)數(shù)據(jù)包給遠(yuǎn)程的 Socket. 如果遠(yuǎn)程Socket 沒有發(fā)回響應(yīng), TCP實(shí)現(xiàn)就會(huì)持續(xù)嘗試 11 分鐘, 直到接收到響應(yīng)為止。 否則就會(huì)自動(dòng)斷開Socket連接。但TCP自帶的心跳包無法檢測比較敏感地知道對(duì)方的狀態(tài),默認(rèn)2小時(shí)的空閑時(shí)間,對(duì)于大多數(shù)的應(yīng)用而言太長了??梢允止ら_啟KeepAlive功能并設(shè)置合理的KeepAlive參數(shù)。
??? 另一種在應(yīng)用層自己進(jìn)行實(shí)現(xiàn),基本步驟如下:
三、Java實(shí)現(xiàn)心跳機(jī)制
??? 這里基于Java實(shí)現(xiàn)的簡單RPC框架實(shí)現(xiàn)心跳機(jī)制。Java實(shí)現(xiàn)代碼如下所示:
??? 心跳客戶端類:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 | public?class?HeartbeatClient implements?Runnable { ? ????private?String serverIP = "127.0.0.1"; ????private?int?serverPort = 8089; ????private?String nodeID = UUID.randomUUID().toString(); ????private?boolean?isRunning = true; ????//? 最近的心跳時(shí)間 ????private?long?lastHeartbeat; ????// 心跳間隔時(shí)間 ????private?long?heartBeatInterval = 10?* 1000; ? ????public?void?run() { ????????try?{ ????????????while?(isRunning) { ????????????????HeartbeatHandler handler = RPClient.getRemoteProxyObj(HeartbeatHandler.class, new?InetSocketAddress(serverIP, serverPort)); ????????????????long?startTime = System.currentTimeMillis(); ????????????????// 是否達(dá)到發(fā)送心跳的周期時(shí)間 ????????????????if?(startTime - lastHeartbeat > heartBeatInterval) { ????????????????????System.out.println("send a heart beat"); ????????????????????lastHeartbeat = startTime; ? ????????????????????HeartbeatEntity entity = new?HeartbeatEntity(); ????????????????????entity.setTime(startTime); ????????????????????entity.setNodeID(nodeID); ? ????????????????????// 向服務(wù)器發(fā)送心跳,并返回需要執(zhí)行的命令 ????????????????????Cmder cmds = handler.sendHeartBeat(entity); ? ????????????????????if?(!processCommand(cmds)) ????????????????????????continue; ????????????????} ????????????} ????????} catch?(Exception e) { ????????????e.printStackTrace(); ????????} ????} ? ????private?boolean?processCommand(Cmder cmds) { ????????// ... ????????return?true; ????} ? } |
? ? ? 心跳包實(shí)體類:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | public?class?HeartbeatEntity?implements?Serializable { ? ????private?long?time; ????private?String nodeID; ????private?String error; ????private?Map<String, Object> info =?new?HashMap<String, Object>(); ? ????public?String getNodeID() { ????????return?nodeID; ????} ? ????public?void?setNodeID(String nodeID) { ????????this.nodeID = nodeID; ????} ? ????public?String getError() { ????????return?error; ????} ? ????public?void?setError(String error) { ????????this.error = error; ????} ? ????public?Map<String, Object> getInfo() { ????????return?info; ????} ? ????public?void?setInfo(Map<String, Object> info) { ????????this.info = info; ????} ? ????public?long?getTime() { ????????return?time; ????} ? ????public?void?setTime(long?time) { ????????this.time = time; ????} } |
服務(wù)器接受心跳包返回的命令對(duì)象類:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | public?class?Cmder implements?Serializable { ? ????private?String nodeID; ????private?String error; ????private?Map<String, Object> info = new?HashMap<String, Object>(); ? ????public?String getNodeID() { ????????return?nodeID; ????} ? ????public?void?setNodeID(String nodeID) { ????????this.nodeID = nodeID; ????} ? ????public?String getError() { ????????return?error; ????} ? ????public?void?setError(String error) { ????????this.error = error; ????} ? ????public?Map<String, Object> getInfo() { ????????return?info; ????} ? ????public?void?setInfo(Map<String, Object> info) { ????????this.info = info; ????} } |
RPC服務(wù)注冊(cè)中心:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 | public?class?ServiceCenter { ? ????private?ExecutorService executor = Executors.newFixedThreadPool(20); ? ????private?final?ConcurrentHashMap<String, Class> serviceRegistry = new?ConcurrentHashMap<String, Class>(); ? ????private?AtomicBoolean isRunning = new?AtomicBoolean(true); ? ????// 服務(wù)器監(jiān)聽端口 ????private?int?port = 8089; ? ????// 心跳監(jiān)聽器 ????HeartbeatLinstener linstener; ? ????// 單例模式 ????private?static?class?SingleHolder { ????????private?static?final?ServiceCenter INSTANCE = new?ServiceCenter(); ????} ? ????private?ServiceCenter() { ????} ? ????public?static?ServiceCenter getInstance() { ????????return?SingleHolder.INSTANCE; ????} ? ????public?void?register(Class serviceInterface, Class impl) { ????????System.out.println("regeist service "?+ serviceInterface.getName()); ????????serviceRegistry.put(serviceInterface.getName(), impl); ????} ? ????public?void?start() throws?IOException { ????????ServerSocket server = new?ServerSocket(); ????????server.bind(new?InetSocketAddress(port)); ????????System.out.println("start server"); ????????linstener = HeartbeatLinstener.getInstance(); ????????System.out.println("start listen heart beat"); ????????try?{ ????????????while?(true) { ????????????????// 1.監(jiān)聽客戶端的TCP連接,接到TCP連接后將其封裝成task,由線程池執(zhí)行 ????????????????executor.execute(new?ServiceTask(server.accept())); ????????????} ????????} finally?{ ????????????server.close(); ????????} ????} ? ????public?void?stop() { ????????isRunning.set(false); ????????executor.shutdown(); ????} ? ? ????public?boolean?isRunning() { ????????return?isRunning.get(); ????} ? ????public?int?getPort() { ????????return?port; ????} ? ????public?void?settPort(int?port) { ????????this.port = port; ????} ? ????public?ConcurrentHashMap<String, Class> getServiceRegistry() { ????????return?serviceRegistry; ????} ? ????private?class?ServiceTask implements?Runnable { ????????Socket clent = null; ? ????????public?ServiceTask(Socket client) { ????????????this.clent = client; ????????} ? ????????public?void?run() { ????????????ObjectInputStream input = null; ????????????ObjectOutputStream output = null; ????????????try?{ ????????????????// 2.將客戶端發(fā)送的碼流反序列化成對(duì)象,反射調(diào)用服務(wù)實(shí)現(xiàn)者,獲取執(zhí)行結(jié)果 ????????????????input = new?ObjectInputStream(clent.getInputStream()); ????????????????String serviceName = input.readUTF(); ????????????????String methodName = input.readUTF(); ????????????????Class<?>[] parameterTypes = (Class<?>[]) input.readObject(); ????????????????Object[] arguments = (Object[]) input.readObject(); ????????????????Class serviceClass = serviceRegistry.get(serviceName); ????????????????if?(serviceClass == null) { ????????????????????throw?new?ClassNotFoundException(serviceName + " not found"); ????????????????} ????????????????Method method = serviceClass.getMethod(methodName, parameterTypes); ????????????????Object result = method.invoke(serviceClass.newInstance(), arguments); ? ????????????????// 3.將執(zhí)行結(jié)果反序列化,通過socket發(fā)送給客戶端 ????????????????output = new?ObjectOutputStream(clent.getOutputStream()); ????????????????output.writeObject(result); ????????????} catch?(Exception e) { ????????????????e.printStackTrace(); ????????????} finally?{ ????????????????if?(output != null) { ????????????????????try?{ ????????????????????????output.close(); ????????????????????} catch?(IOException e) { ????????????????????????e.printStackTrace(); ????????????????????} ????????????????} ????????????????if?(input != null) { ????????????????????try?{ ????????????????????????input.close(); ????????????????????} catch?(IOException e) { ????????????????????????e.printStackTrace(); ????????????????????} ????????????????} ????????????????if?(clent != null) { ????????????????????try?{ ????????????????????????clent.close(); ????????????????????} catch?(IOException e) { ????????????????????????e.printStackTrace(); ????????????????????} ????????????????} ????????????} ? ????????} ????} } |
心跳監(jiān)聽類:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 | package?com.cang.heartbeat; ? import?java.io.IOException; import?java.io.ObjectInputStream; import?java.io.ObjectOutputStream; import?java.lang.reflect.Method; import?java.net.InetSocketAddress; import?java.net.ServerSocket; import?java.net.Socket; import?java.util.Iterator; import?java.util.Map; import?java.util.concurrent.ConcurrentHashMap; import?java.util.concurrent.ExecutorService; import?java.util.concurrent.Executors; import?java.util.concurrent.atomic.AtomicBoolean; ? /** ?* 心跳監(jiān)聽保存信息 ?* ?* @author cang ?* @create_time 2016-09-28 11:40 ?*/ public?class?HeartbeatLinstener { ? ????private?ExecutorService executor = Executors.newFixedThreadPool(20); ? ????private?final?ConcurrentHashMap<String, Object> nodes = new?ConcurrentHashMap<String, Object>(); ????private?final?ConcurrentHashMap<String, Long> nodeStatus = new?ConcurrentHashMap<String, Long>(); ? ????private?long?timeout = 10?* 1000; ? ????// 服務(wù)器監(jiān)聽端口 ????private?int?port = 8089; ? ????// 單例模式 ????private?static?class?SingleHolder { ????????private?static?final?HeartbeatLinstener INSTANCE = new?HeartbeatLinstener(); ????} ? ????private?HeartbeatLinstener() { ????} ? ????public?static?HeartbeatLinstener getInstance() { ????????return?SingleHolder.INSTANCE; ????} ? ????public?ConcurrentHashMap<String, Object> getNodes() { ????????return?nodes; ????} ? ????public?void?registerNode(String nodeId, Object nodeInfo) { ????????nodes.put(nodeId, nodeInfo); ????????nodeStatus.put(nodeId, System.currentTimeMillis()); ????} ? ????public?void?removeNode(String nodeID) { ????????if?(nodes.containsKey(nodeID)) { ????????????nodes.remove(nodeID); ????????} ????} ? ????// 檢測節(jié)點(diǎn)是否有效 ????public?boolean?checkNodeValid(String key) { ????????if?(!nodes.containsKey(key) || !nodeStatus.containsKey(key)) return?false; ????????if?((System.currentTimeMillis() - nodeStatus.get(key)) > timeout) return?false; ????????return?true; ????} ? ????// 刪除所有失效節(jié)點(diǎn) ????public?void?removeInValidNode() { ????????Iterator<Map.Entry<String, Long>> it = nodeStatus.entrySet().iterator(); ????????while?(it.hasNext()) { ????????????Map.Entry<String, Long> e = it.next(); ????????????if?((System.currentTimeMillis() - nodeStatus.get(e.getKey())) > timeout) { ????????????????nodes.remove(e.getKey()); ????????????} ????????} ????} ? } |
心跳處理類接口:
| 1 2 3 | public?interface?HeartbeatHandler { ????public?Cmder sendHeartBeat(HeartbeatEntity info); } |
?? 心跳處理實(shí)現(xiàn)類:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | public?class?HeartbeatHandlerImpl implements?HeartbeatHandler { ????public?Cmder sendHeartBeat(HeartbeatEntity info) { ????????HeartbeatLinstener linstener = HeartbeatLinstener.getInstance(); ? ????????// 添加節(jié)點(diǎn) ????????if?(!linstener.checkNodeValid(info.getNodeID())) { ????????????linstener.registerNode(info.getNodeID(), info); ????????} ? ????????// 其他操作 ????????Cmder cmder = new?Cmder(); ???????cmder.setNodeID(info.getNodeID()); ????????// ... ? ????????System.out.println("current all the nodes: "); ????????Map<String, Object> nodes = linstener.getNodes(); ????????for?(Map.Entry e : nodes.entrySet()) { ????????????System.out.println(e.getKey() + " : "?+ e.getValue()); ????????} ????????System.out.println("hadle a heartbeat"); ????????return?cmder; ????} } |
測試類:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | public?class?HeartbeatTest { ? ????public?static?void?main(String[] args) { ????????new?Thread(new?Runnable() { ????????????public?void?run() { ????????????????try?{ ????????????????????ServiceCenter serviceServer = ServiceCenter.getInstance(); ????????????????????serviceServer.register(HeartbeatHandler.class, HeartbeatHandlerImpl.class); ????????????????????serviceServer.start(); ????????????????} catch?(IOException e) { ????????????????????e.printStackTrace(); ????????????????} ????????????} ????????}).start(); ????????Thread client1 = new?Thread(new?HeartbeatClient()); ????????client1.start(); ????????Thread client2 = new?Thread(new?HeartbeatClient()); ????????client2.start(); ????} } |
四、總結(jié)
??? 上面的代碼還有很多不足的地方,希望有空能進(jìn)行改善:
?? 其他小問題就暫時(shí)不管了,希望以后能重寫上面的代碼。
超強(qiáng)干貨來襲 云風(fēng)專訪:近40年碼齡,通宵達(dá)旦的技術(shù)人生總結(jié)
以上是生活随笔為你收集整理的Java实现心跳机制的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: lombok常用注解整理
- 下一篇: Java HttpClient使用小结