日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java socket ip_JAVA 网络编程 TCP/IP、Socket 和协议设计

發布時間:2023/12/19 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java socket ip_JAVA 网络编程 TCP/IP、Socket 和协议设计 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

【JAVA 網絡編程 TCP/IP、Socket 和協議設計】

TCP/IP 協議簡介

IP

首先我們看 IP(Internet Protocol)協議。IP 協議提供了主機和主機間的通信。

為了完成不同主機的通信,我們需要某種方式來唯一標識一臺主機,這個標識,就是著名的IP地址。通過IP地址,IP 協議就能夠幫我們把一個數據包發送給對方。

TCP

前面我們說過,IP 協議提供了主機和主機間的通信。

TCP 協議在 IP 協議提供的主機間通信功能的基礎上,完成這兩個主機上進程對進程的通信。

有了 IP,不同主機就能夠交換數據。但是,計算機收到數據后,并不知道這個數據屬于哪個進程(簡單講,進程就是一個正在運行的應用程序)。TCP 的作用就在于,讓我們能夠知道這個數據屬于哪個進程,從而完成進程間的通信。

為了標識數據屬于哪個進程,我們給需要進行 TCP 通信的進程分配一個唯一的數字來標識它。這個數字,就是我們常說的端口號。

三次握手

TCP 的全稱是 Transmission Control Protocol,大家對它說得最多的,大概就是面向連接的特性了。之所以說它是有連接的,是說在進行通信前,通信雙方需要先經過一個三次握手的過程。三次握手完成后,連接便建立了。這時候我們才可以開始發送/接收數據。(與之相對的是 UDP,不需要經過握手,就可以直接發送數據)。

下面我們簡單了解一下三次握手的過程。

tcp-three-way-handshake

首先,客戶向服務端發送一個 SYN,假設此時 sequence number 為 x。這個 x 是由操作系統根據一定的規則生成的,不妨認為它是一個隨機數。

服務端收到 SYN 后,會向客戶端再發送一個 SYN,此時服務器的 seq number = y。與此同時,會 ACK x+1,告訴客戶端“已經收到了 SYN,可以發送數據了”。

客戶端收到服務器的 SYN 后,回復一個 ACK y+1,這個 ACK 則是告訴服務器,SYN 已經收到,服務器可以發送數據了。

經過這 3 步,TCP 連接就建立了。這里需要注意的有三點:

連接是由客戶端主動發起的

在第 3 步客戶端向服務器回復 ACK 的時候,TCP 協議是允許我們攜帶數據的。之所以做不到,是 API 的限制導致的。

TCP 協議還允許 “四次握手” 的發生,同樣的,由于 API 的限制,這個極端的情況并不會發生。

TCP/IP 相關的理論知識我們就先了解到這里。關于 TCP,還有諸如可靠性、流量控制、擁塞控制等非常有趣的特性,強烈推薦讀者看一看 Richard 的名著《TCP/IP 詳解 - 卷1》(注意,是第1版,不是第2版)。

下面我們看一些偏實戰的東西。

Socket 基本用法

Socket 是 TCP 層的封裝,通過 socket,我們就能進行 TCP 通信。

在 Java 的 SDK 中,socket 的共有兩個接口:用于監聽客戶連接的 ServerSocket 和用于通信的 Socket。使用 socket 的步驟如下:

創建 ServerSocket 并監聽客戶連接

使用 Socket 連接服務端

通過 Socket.getInputStream()/getOutputStream() 獲取輸入輸出流進行通信

下面,我們通過實現一個簡單的 echo 服務來學習 socket 的使用。所謂的 echo 服務,就是客戶端向服務端寫入任意數據,服務器都將數據原封不動地寫回給客戶端。

1. 創建 ServerSocket 并監聽客戶連接

public class EchoServer {

private final ServerSocket mServerSocket;

public EchoServer(int port) throws IOException {

// 1. 創建一個 ServerSocket 并監聽端口 port

mServerSocket = new ServerSocket(port);

}

public void run() throws IOException {

// 2. 開始接受客戶連接

Socket client = mServerSocket.accept();

handleClient(client);

}

private void handleClient(Socket socket) {

// 3. 使用 socket 進行通信 ...

}

public static void main(String[] argv) {

try {

EchoServer server = new EchoServer(9877);

server.run();

} catch (IOException e) {

e.printStackTrace();

}

}

}

