Netty学习之路一(大文件传输案例分析)
業(yè)務(wù)場(chǎng)景: 由于工作需要,需要在兩臺(tái)服務(wù)器的java服務(wù)之間通過(guò)netty建立鏈接,將大文件(幾百G到TB級(jí)別)從機(jī)器A上的serverA發(fā)送到機(jī)器B上的serverB。
實(shí)現(xiàn)方法設(shè)計(jì):
方案二 的調(diào)研DEMO:
案例項(xiàng)目結(jié)構(gòu)
對(duì)服務(wù)端Service的分析
FileUploadServer.java
public class FileUploadServer {public static void main(String[] args) {int port = 8080; // 服務(wù)端的默認(rèn)端口if (args != null && args.length > 0) {port = Integer.valueOf(args[0]);}new FileUploadServer().bind(port);}public void bind(int port) {EventLoopGroup boosGroup = new NioEventLoopGroup(); //服務(wù)端的管理線程EventLoopGroup workerGroup = new NioEventLoopGroup(); //服務(wù)端的工作線程//ServerBootstrap負(fù)責(zé)初始化netty服務(wù)器,并且開(kāi)始監(jiān)聽(tīng)端口的socket請(qǐng)求ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boosGroup, workerGroup) //綁定管理線程和工作線程.channel(NioServerSocketChannel.class) //ServerSocketChannelFactory 有兩種選擇,一種是NioServerSocketChannelFactory,一種是OioServerSocketChannelFactory。 .option(ChannelOption.SO_BACKLOG, 124) //BACKLOG用于構(gòu)造服務(wù)端套接字ServerSocket對(duì)象,標(biāo)識(shí)當(dāng)服務(wù)器請(qǐng)求處理線程全滿時(shí),用于臨時(shí)存放已完成三次握手的請(qǐng)求的隊(duì)列的最大長(zhǎng)度。如果未設(shè)置或所設(shè)置的值小于1,Java將使用默認(rèn)值50。.childHandler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) throws Exception {channel.pipeline().addLast(new ObjectEncoder());channel.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(null)));channel.pipeline().addLast(new FileUploadServerHandler()); // 自定義Handler}});try {ChannelFuture future = bootstrap.bind(port).sync();future.channel().closeFuture().sync(); //保證了服務(wù)一直啟動(dòng),相當(dāng)于一個(gè)死循環(huán)} catch (InterruptedException e) {e.printStackTrace();} finally {boosGroup.shutdownGracefully();workerGroup.shutdownGracefully();}} }FileUploadServerHandler.java 自定義的Handler繼承了ChannelInboundHandlerAdapter類
@ChannelHandler.Sharable public class FileUploadServerHandler extends ChannelInboundHandlerAdapter { //繼承ChannelInboundHandlerAdapterprivate int byteRead;private volatile Long start = 0L;private String file_dir = "D:\\new_start\\file\\target";@Override //當(dāng)前channel從遠(yuǎn)端讀取到數(shù)據(jù)時(shí)執(zhí)行public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof FilePacket) {FilePacket filePacket = (FilePacket) msg;byte[] bytes = filePacket.getBytes();byteRead = filePacket.getEndPos();String md5 = filePacket.getFile_md5();String path = file_dir + File.separator + md5;File file = new File(path);RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");randomAccessFile.seek(start);randomAccessFile.write(bytes);start = start + byteRead;if (byteRead > 0) {ctx.writeAndFlush(start);} else {randomAccessFile.close();}}}@Override //ChannelHandler回調(diào)方法出現(xiàn)異常時(shí)被回調(diào)public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();} }ChannelHandler用于處理Channel對(duì)應(yīng)的事件。ChannelHandler接口中定義了如下三個(gè)方法
- void handlerAdded(ChannelHandlerContext ctx) throws Exception;
在當(dāng)前ChannelHander加入ChannelHandlerContext中時(shí)被回調(diào) - void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
在當(dāng)前ChannelHander從ChannelHandlerContext中移除時(shí)被回調(diào) - void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
在當(dāng)前ChannelHandler回調(diào)方法出現(xiàn)異常時(shí)被回調(diào)
程序開(kāi)發(fā)過(guò)程中主要實(shí)現(xiàn)ChannelHandler的子接口ChannelInboundHandler和ChannelOutboundHandler。
框架提供了ChannelInboundHandlerAdapter,ChannelOutboundHandlerAdapter和ChannelDuplexHandler這三個(gè)適配類,繼承適配器類,并實(shí)現(xiàn)你需要的方法。
ChannelInboundHandler中的回調(diào)方法于觸發(fā)時(shí)間:(參考博文:https://www.jianshu.com/p/96a50869b527)
- channelRegistered 當(dāng)前channel注冊(cè)到EventLoop時(shí)觸發(fā)
- channelUnregistered 當(dāng)前channel從EventLoop取消注冊(cè)時(shí)觸發(fā)
- channelActive 當(dāng)前channel激活的時(shí)候的時(shí)候觸發(fā)
- channelInactive 當(dāng)前channel不活躍的時(shí)候,生命周期末時(shí)觸發(fā)
- channelRead 當(dāng)前channel從遠(yuǎn)端讀取到數(shù)據(jù)時(shí)觸發(fā)(最常用)
- channelReadComplete 當(dāng)前channel read消費(fèi)完讀取的數(shù)據(jù)的時(shí)候被觸發(fā)
- userEventTriggered 用戶事件觸發(fā)的時(shí)候
- channelWritabilityChanged channel的寫狀態(tài)變化的時(shí)候觸發(fā)
解析重寫的channelRead方法
@Override //當(dāng)前channel從遠(yuǎn)端讀取到數(shù)據(jù)時(shí)執(zhí)行public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof FilePacket) { //服務(wù)端于客戶端共同定義好的發(fā)送對(duì)象FilePacket filePacket = (FilePacket) msg;byte[] bytes = filePacket.getBytes(); //文件的字節(jié)數(shù)組內(nèi)容byteRead = filePacket.getEndPos(); //此次接收客戶端發(fā)送的字節(jié)長(zhǎng)度String md5 = filePacket.getFile_md5();String path = file_dir + File.separator + md5; //文件存儲(chǔ)的位置File file = new File(path);RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw"); //使用RandomAccessFile讀取文件 rw權(quán)限讀寫randomAccessFile.seek(start); //設(shè)置此次寫操作,文件的起始偏移量 randomAccessFile.write(bytes); //將接收到的字節(jié)寫入到文件start = start + byteRead;if (byteRead > 0) { //文件未結(jié)束ctx.writeAndFlush(start); //將下次(即 未讀) 的文件起始位置返回給客戶端} else {randomAccessFile.close(); //接收完畢 關(guān)閉文件 (存在問(wèn)題,可能會(huì)存在文件一直未關(guān)的情況)}}}FileUploadClient.java
public class FileUploadClient {public static void main(String[] args) {int port = 8080;if (args != null && args.length > 0) {port = Integer.valueOf(args[0]);}FilePacket filePacket = new FilePacket();File file = new File("D:\\new_start\\file\\origin\\nohup.out");String fileMd5 = file.getName();filePacket.setFile(file);filePacket.setFile_md5(fileMd5);filePacket.setStartPos(0); //要傳輸?shù)奈募某跏夹畔ew FileUploadClient().connect("127.0.0.1", port, filePacket);}public void connect(String host, int port, final FilePacket filePacket) {EventLoopGroup group = new NioEventLoopGroup(); //只需要一個(gè)線程組,和服務(wù)端有所不同Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) throws Exception {channel.pipeline().addLast(new ObjectEncoder());channel.pipeline().addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(null)));channel.pipeline().addLast(new FileUploadClientHandler(filePacket)); //自定義的handler}});ChannelFuture future = null;try {future = bootstrap.connect(host, port).sync(); //使得鏈接保持future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {group.shutdownGracefully();}}}FileUploadClientHandler.java 用戶自定義的客戶端Handler 繼承了ChannelInboundHandlerAdapter
@ChannelHandler.Sharable public class FileUploadClientHandler extends ChannelInboundHandlerAdapter {private int byteRead;private volatile Long start = 0l; //使用Long 當(dāng)傳輸?shù)奈募笥?G時(shí),Integer類型會(huì)不夠表達(dá)文件的長(zhǎng)度private volatile int lastLength = 0;public RandomAccessFile randomAccessFile;private FilePacket filePacket;//構(gòu)造器,FilePacket作為參數(shù)public FileUploadClientHandler(FilePacket filePacket) {if (filePacket.getFile().exists()) {if (!filePacket.getFile().isFile()) {System.out.println("Not a file:" + filePacket.getFile());}}this.filePacket = filePacket;}@Override //當(dāng)前channel激活的時(shí)候的時(shí)候觸發(fā) 優(yōu)先于channelRead方法執(zhí)行 (我的理解,只執(zhí)行一次)public void channelActive(ChannelHandlerContext ctx) throws Exception {randomAccessFile = new RandomAccessFile(filePacket.getFile(), "r");randomAccessFile.seek(filePacket.getStartPos());lastLength = Integer.MAX_VALUE / 4 ; //每次發(fā)送的文件塊數(shù)的長(zhǎng)度byte[] bytes = new byte[lastLength]; if ((byteRead = randomAccessFile.read(bytes)) != -1) {filePacket.setEndPos(byteRead);filePacket.setBytes(bytes);ctx.writeAndFlush(filePacket);} else {System.out.println("文件已讀完");}}@Override //當(dāng)前channel從遠(yuǎn)端讀取到數(shù)據(jù)時(shí)觸發(fā)public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof Long) { //客戶端發(fā)送FilePacket 到服務(wù)端,服務(wù)端處理完文件當(dāng)前部分的數(shù)據(jù),返回下次文件段的起始位置start = (Long) msg;if (start != -1) {randomAccessFile = new RandomAccessFile(filePacket.getFile(), "r");randomAccessFile.seek(start); //將服務(wù)端返回的數(shù)據(jù)設(shè)置此次讀操作,文件的起始偏移量System.out.println("文件長(zhǎng)度:" + (randomAccessFile.length()));System.out.println("此次分片開(kāi)始位置:" + start);System.out.println("塊兒長(zhǎng)度:" + (Integer.MAX_VALUE / 4));System.out.println("剩余長(zhǎng)度:" + (randomAccessFile.length()-start));Long a = randomAccessFile.length() - start;int lastLength = (int) (Integer.MAX_VALUE / 4);if (a < lastLength) {System.out.println("最后一片長(zhǎng)度:"+a);lastLength = a.intValue();}byte[] bytes = new byte[lastLength];//這個(gè)判斷關(guān)閉判斷調(diào)整存在漏洞if ((byteRead = randomAccessFile.read(bytes)) != -1 && (randomAccessFile.length()-start) > 0) {System.out.println("byte長(zhǎng)度:" + bytes.length);System.out.println("-----------------" + bytes.length);filePacket.setEndPos(byteRead);filePacket.setBytes(bytes);ctx.writeAndFlush(filePacket);} else { randomAccessFile.close(); ctx.close();System.out.println("文件已讀完------" + byteRead);}}}}@Override //在當(dāng)前ChannelHandler回調(diào)方法出現(xiàn)異常時(shí)被回調(diào)public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();} }FilePacket.java
Packet.java
@Data public abstract class Packet {private Byte type;public abstract Byte getCommand();}Connand.java
public interface Command {Byte FILE = 1;}Demo運(yùn)行效果
如下圖為8G文件在兩個(gè)netty服務(wù)之間傳輸?shù)慕Y(jié)果,文件分段傳輸,需要等待傳輸完成后再接著傳輸下一段,時(shí)間上其實(shí)相當(dāng)于阻塞,考慮到效率其實(shí)不建議采用。
總結(jié):
進(jìn)期的工作項(xiàng)目定位是一個(gè)文件的傳輸系統(tǒng),并不會(huì)對(duì)傳輸?shù)奈募龃鎯?chǔ),只是一個(gè)傳輸?shù)臉蛄骸>W(wǎng)絡(luò)編程要保證文件的可靠性,又要確保傳輸?shù)男省4税咐龑?shí)現(xiàn)了在不對(duì)文件進(jìn)行分片存儲(chǔ)的傳輸。項(xiàng)目開(kāi)發(fā)中,應(yīng)該會(huì)實(shí)現(xiàn)兩套方案。方案一,修改上傳規(guī)則;方案二,修改同步規(guī)則。
曾使用MappedByteBuffer通過(guò)多線程的模式對(duì)文件進(jìn)行組裝,案例未通過(guò)測(cè)試。(組裝好的文件有損壞,或長(zhǎng)度于源文件不同的情況,且測(cè)試結(jié)果不穩(wěn)定。)
測(cè)試代碼如下,希望看見(jiàn)的同胞可對(duì)測(cè)試代碼進(jìn)行指點(diǎn)。
TestMappedByteBufferDownloadWithThread.java類
FileThread.java 類
public class FileThread extends Thread {private File file ;private Long chunkCount ;public FileThread(File file, Long chunkCount) {this.file = file;this.chunkCount = chunkCount;}@Overridepublic void run() {try{String targetPath = "D:\\new_start\\file\\HTTP協(xié)議詳細(xì)介紹_8.pdf";File downloadFile = new File(targetPath);if(!downloadFile.exists()){downloadFile.createNewFile();}RandomAccessFile randFile = new RandomAccessFile(downloadFile,"rw");FileChannel outChannel = randFile.getChannel();String[] split = file.getName().split("_");long position = Integer.parseInt(split[1]) * chunkCount;FileChannel inChannel = new FileInputStream(file).getChannel();if(file.getName().equals("chunk_100")){System.out.println("file.getName():"+file.getName()+"-------file.length():"+file.length());}MappedByteBuffer mbb = outChannel.map(FileChannel.MapMode.READ_WRITE, position, file.length());if(file.getName().equals("chunk_100")){System.out.println("mbb.getLong():"+mbb.getLong()+"-------file.length():"+file.length());}inChannel.read(mbb);mbb.flip();outChannel.write(mbb);inChannel.close();outChannel.close();}catch (Exception e ){e.printStackTrace();}}}本案例原型來(lái)自GitHub,鏈接: https://github.com/zhangji-hhu/BigFileTransfer.git 作者:zhangji-hhu 侵刪。
參考文章:https://www.jianshu.com/p/96a50869b527 作者:土豆肉絲蓋澆飯
總結(jié)
以上是生活随笔為你收集整理的Netty学习之路一(大文件传输案例分析)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: luoguP3375 【模板】KMP字符
- 下一篇: 计算机多媒体技术在中职医学院校里的应用与