日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

大数据 NIO

發(fā)布時間:2024/4/30 编程问答 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 大数据 NIO 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

NIO

一、基礎(chǔ)回顧

a 、 進(jìn)程與線程

  • 進(jìn)程
  • 進(jìn)程: 程序加載到內(nèi)存中之后被CPU所計算的過程 — 進(jìn)程是計算機(jī)資源分配 、 任務(wù)調(diào)度的最小單位
  • 三個維度考慮進(jìn)程:
  • 物理內(nèi)存維度:每一個進(jìn)程都要分配一塊連續(xù)的內(nèi)存空間(首地址 、 尾地址)
  • 進(jìn)程執(zhí)行維度/邏輯維度: 每一個進(jìn)程都能被CPU計算 , 每一個進(jìn)程都能掛起然后讓另外的進(jìn)程被CPU計算 — 對于單核CPU而言 , 每一個時刻只能計算一個進(jìn)程 。 對于windows而言 , (即使是多核CPU)默認(rèn)只用一個核處理 。 對于Linux而言 , 有幾個核就用幾個。 — 從微觀上 , 計算機(jī)是串行處理進(jìn)程 。 從宏觀上而言 , 是多個進(jìn)程來并行執(zhí)行 。 — 引入多道編程
  • 時間維度: 在每一個時間段內(nèi) , 進(jìn)程一定是向前撲進(jìn)的 。
  • 為什么要引入進(jìn)程模型?
  • 減少程序響應(yīng)時間 , 提高使用效率 。
  • 提高CPU利用率 。 IO事件(程序與硬件之間的交互)
  • 進(jìn)程的產(chǎn)生事件:
  • 系統(tǒng)啟動時 , 會創(chuàng)建系統(tǒng)進(jìn)程 。
  • 用戶請求創(chuàng)建進(jìn)程時 , 創(chuàng)建用戶進(jìn)程 。 (打開應(yīng)用)
  • 主進(jìn)程自動啟動子進(jìn)程 。 (QQ等程序啟動之后 , 會自動啟動安全守護(hù)進(jìn)程 。 )
  • 進(jìn)程的消亡事件
  • 進(jìn)程任務(wù)執(zhí)行完畢 , 自動銷毀
  • 進(jìn)程執(zhí)行過程中出現(xiàn)錯誤或異常, 導(dǎo)致進(jìn)程退出 。 意外身亡
  • 一個進(jìn)程被另外的進(jìn)程強(qiáng)制關(guān)閉 他殺
  • 進(jìn)程的狀態(tài) (除啟動 、 消亡)
  • 就緒 就緒->運(yùn)行
  • 運(yùn)行 運(yùn)行->阻塞 運(yùn)行->就緒
  • 阻塞 阻塞->就緒
  • 線程
  • 線程: 是進(jìn)程中執(zhí)行某一個具體的任務(wù) 。 線程本質(zhì)上是簡化版的進(jìn)程 。 線程不具有進(jìn)程資源分配 、 任務(wù)調(diào)度的資格 。 一個進(jìn)程中至少有一個線程是在執(zhí)行的 。 — 線程是任務(wù)執(zhí)行的最小單位 。
  • 進(jìn)程的任務(wù)調(diào)度算法
  • 時間片輪轉(zhuǎn)算法
  • 優(yōu)先級調(diào)度算法
  • 短任務(wù)優(yōu)先算法
  • FICS 先來先執(zhí)行
  • b、 Socket

  • BIO BlockingIO— 阻塞式IO — 阻塞在一些場景下會相對影響效率 ; 由于流具有方向性, 所以在傳輸數(shù)據(jù)時往往要創(chuàng)建多個流對象。 如果一些流長時間不使用 , 卻依然保持連接, 會造成資源的大量浪費(fèi) 。 無法從流中準(zhǔn)確的抽取一段數(shù)據(jù)
  • 引入 NIO
  • 二、 NIO

  • NIO — NewIO — NonBlockingIO — 非阻塞式IO — 基于通道和緩沖區(qū)。
  • Buffer類 — 一個基本數(shù)據(jù)類型的容器 。 緩沖區(qū)
  • Buffer子類 ByteBuffer

  • 底層是依靠字節(jié)數(shù)組來存儲數(shù)據(jù) 理解 容量位capacity 、 position 操作位 、 limit限制位
  • 在讀取數(shù)據(jù)之前往往要做一次filp操作 , 反轉(zhuǎn)緩沖區(qū) , 先將限制位設(shè)置為操作位 , 然后將操作位歸0
  • 重繞緩沖區(qū) – 將操作位歸0 , 限制位不變
  • 代碼1

    import java.nio.ByteBuffer;public class BufferDemo {public static void main(String[] args) { // //創(chuàng)建緩沖區(qū) , 并指定了大小為1024個字節(jié)//當(dāng)創(chuàng)建好緩沖區(qū)的時候 , 就有了一下屬性//1. capacity 容量位 --- 表示緩沖區(qū)容量//2. position 操作位 --- 表示要操作的位置 ---- 當(dāng)緩沖區(qū)剛剛創(chuàng)建的時候 , 操作位默認(rèn)為0 , 每添加一個字節(jié)的數(shù)據(jù) , position就會向后挪一位//3. limit 限制位 ---- 表示position 所能達(dá)到的最大位置 --- 當(dāng)緩沖區(qū)剛剛創(chuàng)建的時候 , limit就是容量位 。//獲取數(shù)據(jù)時 , 默認(rèn)是從操作位開始獲取的 // ByteBuffer buffer = ByteBuffer.allocate(1024);//最多能存放1k數(shù)據(jù) // //向緩沖區(qū)添加數(shù)據(jù) // buffer.put("hello".getBytes());//以上方法存在資源浪費(fèi)//******************************************************* // //在已知具體數(shù)據(jù)的情況下 , 建議使用這種方法創(chuàng)建緩沖區(qū)//使用wrap方式創(chuàng)建緩沖區(qū) , 參數(shù)實際上是一個字節(jié)數(shù)組 , 底層實際上就是將參數(shù)字節(jié)數(shù)組復(fù)制給底層的實際存儲數(shù)據(jù)的數(shù)組 , 此時操作位并沒有改變還是0 //為什么是數(shù)組使用復(fù)制 , 而不是直接使用賦值?//保持?jǐn)?shù)據(jù)的不變和唯一 // ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes());//創(chuàng)建與數(shù)據(jù)大小相對應(yīng)的緩沖區(qū) // // //獲取數(shù)據(jù) , 每一次獲取 , 只能獲取一個字節(jié)byte b = buffer.get();System.out.println(b); // // //獲取緩沖區(qū)所有數(shù)據(jù) // while(buffer.hasRemaining()) {//判斷是否還有剩余數(shù)據(jù) // // byte b = buffer.get(); // System.out.println(b); // }//*******************************************************//但是使用固定緩沖區(qū)大小的情況下獲取數(shù)據(jù)會出現(xiàn)獲取到0的情況 , 需要將默認(rèn)的操作位歸0 , 并且讀取到有效數(shù)據(jù)結(jié)束即可 ByteBuffer buffer = ByteBuffer.allocate(10);buffer.put("hello".getBytes());//遍歷方法一 : 記錄操作位位置后循環(huán)遍歷 // int position = buffer.position(); // for(int i = 0 ;i < position ; i++) { // System.out.println(buffer.get(i)); // }//遍歷方法二: 設(shè)置限制位為操作位后 , 操作位歸0 遍歷 // buffer.limit(buffer.position()); // buffer.position(0); // while(buffer.hasRemaining()) { // System.out.println(buffer.get()); // }//遍歷方法三: 反轉(zhuǎn)緩沖區(qū)//先將限制位設(shè)置為當(dāng)前的操作位 , 然后把操作位歸0buffer.flip(); // buffer.hasRemaining()該方法 本質(zhì)上就是判斷操作位是否小于限制位while(buffer.hasRemaining()) {System.out.println(buffer.get());}//獲取緩沖區(qū)中的底層數(shù)組byte[] array = buffer.array();//底層也是使用的數(shù)組復(fù)制 , 返回的是整個底層數(shù)組 , 而不是有效數(shù)據(jù)System.out.println(new String(array , 0 , buffer.position()));//如果使用過反轉(zhuǎn)buffer.flip();System.out.println(new String(array , 0 , buffer.limit()));} }
  • 代碼2

    import java.nio.ByteBuffer;public class BufferDemo2 {public static void main(String[] args) {ByteBuffer buffer = ByteBuffer.allocate(10);buffer.put("hello".getBytes());System.out.println("操作位:"+ buffer.position());System.out.println("限制位:"+ buffer.limit()); // buffer.flip(); // System.out.println("操作位:"+ buffer.position()); // System.out.println("限制位:"+ buffer.limit());//重繞緩沖區(qū)buffer.rewind(); //作用: 將操作位歸0 , 限制位不變 。 System.out.println("操作位:"+ buffer.position());System.out.println("限制位:"+ buffer.limit());} }
  • Channel 通道

  • SocketChannel

  • 客戶端步驟
  • 打開客戶端通道 – open
  • 將客戶端通道設(shè)置為非阻塞
  • 發(fā)起連接
  • 人為阻塞 , 防止產(chǎn)生無效連接 finishConnect()
  • 寫出數(shù)據(jù)
  • 服務(wù)器端步驟
  • 打開服務(wù)器端通道
  • 綁定偵聽的端口號
  • 設(shè)置為非阻塞
  • 接收連接
  • 人為阻塞 , 防止沒有獲取真正的連接
  • 讀取數(shù)據(jù)
  • 代碼示例:

    客戶端 import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel;public class SocketChannelDemo {public static void main(String[] args) throws IOException, InterruptedException {//打開通道//SocketChannel 默認(rèn)為阻塞連接 此時和Socket基本一樣SocketChannel s = SocketChannel.open();//設(shè)置SoceketChannel為非阻塞的s.configureBlocking(false);//發(fā)起連接s.connect(new InetSocketAddress("localhost", 8090));//由于SoceketChannel為非阻塞的 , 所以不能保證連接的真正建立//在實際開發(fā)中往往會認(rèn)為的設(shè)置阻塞 , 來保證連接的建立//判斷連接是否成功 , 如果沒有連接成功finishConnect()底層會試圖再次建立連接//如果多次試圖連接沒有成功 , 則報錯while(!s.finishConnect()) ;//寫出數(shù)據(jù)s.write(ByteBuffer.wrap("hello".getBytes()));//獲取服務(wù)器端的響應(yīng)Thread.sleep(100);ByteBuffer b = ByteBuffer.allocate(100);s.read(b);b.flip();System.out.println(new String(b.array() , 0 , b.limit())); // 關(guān)閉通道 s.close();}服務(wù)端 import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel;public class ServerSocketChannelDemo {public static void main(String[] args) throws IOException, InterruptedException {//打開服務(wù)器通道ServerSocketChannel s= ServerSocketChannel.open();//綁定偵聽的端口s.bind(new InetSocketAddress( 8090));//設(shè)置非阻塞s.configureBlocking(false);//接收連接SocketChannel accept = s.accept();//由于ServerSocketChannel是非阻塞的 , 所以可能出現(xiàn)還沒有客戶端聯(lián)入 但是服務(wù)器已經(jīng)結(jié)束的現(xiàn)象//所以需要人為的設(shè)置為阻塞的 。 while(accept == null) {accept = s.accept();}//將socketChannel設(shè)置為非阻塞accept.configureBlocking(false);//讀取數(shù)據(jù)ByteBuffer buffer = ByteBuffer.allocate(100);accept.read(buffer);buffer.flip();System.out.println(new String(buffer.array() , 0 , buffer.limit()));//向客戶端做出響應(yīng)accept.write(ByteBuffer.wrap("服務(wù)器端接收成功!".getBytes()));Thread.sleep(1000);//如果不加延時 , 服務(wù)器端寫出數(shù)據(jù)立即結(jié)束 , 此時客戶端還沒有接收完數(shù)據(jù)會報錯} }
  • 通道特點(diǎn) :

  • 能夠進(jìn)行數(shù)據(jù)的雙向傳輸 , 減少流的數(shù)量 , 降低服務(wù)器的內(nèi)存消耗
  • 由于數(shù)據(jù)時存儲在緩沖區(qū)的 , 所以我們可以根據(jù)緩沖區(qū)的數(shù)據(jù)做定向操作 **
  • 能夠利用一個或者少量的服務(wù)器來完成大量的用戶的請求處理(一個服務(wù)器能夠接受多個客戶端的請求) — NIO適用于短任務(wù)場景 , BIO適用于長任務(wù)場景
  • Selector 選擇器

  • 每一個客戶端 或者服務(wù)器端都需要注冊到選擇器上 , 讓這個選擇器進(jìn)行管理 , 選擇器管理的時候需要監(jiān)聽事件:
  • 可連接事件 — 一般是客戶端
  • 可接受事件 — 一般是服務(wù)器端
  • 可讀事件
  • 可寫事件
  • 代碼:

    客戶端 import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set;public class ClientDemo {public static void main(String[] args) throws IOException {//打開客戶端的通道SocketChannel sc = SocketChannel.open();//設(shè)置為非阻塞sc.configureBlocking(false);//獲取選擇器Selector selc = Selector.open();//將通道注冊到選擇器上sc.register(selc, SelectionKey.OP_CONNECT);//并給予連接權(quán)限//發(fā)起連接sc.connect(new InetSocketAddress("localhost", 8080));while(true) {//進(jìn)行選擇 , 篩選出有用的連接selc.select();//獲取篩選之后有用的事件Set<SelectionKey> keys = selc.selectedKeys();Iterator<SelectionKey> iterator = keys.iterator();while(iterator.hasNext()) {//將遍歷到的事件讀取出來SelectionKey next = iterator.next();//可能向服務(wù)器發(fā)起連接//可能向服務(wù)器寫數(shù)據(jù)//可能接收服務(wù)器的數(shù)據(jù)if(next.isConnectable()) {//判斷是否是一個連接事件//從該事件中獲取到對應(yīng)的通道SocketChannel scx = (SocketChannel) next.channel();//判斷之前的連接是否成功while(!scx.finishConnect());//連接成功之后 進(jìn)行讀寫操作scx.register(selc, SelectionKey.OP_READ | SelectionKey.OP_WRITE);}if(next.isWritable()) {//從該事件中獲取到對應(yīng)的通道SocketChannel scx = (SocketChannel) next.channel();//寫數(shù)據(jù)scx.write(ByteBuffer.wrap("讀取數(shù)據(jù)成功!".getBytes()));//執(zhí)行完寫操作之后 , 需要將這個通道的寫權(quán)限注銷掉 ,防止不停地向服務(wù)器寫數(shù)據(jù)scx.register(selc, next.interestOps() ^ SelectionKey.OP_WRITE);//可用^ 或& ~}if(next.isReadable()) {//從該事件中獲取到對應(yīng)的通道SocketChannel scx = (SocketChannel) next.channel();//讀數(shù)據(jù)ByteBuffer buffer = ByteBuffer.allocate(100);scx.read(buffer);buffer.flip();System.out.println(new String(buffer.array() , 0 , buffer.limit()));//移除可讀事件scx.register(selc, next.interestOps() & ~SelectionKey.OP_READ);}//為了防止事件移除失敗 , 處理完成后將事件移除iterator.remove();}}} }服務(wù)器端 import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set;import javax.swing.plaf.SliderUI;public class ServerDemo {public static void main(String[] args) throws IOException {//打開服務(wù)器端通道ServerSocketChannel ssc = ServerSocketChannel.open();//綁定偵聽的端口號ssc.bind(new InetSocketAddress(8080));//接收任何IP客戶端8080端口傳來的數(shù)據(jù)//將通道設(shè)置為非阻塞ssc.configureBlocking(false);//將服務(wù)器注冊到選擇器上Selector selc = Selector.open();//為服務(wù)器注冊一個接受請求的權(quán)限ssc.register(selc, SelectionKey.OP_ACCEPT);while(true) {//進(jìn)行選擇selc.select();//將選擇后的事件獲取出來Set<SelectionKey> keys = selc.selectedKeys();Iterator<SelectionKey> it = keys.iterator();while(it.hasNext()) {//獲取這個事件SelectionKey key = it.next();//可能是接受連接事件//可能是可讀事件//可能是可寫事件if(key.isAcceptable()) {//獲取事件的通道ServerSocketChannel sscx = (ServerSocketChannel) key.channel();//接受連接SocketChannel sc = sscx.accept();while(sc == null) {sscx.accept();}//設(shè)置為非阻塞sc.configureBlocking(false);//注冊一個可讀事件sc.register(selc, SelectionKey.OP_READ | SelectionKey.OP_WRITE);}if(key.isReadable()) {//獲取事件的通道SocketChannel scx = (SocketChannel) key.channel();//讀取數(shù)據(jù)ByteBuffer buffer = ByteBuffer.allocate(100); scx.read(buffer);buffer.flip();System.out.println(new String (buffer.array() , 0 , buffer.limit()));//消除可讀事件scx.register(selc, key.interestOps() ^ SelectionKey.OP_READ);}if(key.isWritable()) {//獲取事件的通道SocketChannel scx = (SocketChannel) key.channel();//寫出數(shù)據(jù)scx.write(ByteBuffer.wrap("hello".getBytes()));//消除可以寫事件scx.register(selc, key.interestOps() & ~SelectionKey.OP_WRITE);}it.remove();}}} }
  • 三 、 考慮 : 數(shù)據(jù)粘包怎么處理?

  • 數(shù)據(jù)定長 — 如果數(shù)據(jù)長度不夠 , 填充無用數(shù)據(jù) — 怎樣區(qū)分無用數(shù)據(jù)
  • 約定數(shù)據(jù)結(jié)尾符號 — 結(jié)尾符號可能會和實際的數(shù)據(jù)內(nèi)容沖突
  • 約定協(xié)議 — 序列化/反序列化 — 底層實際上是約束了起始和結(jié)束的協(xié)議
  • 總結(jié)

    以上是生活随笔為你收集整理的大数据 NIO的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。