java nio socket长连接_netty学习实战—实现websocket长连接和socket之间进程通信
netty學習—實現websocket長連接和socket之間通信
最近正在學習netty,跟著教程寫了一個基于WebSocket的網頁聊天室,對netty有了一定的了解,現在正好項目使用到長連接,選用了netty。
項目目標:客戶端A(網頁)和服務端通過WebSocket進行通信,客戶端B和服務端通過Socket通信,把客戶端B的數據傳輸到客戶端A,橋梁為服務端
Socket服務端監聽8090端口,長連接服務端監聽8089端口,客戶端A連接到8089端口,客戶端B連接到8090端口
由于是需要對兩個端口數據進行不同處理,所以我們創建兩個ServerBootstrap,分別綁定兩個端口,一個ServerGzhBootstrap處理客戶端B和服務端的socket通信;ServerWxQBootstrap處理客戶端A和服務端之間的WebSocket長連接通信
ServerInitializer,實現ChannelInitializer,負責初始化客戶端B和服務端通信的處理器Handler
WebSocketChannelInitializer,實現ChannelInitializer,負責初始化客戶端A和服務端長連接通信的處理器Handler
ServerInitializer添加一個自定義SimpleChannelInboundHandler負責處理客戶端B和服務端socket通信
WebSocketChannelInitializer添加一個自定義SimpleChannelInboundHandler負責處理客戶端A和服務端WebSocket長連接通信
網頁聊天室作為客戶端A,客戶端B通過Socket通信并接收控制臺的輸入作為通信數據傳遞給服務端,服務端再傳遞給客戶端A
問題:
netty中SimpleChannelInboundHandler類的泛型中指定了傳入的消息的類型,只能接收這種類型的消息,客戶端B發送的String類型消息與客戶端A接收的TextWebSocketFrame類型不同,客戶端A無法接收。
解決方法:
我們把客戶端B發送的String類型消息在Socket服務端接收到,要將其發送給客戶端A(需要將其封裝成TextWebSocketFrame類型才能發送給客戶端A),而且我們就必須要有客戶端A的channel,我們才可以調用writeAndFlush方法把數據寫入客戶端A
使用什么可以得到客戶端A的channel呢?
那就是ChannelGroup,我們定義一個類保存全部Channel客戶端作為全局ChannelGroup,每次有客戶端Channel創建(handlerAdded方法),我們就把它保存到該全局ChannelGroup中,每次channel使用完畢,ChannelGroup會為我們自動刪除其中無用的channel,這樣我們就可以獲取所有的客戶端channel
任何獲取到客戶端A的channel?
客戶端A和客戶端B很大一個區別就是端口號,我們可以通過端口號來判斷是客戶端A還是客戶端B
全局ChannelGroup
public class GlobalChannelGroup {
public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
服務端
服務端啟動器
public class Server {
public static void main(String[] args) throws InterruptedException {
//兩個事件循環組 boss獲取連接發送 worker接收處理
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
//server啟動器
ServerBootstrap serverWxQBootstrap = new ServerBootstrap();
ServerBootstrap serverGzhBootstrap = new ServerBootstrap();
// 定義組
// channel(反射)
// 定義處理器(自定義):連接上channel后執行init
System.out.println("啟動server");
serverGzhBootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
.childHandler(new ServerInitializer());
serverWxQBootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
.childHandler(new WebSocketChannelInitializer());
//綁定端口,同步
ChannelFuture wxq = serverGzhBootstrap.bind(8090).sync();
ChannelFuture gzh = serverWxQBootstrap.bind(8089).sync();
gzh.channel().closeFuture().sync();
wxq.channel().closeFuture().sync();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
服務端Socket端口初始化器
public class ServerInitializer extends ChannelInitializer {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
pipeline.addLast(new LengthFieldPrepender(4));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));//用于解碼
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));//用于編碼
pipeline.addLast(new ServerHandler());//自定義處理器
}
}
服務端Socket端口通信處理器
public class ServerHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(ctx.channel().remoteAddress()+","+msg);
ctx.channel().writeAndFlush("消息已經進入數據庫,正在趕往微信墻!");
GlobalChannelGroup.channelGroup.forEach(o->{
//如果端口以8089結尾,說明這個channel是客戶端A
if (o.localAddress().toString().endsWith("8089")){
TextWebSocketFrame text = new TextWebSocketFrame(o.remoteAddress() + "發送消息:" + msg + "\n");
o.writeAndFlush(text);
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress()+":連接到微信墻模式成功!");
int size = GlobalChannelGroup.channelGroup.size();
System.out.println("當前微信墻連接數:"+(size==0?0:size-1));
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
GlobalChannelGroup.channelGroup.add(channel);
}
}
服務端長連接通信端口初始化器
public class WebSocketChannelInitializer extends ChannelInitializer {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new ChunkedWriteHandler());
//用于將http數據聚合到一起發送一個請求 fullHttpRequest
pipeline.addLast(new HttpObjectAggregator(8192));
pipeline.addLast(new WebSocketServerProtocolHandler("/"));//傳入websocket path
pipeline.addLast(new TextWebSocketHandler());//傳入websocket path
}
}
服務端長連接通信處理器
public class TextWebSocketHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
int size = GlobalChannelGroup.channelGroup.size();
System.out.println("當前微信墻連接數:"+(size==0?0:size-1));
System.out.println("收到消息:"+msg.text());
Channel channel = ctx.channel();
GlobalChannelGroup.channelGroup.forEach(o->{
if (o.localAddress().toString().endsWith("8090")){
o.writeAndFlush(msg.text());
}else {
TextWebSocketFrame text = new TextWebSocketFrame(o.remoteAddress() + "發送消息:" + msg.text() + "\n");
o.writeAndFlush(text);
}
});
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel ch = ctx.channel();
GlobalChannelGroup.channelGroup.add(ch);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress()+":離開聊天室");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel ch = ctx.channel();
System.out.println(ch.remoteAddress()+":連接到聊天室");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("異常");
ctx.close();
}
}
客戶端
客戶端把控制臺的標準輸入作為參數傳入,創建客戶端channel時將其發送
客戶端B啟動器
public class GzhClient {
public static void main(String[] args) {
EventLoopGroup eventExecutors = null;
ChannelFuture channelFuture = null;
try{
// while (true) {
eventExecutors = new NioEventLoopGroup();
Scanner scanner = new Scanner(System.in);
String json = scanner.nextLine();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventExecutors).channel(NioSocketChannel.class)
.handler(new GzhClientInitializer(json));
System.out.println("啟動客戶端");
channelFuture = bootstrap.connect("localhost", 8090).sync();
// }
// channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
eventExecutors.shutdownGracefully();
}
}
}
客戶端B初始化器
public class GzhClientInitializer extends ChannelInitializer {
private String json;
public GzhClientInitializer(String json){
this.json = json;
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
pipeline.addLast(new LengthFieldPrepender(4));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new GzhClientHandler(json));
}
}
客戶端B與服務端Socket通信處理器
public class GzhClientHandler extends SimpleChannelInboundHandler {
private String json;
public GzhClientHandler(String json){
this.json = json;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("client receive:"+msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("進入微信墻模式,清發消息:");
ctx.writeAndFlush(json);
}
}
測試
啟動服務端,創建兩個客戶端B(也就是GzhClient)
打開聊天室,建立長連接
兩個客戶端B從控制臺發送消息
服務端接收到消息,打印出來并準備轉發給客戶端A(也就是網頁聊天室)
網頁聊天室接收到服務端發送的消息
總結
以上是生活随笔為你收集整理的java nio socket长连接_netty学习实战—实现websocket长连接和socket之间进程通信的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 土豆炖牛肉怎么做(土豆炖牛肉这做法简单好
- 下一篇: postgres 退出_如何退出post