2. 使用 Socket 連接服務端

public class EchoClient {

private final Socket mSocket;

public EchoClient(String host, int port) throws IOException {

// 創建 socket 并連接服務器

mSocket = new Socket(host, port);

}

public void run() {

// 和服務端進行通信

}

public static void main(String[] argv) {

try {

// 由于服務端運行在同一主機,這里我們使用 localhost

EchoClient client = new EchoClient("localhost", 9877);

client.run();

} catch (IOException e) {

e.printStackTrace();

}

}

}

3. 通過 socket.getInputStream()/getOutputStream() 獲取輸入/輸出流進行通信

首先,我們來實現服務端:

public class EchoServer {

// ...

private void handleClient(Socket socket) throws IOException {

InputStream in = socket.getInputStream();

OutputStream out = socket.getOutputStream();

byte[] buffer = new byte[1024];

int n;

while ((n = in.read(buffer)) > 0) {

out.write(buffer, 0, n);

}

}

}

可以看到,服務端的實現其實很簡單,我們不停地讀取輸入數據,然后寫回給客戶端。

下面我們看看客戶端。

public class EchoClient {

// ...

public void run() throws IOException {

Thread readerThread = new Thread(this::readResponse);

readerThread.start();

OutputStream out = mSocket.getOutputStream();

byte[] buffer = new byte[1024];

int n;

while ((n = System.in.read(buffer)) > 0) {

out.write(buffer, 0, n);

}

}

private void readResponse() {

try {

InputStream in = mSocket.getInputStream();

byte[] buffer = new byte[1024];

int n;

while ((n = in.read(buffer)) > 0) {

System.out.write(buffer, 0, n);

}

} catch (IOException e) {

e.printStackTrace();

}

}

}

客戶端會稍微復雜一點點,在讀取用戶輸入的同時,我們又想讀取服務器的響應。所以,這里創建了一個線程來讀服務器的響應。

不熟悉 lambda 的讀者,可以把Thread readerThread = new Thread(this::readResponse) 換成下面這個代碼:

Thread readerThread = new Thread(new Runnable() {

@Override

public void run() {

readResponse();

}

});

打開兩個 terminal 分別執行如下命令:

$ javac EchoServer.java

$ java EchoServer

$ javac EchoClient.java

$ java EchoClient

hello Server

hello Server

foo

foo

在客戶端,我們會看到,輸入的所有字符都打印了出來。

最后需要注意的有幾點:

在上面的代碼中,我們所有的異常都沒有處理。實際應用中,在發生異常時,需要關閉 socket,并根據實際業務做一些錯誤處理工作

在客戶端,我們沒有停止 readThread。實際應用中,我們可以通過關閉 socket 來讓線程從阻塞讀中返回。推薦讀者閱讀《Java并發編程實戰》

我們的服務端只處理了一個客戶連接。如果需要同時處理多個客戶端,可以創建線程來處理請求。這個作為練習留給讀者來完全。

Socket vs ServerSocket

在進入這一節的主題前,讀者不妨先考慮一個問題:在上一節的實例中,我們運行 echo 服務后,在客戶端連接成功時,一共有多少個 socket 存在?

答案是 3 個 socket。客戶端一個,服務端有兩個。跟這個問題的答案直接關聯的是本節的主題——Socket 和 ServerSocket 的區別是什么。

眼尖的讀者,可能會注意到在上一節我是這樣描述他們的:

在 Java 的 SDK 中,socket 的共有兩個接口:用于監聽客戶連接的 ServerSocket 和用于通信的 Socket。

注意,我只說 ServerSocket 是用于監聽客戶連接,而沒有說它也可以用來通信。下面我們來詳細了解一下他們的區別。

注:以下描述使用的是 UNIX/Linux 系統的 API

首先,我們創建 ServerSocket 后,內核會創建一個 socket。這個 socket 既可以拿來監聽客戶連接,也可以連接遠端的服務。由于 ServerSocket 是用來監聽客戶連接的,緊接著它就會對內核創建的這個 socket 調用 listen 函數。這樣一來,這個 socket 就成了所謂的 listening socket,它開始監聽客戶的連接。

