js udp通信_nodejs源码分析第十九章 -- udp模块
udp不是面向連接的協(xié)議,所以使用上會比tcp簡單,他和tcp一樣,使用四元組來標記通信的雙方(單播的情況下)。我們看看udp作為服務(wù)器和客戶端的時候的流程。
1 在c語言中使用udp
1.1 服務(wù)器流程(偽代碼)
// 申請一個socket int fd = socket(...); // 綁定一個眾所周知的地址,像tcp一樣 bind(fd, ip, port); // 直接阻塞等待消息的到來,因為udp不是面向連接的,所以不需要listen recvmsg();1.2 客戶端流程
客戶端的流程有多種方式,原因在于源ip和端口可以有多種設(shè)置方式,不像服務(wù)器一樣,服務(wù)器的ip和端口是需要對外公布的,否則客戶端就無法找到目的地進行通信。這就意味著服務(wù)器的ip端口是需要用戶顯式指定的,而客戶端則不然,客戶端的ip端口是隨意選擇的,用戶可以自己指定,也可以由操作系統(tǒng)決定,下面我們看看各種使用方式。
1.2.1 顯式指定ip端口
// 申請一個socket1.2.2 由操作系統(tǒng)決定源ip和端口
// 申請一個socket我們看到這里直接就給服務(wù)器發(fā)送數(shù)據(jù),如果用戶不指定ip和端口,則操作系統(tǒng)會提供默認的源ip和端口,不過端口是在第一個調(diào)用sendto的時候就設(shè)置了,并且不能修改,但是如果是多宿主主機,每次調(diào)用sendto的時候,操作系統(tǒng)會動態(tài)選擇源ip。另外還有另外一種使用方式。
// 申請一個socket我們可以先調(diào)用connect綁定服務(wù)器ip和端口到fd,然后直接調(diào)用write發(fā)送數(shù)據(jù)。 雖然使用方式很多,但是歸根到底還是對四元組設(shè)置的管理。bind是綁定源ip端口到fd,connect是綁定服務(wù)器ip端口到fd。我們可以主動調(diào)用他們來對fd進行設(shè)置,也可以讓操作系統(tǒng)隨機選擇。
1.3 發(fā)送數(shù)據(jù)
我們剛才看到使用udp之前都需要調(diào)用socket函數(shù)申請一個socket,雖然調(diào)用socket函數(shù)返回的是一個fd,但是在操作系統(tǒng)中,的確是新建了一個socket對象,fd只是一個索引,操作這個fd的時候,操作系統(tǒng)會根據(jù)這個fd找到對應(yīng)的socket。socket是一個非常復(fù)雜的結(jié)構(gòu)體,我們可以理解為一個對象。這個對象中有兩個屬性,一個是讀緩沖區(qū)大小,一個是寫緩沖區(qū)大小。當我們發(fā)送數(shù)據(jù)的時候,雖然理論上可以發(fā)送任意大小的數(shù)據(jù),但是因為受限于發(fā)送緩沖區(qū)的大小,如果需要發(fā)送的數(shù)據(jù)比當前緩沖區(qū)大小大則會導(dǎo)致一些問題,我們分情況分析一下。 1 發(fā)送的數(shù)據(jù)大小比當前緩沖區(qū)大,如果設(shè)置了非阻塞模式,則返回EAGAIN,如果是阻塞模式,則會引起進程的阻塞。 2 如果發(fā)送的數(shù)據(jù)大小比緩沖區(qū)的最大值還大,則會導(dǎo)致一直阻塞或者返回EAGAIN。我們可能會想到修改緩沖區(qū)最大值的大小,但是這個大小也是有限制的。 講完一些邊界情況,我們再來看看正常的流程,我們看看發(fā)送一個數(shù)據(jù)包的流程 1 首先在socket的寫緩沖區(qū)申請一塊內(nèi)存用于數(shù)據(jù)發(fā)送。 2 調(diào)用ip層發(fā)送接口,如果數(shù)據(jù)包大小超過了ip層的限制,則需要分包。因為udp不是可靠的,所以不需要緩存這個數(shù)據(jù)包。 這就是udp發(fā)送數(shù)據(jù)的流程。
1.4 接收數(shù)據(jù)
當收到一個udp數(shù)據(jù)包的時候,操作系統(tǒng)首先會把這個數(shù)據(jù)包緩存到socket的緩沖區(qū),如果收到的數(shù)據(jù)包比當前緩沖區(qū)大小大,則丟棄數(shù)據(jù)包(關(guān)于大小的限制可以參考1.3章節(jié)),否則把數(shù)據(jù)包掛載到接收隊列,等用戶來讀取的時候,就逐個摘下接收隊列的節(jié)點。
2 udp模塊在nodejs中的實現(xiàn)
2.1 udp服務(wù)器
我們從一個使用例子開始看看udp模塊的實現(xiàn)。
const我們看到創(chuàng)建一個udp服務(wù)器很簡單,首先申請一個socket對象,在nodejs中和操作系統(tǒng)中一樣,socket是對網(wǎng)絡(luò)通信的一個抽象,我們可以把他理解成對傳輸層的抽象,他可以代表tcp也可以代表udp。我們看一下createSocket做了什么。
function我們看到一個socket對象是對handle的一個封裝。我們看看handle是什么。
functionhandle又是對UDP模塊的封裝,UDP是c++模塊,我們看看該c++模塊的定義。
// 定義一個v8函數(shù)模塊在c++層通用邏輯中我們講過相關(guān)的知識,這里就不詳細講述了,當我們在js層new UDP的時候,會新建一個c++對象。
UDPWrap執(zhí)行了uv_udp_init初始化udp對應(yīng)的handle。我們看一下libuv的定義。
int到這里,就是我們在js層執(zhí)行dgram.createSocket('udp4')的時候,在nodejs中主要的執(zhí)行過程。回到最開始的例子,我們看一下執(zhí)行bind的時候的邏輯。
Socketbind函數(shù)主要的邏輯是handle.bind和startListening。我們一個個看。我們看一下c++層的bind。
void也沒有太多邏輯,處理參數(shù)然后執(zhí)行uv_udp_bind,uv_udp_bind就不具體展開了,和tcp類似,設(shè)置一些標記和屬性,然后執(zhí)行操作系統(tǒng)bind的函數(shù)把本端的ip和端口保存到socket中。我們繼續(xù)看startListening。
function重點是recvStart函數(shù),我們到c++的實現(xiàn)。
voidOnAlloc, OnRecv分別是分配內(nèi)存接收數(shù)據(jù)的函數(shù)和數(shù)據(jù)到來時執(zhí)行的回調(diào)。繼續(xù)看libuv
intuvudp_recv_start主要是注冊io觀察者到loop,等待事件到來的時候,在poll io階段處理。前面我們講過,回調(diào)函數(shù)是uvudp_io。我們看一下事件觸發(fā)的時候,該函數(shù)怎么處理的。
static我們這里先分析可讀事件的邏輯。我們看uv__udp_recvmsg。
static最終通過操作系統(tǒng)調(diào)用recvmsg讀取數(shù)據(jù),操作系統(tǒng)收到一個udp數(shù)據(jù)包的時候,會掛載到socket的接收隊列,如果滿了則會丟棄,當用戶調(diào)用recvmsg函數(shù)的時候,操作系統(tǒng)就把接收隊列中節(jié)點逐個返回給用戶。讀取完后,libuv會回調(diào)c++層,然后c++層回調(diào)到j(luò)s層,最后觸發(fā)message事件,這就是對應(yīng)開始那段代碼的message事件。
2.2 客戶端
udp客戶端的流程是
1 調(diào)用bind綁定客戶端的地址信息
2 調(diào)用connect綁定服務(wù)器的地址信息
3 調(diào)用sendmsg和recvmsg進行數(shù)據(jù)通信
我們看一下nodejs里的流程
我們看到nodejs首先調(diào)用connect綁定服務(wù)器的地址,然后調(diào)用send發(fā)送信息,最后調(diào)用close。我們一個個分析。首先看connect。
Socket這里分為兩種情況,一種是在connect之前已經(jīng)調(diào)用了bind,第二種是沒有調(diào)用bind,如果沒有調(diào)用bind,則在connect之前先要調(diào)用bind。我們只分析沒有調(diào)用bind的情況,因為這是最長的鏈路。我們看一下bind的邏輯。
// port = {posrt: 0, exclusive : true}, address_ = null因為bind函數(shù)中的lookup不是同步執(zhí)行傳入的callback,所以這時候會先返回到connect函數(shù)。從而connect函數(shù)執(zhí)行以下代碼。
ifconnect函數(shù)先把回調(diào)加入隊列。
functionenqueue把回調(diào)加入隊列,并且監(jiān)聽了listening事件,該事件在bind成功后觸發(fā)。這時候connect函數(shù)就執(zhí)行完了,等待bind成功后(nexttick)會執(zhí)行 startListening(this)。
function我們看到這里(bind成功后)觸發(fā)了listening事件,從而執(zhí)行我們剛才入隊的回調(diào)onListenSuccess。
function回調(diào)就是把隊列中的回調(diào)執(zhí)行一遍,connect函數(shù)設(shè)置的回調(diào)是_connect。
function這里的address是服務(wù)器地址,_connect函數(shù)主要邏輯是
1 監(jiān)聽connect事件
2 對服務(wù)器地址進行dns解析(只能是本地的配的域名)。解析成功后執(zhí)行afterDns,最后執(zhí)行doConnect,并傳入解析出來的ip。我們看看doConnect
connect函數(shù)通過c++層,然后調(diào)用libuv,到操作系統(tǒng)的connect。作用是把服務(wù)器地址保存到socket中。connect的流程就走完了。接下來我們就可以調(diào)用send和recv發(fā)送和接收數(shù)據(jù)。
2.3 發(fā)送數(shù)據(jù)
發(fā)送數(shù)據(jù)接口是sendto,他是對send的封裝。
Socket.prototype.send = function(buffer,offset,length,port,address,callback) {let list;const state = this[kStateSymbol];const connected = state.connectState === CONNECT_STATE_CONNECTED;// 沒有調(diào)用connect綁定過服務(wù)端地址,則需要傳服務(wù)端地址信息if (!connected) {if (address || (port && typeof port !== 'function')) {buffer = sliceBuffer(buffer, offset, length);} else {callback = port;port = offset;address = length;}} else {if (typeof length === 'number') {buffer = sliceBuffer(buffer, offset, length);if (typeof port === 'function') {callback = port;port = null;}} else {callback = offset;}// 已經(jīng)綁定了服務(wù)端地址,則不能再傳了if (port || address)throw new ERR_SOCKET_DGRAM_IS_CONNECTED();}// 如果沒有綁定服務(wù)器端口,則這里需要傳,并且校驗if (!connected)port = validatePort(port);// 忽略一些參數(shù)處理邏輯// 沒有綁定客戶端地址信息,則需要先綁定,值由操作系統(tǒng)決定if (state.bindState === BIND_STATE_UNBOUND)this.bind({ port: 0, exclusive: true }, null);// bind還沒有完成,則先入隊,等待bind完成再執(zhí)行if (state.bindState !== BIND_STATE_BOUND) {enqueue(this, this.send.bind(this, list, port, address, callback));return;}// 已經(jīng)綁定了,設(shè)置服務(wù)端地址后發(fā)送數(shù)據(jù)const afterDns = (ex, ip) => {defaultTriggerAsyncIdScope(this[async_id_symbol],doSend,ex, this, ip, list, address, port, callback);};// 傳了地址則可能需要dns解析if (!connected) {state.handle.lookup(address, afterDns);} else {afterDns(null, null);} }我們繼續(xù)看doSend函數(shù)。
function doSend(ex, self, ip, list, address, port, callback) {const state = self[kStateSymbol];// dns解析出錯if (ex) {if (typeof callback === 'function') {process.nextTick(callback, ex);return;}process.nextTick(() => self.emit('error', ex));return;}// 定義一個請求對象const req = new SendWrap();req.list = list; // Keep reference alive.req.address = address;req.port = port;// 設(shè)置nodejs和用戶的回調(diào),oncomplete由c++層調(diào)用,callback由oncomplete調(diào)用if (callback) {req.callback = callback;req.oncomplete = afterSend;}let err;// 根據(jù)是否需要設(shè)置服務(wù)端地址,調(diào)c++層函數(shù)if (port)err = state.handle.send(req, list, list.length, port, ip, !!callback);elseerr = state.handle.send(req, list, list.length, !!callback);// err大于等于1說明同步發(fā)送成功了,直接執(zhí)行回調(diào),否則等待異步回調(diào)if (err >= 1) {if (callback)process.nextTick(callback, null, err - 1);return;}// 發(fā)送失敗if (err && callback) {// Don't emit as error, dgram_legacy.js compatibilityconst ex = exceptionWithHostPort(err, 'send', address, port);process.nextTick(callback, ex);} }我們穿過c++層,直接看libuv的代碼。
int uv__udp_send(uv_udp_send_t* req,uv_udp_t* handle,const uv_buf_t bufs[],unsigned int nbufs,const struct sockaddr* addr,unsigned int addrlen,uv_udp_send_cb send_cb) {int err;int empty_queue;assert(nbufs > 0);// 還沒有綁定服務(wù)端地址,則綁定if (addr) {err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);if (err)return err;}// 當前寫隊列是否為空empty_queue = (handle->send_queue_count == 0);// 初始化一個寫請求uv__req_init(handle->loop, req, UV_UDP_SEND);if (addr == NULL)req->addr.ss_family = AF_UNSPEC;elsememcpy(&req->addr, addr, addrlen);// 保存上下文req->send_cb = send_cb;req->handle = handle;req->nbufs = nbufs;// 初始化數(shù)據(jù),預(yù)分配的內(nèi)存不夠,則分配新的堆內(nèi)存req->bufs = req->bufsml;if (nbufs > ARRAY_SIZE(req->bufsml))req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));// 復(fù)制過去堆中memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));// 更新寫隊列數(shù)據(jù)handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs);handle->send_queue_count++;// 插入寫隊列,等待可寫事件的發(fā)生QUEUE_INSERT_TAIL(&handle->write_queue, &req->queue);uv__handle_start(handle);// 當前寫隊列為空,則直接開始寫,否則設(shè)置等待可寫隊列if (empty_queue && !(handle->flags & UV_HANDLE_UDP_PROCESSING)) {// 發(fā)送數(shù)據(jù)uv__udp_sendmsg(handle);// 寫隊列是否非空,則設(shè)置等待可寫事件,可寫的時候接著寫if (!QUEUE_EMPTY(&handle->write_queue))uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);} else {uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);}return 0; }該函數(shù)把寫請求插入寫隊列中等待可寫事件的到來。然后注冊等待可寫事件。當可寫事件觸發(fā)的時候,執(zhí)行的函數(shù)是uv__udp_io。
static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) {uv_udp_t* handle;if (revents & POLLOUT) {uv__udp_sendmsg(handle);uv__udp_run_completed(handle);} }我們先看uv__udp_sendmsg
static void uv__udp_sendmsg(uv_udp_t* handle) {uv_udp_send_t* req;QUEUE* q;struct msghdr h;ssize_t size;// 逐個節(jié)點發(fā)送while (!QUEUE_EMPTY(&handle->write_queue)) {q = QUEUE_HEAD(&handle->write_queue);req = QUEUE_DATA(q, uv_udp_send_t, queue);memset(&h, 0, sizeof h);// 忽略參數(shù)處理h.msg_iov = (struct iovec*) req->bufs;h.msg_iovlen = req->nbufs;do {size = sendmsg(handle->io_watcher.fd, &h, 0);} while (size == -1 && errno == EINTR);if (size == -1) {// 繁忙則先不發(fā)了,等到可寫事件if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)break;}// 記錄發(fā)送結(jié)果req->status = (size == -1 ? UV__ERR(errno) : size);// 發(fā)送“完”移出寫隊列QUEUE_REMOVE(&req->queue);// 加入寫完成隊列QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);// 有節(jié)點數(shù)據(jù)寫完了,把io觀察者插入pending隊列,pending階段執(zhí)行回調(diào)uv__udp_iouv__io_feed(handle->loop, &handle->io_watcher);} }該函數(shù)遍歷寫隊列,然后逐個發(fā)送節(jié)點中的數(shù)據(jù),并記錄發(fā)送結(jié)果, 1 如果寫繁忙則結(jié)束寫邏輯,等待下一次寫事件觸發(fā)。
2 如果寫成功則把節(jié)點插入寫完成隊列中,并且把io觀察者插入pending隊列,等待pending階段執(zhí)行回調(diào)uvudp_io。 我們再次回到uvudp_io中
當寫事件觸發(fā)時,執(zhí)行完數(shù)據(jù)發(fā)送的邏輯后還會處理寫完成隊列。我們看uv__udp_run_completed。
static void uv__udp_run_completed(uv_udp_t* handle) {uv_udp_send_t* req;QUEUE* q;handle->flags |= UV_HANDLE_UDP_PROCESSING;// 逐個節(jié)點處理while (!QUEUE_EMPTY(&handle->write_completed_queue)) {q = QUEUE_HEAD(&handle->write_completed_queue);QUEUE_REMOVE(q);req = QUEUE_DATA(q, uv_udp_send_t, queue);uv__req_unregister(handle->loop, req);// 更新待寫數(shù)據(jù)大小handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs);handle->send_queue_count--;// 如果重新申請了堆內(nèi)存,則需要釋放if (req->bufs != req->bufsml)uv__free(req->bufs);req->bufs = NULL;if (req->send_cb == NULL)continue;// 執(zhí)行回調(diào)if (req->status >= 0)req->send_cb(req, 0);elsereq->send_cb(req, req->status);}// 寫隊列為空,則注銷等待可寫事件if (QUEUE_EMPTY(&handle->write_queue)) {uv__io_stop(handle->loop, &handle->io_watcher, POLLOUT);if (!uv__io_active(&handle->io_watcher, POLLIN))uv__handle_stop(handle);}handle->flags &= ~UV_HANDLE_UDP_PROCESSING; }這就是發(fā)送的邏輯,發(fā)送完后libuv會調(diào)用c++回調(diào),最后回調(diào)js層回調(diào)。具體到操作系統(tǒng)也是類似的實現(xiàn),操作系統(tǒng)首先判斷數(shù)據(jù)的大小是否小于寫緩沖區(qū),是的話申請一塊內(nèi)存,然后構(gòu)造udp協(xié)議數(shù)據(jù)包,再逐層往下調(diào),最后發(fā)送出來,但是如果數(shù)據(jù)超過了底層的報文大小限制,則會被分片。
2.4 多播
udp支持多播,tcp則不支持,因為tcp是基于連接和可靠的,多播則會帶來過多的連接和流量。多播分為局域網(wǎng)多播和廣域網(wǎng)多播,我們知道在局域網(wǎng)內(nèi)發(fā)生一個數(shù)據(jù),是會以廣播的形式發(fā)送到各個主機的,主機根據(jù)目的地址判斷是否需要處理該數(shù)據(jù)包。如果udp是單播的模式,則只會有一個主機會處理該數(shù)據(jù)包。如果udp是多播的模式,則有多個主機處理該數(shù)據(jù)包。多播的時候,存在一個多播組的概念,只有加入這個組的主機才能處理該組的數(shù)據(jù)包。假設(shè)有以下局域網(wǎng)
當主機1給多播組1發(fā)送數(shù)據(jù)的時候,主機2,4可以收到,主機3則無法收到。我們再來看看廣域網(wǎng)的多播。廣域網(wǎng)的多播需要路由器的支持,多個路由器之間會使用多播路由協(xié)議交換多播組的信息。假設(shè)有以下廣域網(wǎng)。
當主機1給多播組1發(fā)送數(shù)據(jù)的時候,路由器1會給路由器2發(fā)送一份數(shù)據(jù)(通過多播路由協(xié)議交換了信息,路由1知道路由器2的主機4在多播組1中),但是路由器2不會給路由器3發(fā)送數(shù)據(jù),因為他知道路由器3對應(yīng)的網(wǎng)絡(luò)中沒有主機在多播組1。以上是多播的一些概念。nodejs中關(guān)于多播的實現(xiàn),基本是對操作系統(tǒng)api的封裝,所以就不打算講解,我們直接看操作系統(tǒng)中對于多播的實現(xiàn)。
2.4.1 加入一個多播組
可以通過以下代碼加入一個多播組。
setsockoptmreq的結(jié)構(gòu)體定義如下
struct我們看一下setsockopt的實現(xiàn)(只列出相關(guān)部分代碼)
case拿到加入的多播組ip和device后,調(diào)用ip_mc_join_group,在socket結(jié)構(gòu)體中,有一個字段維護了該socket加入的多播組信息。
intip_mc_join_group函數(shù)的主要邏輯是把socket想加入的多播組信息記錄到socket的ip_mc_list字段中(如果還沒有加入過該多播組的話)。接著調(diào)ip_mc_inc_group往下走。device層維護了主機中使用了該device的多播組信息。
staticip_mc_inc_group函數(shù)的主要邏輯是判斷socket想要加入的多播組是不是已經(jīng)存在于當前device中,如果不是則新增一個節(jié)點。繼續(xù)調(diào)用igmp_group_added
static我們看看igmp_send_report和ip_mc_filter_add的具體邏輯。
staticigmp_send_report其實就是構(gòu)造一個igmp協(xié)議數(shù)據(jù)包,然后發(fā)送出去,igmp的協(xié)議格式如下
struct接著我們看ip_mc_filter_add
void我們知道ip地址是32位,mac地址是48位,但是IANA規(guī)定,ipv4組播MAC地址的高24位是0x01005E,第25位是0,低23位是ipv4組播地址的低23位。而多播的ip地址高四位固定是1110。另外低23位被映射到mac多播地址的23位,所以多播ip地址中,有5位是可以隨機組合的。這就意味著,每32個多播ip地址,映射到一個mac地址。這會帶來一些問題,假設(shè)主機x加入了多播組a,主機y加入了多播組b,而a和b對應(yīng)的mac多播地址是一樣的。當主機z給多播組a發(fā)送一個數(shù)據(jù)包的時候,這時候主機x和y的網(wǎng)卡都會處理該數(shù)據(jù)包,并上報到上層,但是多播組a對應(yīng)的mac多播地址和多播組b是一樣的。我們拿到一個多播組ip的時候,可以計算出他的多播mac地址,但是反過來就不行,因為一個多播mac地址對應(yīng)了32個多播ip地址。那主機x和y怎么判斷是不是發(fā)給自己的數(shù)據(jù)包?因為device維護了一個本device上的多播ip列表,操作系統(tǒng)根據(jù)收到的數(shù)據(jù)包中的ip目的地址和device的多播ip列表對比。如果在列表中,則說明是發(fā)給自己的。最后我們看看dev_mc_add。device中維護了當前的mac多播地址列表,他會把這個列表信息同步到網(wǎng)卡中,使得網(wǎng)卡可以處理該列表中多播mac地址的數(shù)據(jù)包。
void網(wǎng)卡的工作模式有幾種,分別是正常模式(只接收發(fā)給自己的數(shù)據(jù)包)、混雜模式(接收所有數(shù)據(jù)包)、多播模式(接收一般數(shù)據(jù)包和多播數(shù)據(jù)包)。網(wǎng)卡默認是只處理發(fā)給自己的數(shù)據(jù)包,所以當我們加入一個多播組的時候,我們需要告訴網(wǎng)卡,當收到該多播組的數(shù)據(jù)包時,需要處理,而不是忽略。dev_mc_upload函數(shù)就是通知網(wǎng)卡。
void最后我們看一下set_multicast_list
staticset_multicast_list就是設(shè)置網(wǎng)卡工作模式的函數(shù)。至此,我們就成功加入了一個多播組。離開一個多播組也是類似的過程。
2.4.2 開啟多播
udp的多播能力是需要用戶主動開啟的,原因是防止用戶發(fā)送udp數(shù)據(jù)包的時候,誤傳了一個多播地址,但其實用戶是想發(fā)送一個單播的數(shù)據(jù)包。我們可以通過setBroadcast開啟多播能力。我們看libuv的代碼。
int uv_udp_set_broadcast(uv_udp_t* handle, int on) {if (setsockopt(handle->io_watcher.fd,SOL_SOCKET,SO_BROADCAST,&on,sizeof(on))) {return UV__ERR(errno);}return 0; }再看看操作系統(tǒng)的實現(xiàn)。
int sock_setsockopt(struct sock *sk, int level, int optname,char *optval, int optlen){...case SO_BROADCAST:sk->broadcast=val?1:0; }我們看到實現(xiàn)很簡單,就是設(shè)置一個標記位。當我們發(fā)送消息的時候,如果目的地址是多播地址,但是又沒有設(shè)置這個標記,則會報錯。
if(!sk->broadcast && ip_chk_addr(sin.sin_addr.s_addr)==IS_BROADCAST)return -EACCES;上面代碼來自調(diào)用udp的發(fā)送函數(shù)(例如sendto)時,進行的校驗,如果發(fā)送的目的ip是多播地址,但是沒有設(shè)置多播標記,則報錯。
2.4.3 其他功能
udp模塊還提供了其他一些功能
1 獲取本端地址address
如果用戶沒有顯示調(diào)用bind綁定自己設(shè)置的ip和端口,那么操作系統(tǒng)就會隨機選擇。通過address函數(shù)就可以獲取操作系統(tǒng)選擇的源ip和端口。
2 獲取對端的地址
通過remoteAddress函數(shù)可以獲取對端地址。該地址由用戶調(diào)用connect或sendto函數(shù)時設(shè)置。
3 獲取/設(shè)置緩沖區(qū)大小get/setRecvBufferSize,get/setSendBufferSize
4 setMulticastLoopback
發(fā)送多播數(shù)據(jù)包的時候,如果多播ip在出口設(shè)備的多播列表中,則給回環(huán)設(shè)備也發(fā)一份。
5 setMulticastInterface
設(shè)置多播數(shù)據(jù)的出口設(shè)備
6 加入或退出多播組addMembership/dropMembership
7 addSourceSpecificMembership/dropSourceSpecificMembership
這兩個函數(shù)是設(shè)置本端只接收特性源(主機)的多播數(shù)據(jù)包。
8 setTTL
單播ttl(單播的時候,ip協(xié)議頭中的ttl字段)。
9 setMulticastTTL
多播ttl(多播的時候,ip協(xié)議的ttl字段)。
10 ref/unref
這兩個函數(shù)設(shè)置如果nodejs主進程中只有udp對應(yīng)的handle時,是否允許nodejs退出。nodejs事件循環(huán)的退出的條件之一是是否還有ref狀態(tài)的handle。 這些都是對操作系統(tǒng)api的封裝,就不一一分析。
3 具體例子
局域網(wǎng)中有兩個局域網(wǎng)ip,分別是192.168.8.164和192.168.8.226
單播
服務(wù)器端
const dgram = require('dgram'); const udp = dgram.createSocket('udp4'); udp.bind(1234); udp.on('message', (msg, remoteInfo) => {console.log(`receive msg: ${msg} from ${remoteInfo.address}:${remoteInfo.port}`); });客戶端
const dgram = require('dgram'); const udp = dgram.createSocket('udp4'); udp.bind(1234); udp.send('test', 1234, '192.168.8.226');我們會看到服務(wù)端會顯示receive msg test from 192.168.8.164:1234。
多播
服務(wù)器
const dgram = require('dgram'); const udp = dgram.createSocket('udp4');udp.bind(1234, () => {// 局域網(wǎng)多播地址(224.0.0.0~224.0.0.255,該范圍的多播數(shù)據(jù)包,路由器不會轉(zhuǎn)發(fā))udp.addMembership('224.0.0.114'); });udp.on('message', (msg, rinfo) => {console.log(`receive msg: ${msg} from ${rinfo.address}:${rinfo.port}`); });服務(wù)器綁定1234端口后,加入多播組224.0.0.114,然后等待多播數(shù)據(jù)的到來。
客戶端
const dgram = require('dgram'); const udp = dgram.createSocket('udp4'); udp.bind(1234, () => {udp.addMembership('224.0.0.114'); }); udp.send('test', 1234, '224.0.0.114', (err) => {});客戶端綁定1234端口后,也加入了多播組224.0.0.114,然后發(fā)送數(shù)據(jù),但是發(fā)現(xiàn)服務(wù)端沒有收到數(shù)據(jù),客戶端打印了receive msg test from 169.254.167.41:1234。這怎么多了一個ip出來?原來我主機有兩個局域網(wǎng)地址。當我們加入多播組的時候,不僅可以設(shè)置加入哪個多播組,還能設(shè)置出口的設(shè)備和ip。當我們調(diào)用udp.addMembership('224.0.0.114')的時候,我們只是設(shè)置了我們加入的多播組,沒有設(shè)置出口。這時候操作系統(tǒng)會為我們選擇一個。根據(jù)輸出,我們發(fā)現(xiàn)操作系統(tǒng)選擇的是169.254.167.41(子網(wǎng)掩碼是255.255.0.0)。因為這個ip和192開頭的那個不是同一子網(wǎng),但是我們加入的是局域網(wǎng)的多播ip,所有服務(wù)端無法收到客戶端發(fā)出的數(shù)據(jù)包。下面是nodejs文檔的解釋。
Tells the kernel to join a multicast group at the given multicastAddress and multicastInterface using the IP_ADD_MEMBERSHIP socket option. If the multicastInterface argument is not specified, the operating system will choose one interface and will add membership to it. To add membership to every available interface, call addMembership multiple times, once per interface.我們看一下操作系統(tǒng)的相關(guān)邏輯。
if(MULTICAST(daddr) && *dev==NULL && skb->sk && *skb->sk->ip_mc_name)*dev=dev_get(skb->sk->ip_mc_name);上面的代碼來自操作系統(tǒng)發(fā)送ip數(shù)據(jù)包時的邏輯,如果目的ip似乎多播地址并且ip_mc_name非空(即我們通過addMembership第二個參數(shù)設(shè)置的值),則出口設(shè)備就是我們設(shè)置的值。否則操作系統(tǒng)自己選。所以我們需要顯示指定這個出口,把代碼改成udp.addMembership('224.0.0.114', '192.168.8.164');重新執(zhí)行發(fā)現(xiàn)客戶端和服務(wù)器都顯示了receive msg test from 192.168.8.164:1234。為什么客戶端自己也會收到呢?原來操作系統(tǒng)發(fā)送多播數(shù)據(jù)的時候,也會給自己發(fā)送一份。我們看看相關(guān)邏輯
// 目的地是多播地址,并且不是回環(huán)設(shè)備 if (MULTICAST(iph->daddr) && !(dev->flags&IFF_LOOPBACK)) {// 是否需要給自己一份,默認為trueif(sk==NULL || sk->ip_mc_loop){ // 給所有多播組的所有主機的數(shù)據(jù)包,則直接給自己一份if(iph->daddr==IGMP_ALL_HOSTS)ip_loopback(dev,skb);else{ // 判斷目的ip是否在當前設(shè)備的多播ip列表中,是的回傳一份struct ip_mc_list *imc=dev->ip_mc_list;while(imc!=NULL){if(imc->multiaddr==iph->daddr){ip_loopback(dev,skb);break;}imc=imc->next;}}} }以上代碼來自ip層發(fā)送數(shù)據(jù)包時的邏輯。如果我們設(shè)置了sk->ip_mc_loop字段為1,并且數(shù)據(jù)包的目的ip在出口設(shè)備的多播列表中,則需要給自己回傳一份。那么我們?nèi)绾侮P(guān)閉這個特性呢?調(diào)用udp.setMulticastLoopback(false)就可以了。
更多參考
1 通過源碼理解IGMP v1的實現(xiàn)(基于linux1.2.13)
2 UDP協(xié)議源碼解析之接收
3 UDP協(xié)議源碼解析之發(fā)送
總結(jié)
以上是生活随笔為你收集整理的js udp通信_nodejs源码分析第十九章 -- udp模块的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 硬件知识:USB3.0和USB2.0的区
- 下一篇: html 显示不吃,20180902_h