日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

netty服务器返回信息关闭,netty4 服务端同步客户端返回的结果

發布時間:2025/3/12 编程问答 11 豆豆
生活随笔 收集整理的這篇文章主要介紹了 netty服务器返回信息关闭,netty4 服务端同步客户端返回的结果 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

netty是一個異步通訊框架,在有的時候咱們想使用服務端向客戶端發送消息,服務端同步等待客戶端返回結果真進行下一步的業務邏輯操做。那要怎么作才能同步獲取客戶端返回的數據呢?這里我用到了JDK中的閉鎖等待?CountDownLatch,接下來看看代碼如何實現:java

服務端:git

package com.example.demo.server;

import com.example.demo.cache.ChannelMap;

import com.example.demo.model.Result;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.Channel;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.Lock;

import lombok.extern.slf4j.Slf4j;

/**

* @ClassName: NettyServer

* @Author: huangzf

* @Date: 2018/9/25 15:40

* @Description:

*/

@Slf4j

public class NettyServer {

private NettyServerChannelInitializer serverChannelInitializer = null;

private int port = 8000;

public void bind() throws Exception {

//配置服務端的NIO線程組

EventLoopGroup bossGroup = new NioEventLoopGroup();

EventLoopGroup workerGroup = new NioEventLoopGroup();

try {

serverChannelInitializer = new NettyServerChannelInitializer();

ServerBootstrap b = new ServerBootstrap();

b.group(bossGroup, workerGroup)

.channel(NioServerSocketChannel.class)

//保持長鏈接

.childOption(ChannelOption.SO_KEEPALIVE,true)

.option(ChannelOption.SO_BACKLOG, 1024)

.childHandler(serverChannelInitializer);

//綁定端口,同步等待成功

ChannelFuture f = b.bind(port).sync();

//等待服務器監聽端口關閉

f.channel().closeFuture().sync();

} finally {

//釋放線程池資源

bossGroup.shutdownGracefully();

workerGroup.shutdownGracefully();

}

}

public Result write(Object obj, String tenantId ,String uniId) throws Exception {

// 獲取鎖

Lock lock = ChannelMap.getChannelLock(tenantId);

try {

Channel channel = ChannelMap.getChannel(tenantId);

if(channel != null){

lock.lock();

if(channel.isOpen()){

// 設置同步

CountDownLatch latch = new CountDownLatch(1);

NettyServerHandler nettyServerHandler = (NettyServerHandler) channel.pipeline().get("handler");

nettyServerHandler.resetSync(latch,1);

nettyServerHandler.setUnidId(uniId);

channel.writeAndFlush(obj );

//同步返回結果

if (latch.await(60,TimeUnit.SECONDS)){

// printerServerHandler.setTimeout(0);

return nettyServerHandler.getResult();

}

//若是超時,將超時標志設置為1

//printerServerHandler.setTimeout(1);

log.error("請求超時60s");

return new Result(2,"請求超時",null);

}else{

return new Result(0,"客戶端已關閉!",null);

}

}

}catch (Exception e){

e.printStackTrace();

return new Result(0,"服務出錯!",null);

}finally {

if (lock != null){

lock.unlock();

}

}

return new Result(0,"客戶端沒有鏈接!",null);

}

public static void main(String[] args) throws Exception {

new NettyServer().bind();

}

}

代碼中write方法是業務代碼調用服務端向客戶端發送信息的統一入口,這里用了Lock是為了防止并發操做影響數據返回的問題,這里每一個客戶端通道分配一個鎖。latch.await(60,TimeUnit.SECONDS) 是為了阻塞程序,等待客戶端返回結果,若是60s內沒有返回結果則釋放鎖并返回請求超時。bootstrap

服務端NettyServerChannelInitializer 的實現服務器

package com.example.demo.server;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.socket.SocketChannel;

import io.netty.handler.codec.serialization.ClassResolvers;

import io.netty.handler.codec.serialization.ObjectDecoder;

import io.netty.handler.codec.serialization.ObjectEncoder;

import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

