Netty实战 IM即时通讯系统(八)服务端和客户端通信协议编解码
Netty實戰(zhàn) IM即時通訊系統(tǒng)(八)服務(wù)端和客戶端通信協(xié)議編解碼
零、 目錄
- Netty 簡介
- Netty 環(huán)境配置
- 服務(wù)端啟動流程
- 客戶端啟動流程
- 實戰(zhàn): 客戶端和服務(wù)端雙向通信
- 數(shù)據(jù)傳輸載體ByteBuf介紹
- 客戶端與服務(wù)端通信協(xié)議編解碼
- 實現(xiàn)客戶端登錄
- 實現(xiàn)客戶端與服務(wù)端收發(fā)消息
- pipeline與channelHandler
- 構(gòu)建客戶端與服務(wù)端pipeline
- 拆包粘包理論與解決方案
- channelHandler的生命周期
- 使用channelHandler的熱插拔實現(xiàn)客戶端身份校驗
- 客戶端互聊原理與實現(xiàn)
- 群聊的發(fā)起與通知
- 群聊的成員管理(加入與退出,獲取成員列表)
- 群聊消息的收發(fā)及Netty性能優(yōu)化
- 心跳與空閑檢測
- 總結(jié)
- 擴展
八、 服務(wù)端和客戶端通信協(xié)議編解碼
我們把java對象根據(jù)協(xié)議封裝成二進制數(shù)據(jù)包的過程稱為編碼 , 而把從二進制數(shù)據(jù)包中解析出就java對象的過程稱為解碼 。 在學(xué)習(xí)如何使用Netty 進行通信協(xié)議編解碼之前 , 我們先來定義一下客戶端和服務(wù)端通信的java 對象 。
java 對象
/*** 數(shù)據(jù)包對象* @author outman* */ @Data abstract class Packet{/*** 協(xié)議版本* */private Byte version = 1;/*** 獲取指令* */public abstract Byte getCommand();/*** 指令集合內(nèi)部接口* */interface Command{public static final Byte LOGIN_REQUEST = 1;} }以上是通信過程中 java對象的抽象類 , 可以看到我們定義了一個版本號(默認值為1) 以及一個獲取指令的抽象方法 , 所有的指令數(shù)據(jù)包都必須實現(xiàn)這個方法 , 這樣我們就可以知道某種指令的含義
@Data 注解由lombok 提供 , 他會自動幫我們產(chǎn)生getter/setter 方法 , 減少大量的重復(fù)代碼 , 需要添加依賴
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.18</version><scope>provided</scope></dependency>接下來 , 我們一客戶登錄請求為例 , 定義登錄請求數(shù)據(jù)包 :
@Dataclass LoginRequestPacket extends Packet{private Integer uerId ;private String userName;private String password;@Overridepublic Byte getCommand() {return Command.LOGIN_REQUEST;}}java對象定義完成之后 , 接下來我們就要定義一種規(guī)則 , 如何把一個java對象轉(zhuǎn)換成二進制數(shù)據(jù) , 這個規(guī)則叫做java對象的序列化
序列化
我們?nèi)缦露x序列化接口
/*** 序列化接口* @author outman* */interface Serializer{/*** 序列化算法* */byte getSerializerAlgorithm();/*** java 對象轉(zhuǎn)換成二進制 (序列化) * */byte[] serialize(Object obj);/*** 二進制轉(zhuǎn)換為java對象 (反序列化)* */<T> T deSerialize(Class<T> clazz , byte[] bytes);/*** 序列化算法標(biāo)識集合接口* */interface SerializerAlgorithm{public static final byte JSON = 1;}}fastjson 依賴
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.54</version></dependency>Json序列化實現(xiàn)類
/*** JSON 序列化實現(xiàn)類* @author outman* */class JSONSerializer implements Serializer{@Overridepublic byte getSerializerAlgorithm() {return SerializerAlgorithm.JSON;}@Overridepublic byte[] serialize(Object obj) {return JSONObject.toJSONBytes(obj);}@Overridepublic <T> T deSerialize(Class<T> clazz, byte[] bytes) {return JSONObject.parseObject(bytes, clazz);}}我們設(shè)置 Srializer 的默認序列化方式為JSONSerializer
/*** 序列化接口* @author outman* */interface Serializer{/*** 默認的序列化對象* */Serializer DEFAULT = new JSONSerializer();/*** 序列化算法* */byte getSerializerAlgorithm();/*** java 對象轉(zhuǎn)換成二進制 (序列化) * */byte[] serialize(Object obj);/*** 二進制轉(zhuǎn)換為java對象 (反序列化)* */<T> T deSerialize(Class<T> clazz , byte[] bytes);/*** 序列化算法標(biāo)識集合接口* */interface SerializerAlgorithm{public static final byte JSON = 1;}}這樣我們就是實現(xiàn)了序列化相關(guān)的邏輯 , 如果想要實現(xiàn)其他的序列化算法的話 , 只需要實現(xiàn)一下Serializer接口 , 然后定義一下 序列化算法標(biāo)識 就好啦。
編碼: 封裝成二進制數(shù)據(jù)的過程
/*** 數(shù)據(jù)包編解碼類* @author outman* */class PacketCodec{// 魔數(shù)private static final int MAGIC_NUMBER = 0x12345678;public ByteBuf enCode(Packet packet) {// 1. 創(chuàng)建ByteBuf 對象ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer();// 2. 序列化java對象byte[] bs = Serializer.DEFAULT.serialize(packet);// 3. 實際編碼過程byteBuf.writeInt(MAGIC_NUMBER); // 寫入魔數(shù)byteBuf.writeInt(packet.getVersion()); // 寫入?yún)f(xié)議版本號byteBuf.writeInt(Serializer.DEFAULT.getSerializerAlgorithm()); // 寫入序列化算法byteBuf.writeByte(packet.getCommand()); // 寫入指令byteBuf.writeInt(bs.length); // 寫入數(shù)據(jù)長度byteBuf.writeBytes(bs); // 寫入數(shù)據(jù)return byteBuf;}}解碼: 解析java對象的過程
還是剛剛的PacketCodec.java 中添加deCode(buf) 、 getRequestPacket(byte command) 、 getSerializer(byte serializeAlgorithm)方法
/*** 數(shù)據(jù)包編解碼類* * @author outman*/class PacketCodec {// 魔數(shù)private static final int MAGIC_NUMBER = 0x12345678;// 指令 與 數(shù)據(jù)包 映射private final Map<Byte, Class<? super Packet>> packetTypeMap;// 序列化算法 與 序列化類 映射private final Map<Byte, Class<? super Serializer>> serializerMap;// 單例public static final PacketCodec INSTANCE = new PacketCodec();// 注冊Packet集合List<Class> packetList = Arrays.asList(new Class[] { LoginRequestPacket.class });// 注冊序列化算法集合List<Class> serializerList = Arrays.asList(new Class[] { JSONSerializer.class });/*** 初始化 指令 與 數(shù)據(jù)包 映射 序列化算法 與 序列化類 映射*/private PacketCodec() {// 初始化 指令 與 數(shù)據(jù)包 映射packetTypeMap = new HashMap<Byte, Class<? super Packet>>();packetList.forEach(clazz -> {try {Method method = clazz.getMethod("getCommand");Byte command = (Byte) method.invoke(clazz);packetTypeMap.put(command, clazz);} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();}});// 初始化序列化算法 與 序列化類 映射serializerMap = new HashMap<Byte, Class<? super Serializer>>();serializerList.forEach(clazz -> {try {Method method = clazz.getMethod("getSerializerAlgorithm");Byte serializerAlgorithm = (Byte) method.invoke(clazz);serializerMap.put(serializerAlgorithm, clazz);} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();}});}// 編碼public ByteBuf enCode(Packet packet) {// 1. 創(chuàng)建ByteBuf 對象ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer();// 2. 序列化java對象byte[] bs = Serializer.DEFAULT.serialize(packet);// 3. 實際編碼過程byteBuf.writeInt(MAGIC_NUMBER); // 寫入魔數(shù)byteBuf.writeByte(packet.getVersion()); // 寫入?yún)f(xié)議版本號byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlgorithm()); // 寫入序列化算法byteBuf.writeByte(packet.getCommand()); // 寫入指令byteBuf.writeInt(bs.length); // 寫入數(shù)據(jù)長度byteBuf.writeBytes(bs); // 寫入數(shù)據(jù)return byteBuf;}// 解碼public Packet deCode(ByteBuf byteBuf) throws Exception {// 跳過 魔數(shù) 校驗byteBuf.skipBytes(4);// 跳過版本號byteBuf.skipBytes(1);// 序列化算法標(biāo)識byte serializeAlgorithm = byteBuf.readByte();// 指令標(biāo)識byte command = byteBuf.readByte();// 數(shù)據(jù)包長度int length = byteBuf.readInt();// 數(shù)據(jù)byte[] bs = new byte[length];byteBuf.readBytes(bs);// 通過序列化算法標(biāo)識獲取對應(yīng)的 序列化對象Serializer serializer = getSerializer(serializeAlgorithm);// 通過指令標(biāo)識 獲取對應(yīng)的 數(shù)據(jù)包類Packet packet = getRequestPacket(command);// 執(zhí)行解碼if (serializer != null && packet != null) {return serializer.deSerialize(packet.getClass(), bs);} else {System.out.println("沒有找到對應(yīng)的序列化對象或數(shù)據(jù)包對象");return null;}}// 通過指令獲取對應(yīng)的 數(shù)據(jù)包類 private Packet getRequestPacket(byte command) throws Exception {return (Packet) packetTypeMap.get(command).newInstance();}// 通過序列化算法標(biāo)識 獲取對應(yīng)的序列化類private Serializer getSerializer(byte serializeAlgorithm) throws Exception {return (Serializer) serializerMap.get(serializeAlgorithm).newInstance();}}完整代碼
package com.tj.NIO_test_maven;import java.lang.reflect.Method;import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map;import com.alibaba.fastjson.JSONObject;import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import lombok.Data;/*** @author outman*/ public class Test_09_客戶端與服務(wù)端通信協(xié)議編解碼 {public static void main(String[] args) {}}/*** 數(shù)據(jù)包對象* * @author outman*/ @Data abstract class Packet {/*** 協(xié)議版本*/private Byte version = 1;/*** 獲取指令*/public abstract Byte getCommand();/*** 指令集合內(nèi)部接口*/interface Command {public static final Byte LOGIN_REQUEST = 1;} }/*** 登錄請求數(shù)據(jù)包* * @author outman*/ @Data class LoginRequestPacket extends Packet {private Integer uerId;private String userName;private String password;@Overridepublic Byte getCommand() {return Command.LOGIN_REQUEST;}}/*** 序列化接口* * @author outman*/ interface Serializer {/*** 默認的序列化對對象*/Serializer DEFAULT = new JSONSerializer();/*** 序列化算法*/byte getSerializerAlgorithm();/*** java 對象轉(zhuǎn)換成二進制 (序列化)*/byte[] serialize(Object obj);/*** 二進制轉(zhuǎn)換為java對象 (反序列化)*/<T> T deSerialize(Class<T> clazz, byte[] bytes);/*** 序列化算法標(biāo)識集合接口*/interface SerializerAlgorithm {public static final byte JSON = 1;} }/*** JSON 序列化實現(xiàn)類* * @author outman*/ class JSONSerializer implements Serializer {@Overridepublic byte getSerializerAlgorithm() {return SerializerAlgorithm.JSON;}@Overridepublic byte[] serialize(Object obj) {return JSONObject.toJSONBytes(obj);}@Overridepublic <T> T deSerialize(Class<T> clazz, byte[] bytes) {return JSONObject.parseObject(bytes, clazz);}}/*** 數(shù)據(jù)包編解碼類* * @author outman*/ class PacketCodec {// 魔數(shù)private static final int MAGIC_NUMBER = 0x12345678;// 指令 與 數(shù)據(jù)包 映射private final Map<Byte, Class<? super Packet>> packetTypeMap;// 序列化算法 與 序列化類 映射private final Map<Byte, Class<? super Serializer>> serializerMap;// 單例public static final PacketCodec INSTANCE = new PacketCodec();// 注冊Packet集合List<Class> packetList = Arrays.asList(new Class[] { LoginRequestPacket.class });// 注冊序列化算法集合List<Class> serializerList = Arrays.asList(new Class[] { JSONSerializer.class });/*** 初始化 指令 與 數(shù)據(jù)包 映射 序列化算法 與 序列化類 映射*/private PacketCodec() {// 初始化 指令 與 數(shù)據(jù)包 映射packetTypeMap = new HashMap<Byte, Class<? super Packet>>();packetList.forEach(clazz -> {try {Method method = clazz.getMethod("getCommand");Byte command = (Byte) method.invoke(clazz);packetTypeMap.put(command, clazz);} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();}});// 初始化序列化算法 與 序列化類 映射serializerMap = new HashMap<Byte, Class<? super Serializer>>();serializerList.forEach(clazz -> {try {Method method = clazz.getMethod("getSerializerAlgorithm");Byte serializerAlgorithm = (Byte) method.invoke(clazz);serializerMap.put(serializerAlgorithm, clazz);} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();}});}// 編碼public ByteBuf enCode(Packet packet) {// 1. 創(chuàng)建ByteBuf 對象ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer();// 2. 序列化java對象byte[] bs = Serializer.DEFAULT.serialize(packet);// 3. 實際編碼過程byteBuf.writeInt(MAGIC_NUMBER); // 寫入魔數(shù)byteBuf.writeByte(packet.getVersion()); // 寫入?yún)f(xié)議版本號byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlgorithm()); // 寫入序列化算法byteBuf.writeByte(packet.getCommand()); // 寫入指令byteBuf.writeInt(bs.length); // 寫入數(shù)據(jù)長度byteBuf.writeBytes(bs); // 寫入數(shù)據(jù)return byteBuf;}// 解碼public Packet deCode(ByteBuf byteBuf) throws Exception {// 跳過 魔數(shù) 校驗byteBuf.skipBytes(4);// 跳過版本號byteBuf.skipBytes(1);// 序列化算法標(biāo)識byte serializeAlgorithm = byteBuf.readByte();// 指令標(biāo)識byte command = byteBuf.readByte();// 數(shù)據(jù)包長度int length = byteBuf.readInt();// 數(shù)據(jù)byte[] bs = new byte[length];byteBuf.readBytes(bs);// 通過序列化算法標(biāo)識獲取對應(yīng)的 序列化對象Serializer serializer = getSerializer(serializeAlgorithm);// 通過指令標(biāo)識 獲取對應(yīng)的 數(shù)據(jù)包類Packet packet = getRequestPacket(command);// 執(zhí)行解碼if (serializer != null && packet != null) {return serializer.deSerialize(packet.getClass(), bs);} else {System.out.println("沒有找到對應(yīng)的序列化對象或數(shù)據(jù)包對象");return null;}}// 通過指令獲取對應(yīng)的 數(shù)據(jù)包類 private Packet getRequestPacket(byte command) throws Exception {return (Packet) packetTypeMap.get(command).newInstance();}// 通過序列化算法標(biāo)識 獲取對應(yīng)的序列化類private Serializer getSerializer(byte serializeAlgorithm) throws Exception {return (Serializer) serializerMap.get(serializeAlgorithm).newInstance();}}總結(jié)
以上是生活随笔為你收集整理的Netty实战 IM即时通讯系统(八)服务端和客户端通信协议编解码的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Netty实战 IM即时通讯系统(七)数
- 下一篇: Flume架构及应用