日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

网络与IO知识扫盲(六):多路复用器

發布時間:2024/2/28 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 网络与IO知识扫盲(六):多路复用器 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

NIO存在的問題

NIO的優勢:可以通過一個或幾個線程來解決N個IO連接的處理

NIO存在C10K問題:當客戶端連接的數量達到10K時,單線程每循環一次所有的fd,成本是O(n)復雜度,每一次循環都會有10K次系統調用,但是有意義的調用可能只有三五個,大多數調用是無意義的浪費資源。

多路復用器

還是沿用上一版當中的 Java 代碼

package com.bjmashibing.system.io;import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set;public class SocketMultiplexingSingleThreadv1 {//坦克一、二期(netty)private ServerSocketChannel server = null;private Selector selector = null; //linux 提供多路復用器(select poll epoll kqueue)等,在java中被封裝為Selector。拓展:nginx的event模塊int port = 9090;public void initServer() {try {server = ServerSocketChannel.open();server.configureBlocking(false); // 設置成非阻塞server.bind(new InetSocketAddress(port)); // 綁定監聽的端口號//如果在epoll模型下,Selector.open()其實完成了epoll_create,可能給你返回了一個 fd3selector = Selector.open(); // 可以選擇 select poll *epoll,在linux中會優先選擇epoll 但是可以在JVM使用-D參數修正/*server 約等于 listen 狀態的 fd4server.register()初始化過程:1、如果在select,poll的模型下,Selector.open()操作實際上是在jvm里開辟一個數組,把fd4放進去2、如果在epoll的模型下,調用了epoll_ctl(fd3,ADD,fd4,EPOLLIN)*/server.register(selector, SelectionKey.OP_ACCEPT);} catch (IOException e) {e.printStackTrace();}}public void start() {initServer();System.out.println("服務器啟動了");try {while (true) { //死循環Set<SelectionKey> keys = selector.keys();System.out.println("keys.size() = " + keys.size());//1、selector.select調用多路復用器(分為select,poll or epoll(實質上是調用的epoll_wait))/*java中調用的select()方法是啥意思:1、如果用select,poll模型,其實調的是內核的select方法,并傳入參數(fd4),或者poll(fd4)2、如果用epoll模型,其實調用的是內核的epoll_wait(),因為fd4在上面的epoll_ctl已經傳進去了注意:參數可以帶時間。如果沒有時間,或者時間是0,代表阻塞。如果有時間,則設置一個超時時間。方法selector.wakeup()可以外部控制讓它不阻塞。這時select的結果返回是0。懶加載:其實再觸碰到selector.select()調用的時候,觸發了epoll_ctl的調用*/while (selector.select(500) > 0) {Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 拿到返回的有狀態的fd結果集Iterator<SelectionKey> iter = selectionKeys.iterator(); // 將有狀態的fd結果集轉成迭代器//所以,不管你是哪一種多路復用器,你只能告訴我fd的狀態,我作為應用程序,還需要一個一個的去處理他們的R/W。同步好辛苦!!!//我們之前用NIO的時候,需要自己對每一個fd都去調用系統調用,浪費資源。那么你看,現在是不是只調用了一次select方法,就能知道具體的那些可以R/W了?是不是很省力?while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove(); //這時一個set,不移除的話會重復循環處理if (key.isAcceptable()) { //我前邊強調過,socket分為兩種,一種是listen的,一種是用于通信 R/W 的,所以拿到之后需要判斷它是可讀還是可寫。//這里是重點,如果要去接受一個新的連接,語義上,accept接受連接且返回新連接的FD,對吧?//那新的FD放在哪里?//1、如果使用select,poll的時候,因為他們在內核沒有開辟空間,那么由jvm去維護一個集合(是個native方法),和前邊的fd4那個listen的放在一起//2、如果使用epoll的話,我們希望通過epoll_ctl把新的客戶端fd注冊到內核空間acceptHandler(key);} else if (key.isReadable()) {readHandler(key);//在當前線程,readHandler處理了很多東西,那么這個方法可能會阻塞,如果阻塞了十年,其他的IO早就沒電了。那其他的IO怎么辦?//所以,這就是為什么提出了IO THREADS,我把讀到的東西扔出去,而不是現場處理//你想,redis是不是用了epoll?redis是不是有個io threads的概念?redis是不是單線程的?它的worker是單線程的,但是它的io threads是多線程的。//你想,tomcat 8,9版本之后,是不是也提出了一種異步的處理方式?是不是也在 IO 和處理上解耦?//這些都是等效的。}}}}} catch (IOException e) {e.printStackTrace();}}public void acceptHandler(SelectionKey key) {try {ServerSocketChannel ssc = (ServerSocketChannel) key.channel();SocketChannel client = ssc.accept(); //來啦,目的是調用accept接受客戶端 fd7client.configureBlocking(false);ByteBuffer buffer = ByteBuffer.allocate(8192); //前邊講過了/*你看,調用了register。register做了什么呢?1、select,poll:在jvm里開辟一個數組,把 fd7 放進去2、epoll:調用epoll_ctl(fd3,ADD,fd7,EPOLLIN*/client.register(selector, SelectionKey.OP_READ, buffer);System.out.println("新客戶端:" + client.getRemoteAddress());} catch (IOException e) {e.printStackTrace();}}public void readHandler(SelectionKey key) {SocketChannel client = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();buffer.clear();int read = 0;try {while (true) {read = client.read(buffer);if (read > 0) {buffer.flip();while (buffer.hasRemaining()) {client.write(buffer);}buffer.clear();} else if (read == 0) {break;} else {client.close();break;}}} catch (IOException e) {e.printStackTrace();}}public static void main(String[] args) {SocketMultiplexingSingleThreadv1 service = new SocketMultiplexingSingleThreadv1();service.start();} }

同一套代碼 java NIO 的 Selector 對應到 poll,epoll時的不同底層實現

以poll的形式啟動

服務端啟動

客戶端連接進來之后,觀察strace的監控輸出

以epoll的形式啟動

服務端啟動

客戶端連接進來之后

客戶端發送一些數據,觀察strace的監控輸出

一端斷開連接時,產生的中間狀態

FIN_WAIT, CLOSE_WAIT, TIME_WAIT 的關系

如果有客戶端斷開連接的時候,四次分手的最后一個包沒有收到,服務端的資源還要在 TIME_WAIT 停留一會兒。消耗的是socket四元組的規則。

改進1:增加注冊寫事件、將讀寫獨立拋出線程

在單線程循環中,如果某一個連接的讀寫操作耗費了大量的時間,會影響其他連接的讀寫。所以我們將讀寫獨立拋出線程去處理。
但是這樣需要在拋出的線程中使用 key. cancel() ,否則,同一個連接在被處理完之前始終保持"有狀態",有可能會被重復調起

package com.bjmashibing.system.io;import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set;public class SocketMultiplexingSingleThreadv2 {private ServerSocketChannel server = null;private Selector selector = null; //linux 多路復用器(select poll epoll) nginx event{}int port = 9090;public void initServer() {try {server = ServerSocketChannel.open();server.configureBlocking(false);server.bind(new InetSocketAddress(port));selector = Selector.open(); // select poll *epollserver.register(selector, SelectionKey.OP_ACCEPT);} catch (IOException e) {e.printStackTrace();}}public void start() {initServer();System.out.println("服務器啟動了。。。。。");try {while (true) { // Set<SelectionKey> keys = selector.keys(); // System.out.println(keys.size()+" size");while (selector.select(50) > 0) {Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iter = selectionKeys.iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {acceptHandler(key);} else if (key.isReadable()) {key.cancel(); //現在多路復用器里把key cancel了readHandler(key);//還是阻塞的嘛? 即便以拋出了線程去讀取,但是在時差里,這個key的read事件會被重復觸發} else if (key.isWritable()) { //我之前沒講過寫的事件!!!!!//寫事件<-- send-queue 只要是空的,就一定會給你返回可以寫的事件,就會回調我們的寫方法//你真的要明白:什么時候寫?不是依賴send-queue隊列是不是有空間,真正的是因為://1,你已經準備好要寫什么了//2,然后你才關心send-queue是否有空間//3,所以,read 操作一開始就要 register,但是write操作依賴以上關系,什么時候用什么時候才注冊//4,如果一開始就注冊了write的事件,會進入死循環,一直調起!!!key.cancel();writeHandler(key);}}}}} catch (IOException e) {e.printStackTrace();}}private void writeHandler(SelectionKey key) {new Thread(() -> {System.out.println("write handler...");SocketChannel client = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();buffer.flip();while (buffer.hasRemaining()) {try {client.write(buffer);} catch (IOException e) {e.printStackTrace();}}try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}buffer.clear();key.cancel();try {client.close();System.out.println("here close");} catch (IOException e) {e.printStackTrace();}}).start();}public void acceptHandler(SelectionKey key) {try {ServerSocketChannel ssc = (ServerSocketChannel) key.channel();SocketChannel client = ssc.accept();client.configureBlocking(false);ByteBuffer buffer = ByteBuffer.allocate(8192);client.register(selector, SelectionKey.OP_READ, buffer);System.out.println("-------------------------------------------");System.out.println("新客戶端:" + client.getRemoteAddress());System.out.println("-------------------------------------------");} catch (IOException e) {e.printStackTrace();}}public void readHandler(SelectionKey key) {new Thread(() -> {System.out.println("read handler.....");SocketChannel client = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();buffer.clear();int read = 0;try {while (true) {read = client.read(buffer);System.out.println(Thread.currentThread().getName() + " " + read);if (read > 0) {client.register(key.selector(), SelectionKey.OP_WRITE, buffer);} else if (read == 0) {break;} else {client.close();break;}}} catch (IOException e) {e.printStackTrace();}}).start();}public static void main(String[] args) {SocketMultiplexingSingleThreadv2 service = new SocketMultiplexingSingleThreadv2();service.start();} }


總結

以上是生活随笔為你收集整理的网络与IO知识扫盲(六):多路复用器的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。