socket的NIO操作
一、前言
????? Java中直接使用socket進行通信的場景應該不是很多,在公司的一個項目中有這種需求,所以根據自己的理解和相關資料的參考,基于NIO 實現了一組工具類庫,具體的協議還未定義,后續再整理
?
二、實現思路
包結構如下:
??
?
Listener: 事件監聽接口
AcceptListener(請求事件接口),TCPServerProtocol實現類中當服務端接收到連接請求并成功建立通信之后通知注冊的此事件集合;
ReadListener(讀取事件接口),TCPProtocol實現類中讀取接收到的信息完成之后通知注冊的此事件集合;
SendListener(發送事件接口),TCPProtocol實現類中調用發送信息方法之后通知注冊的此事件集合;
?
Protocol:TCP處理接口
TCPProtocol(讀取、輸出TCP處理接口),定義了NIO中關于輸入、輸出處理以及相關監聽事件的維護
TCPServerProtocol(接收TCP請求處理接口),定義了NIO中TCP請求處理以及相關監聽事件的維護
?
Util:輔助類
HelperUtil(一些基本操作工具類),定義獲取KEY值,獲取本地IP,核對結束幀等
SocketConfig(socket連接配置類),定義IP地址、端口、處理接口、是否自動重置等配置信息
SocketStat(socket連接狀態管理類),管理socket的生命周期,提供socket控制方法
SocketLogicException(socket異常類),定義此類庫中可能出現的異常
?
Socket:對外服務類
NSocketBlockClient(socket阻塞客戶端類),提供阻塞的客戶端實現
NSocketClient(socket非阻塞客戶端類),提供非阻塞的客戶端實現
NSocketService(socket非阻塞服務端類),提供非阻塞的服務端實現
SelectOptionListener(發送事件接口的實現),用于改變selector的interestOps為OP_WRITE
SocketClientTask(客戶端線程任務),用于管理客戶端的輸入、輸出事件以及異常處理
SocketServiceTask(服務端線程任務),用于管理服務端的輸入、輸出、請求事件以及異常處理
?
核心類圖:
?
三、使用方式
(1)?通過實現AcceptListener、ReadListener、SendListener事件接口來注入發送、讀取、接收業務
(2)?通過設置SocketConfig對象屬性來進行SOCKET通信配置
(3)?通過NSocketBlockClient、NSocketClient、NSocketService對象來進行信息發送
? 服務端例子:
?
SocketConfig config=new SocketConfig(true, "10.33.6.178", 8899); TCPServerProtocol protocol=new DefaultServerProtocol(new AcceptListener(){@Overridepublic void handleEvent(SocketStat socket) {socket.getConfig().getProtocol().addReadListener(new TestRead());}});NSocketService server=new NSocketService(config, protocol); View Code? 其中 TestRead 為實現ReadListener的類?
public void handleEvent(byte[] message,TCPProtocol tcpProtocol) {System.out.println(new String(message,Charset.forName("GBK")));tcpProtocol.sendMessage("hello".getBytes(Charset.forName("GBK"))); } View Code? 非阻塞客戶端例子:
SocketConfig config=new SocketConfig(true, "10.33.6.178", 8899);config.getProtocol().addReadListener(new ReadListener() {@Overridepublic void handleEvent(byte[] message, TCPProtocol tcpProtocol) {System.out.println("recive from server:"+ new String(message)); }});NSocketClient client=new NSocketClient(config);client.sendMessage("test nsocket".getBytes()); View Code?
?關于NIO處理的核心類:
public class DefaultServerProtocol implements TCPServerProtocol{ private LinkedList<AcceptListener> acceptList=new LinkedList<AcceptListener>(); public DefaultServerProtocol(AcceptListener... acceptColl){for(AcceptListener al:acceptColl){this.addAcceptListener(al);}}@Overridepublic SocketStat handleAccept(SelectionKey key) throws IOException {SocketChannel channel=((ServerSocketChannel)key.channel()).accept(); channel.configureBlocking(false); Socket socket=channel.socket();SocketConfig config=new SocketConfig(false,socket.getInetAddress().getHostAddress(),socket.getPort(),this.createProtocol()); SocketStat stat=new SocketStat(config, key.selector(), channel);System.out.println("遠程客戶端地址:".concat(socket.getInetAddress().getHostAddress()));notifyAccept(stat);return stat;}@Overridepublic void addAcceptListener(AcceptListener al) {this.acceptList.add(al); }@Overridepublic void notifyAccept(SocketStat socket) {for(AcceptListener al: acceptList){al.handleEvent(socket);}}/*** * 獲取關于TCP 的讀取和寫入操作協議,可以override 返回自己的實現** @return * @since Ver 1.0*/public TCPProtocol createProtocol(){return new DefaultTCPProtocol();} } View Code public class DefaultTCPProtocol implements TCPProtocol { /**讀取緩存區*/private ByteBuffer readBuff; /**待發送消息隊列*/protected Queue<ByteBuffer> messageQueue=new LinkedBlockingQueue<ByteBuffer>();/**鎖*/private Object lockObje=new Object();/**讀取監聽*/private LinkedList<ReadListener> readList=new LinkedList<ReadListener>();/**發送監聽*/private LinkedList<SendListener> sendList=new LinkedList<SendListener>();public DefaultTCPProtocol(ReadListener... readColl){ readBuff=ByteBuffer.allocate(1024); for(ReadListener rl : readColl){this.addReadListener(rl);}} @Overridepublic void handleRead(SelectionKey key) throws IOException { SocketChannel clientChn=(SocketChannel)key.channel();ByteArrayOutputStream out=new ByteArrayOutputStream();try{ synchronized(this.readBuff){readBuff.clear();int bytesRead=clientChn.read(readBuff); if(bytesRead==-1){throw new IOException("遠程已關閉");}while(bytesRead>0){readBuff.flip(); out.write(readBuff.array(), 0, readBuff.limit());readBuff.clear();bytesRead=clientChn.read(readBuff);} } key.interestOps(SelectionKey.OP_READ);notifyRead(out.toByteArray());}finally{out.close();}} @Overridepublic void handleWrite(SelectionKey key) throws IOException {SocketChannel clientChn=(SocketChannel)key.channel();ByteBuffer message=null;while(!messageQueue.isEmpty()){synchronized(lockObje){if(!messageQueue.isEmpty()){message=this.messageQueue.peek(); clientChn.write(message);if(message.hasRemaining()){ break; } messageQueue.poll();} } } key.interestOps(SelectionKey.OP_READ);}/*** * 發送消息(此時不是真正發送,而是放在一個待發送的隊列中)** @param message 信息 * @since Ver 1.0*/@Overridepublic void sendMessage(byte[] message) { messageQueue.add(ByteBuffer.wrap(message));this.notifySend(message);}@Overridepublic void addReadListener(ReadListener rl) { readList.add(rl); }@Overridepublic void addSendListener(SendListener sl) {sendList.add(sl);}@Overridepublic void removeReadListener(ReadListener rl) {readList.remove(rl);}@Overridepublic void removeSendListener(SendListener rl) {sendList.remove(rl);}private void notifyRead(byte[] message) { for(ReadListener rl : readList){rl.handleEvent(message,this);}}private void notifySend(byte[] message) {for(SendListener rl : sendList){rl.handleEvent(message);}}} View Code?
?
轉載于:https://www.cnblogs.com/WGZ_Home/p/3400389.html
超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術人生總結
以上是生活随笔為你收集整理的socket的NIO操作的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: C++实现另一个猜数字游戏
- 下一篇: HDU 3123 GCC(同余模定理)