日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java连接imserver_java后端IM消息推送服务开发——协议

發布時間:2023/12/10 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java连接imserver_java后端IM消息推送服务开发——协议 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

最近在一家saas企業使用Mqtt開發IM消息推送服務,把開發中的一些問題記錄下來,項目仍在商用中,完整的消息服務包括4個模塊---協議protocol,信令Signal,規則Rule,狀態Status,這個主題主要是協議protocol部分。

主要技術涉及到MongoDB,webservice,httpclient,Mqtt等

protocol分為四個模塊類來實現,當然這是為了以后的擴展性比較好

首先看一下我們的主類,主要是mqtt基礎方法的一個框架

public class MqttProtocol

{

private static Logger logger = Logger.getLogger(MqttProtocol.class);

public static final String HOST = "tcp://xx.xx.xx.xx:1883";

private static final String CLIENTID = "yyyy";

private MqttClient client;

private MqttConnectOptions options = new MqttConnectOptions();

//private String userName = "admin";

//private String passWord = "public";

public MqttMessage message;

private PushCallback callback;

/**

* 用于初始化mqttclient客戶端,設置回調函數,同時連接mqtt服務器

* @throws MqttException

*/

public MqttProtocol() throws MqttException

{

//MemoryPersistence設置clientid的保存形式,默認為以內存保存

client = new MqttClient(HOST, CLIENTID, new MemoryPersistence());

callback = new PushCallback();

client.setCallback(callback);

options = new MqttConnectOptions();

options.setCleanSession(false);

options.setKeepAliveInterval(60);

connect();

}

/**

* 連接mqtt消息服務器,同時設置了斷開重連的功能,主要是為了高可用性考慮,在斷網服務器崩潰時候我們的程序仍然不會終止

*/

private void connect()

{

SimpleDateFormat sdf= new SimpleDateFormat(Constant.DATE_FORMAT_MDYHMS);

System.out.println(sdf.format(System.currentTimeMillis()));

boolean tryConnecting = true;

while (tryConnecting) {

try {

client.connect(options);

} catch (Exception e1) {

System.out.println("Connection attempt failed with '"+e1.getCause()+

"'. Retrying.");

}

if (client.isConnected()) {

System.out.println("Connected.");

tryConnecting = false;

} else {

pause();

}

}

}

private void pause() {

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

// Error handling goes here...

}

}

/**

*

* @param topic

* @param qos

* @throws MqttPersistenceException

* @throws MqttException

* 訂閱相關主題

*/

public void subscribe(String topic , int qos) throws MqttPersistenceException,

MqttException

{

client.subscribe(topic, qos);

}

/**

*

* @throws MqttPersistenceException

* @throws MqttException

* 斷開連接服務器

*/

public void disconnect() throws MqttPersistenceException,

MqttException

{

client.disconnect();

}

/**

*

* @author binshi

*實現mqttcallback接口,主要用于接收消息后的處理方法

*/