接下來,我們的客戶端創建一個 Socket,同樣的,內核也創建一個 socket 實例。內核創建的這個 socket 跟 ServerSocket 一開始創建的那個沒有什么區別。不同的是,接下來 Socket 會對它執行 connect,發起對服務端的連接。前面我們說過,socket API 其實是 TCP 層的封裝,所以 connect 后,內核會發送一個 SYN 給服務端。

現在,我們切換角色到服務端。服務端的主機在收到這個 SYN 后,會創建一個新的 socket,這個新創建的 socket 跟客戶端繼續執行三次握手過程。

三次握手完成后,我們執行的 serverSocket.accept() 會返回一個 Socket 實例,這個 socket 就是上一步內核自動幫我們創建的。

所以說,在一個客戶端連接的情況下,其實有 3 個 socket。

關于內核自動創建的這個 socket,還有一個很有意思的地方。它的端口號跟 ServerSocket 是一毛一樣的。咦!!不是說,一個端口只能綁定一個 socket 嗎?其實這個說法并不夠準確。

前面我說的TCP 通過端口號來區分數據屬于哪個進程的說法,在 socket 的實現里需要改一改。Socket 并不僅僅使用端口號來區別不同的 socket 實例,而是使用 這個四元組。

在上面的例子中,我們的 ServerSocket 長這樣:。意思是,可以接受任何的客戶端,和本地任何 IP。

accept 返回的 Socket 則是這樣:<127.0.0.1:xxxx, 127.0.0.1:9877>。其中,xxxx 是客戶端的端口號。

如果數據是發送給一個已連接的 socket,內核會找到一個完全匹配的實例,所以數據準確發送給了對端。

如果是客戶端要發起連接,這時候只有 會匹配成功,所以 SYN 也準確發送給了監聽套接字。

Socket/ServerSocket 的區別我們就講到這里。如果讀者覺得不過癮,可以參考《TCP/IP 詳解》卷1、卷2。

Socket 長連接的實現

背景知識

Socket 長連接,指的是在客戶和服務端之間保持一個 socket 連接長時間不斷開。

比較熟悉 Socket 的讀者,可能知道有這樣一個 API:

socket.setKeepAlive(true);

嗯……keep alive,“保持活著”,這個應該就是讓 TCP 不斷開的意思。那么,我們要實現一個 socket 的長連接,只需要這一個調用即可。

遺憾的是,生活并不總是那么美好。對于 4.4BSD 的實現來說,Socket 的這個 keep alive 選項如果打開并且兩個小時內沒有通信,那么底層會發一個心跳,看看對方是不是還活著。

注意,兩個小時才會發一次。也就是說,在沒有實際數據通信的時候,我把網線拔了,你的應用程序要經過兩個小時才會知道。

在說明如果實現長連接前,我們先來理一理我們面臨的問題。假定現在有一對已經連接的 socket,在以下情況發生時候,socket 將不再可用:

某一端關閉 socket(這不是廢話嗎)。主動關閉的一方會發送 FIN,通知對方要關閉 TCP 連接。在這種情況下,另一端如果去讀 socket,將會讀到 EoF(End of File)。于是我們知道對方關閉了 socket。

應用程序奔潰。此時 socket 會由內核關閉,結果跟情況1一樣。

系統奔潰。這時候系統是來不及發送 FIN 的,因為它已經跪了。此時對方無法得知這一情況。對方在嘗試讀取數據時,最后會返回 read time out。如果寫數據,則是 host unreachable 之類的錯誤。

電纜被挖斷、網線被拔。跟情況3差不多,如果沒有對 socket 進行讀寫,兩邊都不知道發生了事故。跟情況3不同的是,如果我們把網線接回去,socket 依舊可以正常使用。

在上面的幾種情形中,有一個共同點就是,只要去讀、寫 socket,只要 socket 連接不正常,我們就能夠知道?;谶@一點,要實現一個 socket 長連接,我們需要做的就是不斷地給對方寫數據,然后讀取對方的數據,也就是所謂的心跳。只要心還在跳,socket 就是活的。寫數據的間隔,需要根據實際的應用需求來決定。

心跳包不是實際的業務數據,根據通信協議的不同,需要做不同的處理。

