大数据 NIO
NIO
一、基礎(chǔ)回顧
a 、 進(jìn)程與線程
b、 Socket
二、 NIO
Buffer子類 ByteBuffer
代碼1
import java.nio.ByteBuffer;public class BufferDemo {public static void main(String[] args) { // //創(chuàng)建緩沖區(qū) , 并指定了大小為1024個字節(jié)//當(dāng)創(chuàng)建好緩沖區(qū)的時候 , 就有了一下屬性//1. capacity 容量位 --- 表示緩沖區(qū)容量//2. position 操作位 --- 表示要操作的位置 ---- 當(dāng)緩沖區(qū)剛剛創(chuàng)建的時候 , 操作位默認(rèn)為0 , 每添加一個字節(jié)的數(shù)據(jù) , position就會向后挪一位//3. limit 限制位 ---- 表示position 所能達(dá)到的最大位置 --- 當(dāng)緩沖區(qū)剛剛創(chuàng)建的時候 , limit就是容量位 。//獲取數(shù)據(jù)時 , 默認(rèn)是從操作位開始獲取的 // ByteBuffer buffer = ByteBuffer.allocate(1024);//最多能存放1k數(shù)據(jù) // //向緩沖區(qū)添加數(shù)據(jù) // buffer.put("hello".getBytes());//以上方法存在資源浪費(fèi)//******************************************************* // //在已知具體數(shù)據(jù)的情況下 , 建議使用這種方法創(chuàng)建緩沖區(qū)//使用wrap方式創(chuàng)建緩沖區(qū) , 參數(shù)實際上是一個字節(jié)數(shù)組 , 底層實際上就是將參數(shù)字節(jié)數(shù)組復(fù)制給底層的實際存儲數(shù)據(jù)的數(shù)組 , 此時操作位并沒有改變還是0 //為什么是數(shù)組使用復(fù)制 , 而不是直接使用賦值?//保持?jǐn)?shù)據(jù)的不變和唯一 // ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes());//創(chuàng)建與數(shù)據(jù)大小相對應(yīng)的緩沖區(qū) // // //獲取數(shù)據(jù) , 每一次獲取 , 只能獲取一個字節(jié)byte b = buffer.get();System.out.println(b); // // //獲取緩沖區(qū)所有數(shù)據(jù) // while(buffer.hasRemaining()) {//判斷是否還有剩余數(shù)據(jù) // // byte b = buffer.get(); // System.out.println(b); // }//*******************************************************//但是使用固定緩沖區(qū)大小的情況下獲取數(shù)據(jù)會出現(xiàn)獲取到0的情況 , 需要將默認(rèn)的操作位歸0 , 并且讀取到有效數(shù)據(jù)結(jié)束即可 ByteBuffer buffer = ByteBuffer.allocate(10);buffer.put("hello".getBytes());//遍歷方法一 : 記錄操作位位置后循環(huán)遍歷 // int position = buffer.position(); // for(int i = 0 ;i < position ; i++) { // System.out.println(buffer.get(i)); // }//遍歷方法二: 設(shè)置限制位為操作位后 , 操作位歸0 遍歷 // buffer.limit(buffer.position()); // buffer.position(0); // while(buffer.hasRemaining()) { // System.out.println(buffer.get()); // }//遍歷方法三: 反轉(zhuǎn)緩沖區(qū)//先將限制位設(shè)置為當(dāng)前的操作位 , 然后把操作位歸0buffer.flip(); // buffer.hasRemaining()該方法 本質(zhì)上就是判斷操作位是否小于限制位while(buffer.hasRemaining()) {System.out.println(buffer.get());}//獲取緩沖區(qū)中的底層數(shù)組byte[] array = buffer.array();//底層也是使用的數(shù)組復(fù)制 , 返回的是整個底層數(shù)組 , 而不是有效數(shù)據(jù)System.out.println(new String(array , 0 , buffer.position()));//如果使用過反轉(zhuǎn)buffer.flip();System.out.println(new String(array , 0 , buffer.limit()));} }代碼2
import java.nio.ByteBuffer;public class BufferDemo2 {public static void main(String[] args) {ByteBuffer buffer = ByteBuffer.allocate(10);buffer.put("hello".getBytes());System.out.println("操作位:"+ buffer.position());System.out.println("限制位:"+ buffer.limit()); // buffer.flip(); // System.out.println("操作位:"+ buffer.position()); // System.out.println("限制位:"+ buffer.limit());//重繞緩沖區(qū)buffer.rewind(); //作用: 將操作位歸0 , 限制位不變 。 System.out.println("操作位:"+ buffer.position());System.out.println("限制位:"+ buffer.limit());} }Channel 通道
SocketChannel
代碼示例:
客戶端 import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel;public class SocketChannelDemo {public static void main(String[] args) throws IOException, InterruptedException {//打開通道//SocketChannel 默認(rèn)為阻塞連接 此時和Socket基本一樣SocketChannel s = SocketChannel.open();//設(shè)置SoceketChannel為非阻塞的s.configureBlocking(false);//發(fā)起連接s.connect(new InetSocketAddress("localhost", 8090));//由于SoceketChannel為非阻塞的 , 所以不能保證連接的真正建立//在實際開發(fā)中往往會認(rèn)為的設(shè)置阻塞 , 來保證連接的建立//判斷連接是否成功 , 如果沒有連接成功finishConnect()底層會試圖再次建立連接//如果多次試圖連接沒有成功 , 則報錯while(!s.finishConnect()) ;//寫出數(shù)據(jù)s.write(ByteBuffer.wrap("hello".getBytes()));//獲取服務(wù)器端的響應(yīng)Thread.sleep(100);ByteBuffer b = ByteBuffer.allocate(100);s.read(b);b.flip();System.out.println(new String(b.array() , 0 , b.limit())); // 關(guān)閉通道 s.close();}服務(wù)端 import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel;public class ServerSocketChannelDemo {public static void main(String[] args) throws IOException, InterruptedException {//打開服務(wù)器通道ServerSocketChannel s= ServerSocketChannel.open();//綁定偵聽的端口s.bind(new InetSocketAddress( 8090));//設(shè)置非阻塞s.configureBlocking(false);//接收連接SocketChannel accept = s.accept();//由于ServerSocketChannel是非阻塞的 , 所以可能出現(xiàn)還沒有客戶端聯(lián)入 但是服務(wù)器已經(jīng)結(jié)束的現(xiàn)象//所以需要人為的設(shè)置為阻塞的 。 while(accept == null) {accept = s.accept();}//將socketChannel設(shè)置為非阻塞accept.configureBlocking(false);//讀取數(shù)據(jù)ByteBuffer buffer = ByteBuffer.allocate(100);accept.read(buffer);buffer.flip();System.out.println(new String(buffer.array() , 0 , buffer.limit()));//向客戶端做出響應(yīng)accept.write(ByteBuffer.wrap("服務(wù)器端接收成功!".getBytes()));Thread.sleep(1000);//如果不加延時 , 服務(wù)器端寫出數(shù)據(jù)立即結(jié)束 , 此時客戶端還沒有接收完數(shù)據(jù)會報錯} }通道特點(diǎn) :
Selector 選擇器
代碼:
客戶端 import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set;public class ClientDemo {public static void main(String[] args) throws IOException {//打開客戶端的通道SocketChannel sc = SocketChannel.open();//設(shè)置為非阻塞sc.configureBlocking(false);//獲取選擇器Selector selc = Selector.open();//將通道注冊到選擇器上sc.register(selc, SelectionKey.OP_CONNECT);//并給予連接權(quán)限//發(fā)起連接sc.connect(new InetSocketAddress("localhost", 8080));while(true) {//進(jìn)行選擇 , 篩選出有用的連接selc.select();//獲取篩選之后有用的事件Set<SelectionKey> keys = selc.selectedKeys();Iterator<SelectionKey> iterator = keys.iterator();while(iterator.hasNext()) {//將遍歷到的事件讀取出來SelectionKey next = iterator.next();//可能向服務(wù)器發(fā)起連接//可能向服務(wù)器寫數(shù)據(jù)//可能接收服務(wù)器的數(shù)據(jù)if(next.isConnectable()) {//判斷是否是一個連接事件//從該事件中獲取到對應(yīng)的通道SocketChannel scx = (SocketChannel) next.channel();//判斷之前的連接是否成功while(!scx.finishConnect());//連接成功之后 進(jìn)行讀寫操作scx.register(selc, SelectionKey.OP_READ | SelectionKey.OP_WRITE);}if(next.isWritable()) {//從該事件中獲取到對應(yīng)的通道SocketChannel scx = (SocketChannel) next.channel();//寫數(shù)據(jù)scx.write(ByteBuffer.wrap("讀取數(shù)據(jù)成功!".getBytes()));//執(zhí)行完寫操作之后 , 需要將這個通道的寫權(quán)限注銷掉 ,防止不停地向服務(wù)器寫數(shù)據(jù)scx.register(selc, next.interestOps() ^ SelectionKey.OP_WRITE);//可用^ 或& ~}if(next.isReadable()) {//從該事件中獲取到對應(yīng)的通道SocketChannel scx = (SocketChannel) next.channel();//讀數(shù)據(jù)ByteBuffer buffer = ByteBuffer.allocate(100);scx.read(buffer);buffer.flip();System.out.println(new String(buffer.array() , 0 , buffer.limit()));//移除可讀事件scx.register(selc, next.interestOps() & ~SelectionKey.OP_READ);}//為了防止事件移除失敗 , 處理完成后將事件移除iterator.remove();}}} }服務(wù)器端 import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set;import javax.swing.plaf.SliderUI;public class ServerDemo {public static void main(String[] args) throws IOException {//打開服務(wù)器端通道ServerSocketChannel ssc = ServerSocketChannel.open();//綁定偵聽的端口號ssc.bind(new InetSocketAddress(8080));//接收任何IP客戶端8080端口傳來的數(shù)據(jù)//將通道設(shè)置為非阻塞ssc.configureBlocking(false);//將服務(wù)器注冊到選擇器上Selector selc = Selector.open();//為服務(wù)器注冊一個接受請求的權(quán)限ssc.register(selc, SelectionKey.OP_ACCEPT);while(true) {//進(jìn)行選擇selc.select();//將選擇后的事件獲取出來Set<SelectionKey> keys = selc.selectedKeys();Iterator<SelectionKey> it = keys.iterator();while(it.hasNext()) {//獲取這個事件SelectionKey key = it.next();//可能是接受連接事件//可能是可讀事件//可能是可寫事件if(key.isAcceptable()) {//獲取事件的通道ServerSocketChannel sscx = (ServerSocketChannel) key.channel();//接受連接SocketChannel sc = sscx.accept();while(sc == null) {sscx.accept();}//設(shè)置為非阻塞sc.configureBlocking(false);//注冊一個可讀事件sc.register(selc, SelectionKey.OP_READ | SelectionKey.OP_WRITE);}if(key.isReadable()) {//獲取事件的通道SocketChannel scx = (SocketChannel) key.channel();//讀取數(shù)據(jù)ByteBuffer buffer = ByteBuffer.allocate(100); scx.read(buffer);buffer.flip();System.out.println(new String (buffer.array() , 0 , buffer.limit()));//消除可讀事件scx.register(selc, key.interestOps() ^ SelectionKey.OP_READ);}if(key.isWritable()) {//獲取事件的通道SocketChannel scx = (SocketChannel) key.channel();//寫出數(shù)據(jù)scx.write(ByteBuffer.wrap("hello".getBytes()));//消除可以寫事件scx.register(selc, key.interestOps() & ~SelectionKey.OP_WRITE);}it.remove();}}} }三 、 考慮 : 數(shù)據(jù)粘包怎么處理?
總結(jié)
- 上一篇: Java 利用InetAddress类确
- 下一篇: Oracle:ORA-01789: 查询