日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > windows >内容正文

windows

Netty实战 IM即时通讯系统(八)服务端和客户端通信协议编解码

發布時間:2024/4/30 windows 66 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Netty实战 IM即时通讯系统(八)服务端和客户端通信协议编解码 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Netty實戰 IM即時通訊系統(八)服務端和客戶端通信協議編解碼

零、 目錄

  • IM系統簡介
    • Netty 簡介
    • Netty 環境配置
    • 服務端啟動流程
    • 客戶端啟動流程
    • 實戰: 客戶端和服務端雙向通信
    • 數據傳輸載體ByteBuf介紹
    • 客戶端與服務端通信協議編解碼
    • 實現客戶端登錄
    • 實現客戶端與服務端收發消息
    • pipeline與channelHandler
    • 構建客戶端與服務端pipeline
    • 拆包粘包理論與解決方案
    • channelHandler的生命周期
    • 使用channelHandler的熱插拔實現客戶端身份校驗
    • 客戶端互聊原理與實現
    • 群聊的發起與通知
    • 群聊的成員管理(加入與退出,獲取成員列表)
    • 群聊消息的收發及Netty性能優化
    • 心跳與空閑檢測
    • 總結
    • 擴展

    八、 服務端和客戶端通信協議編解碼

  • 上一小節我們學習了ByteBuf 的API , 這一小節我們拉學習如何設計并實現客戶端與服務端的通信協議
  • 什么是服務端與客戶端的通信協議?
  • 無論是Netty 還是原始的Socket編程 , 基于TCP通信的數據包格式均為二進制 , 協議指的就是客戶端和服務端事先商量好的 , 每一個二進制數據包中每一段字節分別表示什么含義的規則 , 如下圖的一個簡單的登錄指令
  • 這個數據包中 , 第一個字節 為1 表示這是一個登錄指令 , 接下來是用戶名和密碼 , 這兩個值以\0 分割 , 客戶端發送這段二進制數據包到服務端 , 服務端就能根據這個協議取出來用戶名密碼 , 進行登錄邏輯 , 實際的通信協議設計中 , 我們會考慮更多細節 , 比這個稍微復雜一點 。
  • 那么協議設計好之后 , 客戶端和服務端之間的通信過程又是怎樣的呢?
  • 如上圖所示 , 客戶端和服務端通信:
  • 首先 , 客戶端把一個java對象按照通信協議轉換成二進制數據包
  • 然后通過網絡 , 把這段二進制數據包發送到服務端 , 數據的傳輸過程由TCP/IP協議負責數據的傳輸 , 和我們應用層無關
  • 服務端接收到數據之后 , 按照協議取出二進制數據包中的相應的字段 , 包裝成java對象 , 交給應用邏輯處理
  • 服務端處理完之后, 如果需要突出相應給客戶端 , 那么按照相同的邏輯進行。
  • 在本系列的第一小節中我們一進列出了實現一個支持單聊和群聊的IM指令集合 , 我們設計協議的目的就是為了客戶端與服務端能夠識別出這些具體的指令。
  • 通信協議的設計
  • 首先第一個數是 魔數 , 通常情況下為固定的幾個字節 (我們這里規定4個字節) , 為什么需要這個字段 , 而且還是一個固定的數? 假設我們在服務器上開了一個端口 , 比如 80 端口 , 如果沒有這個魔數 , 任何數據包傳遞到服務器 , 服務器都會根據自定義的協議進行處理 , 包括不符合自定義協議規范的數據包 。 例如: 我們直接通過http://IP:port來訪問服務器 , 服務器收到的是一個標準的Http協議數據包 , 但是他仍然會按照事先約定好的協議來處理HTTP協議 , 顯然這時會解析出錯的 , 而有了這個魔數之后 , 服務器首先取出前四個字節進行比對 , 能夠在第一時間識別出這個數據包并非是遵循自定義協議的 , 也就是說無效的數據包 , 為了安全考慮 , 可以直接關閉連接以節省資源 。 在java的二進制文件中 , 開頭的4個字節為0xcafebabe 用來表示這是一個字節碼文件 , 也是異曲同工之妙 。
  • 接下來一個字節是版本號 , 通常情況下是預留字段 , 用于協議升級的時候用到 , 有點類似TCP/IP 協議中的一個字段表示是IPV4還是IPV6 , 大多數情況下 , 這個字段是用不到的 , 不過為了協議能夠支持升級 , 我們留著 。
  • 第三部分 ,序列化算法表示如何把java對象轉換為二進制數據以及二進制數據轉換會java對象 , 比如java自帶的序列化 , json 、 hessian 等序列化方式。
  • 第四部分 表示 指令 , 關于指令的介紹 , 我們在前面已經討論過 , 服務端或者客戶端每收到一種指令都會有相應的處理邏輯 , 這里我們用一個字節來表示 , 最高支持256中指令 , 對于我們的IM 系統來說 完全夠用了
  • 接下來 的字段為數據部分的長度 , 占四個字節
  • 最后一個部分為數據部分 , 每一種指令對應的數據是不一樣的 , 比如登錄的時候需要用戶名密碼 , 收消息的時候需要用戶標識和具體的消息內容
  • 通常情況下 ,這樣一套標準的協議能夠適配大多數情況下的服務端與客戶端的通信場景 , 接下來我們就來看一下 如何使用Netty 來實現這套協議
  • 通信協議的實現
  • 我們把java對象根據協議封裝成二進制數據包的過程稱為編碼 , 而把從二進制數據包中解析出就java對象的過程稱為解碼 。 在學習如何使用Netty 進行通信協議編解碼之前 , 我們先來定義一下客戶端和服務端通信的java 對象 。

  • java 對象

    /*** 數據包對象* @author outman* */ @Data abstract class Packet{/*** 協議版本* */private Byte version = 1;/*** 獲取指令* */public abstract Byte getCommand();/*** 指令集合內部接口* */interface Command{public static final Byte LOGIN_REQUEST = 1;} }
  • 以上是通信過程中 java對象的抽象類 , 可以看到我們定義了一個版本號(默認值為1) 以及一個獲取指令的抽象方法 , 所有的指令數據包都必須實現這個方法 , 這樣我們就可以知道某種指令的含義

  • @Data 注解由lombok 提供 , 他會自動幫我們產生getter/setter 方法 , 減少大量的重復代碼 , 需要添加依賴

    <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.18</version><scope>provided</scope></dependency>
  • 接下來 , 我們一客戶登錄請求為例 , 定義登錄請求數據包 :

    @Dataclass LoginRequestPacket extends Packet{private Integer uerId ;private String userName;private String password;@Overridepublic Byte getCommand() {return Command.LOGIN_REQUEST;}}
  • 登錄請求數據包 繼承自Packet 然后定義了三個字段 , 分別是用戶ID , 用戶名 、密碼 , 這里最為重要的是覆蓋了父類的getCommand() 方法 值為常量Command.LOGIN_REQUEST
  • java對象定義完成之后 , 接下來我們就要定義一種規則 , 如何把一個java對象轉換成二進制數據 , 這個規則叫做java對象的序列化

  • 序列化

  • 我們如下定義序列化接口

    /*** 序列化接口* @author outman* */interface Serializer{/*** 序列化算法* */byte getSerializerAlgorithm();/*** java 對象轉換成二進制 (序列化) * */byte[] serialize(Object obj);/*** 二進制轉換為java對象 (反序列化)* */<T> T deSerialize(Class<T> clazz , byte[] bytes);/*** 序列化算法標識集合接口* */interface SerializerAlgorithm{public static final byte JSON = 1;}}
  • 序列化接口有三個方法: getSerializerAlgorithm() 獲取具體的序列化算法標識 , serialize() 將java對象轉換為字節數組 , deSerialize()將字節數組轉轉為對應類型的java對象 , 在本小節中 , 我們使用最簡單的json序列化方式 , 使用阿里巴巴的fastJson作為序列化框架; 接口中還有一個內部接口 , 用于讓我們定義序列化算法標識的集合
  • fastjson 依賴

    <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.54</version></dependency>
  • Json序列化實現類

    /*** JSON 序列化實現類* @author outman* */class JSONSerializer implements Serializer{@Overridepublic byte getSerializerAlgorithm() {return SerializerAlgorithm.JSON;}@Overridepublic byte[] serialize(Object obj) {return JSONObject.toJSONBytes(obj);}@Overridepublic <T> T deSerialize(Class<T> clazz, byte[] bytes) {return JSONObject.parseObject(bytes, clazz);}}
  • 我們設置 Srializer 的默認序列化方式為JSONSerializer

    /*** 序列化接口* @author outman* */interface Serializer{/*** 默認的序列化對象* */Serializer DEFAULT = new JSONSerializer();/*** 序列化算法* */byte getSerializerAlgorithm();/*** java 對象轉換成二進制 (序列化) * */byte[] serialize(Object obj);/*** 二進制轉換為java對象 (反序列化)* */<T> T deSerialize(Class<T> clazz , byte[] bytes);/*** 序列化算法標識集合接口* */interface SerializerAlgorithm{public static final byte JSON = 1;}}
  • 這樣我們就是實現了序列化相關的邏輯 , 如果想要實現其他的序列化算法的話 , 只需要實現一下Serializer接口 , 然后定義一下 序列化算法標識 就好啦。

  • 編碼: 封裝成二進制數據的過程

    /*** 數據包編解碼類* @author outman* */class PacketCodec{// 魔數private static final int MAGIC_NUMBER = 0x12345678;public ByteBuf enCode(Packet packet) {// 1. 創建ByteBuf 對象ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer();// 2. 序列化java對象byte[] bs = Serializer.DEFAULT.serialize(packet);// 3. 實際編碼過程byteBuf.writeInt(MAGIC_NUMBER); // 寫入魔數byteBuf.writeInt(packet.getVersion()); // 寫入協議版本號byteBuf.writeInt(Serializer.DEFAULT.getSerializerAlgorithm()); // 寫入序列化算法byteBuf.writeByte(packet.getCommand()); // 寫入指令byteBuf.writeInt(bs.length); // 寫入數據長度byteBuf.writeBytes(bs); // 寫入數據return byteBuf;}}
  • 編碼過程分為三個過程:
  • 首先我們創建一個ByteBuf , 這里我們調用Netty 的ByteBuf分配器來創建 , ioBuffrer() 方法會返回適配io讀寫相關的內存 , 他盡可能創建一個直接內存 ,直接內存可以理解為不受jvm 對內存法管理 , 寫到Io 緩沖區的效果更高
  • 接下來我們把java 對象序列化成二進制數據包
  • 最后我們對照本小節開頭的協議的設計以及上一小節ByeBuf 的API , 逐個往bytebuf 中寫入字段 , 及實現了編碼過程
  • 一端實現了編碼 。 Netty 會將次ByteBuf 寫到另外一端 , 另外一端拿到的也是一個ByteBuf 對象, 基于這個ByteBuf 對象 , 就可以反解出對端創建的java對象 , 這個過程我們稱之為解碼
  • 解碼: 解析java對象的過程

  • 還是剛剛的PacketCodec.java 中添加deCode(buf) 、 getRequestPacket(byte command) 、 getSerializer(byte serializeAlgorithm)方法

    /*** 數據包編解碼類* * @author outman*/class PacketCodec {// 魔數private static final int MAGIC_NUMBER = 0x12345678;// 指令 與 數據包 映射private final Map<Byte, Class<? super Packet>> packetTypeMap;// 序列化算法 與 序列化類 映射private final Map<Byte, Class<? super Serializer>> serializerMap;// 單例public static final PacketCodec INSTANCE = new PacketCodec();// 注冊Packet集合List<Class> packetList = Arrays.asList(new Class[] { LoginRequestPacket.class });// 注冊序列化算法集合List<Class> serializerList = Arrays.asList(new Class[] { JSONSerializer.class });/*** 初始化 指令 與 數據包 映射 序列化算法 與 序列化類 映射*/private PacketCodec() {// 初始化 指令 與 數據包 映射packetTypeMap = new HashMap<Byte, Class<? super Packet>>();packetList.forEach(clazz -> {try {Method method = clazz.getMethod("getCommand");Byte command = (Byte) method.invoke(clazz);packetTypeMap.put(command, clazz);} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();}});// 初始化序列化算法 與 序列化類 映射serializerMap = new HashMap<Byte, Class<? super Serializer>>();serializerList.forEach(clazz -> {try {Method method = clazz.getMethod("getSerializerAlgorithm");Byte serializerAlgorithm = (Byte) method.invoke(clazz);serializerMap.put(serializerAlgorithm, clazz);} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();}});}// 編碼public ByteBuf enCode(Packet packet) {// 1. 創建ByteBuf 對象ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer();// 2. 序列化java對象byte[] bs = Serializer.DEFAULT.serialize(packet);// 3. 實際編碼過程byteBuf.writeInt(MAGIC_NUMBER); // 寫入魔數byteBuf.writeByte(packet.getVersion()); // 寫入協議版本號byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlgorithm()); // 寫入序列化算法byteBuf.writeByte(packet.getCommand()); // 寫入指令byteBuf.writeInt(bs.length); // 寫入數據長度byteBuf.writeBytes(bs); // 寫入數據return byteBuf;}// 解碼public Packet deCode(ByteBuf byteBuf) throws Exception {// 跳過 魔數 校驗byteBuf.skipBytes(4);// 跳過版本號byteBuf.skipBytes(1);// 序列化算法標識byte serializeAlgorithm = byteBuf.readByte();// 指令標識byte command = byteBuf.readByte();// 數據包長度int length = byteBuf.readInt();// 數據byte[] bs = new byte[length];byteBuf.readBytes(bs);// 通過序列化算法標識獲取對應的 序列化對象Serializer serializer = getSerializer(serializeAlgorithm);// 通過指令標識 獲取對應的 數據包類Packet packet = getRequestPacket(command);// 執行解碼if (serializer != null && packet != null) {return serializer.deSerialize(packet.getClass(), bs);} else {System.out.println("沒有找到對應的序列化對象或數據包對象");return null;}}// 通過指令獲取對應的 數據包類 private Packet getRequestPacket(byte command) throws Exception {return (Packet) packetTypeMap.get(command).newInstance();}// 通過序列化算法標識 獲取對應的序列化類private Serializer getSerializer(byte serializeAlgorithm) throws Exception {return (Serializer) serializerMap.get(serializeAlgorithm).newInstance();}}
  • 解碼的流程如下
  • 我們假定deCode方法傳遞進來的ByteBuf已經合法(后面的小節我們會實現校驗) 即首部4個字節是我們前面定義的魔數 , 這里我們跳過
  • 這里我們暫時不關注協議版本 , 通常我們沒有遇到協議升級的時候 , 這個字段暫不處理 , 因為你會發現 , 在大多數情況下 , 這個字段幾乎用不著 , 但是我們仍然保留
  • 接下來我們拿到 序列化算法標識 、 指令標識 、 數據長度 、 數據
  • 最后我們根據拿到的數據長度取出數據 , 通過指令標識拿到對應的java對象 , 根據序列化算法標識 拿到序列化對象 , 將字節碼轉換為java對象
  • 完整代碼

    package com.tj.NIO_test_maven;import java.lang.reflect.Method;import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map;import com.alibaba.fastjson.JSONObject;import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import lombok.Data;/*** @author outman*/ public class Test_09_客戶端與服務端通信協議編解碼 {public static void main(String[] args) {}}/*** 數據包對象* * @author outman*/ @Data abstract class Packet {/*** 協議版本*/private Byte version = 1;/*** 獲取指令*/public abstract Byte getCommand();/*** 指令集合內部接口*/interface Command {public static final Byte LOGIN_REQUEST = 1;} }/*** 登錄請求數據包* * @author outman*/ @Data class LoginRequestPacket extends Packet {private Integer uerId;private String userName;private String password;@Overridepublic Byte getCommand() {return Command.LOGIN_REQUEST;}}/*** 序列化接口* * @author outman*/ interface Serializer {/*** 默認的序列化對對象*/Serializer DEFAULT = new JSONSerializer();/*** 序列化算法*/byte getSerializerAlgorithm();/*** java 對象轉換成二進制 (序列化)*/byte[] serialize(Object obj);/*** 二進制轉換為java對象 (反序列化)*/<T> T deSerialize(Class<T> clazz, byte[] bytes);/*** 序列化算法標識集合接口*/interface SerializerAlgorithm {public static final byte JSON = 1;} }/*** JSON 序列化實現類* * @author outman*/ class JSONSerializer implements Serializer {@Overridepublic byte getSerializerAlgorithm() {return SerializerAlgorithm.JSON;}@Overridepublic byte[] serialize(Object obj) {return JSONObject.toJSONBytes(obj);}@Overridepublic <T> T deSerialize(Class<T> clazz, byte[] bytes) {return JSONObject.parseObject(bytes, clazz);}}/*** 數據包編解碼類* * @author outman*/ class PacketCodec {// 魔數private static final int MAGIC_NUMBER = 0x12345678;// 指令 與 數據包 映射private final Map<Byte, Class<? super Packet>> packetTypeMap;// 序列化算法 與 序列化類 映射private final Map<Byte, Class<? super Serializer>> serializerMap;// 單例public static final PacketCodec INSTANCE = new PacketCodec();// 注冊Packet集合List<Class> packetList = Arrays.asList(new Class[] { LoginRequestPacket.class });// 注冊序列化算法集合List<Class> serializerList = Arrays.asList(new Class[] { JSONSerializer.class });/*** 初始化 指令 與 數據包 映射 序列化算法 與 序列化類 映射*/private PacketCodec() {// 初始化 指令 與 數據包 映射packetTypeMap = new HashMap<Byte, Class<? super Packet>>();packetList.forEach(clazz -> {try {Method method = clazz.getMethod("getCommand");Byte command = (Byte) method.invoke(clazz);packetTypeMap.put(command, clazz);} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();}});// 初始化序列化算法 與 序列化類 映射serializerMap = new HashMap<Byte, Class<? super Serializer>>();serializerList.forEach(clazz -> {try {Method method = clazz.getMethod("getSerializerAlgorithm");Byte serializerAlgorithm = (Byte) method.invoke(clazz);serializerMap.put(serializerAlgorithm, clazz);} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();}});}// 編碼public ByteBuf enCode(Packet packet) {// 1. 創建ByteBuf 對象ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer();// 2. 序列化java對象byte[] bs = Serializer.DEFAULT.serialize(packet);// 3. 實際編碼過程byteBuf.writeInt(MAGIC_NUMBER); // 寫入魔數byteBuf.writeByte(packet.getVersion()); // 寫入協議版本號byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlgorithm()); // 寫入序列化算法byteBuf.writeByte(packet.getCommand()); // 寫入指令byteBuf.writeInt(bs.length); // 寫入數據長度byteBuf.writeBytes(bs); // 寫入數據return byteBuf;}// 解碼public Packet deCode(ByteBuf byteBuf) throws Exception {// 跳過 魔數 校驗byteBuf.skipBytes(4);// 跳過版本號byteBuf.skipBytes(1);// 序列化算法標識byte serializeAlgorithm = byteBuf.readByte();// 指令標識byte command = byteBuf.readByte();// 數據包長度int length = byteBuf.readInt();// 數據byte[] bs = new byte[length];byteBuf.readBytes(bs);// 通過序列化算法標識獲取對應的 序列化對象Serializer serializer = getSerializer(serializeAlgorithm);// 通過指令標識 獲取對應的 數據包類Packet packet = getRequestPacket(command);// 執行解碼if (serializer != null && packet != null) {return serializer.deSerialize(packet.getClass(), bs);} else {System.out.println("沒有找到對應的序列化對象或數據包對象");return null;}}// 通過指令獲取對應的 數據包類 private Packet getRequestPacket(byte command) throws Exception {return (Packet) packetTypeMap.get(command).newInstance();}// 通過序列化算法標識 獲取對應的序列化類private Serializer getSerializer(byte serializeAlgorithm) throws Exception {return (Serializer) serializerMap.get(serializeAlgorithm).newInstance();}}
  • 總結:
  • 通信協議書為了服務端和客戶端交互 , 雙方協商出來的滿足一定規則的二進制數據格式
  • 介紹了一種通用的通信協議的設計 , 包括魔數 、 版本號 、 序列化算法標識 、 指令標識 、數據長度 、 數據幾個字段 , 該協議能夠滿足大多數通信的場景
  • java對象以及序列化 , 目的就是實現java對象與二進制數據的互換
  • 最后我們依照我們設計的協議以及ByteBuf 的API 實現了通信協議 , 這個過程成為編碼過程
  • 思考:
  • 序列化和編碼都是 把java對象封裝成二進制數據的過程 , 這兩者有什么區別?
  • 序列化是把內容變成計算機可傳輸的資源,而編碼則是讓程序認識這份資源。
  • 指令標識 、 序列化算法標識為什么不用枚舉?
  • 請問這節課自己設計的通信協議是屬于應用層協議,和http協議是同一級別是吧?
  • 對的,這樣理解起來完全正確,只不過自定義協議屬于私有協議,http屬于共有協議
  • 使用protobuf 生成的對象的二進制要小很多,使用protobuf 可以減小數據包的大小。一個數據包無法裝下一個對象的這種情況怎么處理呢(就是一個ByteBuf 被很多個物理層數據包傳輸的情況)?
  • 所以設計協議的時候,長度字段的長度需要考量,需要支持最大的數據包大小,這里是4個字節,最大值為 2147483647,已經完全足夠了,然后一個物理層數據包如果塞不下,會被拆成多個數據包,另外一端接受的時候把這些數據包粘合起來,可參考13小結
  • 總結

    以上是生活随笔為你收集整理的Netty实战 IM即时通讯系统(八)服务端和客户端通信协议编解码的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。