java socket ip_JAVA 网络编程 TCP/IP、Socket 和协议设计
【JAVA 網絡編程 TCP/IP、Socket 和協議設計】
TCP/IP 協議簡介
IP
首先我們看 IP(Internet Protocol)協議。IP 協議提供了主機和主機間的通信。
為了完成不同主機的通信,我們需要某種方式來唯一標識一臺主機,這個標識,就是著名的IP地址。通過IP地址,IP 協議就能夠幫我們把一個數據包發送給對方。
TCP
前面我們說過,IP 協議提供了主機和主機間的通信。
TCP 協議在 IP 協議提供的主機間通信功能的基礎上,完成這兩個主機上進程對進程的通信。
有了 IP,不同主機就能夠交換數據。但是,計算機收到數據后,并不知道這個數據屬于哪個進程(簡單講,進程就是一個正在運行的應用程序)。TCP 的作用就在于,讓我們能夠知道這個數據屬于哪個進程,從而完成進程間的通信。
為了標識數據屬于哪個進程,我們給需要進行 TCP 通信的進程分配一個唯一的數字來標識它。這個數字,就是我們常說的端口號。
三次握手
TCP 的全稱是 Transmission Control Protocol,大家對它說得最多的,大概就是面向連接的特性了。之所以說它是有連接的,是說在進行通信前,通信雙方需要先經過一個三次握手的過程。三次握手完成后,連接便建立了。這時候我們才可以開始發送/接收數據。(與之相對的是 UDP,不需要經過握手,就可以直接發送數據)。
下面我們簡單了解一下三次握手的過程。
tcp-three-way-handshake
首先,客戶向服務端發送一個 SYN,假設此時 sequence number 為 x。這個 x 是由操作系統根據一定的規則生成的,不妨認為它是一個隨機數。
服務端收到 SYN 后,會向客戶端再發送一個 SYN,此時服務器的 seq number = y。與此同時,會 ACK x+1,告訴客戶端“已經收到了 SYN,可以發送數據了”。
客戶端收到服務器的 SYN 后,回復一個 ACK y+1,這個 ACK 則是告訴服務器,SYN 已經收到,服務器可以發送數據了。
經過這 3 步,TCP 連接就建立了。這里需要注意的有三點:
連接是由客戶端主動發起的
在第 3 步客戶端向服務器回復 ACK 的時候,TCP 協議是允許我們攜帶數據的。之所以做不到,是 API 的限制導致的。
TCP 協議還允許 “四次握手” 的發生,同樣的,由于 API 的限制,這個極端的情況并不會發生。
TCP/IP 相關的理論知識我們就先了解到這里。關于 TCP,還有諸如可靠性、流量控制、擁塞控制等非常有趣的特性,強烈推薦讀者看一看 Richard 的名著《TCP/IP 詳解 - 卷1》(注意,是第1版,不是第2版)。
下面我們看一些偏實戰的東西。
Socket 基本用法
Socket 是 TCP 層的封裝,通過 socket,我們就能進行 TCP 通信。
在 Java 的 SDK 中,socket 的共有兩個接口:用于監聽客戶連接的 ServerSocket 和用于通信的 Socket。使用 socket 的步驟如下:
創建 ServerSocket 并監聽客戶連接
使用 Socket 連接服務端
通過 Socket.getInputStream()/getOutputStream() 獲取輸入輸出流進行通信
下面,我們通過實現一個簡單的 echo 服務來學習 socket 的使用。所謂的 echo 服務,就是客戶端向服務端寫入任意數據,服務器都將數據原封不動地寫回給客戶端。
1. 創建 ServerSocket 并監聽客戶連接
public class EchoServer {
private final ServerSocket mServerSocket;
public EchoServer(int port) throws IOException {
// 1. 創建一個 ServerSocket 并監聽端口 port
mServerSocket = new ServerSocket(port);
}
public void run() throws IOException {
// 2. 開始接受客戶連接
Socket client = mServerSocket.accept();
handleClient(client);
}
private void handleClient(Socket socket) {
// 3. 使用 socket 進行通信 ...
}
public static void main(String[] argv) {
try {
EchoServer server = new EchoServer(9877);
server.run();
} catch (IOException e) {
e.printStackTrace();
}
}
}
2. 使用 Socket 連接服務端
public class EchoClient {
private final Socket mSocket;
public EchoClient(String host, int port) throws IOException {
// 創建 socket 并連接服務器
mSocket = new Socket(host, port);
}
public void run() {
// 和服務端進行通信
}
public static void main(String[] argv) {
try {
// 由于服務端運行在同一主機,這里我們使用 localhost
EchoClient client = new EchoClient("localhost", 9877);
client.run();
} catch (IOException e) {
e.printStackTrace();
}
}
}
3. 通過 socket.getInputStream()/getOutputStream() 獲取輸入/輸出流進行通信
首先,我們來實現服務端:
public class EchoServer {
// ...
private void handleClient(Socket socket) throws IOException {
InputStream in = socket.getInputStream();
OutputStream out = socket.getOutputStream();
byte[] buffer = new byte[1024];
int n;
while ((n = in.read(buffer)) > 0) {
out.write(buffer, 0, n);
}
}
}
可以看到,服務端的實現其實很簡單,我們不停地讀取輸入數據,然后寫回給客戶端。
下面我們看看客戶端。
public class EchoClient {
// ...
public void run() throws IOException {
Thread readerThread = new Thread(this::readResponse);
readerThread.start();
OutputStream out = mSocket.getOutputStream();
byte[] buffer = new byte[1024];
int n;
while ((n = System.in.read(buffer)) > 0) {
out.write(buffer, 0, n);
}
}
private void readResponse() {
try {
InputStream in = mSocket.getInputStream();
byte[] buffer = new byte[1024];
int n;
while ((n = in.read(buffer)) > 0) {
System.out.write(buffer, 0, n);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
客戶端會稍微復雜一點點,在讀取用戶輸入的同時,我們又想讀取服務器的響應。所以,這里創建了一個線程來讀服務器的響應。
不熟悉 lambda 的讀者,可以把Thread readerThread = new Thread(this::readResponse) 換成下面這個代碼:
Thread readerThread = new Thread(new Runnable() {
@Override
public void run() {
readResponse();
}
});
打開兩個 terminal 分別執行如下命令:
$ javac EchoServer.java
$ java EchoServer
$ javac EchoClient.java
$ java EchoClient
hello Server
hello Server
foo
foo
在客戶端,我們會看到,輸入的所有字符都打印了出來。
最后需要注意的有幾點:
在上面的代碼中,我們所有的異常都沒有處理。實際應用中,在發生異常時,需要關閉 socket,并根據實際業務做一些錯誤處理工作
在客戶端,我們沒有停止 readThread。實際應用中,我們可以通過關閉 socket 來讓線程從阻塞讀中返回。推薦讀者閱讀《Java并發編程實戰》
我們的服務端只處理了一個客戶連接。如果需要同時處理多個客戶端,可以創建線程來處理請求。這個作為練習留給讀者來完全。
Socket vs ServerSocket
在進入這一節的主題前,讀者不妨先考慮一個問題:在上一節的實例中,我們運行 echo 服務后,在客戶端連接成功時,一共有多少個 socket 存在?
答案是 3 個 socket。客戶端一個,服務端有兩個。跟這個問題的答案直接關聯的是本節的主題——Socket 和 ServerSocket 的區別是什么。
眼尖的讀者,可能會注意到在上一節我是這樣描述他們的:
在 Java 的 SDK 中,socket 的共有兩個接口:用于監聽客戶連接的 ServerSocket 和用于通信的 Socket。
注意,我只說 ServerSocket 是用于監聽客戶連接,而沒有說它也可以用來通信。下面我們來詳細了解一下他們的區別。
注:以下描述使用的是 UNIX/Linux 系統的 API
首先,我們創建 ServerSocket 后,內核會創建一個 socket。這個 socket 既可以拿來監聽客戶連接,也可以連接遠端的服務。由于 ServerSocket 是用來監聽客戶連接的,緊接著它就會對內核創建的這個 socket 調用 listen 函數。這樣一來,這個 socket 就成了所謂的 listening socket,它開始監聽客戶的連接。
接下來,我們的客戶端創建一個 Socket,同樣的,內核也創建一個 socket 實例。內核創建的這個 socket 跟 ServerSocket 一開始創建的那個沒有什么區別。不同的是,接下來 Socket 會對它執行 connect,發起對服務端的連接。前面我們說過,socket API 其實是 TCP 層的封裝,所以 connect 后,內核會發送一個 SYN 給服務端。
現在,我們切換角色到服務端。服務端的主機在收到這個 SYN 后,會創建一個新的 socket,這個新創建的 socket 跟客戶端繼續執行三次握手過程。
三次握手完成后,我們執行的 serverSocket.accept() 會返回一個 Socket 實例,這個 socket 就是上一步內核自動幫我們創建的。
所以說,在一個客戶端連接的情況下,其實有 3 個 socket。
關于內核自動創建的這個 socket,還有一個很有意思的地方。它的端口號跟 ServerSocket 是一毛一樣的。咦!!不是說,一個端口只能綁定一個 socket 嗎?其實這個說法并不夠準確。
前面我說的TCP 通過端口號來區分數據屬于哪個進程的說法,在 socket 的實現里需要改一改。Socket 并不僅僅使用端口號來區別不同的 socket 實例,而是使用 這個四元組。
在上面的例子中,我們的 ServerSocket 長這樣:。意思是,可以接受任何的客戶端,和本地任何 IP。
accept 返回的 Socket 則是這樣:<127.0.0.1:xxxx, 127.0.0.1:9877>。其中,xxxx 是客戶端的端口號。
如果數據是發送給一個已連接的 socket,內核會找到一個完全匹配的實例,所以數據準確發送給了對端。
如果是客戶端要發起連接,這時候只有 會匹配成功,所以 SYN 也準確發送給了監聽套接字。
Socket/ServerSocket 的區別我們就講到這里。如果讀者覺得不過癮,可以參考《TCP/IP 詳解》卷1、卷2。
Socket 長連接的實現
背景知識
Socket 長連接,指的是在客戶和服務端之間保持一個 socket 連接長時間不斷開。
比較熟悉 Socket 的讀者,可能知道有這樣一個 API:
socket.setKeepAlive(true);
嗯……keep alive,“保持活著”,這個應該就是讓 TCP 不斷開的意思。那么,我們要實現一個 socket 的長連接,只需要這一個調用即可。
遺憾的是,生活并不總是那么美好。對于 4.4BSD 的實現來說,Socket 的這個 keep alive 選項如果打開并且兩個小時內沒有通信,那么底層會發一個心跳,看看對方是不是還活著。
注意,兩個小時才會發一次。也就是說,在沒有實際數據通信的時候,我把網線拔了,你的應用程序要經過兩個小時才會知道。
在說明如果實現長連接前,我們先來理一理我們面臨的問題。假定現在有一對已經連接的 socket,在以下情況發生時候,socket 將不再可用:
某一端關閉 socket(這不是廢話嗎)。主動關閉的一方會發送 FIN,通知對方要關閉 TCP 連接。在這種情況下,另一端如果去讀 socket,將會讀到 EoF(End of File)。于是我們知道對方關閉了 socket。
應用程序奔潰。此時 socket 會由內核關閉,結果跟情況1一樣。
系統奔潰。這時候系統是來不及發送 FIN 的,因為它已經跪了。此時對方無法得知這一情況。對方在嘗試讀取數據時,最后會返回 read time out。如果寫數據,則是 host unreachable 之類的錯誤。
電纜被挖斷、網線被拔。跟情況3差不多,如果沒有對 socket 進行讀寫,兩邊都不知道發生了事故。跟情況3不同的是,如果我們把網線接回去,socket 依舊可以正常使用。
在上面的幾種情形中,有一個共同點就是,只要去讀、寫 socket,只要 socket 連接不正常,我們就能夠知道。基于這一點,要實現一個 socket 長連接,我們需要做的就是不斷地給對方寫數據,然后讀取對方的數據,也就是所謂的心跳。只要心還在跳,socket 就是活的。寫數據的間隔,需要根據實際的應用需求來決定。
心跳包不是實際的業務數據,根據通信協議的不同,需要做不同的處理。
比方說,我們使用 JSON 進行通信,那么,我們可以加一個 type 字段,表面這個 JSON 是心跳還是業務數據。
{
"type": 0, // 0 表示心跳
// ...
}
使用二進制協議的情況類似。要求就是,我們能夠區別一個數據包是心跳還是真實數據。這樣,我們便實現了一個 socket 長連接。
實現示例
這一小節我們一起來實現一個帶長連接的 Android echo 客戶端。完整的代碼可以在這里[3]找到。
首先了接口部分:
public final class LongLiveSocket {
/**
* 錯誤回調
*/
public interface ErrorCallback {
/**
* 如果需要重連,返回 true
*/
boolean onError();
}
/**
* 讀數據回調
*/
public interface DataCallback {
void onData(byte[] data, int offset, int len);
}
/**
* 寫數據回調
*/
public interface WritingCallback {
void onSuccess();
void onFail(byte[] data, int offset, int len);
}
public LongLiveSocket(String host, int port,
DataCallback dataCallback, ErrorCallback errorCallback) {
}
public void write(byte[] data, WritingCallback callback) {
}
public void write(byte[] data, int offset, int len, WritingCallback callback) {
}
public void close() {
}
}
我們這個支持長連接的類就叫 LongLiveSocket 好了。如果在 socket 斷開后需要重連,只需要在對應的接口里面返回 true 即可(在真實場景里,我們還需要讓客戶設置重連的等待時間,還有讀寫、連接的 timeout等。為了簡單,這里就直接不支持了。
另外需要注意的一點是,如果要做一個完整的庫,需要同時提供阻塞式和回調式API。同樣由于篇幅原因,這里直接省掉了。
下面我們直接看實現:
public final class LongLiveSocket {
private static final String TAG = "LongLiveSocket";
private static final long RETRY_INTERVAL_MILLIS = 3 * 1000;
private static final long HEART_BEAT_INTERVAL_MILLIS = 5 * 1000;
private static final long HEART_BEAT_TIMEOUT_MILLIS = 2 * 1000;
/**
* 錯誤回調
*/
public interface ErrorCallback {
/**
* 如果需要重連,返回 true
*/
boolean onError();
}
/**
* 讀數據回調
*/
public interface DataCallback {
void onData(byte[] data, int offset, int len);
}
/**
* 寫數據回調
*/
public interface WritingCallback {
void onSuccess();
void onFail(byte[] data, int offset, int len);
}
private final String mHost;
private final int mPort;
private final DataCallback mDataCallback;
private final ErrorCallback mErrorCallback;
private final HandlerThread mWriterThread;
private final Handler mWriterHandler;
private final Handler mUIHandler = new Handler(Looper.getMainLooper());
private final Object mLock = new Object();
private Socket mSocket; // guarded by mLock
private boolean mClosed; // guarded by mLock
private final Runnable mHeartBeatTask = new Runnable() {
private byte[] mHeartBeat = new byte[0];
@Override
public void run() {
// 我們使用長度為 0 的數據作為 heart beat
write(mHeartBeat, new WritingCallback() {
@Override
public void onSuccess() {
// 每隔 HEART_BEAT_INTERVAL_MILLIS 發送一次
mWriterHandler.postDelayed(mHeartBeatTask, HEART_BEAT_INTERVAL_MILLIS);
mUIHandler.postDelayed(mHeartBeatTimeoutTask, HEART_BEAT_TIMEOUT_MILLIS);
}
@Override
public void onFail(byte[] data, int offset, int len) {
// nop
// write() 方法會處理失敗
}
});
}
};
private final Runnable mHeartBeatTimeoutTask = () -> {
Log.e(TAG, "mHeartBeatTimeoutTask#run: heart beat timeout");
closeSocket();
};
public LongLiveSocket(String host, int port,
DataCallback dataCallback, ErrorCallback errorCallback) {
mHost = host;
mPort = port;
mDataCallback = dataCallback;
mErrorCallback = errorCallback;
mWriterThread = new HandlerThread("socket-writer");
mWriterThread.start();
mWriterHandler = new Handler(mWriterThread.getLooper());
mWriterHandler.post(this::initSocket);
}
private void initSocket() {
while (true) {
if (closed()) return;
try {
Socket socket = new Socket(mHost, mPort);
synchronized (mLock) {
// 在我們創建 socket 的時候,客戶可能就調用了 close()
if (mClosed) {
silentlyClose(socket);
return;
}
mSocket = socket;
// 每次創建新的 socket,會開一個線程來讀數據
Thread reader = new Thread(new ReaderTask(socket), "socket-reader");
reader.start();
mWriterHandler.post(mHeartBeatTask);
}
break;
} catch (IOException e) {
Log.e(TAG, "initSocket: ", e);
if (closed() || !mErrorCallback.onError()) {
break;
}
try {
TimeUnit.MILLISECONDS.sleep(RETRY_INTERVAL_MILLIS);
} catch (InterruptedException e1) {
// interrupt writer-thread to quit
break;
}
}
}
}
public void write(byte[] data, WritingCallback callback) {
write(data, 0, data.length, callback);
}
public void write(byte[] data, int offset, int len, WritingCallback callback) {
mWriterHandler.post(() -> {
Socket socket = getSocket();
if (socket == null) {
// initSocket 失敗而客戶說不需要重連,但客戶又叫我們給他發送數據
throw new IllegalStateException("Socket not initialized");
}
try {
OutputStream outputStream = socket.getOutputStream();
DataOutputStream out = new DataOutputStream(outputStream);
out.writeInt(len);
out.write(data, offset, len);
callback.onSuccess();
} catch (IOException e) {
Log.e(TAG, "write: ", e);
closeSocket();
callback.onFail(data, offset, len);
if (!closed() && mErrorCallback.onError()) {
initSocket();
}
}
});
}
private boolean closed() {
synchronized (mLock) {
return mClosed;
}
}
private Socket getSocket() {
synchronized (mLock) {
return mSocket;
}
}
private void closeSocket() {
synchronized (mLock) {
closeSocketLocked();
}
}
private void closeSocketLocked() {
if (mSocket == null) return;
silentlyClose(mSocket);
mSocket = null;
mWriterHandler.removeCallbacks(mHeartBeatTask);
}
public void close() {
if (Looper.getMainLooper() == Looper.myLooper()) {
new Thread() {
@Override
public void run() {
doClose();
}
}.start();
} else {
doClose();
}
}
private void doClose() {
synchronized (mLock) {
mClosed = true;
// 關閉 socket,從而使得阻塞在 socket 上的線程返回
closeSocketLocked();
}
mWriterThread.quit();
// 在重連的時候,有個 sleep
mWriterThread.interrupt();
}
private static void silentlyClose(Closeable closeable) {
if (closeable != null) {
try {
closeable.close();
} catch (IOException e) {
Log.e(TAG, "silentlyClose: ", e);
// error ignored
}
}
}
private class ReaderTask implements Runnable {
private final Socket mSocket;
public ReaderTask(Socket socket) {
mSocket = socket;
}
@Override
public void run() {
try {
readResponse();
} catch (IOException e) {
Log.e(TAG, "ReaderTask#run: ", e);
}
}
private void readResponse() throws IOException {
// For simplicity, assume that a msg will not exceed 1024-byte
byte[] buffer = new byte[1024];
InputStream inputStream = mSocket.getInputStream();
DataInputStream in = new DataInputStream(inputStream);
while (true) {
int nbyte = in.readInt();
if (nbyte == 0) {
Log.i(TAG, "readResponse: heart beat received");
mUIHandler.removeCallbacks(mHeartBeatTimeoutTask);
continue;
}
if (nbyte > buffer.length) {
throw new IllegalStateException("Receive message with len " + nbyte +
" which exceeds limit " + buffer.length);
}
if (readn(in, buffer, nbyte) != 0) {
// Socket might be closed twice but it does no harm
silentlyClose(mSocket);
// Socket will be re-connected by writer-thread if you want
break;
}
mDataCallback.onData(buffer, 0, nbyte);
}
}
private int readn(InputStream in, byte[] buffer, int n) throws IOException {
int offset = 0;
while (n > 0) {
int readBytes = in.read(buffer, offset, n);
if (readBytes < 0) {
// EoF
break;
}
n -= readBytes;
offset += readBytes;
}
return n;
}
}
}
下面是我們新實現的 EchoClient:
public class EchoClient {
private static final String TAG = "EchoClient";
private final LongLiveSocket mLongLiveSocket;
public EchoClient(String host, int port) {
mLongLiveSocket = new LongLiveSocket(
host, port,
(data, offset, len) -> Log.i(TAG, "EchoClient: received: " + new String(data, offset, len)),
// 返回 true,所以只要出錯,就會一直重連
() -> true);
}
public void send(String msg) {
mLongLiveSocket.write(msg.getBytes(), new LongLiveSocket.WritingCallback() {
@Override
public void onSuccess() {
Log.d(TAG, "onSuccess: ");
}
@Override
public void onFail(byte[] data, int offset, int len) {
Log.w(TAG, "onFail: fail to write: " + new String(data, offset, len));
// 連接成功后,還會發送這個消息
mLongLiveSocket.write(data, offset, len, this);
}
});
}
}
就這樣,一個帶 socket 長連接的客戶端就完成了。剩余代碼跟我們這里的主題沒有太大關系,感興趣的讀者可以看這里[3]或者自己完成這個例子。下面是一些輸出示例:
03:54:55.583 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received
03:55:00.588 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received
03:55:05.594 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received
03:55:09.638 12691-12710/com.example.echo D/EchoClient: onSuccess:
03:55:09.639 12691-12713/com.example.echo I/EchoClient: EchoClient: received: hello
03:55:10.595 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received
03:55:14.652 12691-12710/com.example.echo D/EchoClient: onSuccess:
03:55:14.654 12691-12713/com.example.echo I/EchoClient: EchoClient: received: echo
03:55:15.596 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received
03:55:20.597 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received
03:55:25.602 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received
最后需要說明的是,如果想節省資源,在有客戶發送數據的時候可以省略 heart beat。
我們對讀出錯時候的處理,可能也存在一些爭議。讀出錯后,我們只是關閉了 socket。socket 需要等到下一次寫動作發生時,才會重新連接。實際應用中,如果這是一個問題,在讀出錯后可以直接開始重連。這種情況下,還需要一些額外的同步,避免重復創建 socket。heart beat timeout 的情況類似。
跟 TCP/IP 學協議設計
如果僅僅是為了使用是 socket,我們大可以不去理會協議的細節。之所以推薦大家去看一看《TCP/IP 詳解》,是因為它們有太多值得學習的地方。很多我們工作中遇到的問題,都可以在這里找到答案。
以下每一個小節的標題都是一個小問題,建議讀者獨立思考一下,再繼續往下看。如果你發現你的答案比我的更好,請一定發送郵件到 ljtong64 AT gmail DOT com 告訴我。
協議版本如何升級?
有這么一句流行的話:這個世界唯一不變的,就是變化。當我們對協議版本進行升級的時候,正確識別不同版本的協議對軟件的兼容非常重要。那么,我們如何設計協議,才能夠為將來的版本升級做準備呢?
答案可以在 IP 協議找到。
IP 協議的第一個字段叫 version,目前使用的是 4 或 6,分別表示 IPv4 和 IPv6。由于這個字段在協議的開頭,接收端收到數據后,只要根據第一個字段的值就能夠判斷這個數據包是 IPv4 還是 IPv6。
再強調一下,這個字段在兩個版本的IP協議都位于第一個字段,為了做兼容處理,對應的這個字段必須位于同一位置。文本協議(如,JSON、HTML)的情況類似。
如何發送不定長數據的數據包
舉個例子,我們用微信發送一條消息。這條消息的長度是不確定的,并且每條消息都有它的邊界。我們如何來處理這個邊界呢?
還是一樣,看看 IP。IP 的頭部有個 header length 和 data length 兩個字段。通過添加一個 len 域,我們就能夠把數據根據應用邏輯分開。
跟這個相對的,還有另一個方案,那就是在數據的末尾放置終止符。比方說,想 C 語言的字符串那樣,我們在每個數據的末尾放一個 \0 作為終止符,用以標識一條消息的尾部。這個方法帶來的問題是,用戶的數據也可能存在 \0。此時,我們就需要對用戶的數據進行轉義。比方說,把用戶數據的所有 \0 都變成 \0\0。讀消息的過程總,如果遇到 \0\0,那它就代表 \0,如果只有一個 \0,那就是消息尾部。
使用 len 字段的好處是,我們不需要對數據進行轉義。讀取數據的時候,只要根據 len 字段,一次性把數據都讀進來就好,效率會更高一些。
終止符的方案雖然要求我們對數據進行掃描,但是如果我們可能從任意地方開始讀取數據,就需要這個終止符來確定哪里才是消息的開頭了。
當然,這兩個方法不是互斥的,可以一起使用。
上傳多個文件,只有所有文件都上傳成功時才算成功
現在我們有一個需求,需要一次上傳多個文件到服務器,只有在所有文件都上傳成功的情況下,才算成功。我們該如何來實現呢?
IP 在數據報過大的時候,會把一個數據報拆分成多個,并設置一個 MF (more fragments)位,表示這個包只是被拆分后的數據的一部分。
好,我們也學一學 IP。這里,我們可以給每個文件從 0 開始編號。上傳文件的同時,也攜帶這個編號,并額外附帶一個 MF 標志。除了編號最大的文件,所有文件的 MF 標志都置位。因為 MF 沒有置位的是最后一個文件,服務器就可以根據這個得出總共有多少個文件。
另一種不使用 MF 標志的方法是,我們在上傳文件前,就告訴服務器總共有多少個文件。
如果讀者對數據庫比較熟悉,學數據庫用事務來處理,也是可以的。這里就不展開討論了。
如何保證數據的有序性
這里講一個我曾經遇到過的面試題。現在有一個任務隊列,多個工作線程從中取出任務并執行,執行結果放到一個結果隊列中。先要求,放入結果隊列的時候,順序順序需要跟從工作隊列取出時的一樣(也就是說,先取出的任務,執行結果需要先放入結果隊列)。
我們看看 TCP/IP 是怎么處理的。IP 在發送數據的時候,不同數據報到達對端的時間是不確定的,后面發送的數據有可能較先到達。TCP 為了解決這個問題,給所發送數據的每個字節都賦了一個序列號,通過這個序列號,TCP 就能夠把數據按原順序重新組裝。
一樣,我們也給每個任務賦一個值,根據進入工作隊列的順序依次遞增。工作線程完成任務后,在將結果放入結果隊列前,先檢查要放入對象的寫一個序列號是不是跟自己的任務相同,如果不同,這個結果就不能放進去。此時,最簡單的做法是等待,知道下一個可以放入隊列的結果是自己所執行的那一個。但是,這個線程就沒辦法繼續處理任務了。
更好的方法是,我們維護多一個結果隊列的緩沖,這個緩沖里面的數據按序列號從小到大排序。工作線程要將結果放入,有兩種可能:
剛剛完成的任務剛好是下一個,將這個結果放入隊列。然后從緩沖的頭部開始,將所有可以放入結果隊列的數據都放進去。
所完成的任務不能放入結果隊列,這個時候就插入結果隊列。然后,跟上一種情況一樣,需要檢查緩沖。
如果測試表明,這個結果緩沖的數據不多,那么使用普通的鏈表就可以。如果數據比較多,可以使用一個最小堆。
如何保證對方收到了消息
我們說,TCP 提供了可靠的傳輸。這樣不就能夠保證對方收到消息了嗎?
很遺憾,其實不能。在我們往 socket 寫入的數據,只要對端的內核收到后,就會返回 ACK,此時,socket 就認為數據已經寫入成功。然而要注意的是,這里只是對方所運行的系統的內核成功收到了數據,并不表示應用程序已經成功處理了數據。
解決辦法還是一樣,我們學 TCP,添加一個應用層的 APP ACK。應用接收到消息并處理成功后,發送一個 APP ACK 給對方。
有了 APP ACK,我們需要處理的另一個問題是,如果對方真的沒有收到,需要怎么做?
TCP 發送數據的時候,消息一樣可能丟失。TCP 發送數據后,如果長時間沒有收到對方的 ACK,就假設數據已經丟失,并重新發送。
我們也一樣,如果長時間沒有收到 APP ACK,就假設數據丟失,重新發送一個。
總結
以上是生活随笔為你收集整理的java socket ip_JAVA 网络编程 TCP/IP、Socket 和协议设计的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 颜值党狂喜 雷柏发布VT9无线鼠标白色版
- 下一篇: java socket smtp_JAV