比方說,我們使用 JSON 進行通信,那么,我們可以加一個 type 字段,表面這個 JSON 是心跳還是業務數據。

{

"type": 0, // 0 表示心跳

// ...

}

使用二進制協議的情況類似。要求就是,我們能夠區別一個數據包是心跳還是真實數據。這樣,我們便實現了一個 socket 長連接。

實現示例

這一小節我們一起來實現一個帶長連接的 Android echo 客戶端。完整的代碼可以在這里[3]找到。

首先了接口部分:

public final class LongLiveSocket {

/**

* 錯誤回調

*/

public interface ErrorCallback {

/**

* 如果需要重連,返回 true

*/

boolean onError();

}

/**

* 讀數據回調

*/

public interface DataCallback {

void onData(byte[] data, int offset, int len);

}

/**

* 寫數據回調

*/

public interface WritingCallback {

void onSuccess();

void onFail(byte[] data, int offset, int len);

}

public LongLiveSocket(String host, int port,

DataCallback dataCallback, ErrorCallback errorCallback) {

}

public void write(byte[] data, WritingCallback callback) {

}

public void write(byte[] data, int offset, int len, WritingCallback callback) {

}

public void close() {

}

}

我們這個支持長連接的類就叫 LongLiveSocket 好了。如果在 socket 斷開后需要重連,只需要在對應的接口里面返回 true 即可(在真實場景里,我們還需要讓客戶設置重連的等待時間,還有讀寫、連接的 timeout等。為了簡單,這里就直接不支持了。

另外需要注意的一點是,如果要做一個完整的庫,需要同時提供阻塞式和回調式API。同樣由于篇幅原因,這里直接省掉了。

下面我們直接看實現:

public final class LongLiveSocket {

private static final String TAG = "LongLiveSocket";

private static final long RETRY_INTERVAL_MILLIS = 3 * 1000;

private static final long HEART_BEAT_INTERVAL_MILLIS = 5 * 1000;

private static final long HEART_BEAT_TIMEOUT_MILLIS = 2 * 1000;

/**

* 錯誤回調

*/

public interface ErrorCallback {

/**

* 如果需要重連,返回 true

*/

boolean onError();

}

/**

* 讀數據回調

*/

public interface DataCallback {

void onData(byte[] data, int offset, int len);

}

/**

* 寫數據回調

*/

public interface WritingCallback {

void onSuccess();

void onFail(byte[] data, int offset, int len);

}

private final String mHost;

private final int mPort;

private final DataCallback mDataCallback;

private final ErrorCallback mErrorCallback;

private final HandlerThread mWriterThread;

private final Handler mWriterHandler;

private final Handler mUIHandler = new Handler(Looper.getMainLooper());

private final Object mLock = new Object();

private Socket mSocket; // guarded by mLock

private boolean mClosed; // guarded by mLock

private final Runnable mHeartBeatTask = new Runnable() {

private byte[] mHeartBeat = new byte[0];

@Override

public void run() {

// 我們使用長度為 0 的數據作為 heart beat

write(mHeartBeat, new WritingCallback() {

@Override

public void onSuccess() {

// 每隔 HEART_BEAT_INTERVAL_MILLIS 發送一次

mWriterHandler.postDelayed(mHeartBeatTask, HEART_BEAT_INTERVAL_MILLIS);

mUIHandler.postDelayed(mHeartBeatTimeoutTask, HEART_BEAT_TIMEOUT_MILLIS);

}

@Override

public void onFail(byte[] data, int offset, int len) {

// nop

// write() 方法會處理失敗

}

});

}

};

private final Runnable mHeartBeatTimeoutTask = () -> {

Log.e(TAG, "mHeartBeatTimeoutTask#run: heart beat timeout");

closeSocket();

};

public LongLiveSocket(String host, int port,

DataCallback dataCallback, ErrorCallback errorCallback) {

mHost = host;

mPort = port;

mDataCallback = dataCallback;

mErrorCallback = errorCallback;

mWriterThread = new HandlerThread("socket-writer");

mWriterThread.start();

mWriterHandler = new Handler(mWriterThread.getLooper());

mWriterHandler.post(this::initSocket);

}

private void initSocket() {

while (true) {

if (closed()) return;

try {

Socket socket = new Socket(mHost, mPort);

synchronized (mLock) {

// 在我們創建 socket 的時候,客戶可能就調用了 close()

if (mClosed) {

silentlyClose(socket);

return;

}

mSocket = socket;

// 每次創建新的 socket,會開一個線程來讀數據

Thread reader = new Thread(new ReaderTask(socket), "socket-reader");

reader.start();

mWriterHandler.post(mHeartBeatTask);

}

break;

} catch (IOException e) {

Log.e(TAG, "initSocket: ", e);

if (closed() || !mErrorCallback.onError()) {

break;

}

try {

TimeUnit.MILLISECONDS.sleep(RETRY_INTERVAL_MILLIS);

} catch (InterruptedException e1) {

// interrupt writer-thread to quit

break;

}

}

}

}

public void write(byte[] data, WritingCallback callback) {

write(data, 0, data.length, callback);

}

public void write(byte[] data, int offset, int len, WritingCallback callback) {

mWriterHandler.post(() -> {

Socket socket = getSocket();

if (socket == null) {

// initSocket 失敗而客戶說不需要重連,但客戶又叫我們給他發送數據

throw new IllegalStateException("Socket not initialized");

}

try {

OutputStream outputStream = socket.getOutputStream();

DataOutputStream out = new DataOutputStream(outputStream);

out.writeInt(len);

out.write(data, offset, len);

callback.onSuccess();

} catch (IOException e) {

Log.e(TAG, "write: ", e);

closeSocket();

callback.onFail(data, offset, len);

if (!closed() && mErrorCallback.onError()) {

initSocket();

}

}

});

}

private boolean closed() {

synchronized (mLock) {

return mClosed;

}

}

private Socket getSocket() {

synchronized (mLock) {

return mSocket;

}

}

private void closeSocket() {

synchronized (mLock) {

closeSocketLocked();

}

}

private void closeSocketLocked() {

if (mSocket == null) return;

silentlyClose(mSocket);

mSocket = null;

mWriterHandler.removeCallbacks(mHeartBeatTask);

}

public void close() {

if (Looper.getMainLooper() == Looper.myLooper()) {

new Thread() {

@Override

public void run() {

doClose();

}

}.start();

} else {

doClose();

}

}

private void doClose() {

synchronized (mLock) {

mClosed = true;

// 關閉 socket,從而使得阻塞在 socket 上的線程返回

closeSocketLocked();

}

mWriterThread.quit();

// 在重連的時候,有個 sleep

mWriterThread.interrupt();

}

private static void silentlyClose(Closeable closeable) {

if (closeable != null) {

try {

closeable.close();

} catch (IOException e) {

Log.e(TAG, "silentlyClose: ", e);

// error ignored

}

}

}

private class ReaderTask implements Runnable {

private final Socket mSocket;

public ReaderTask(Socket socket) {

mSocket = socket;

}

@Override

public void run() {

try {

readResponse();

} catch (IOException e) {

Log.e(TAG, "ReaderTask#run: ", e);

}

}

private void readResponse() throws IOException {

// For simplicity, assume that a msg will not exceed 1024-byte

byte[] buffer = new byte[1024];

InputStream inputStream = mSocket.getInputStream();

DataInputStream in = new DataInputStream(inputStream);

while (true) {

int nbyte = in.readInt();

if (nbyte == 0) {

Log.i(TAG, "readResponse: heart beat received");

mUIHandler.removeCallbacks(mHeartBeatTimeoutTask);

continue;

}

if (nbyte > buffer.length) {

throw new IllegalStateException("Receive message with len " + nbyte +

" which exceeds limit " + buffer.length);

}

if (readn(in, buffer, nbyte) != 0) {

// Socket might be closed twice but it does no harm

silentlyClose(mSocket);

// Socket will be re-connected by writer-thread if you want

break;

}

mDataCallback.onData(buffer, 0, nbyte);

}

}

private int readn(InputStream in, byte[] buffer, int n) throws IOException {

int offset = 0;

while (n > 0) {

int readBytes = in.read(buffer, offset, n);

if (readBytes < 0) {

// EoF

break;

}

n -= readBytes;

offset += readBytes;

}

return n;

}

}

}

下面是我們新實現的 EchoClient:

public class EchoClient {

private static final String TAG = "EchoClient";

private final LongLiveSocket mLongLiveSocket;

public EchoClient(String host, int port) {

mLongLiveSocket = new LongLiveSocket(

host, port,

(data, offset, len) -> Log.i(TAG, "EchoClient: received: " + new String(data, offset, len)),

// 返回 true,所以只要出錯,就會一直重連

() -> true);

}

public void send(String msg) {

mLongLiveSocket.write(msg.getBytes(), new LongLiveSocket.WritingCallback() {

@Override

public void onSuccess() {

Log.d(TAG, "onSuccess: ");

}

@Override

public void onFail(byte[] data, int offset, int len) {

Log.w(TAG, "onFail: fail to write: " + new String(data, offset, len));

// 連接成功后,還會發送這個消息

mLongLiveSocket.write(data, offset, len, this);

}

});

}

}

就這樣,一個帶 socket 長連接的客戶端就完成了。剩余代碼跟我們這里的主題沒有太大關系,感興趣的讀者可以看這里[3]或者自己完成這個例子。下面是一些輸出示例:

03:54:55.583 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received

03:55:00.588 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received

03:55:05.594 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received

03:55:09.638 12691-12710/com.example.echo D/EchoClient: onSuccess:

03:55:09.639 12691-12713/com.example.echo I/EchoClient: EchoClient: received: hello

03:55:10.595 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received

03:55:14.652 12691-12710/com.example.echo D/EchoClient: onSuccess:

03:55:14.654 12691-12713/com.example.echo I/EchoClient: EchoClient: received: echo

03:55:15.596 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received

03:55:20.597 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received

03:55:25.602 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received

最后需要說明的是,如果想節省資源,在有客戶發送數據的時候可以省略 heart beat。

我們對讀出錯時候的處理,可能也存在一些爭議。讀出錯后,我們只是關閉了 socket。socket 需要等到下一次寫動作發生時,才會重新連接。實際應用中,如果這是一個問題,在讀出錯后可以直接開始重連。這種情況下,還需要一些額外的同步,避免重復創建 socket。heart beat timeout 的情況類似。

跟 TCP/IP 學協議設計

如果僅僅是為了使用是 socket,我們大可以不去理會協議的細節。之所以推薦大家去看一看《TCP/IP 詳解》,是因為它們有太多值得學習的地方。很多我們工作中遇到的問題,都可以在這里找到答案。

以下每一個小節的標題都是一個小問題,建議讀者獨立思考一下,再繼續往下看。如果你發現你的答案比我的更好,請一定發送郵件到 ljtong64 AT gmail DOT com 告訴我。

協議版本如何升級?

有這么一句流行的話:這個世界唯一不變的,就是變化。當我們對協議版本進行升級的時候,正確識別不同版本的協議對軟件的兼容非常重要。那么,我們如何設計協議,才能夠為將來的版本升級做準備呢?

答案可以在 IP 協議找到。

IP 協議的第一個字段叫 version,目前使用的是 4 或 6,分別表示 IPv4 和 IPv6。由于這個字段在協議的開頭,接收端收到數據后,只要根據第一個字段的值就能夠判斷這個數據包是 IPv4 還是 IPv6。

再強調一下,這個字段在兩個版本的IP協議都位于第一個字段,為了做兼容處理,對應的這個字段必須位于同一位置。文本協議(如,JSON、HTML)的情況類似。

如何發送不定長數據的數據包

舉個例子,我們用微信發送一條消息。這條消息的長度是不確定的,并且每條消息都有它的邊界。我們如何來處理這個邊界呢?

還是一樣,看看 IP。IP 的頭部有個 header length 和 data length 兩個字段。通過添加一個 len 域,我們就能夠把數據根據應用邏輯分開。

跟這個相對的,還有另一個方案,那就是在數據的末尾放置終止符。比方說,想 C 語言的字符串那樣,我們在每個數據的末尾放一個 \0 作為終止符,用以標識一條消息的尾部。這個方法帶來的問題是,用戶的數據也可能存在 \0。此時,我們就需要對用戶的數據進行轉義。比方說,把用戶數據的所有 \0 都變成 \0\0。讀消息的過程總,如果遇到 \0\0,那它就代表 \0,如果只有一個 \0,那就是消息尾部。

使用 len 字段的好處是,我們不需要對數據進行轉義。讀取數據的時候,只要根據 len 字段,一次性把數據都讀進來就好,效率會更高一些。

終止符的方案雖然要求我們對數據進行掃描,但是如果我們可能從任意地方開始讀取數據,就需要這個終止符來確定哪里才是消息的開頭了。

當然,這兩個方法不是互斥的,可以一起使用。

上傳多個文件,只有所有文件都上傳成功時才算成功

現在我們有一個需求,需要一次上傳多個文件到服務器,只有在所有文件都上傳成功的情況下,才算成功。我們該如何來實現呢?

IP 在數據報過大的時候,會把一個數據報拆分成多個,并設置一個 MF (more fragments)位,表示這個包只是被拆分后的數據的一部分。

好,我們也學一學 IP。這里,我們可以給每個文件從 0 開始編號。上傳文件的同時,也攜帶這個編號,并額外附帶一個 MF 標志。除了編號最大的文件,所有文件的 MF 標志都置位。因為 MF 沒有置位的是最后一個文件,服務器就可以根據這個得出總共有多少個文件。

另一種不使用 MF 標志的方法是,我們在上傳文件前,就告訴服務器總共有多少個文件。

如果讀者對數據庫比較熟悉,學數據庫用事務來處理,也是可以的。這里就不展開討論了。

如何保證數據的有序性

這里講一個我曾經遇到過的面試題?,F在有一個任務隊列,多個工作線程從中取出任務并執行,執行結果放到一個結果隊列中。先要求,放入結果隊列的時候,順序順序需要跟從工作隊列取出時的一樣(也就是說,先取出的任務,執行結果需要先放入結果隊列)。

我們看看 TCP/IP 是怎么處理的。IP 在發送數據的時候,不同數據報到達對端的時間是不確定的,后面發送的數據有可能較先到達。TCP 為了解決這個問題,給所發送數據的每個字節都賦了一個序列號,通過這個序列號,TCP 就能夠把數據按原順序重新組裝。

一樣,我們也給每個任務賦一個值,根據進入工作隊列的順序依次遞增。工作線程完成任務后,在將結果放入結果隊列前,先檢查要放入對象的寫一個序列號是不是跟自己的任務相同,如果不同,這個結果就不能放進去。此時,最簡單的做法是等待,知道下一個可以放入隊列的結果是自己所執行的那一個。但是,這個線程就沒辦法繼續處理任務了。

更好的方法是,我們維護多一個結果隊列的緩沖,這個緩沖里面的數據按序列號從小到大排序。工作線程要將結果放入,有兩種可能:

剛剛完成的任務剛好是下一個,將這個結果放入隊列。然后從緩沖的頭部開始,將所有可以放入結果隊列的數據都放進去。

所完成的任務不能放入結果隊列,這個時候就插入結果隊列。然后,跟上一種情況一樣,需要檢查緩沖。

如果測試表明,這個結果緩沖的數據不多,那么使用普通的鏈表就可以。如果數據比較多,可以使用一個最小堆。

如何保證對方收到了消息

我們說,TCP 提供了可靠的傳輸。這樣不就能夠保證對方收到消息了嗎?

很遺憾,其實不能。在我們往 socket 寫入的數據,只要對端的內核收到后,就會返回 ACK,此時,socket 就認為數據已經寫入成功。然而要注意的是,這里只是對方所運行的系統的內核成功收到了數據,并不表示應用程序已經成功處理了數據。

解決辦法還是一樣,我們學 TCP,添加一個應用層的 APP ACK。應用接收到消息并處理成功后,發送一個 APP ACK 給對方。

有了 APP ACK,我們需要處理的另一個問題是,如果對方真的沒有收到,需要怎么做?

TCP 發送數據的時候,消息一樣可能丟失。TCP 發送數據后,如果長時間沒有收到對方的 ACK,就假設數據已經丟失,并重新發送。

我們也一樣,如果長時間沒有收到 APP ACK,就假設數據丟失,重新發送一個。

總結

以上是生活随笔為你收集整理的java socket ip_JAVA 网络编程 TCP/IP、Socket 和协议设计的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。