日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

Netty学习之路一(大文件传输案例分析)

發(fā)布時(shí)間:2024/3/26 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Netty学习之路一(大文件传输案例分析) 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

業(yè)務(wù)場(chǎng)景: 由于工作需要,需要在兩臺(tái)服務(wù)器的java服務(wù)之間通過(guò)netty建立鏈接,將大文件(幾百G到TB級(jí)別)從機(jī)器A上的serverA發(fā)送到機(jī)器B上的serverB。

實(shí)現(xiàn)方法設(shè)計(jì):

  • 系統(tǒng)現(xiàn)有的實(shí)現(xiàn)方法:將業(yè)務(wù)方存儲(chǔ)在服務(wù)器上的文件,在傳輸之前,對(duì)文件進(jìn)行分片,以定義的規(guī)則將文件分為大小20MB的分片存儲(chǔ)在服務(wù)器中。同步時(shí)以異步的方式同步分片,當(dāng)然A服務(wù)器上的文件同步到B服務(wù)器時(shí)也是以分片的形式存儲(chǔ),此時(shí)需要在B服務(wù)器上按照規(guī)則將文件進(jìn)行組裝。 【優(yōu)點(diǎn):傳輸時(shí)使用異步方式并發(fā)同步,加快了同步效率。缺點(diǎn):文件上傳時(shí)將文件分片/同步完成后需要將分片進(jìn)行組裝 當(dāng)文件過(guò)大時(shí),這兩個(gè)步驟需要消耗大量的時(shí)間(單線程模式下一個(gè)8G左右的文件采用NIO方式拷貝分片大概需要1分16秒)】
  • 計(jì)劃整改系統(tǒng)現(xiàn)有實(shí)現(xiàn)方式,省略文件上傳時(shí)的拷貝分片與文件下載時(shí)的組合拷貝,不對(duì)文件進(jìn)行分片,一個(gè)文件為一整個(gè)實(shí)體,利用netty建立長(zhǎng)連接,單線程模式進(jìn)行傳輸。【優(yōu)點(diǎn):省略了上傳準(zhǔn)備和下載準(zhǔn)備的兩次拷貝。缺點(diǎn):傳輸時(shí)采用單線程模式傳輸,會(huì)使文件在傳輸上的時(shí)間消耗變大。】
  • 還在設(shè)計(jì)與思考中。。。。
  • 方案二 的調(diào)研DEMO:

  • pom.xml 的依賴
  • <dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.6.Final</version></dependency>
  • 案例項(xiàng)目結(jié)構(gòu)

  • 對(duì)服務(wù)端Service的分析

  • FileUploadServer.java

    public class FileUploadServer {public static void main(String[] args) {int port = 8080; // 服務(wù)端的默認(rèn)端口if (args != null && args.length > 0) {port = Integer.valueOf(args[0]);}new FileUploadServer().bind(port);}public void bind(int port) {EventLoopGroup boosGroup = new NioEventLoopGroup(); //服務(wù)端的管理線程EventLoopGroup workerGroup = new NioEventLoopGroup(); //服務(wù)端的工作線程//ServerBootstrap負(fù)責(zé)初始化netty服務(wù)器,并且開(kāi)始監(jiān)聽(tīng)端口的socket請(qǐng)求ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boosGroup, workerGroup) //綁定管理線程和工作線程.channel(NioServerSocketChannel.class) //ServerSocketChannelFactory 有兩種選擇,一種是NioServerSocketChannelFactory,一種是OioServerSocketChannelFactory。 .option(ChannelOption.SO_BACKLOG, 124) //BACKLOG用于構(gòu)造服務(wù)端套接字ServerSocket對(duì)象,標(biāo)識(shí)當(dāng)服務(wù)器請(qǐng)求處理線程全滿時(shí),用于臨時(shí)存放已完成三次握手的請(qǐng)求的隊(duì)列的最大長(zhǎng)度。如果未設(shè)置或所設(shè)置的值小于1,Java將使用默認(rèn)值50。.childHandler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) throws Exception {channel.pipeline().addLast(new ObjectEncoder());channel.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(null)));channel.pipeline().addLast(new FileUploadServerHandler()); // 自定義Handler}});try {ChannelFuture future = bootstrap.bind(port).sync();future.channel().closeFuture().sync(); //保證了服務(wù)一直啟動(dòng),相當(dāng)于一個(gè)死循環(huán)} catch (InterruptedException e) {e.printStackTrace();} finally {boosGroup.shutdownGracefully();workerGroup.shutdownGracefully();}} }

    FileUploadServerHandler.java 自定義的Handler繼承了ChannelInboundHandlerAdapter類

    @ChannelHandler.Sharable public class FileUploadServerHandler extends ChannelInboundHandlerAdapter { //繼承ChannelInboundHandlerAdapterprivate int byteRead;private volatile Long start = 0L;private String file_dir = "D:\\new_start\\file\\target";@Override //當(dāng)前channel從遠(yuǎn)端讀取到數(shù)據(jù)時(shí)執(zhí)行public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof FilePacket) {FilePacket filePacket = (FilePacket) msg;byte[] bytes = filePacket.getBytes();byteRead = filePacket.getEndPos();String md5 = filePacket.getFile_md5();String path = file_dir + File.separator + md5;File file = new File(path);RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");randomAccessFile.seek(start);randomAccessFile.write(bytes);start = start + byteRead;if (byteRead > 0) {ctx.writeAndFlush(start);} else {randomAccessFile.close();}}}@Override //ChannelHandler回調(diào)方法出現(xiàn)異常時(shí)被回調(diào)public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();} }

    ChannelHandler用于處理Channel對(duì)應(yīng)的事件。ChannelHandler接口中定義了如下三個(gè)方法

    • void handlerAdded(ChannelHandlerContext ctx) throws Exception;
      在當(dāng)前ChannelHander加入ChannelHandlerContext中時(shí)被回調(diào)
    • void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
      在當(dāng)前ChannelHander從ChannelHandlerContext中移除時(shí)被回調(diào)
    • void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
      在當(dāng)前ChannelHandler回調(diào)方法出現(xiàn)異常時(shí)被回調(diào)

    程序開(kāi)發(fā)過(guò)程中主要實(shí)現(xiàn)ChannelHandler的子接口ChannelInboundHandler和ChannelOutboundHandler。
    框架提供了ChannelInboundHandlerAdapter,ChannelOutboundHandlerAdapter和ChannelDuplexHandler這三個(gè)適配類,繼承適配器類,并實(shí)現(xiàn)你需要的方法。

    ChannelInboundHandler中的回調(diào)方法于觸發(fā)時(shí)間:(參考博文:https://www.jianshu.com/p/96a50869b527)

    • channelRegistered 當(dāng)前channel注冊(cè)到EventLoop時(shí)觸發(fā)
    • channelUnregistered 當(dāng)前channel從EventLoop取消注冊(cè)時(shí)觸發(fā)
    • channelActive 當(dāng)前channel激活的時(shí)候的時(shí)候觸發(fā)
    • channelInactive 當(dāng)前channel不活躍的時(shí)候,生命周期末時(shí)觸發(fā)
    • channelRead 當(dāng)前channel從遠(yuǎn)端讀取到數(shù)據(jù)時(shí)觸發(fā)(最常用)
    • channelReadComplete 當(dāng)前channel read消費(fèi)完讀取的數(shù)據(jù)的時(shí)候被觸發(fā)
    • userEventTriggered 用戶事件觸發(fā)的時(shí)候
    • channelWritabilityChanged channel的寫狀態(tài)變化的時(shí)候觸發(fā)

    解析重寫的channelRead方法

    @Override //當(dāng)前channel從遠(yuǎn)端讀取到數(shù)據(jù)時(shí)執(zhí)行public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof FilePacket) { //服務(wù)端于客戶端共同定義好的發(fā)送對(duì)象FilePacket filePacket = (FilePacket) msg;byte[] bytes = filePacket.getBytes(); //文件的字節(jié)數(shù)組內(nèi)容byteRead = filePacket.getEndPos(); //此次接收客戶端發(fā)送的字節(jié)長(zhǎng)度String md5 = filePacket.getFile_md5();String path = file_dir + File.separator + md5; //文件存儲(chǔ)的位置File file = new File(path);RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw"); //使用RandomAccessFile讀取文件 rw權(quán)限讀寫randomAccessFile.seek(start); //設(shè)置此次寫操作,文件的起始偏移量 randomAccessFile.write(bytes); //將接收到的字節(jié)寫入到文件start = start + byteRead;if (byteRead > 0) { //文件未結(jié)束ctx.writeAndFlush(start); //將下次(即 未讀) 的文件起始位置返回給客戶端} else {randomAccessFile.close(); //接收完畢 關(guān)閉文件 (存在問(wèn)題,可能會(huì)存在文件一直未關(guān)的情況)}}}
  • 對(duì)客戶端Client的分析
  • FileUploadClient.java

    public class FileUploadClient {public static void main(String[] args) {int port = 8080;if (args != null && args.length > 0) {port = Integer.valueOf(args[0]);}FilePacket filePacket = new FilePacket();File file = new File("D:\\new_start\\file\\origin\\nohup.out");String fileMd5 = file.getName();filePacket.setFile(file);filePacket.setFile_md5(fileMd5);filePacket.setStartPos(0); //要傳輸?shù)奈募某跏夹畔ew FileUploadClient().connect("127.0.0.1", port, filePacket);}public void connect(String host, int port, final FilePacket filePacket) {EventLoopGroup group = new NioEventLoopGroup(); //只需要一個(gè)線程組,和服務(wù)端有所不同Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) throws Exception {channel.pipeline().addLast(new ObjectEncoder());channel.pipeline().addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(null)));channel.pipeline().addLast(new FileUploadClientHandler(filePacket)); //自定義的handler}});ChannelFuture future = null;try {future = bootstrap.connect(host, port).sync(); //使得鏈接保持future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {group.shutdownGracefully();}}}

    FileUploadClientHandler.java 用戶自定義的客戶端Handler 繼承了ChannelInboundHandlerAdapter

    @ChannelHandler.Sharable public class FileUploadClientHandler extends ChannelInboundHandlerAdapter {private int byteRead;private volatile Long start = 0l; //使用Long 當(dāng)傳輸?shù)奈募笥?G時(shí),Integer類型會(huì)不夠表達(dá)文件的長(zhǎng)度private volatile int lastLength = 0;public RandomAccessFile randomAccessFile;private FilePacket filePacket;//構(gòu)造器,FilePacket作為參數(shù)public FileUploadClientHandler(FilePacket filePacket) {if (filePacket.getFile().exists()) {if (!filePacket.getFile().isFile()) {System.out.println("Not a file:" + filePacket.getFile());}}this.filePacket = filePacket;}@Override //當(dāng)前channel激活的時(shí)候的時(shí)候觸發(fā) 優(yōu)先于channelRead方法執(zhí)行 (我的理解,只執(zhí)行一次)public void channelActive(ChannelHandlerContext ctx) throws Exception {randomAccessFile = new RandomAccessFile(filePacket.getFile(), "r");randomAccessFile.seek(filePacket.getStartPos());lastLength = Integer.MAX_VALUE / 4 ; //每次發(fā)送的文件塊數(shù)的長(zhǎng)度byte[] bytes = new byte[lastLength]; if ((byteRead = randomAccessFile.read(bytes)) != -1) {filePacket.setEndPos(byteRead);filePacket.setBytes(bytes);ctx.writeAndFlush(filePacket);} else {System.out.println("文件已讀完");}}@Override //當(dāng)前channel從遠(yuǎn)端讀取到數(shù)據(jù)時(shí)觸發(fā)public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof Long) { //客戶端發(fā)送FilePacket 到服務(wù)端,服務(wù)端處理完文件當(dāng)前部分的數(shù)據(jù),返回下次文件段的起始位置start = (Long) msg;if (start != -1) {randomAccessFile = new RandomAccessFile(filePacket.getFile(), "r");randomAccessFile.seek(start); //將服務(wù)端返回的數(shù)據(jù)設(shè)置此次讀操作,文件的起始偏移量System.out.println("文件長(zhǎng)度:" + (randomAccessFile.length()));System.out.println("此次分片開(kāi)始位置:" + start);System.out.println("塊兒長(zhǎng)度:" + (Integer.MAX_VALUE / 4));System.out.println("剩余長(zhǎng)度:" + (randomAccessFile.length()-start));Long a = randomAccessFile.length() - start;int lastLength = (int) (Integer.MAX_VALUE / 4);if (a < lastLength) {System.out.println("最后一片長(zhǎng)度:"+a);lastLength = a.intValue();}byte[] bytes = new byte[lastLength];//這個(gè)判斷關(guān)閉判斷調(diào)整存在漏洞if ((byteRead = randomAccessFile.read(bytes)) != -1 && (randomAccessFile.length()-start) > 0) {System.out.println("byte長(zhǎng)度:" + bytes.length);System.out.println("-----------------" + bytes.length);filePacket.setEndPos(byteRead);filePacket.setBytes(bytes);ctx.writeAndFlush(filePacket);} else { randomAccessFile.close(); ctx.close();System.out.println("文件已讀完------" + byteRead);}}}}@Override //在當(dāng)前ChannelHandler回調(diào)方法出現(xiàn)異常時(shí)被回調(diào)public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();} }
  • 文件傳遞模型
    FilePacket.java
  • @Data public class FilePacket extends Packet implements Serializable {private File file;private String file_md5;private int startPos;private byte[] bytes;private int endPos;public File getFile() {return file;}public void setFile(File file) {this.file = file;}public String getFile_md5() {return file_md5;}public void setFile_md5(String file_md5) {this.file_md5 = file_md5;}public int getStartPos() {return startPos;}public void setStartPos(int startPos) {this.startPos = startPos;}public byte[] getBytes() {return bytes;}public void setBytes(byte[] bytes) {this.bytes = bytes;}public int getEndPos() {return endPos;}public void setEndPos(int endPos) {this.endPos = endPos;}@Overridepublic Byte getCommand() {return FILE;}}

    Packet.java

    @Data public abstract class Packet {private Byte type;public abstract Byte getCommand();}

    Connand.java

    public interface Command {Byte FILE = 1;}

    Demo運(yùn)行效果
    如下圖為8G文件在兩個(gè)netty服務(wù)之間傳輸?shù)慕Y(jié)果,文件分段傳輸,需要等待傳輸完成后再接著傳輸下一段,時(shí)間上其實(shí)相當(dāng)于阻塞,考慮到效率其實(shí)不建議采用。

    總結(jié):
    進(jìn)期的工作項(xiàng)目定位是一個(gè)文件的傳輸系統(tǒng),并不會(huì)對(duì)傳輸?shù)奈募龃鎯?chǔ),只是一個(gè)傳輸?shù)臉蛄骸>W(wǎng)絡(luò)編程要保證文件的可靠性,又要確保傳輸?shù)男省4税咐龑?shí)現(xiàn)了在不對(duì)文件進(jìn)行分片存儲(chǔ)的傳輸。項(xiàng)目開(kāi)發(fā)中,應(yīng)該會(huì)實(shí)現(xiàn)兩套方案。方案一,修改上傳規(guī)則;方案二,修改同步規(guī)則。
    曾使用MappedByteBuffer通過(guò)多線程的模式對(duì)文件進(jìn)行組裝,案例未通過(guò)測(cè)試。(組裝好的文件有損壞,或長(zhǎng)度于源文件不同的情況,且測(cè)試結(jié)果不穩(wěn)定。)
    測(cè)試代碼如下,希望看見(jiàn)的同胞可對(duì)測(cè)試代碼進(jìn)行指點(diǎn)。
    TestMappedByteBufferDownloadWithThread.java類

    public class TestMappedByteBufferDownloadWithThread {public static void main(String[] args) throws IOException {long chunkCount =32646;String chunkBasePath = "D:\\new_start\\file\\523";File dir = new File(chunkBasePath);File[] files = dir.listFiles();for(File f : files){FileThread thread = new FileThread(f , chunkCount); //線程Thread類thread.start();}} }

    FileThread.java 類

    public class FileThread extends Thread {private File file ;private Long chunkCount ;public FileThread(File file, Long chunkCount) {this.file = file;this.chunkCount = chunkCount;}@Overridepublic void run() {try{String targetPath = "D:\\new_start\\file\\HTTP協(xié)議詳細(xì)介紹_8.pdf";File downloadFile = new File(targetPath);if(!downloadFile.exists()){downloadFile.createNewFile();}RandomAccessFile randFile = new RandomAccessFile(downloadFile,"rw");FileChannel outChannel = randFile.getChannel();String[] split = file.getName().split("_");long position = Integer.parseInt(split[1]) * chunkCount;FileChannel inChannel = new FileInputStream(file).getChannel();if(file.getName().equals("chunk_100")){System.out.println("file.getName():"+file.getName()+"-------file.length():"+file.length());}MappedByteBuffer mbb = outChannel.map(FileChannel.MapMode.READ_WRITE, position, file.length());if(file.getName().equals("chunk_100")){System.out.println("mbb.getLong():"+mbb.getLong()+"-------file.length():"+file.length());}inChannel.read(mbb);mbb.flip();outChannel.write(mbb);inChannel.close();outChannel.close();}catch (Exception e ){e.printStackTrace();}}}

    本案例原型來(lái)自GitHub,鏈接: https://github.com/zhangji-hhu/BigFileTransfer.git 作者:zhangji-hhu 侵刪。
    參考文章:https://www.jianshu.com/p/96a50869b527 作者:土豆肉絲蓋澆飯

    總結(jié)

    以上是生活随笔為你收集整理的Netty学习之路一(大文件传输案例分析)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 窝窝午夜看片 | 一区二区三区四区精品 | 国家队动漫免费观看在线观看晨光 | 中文字幕一区二区三区5566 | 九热精品 | 在线观看第一页 | 超碰99在线| 日韩av首页 | 亚洲 欧美 自拍偷拍 | 图片区 小说区 区 亚洲五月 | 超碰2 | 国产精品v日韩精品v在线观看 | 欧美日韩免费做爰视频 | 日韩一区二区中文字幕 | 一区二区久久久 | 成人区人妻精品一区二区不卡视频 | 天天看天天摸 | 亚洲av综合色区无码一区爱av | 天天av综合 | 国产精品一区二区三区在线播放 | 先锋资源网av| 久久久6 | 国产精品视频久久久 | 日韩国产成人无码av毛片 | 亚洲高清在线播放 | 欧美午夜小视频 | 女人叫床很黄很污句子 | 国产精品7777 | 精品国产乱子伦 | www.狠狠插 | 香蕉一级视频 | 先锋影音av资源网站 | 91九色在线 | 日韩1区| 日本精品一区二区在线观看 | 色综合av在线 | 一本久道久久综合 | 欧美videossex另类 | 日韩麻豆视频 | 老熟妇一区二区三区 | 中国三级视频 | 扒开jk护士狂揉免费 | 国产精品天天狠天天看 | 亚洲精品一二三 | 国产午夜福利一区 | 在线观看色网 | 亚洲精品无人区 | 国产在线播放一区二区 | 亚欧视频在线观看 | 久久人妻一区二区 | 成人黄网免费观看视频 | 成人黄色在线观看视频 | 亚洲乱码精品 | 成人免费视频网 | 国产日韩欧美一二三区 | 中文字幕在线观看免费高清 | 香港三级在线视频 | 青青草黄色| 日本少妇高潮喷水xxxxxxx | 成人毛片100免费观看 | 国产精品美女一区二区三区 | 一区二区不卡在线 | 国产精品人妻一区二区三区 | www.爆操| 一边吃奶一边摸做爽视频 | 国产夜夜操 | 久久精品一区二区在线观看 | 日韩激情av | 网站黄在线观看 | 靠逼动漫| 高清毛片aaaaaaaaa郊外 | 国产不卡一 | 亚洲成人久 | 日本三级吹潮 | 中文亚洲欧美 | 欧美日韩黑人 | 国产精品色视频 | 激情视频国产 | 西比尔在线观看完整视频高清 | 女人的天堂av | 日本国产一区 | 免费成人看视频 | 中文av一区二区 | 动漫美女揉胸 | 一本大道东京热无码aⅴ | 中文字幕伦理 | 亚洲熟妇丰满大屁股熟妇 | 国产精品扒开腿做爽爽爽a片唱戏 | 日韩精品理论 | 国产小视频在线免费观看 | 精东av在线 | 91一区| 亚洲欧美日韩另类 | 日韩在线观看视频一区 | 91麻豆免费看 | 国产裸体舞一区二区三区 | 猫咪av网| 涩涩屋视频在线观看 | 婷婷激情丁香 |