日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Netty实现自定义协议

發(fā)布時間:2025/3/21 编程问答 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Netty实现自定义协议 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

關(guān)于協(xié)議,使用最為廣泛的是HTTP協(xié)議,但是在一些服務(wù)交互領(lǐng)域,其使用則相對較少,主要原因有三方面:

  • HTTP協(xié)議會攜帶諸如header和cookie等信息,其本身對字節(jié)的利用率也較低,這使得HTTP協(xié)議比較臃腫,在承載相同信息的情況下,HTTP協(xié)議將需要發(fā)送更多的數(shù)據(jù)包;

  • HTTP協(xié)議是基于TCP的短連接,其在每次請求和響應(yīng)的時候都需要進行三次握手和四次揮手,由于服務(wù)的交互設(shè)計一般都要求能夠承載高并發(fā)的請求,因而HTTP協(xié)議這種頻繁的握手和揮手動作會極大的影響服務(wù)之間交互的效率;

  • 服務(wù)之間往往有一些根據(jù)其自身業(yè)務(wù)特性所獨有的需求,而HTTP協(xié)議無法很好的服務(wù)于這些業(yè)務(wù)需求。

基于上面的原因,一般的服務(wù)之間進行交互時都會使用自定義協(xié)議,常見的框架,諸如dubbo,kafka,zookeeper都實現(xiàn)了符合其自身業(yè)務(wù)需求的協(xié)議,本文主要講解如何使用Netty實現(xiàn)一款自定義的協(xié)議。

1. 協(xié)議規(guī)定

所謂協(xié)議,其本質(zhì)其實就是定義了一個將數(shù)據(jù)轉(zhuǎn)換為字節(jié),或者將字節(jié)轉(zhuǎn)換為數(shù)據(jù)的一個規(guī)范。一款自定義協(xié)議,其一般包含兩個部分:消息頭和消息體。消息頭的長度一般是固定的,或者說是可確定的,其定義了此次消息的一些公有信息,比如當(dāng)前服務(wù)的版本,消息的sessionId,消息的類型等等;消息體則主要是此次消息所需要發(fā)送的內(nèi)容,一般在消息頭的最后一定的字節(jié)中保存了當(dāng)前消息的消息體的長度。下面是我們?yōu)楫?dāng)前自定義協(xié)議所做的一些規(guī)定:

上述協(xié)議定義中,我們除了定義常用的請求和響應(yīng)消息類型以外,還定義了Ping和Pong消息。Ping和Pong消息的作用一般是,在服務(wù)處于閑置狀態(tài)達到一定時長,比如2s時,客戶端服務(wù)會向服務(wù)端發(fā)送一個Ping消息,則會返回一個Pong消息,這樣才表示客戶端與服務(wù)端的連接是完好的。如果服務(wù)端沒有返回相應(yīng)的消息,客戶端就會關(guān)閉與服務(wù)端的連接或者是重新建立與服務(wù)端的連接。這樣的優(yōu)點在于可以防止突然會產(chǎn)生的客戶端與服務(wù)端的大量交互。

2. 協(xié)議實現(xiàn)

通過上面的定義其實我們可以發(fā)現(xiàn),所謂協(xié)議,就是定義了一個規(guī)范,基于這個規(guī)范,我們可以將消息轉(zhuǎn)換為相應(yīng)的字節(jié)流,然后經(jīng)由TCP傳輸?shù)侥繕朔?wù),目標服務(wù)則也基于該規(guī)范將字節(jié)流轉(zhuǎn)換為相應(yīng)的消息,這樣就達到了相互交流的目的。這里面最重要的主要是如何基于該規(guī)范將消息轉(zhuǎn)換為字節(jié)流或者將字節(jié)流轉(zhuǎn)換為消息。這一方面,Netty為我們提供了ByteToMessageDecoder和MessageToByteEncoder用于進行消息和字節(jié)流的相互轉(zhuǎn)換。首先我們定義了如下消息實體:

public?class?Message?{private?int?magicNumber;private?byte?mainVersion;private?byte?subVersion;private?byte?modifyVersion;private?String?sessionId;private?MessageTypeEnum?messageType;private?Map<String,?String>?attachments?=?new?HashMap<>();private?String?body;public?Map<String,?String>?getAttachments()?{return?Collections.unmodifiableMap(attachments);}public?void?setAttachments(Map<String,?String>?attachments)?{this.attachments.clear();if?(null?!=?attachments)?{this.attachments.putAll(attachments);}}public?void?addAttachment(String?key,?String?value)?{attachments.put(key,?value);}//?getter?and?setter... }

上述消息中,我們將協(xié)議中所規(guī)定的各個字段都進行了定義,并且定義了一個標志消息類型的枚舉MessageTypeEnum,如下是該枚舉的源碼:

public?enum?MessageTypeEnum?{REQUEST((byte)1),?RESPONSE((byte)2),?PING((byte)3),?PONG((byte)4),?EMPTY((byte)5);private?byte?type;MessageTypeEnum(byte?type)?{this.type?=?type;}public?int?getType()?{return?type;}public?static?MessageTypeEnum?get(byte?type)?{for?(MessageTypeEnum?value?:?values())?{if?(value.type?==?type)?{return?value;}}throw?new?RuntimeException("unsupported?type:?"?+?type);} }

上述主要是定義了描述自定義協(xié)議相關(guān)的實體屬性,對于消息的編碼,本質(zhì)就是依據(jù)上述協(xié)議方式將消息實體轉(zhuǎn)換為字節(jié)流,如下是轉(zhuǎn)換字節(jié)流的代碼:

public?class?MessageEncoder?extends?MessageToByteEncoder<Message>?{@Overrideprotected?void?encode(ChannelHandlerContext?ctx,?Message?message,?ByteBuf?out)?{//?這里會判斷消息類型是不是EMPTY類型,如果是EMPTY類型,則表示當(dāng)前消息不需要寫入到管道中if?(message.getMessageType()?!=?MessageTypeEnum.EMPTY)?{out.writeInt(Constants.MAGIC_NUMBER);?//?寫入當(dāng)前的魔數(shù)out.writeByte(Constants.MAIN_VERSION);?//?寫入當(dāng)前的主版本號out.writeByte(Constants.SUB_VERSION);?//?寫入當(dāng)前的次版本號out.writeByte(Constants.MODIFY_VERSION);?//?寫入當(dāng)前的修訂版本號if?(!StringUtils.hasText(message.getSessionId()))?{//?生成一個sessionId,并將其寫入到字節(jié)序列中String?sessionId?=?SessionIdGenerator.generate();message.setSessionId(sessionId);out.writeCharSequence(sessionId,?Charset.defaultCharset());}out.writeByte(message.getMessageType().getType());?//?寫入當(dāng)前消息的類型out.writeShort(message.getAttachments().size());?//?寫入當(dāng)前消息的附加參數(shù)數(shù)量message.getAttachments().forEach((key,?value)?->?{Charset?charset?=?Charset.defaultCharset();out.writeInt(key.length());?//?寫入鍵的長度out.writeCharSequence(key,?charset);?//?寫入鍵數(shù)據(jù)out.writeInt(value.length());?//?希爾值的長度out.writeCharSequence(value,?charset);?//?寫入值數(shù)據(jù)});if?(null?==?message.getBody())?{out.writeInt(0);?//?如果消息體為空,則寫入0,表示消息體長度為0}?else?{out.writeInt(message.getBody().length());out.writeCharSequence(message.getBody(),?Charset.defaultCharset());}}} }

對于消息的解碼,其過程與上面的消息編碼方式基本一致,主要是基于協(xié)議所規(guī)定的將字節(jié)流數(shù)據(jù)轉(zhuǎn)換為消息實體數(shù)據(jù)。如下是其轉(zhuǎn)換過程:

public?class?MessageDecoder?extends?ByteToMessageDecoder?{@Overrideprotected?void?decode(ChannelHandlerContext?ctx,?ByteBuf?byteBuf,?List<Object>?out)?throws?Exception?{Message?message?=?new?Message();message.setMagicNumber(byteBuf.readInt());??//?讀取魔數(shù)message.setMainVersion(byteBuf.readByte());?//?讀取主版本號message.setSubVersion(byteBuf.readByte());?//?讀取次版本號message.setModifyVersion(byteBuf.readByte());?//?讀取修訂版本號CharSequence?sessionId?=?byteBuf.readCharSequence(Constants.SESSION_ID_LENGTH,?Charset.defaultCharset());?//?讀取sessionIdmessage.setSessionId((String)sessionId);message.setMessageType(MessageTypeEnum.get(byteBuf.readByte()));?//?讀取當(dāng)前的消息類型short?attachmentSize?=?byteBuf.readShort();?//?讀取附件長度for?(short?i?=?0;?i?<?attachmentSize;?i++)?{int?keyLength?=?byteBuf.readInt();?//?讀取鍵長度和數(shù)據(jù)CharSequence?key?=?byteBuf.readCharSequence(keyLength,?Charset.defaultCharset());int?valueLength?=?byteBuf.readInt();?//?讀取值長度和數(shù)據(jù)CharSequence?value?=?byteBuf.readCharSequence(valueLength,?Charset.defaultCharset());message.addAttachment(key.toString(),?value.toString());}int?bodyLength?=?byteBuf.readInt();?//?讀取消息體長度和數(shù)據(jù)CharSequence?body?=?byteBuf.readCharSequence(bodyLength,?Charset.defaultCharset());message.setBody(body.toString());out.add(message);} }

如此,我們自定義消息與字節(jié)流的相互轉(zhuǎn)換工作已經(jīng)完成。對于消息的處理,主要是要根據(jù)消息的不同類型,對消息進行相應(yīng)的處理,比如對于request類型消息,要寫入響應(yīng)數(shù)據(jù),對于ping消息,要寫入pong消息作為回應(yīng)。下面我們通過定義Netty handler的方式實現(xiàn)對消息的處理:

//?服務(wù)端消息處理器 public?class?ServerMessageHandler?extends?SimpleChannelInboundHandler<Message>?{//?獲取一個消息處理器工廠類實例private?MessageResolverFactory?resolverFactory?=?MessageResolverFactory.getInstance();@Overrideprotected?void?channelRead0(ChannelHandlerContext?ctx,?Message?message)?throws?Exception?{Resolver?resolver?=?resolverFactory.getMessageResolver(message);?//?獲取消息處理器Message?result?=?resolver.resolve(message);?//?對消息進行處理并獲取響應(yīng)數(shù)據(jù)ctx.writeAndFlush(result);?//?將響應(yīng)數(shù)據(jù)寫入到處理器中}@Overridepublic?void?channelRegistered(ChannelHandlerContext?ctx)?throws?Exception?{resolverFactory.registerResolver(new?RequestMessageResolver());?//?注冊request消息處理器resolverFactory.registerResolver(new?ResponseMessageResolver());//?注冊response消息處理器resolverFactory.registerResolver(new?PingMessageResolver());?//?注冊ping消息處理器resolverFactory.registerResolver(new?PongMessageResolver());?//?注冊pong消息處理器} } //?客戶端消息處理器 public?class?ClientMessageHandler?extends?ServerMessageHandler?{//?創(chuàng)建一個線程,模擬用戶發(fā)送消息private?ExecutorService?executor?=?Executors.newSingleThreadExecutor();@Overridepublic?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{//?對于客戶端,在建立連接之后,在一個獨立線程中模擬用戶發(fā)送數(shù)據(jù)給服務(wù)端executor.execute(new?MessageSender(ctx));}/***?這里userEventTriggered()主要是在一些用戶事件觸發(fā)時被調(diào)用,這里我們定義的事件是進行心跳檢測的*?ping和pong消息,當(dāng)前觸發(fā)器會在指定的觸發(fā)器指定的時間返回內(nèi)如果客戶端沒有被讀取消息或者沒有寫入*?消息到管道,則會觸發(fā)當(dāng)前方法*/@Overridepublic?void?userEventTriggered(ChannelHandlerContext?ctx,?Object?evt)?throws?Exception?{if?(evt?instanceof?IdleStateEvent)?{IdleStateEvent?event?=?(IdleStateEvent)?evt;if?(event.state()?==?IdleState.READER_IDLE)?{//?一定時間內(nèi),當(dāng)前服務(wù)沒有發(fā)生讀取事件,也即沒有消息發(fā)送到當(dāng)前服務(wù)來時,//?其會發(fā)送一個Ping消息到服務(wù)器,以等待其響應(yīng)Pong消息Message?message?=?new?Message();message.setMessageType(MessageTypeEnum.PING);ctx.writeAndFlush(message);}?else?if?(event.state()?==?IdleState.WRITER_IDLE)?{//?如果當(dāng)前服務(wù)在指定時間內(nèi)沒有寫入消息到管道,則關(guān)閉當(dāng)前管道ctx.close();}}} ??private?static?final?class?MessageSender?implements?Runnable?{private?static?final?AtomicLong?counter?=?new?AtomicLong(1);private?volatile?ChannelHandlerContext?ctx;public?MessageSender(ChannelHandlerContext?ctx)?{this.ctx?=?ctx;}@Overridepublic?void?run()?{try?{while?(true)?{//?模擬隨機發(fā)送消息的過程TimeUnit.SECONDS.sleep(new?Random().nextInt(3));Message?message?=?new?Message();message.setMessageType(MessageTypeEnum.REQUEST);message.setBody("this?is?my?"?+?counter.getAndIncrement()?+?"?message.");message.addAttachment("name",?"xufeng");ctx.writeAndFlush(message);}}?catch?(InterruptedException?e)?{e.printStackTrace();}}} }

上述代碼中,由于客戶端和服務(wù)端需要處理的消息類型是完全一樣的,因而客戶端處理類繼承了服務(wù)端處理類。但是對于客戶端而言,其還需要定時向服務(wù)端發(fā)送心跳消息,用于檢測客戶端與服務(wù)器的連接是否健在,因而客戶端還會實現(xiàn)userEventTriggered()方法,在該方法中定時向服務(wù)器發(fā)送心跳消息。userEventTriggered()方法主要是在客戶端被閑置一定時間后,其會根據(jù)其讀取或者寫入消息的限制時長來選擇性的觸發(fā)讀取或?qū)懭胧录?/p>

上述實現(xiàn)中,我們看到,對于具體類型消息的處理,我們是通過一個工廠類來獲取對應(yīng)的消息處理器,然后處理相應(yīng)的消息,下面我們該工廠類的代碼:

public?final?class?MessageResolverFactory?{//?創(chuàng)建一個工廠類實例private?static?final?MessageResolverFactory?resolverFactory?=?new?MessageResolverFactory();private?static?final?List<Resolver>?resolvers?=?new?CopyOnWriteArrayList<>();private?MessageResolverFactory()?{}//?使用單例模式實例化當(dāng)前工廠類實例public?static?MessageResolverFactory?getInstance()?{return?resolverFactory;}public?void?registerResolver(Resolver?resolver)?{resolvers.add(resolver);}//?根據(jù)解碼后的消息,在工廠類處理器中查找可以處理當(dāng)前消息的處理器public?Resolver?getMessageResolver(Message?message)?{for?(Resolver?resolver?:?resolvers)?{if?(resolver.support(message))?{return?resolver;}}throw?new?RuntimeException("cannot?find?resolver,?message?type:?"?+?message.getMessageType());}}

上述工廠類比較簡單,主要就是通過單例模式獲取一個工廠類實例,然后提供一個根據(jù)具體消息來查找其對應(yīng)的處理器的方法。下面我們來看看各個消息處理器的代碼:

//?request類型的消息 public?class?RequestMessageResolver?implements?Resolver?{private?static?final?AtomicInteger?counter?=?new?AtomicInteger(1);@Overridepublic?boolean?support(Message?message)?{return?message.getMessageType()?==?MessageTypeEnum.REQUEST;}@Overridepublic?Message?resolve(Message?message)?{//?接收到request消息之后,對消息進行處理,這里主要是將其打印出來int?index?=?counter.getAndIncrement();System.out.println("[trx:?"?+?message.getSessionId()?+?"]"+?index?+?".?receive?request:?"?+?message.getBody());System.out.println("[trx:?"?+?message.getSessionId()?+?"]"+?index?+?".?attachments:?"?+?message.getAttachments());//?處理完成后,生成一個響應(yīng)消息返回Message?response?=?new?Message();response.setMessageType(MessageTypeEnum.RESPONSE);response.setBody("nice?to?meet?you?too!");response.addAttachment("name",?"xufeng");response.addAttachment("hometown",?"wuhan");return?response;} } //?響應(yīng)消息處理器 public?class?ResponseMessageResolver?implements?Resolver?{private?static?final?AtomicInteger?counter?=?new?AtomicInteger(1);@Overridepublic?boolean?support(Message?message)?{return?message.getMessageType()?==?MessageTypeEnum.RESPONSE;}@Overridepublic?Message?resolve(Message?message)?{//?接收到對方服務(wù)的響應(yīng)消息之后,對響應(yīng)消息進行處理,這里主要是將其打印出來int?index?=?counter.getAndIncrement();System.out.println("[trx:?"?+?message.getSessionId()?+?"]"+?index?+?".?receive?response:?"?+?message.getBody());System.out.println("[trx:?"?+?message.getSessionId()?+?"]"+?index?+?".?attachments:?"?+?message.getAttachments());//?響應(yīng)消息不需要向?qū)Ψ椒?wù)再發(fā)送響應(yīng),因而這里寫入一個空消息Message?empty?=?new?Message();empty.setMessageType(MessageTypeEnum.EMPTY);return?empty;} } //?ping消息處理器 public?class?PingMessageResolver?implements?Resolver?{@Overridepublic?boolean?support(Message?message)?{return?message.getMessageType()?==?MessageTypeEnum.PING;}@Overridepublic?Message?resolve(Message?message)?{//?接收到ping消息后,返回一個pong消息返回System.out.println("receive?ping?message:?"?+?System.currentTimeMillis());Message?pong?=?new?Message();pong.setMessageType(MessageTypeEnum.PONG);return?pong;} } //?pong消息處理器 public?class?PongMessageResolver?implements?Resolver?{@Overridepublic?boolean?support(Message?message)?{return?message.getMessageType()?==?MessageTypeEnum.PONG;}@Overridepublic?Message?resolve(Message?message)?{//?接收到pong消息后,不需要進行處理,直接返回一個空的messageSystem.out.println("receive?pong?message:?"?+?System.currentTimeMillis());Message?empty?=?new?Message();empty.setMessageType(MessageTypeEnum.EMPTY);return?empty;} }

如此,對于自定義協(xié)議的消息處理過程已經(jīng)完成,下面則是使用用Netty實現(xiàn)的客戶端與服務(wù)端代碼:

//?服務(wù)端 public?class?Server?{public?static?void?main(String[]?args)?{EventLoopGroup?bossGroup?=?new?NioEventLoopGroup();EventLoopGroup?workerGroup?=?new?NioEventLoopGroup();try?{ServerBootstrap?bootstrap?=?new?ServerBootstrap();bootstrap.group(bossGroup,?workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,?1024).handler(new?LoggingHandler(LogLevel.INFO)).childHandler(new?ChannelInitializer<SocketChannel>()?{@Overrideprotected?void?initChannel(SocketChannel?ch)?throws?Exception?{ChannelPipeline?pipeline?=?ch.pipeline();?//?添加用于處理粘包和拆包問題的處理器pipeline.addLast(new?LengthFieldBasedFrameDecoder(1024,?0,?4,?0,?4));pipeline.addLast(new?LengthFieldPrepender(4));//?添加自定義協(xié)議消息的編碼和解碼處理器pipeline.addLast(new?MessageEncoder());pipeline.addLast(new?MessageDecoder());//?添加具體的消息處理器pipeline.addLast(new?ServerMessageHandler());}});ChannelFuture?future?=?bootstrap.bind(8585).sync();future.channel().closeFuture().sync();}?catch?(InterruptedException?e)?{e.printStackTrace();}?finally?{bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}} } public?class?Client?{public?static?void?main(String[]?args)?{NioEventLoopGroup?group?=?new?NioEventLoopGroup();Bootstrap?bootstrap?=?new?Bootstrap();try?{bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,?Boolean.TRUE).handler(new?ChannelInitializer<SocketChannel>()?{@Overrideprotected?void?initChannel(SocketChannel?ch)?throws?Exception?{ChannelPipeline?pipeline?=?ch.pipeline();//?添加用于解決粘包和拆包問題的處理器pipeline.addLast(new?LengthFieldBasedFrameDecoder(1024,?0,?4,?0,?4));pipeline.addLast(new?LengthFieldPrepender(4));//?添加用于進行心跳檢測的處理器pipeline.addLast(new?IdleStateHandler(1,?2,?0));//?添加用于根據(jù)自定義協(xié)議將消息與字節(jié)流進行相互轉(zhuǎn)換的處理器pipeline.addLast(new?MessageEncoder());pipeline.addLast(new?MessageDecoder());//?添加客戶端消息處理器pipeline.addLast(new?ClientMessageHandler());}});ChannelFuture?future?=?bootstrap.connect("127.0.0.1",?8585).sync();future.channel().closeFuture().sync();}?catch?(InterruptedException?e)?{e.printStackTrace();}?finally?{group.shutdownGracefully();}} }

運行上述代碼之后,我們可以看到客戶端和服務(wù)器分別打印了如下數(shù)據(jù):

// 客戶端 receive pong message: 1555123429356 [trx: d05024d2]1. receive response: nice to meet you too! [trx: d05024d2]1. attachments: {hometown=wuhan, name=xufeng} [trx: 66ee1438]2. receive response: nice to meet you too! // 服務(wù)器 receive ping message: 1555123432279 [trx: f582444f]4. receive request: this is my 4 message. [trx: f582444f]4. attachments: {name=xufeng}

3. 小結(jié)

本文首先將自定義協(xié)議與HTTP協(xié)議進行了對比,闡述了自定義協(xié)議的一些優(yōu)點。然后定義了一份自定義協(xié)議,并且講解了協(xié)議中各個字節(jié)的含義。最后通過Netty對自定義協(xié)議進行了實現(xiàn),并且實現(xiàn)了基于自定義協(xié)議的心跳功能。

總結(jié)

以上是生活随笔為你收集整理的Netty实现自定义协议的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。