websocket实现消息实时更新(亲测,2021/11/9)
生活随笔
收集整理的這篇文章主要介紹了
websocket实现消息实时更新(亲测,2021/11/9)
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
前端
```html //這是去獲取未讀消息的條數(shù)(這個函數(shù)是自定義的) function getNotifyInfo() {$.ajax({cache: true,type: "get",url: ctx + "tbs/notice/unreadMessage",async: false,success: function (result) {if (result.code == 0) {if(result.data > 0){//設(shè)置在小鈴鐺上面$("#noticeNum").text(result.data);$("#noticeNum").show();$("#noticeDetail").text([[#{tbs.meaasge.notice.unread}]].format(result.data));}else{$("#noticeNum").text("");$("#noticeNum").hide();$("#noticeDetail").text([[#{tbs.meaasge.notice.read}]]);}}},error: function (error) {}});}var websocket;//避免重復(fù)連接var lockReconnect = false, tt;/*** websocket啟動*/function createWebSocket() {try {var userId = $("#userId").val();var url = $("#url").val() + "/websocket/message/" + userId;if ('WebSocket' in window) {websocket = new WebSocket(url);init();} else if ('MozWebSocket' in window) {websocket = new MozWebSocket(url);init();} else {//websocket = new SockJS(url);}} catch (e) {console.log('catch' + e);reconnect();}}function init() {//連接成功建立的回調(diào)方法websocket.onopen = function (event) {//console.log("WebSocket:onopen");//心跳檢測重置heartCheck.reset().start();};//接收到消息的回調(diào)方法websocket.onmessage = function (event) {//console.log("WebSocket:onmessage,", event.data);heartCheck.reset().start();if(event.data != "ok"){getNotifyInfo();}};//連接發(fā)生錯誤的回調(diào)方法websocket.onerror = function (event) {//console.log("WebSocket:error");reconnect();};//連接關(guān)閉的回調(diào)方法websocket.onclose = function (event) {//console.log("WebSocket:closed");heartCheck.reset();//心跳檢測reconnect();};//監(jiān)聽窗口關(guān)閉事件,當(dāng)窗口關(guān)閉時,主動去關(guān)閉websocket連接,防止連接還沒斷開就關(guān)閉窗口,server端會拋異常。window.onbeforeunload = function () {websocket.close();};//關(guān)閉連接function closeWebSocket() {websocket.close();}//發(fā)送消息function send(message) {websocket.send(message);}}/*** websocket重連*/function reconnect() {if (lockReconnect) {return;}lockReconnect = true;tt && clearTimeout(tt);tt = setTimeout(function () {//console.log('reconnect...');lockReconnect = false;createWebSocket();}, 10000);}/*** websocket心跳檢測*/var heartCheck = {timeout: 60000,timeoutObj: null,serverTimeoutObj: null,reset: function () {clearTimeout(this.timeoutObj);clearTimeout(this.serverTimeoutObj);return this;},start: function () {var self = this;this.timeoutObj && clearTimeout(this.timeoutObj);this.serverTimeoutObj && clearTimeout(this.serverTimeoutObj);this.timeoutObj = setTimeout(function () {//這里發(fā)送一個心跳,后端收到后,返回一個心跳消息,//onmessage拿到返回的心跳就說明連接正常websocket.send("ping");//console.log('ping');self.serverTimeoutObj = setTimeout(function () { // 如果超過一定時間還沒重置,說明后端主動斷開了websocket.close();//如果onclose會執(zhí)行reconnect,我們執(zhí)行 websocket.close()就行了.如果直接執(zhí)行 reconnect 會觸發(fā)onclose導(dǎo)致重連兩次}, self.timeout)}, this.timeout)}};java
controller
import java.util.concurrent.Semaphore; import javax.websocket.OnClose; import javax.websocket.OnError; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint;import com.dcdzsoft.tbs.utils.SemaphoreUtils; import com.dcdzsoft.tbs.utils.WebSocketUsers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component;/*** websocket 消息處理*/ @Component @ServerEndpoint("/websocket/message/{uid}") public class WebSocketServer {/*** WebSocketServer 日志控制器*/private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServer.class);/*** 默認(rèn)最多允許同時在線人數(shù)100000*/public static int socketMaxOnlineCount = 100000;private static Semaphore socketSemaphore = new Semaphore(socketMaxOnlineCount);/*** 連接建立成功調(diào)用的方法*/@OnOpenpublic void onOpen(Session session, @PathParam("uid") String uid) throws Exception {boolean semaphoreFlag = false;// 嘗試獲取信號量semaphoreFlag = SemaphoreUtils.tryAcquire(socketSemaphore);if (!semaphoreFlag) {// 未獲取到信號量WebSocketUsers.sendMessageToUserByText(session, "Online Limit:" + socketMaxOnlineCount);session.close();} else {// 添加用戶WebSocketUsers.put(uid, session);LOGGER.info("\n online number - {}", WebSocketUsers.getUsers().size());WebSocketUsers.sendMessageToUserByText(session, "success");}}/*** 連接關(guān)閉時處理*/@OnClosepublic void onClose(Session session) {LOGGER.info("\n close connect - {}", session);// 移除用戶WebSocketUsers.remove(session);// 獲取到信號量則需釋放SemaphoreUtils.release(socketSemaphore);}/*** 拋出異常時處理*/@OnErrorpublic void onError(Session session, Throwable exception) throws Exception {if (session.isOpen()) {// 關(guān)閉連接session.close();}LOGGER.info("\n connect error - {}", exception);// 移出用戶WebSocketUsers.remove(session);// 獲取到信號量則需釋放SemaphoreUtils.release(socketSemaphore);}/*** 服務(wù)器接收到客戶端消息時調(diào)用的方法*/@OnMessagepublic void onMessage(String message, Session session) {WebSocketUsers.sendMessageToUserByText(session, "ok");} }WebSocketConfig
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter;/*** websocket 配置* @author ruoyi*/ @Configuration public class WebSocketConfig {@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();} }WebSocketUsers
import java.io.IOException; import java.util.Collection; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import javax.websocket.Session; import com.dcdzsoft.common.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory;/*** websocket 客戶端用戶集* @author ruoyi*/ public class WebSocketUsers {/*** WebSocketUsers 日志控制器*/private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketUsers.class);/*** 用戶集*/private static Map<String, Session> USERS = new ConcurrentHashMap<String, Session>();/*** 存儲用戶* @param key 唯一鍵* @param session 用戶信息*/public static void put(String key, Session session) {USERS.put(key, session);}/*** 移除用戶* @param session 用戶信息* @return 移除結(jié)果*/public static boolean remove(Session session) {String key = null;boolean flag = USERS.containsValue(session);if (flag) {Set<Map.Entry<String, Session>> entries = USERS.entrySet();for (Map.Entry<String, Session> entry : entries) {Session value = entry.getValue();if (value.equals(session)) {key = entry.getKey();break;}}} else {return true;}return remove(key);}/*** 移出用戶** @param key 鍵*/public static boolean remove(String key) {Session remove = USERS.remove(key);if (remove != null) {boolean containsValue = USERS.containsValue(remove);return containsValue;} else {return true;}}/*** 獲取在線用戶列表** @return 返回用戶集合*/public static Map<String, Session> getUsers() {return USERS;}/*** 獲取在線用戶列表* @return 返回用戶集合*/public static Session getUserSession(String uid) {return USERS.get(uid);}/*** 群發(fā)消息文本消息* @param message 消息內(nèi)容*/public static void sendMessageToUsersByText(String message) {Collection<Session> values = USERS.values();for (Session value : values) {sendMessageToUserByText(value, message);}}/*** 發(fā)送文本消息** @param message 消息內(nèi)容*/public static void sendMessageToUserByText(Session session, String message) {if (session != null) {try {session.getBasicRemote().sendText(message);} catch (IOException e) {LOGGER.error("\n[msg error]", e);}} else {LOGGER.info("\n[already offline]");}}/*** 發(fā)送文本消息* @param message 消息內(nèi)容*/public static void sendMessageToUserByText(String uid, String message) {try {Session session = getUserSession(uid);if(session == null){LOGGER.info("\n[already offline]");}else{session.getBasicRemote().sendText(message);}} catch (IOException e) {LOGGER.error("\n[msg error]", e);}} }SemaphoreUtils
import java.util.concurrent.Semaphore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /*** 信號量相關(guān)處理** @author ruoyi*/ public class SemaphoreUtils {/*** SemaphoreUtils 日志控制器*/private static final Logger LOGGER = LoggerFactory.getLogger(SemaphoreUtils.class);/*** 獲取信號量* @param semaphore* @return*/public static boolean tryAcquire(Semaphore semaphore) {boolean flag = false;try {flag = semaphore.tryAcquire();} catch (Exception e) {LOGGER.error("獲取信號量異常", e);}return flag;}/*** 釋放信號量* @param semaphore*/public static void release(Semaphore semaphore) {try {semaphore.release();} catch (Exception e) {LOGGER.error("釋放信號量異常", e);}} }哪里需要,哪里調(diào) WebSocketUsers.sendMessageToUserByText(userId.toString(), “internalMessage”);這個方法來告訴前端,你執(zhí)行一下獲取數(shù)量的函數(shù)
總結(jié)
以上是生活随笔為你收集整理的websocket实现消息实时更新(亲测,2021/11/9)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【前端html页面练习】还原英雄联盟客户
- 下一篇: NFC源码分析之Handover