private class PushCallback implements MqttCallback {

/**

* 斷開后 系統會自動調用這個函數,同時在這個函數里進行重連操作

*/

public void connectionLost(Throwable cause) {

// 連接丟失后,一般在這里面進行重連

System.out.println("連接斷開,可以做重連");

connect();

try {

subscribe(Constant.TOPIC_MQTT_PROTOCOL, 2);

} catch (MqttPersistenceException e) {

// TODO Auto-generated catch block

e.printStackTrace();

} catch (MqttException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

/**

* 消息成功傳送后,系統會自動調用此函數,表明成功向topic發送消息

*/

@Override

public void deliveryComplete(IMqttDeliveryToken arg0) {

// TODO Auto-generated method stub

System.out.println("deliveryComplete---------" + arg0.isComplete());

}

/**

* 連接mongo數據庫,返回關于具體collection的Mongocollection

* @param collectionname

* @return

*/

public void messageArrived(String topic, MqttMessage message) throws Exception

{

System.out.println(topic);

SimpleDateFormat sdf= new SimpleDateFormat(Constant.DATE_FORMAT_MDYHMS);

System.out.println(sdf.format(System.currentTimeMillis()));

System.out.println("接收消息主題 : " + topic);

System.out.println("接收消息Qos : " + message.getQos());

System.out.println("接收消息內容 : " + new String(message.getPayload()));

//1 抽取事件信令消息

String messagejudge=new String(message.getPayload());

System.out.println("忽略所有robot消息以及offline離線消息");

JSONObject jo=new JSONObject();

try {

jo=JSONObject.fromObject(messagejudge);

} catch (Exception e) {

e.printStackTrace();

}

String from=jo.getString("from");

System.out.println("獲得from"+from);

System.out.println("確定消息是否包含offline,如果包含取得offline,為1就不處理");

String offline=null;

if(messagejudge.contains("offline"))

{

offline=jo.getString("offline");

}

if((offline==null)&&(!from.contains("robot")))

{

System.out.println("處理非系統消息和非離線消息");

String type=jo.getString("type");

System.out.println("獲得type"+type);

if(type.equals("shakehand"))

{

System.out.println("處理shakehand消息");

String admin="doyounkowwhy";

if(jo.toString().contains("admin"))

{

admin=jo.getString("admin");

}

System.out.println("取得admin 如果為1定義為客服,否則為普通用戶 admin為"+admin);

if(admin.equals("1"))

{

System.out.println("處理客服握手消息");

System.out.println("發送握手成功消息");

MqttTopic retopic=client.getTopic(topic);

MsgOperation.sendSysMsgToClient(from,"0", "1005", "握手成功", null,retopic);

System.out.println("向客戶端發送離線未接收的消息");

String convid=jo.getString("convid");

String database="dolina";

String collection="messages";

MongoDBDao.getMongoDBDaoInstance().sendOfflineMsgToClient(from, convid,retopic,database,collection);

}

else

{

System.out.println("處理普通用戶的握手消息");

String appid=jo.getString("appid");

String pageid=jo.getString("pageid");

String convid=jo.getString("convid");

MqttTopic retopic=client.getTopic(topic);

MsgOperation.sendShakeHandInfo(from,convid,appid,pageid,retopic);

}

}

else if(type.equals("text")||type.equals("image"))

{

System.out.println("處理圖片和文字消息");

String tmpindex=jo.getString("tmpindex");

String convid=jo.getString("convid");

MqttTopic retopic=client.getTopic(topic);

MsgOperation.getTextMsg( tmpindex, from, convid, retopic);

System.out.println("保存圖片文字消息");

String database="dolina";

String collection="messages";

MongoDBDao.getMongoDBDaoInstance().saveTextMsg(database,collection,jo);

}

else if(type.equals("ack"))

{

System.out.println("處理ack消息");

String tmpindex=jo.getString("tmpindex");

String convid=jo.getString("convid");

String database="dolina";

String collection="messages";

MongoDBDao.getMongoDBDaoInstance().getAck(tmpindex,convid,from,database,collection);

}

}

}

}

/**

*

* @param args

* @throws MqttException

* 整個工程從這里開始執行,生成可執行jar包,這個設置為主類。

*/

public static void main(String[] args) throws MqttException

{

MqttProtocol signal = new MqttProtocol();

signal.message = new MqttMessage();

/**

server.message.setQos(2);

server.message.setRetained(false);

server.message.setPayload("給客戶端124推送的信息".getBytes());

server.subscribe("/engyne/1/7/169573fcbc96a816281192222", 2);

*/

signal.subscribe(Constant.TOPIC_MQTT_PROTOCOL, 2);

System.out.println(signal.message.isRetained() + "------ratained狀態");

}

}

接下來使我們的遠程連接模塊,主要是通過給定的url調用遠程接口

public class RemoteOperation

{

private static Logger logger = Logger.getLogger(MqttProtocol.class);

public static JSONObject remoteCall(String url) throws HttpException, IOException

{

HttpClient httpClient = new HttpClient();

GetMethod method =null ;

method=new GetMethod(url);

int retcode = httpClient.executeMethod(method);

if (retcode != HttpStatus.SC_OK)

{// 發送不成功

logger.info("遠程調用出錯");

return null;

}

else

{

String body = method.getResponseBodyAsString();

logger.info(body+"遠程調用php成功");

JSONObject jsonObject=new JSONObject();

try {

jsonObject=JSONObject.fromObject(body);

} catch (Exception e) {

e.printStackTrace();

}

if (method != null)

{

method.releaseConnection();

}

return jsonObject;

}

}

}

下面是Mongo數據庫的相關操作的一個封裝,設計為單例模式,相當于每次都使用同一個client打開連接,類似于連接池的概念,當然業務邏輯部分可以更換

public class MongoDBDao

{

private static Logger logger = Logger.getLogger(MongoDBDao.class);

/**

* MongoClient的實例代表數據庫連接池,是線程安全的,可以被多線程共享,客戶端在多線程條件下僅維持一個實例即可

* Mongo是非線程安全的,目前mongodb API中已經建議用MongoClient替代Mongo

*/

private MongoClient mongoClient = null;

/**

*

* 私有的構造函數

* 作者:shibin

*/

private MongoDBDao(){

if(mongoClient == null){

String url = Constant.MONGO_MQTT_URL;

String user = Constant.MONGO_MQTT_USER;

String password = Constant.MONGO_MQTT_PASSWORD;

String database = Constant.MONGO_MQTT_DATABASE;

int port = 27017;

ServerAddress serverAddress = new ServerAddress(url, port);

ListserverAddresses = new ArrayList();

serverAddresses.add(serverAddress);

MongoCredential credential = MongoCredential.createCredential(user, database, password.toCharArray());

Listcredentials = new ArrayList();

credentials.add(credential);

mongoClient = new MongoClient(serverAddresses, credentials);

System.out.println(mongoClient);

System.out.println("初始化client完成");

}

}

/********單例模式聲明開始,采用餓漢式方式生成,保證線程安全********************/

//類初始化時,自行實例化,餓漢式單例模式

private static final MongoDBDao mongoDBDao = new MongoDBDao();

/**

*

* 方法名:getMongoDBDaoImplInstance

* 作者:shibin

*

* 描述:單例的靜態工廠方法

* @return

*/

public static MongoDBDao getMongoDBDaoInstance(){

return mongoDBDao;

}

public void sendOfflineMsgToClient(String from, String convid,MqttTopic retopic,String database,String collection) throws MqttPersistenceException, MqttException

{

System.out.println("獲得message的連接");

MongoDatabase mongoDatabase = mongoClient.getDatabase(database);

MongoCollection mongoCollection = mongoDatabase.getCollection(collection);

System.out.println("取得convid所對應的msg列表");

BasicDBObject query = new BasicDBObject();

query.put("_id", convid);

FindIterableiterable=null;

iterable = mongoCollection.find(query);

if(iterable.first()!=null)

{

System.out.println(iterable.first());

String res= iterable.first().toJson();

JSONObject jo=new JSONObject();

try {

jo=JSONObject.fromObject(res);

} catch (Exception e) {

e.printStackTrace();

}

JSONArray jsonArray=jo.getJSONArray("msg");

for(int i=0;i

doc.put("read", read);

Document tdoc = new Document();

tdoc.put("msg", doc);

UpdateOptions updateOptions=new UpdateOptions();

updateOptions.upsert(true);

mongoCollection.updateOne(filter, new Document("$addToSet", tdoc), updateOptions);

iterable = mongoCollection.find(query);

System.out.println("更新message之后的值"+iterable.first());

}

public void getAck(String tmpindex,String convid,String from,String database,String collection)

{

System.out.println("接收到ack消息后更新message中的read字段");

MongoDatabase mongoDatabase = mongoClient.getDatabase(database);

MongoCollection mongoCollection = mongoDatabase.getCollection(collection);

BasicDBObject query = new BasicDBObject();

query.put("_id", convid);

query.put("msg.tmpindex", tmpindex);

BasicDBObject query1 = new BasicDBObject();

query1.put("_id", convid);

FindIterable iterable;

FindIterable iterable2;

iterable = mongoCollection.find(query1);

iterable2 = mongoCollection.find(query);

System.out.println("更新message滿足id過濾條件之前的值"+iterable.first());

System.out.println("更新message滿足id和tmpindex過濾條件之前的值"+iterable2.first());

if(iterable2.first()!=null)

{

Document doc = new Document();

doc.put("msg.$.read", from);

UpdateOptions updateOptions=new UpdateOptions();

updateOptions.upsert(true);

mongoCollection.updateOne(query, new Document("$addToSet", doc), updateOptions);

}

iterable = mongoCollection.find(query1);

System.out.println("更新messages之后的值"+iterable.first());

}

}

剩下的關于業務邏輯方面的就不多說了,主要是關于mqtt高可用性斷開重連的功能以及mongo相關的操作

總結

以上是生活随笔為你收集整理的java连接imserver_java后端IM消息推送服务开发——协议的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。