日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java nio socket长连接_netty学习实战—实现websocket长连接和socket之间进程通信

發布時間:2024/9/19 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java nio socket长连接_netty学习实战—实现websocket长连接和socket之间进程通信 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

netty學習—實現websocket長連接和socket之間通信

最近正在學習netty,跟著教程寫了一個基于WebSocket的網頁聊天室,對netty有了一定的了解,現在正好項目使用到長連接,選用了netty。

項目目標:客戶端A(網頁)和服務端通過WebSocket進行通信,客戶端B和服務端通過Socket通信,把客戶端B的數據傳輸到客戶端A,橋梁為服務端

Socket服務端監聽8090端口,長連接服務端監聽8089端口,客戶端A連接到8089端口,客戶端B連接到8090端口

由于是需要對兩個端口數據進行不同處理,所以我們創建兩個ServerBootstrap,分別綁定兩個端口,一個ServerGzhBootstrap處理客戶端B和服務端的socket通信;ServerWxQBootstrap處理客戶端A和服務端之間的WebSocket長連接通信

ServerInitializer,實現ChannelInitializer,負責初始化客戶端B和服務端通信的處理器Handler

WebSocketChannelInitializer,實現ChannelInitializer,負責初始化客戶端A和服務端長連接通信的處理器Handler

ServerInitializer添加一個自定義SimpleChannelInboundHandler負責處理客戶端B和服務端socket通信

WebSocketChannelInitializer添加一個自定義SimpleChannelInboundHandler負責處理客戶端A和服務端WebSocket長連接通信

網頁聊天室作為客戶端A,客戶端B通過Socket通信并接收控制臺的輸入作為通信數據傳遞給服務端,服務端再傳遞給客戶端A

問題:

netty中SimpleChannelInboundHandler類的泛型中指定了傳入的消息的類型,只能接收這種類型的消息,客戶端B發送的String類型消息與客戶端A接收的TextWebSocketFrame類型不同,客戶端A無法接收。

解決方法:

我們把客戶端B發送的String類型消息在Socket服務端接收到,要將其發送給客戶端A(需要將其封裝成TextWebSocketFrame類型才能發送給客戶端A),而且我們就必須要有客戶端A的channel,我們才可以調用writeAndFlush方法把數據寫入客戶端A

使用什么可以得到客戶端A的channel呢?

那就是ChannelGroup,我們定義一個類保存全部Channel客戶端作為全局ChannelGroup,每次有客戶端Channel創建(handlerAdded方法),我們就把它保存到該全局ChannelGroup中,每次channel使用完畢,ChannelGroup會為我們自動刪除其中無用的channel,這樣我們就可以獲取所有的客戶端channel

任何獲取到客戶端A的channel?

客戶端A和客戶端B很大一個區別就是端口號,我們可以通過端口號來判斷是客戶端A還是客戶端B

全局ChannelGroup

public class GlobalChannelGroup {

public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

}

服務端

服務端啟動器

public class Server {

public static void main(String[] args) throws InterruptedException {

//兩個事件循環組 boss獲取連接發送 worker接收處理

EventLoopGroup boss = new NioEventLoopGroup();

EventLoopGroup worker = new NioEventLoopGroup();

try {

//server啟動器

ServerBootstrap serverWxQBootstrap = new ServerBootstrap();

ServerBootstrap serverGzhBootstrap = new ServerBootstrap();

// 定義組

// channel(反射)

// 定義處理器(自定義):連接上channel后執行init

System.out.println("啟動server");

serverGzhBootstrap.group(boss, worker).channel(NioServerSocketChannel.class)

.childHandler(new ServerInitializer());

serverWxQBootstrap.group(boss, worker).channel(NioServerSocketChannel.class)

.childHandler(new WebSocketChannelInitializer());

//綁定端口,同步

ChannelFuture wxq = serverGzhBootstrap.bind(8090).sync();

ChannelFuture gzh = serverWxQBootstrap.bind(8089).sync();

gzh.channel().closeFuture().sync();

wxq.channel().closeFuture().sync();

} finally {

boss.shutdownGracefully();

worker.shutdownGracefully();

}

}

}

服務端Socket端口初始化器

public class ServerInitializer extends ChannelInitializer {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));

pipeline.addLast(new LengthFieldPrepender(4));

pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));//用于解碼

pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));//用于編碼

pipeline.addLast(new ServerHandler());//自定義處理器

}

}

服務端Socket端口通信處理器

