日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

实现分布式服务注册及简易的netty聊天

發布時間:2023/11/27 生活经验 45 豆豆
生活随笔 收集整理的這篇文章主要介紹了 实现分布式服务注册及简易的netty聊天 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

  現在很多地方都會用到zookeeper, 用到它的地方就是為了實現分布式。用到的場景就是服務注冊,比如一個集群服務器,需要知道哪些服務器在線,哪些服務器不在線。

  ZK有一個功能,就是創建臨時節點,當機器啟動應用的時候就會連接到一個ZK節點,然后創建一個臨時節點,那么通過獲取監聽該路徑,并且獲取該路徑下的節點數量就知道有哪些服務器在線了。當機器停止應用的時候,zk的臨時節點將會自動被刪除。我們通過這個機制去實現。

  這次主要實現是采用springboot, zkui, swagger實現。接下來來看一下主要的代碼實現:

  在機器啟動的時候獲取本機的IP,然后將本機的IP和指定的端口號注冊到程序中:

package com.hqs.zk.register;import com.hqs.zk.register.config.AppConfig;
import com.hqs.zk.register.thread.ZKRegister;
import com.hqs.zk.register.util.ZKUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;import java.net.InetAddress;@SpringBootApplication
public class ZKApplication implements CommandLineRunner{@AutowiredAppConfig appConfig;@AutowiredZKUtil zkUtil;public static void main(String[] args) {SpringApplication.run(ZKApplication.class, args);System.out.println("啟動應用成功");}@Overridepublic void run(String... strings) throws Exception {//獲得本機IPString addr = InetAddress.getLocalHost().getHostAddress();Thread thread = new Thread(new ZKRegister(appConfig, zkUtil, addr));thread.setName("register-thread");thread.start();Thread scanner = new Thread(new Scanner());scanner.start();}
}

  創建一個工具類,工具類主要實現創建父節點,創建臨時路徑,監聽事件,獲取所有注冊節點。

    /*** 創建臨時目錄*/public void createEphemeralNode(String path, String value) {zkClient.createEphemeral(path, value);}/*** 監聽事件*/public void subscribeEvent(String path) {zkClient.subscribeChildChanges(path, new IZkChildListener() {@Overridepublic void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {System.out.println("parentPath:" + parentPath + ":list:" + currentChilds.toString());}});}

  這塊就基本完成了,下面開始創建controller,目的是為了獲取所有在線機器的節點。為了方便測試和查看我使用了Swagger2, 這樣界面話的發請求工具特別好用。

  接下來看controller的主要內容:

    /*** 獲取所有路由節點* @return*/@ApiOperation("獲取所有路由節點")@RequestMapping(value = "getAllRoute",method = RequestMethod.POST)@ResponseBody()public List<String> getAllRoute(){List<String> allNode = zkUtil.getAllNode();List<String> result = new ArrayList<>();for (String node : allNode) {String key = node.split("-")[1];result.add(key);}return result ;}

  同時配置對應的Swagger2

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;/*** Created by huangqingshi on 2019/1/8.*/
@Configuration
@EnableSwagger2
public class SwaggerConfig {@Value("${swagger.switch}")private boolean swaggerSwitch;@Beanpublic Docket api() {Docket docket = new Docket(DocumentationType.SWAGGER_2);docket.enable(swaggerSwitch);docket.apiInfo(apiInfo()).select().apis(RequestHandlerSelectors.basePackage("com.hqs.zk.register.controller")).paths(PathSelectors.any()).build();return docket;}private ApiInfo apiInfo() {return new ApiInfoBuilder().title("Spring boot zk register").description("測試").contact(new Contact("黃青石","http://www.cnblogs.com/huangqingshi","68344150@qq.com")).termsOfServiceUrl("http://www.cnblogs.com/huangqingshi").version("1.0").build();}
}

  好了,接下來該啟動工程了,啟動之后訪問:?http://localhost:8080/swagger-ui.html

  

  點擊下面的zk-controller,對應controller的方法就會顯示出來,然后點try it out, execute 相應的結果就直接出來了, 通過下面的圖片,可以發現我本機的IP已經注冊到里邊了。  

  接下來,咱們使用ZKUI連接上zookeeper,看一下是否真的有注冊的機器(父節點用的monior),已經存在了,沒有問題:

  注冊這塊就算實現完了,我一直想實現一個簡易的聊天,參考了各種資料然后實現了一把,也算圓夢了。下面開始實現簡易netty版聊天(為什么選擇netty?因為這個工具非常棒),使用google的protobuf進行序列化和反序列化:

