日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

java tcp 心跳机制_Java实现心跳机制的方法

發布時間:2024/1/1 java 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java tcp 心跳机制_Java实现心跳机制的方法 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、心跳機制簡介

在分布式系統中,分布在不同主機上的節點需要檢測其他節點的狀態,如服務器節點需要檢測從節點是否失效。為了檢測對方節點的有效性,每隔固定時間就發送一個固定信息給對方,對方回復一個固定信息,如果長時間沒有收到對方的回復,則斷開與對方的連接。

發包方既可以是服務端,也可以是客戶端,這要看具體實現。因為是每隔固定時間發送一次,類似心跳,所以發送的固定信息稱為心跳包。心跳包一般為比較小的包,可根據具體實現。心跳包主要應用于長連接的保持與短線鏈接。

一般而言,應該客戶端主動向服務器發送心跳包,因為服務器向客戶端發送心跳包會影響服務器的性能。

二、心跳機制實現方式

心跳機制有兩種實現方式,一種基于TCP自帶的心跳包,TCP的SO_KEEPALIVE選項可以,系統默認的默認跳幀頻率為2小時,超過2小時后,本地的TCP 實現會發送一個數據包給遠程的 Socket. 如果遠程Socket 沒有發回響應, TCP實現就會持續嘗試 11 分鐘, 直到接收到響應為止。 否則就會自動斷開Socket連接。但TCP自帶的心跳包無法檢測比較敏感地知道對方的狀態,默認2小時的空閑時間,對于大多數的應用而言太長了。可以手工開啟KeepAlive功能并設置合理的KeepAlive參數。

另一種在應用層自己進行實現,基本步驟如下:

Client使用定時器,不斷發送心跳;

Server收到心跳后,回復一個包;

Server為每個Client啟動超時定時器,如果在指定時間內沒有收到Client的心跳包,則Client失效。

三、Java實現心跳機制

這里基于Java實現的簡單RPC框架實現心跳機制。Java實現代碼如下所示:

心跳客戶端類:

public class HeartbeatClient implements Runnable {

private String serverIP = "127.0.0.1";

private int serverPort = 8089;

private String nodeID = UUID.randomUUID().toString();

private boolean isRunning = true;

// 最近的心跳時間

private long lastHeartbeat;

// 心跳間隔時間

private long heartBeatInterval = 10 * 1000;

public void run() {

try {

while (isRunning) {

HeartbeatHandler handler = RPClient.getRemoteProxyObj(HeartbeatHandler.class, new InetSocketAddress(serverIP, serverPort));

long startTime = System.currentTimeMillis();

// 是否達到發送心跳的周期時間

if (startTime - lastHeartbeat > heartBeatInterval) {

System.out.println("send a heart beat");

lastHeartbeat = startTime;

HeartbeatEntity entity = new HeartbeatEntity();

entity.setTime(startTime);

entity.setNodeID(nodeID);

// 向服務器發送心跳,并返回需要執行的命令

Cmder cmds = handler.sendHeartBeat(entity);

if (!processCommand(cmds))

continue;

}

}

} catch (Exception e) {

e.printStackTrace();

}

}

private boolean processCommand(Cmder cmds) {

// ...

return true;

}

}

心跳包實體類:

public class HeartbeatEntity implements Serializable {

private long time;

private String nodeID;

private String error;

private Map info = new HashMap();

public String getNodeID() {

return nodeID;

}

public void setNodeID(String nodeID) {

this.nodeID = nodeID;

}

public String getError() {

return error;

}

public void setError(String error) {

this.error = error;

}

public Map getInfo() {

return info;

}

public void setInfo(Map info) {

this.info = info;

}

public long getTime() {

return time;

}

public void setTime(long time) {

this.time = time;

}

}

服務器接受心跳包返回的命令對象類:

public class Cmder implements Serializable {

private String nodeID;

private String error;

private Map info = new HashMap();

public String getNodeID() {

return nodeID;

}

public void setNodeID(String nodeID) {

this.nodeID = nodeID;

}

public String getError() {

return error;

}

public void setError(String error) {

this.error = error;

}

public Map getInfo() {

return info;

}

public void setInfo(Map info) {

this.info = info;

}

}

RPC服務注冊中心:

public class ServiceCenter {

private ExecutorService executor = Executors.newFixedThreadPool(20);

private final ConcurrentHashMap serviceRegistry = new ConcurrentHashMap();

private AtomicBoolean isRunning = new AtomicBoolean(true);

// 服務器監聽端口

private int port = 8089;

// 心跳監聽器

HeartbeatLinstener linstener;

// 單例模式

private static class SingleHolder {

private static final ServiceCenter INSTANCE = new ServiceCenter();

}

private ServiceCenter() {

}

public static ServiceCenter getInstance() {

return SingleHolder.INSTANCE;

}

public void register(Class serviceInterface, Class impl) {

System.out.println("regeist service " + serviceInterface.getName());

serviceRegistry.put(serviceInterface.getName(), impl);

}

public void start() throws IOException {

ServerSocket server = new ServerSocket();

server.bind(new InetSocketAddress(port));

System.out.println("start server");

linstener = HeartbeatLinstener.getInstance();

System.out.println("start listen heart beat");

try {

while (true) {

// 1.監聽客戶端的TCP連接,接到TCP連接后將其封裝成task,由線程池執行

executor.execute(new ServiceTask(server.accept()));

}

} finally {

server.close();

}

}

public void stop() {

isRunning.set(false);

executor.shutdown();

}

public boolean isRunning() {

return isRunning.get();

}

public int getPort() {

return port;

}

public void settPort(int port) {

this.port = port;

}

public ConcurrentHashMap getServiceRegistry() {

return serviceRegistry;

}

private class ServiceTask implements Runnable {

Socket clent = null;

public ServiceTask(Socket client) {

this.clent = client;

}

public void run() {

ObjectInputStream input = null;

ObjectOutputStream output = null;

try {

// 2.將客戶端發送的碼流反序列化成對象,反射調用服務實現者,獲取執行結果

input = new ObjectInputStream(clent.getInputStream());

String serviceName = input.readUTF();

String methodName = input.readUTF();

Class>[] parameterTypes = (Class>[]) input.readObject();

Object[] arguments = (Object[]) input.readObject();

Class serviceClass = serviceRegistry.get(serviceName);

if (serviceClass == null) {

throw new ClassNotFoundException(serviceName + " not found");

}

Method method = serviceClass.getMethod(methodName, parameterTypes);

Object result = method.invoke(serviceClass.newInstance(), arguments);

// 3.將執行結果反序列化,通過socket發送給客戶端

output = new ObjectOutputStream(clent.getOutputStream());

output.writeObject(result);

} catch (Exception e) {

e.printStackTrace();

} finally {

if (output != null) {

try {

output.close();

} catch (IOException e) {

e.printStackTrace();

}

}

if (input != null) {

try {

input.close();

} catch (IOException e) {

e.printStackTrace();

}

}

if (clent != null) {

try {

clent.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

}

}

}

心跳監聽類:

package com.cang.heartbeat;

import java.io.IOException;

import java.io.ObjectInputStream;

import java.io.ObjectOutputStream;

import java.lang.reflect.Method;

import java.net.InetSocketAddress;

import java.net.ServerSocket;

import java.net.Socket;

import java.util.Iterator;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.atomic.AtomicBoolean;

/**

* 心跳監聽保存信息

*

* @author cang

* @create_time 2016-09-28 11:40

*/

public class HeartbeatLinstener {

private ExecutorService executor = Executors.newFixedThreadPool(20);

private final ConcurrentHashMap nodes = new ConcurrentHashMap();

private final ConcurrentHashMap nodeStatus = new ConcurrentHashMap();

private long timeout = 10 * 1000;

// 服務器監聽端口

private int port = 8089;

// 單例模式

private static class SingleHolder {

private static final HeartbeatLinstener INSTANCE = new HeartbeatLinstener();

}

private HeartbeatLinstener() {

}

public static HeartbeatLinstener getInstance() {

return SingleHolder.INSTANCE;

}

public ConcurrentHashMap getNodes() {

return nodes;

}

public void registerNode(String nodeId, Object nodeInfo) {

nodes.put(nodeId, nodeInfo);

nodeStatus.put(nodeId, System.currentTimeMillis());

}

public void removeNode(String nodeID) {

if (nodes.containsKey(nodeID)) {

nodes.remove(nodeID);

}

}

// 檢測節點是否有效

public boolean checkNodeValid(String key) {

if (!nodes.containsKey(key) || !nodeStatus.containsKey(key)) return false;

if ((System.currentTimeMillis() - nodeStatus.get(key)) > timeout) return false;

return true;

}

// 刪除所有失效節點

public void removeInValidNode() {

Iterator> it = nodeStatus.entrySet().iterator();

while (it.hasNext()) {

Map.Entry e = it.next();

if ((System.currentTimeMillis() - nodeStatus.get(e.getKey())) > timeout) {

nodes.remove(e.getKey());

}

}

}

}

心跳處理類接口:

public interface HeartbeatHandler {

public Cmder sendHeartBeat(HeartbeatEntity info);

}

心跳處理實現類:

public class HeartbeatHandlerImpl implements HeartbeatHandler {

public Cmder sendHeartBeat(HeartbeatEntity info) {

HeartbeatLinstener linstener = HeartbeatLinstener.getInstance();

// 添加節點

if (!linstener.checkNodeValid(info.getNodeID())) {

linstener.registerNode(info.getNodeID(), info);

}

// 其他操作

Cmder cmder = new Cmder();

cmder.setNodeID(info.getNodeID());

// ...

System.out.println("current all the nodes: ");

Map nodes = linstener.getNodes();

for (Map.Entry e : nodes.entrySet()) {

System.out.println(e.getKey() + " : " + e.getValue());

}

System.out.println("hadle a heartbeat");

return cmder;

}

}

測試類:

public class HeartbeatTest {

public static void main(String[] args) {

new Thread(new Runnable() {

public void run() {

try {

ServiceCenter serviceServer = ServiceCenter.getInstance();

serviceServer.register(HeartbeatHandler.class, HeartbeatHandlerImpl.class);

serviceServer.start();

} catch (IOException e) {

e.printStackTrace();

}

}

}).start();

Thread client1 = new Thread(new HeartbeatClient());

client1.start();

Thread client2 = new Thread(new HeartbeatClient());

client2.start();

}

}

四、總結

上面的代碼還有很多不足的地方,希望有空能進行改善:

配置為硬編碼;

命令類Cmder沒有實際實現,返回的Cmder對象沒有實際進行處理;

其他小問題就暫時不管了,希望以后能重寫上面的代碼。

以上就是Java實現心跳機制的方法的詳細內容,更多關于Java實現心跳機制的資料請關注免費資源網其它相關文章!

總結

以上是生活随笔為你收集整理的java tcp 心跳机制_Java实现心跳机制的方法的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。