skynet源码分析之网络层——网关服务器
在上一篇文章里介紹Lua層通過lualib/skynet/socket.lua這個庫與網絡底層交互(http://www.cnblogs.com/RainRill/p/8707328.html)。除此之外,skynet還提供一個通用模板lualib/snax/gateserver來啟動一個網關服務器,通過TCP連接和客戶端交換數據,這個庫不能與socket.lua共用,因為這個庫接管了底層傳來的socket類消息,具體用法參考官方wikihttps://github.com/cloudwu/skynet/wiki/GateServer。
1. 概述
gateserver注冊接收網絡底層傳過來的socket消息,通過netpack.filter解析消息包(第6行),稍后會著重分析如何解析,解析完返回的type有6中類型,每種類型指定特定的回調函數。注:當一個包不完整時,type為nil,這種情況不需要處理。
"open":新連接建立;"close":關閉連接;"warning":當fd上待發送的數據累積超過1M時,會收到這個消息;’"error":發生錯誤,關閉fd;“data”:表示收到一個完整的tcp包,回調函數把這個包傳給邏輯層去處理;
1 -- lualib/snax/gateserver.lua
2 skynet.register_protocol {
3 name = "socket",
4 id = skynet.PTYPE_SOCKET, -- PTYPE_SOCKET = 6
5 unpack = function ( msg, sz )
6 return netpack.filter( queue, msg, sz)
7 end,
8 dispatch = function (_, _, q, type, ...)
9 queue = q
10 if type then
11 MSG[type](...)
12 end
13 end
14 }
“more”:表示收到的數據不止一個tcp包,netpack.filter會把包依次放到隊列里,然后回調函數一個個從隊列中pop出來(第11行)
1 -- lualib/snax/gateserver.lua 2 local function dispatch_queue() 3 local fd, msg, sz = netpack.pop(queue) 4 if fd then 5 -- may dispatch even the handler.message blocked 6 -- If the handler.message never block, the queue should be -- 7 -- empty, so only fork once and then exit. 8 skynet.fork(dispatch_queue) 9 dispatch_msg(fd, msg, sz) 10 11 for fd, msg, sz in netpack.pop, queue do 12 dispatch_msg(fd, msg, sz) 13 end 14 end 15 end 16 17 MSG.more = dispatch_queue
2. 如何解析TCP數據包
為了說明如何解析TCP數據包,先了解下網絡底層是采用什么策略接收數據的。單個socket每次從內核嘗試讀取的數據字節數為sz(第6行),這個值保存在s->p.size中,初始是MIN_READ_BUFFER(64b),當實際讀到的數據等于sz時,sz擴大一倍(8-9行);如果小于sz的一半,則設置sz為原來的一半(10-11行)。
比如,客戶端發了一個1kb的數據,socket線程會從內核里依次讀取64b,128b,256b,512b,64b數據,總共需讀取5次,即會向gateserver服務發5條消息,一個TCP包被切割成5個數據塊。第5次嘗試讀取1024b數據,所以可能會讀到其他TCP包的數據(只要客戶端有發送其他數據)。接下來,客戶端再發一個1kb的數據,socket線程只需從內核讀取一次即可。
1 // skynet-src/socket_server.c
2 static int
3 forward_message_tcp(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message * result) {
4 int sz = s->p.size;
5 char * buffer = MALLOC(sz);
6 int n = (int)read(s->fd, buffer, sz);
7 ...
8 if (n == sz) {
9 s->p.size *= 2;
10 } else if (sz > MIN_READ_BUFFER && n*2 < sz) {
11 s->p.size /= 2;
12 }
13 }
netpack做的工作就是把這些數據塊組裝成一個完整的TCP包,再交給gateserver去處理。注:netpack約定,tcp包頭兩字節(大端方式)表示數據包長度。如果采用sproto打包方式,需附加4字節(32位)的session值。所以客戶端傳過來1kb數據,實際數據只有1024-2-4=1018字節。
數據結構:
16-19行,用數組實現的隊列。當客戶端連續發了幾個小的tcp包,gateserver收到的一條消息包可能包含多個tcp包,存放到這個隊列里
第20行,存放不完整的tcp包的指針數組,每一項是指向一個鏈表,fd hash值相同的組成一個鏈表。
1 // lualib-src/lua-netpack.c
2 struct netpack {
3 int id; //socket id
4 int size; //數據塊長度
5 void * buffer; //數據塊
6 };
7
8 struct uncomplete { //不完整tcp包結構
9 struct netpack pack; //數據塊信息
10 struct uncomplete * next; //鏈表,指向下一個
11 int read; //已讀的字節數
12 int header; //第一個字節(代表數據長度的高8位)
13 };
14
15 struct queue {
16 int cap;
17 int head;
18 int tail;
19 struct netpack queue[QUEUESIZE]; //一次從內核讀取多個tcp包時放入該隊列里
20 struct uncomplete * hash[HASHSIZE]; //指針數組,數組里每個位置指向一個不完整的tcp包鏈表,fd hash值相同的組成一個鏈表
21 };
解析流程:
最終會調用filter_data_這個接口解析,下面著重介紹:
參數:fd socket; buffer從內核中讀到的數據塊;size數據塊大小
先看后半段46-82行,當queue里并沒有該socket剩余的數據塊,執行46行分支。
46-51行,是一個不完整的tcp包,只有一個字節數據,說明表示長度的頭部兩字節數據都還差一個字節,構造一個uncomplete結構(簡稱uc),然后存在queue->hash里。uc->read設置為-1,uc->header存放這一個字節。返回給Lua層的type是nil,Lua層不需要處理。
52-54行,通過頭部兩字節計算tcp包的長度read_size,接下來比較收到的數據size與真正需要的數據pack_size。
56-63行,size<pack_size,說明tcp包還有未讀到的數據,將已讀到的數據構造一個uc結構,保存在queue->hash里,返回給Lua層的type是nil,Lua層不需要處理。uc->read已讀到字節,uc->pack.size目標字節數
64-73行,size=pack_size,說明是一個完整的tcp包,大部分是這種情況,把tcp包返回給Lua層即可,此時返回的type是“data”(第66行)。
74-82行,size>pack_size,說明不止一個tcp包的數據,則先通過push_data保存第一個完整的tcp包(76行),接著通過push_more處理余下的數據(79行)。返回的type是"more"。
push_data做的工作是將tcp包保存在隊列里,供Lua層pop出使用。
push_more是一個遞歸操作,流程跟上面一樣,對比讀到的數據和需要的數據做對應的處理。
接著看6-44行,之前收到了tcp包的部分數據塊。
8-18行,說明之前只讀到一個字節,加上該數據塊的第一個字節,組成兩個字節計算出整個包的長度(12行)
第19行,目標字節-已讀字節=需要的字節need。
20-27行,如果size<need,說明仍然還差數據塊沒收到,此時將數據附加到之前的uc->pack.buffer里。
28-44行,其他兩種情況跟上面處理流程一樣。
1 // lualib-src/lua-netpack.c
2 static int
3 filter_data_(lua_State *L, int fd, uint8_t * buffer, int size) {
4 struct queue *q = lua_touserdata(L,1);
5 struct uncomplete * uc = find_uncomplete(q, fd);
6 if (uc) { //之前收到該包的部分數據塊,
7 // fill uncomplete
8 if (uc->read < 0) {//之前只收到一個字節,加上該數據塊的第一個字節,表示整個包的長度
9 // read size
10 assert(uc->read == -1);
11 int pack_size = *buffer;
12 pack_size |= uc->header << 8 ;
13 ++buffer;
14 --size;
15 uc->pack.size = pack_size;
16 uc->pack.buffer = skynet_malloc(pack_size);
17 uc->read = 0;
18 }
19 int need = uc->pack.size - uc->read;//包還差多少字節
20 if (size < need) {
21 memcpy(uc->pack.buffer + uc->read, buffer, size);
22 uc->read += size;
23 int h = hash_fd(fd);
24 uc->next = q->hash[h];
25 q->hash[h] = uc;
26 return 1;
27 }
28 memcpy(uc->pack.buffer + uc->read, buffer, need);
29 buffer += need;
30 size -= need;
31 if (size == 0) {
32 lua_pushvalue(L, lua_upvalueindex(TYPE_DATA));
33 lua_pushinteger(L, fd);
34 lua_pushlightuserdata(L, uc->pack.buffer);
35 lua_pushinteger(L, uc->pack.size);
36 skynet_free(uc);
37 return 5;
38 }
39 // more data
40 push_data(L, fd, uc->pack.buffer, uc->pack.size, 0);
41 skynet_free(uc);
42 push_more(L, fd, buffer, size);
43 lua_pushvalue(L, lua_upvalueindex(TYPE_MORE));
44 return 2;
45 } else {
46 if (size == 1) {
47 struct uncomplete * uc = save_uncomplete(L, fd);
48 uc->read = -1;
49 uc->header = *buffer;
50 return 1;
51 }
52 int pack_size = read_size(buffer); //需要數據包的字節數
53 buffer+=2;
54 size-=2;
55
56 if (size < pack_size) { //說明還有未獲得的數據包
57 struct uncomplete * uc = save_uncomplete(L, fd); //保存這個數據包
58 uc->read = size;
59 uc->pack.size = pack_size;
60 uc->pack.buffer = skynet_malloc(pack_size);
61 memcpy(uc->pack.buffer, buffer, size);
62 return 1;
63 }
64 if (size == pack_size) { //說明是一個完整包,把包返回給Lua層即可
65 // just one package
66 lua_pushvalue(L, lua_upvalueindex(TYPE_DATA));
67 lua_pushinteger(L, fd);
68 void * result = skynet_malloc(pack_size);
69 memcpy(result, buffer, size);
70 lua_pushlightuserdata(L, result);
71 lua_pushinteger(L, size);
72 return 5;
73 }
74 // more data
75 // 說明不止同一個數據包,還有額外的
76 push_data(L, fd, buffer, pack_size, 1); //保存第一個包到q->queue中
77 buffer += pack_size;
78 size -= pack_size;
79 push_more(L, fd, buffer, size); //處理余下的包
80 lua_pushvalue(L, lua_upvalueindex(TYPE_MORE));
81 return 2;
82 }
83 }
舉例,客戶端發了一個1kb的數據,socket線程會從內核里依次讀取64b,128b,256b,512b,64b數據。gateserver會執行5次filter_data_,分支依次是56行,20行,20行,20行,31行。
總結
以上是生活随笔為你收集整理的skynet源码分析之网络层——网关服务器的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 玉溪红色烟草扁盒细支多少钱?
- 下一篇: 30行,金额转人民币大写的代码