public class ServerHandler extends SimpleChannelInboundHandler {

@Override

protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {

System.out.println(ctx.channel().remoteAddress()+","+msg);

ctx.channel().writeAndFlush("消息已經進入數據庫,正在趕往微信墻!");

GlobalChannelGroup.channelGroup.forEach(o->{

//如果端口以8089結尾,說明這個channel是客戶端A

if (o.localAddress().toString().endsWith("8089")){

TextWebSocketFrame text = new TextWebSocketFrame(o.remoteAddress() + "發送消息:" + msg + "\n");

o.writeAndFlush(text);

}

});

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

cause.printStackTrace();

ctx.close();

}

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

System.out.println(ctx.channel().remoteAddress()+":連接到微信墻模式成功!");

int size = GlobalChannelGroup.channelGroup.size();

System.out.println("當前微信墻連接數:"+(size==0?0:size-1));

}

@Override

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {

Channel channel = ctx.channel();

GlobalChannelGroup.channelGroup.add(channel);

}

}

服務端長連接通信端口初始化器

public class WebSocketChannelInitializer extends ChannelInitializer {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast(new HttpServerCodec());

pipeline.addLast(new ChunkedWriteHandler());

//用于將http數據聚合到一起發送一個請求 fullHttpRequest

pipeline.addLast(new HttpObjectAggregator(8192));

pipeline.addLast(new WebSocketServerProtocolHandler("/"));//傳入websocket path

pipeline.addLast(new TextWebSocketHandler());//傳入websocket path

}

}

服務端長連接通信處理器

public class TextWebSocketHandler extends SimpleChannelInboundHandler {

@Override

protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {

int size = GlobalChannelGroup.channelGroup.size();

System.out.println("當前微信墻連接數:"+(size==0?0:size-1));

System.out.println("收到消息:"+msg.text());

Channel channel = ctx.channel();

GlobalChannelGroup.channelGroup.forEach(o->{

if (o.localAddress().toString().endsWith("8090")){

o.writeAndFlush(msg.text());

}else {

TextWebSocketFrame text = new TextWebSocketFrame(o.remoteAddress() + "發送消息:" + msg.text() + "\n");

o.writeAndFlush(text);

}

});

}

@Override

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {

Channel ch = ctx.channel();

GlobalChannelGroup.channelGroup.add(ch);

}

@Override

public void channelInactive(ChannelHandlerContext ctx) throws Exception {

System.out.println(ctx.channel().remoteAddress()+":離開聊天室");

}

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

Channel ch = ctx.channel();

System.out.println(ch.remoteAddress()+":連接到聊天室");

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

System.out.println("異常");

ctx.close();

}

}

客戶端

客戶端把控制臺的標準輸入作為參數傳入,創建客戶端channel時將其發送

客戶端B啟動器

public class GzhClient {

public static void main(String[] args) {

EventLoopGroup eventExecutors = null;

ChannelFuture channelFuture = null;

try{

// while (true) {

eventExecutors = new NioEventLoopGroup();

Scanner scanner = new Scanner(System.in);

String json = scanner.nextLine();

Bootstrap bootstrap = new Bootstrap();

bootstrap.group(eventExecutors).channel(NioSocketChannel.class)

.handler(new GzhClientInitializer(json));

System.out.println("啟動客戶端");

channelFuture = bootstrap.connect("localhost", 8090).sync();

// }

// channelFuture.channel().closeFuture().sync();

} catch (InterruptedException e) {

e.printStackTrace();

} finally {

eventExecutors.shutdownGracefully();

}

}

}

客戶端B初始化器

public class GzhClientInitializer extends ChannelInitializer {

private String json;

public GzhClientInitializer(String json){

this.json = json;

}

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));

pipeline.addLast(new LengthFieldPrepender(4));

pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));

pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));

pipeline.addLast(new GzhClientHandler(json));

}

}

客戶端B與服務端Socket通信處理器

public class GzhClientHandler extends SimpleChannelInboundHandler {

private String json;

public GzhClientHandler(String json){

this.json = json;

}

@Override

protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {

System.out.println("client receive:"+msg);

}

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

System.out.println("進入微信墻模式,清發消息:");

ctx.writeAndFlush(json);

}

}

測試

啟動服務端,創建兩個客戶端B(也就是GzhClient)

打開聊天室,建立長連接

兩個客戶端B從控制臺發送消息

服務端接收到消息,打印出來并準備轉發給客戶端A(也就是網頁聊天室)

網頁聊天室接收到服務端發送的消息

總結

以上是生活随笔為你收集整理的java nio socket长连接_netty学习实战—实现websocket长连接和socket之间进程通信的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。