/**

* @ClassName: NettyServerChannelInitializer

* @Author: huangzf

* @Date: 2018/9/25 15:43

* @Description:

*/

public class NettyServerChannelInitializer extends ChannelInitializer {

private NettyServerHandler handler ;

@Override

protected void initChannel(SocketChannel socketChannel) throws Exception {

ChannelPipeline pipeline = socketChannel.pipeline();

pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE,ClassResolvers

.weakCachingConcurrentResolver(this.getClass().getClassLoader())));

pipeline.addLast("encoder", new ObjectEncoder());

pipeline.addLast(new IdleStateHandler(40,0,0,TimeUnit.SECONDS));

//服務器的邏輯

handler = new NettyServerHandler();

pipeline.addLast("handler", handler);

}

}

這里使用了對象進行數據傳輸,避免了客戶端從新解析組裝對象的麻煩并發

package com.example.demo.server;

import com.example.demo.cache.ChannelMap;

import com.example.demo.model.Result;

import com.example.demo.model.Tenant;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

import io.netty.handler.timeout.IdleState;

import io.netty.handler.timeout.IdleStateEvent;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.locks.ReentrantLock;

import lombok.extern.slf4j.Slf4j;

/**

* @ClassName: NettyServerHandler

* @Author: huangzf

* @Date: 2018/9/25 15:44

* @Description:

*/

@Slf4j

public class NettyServerHandler extends SimpleChannelInboundHandler {

private CountDownLatch latch;

/**

* 消息的惟一ID

*/

private String unidId = "";

/**

* 同步標志

*/

private int rec;

/**

* 客戶端返回的結果

*/

private Result result;

/**

* 心跳丟失次數

*/

private int counter = 0;

@Override

protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {

System.out.println("Client say : " + msg.toString());

if(msg instanceof Tenant){

ChannelMap.setChannel(((Tenant) msg).getTenantId(),ctx.channel());

ChannelMap.setChannelLock(((Tenant) msg).getTenantId(),new ReentrantLock());

}

counter = 0;

if(rec == 1 && msg instanceof Result){

Result re = (Result) msg;

//校驗返回的信息是不是同一個信息

if (unidId.equals(re.getUniId())){

latch.countDown();//消息返回完畢,釋放同步鎖,具體業務須要判斷指令是否匹配

rec = 0;

result = re;

}

}

}

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

log.info("RemoteAddress : " + ctx.channel().remoteAddress().toString()+ " active !");

super.channelActive(ctx);

}

@Override

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

if (evt instanceof IdleStateEvent) {

IdleStateEvent event = (IdleStateEvent) evt;

if (event.state().equals(IdleState.READER_IDLE)){

// 空閑40s以后觸發 (心跳包丟失)

if (counter >= 3) {

// 連續丟失3個心跳包 (斷開鏈接)

ctx.channel().close().sync();

log.error("已與"+ctx.channel().remoteAddress()+"斷開鏈接");

System.out.println("已與"+ctx.channel().remoteAddress()+"斷開鏈接");

} else {

counter++;

log.debug(ctx.channel().remoteAddress() + "丟失了第 " + counter + " 個心跳包");

System.out.println("丟失了第 " + counter + " 個心跳包");

}

}

}

}

public void resetSync(CountDownLatch latch, int rec) {

this.latch = latch;

this.rec = rec;

}

public void setUnidId(String s){

this.unidId = s;

}

public Result getResult() {

return result;

}

}

在channelRead0方法中 若是讀取到的信息是Tenant (客戶端剛鏈接上發送的消息)就為該客戶端關聯一個惟一標志和分配一個鎖Lock(用于并發操做)框架

若是讀取到的信息是Result(客戶端響服務端的消息)就判斷其是不是同一個消息(服務端發送的消息中帶有該消息的惟一id,客戶端返回時也要帶上該id),若是是就latch.countDown() 釋放同步鎖,這樣就能夠使得服務端同步獲得客戶端返回的消息了。異步

詳情與客戶端代碼請移步碼云:socket

總結

以上是生活随笔為你收集整理的netty服务器返回信息关闭,netty4 服务端同步客户端返回的结果的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。