  首先從官網上下載protobuf工具,注意對應不同的操作系統,我的是WINDOWS的,直接下載一個EXE程序,你下載的哪個版本,需要使用與該版本對應的版本號,否則會出錯誤。

  

  自己創建好對應的Request.proto和Response.proto,在里邊指定好對應的字段和包名信息。分別執行命令即可生成對應的文件:protoc.exe ./Response.proto --java_out=./? 這個是生成Response的,還需要指定一條生成Request。

  將文件夾放到工程里邊,工程的大致接入如下:

  Server的主要實現,主要基于protoBuf固定長度的進行實現的(序列化和反序列化一般通過固定長度或者分隔符實現),這樣的話就不會造成粘包、拆包的問題。

public void bind(int port) throws Exception {//配置服務器端NIO線程組EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workGroup = new NioEventLoopGroup();try {ServerBootstrap b  = new ServerBootstrap();b.group(bossGroup, workGroup).channel(NioServerSocketChannel.class).childOption(ChannelOption.SO_KEEPALIVE, true).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ProtobufVarint32FrameDecoder());socketChannel.pipeline().addLast(new ProtobufDecoder(RequestProto.ReqProtocol.getDefaultInstance())).addLast(new ProtobufVarint32LengthFieldPrepender()).addLast(new ProtobufEncoder());socketChannel.pipeline().addLast(new ProBufServerHandler());}});//綁定端口,同步等待ChannelFuture f = b.bind(port).sync();if (f.isSuccess()) {System.out.println("啟動 server 成功");}} catch (Exception  e) {e.printStackTrace();}}

  客戶端主要兩個方式,一個方式是客戶端向服務端發請求,一個方式是群組發消息,我為了快速實現,就直接發一條請求,并且將結果輸出到日志中了。客戶端使用一個線程執行兩個不同的方法,然后將一個是發送給Server, 一個是發送給Group。發送給Server比較簡單就直接給Server了。

    @PostConstructpublic void start() throws Exception{connection(appConfig.getNettyServer(), appConfig.getNettyPort());for(int i = 1; i <= 1; i++) {int j = i;Runnable runnable = () -> {try {sendMesgToServer(j);sendMesgToGroup(j);} catch (Exception e) {e.printStackTrace();}};new Thread(runnable).start();}}

  發送給Group的話需要記住每次過來的唯一requestId,并且保存對應的channel,然后發送消息的時候遍歷所有requestId,并且與之對應的發送消息:

    @Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, RequestProto.ReqProtocol reqProtocol) throws Exception {RequestProto.ReqProtocol req = reqProtocol;CHANNEL_MAP.putIfAbsent(req.getRequestId(), (NioSocketChannel)channelHandlerContext.channel());
//        System.out.println("get Msg from Client:" + req.getReqMsg());
        handleReq(req);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println(cause.getMessage());ctx.close();}public void handleReq(RequestProto.ReqProtocol req) {Long originRequestId = req.getRequestId();if(req.getType() == Constants.CommandType.SERVER) {NioSocketChannel nioSocketChannel = CHANNEL_MAP.get(req.getRequestId());sendMsg(nioSocketChannel, originRequestId, originRequestId, Constants.CommandType.SERVER, "hello client");} else if(req.getType() == Constants.CommandType.GROUP) {for(Map.Entry<Long, NioSocketChannel> entry : CHANNEL_MAP.entrySet()) {//過濾自己收消息if(entry.getKey() == originRequestId) {continue;}sendMsg(entry.getValue(), originRequestId, entry.getKey(), Constants.CommandType.GROUP, req.getReqMsg());}}}

   輸出的結果如下,自定義兩個客戶端,一個requestId是1L,另一個requestId是2L,然后都在啟動的時候sleep 3秒,然后發送給server。sleep5秒發送到Group里邊去,輸出的結果就是如下這個樣子的。

1L : send message to server successful!
2L : send message to server successful!
get Msg from Server: 2:hello client
received id:2- send to id:2
received id:1- send to id:1
get Msg from Server: 1:hello clientreceived id:1- send to id:2
get Msg from Group: 1:hello peoole in group
received id:2- send to id:1get Msg from Group: 2:hello peoole in group

?  具體的代碼可參考:https://github.com/stonehqs/ZKRegister

 ? 如果問題,歡迎留言討論。

?

轉載于:https://www.cnblogs.com/huangqingshi/p/10259913.html

總結

以上是生活随笔為你收集整理的实现分布式服务注册及简易的netty聊天的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。