Nginx訪問上游服務器的流程大致分以下幾個階段:啟動upstream、連接上游服務器、向上游發(fā)送請求、接收上游響應(包頭/包體)、結(jié)束請求。本篇主要從代碼流程的角度,梳理一下upstream的整個的數(shù)據(jù)的處理流程。下面先看一下upstream相關的兩個重要數(shù)據(jù)結(jié)構ngx_http_upstream_t和ngx_http_upstream_conf_t:
相關數(shù)據(jù)結(jié)構
typedef struct ngx_http_upstream_s ? ?ngx_http_upstream_t;struct?ngx_http_upstream_s {ngx_http_upstream_handler_pt read_event_handler; ? ?// 處理讀事件的回調(diào)方法ngx_http_upstream_handler_pt write_event_handler; ??// 處理寫事件的回調(diào)方法ngx_peer_connection_t peer; ? ? ? ? ? ? ? ? ? ? ? ??// 主動向上游發(fā)起的連接,稍后會詳細分析ngx_event_pipe_t?*pipe; ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 當開啟緩存配置,會用pipe來轉(zhuǎn)發(fā)響應,需要http模塊在使用upstream機制前構造pipe結(jié)構體ngx_chain_t?*request_bufs; ? ? ? ? ? ? ? ? ? ? ? ? ?//?用鏈表將ngx_buf_t緩沖區(qū)鏈接起來,表示所有需要發(fā)送到上游的請求內(nèi)容,//?create_request回調(diào)在于構造request_buf鏈表ngx_output_chain_ctx_t output; ? ? ? ? ? ? ? ? ? ? ?// 向下游發(fā)送響應的方式,稍后會詳細分析ngx_chain_writer_ctx_t writer; ? ? ? ? ? ? ? ? ? ? ?// 向下游發(fā)送響應的方式,稍后會詳細分析ngx_http_upstream_conf_t?*conf; ? ? ? ? ? ? ? ? ? ??// upstream相關的配置信息
#if?(NGX_HTTP_CACHE)ngx_array_t?*caches; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?// 緩存數(shù)組,稍后會單獨介紹緩存相關內(nèi)容
#endifngx_http_upstream_headers_in_t headers_in; ? ? ? ? ?// 當直接轉(zhuǎn)發(fā)時,process_header將解析的頭部適配為http頭部,同時將包頭信息放在headers_in中ngx_http_upstream_resolved_t?*resolved; ? ? ? ? ? ??// 用于解析主機域名,后面會詳細介紹ngx_buf_t from_client; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?// ToDo....ngx_buf_t buffer; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 接收上游服務器響應包頭的緩存區(qū),當不需要直接響應或buffering為0時,也作為轉(zhuǎn)發(fā)包體緩沖區(qū)??off_t?length; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 來自上游服務器的響應包體的長度ngx_chain_t?*out_bufs; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?// 使用時再具體介紹,不同場景下有不同意義ngx_chain_t?*busy_bufs; ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 當buffering為0時,表示上一次向下游轉(zhuǎn)發(fā)響應時沒有發(fā)送完成的內(nèi)容ngx_chain_t?*free_bufs; ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 當buffering為0時,用于回收out_bufs中已經(jīng)發(fā)送給下游的ngx_buf_t結(jié)構體ngx_int_t?(*input_filter_init)(void?*data); ? ? ? ? // 處理包體前的初始化方法,其中data用于傳遞用戶數(shù)據(jù)結(jié)構,即下方的input_filter_ctxngx_int_t?(*input_filter)(void?*data,?ssize_t bytes)// 處理包體的方法,bytes表示本次接收到的包體長度,data同上void?*input_filter_ctx; ? ? ? ? ? ? ? ? ? ? ? ? ? ? // 傳遞http模塊的自定義的數(shù)據(jù)結(jié)構#if?(NGX_HTTP_CACHE)ngx_int_t?(*create_key)(ngx_http_request_t?*r);? ? ?// cache部分,后面再分析
#endifngx_int_t?(*create_request)(ngx_http_request_t?*r); // 用于構造發(fā)往上游服務器的請求ngx_int_t?(*reinit_request)(ngx_http_request_t?*r); // 與上游通訊失敗,需要重新發(fā)起連接時,用該方法重新初始化請求信息ngx_int_t?(*process_header)(ngx_http_request_t?*r); // 解析上游服務器返回響應的包頭,NGX_AGAIN接收不完整,NGX_OK解析到完整包頭void?(*abort_request)(ngx_http_request_t?*r); ? ? ? // 暫時沒有用到void?(*finalize_request)(ngx_http_request_t?*r, ? ? // 請求結(jié)束時會調(diào)用,目前沒有實際作用ngx_int_t rc);ngx_int_t?(*rewrite_redirect)(ngx_http_request_t?*r,// 上游返回響應中含Location或Refresh時,process_header會調(diào)用http模塊實現(xiàn)的該方法ngx_table_elt_t?*h,?size_t prefix);ngx_int_t?(*rewrite_cookie)(ngx_http_request_t?*r, ?// 同上,當響應中含Set-Cookie時,會調(diào)用http模塊實現(xiàn)的該方法ngx_table_elt_t?*h);ngx_msec_t timeout;????????????? ? ????????? ? ? ? ?// 暫時沒有用到ngx_http_upstream_state_t?*state; ? ? ? ? ? ? ? ? ??// 用于表示上游響應的錯誤碼、包體長度等信息ngx_str_t method; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 用于文件緩存,稍后再進行分析ngx_str_t schema; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 記錄日志時使用ngx_str_t uri; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?// 記錄日志時使用ngx_http_cleanup_pt?*cleanup; ? ? ? ? ? ? ? ? ? ? ??// 用于標識是否需要清理資源,相當于一個標志位,實際不會調(diào)用該方法unsigned store:1; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 是否指定文件緩存路徑的標志位unsigned cacheable:1; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 是否啟用文件緩存unsigned accel:1; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 目前沒有用到unsigned ssl:1; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 是否基于SSL協(xié)議訪問上游服務器unsigned buffering:1; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 向下游轉(zhuǎn)發(fā)響應包體時,是否開啟更大內(nèi)存及臨時磁盤文件用于緩存來不及發(fā)送到下游的響應包體unsigned keepalive:1; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 標識與后端是否開啟keepalive ?unsigned upgrade:1; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 是否存在upgrade headerunsigned request_sent:1; ? ? ? ? ? ? ? ? ? ? ? ? ? ?// 是否向上游服務器發(fā)送了請求unsigned header_sent:1; ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 為1時,表示包頭已經(jīng)轉(zhuǎn)發(fā)給客戶端了
}
ngx_http_upstream_conf_t:指定了upstream的運行方式,必須在啟動upstream之前設置
點擊(此處)折疊或打開
typedef?struct?{ngx_http_upstream_srv_conf_t?*upstream; // 當上面沒有實現(xiàn)resolved成員時,用該結(jié)構體定義上游服務器的配置ngx_msec_t connect_timeout; ? ? ? // 建立tcp連接的超時時間,即寫事件添加到定時器中設置的超時時間ngx_msec_t send_timeout; ? ? ? ? ?// 發(fā)送請求的超時時間,即寫事件添加到定時器中設置的超時時間ngx_msec_t read_timeout; ? ? ? ? ?// 接收響應的超時時間,即讀事件添加到定時器中設置的超時時間ngx_msec_t timeout; ? ? ? ? ? ? ??// 暫時沒有使用ngx_msec_t next_upstream_timeout; //?size_t send_lowat; ? ? ? ? ? ? ? ?// 發(fā)送緩存區(qū)的下限,即TCP的SO_SNOLOWAT選項size_t buffer_size; ? ? ? ? ? ? ??//?指定接收頭部緩沖區(qū)分配的內(nèi)存大小,當buffering為0時,由于上述buffer同時用于接收包體,也表示接收包體緩沖區(qū)大小size_t limit_rate; ? ? ? ? ? ? ? ?//?size_t busy_buffers_size; ? ? ? ??// 當buffering為1,且向下游轉(zhuǎn)發(fā)響應時生效,會設置到ngx_event_pipe_t結(jié)構體的busy_size中size_t max_temp_file_size; ? ? ? ?// 指定臨時文件的大小,限制ngx_event_pipe_t中的temp_filesize_t temp_file_write_size; ? ? ?// 將緩沖區(qū)的響應寫入臨時文件時,一次寫入字符流的最大長度......ngx_bufs_t bufs; ? ? ? ? ? ? ? ? ?// 以緩存響應的方式轉(zhuǎn)發(fā)上游服務器的包體時所使用的內(nèi)存大小ngx_uint_t ignore_headers; ? ? ? ?// 以位圖的形式標識在轉(zhuǎn)發(fā)時需要忽略的headersngx_uint_t next_upstream; ? ? ? ? // 以位圖的方式表示一些錯誤碼,當處理上游響應時發(fā)現(xiàn)該錯誤碼,選擇下一個上游服務器重發(fā)請求ngx_uint_t store_access; ? ? ? ? ?// 表示創(chuàng)建的臨時目錄和文件的權限ngx_uint_t next_upstream_tries; ? //?ngx_flag_t buffering; ? ? ? ? ? ? //?為1時表示打開緩存,盡量在內(nèi)存和磁盤中緩存來自上游的響應,為0時則開辟固定大小內(nèi)存塊作為緩存來轉(zhuǎn)發(fā)響應......ngx_flag_t ignore_client_abort; ? // 為1時,表示與上游服務器交互時不檢查nginx與下游服務器是否斷開,即使下游主動關閉連接,也不會中斷與上游交互ngx_flag_t intercept_errors; ? ? ?// 詳見ngx_http_upstream_intercept_errorsngx_flag_t cyclic_temp_file; ? ? ?// 為1時,會嘗試復用臨時文件中已經(jīng)使用過的空間......ngx_path_t?*temp_path; ? ? ? ? ? ?//?buffering為1的情況下轉(zhuǎn)發(fā)響應時,存放臨時文件的路徑ngx_hash_t hide_headers_hash; ? ? // 不轉(zhuǎn)發(fā)的頭部,根據(jù)hide_headers和pass_headers動態(tài)數(shù)組構造出的需要隱藏的http頭部散列表ngx_array_t?*hide_headers; ? ? ? ?// 當轉(zhuǎn)發(fā)上游頭部給下游時,如果不希望將某些頭部轉(zhuǎn)發(fā)給下游,則設置到該數(shù)組中ngx_array_t?*pass_headers; ? ? ? ?// 轉(zhuǎn)發(fā)頭部時upstream機制默認不會轉(zhuǎn)發(fā)某些頭部,當確定需要轉(zhuǎn)發(fā)時,需要設置到該數(shù)組中ngx_http_upstream_local_t?*local; // 連接上游服務器時,需要使用的本機地址ngx_array_t?*store_lengths; ? ? ??// 當需要將上游響應緩存到文件中時,表示存放路徑的長度ngx_array_t?*store_values; ? ? ? ?//?當需要將上游響應緩存到文件中時,表示存放路徑......signed store:2; ? ? ? ? ? ? ? ? ??// 同ngx_http_upstream_t中的storeunsigned intercept_404:1; ? ? ? ??// 如果該值設為1,當上游返回404時直接轉(zhuǎn)發(fā)該錯誤碼給下游,而不會去與error_page進行比較unsigned change_buffering:1; ? ? ?// 當為1時,根據(jù)上游服務器返回的響應頭部,動態(tài)決定是以上游網(wǎng)速優(yōu)先,還是下游網(wǎng)速優(yōu)先......ngx_str_t module; ? ? ? ? ? ? ? ??// 使用upstream的模塊名稱,僅用于記錄日志
} ngx_http_upstream_conf_t
啟動upstream
當收到請求后,http的代理模塊是ngx_http_proxy_module,其NGX_HTTP_CONTENT_PHASE階段的處理函數(shù)為ngx_http_proxy_handler
點擊(此處)折疊或打開
static ngx_int_t
ngx_http_proxy_handler(ngx_http_request_t?*r)
{// 創(chuàng)建ngx_http_upstream_t結(jié)構,并賦值給r->upstreamif?(ngx_http_upstream_create(r)?!=?NGX_OK)?{return NGX_HTTP_INTERNAL_SERVER_ERROR;}.....plcf?=?ngx_http_get_module_loc_conf(r,?ngx_http_proxy_module);.....u?=?r->upstream;.....// 給upstream的conf成員賦值,記錄相關的配置信息u->conf?=?&plcf->upstream;// 設置相關的回調(diào)信息u->create_request?=?ngx_http_proxy_create_request;u->reinit_request?=?ngx_http_proxy_reinit_request;u->process_header?=?ngx_http_proxy_process_status_line;u->abort_request?=?ngx_http_proxy_abort_request;u->finalize_request?=?ngx_http_proxy_finalize_request;......u->buffering?=?plcf->upstream.buffering;.....// 調(diào)用ngx_http_upstream_init函數(shù)rc?=?ngx_http_read_client_request_body(r,?ngx_http_upstream_init);.....return NGX_DONE;
}
首先創(chuàng)建upstream的結(jié)構并進行設置,然后設置ngx_http_upstream_conf_t配置結(jié)構體給upstream->conf。ngx_http_upstream_init函數(shù)會根據(jù)
ngx_http_upstream_conf_t配置的信息初始化upstream,同時開始連接上游服務器,由此展開整個upstream的處理流程。
點擊(此處)折疊或打開
void?ngx_http_upstream_init(ngx_http_request_t?*r)
{ngx_connection_t?*c;// 客戶端的連接c?=?r->connection;......// 當啟用upstream時,需要將客戶端對應的讀事件從定時器中刪除,此時主要關注上游的連接相關的事件if?(c->read->timer_set)?{ngx_del_timer(c->read);}......ngx_http_upstream_init_request(r);
}
繼續(xù)看ngx_http_upstream_init_request函數(shù)
點擊(此處)折疊或打開
static void?ngx_http_upstream_init_request(ngx_http_request_t?*r)
{u?=?r->upstream;u->store?=?u->conf->store;......// 設置Nginx與下游客戶端之間TCP連接的檢查方法,注意幾個條件,ignore來自之前配置屬性,是否忽略客戶端的連接狀態(tài)if?(!u->store &&?!r->post_action &&?!u->conf->ignore_client_abort)?{r->read_event_handler?=?ngx_http_upstream_rd_check_broken_connection;r->write_event_handler?=?ngx_http_upstream_wr_check_broken_connection;}......// 調(diào)用http模塊實現(xiàn)的create_request方法,即前面注冊的ngx_http_proxy_create_request函數(shù),用于構造發(fā)到上游服務器的請求if?(u->create_request(r)?!=?NGX_OK)?{ngx_http_finalize_request(r,?NGX_HTTP_INTERNAL_SERVER_ERROR);return;}......// 向當前請求的main成員指向的原始請求中的cleanup鏈表末尾添加一個新成員cln?=?ngx_http_cleanup_add(r,?0);// 將handler的回調(diào)方法設置為ngx_http_upstream_cleanupcln->handler?=?ngx_http_upstream_cleanup;cln->data?=?r;u->cleanup?=?&cln->handler;......// 調(diào)用ngx_http_upstream_connect向上游服務器發(fā)起連接ngx_http_upstream_connect(r,?u);
}
與上游服務器建立連接
upstream機制與上游服務器之間通過tcp建立連接,為了保證三次握手的過程中不阻塞進程,Nginx采用了無阻塞的套接字來連接上游服務器。
ngx_http_upstream_connect負責發(fā)起建連動作,如果沒有立即返回成功,需要在epoll中監(jiān)控該套接字,當出現(xiàn)可寫事件時,則說明連接已經(jīng)建立成功。
點擊(此處)折疊或打開
static void?ngx_http_upstream_connect(ngx_http_request_t?*r,?ngx_http_upstream_t?*u)
{// 建連的動作主要由下面函數(shù)進行.....rc?=?ngx_event_connect_peer(&u->peer);....
點擊(此處)折疊或打開
ngx_int_t?ngx_event_connect_peer(ngx_peer_connection_t?*pc)
{// 創(chuàng)建tcp socket套接字s?=?ngx_socket(pc->sockaddr->sa_family,?SOCK_STREAM,?0);......// 獲取空閑的ngx_connection_t結(jié)構來承載連接,從ngx_cycle_t的free_connections指向的空閑連接池中獲取c?=?ngx_get_connection(s,?pc->log);......// 設置連接為非阻塞的模式if?(ngx_nonblocking(s)?==?-1)?{......// 綁定地址和端口if?(pc->local)?{if?(bind(s,?pc->local->sockaddr,?pc->local->socklen)?==?-1)?{......// 設置連接收發(fā)相關的回調(diào)函數(shù)c->recv?=?ngx_recv;c->send?=?ngx_send;c->recv_chain?=?ngx_recv_chain;c->send_chain?=?ngx_send_chain;// 啟用sendfile的支持c->sendfile?=?1;......rev = c->read;wev = c->write;......pc->connection = c;// 調(diào)用ngx_event_actions.add_conn將tcp套接字以期待可讀、可寫的方式添加到事件搜集器中,這里是把套接字加到epoll中if (ngx_add_conn) {if (ngx_add_conn(c) == NGX_ERROR) {goto failed;}}// 向上游服務器發(fā)起連接,由于非阻塞,調(diào)用會立即返回rc = connect(s, pc->sockaddr, pc->socklen);......
回到ngx_http_upstream_connect繼續(xù)分析
點擊(此處)折疊或打開
static void?ngx_http_upstream_connect(ngx_http_request_t?*r,?ngx_http_upstream_t?*u)
{......// 上面已經(jīng)分析了,該函數(shù)主要進行上游服務器的連接rc?=?ngx_event_connect_peer(&u->peer);......c?=?u->peer.connection;c->data?=?r;// 將上游connection上讀寫事件的回調(diào),都設置為ngx_http_upstream_handlerc->write->handler?=?ngx_http_upstream_handler;c->read->handler?=?ngx_http_upstream_handler;// 設置upstream機制的write_event_handler和read_event_handler,具體使用見后續(xù)的ngx_upstream_handler函數(shù)//?ngx_http_upstream_send_request_handler用于向上游發(fā)送請求u->write_event_handler?=?ngx_http_upstream_send_request_handler;//?ngx_http_upstream_process_header接收和解析上游服務器的響應u->read_event_handler?=?ngx_http_upstream_process_header;......if?(rc?==?NGX_AGAIN)?{// 當連接沒有建立成功時,套接字已經(jīng)在epoll中了,將寫事件添加到定時器中,超時時間是ngx_http_upstream_conf_t中的connect_timeout成員ngx_add_timer(c->write,?u->conf->connect_timeout);return;}......// 當成功建立連接時,向上游服務器發(fā)送請求,注意:此處的函數(shù)與上面設置的定時器回調(diào)的函數(shù)有所不同,下文會進行說明ngx_http_upstream_send_request(r,?u);
}
下面先簡單看一下connection的讀寫回調(diào)函數(shù)——ngx_http_upstream_handler
點擊(此處)折疊或打開
static void
ngx_http_upstream_handler(ngx_event_t?*ev)
{......// 由事件的data成員取得ngx_connection_t連接,該連接是nginx與上游服務器之間的連接c?=?ev->data;// 由連接的data取得ngx_http_request_t結(jié)構體r?=?c->data;// 由請求的upstream成員取的表示upstream機制的ngx_http_upstream_t結(jié)構體u?=?r->upstream;// 此處ngx_http_request_t結(jié)構中的connection成員代表的是客戶端與nginx之間連接c?=?r->connection;......if?(ev->write)?{// nginx與上游服務器間的tcp連接的可寫事件被觸發(fā)時,該方法被調(diào)用u->write_event_handler(r,?u);}?else?{//?nginx與上游服務器間的tcp連接的可讀事件被觸發(fā)時,該方法被調(diào)用u->read_event_handler(r,?u);}// 與nginx_http_request_handler相同,最后一步執(zhí)行post請求ngx_http_run_posted_requests(c);
}
發(fā)送請求到上游服務器
前面在介紹ngx_http_upstream_connect函數(shù)時,我們看到將ngx_http_upstream_t中的write_event_handler設置為了ngx_http_upstream_send_request_handler,而ngx_http_upstream_connect的最后直接調(diào)用了ngx_http_upstream_send_request發(fā)送請求。
下面先來看一下兩者的區(qū)別
點擊(此處)折疊或打開
static void?ngx_http_upstream_send_request_handler(ngx_http_request_t?*r,?ngx_http_upstream_t?*u)
{ngx_connection_t?*c;// 獲取與上游服務器間表示連接的ngx_connection_t結(jié)構體c?=?u->peer.connection;// 當寫事件的timeout被設置為1時,則代表向上游發(fā)送http請求已經(jīng)超時if?(c->write->timedout)?{// 將超時錯誤傳給next方法,next方法根據(jù)允許的重傳策略決定:重新發(fā)起連接執(zhí)行upstream請求,還是結(jié)束upstream請求ngx_http_upstream_next(r,?u,?NGX_HTTP_UPSTREAM_FT_TIMEOUT);return;}......// header_sent為1時,表示上游服務器的響應需要直接轉(zhuǎn)發(fā)給客戶端,而且此時響應包頭已經(jīng)轉(zhuǎn)給客戶端了if?(u->header_sent)?{// 由于此時已經(jīng)收到了上游服務器的完整包頭,此時不需要再向上游發(fā)送請求,因此將write回調(diào)設置為空函數(shù)(只記錄日志)u->write_event_handler?=?ngx_http_upstream_dummy_handler;// 將寫事件添加到epoll中(void)?ngx_handle_write_event(c->write,?0);return;}// 調(diào)用下面函數(shù)向上游發(fā)送http請求ngx_http_upstream_send_request(r,?u);
}
通過上面的分析,現(xiàn)在很容易看出兩者的區(qū)別,ngx_http_upstream_send_request_handler更多的是在檢測請求的狀態(tài),而實際的發(fā)送函數(shù)是
ngx_http_upstream_send_request,下面繼續(xù)看一下該函數(shù)。
點擊(此處)折疊或打開
static void?ngx_http_upstream_send_request(ngx_http_request_t?*r,?ngx_http_upstream_t?*u)
{......// 發(fā)送u->request_bufs鏈表上的請求內(nèi)容,該函數(shù)會把未一次發(fā)送完的鏈表緩沖區(qū)保存下來,再次調(diào)用時不需要request_bufs參數(shù)rc?=?ngx_output_chain(&u->output,?u->request_sent???NULL?:?u->request_bufs);// 標識已經(jīng)向上游發(fā)送了請求,實際上是為了標識是否調(diào)用過ngx_output_chain,除了第一次,其他時候不需要再傳送request_bufs,直接設置為NULLu->request_sent?=?1;......// 當寫事件仍在定時器中時,先將寫事件從定時器中移出,由ngx_output_chain的返回值決定是否需要向定時器中增加寫事件if?(c->write->timer_set)?{ngx_del_timer(c->write);}// 當ngx_output_chain返回NGX_AGAIN時,說明請求還沒有發(fā)完,此時需要設置寫事件定時器if?(rc?==?NGX_AGAIN)?{ngx_add_timer(c->write,?u->conf->send_timeout);// 將寫事件添加到epoll中if?(ngx_handle_write_event(c->write,?u->conf->send_lowat)?!=?NGX_OK)?{ngx_http_upstream_finalize_request(r,?u,NGX_HTTP_INTERNAL_SERVER_ERROR);return;}// 結(jié)束ngx_http_upstream_send_request的執(zhí)行,等待epoll事件觸發(fā)return;}/*?rc?==?NGX_OK?*/// 當ngx_output_chain返回NGX_OK時,表示向上游服務器發(fā)送完了所有的請求,將寫事件的回調(diào)設置為空函數(shù)......u->write_event_handler?=?ngx_http_upstream_dummy_handler;// 重新添加到epoll中if?(ngx_handle_write_event(c->write,?0)?!=?NGX_OK)?{ngx_http_upstream_finalize_request(r,?u,NGX_HTTP_INTERNAL_SERVER_ERROR);return;}// 發(fā)送完請求后,需要開始讀上游返回的響應,設置讀事件的超時時間ngx_add_timer(c->read,?u->conf->read_timeout);// 當ready已經(jīng)設置時,說明應答已經(jīng)到位,調(diào)用process_header開始處理來自上游的響應if?(c->read->ready)?{ngx_http_upstream_process_header(r,?u);return;}
}
接收上游服務器的響應
Nginx的upstream機制支持三種響應包體的處理方式:不轉(zhuǎn)發(fā)響應、轉(zhuǎn)發(fā)響應時以下游網(wǎng)速優(yōu)先、轉(zhuǎn)發(fā)響應時以上游網(wǎng)速優(yōu)先。當ngx_http_request_t結(jié)構體的
subrequest_in_memory標志位為1時,即不轉(zhuǎn)發(fā)響應;當subrequest_in_memory為0時,則轉(zhuǎn)發(fā)響應;而ngx_http_upstream_conf_t配置結(jié)構中的buffering
為0時,則以下游網(wǎng)速優(yōu)先,即使用固定大小的內(nèi)存作為緩存;當buffering為1時,則以上游網(wǎng)速優(yōu)先,即采用更多的內(nèi)存、硬盤文件作為緩存。
下面看一下用于接收、解析響應頭部的ngx_http_upstream_process_header方法
點擊(此處)折疊或打開
static void
ngx_http_upstream_process_header(ngx_http_request_t?*r,?ngx_http_upstream_t?*u)
{......// 獲取到上游服務器的連接信息c?=?u->peer.connection;......// 檢查是否發(fā)生了讀事件超時,如果發(fā)生了超時,則調(diào)用ngx_http_upstream_next函數(shù)決定下一步動作if?(c->read->timedout)?{ngx_http_upstream_next(r,?u,?NGX_HTTP_UPSTREAM_FT_TIMEOUT);return;}// request_sent為1則代表已經(jīng)向上游發(fā)過請求;為0則代表還沒有發(fā)送請求,沒有發(fā)送請求卻收到上游的響應時,則不符合邏輯,進行下一步動作// ngx_http_upstream_next會根據(jù)配置信息決定是否直接結(jié)束請求,還是尋找下一個上游服務器if?(!u->request_sent && ngx_http_upstream_test_connect(c)?!=?NGX_OK)?{ngx_http_upstream_next(r,?u,?NGX_HTTP_UPSTREAM_FT_ERROR);return;}// 檢查用于接收上游響應的buffer,當start為NULL時,代表該緩沖區(qū)尚未進行分配,此時會按照配置指定的buffer_size進行緩沖區(qū)的分配if?(u->buffer.start?==?NULL)?{u->buffer.start?=?ngx_palloc(r->pool,?u->conf->buffer_size);if?(u->buffer.start?==?NULL)?{ngx_http_upstream_finalize_request(r,?u,?NGX_HTTP_INTERNAL_SERVER_ERROR);return;}// 針對新申請的緩沖區(qū)進行初始化,省略.......}for?(?;;?)?{// 讀取響應的內(nèi)容存儲在buffer中,每次讀取的最大不超過buffer_size,即當前緩沖區(qū)的剩余空間大小n?=?c->recv(c,?u->buffer.last,?u->buffer.end?-?u->buffer.last);// NGX_AGAIN代表響應還沒有讀完,設置讀事件到epoll中,等待下一次讀取if?(n?==?NGX_AGAIN)?{if?(ngx_handle_read_event(c->read,?0)?!=?NGX_OK)?{ngx_http_upstream_finalize_request(r,?u,?NGX_HTTP_INTERNAL_SERVER_ERROR);return;}return;}// 讀取出錯或者連接已關閉,則調(diào)用next函數(shù)決定是終止連接,還是重新選擇上游服務器if?(n?==?NGX_ERROR?||?n?==?0)?{ngx_http_upstream_next(r,?u,?NGX_HTTP_UPSTREAM_FT_ERROR);return;}// n 大于0時,代表讀取到的數(shù)據(jù),此時last游標需要往后移動n個字節(jié),last初始化時與start相同,指向buffer起始地址u->buffer.last?+=?n;// 開始處理讀取到的響應頭部信息rc?=?u->process_header(r);// 檢查process_header的返回值,當返回NGX_AGAIN時,需要判斷一下是否當前的緩沖區(qū)已經(jīng)被用盡,如果被用盡說明一個buffer_size無法容納整個響應頭部if?(rc?==?NGX_AGAIN)?{// 當buffer無法容納整個響應頭部時,調(diào)用next決定是終止連接還是選擇下一個上游服務器if?(u->buffer.last?==?u->buffer.end)?{ngx_http_upstream_next(r,?u,?NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);......}// 當process_header處理的是完整的響應頭部時,會進一步判斷其返回值,檢測到無效的響應頭部時,進行next的進一步?jīng)Q策處理if?(rc?==?NGX_HTTP_UPSTREAM_INVALID_HEADER)?{ngx_http_upstream_next(r,?u,?NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);return;}// 當process_header返回ERROR時,直接終止當前的請求if?(rc?==?NGX_ERROR)?{ngx_http_upstream_finalize_request(r,?u,?NGX_HTTP_INTERNAL_SERVER_ERROR);return;}// 走到目前位置,當前的process_header至少是執(zhí)行成功了,完整的解析了響應的頭部信息/*?rc?==?NGX_OK?*/.......// 處理已經(jīng)解析出的頭部,該函數(shù)會把已經(jīng)解析出的頭部,設置到ngx_http_request_t結(jié)構體的headers_out成員中// 當調(diào)用ngx_http_send_header時,可以將設置到headers_out中的響應頭部發(fā)送給客戶端if?(ngx_http_upstream_process_headers(r,?u)?!=?NGX_OK)?{return;}// subrequest_in_memory字段為0時,表示需要轉(zhuǎn)發(fā)響應到客戶端;為1時,表示不需要轉(zhuǎn)發(fā)響應到客戶端if?(!r->subrequest_in_memory)?{// 發(fā)送響應給客戶端ngx_http_upstream_send_response(r,?u);return;}// 以下的邏輯是不需要轉(zhuǎn)發(fā)響應給客戶端,即subrequest_in_memory為1的情況/*?subrequest content?in?memory?*/// 檢查一下input_filter是否為NULL,input_filter用于處理響應的包體,當沒有定義自己的實現(xiàn)方法時,使用默認的處理方法if?(u->input_filter?==?NULL)?{u->input_filter_init?=?ngx_http_upstream_non_buffered_filter_init;?u->input_filter?=?ngx_http_upstream_non_buffered_filter; ?u->input_filter_ctx?=?r;?}// 調(diào)用init方法為即將進行的包體處理做一些初始化的工作,默認的init函數(shù)是空的,什么也沒做? ??if?(u->input_filter_init(u->input_filter_ctx)?==?NGX_ERROR)?{ngx_http_upstream_finalize_request(r,?u,?NGX_ERROR);return;}// pos與last之間的內(nèi)容是已經(jīng)讀取但尚未處理的數(shù)據(jù)n?=?u->buffer.last?-?u->buffer.pos;// 當process_header處理完后,如果還有尚未處理的數(shù)據(jù),那說明除了讀到了包頭之外,還讀到部分包體信息if?(n)?{u->buffer.last?=?u->buffer.pos;u->state->response_length?+=?n;// 調(diào)用input_filter處理已經(jīng)讀到的包體信息if?(u->input_filter(u->input_filter_ctx,?n)?==?NGX_ERROR)?{ngx_http_upstream_finalize_request(r,?u,?NGX_ERROR);return;}}......// 設置處理上游響應包體的回調(diào)函數(shù)u->read_event_handler?=?ngx_http_upstream_process_body_in_memory;// 開始處理包體的信息ngx_http_upstream_process_body_in_memory(r,?u);
}
下面繼續(xù)分析一下,不用upstream直接轉(zhuǎn)發(fā)響應時的具體處理流程,主要是上面subrequest_memory為1的場景,此時該請求屬于一個子請求。
我們看一下上面分析時提到的默認的input_filter的處理方法,在上面的分析中,如果讀取包頭時同時讀到了包體信息,會調(diào)用input_filter方法處理:
點擊(此處)折疊或打開
static ngx_int_t?ngx_http_upstream_non_buffered_filter(void?*data,?ssize_t bytes)
{// data指向了請求的ngx_http_request_t結(jié)構,前面函數(shù)中當沒有定義input_filter時,對input_filter_ctx進行了重新初始化,指向了ngx_http_request_tngx_http_request_t?*r?=?data;......u?=?r->upstream;// 遍歷out_bufs使ll指向最后一個緩沖區(qū)->next的地址for?(cl?=?u->out_bufs,?ll?=?&u->out_bufs;?cl;?cl?=?cl->next)?{ll?=?&cl->next;}// 申請新的緩沖區(qū)cl?=?ngx_chain_get_free_buf(r->pool,?&u->free_bufs);if?(cl?==?NULL)?{return NGX_ERROR;}// 將新申請的緩沖區(qū)掛在out_bufs的鏈表末尾*ll?=?cl;......// buffer為接收上游響應包體的緩沖區(qū)b?=?&u->buffer;// b->last在調(diào)用該函數(shù)時,已經(jīng)指向了接收到的包體的首地址,cl->buf->pos指向首地址后,將b->last和cl->buf->last設置為保存包體尾部cl->buf->pos?=?b->last;b->last?+=?bytes;cl->buf->last?=?b->last;cl->buf->tag?=?u->output.tag;// 如果沒有設置包體長度,則到此可以結(jié)束了if?(u->length?==?-1)?{return NGX_OK;}// 計算還需要接收的包體的長度u->length?-=?bytes;return NGX_OK;
}
繼續(xù)向下分析,process_header調(diào)用input_filter處理完包體后,最后調(diào)用的函數(shù)時ngx_http_upstream_process_body_in_memory,
該函數(shù)實際上會接收上游服務器的包體內(nèi)容,下面看一下具體實現(xiàn)。
點擊(此處)折疊或打開
static void?ngx_http_upstream_process_body_in_memory(ngx_http_request_t?*r,?ngx_http_upstream_t?*u)
{......// 獲取到上游的連接信息c?=?u->peer.connection;// 獲取該連接的讀事件,判斷是否發(fā)生了讀事件的超時,如果超時,則直接結(jié)束連接rev?=?c->read;if?(rev->timedout)?{ngx_connection_error(c,?NGX_ETIMEDOUT,?"upstream timed out");ngx_http_upstream_finalize_request(r,?u,?NGX_HTTP_GATEWAY_TIME_OUT);return;}// buffer為存儲上游響應包體的緩沖區(qū)b?=?&u->buffer;for?(?;;?)?{// 計算剩余空閑緩沖區(qū)的大小size?=?b->end?-?b->last;......// 如果還有空閑的空間,調(diào)用recv方法繼續(xù)讀取響應n?=?c->recv(c,?b->last,?size);// 此處NGX_AGAIN代表需要等待下一次的讀事件if?(n?==?NGX_AGAIN)?{break;}// 如果上游主動關閉連接,或者讀取出現(xiàn)錯誤,則直接關閉連接? ??if?(n?==?0?||?n?==?NGX_ERROR)?{ngx_http_upstream_finalize_request(r,?u,?n);return;}// 更新讀到的響應包體的長度u->state->response_length?+=?n;// 處理讀到的包體內(nèi)容if?(u->input_filter(u->input_filter_ctx,?n)?==?NGX_ERROR)?{ngx_http_upstream_finalize_request(r,?u,?NGX_ERROR);return;}if?(!rev->ready)?{break;}}// 如果包體長度沒有設置,則可以直接結(jié)束請求了if?(u->length?==?0)?{ngx_http_upstream_finalize_request(r,?u,?0);return;}// 將讀事件增加到Epoll中if?(ngx_handle_read_event(rev,?0)?!=?NGX_OK)?{ngx_http_upstream_finalize_request(r,?u,?NGX_ERROR);return;}// 將讀事件同時添加到定時器中,超時時間為配置的read_timeout,避免長時間等待if?(rev->active)?{ngx_add_timer(rev,?u->conf->read_timeout);}?else?if?(rev->timer_set)?{ngx_del_timer(rev);}
}
上面流程很容易看出一個問題,那就是讀取響應頭的Buffer的空間可能不足,導致處理出現(xiàn)問題。使用時關鍵還在于Input_filter方法中對buffer的管理。
分析完不轉(zhuǎn)發(fā)響應的過程后,繼續(xù)看一下轉(zhuǎn)發(fā)響應的兩種實現(xiàn)方式,下游網(wǎng)速優(yōu)先和上游網(wǎng)速優(yōu)先的實現(xiàn)。由于上游網(wǎng)速優(yōu)先的方式,實現(xiàn)較為復雜,下面先看一下下游網(wǎng)速優(yōu)先的方式,即采用固定的內(nèi)存大小,作為響應的緩沖區(qū)。代碼上也刪減不必要的邏輯。
下游網(wǎng)速優(yōu)先
點擊(此處)折疊或打開
static void
ngx_http_upstream_send_response(ngx_http_request_t?*r,?ngx_http_upstream_t?*u)
{......// 向下游的客戶端發(fā)送響應頭部,前面process_header處理時先將響應頭部設置到了headers_in中,然后upstream_process_headers將headers_in中的//?頭部設置到headers_out中,ngx_http_send_header就是將headers_out中的http包頭發(fā)送給客戶端rc?=?ngx_http_send_header(r);......// 設置頭部已發(fā)送的標志u->header_sent?=?1;......// 如果早期的請求攜帶了包體信息,且用到了臨時文件,則先清理臨時文件,因為已經(jīng)收到響應了,請求的臨時文件肯定用不到了if?(r->request_body && r->request_body->temp_file)?{ngx_pool_run_cleanup_file(r->pool,?r->request_body->temp_file->file.fd);r->request_body->temp_file->file.fd?=?NGX_INVALID_FILE;}......// buffering為1代表上游網(wǎng)速優(yōu)先,為0代表下游網(wǎng)速優(yōu)先if?(!u->buffering)?{// 看一下用戶有沒有設置input_filter,沒有的話使用默認的input_filter函數(shù)if?(u->input_filter?==?NULL)?{u->input_filter_init?=?ngx_http_upstream_non_buffered_filter_init;u->input_filter?=?ngx_http_upstream_non_buffered_filter;u->input_filter_ctx?=?r;}// 設置接收上游響應的回調(diào)函數(shù)u->read_event_handler?=?ngx_http_upstream_process_non_buffered_upstream;// 設置向下游客戶端發(fā)送報文的回調(diào)函數(shù)r->write_event_handler?=?ngx_http_upstream_process_non_buffered_downstream;r->limit_rate?=?0;// 為input_filter處理包體做初始化的準備函數(shù),默認實現(xiàn)是空的if?(u->input_filter_init(u->input_filter_ctx)?==?NGX_ERROR)?{ngx_http_upstream_finalize_request(r,?u,?NGX_ERROR);return;}......// 看一下解析完包頭后,是否還有未解析的包體信息,如果存在包體,則先處理一次包體,和前面分析不轉(zhuǎn)發(fā)響應的邏輯是一樣的n?=?u->buffer.last?-?u->buffer.pos;if?(n)?{// last指向代處理的包體的起始地址,更新response_length,調(diào)用input_filter處理當前的包體u->buffer.last?=?u->buffer.pos;u->state->response_length?+=?n;if?(u->input_filter(u->input_filter_ctx,?n)?==?NGX_ERROR)?{ngx_http_upstream_finalize_request(r,?u,?NGX_ERROR);return;}// 調(diào)用downstream將本次的包體轉(zhuǎn)發(fā)給客戶端ngx_http_upstream_process_non_buffered_downstream(r);}?else?{// 清空buff,實際上是將pos和last指針復位u->buffer.pos?=?u->buffer.start;u->buffer.last?=?u->buffer.start;// Todo....if?(ngx_http_send_special(r,?NGX_HTTP_FLUSH)?==?NGX_ERROR)?{ngx_http_upstream_finalize_request(r,?u,?NGX_ERROR);return;}// 如果連接上的讀事件已經(jīng)準備好或者響應頭部沒有指定包體長度時,直接調(diào)用downstream方法處理響應if?(u->peer.connection->read->ready?||?u->length?==?0)?{ngx_http_upstream_process_non_buffered_upstream(r,?u);}}// 將控制權轉(zhuǎn)交給Nginx框架return;}......
}
ngx_http_upstream_process_non_buffered_downstream函數(shù),用于處理上游服務器響應的讀事件
點擊(此處)折疊或打開
static void?ngx_http_upstream_process_non_buffered_downstream(ngx_http_request_t?*r)
{// 獲取與上游服務器的連接信息,和寫事件信息c?=?r->connection;u?=?r->upstream;wev?=?c->write;......// 如果出現(xiàn)寫事件超時,則設置超時標簽,同時終止連接if?(wev->timedout)?{c->timedout?=?1;ngx_connection_error(c,?NGX_ETIMEDOUT,?"client timed out");ngx_http_upstream_finalize_request(r,?u,?NGX_HTTP_REQUEST_TIME_OUT);return;}// non_buffered即固定內(nèi)存,用固定內(nèi)存處理轉(zhuǎn)發(fā)響應,其中第二個參數(shù)是個標簽,為1時代表向下游發(fā)送響應,為0時代表讀取上游的響應ngx_http_upstream_process_non_buffered_request(r,?1);
}
下面繼續(xù)分析一下ngx_http_upstream_process_non_buffered_request
點擊(此處)折疊或打開
static void
ngx_http_upstream_process_non_buffered_request(ngx_http_request_t?*r,?ngx_uint_t do_write)
{// 獲取上游和現(xiàn)有的連接信息,記錄為downstream和upstreamu?=?r->upstream;downstream?=?r->connection;upstream?=?u->peer.connection;b?=?&u->buffer;// 判斷是否向下游寫,do_write是調(diào)用方設置的,而u->length表示還需要接收的上游響應的長度,為0則代表不需要繼續(xù)接收do_write?=?do_write?||?u->length?==?0;for?(?;;?)?{// 判斷是否需要向客戶端寫數(shù)據(jù)if?(do_write)?{// out_bufs中記錄的是需要向下游寫的數(shù)據(jù),而busy_bufs用于記錄當out_bufs無法一次發(fā)完時指向out_bufs,從而將out_bufs置空if?(u->out_bufs?||?u->busy_bufs)?{// 向下游發(fā)送out_bufs指向的內(nèi)容,busy_bufs中記錄的是上一次的out_bufs的內(nèi)容,現(xiàn)在已經(jīng)合并當前的out_bufs中了rc?=?ngx_http_output_filter(r,?u->out_bufs);// 回收out_bufs上已經(jīng)發(fā)送的buf,將未發(fā)送完的buf設置到busy_buf上,清空out_bufsngx_chain_update_chains(r->pool,?&u->free_bufs,?&u->busy_bufs,?&u->out_bufs,?u->output.tag);}// 當busy_bufs為空時,說明當前沒有需要發(fā)送到客戶端的內(nèi)容了if?(u->busy_bufs?==?NULL)?{......// 將pos和last重新置位b->pos?=?b->start;b->last?=?b->start;}}// 計算一下buffer的可用空間size?=?b->end?-?b->last;// 繼續(xù)讀取響應if?(size?&& upstream->read->ready)?{n?=?upstream->recv(upstream,?b->last,?size);// NGX_AGAIN代表需要等待下一次讀事件if?(n?==?NGX_AGAIN)?{break;}// n > 0表示讀到的n字節(jié)的正常響應包體if?(n?>?0)?{// 更新response_lengthu->state->response_length?+=?n;// 調(diào)用input_filter處理包體if?(u->input_filter(u->input_filter_ctx,?n)?==?NGX_ERROR)?{ngx_http_upstream_finalize_request(r,?u,?NGX_ERROR);return;}}// 設置do_write標簽,標識已經(jīng)讀到新的響應包體,需要向客戶端轉(zhuǎn)發(fā)數(shù)據(jù)do_write?=?1;continue;}break;}......if?(downstream->data?==?r)?{// 將下游的寫事件添加到epoll中if?(ngx_handle_write_event(downstream->write,?clcf->send_lowat)?!=?NGX_OK)......}// 同時設置超時定時器,控制等待時間,超時時間為配置的send_timeoutif?(downstream->write->active &&?!downstream->write->ready)?{ngx_add_timer(downstream->write,?clcf->send_timeout);}?......// 將連接上游的讀事件也添加到epoll中去if?(ngx_handle_read_event(upstream->read,?0)?!=?NGX_OK)?{ngx_http_upstream_finalize_request(r,?u,?NGX_ERROR);return;}// 設置讀定時器,控制等待時間,超時時間為配置的read_timeoutif?(upstream->read->active &&?!upstream->read->ready)?{ngx_add_timer(upstream->read,?u->conf->read_timeout);}......
}
上游網(wǎng)速優(yōu)先
上游網(wǎng)速優(yōu)先的實現(xiàn)比較復雜,目前官方攜帶的ngx_http_proxy_module即采用了上游網(wǎng)速優(yōu)先的實現(xiàn)方式。當ngx_http_upstream_conf_t中的Buffering設置
為1時,則說明需要使用上游網(wǎng)速優(yōu)先的方式。此時需要用ngx_event_pipe_t結(jié)構,這個結(jié)構維護著上下游間轉(zhuǎn)發(fā)的響應包體,用于解決內(nèi)存復制的問題。
點擊(此處)折疊或打開
typedef?struct?ngx_event_pipe_s ngx_event_pipe_t;struct?ngx_event_pipe_s {ngx_connection_t?*upstream;????????????????????? ??// 與上游服務器間的連接ngx_connection_t?*downstream;????????????????? ? ??// 與下游客戶端間的連接????????????????????????ngx_chain_t?*free_raw_bufs;????????????????????????// 用于接收上游服務器響應的緩沖區(qū)鏈表,新收到的響應向鏈表頭部插入ngx_chain_t?*in;???????????????????????????????????// 接收到上游響應的緩沖區(qū),ngx_event_pipe_copy_input_filter將buffer中的數(shù)據(jù)設置到in中ngx_chain_t?**last_in;????????????????????????? ? ?// 指向剛剛接收到的緩沖區(qū)ngx_chain_t?*out;????????????????????????????????? //?將要發(fā)給客戶端的緩沖區(qū)鏈表,ngx_chain_t?*free;????????????????????????????? ? ?//?等待釋放的緩沖區(qū)ngx_chain_t?*busy;????????????????????????????? ???//?表示上次發(fā)送響應時未發(fā)完的緩沖區(qū)鏈表,下一次發(fā)送時會合并到out鏈表中/**?the input filter i.e.?that moves HTTP/1.1 chunks*?from the raw bufs to an incoming chain*/ngx_event_pipe_input_filter_pt input_filter;? ? ? ?//?處理接收到的來自上游服務器的緩沖區(qū),接收響應的處理方法void?*input_ctx;????????????????????????????????? ?//?input_filter函數(shù)的參數(shù),通常設置為ngx_http_request_tngx_event_pipe_output_filter_pt output_filter; ? ??//?向下游發(fā)送響應的方法,默認為ngx_http_output_filtervoid?*output_ctx;??????????????????????????????????// output_filter函數(shù)的參數(shù),通常設置為ngx_http_request_tunsigned read:1;????????????????????????????????? ?// 為1表示當前已經(jīng)讀到來自上游的響應unsigned cacheable:1;????????????????????????? ? ??//?為1時表示啟用文件緩存unsigned single_buf:1;????????????????????????? ? ?//?為1時表示接收上游的響應時一次只能接收一個ngx_buf_t緩沖區(qū)unsigned free_bufs:1; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?// 為1時表示當不再接收上游的響應包體時,盡可能快的釋放緩沖區(qū)unsigned upstream_done:1;??????????????????????????//?input_filter中用到的標識位,表示Nginx與上游間的交互已經(jīng)結(jié)束unsigned upstream_error:1;????????????????????? ? ?//?與上游連接出現(xiàn)錯誤時,將該標識為置為1,比如超時,解析錯誤等unsigned upstream_eof:1;???????????????????????????//?與上游的連接已經(jīng)關閉時,該標志位置為1unsigned upstream_blocked:1;????????????????????? ?//?表示暫時阻塞讀取上游響應的流程,先發(fā)送響應,再用釋放的緩沖區(qū)接收響應unsigned downstream_done:1;????????????????????????//?為1時表示與下游的交互已經(jīng)結(jié)束unsigned downstream_error:1; ? ? ? ? ? ? ? ? ? ? ??//?與下游連接出現(xiàn)錯誤時,設置為1unsigned cyclic_temp_file:1;????????????????????? ?//?為1時會試圖復用臨時文件中曾用過的空間ngx_int_t allocated; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??//?表示已經(jīng)分配的緩沖區(qū)的數(shù)目,其受bufs.num成員的限制ngx_bufs_t bufs;????????????????????????????????? ?//?記錄了接收上游響應的內(nèi)存緩沖區(qū)的大小,bufs.size記錄每個緩沖區(qū)大小,bufs.num記錄緩沖區(qū)個數(shù)ngx_buf_tag_t tag;????????????????????????????? ? ?//?用于設置、比較緩沖區(qū)鏈表中ngx_buf_t結(jié)構體的tag標志位ssize_t busy_size;off_t read_length;?????????????????????????????????//?已經(jīng)接收到上游響應包體長度off_t?length;????????????????????????????????? ? ??// 表示臨時文件的最大長度off_t max_temp_file_size;??????????????????????????//?表示臨時文件的最大長度ssize_t temp_file_write_size;????????????????????? //?表示一次寫入文件時的最大長度ngx_msec_t read_timeout;????????????????????????? ?//?讀取上游響應的超時時間ngx_msec_t send_timeout;????????????????????????? ?//?向下游發(fā)送響應的超時時間ssize_t send_lowat;????????????????????????????????//?向下游發(fā)送響應時,TCP連接中設置的參數(shù)ngx_pool_t?*pool;????????????????????????????? ? ??//?用于分配內(nèi)存緩沖區(qū)的連接池對象ngx_log_t?*log;????????????????????????????????????//?用于記錄日志的ngx_log_t對象ngx_chain_t?*preread_bufs;????????????????????? ???// 表示接收上游服務器響應頭部的階段,已經(jīng)讀到的響應包體size_t preread_size;???????????????????????????????// 表示接收上游服務器響應頭部的階段,已經(jīng)讀到的響應包體長度ngx_buf_t?*buf_to_file;????????????????????? ? ? ? //?size_t limit_rate;?????????????????????????????????// 發(fā)送速率的限制time_t start_sec;??????????????????????????????????// 連接的啟動時間ngx_temp_file_t?*temp_file;????????????????????? ??// 存放上游響應的臨時文件/*?STUB?*/?int?num;????????????????????????????????// 已經(jīng)使用的ngx_buf_t的數(shù)目
}
不管上游網(wǎng)速優(yōu)先還是下游網(wǎng)速優(yōu)先,響應的轉(zhuǎn)發(fā)都是通過ngx_http_upstream_send_response函數(shù)進行的。前面分析過下游網(wǎng)速優(yōu)先的部分流程,
下面再繼續(xù)分析一下剩下的部分
點擊(此處)折疊或打開
static void?ngx_http_upstream_send_response(ngx_http_request_t?*r,?ngx_http_upstream_t?*u)
{//?發(fā)送設置到r->headers_out中的響應頭部rc?=?ngx_http_send_header(r);......// 如果客戶端的請求攜帶了包體,且包體已經(jīng)保存到了臨時文件中,則清理臨時文件,前面分析過了if?(r->request_body && r->request_body->temp_file)?{ngx_pool_run_cleanup_file(r->pool,?r->request_body->temp_file->file.fd);r->request_body->temp_file->file.fd?=?NGX_INVALID_FILE;}......// buffering為1時走的是上游網(wǎng)速優(yōu)先的流程,為0時走的是下游網(wǎng)速優(yōu)先的流程if?(!u->buffering)?{......return ;}/*?TODO:?preallocate event_pipe bufs,?look?"Content-Length"?*/// pipe的內(nèi)存在upstream啟動時已經(jīng)分配了,這里直接使用,對pipe進行初始化p?=?u->pipe;// 設置向下游發(fā)送響應的方法p->output_filter?=?(ngx_event_pipe_output_filter_pt)?ngx_http_output_filter;// 將pipe的output_ctx指向ngx_http_request_t結(jié)構,后續(xù)傳入的參數(shù)都是pipe,通過pipe->output_ctx找到ngx_http_request_tp->output_ctx?=?r;// 設置轉(zhuǎn)發(fā)響應時啟用的每個緩沖區(qū)的tag標志位p->tag?=?u->output.tag;// bufs指定了內(nèi)存緩沖區(qū)的限制p->bufs?=?u->conf->bufs;// 設置busy緩沖區(qū)中待發(fā)送的響應長度觸發(fā)值p->busy_size?=?u->conf->busy_buffers_size;// upstream指向nginx與上游服務器的連接p->upstream?=?u->peer.connection;// downstream指向nginx與客戶端之間的連接p->downstream?=?c;// 初始化用于分配內(nèi)存緩沖區(qū)的內(nèi)存池p->pool?=?r->pool;// 初始化用于記錄日志的log成員p->log?=?c->log;// 初始化速率閥值p->limit_rate?=?u->conf->limit_rate;// 記錄當前的時間p->start_sec?=?ngx_time();// 記錄是否進行文件緩存p->cacheable?=?u->cacheable?||?u->store;// 申請臨時文件結(jié)構p->temp_file?=?ngx_pcalloc(r->pool,?sizeof(ngx_temp_file_t));if?(p->temp_file?==?NULL)?{ngx_http_upstream_finalize_request(r,?u,?NGX_ERROR);return;}// 初始化臨時文件的結(jié)構信息p->temp_file->file.fd?=?NGX_INVALID_FILE;p->temp_file->file.log?=?c->log;p->temp_file->path?=?u->conf->temp_path;p->temp_file->pool?=?r->pool;......// 設置臨時存放上游響應的單個緩存文件的最大長度p->max_temp_file_size?=?u->conf->max_temp_file_size;// 設置一次寫入臨時文件時寫入的最大長度p->temp_file_write_size?=?u->conf->temp_file_write_size;// 申請預讀緩沖區(qū)鏈表,該鏈表的緩沖區(qū)不會分配內(nèi)存來存放上游的響應內(nèi)容,而用ngx_buf_t指向?qū)嶋H存放包體的內(nèi)容p->preread_bufs?=?ngx_alloc_chain_link(r->pool);if?(p->preread_bufs?==?NULL)?{ngx_http_upstream_finalize_request(r,?u,?NGX_ERROR);return;}// 初始化預讀緩沖區(qū)的鏈表,(預讀是在讀取包頭時,同時讀到了包體的情況)p->preread_bufs->buf?=?&u->buffer;p->preread_bufs->next?=?NULL;u->buffer.recycled?=?1;p->preread_size?=?u->buffer.last?-?u->buffer.pos;.......// 設置讀取上游服務器響應的超時時間p->read_timeout?=?u->conf->read_timeout;// 設置發(fā)送到下游客戶端的超時時間p->send_timeout?=?clcf->send_timeout;// 設置向客戶端發(fā)送響應時TCP的send_lowat選項p->send_lowat?=?clcf->send_lowat;.......// 設置處理上游讀事件的回調(diào)u->read_event_handler?=?ngx_http_upstream_process_upstream;// 設置處理下游寫事件的回調(diào)r->write_event_handler?=?ngx_http_upstream_process_downstream;// 處理上游發(fā)來的響應包體ngx_http_upstream_process_upstream(r,?u);
}
不管是讀取上游的響應事件process_upstream,還是向客戶端寫數(shù)據(jù)的process_downstream,最終都是通過ngx_event_pipe實現(xiàn)緩存轉(zhuǎn)發(fā)響應的。
下面來看一下ngx_event_pipe的具體實現(xiàn):
點擊(此處)折疊或打開
ngx_int_t?ngx_event_pipe(ngx_event_pipe_t?*p,?ngx_int_t do_write)
{// do_write為1表示需要向下游客戶端發(fā)送響應,為0表示需要從上游客戶端接收響應for?(?;;?)?{if?(do_write)?{// do_write為1,向下游發(fā)送響應包體,并檢查其返回值rc?=?ngx_event_pipe_write_to_downstream(p);......// 返回NGX_OK時繼續(xù)讀取上游的響應事件,返回其他值需要終止ngx_event_pipe函數(shù)}......// 從上游讀取響應數(shù)據(jù)if?(ngx_event_pipe_read_upstream(p)?==?NGX_ABORT)?{return NGX_ABORT;}// 當沒有讀取到響應數(shù)據(jù),并且也不需要暫停讀取響應的讀取時,跳出當前循環(huán),即不對do_write進行設置if?(!p->read &&?!p->upstream_blocked)?{break;}// 當讀到的響應數(shù)據(jù),或者需要暫停讀取數(shù)據(jù),先給客戶端發(fā)送響應以釋放緩沖區(qū)時,設置do_write進行響應的發(fā)送do_write?=?1;}if?(p->upstream->fd?!=?(ngx_socket_t)?-1)?{// 將上游讀事件添加到epoll中if?(ngx_handle_read_event(rev,?flags)?!=?NGX_OK)?{return NGX_ABORT;}// 同時設置讀事件的超時定時器if?(!rev->delayed)?{if?(rev->active &&?!rev->ready)?{ngx_add_timer(rev,?p->read_timeout);......}// 將下游的寫事件添加到epoll中,并且設置寫事件的定時器if?(p->downstream->fd?!=?(ngx_socket_t)?-1?&& p->downstream->data?==?p->output_ctx){wev?=?p->downstream->write;if?(ngx_handle_write_event(wev,?p->send_lowat)?!=?NGX_OK)?{return NGX_ABORT;}if?(!wev->delayed)?{if?(wev->active &&?!wev->ready)?{ngx_add_timer(wev,?p->send_timeout);.......}return NGX_OK;
}
上面函數(shù)中提到的ngx_event_pipe_read_upstream用于接收上游的響應,下面來具體看一下:
點擊(此處)折疊或打開
static ngx_int_t ngx_event_pipe_read_upstream(ngx_event_pipe_t?*p)
{......for?(?;;?)?{// 檢查上游連接是否結(jié)束,如果已經(jīng)結(jié)束,不再接收新的響應,跳出循環(huán)if?(p->upstream_eof?||?p->upstream_error?||?p->upstream_done)?{break;}// 如果preread_bufs為NULL代表讀包頭時沒有讀到包體信息或者已經(jīng)處理完成,ready為0表示沒有上游響應可以接收,跳出循環(huán)if?(p->preread_bufs?==?NULL &&?!p->upstream->read->ready)?{break;}// preread_bufs存放著接收包頭時可能讀取到的包體信息,如果不為空,則先要優(yōu)先處理這部分包體信息if?(p->preread_bufs)?{chain?=?p->preread_bufs;// 用chain保存待處理的緩沖區(qū),重置preread_bufs,下次循環(huán)則不會再走到該邏輯p->preread_bufs?=?NULL;n?=?p->preread_size;// 有待處理的包體信息,將read設置為1,表示接收到的包體待處理if?(n)?{p->read?=?1;}}?else?{.......}?else?{limit?=?0;}// free_raw_bufs用于表示一次ngx_event_pipe_read_upstream方法調(diào)用過程中接收到的上游響應if?(p->free_raw_bufs)?{chain?=?p->free_raw_bufs;if?(p->single_buf)?{p->free_raw_bufs?=?p->free_raw_bufs->next;chain->next?=?NULL;}?else?{p->free_raw_bufs?=?NULL;}// 判斷當前已分配的緩沖區(qū)的數(shù)量是否超過了bufs.num,沒有超過時可以繼續(xù)分配}?else?if?(p->allocated?<?p->bufs.num)?{b?=?ngx_create_temp_buf(p->pool,?p->bufs.size);if?(b?==?NULL)?{return NGX_ABORT;}p->allocated++;chain?=?ngx_alloc_chain_link(p->pool);if?(chain?==?NULL)?{return NGX_ABORT;}chain->buf?=?b;chain->next?=?NULL;// 緩沖區(qū)已經(jīng)達到上限,如果寫事件的ready為1時表示可以向下游發(fā)送響應,而delay為0代表并不是由于限速的原因?qū)е聦懯录途w// 當ready為1,且delay為0時,可以向下游發(fā)送響應來釋放緩沖區(qū)了}?else?if?(!p->cacheable&& p->downstream->data?==?p->output_ctx&& p->downstream->write->ready&&?!p->downstream->write->delayed){p->upstream_blocked?=?1;break;// offset表示臨時文件中已經(jīng)寫入的響應內(nèi)容的長度,檢查是否達到了配置的上限,當達到上限時,暫時不再接收上游響應// 沒有達到上限時,調(diào)用下面write方法將響應寫入臨時文件中}?else?if?(p->cacheable||?p->temp_file->offset?<?p->max_temp_file_size){/**?if?it is allowed,?then save some bufs from p->in*?to a temporary file,?and?add?them to a p->out?chain*/// 該函數(shù)將in緩沖區(qū)鏈表中的內(nèi)容寫入temp_file臨時文件中,再將寫入臨時文件的ngx_buf_t緩沖區(qū)由in緩沖區(qū)鏈表中移出,添加到out緩沖區(qū)鏈表中rc?=?ngx_event_pipe_write_chain_to_temp_file(p);......// 調(diào)用recv_chain接收上游的響應n?=?p->upstream->recv_chain(p->upstream,?chain,?limit);// 將新接收到的緩沖區(qū)放置到free_raw_bufs鏈表的最后if?(p->free_raw_bufs)?{chain->next?=?p->free_raw_bufs;}......while?(cl?&& n?>?0)?{// 從接收到的緩沖區(qū)鏈表中取出一塊緩沖區(qū),將其shadow域釋放掉ngx_event_pipe_remove_shadow_links(cl->buf);// 檢查當前收到的包體長度是否小于緩沖區(qū)的大小,小于時當前緩沖區(qū)可以繼續(xù)接收響應包體,否則緩沖區(qū)已滿,需要調(diào)用input_filter函數(shù)處理size?=?cl->buf->end?-?cl->buf->last;if?(n?>=?size)?{// 當前緩沖區(qū)已滿,需要處理,下面的input_filter方法是ngx_event_pipe_copy_input_filter函數(shù),其主要在in鏈表中增加這個緩沖區(qū)cl->buf->last?=?cl->buf->end;if?(p->input_filter(p,?cl->buf)?==?NGX_ERROR)?{return NGX_ABORT;}// 更新待處理的包體的長度,釋放已經(jīng)處理的緩沖區(qū)n?-=?size;ln?=?cl;cl?=?cl->next;ngx_free_chain(p->pool,?ln);}?else?{// 緩沖區(qū)沒有滿,更新last位置,n可以設置為0了,因為last的位置已經(jīng)包含當前讀到的包體信息cl->buf->last?+=?n;n?=?0;}}// 走到這里時cl的鏈表中一定有緩沖區(qū)沒有用滿(最后一個?),此時cl不為NULL;或者cl的所有緩沖區(qū)都已經(jīng)被處理回收了,此時cl為NULLif?(cl)?{for?(ln?=?cl;?ln->next;?ln?=?ln->next)?{ /*?void?*/?}// 此時的p->free_raw_bufs已經(jīng)為NULL了,將p->free_raw_bufs指向當前待處理的緩沖區(qū)鏈表ln->next?=?p->free_raw_bufs;p->free_raw_bufs?=?cl;}......}......// upstream_eof為1時表示上游服務器關閉了連接,upstream_error表示處理過程中出現(xiàn)了錯誤,而free_raw_bufs不為空代表還有需要處理的包體信息if?((p->upstream_eof?||?p->upstream_error)?&& p->free_raw_bufs)?{// 調(diào)用input_filter處理剩余的包體信息if?(p->input_filter(p,?p->free_raw_bufs->buf)?==?NGX_ERROR)?{return NGX_ABORT;}p->free_raw_bufs?=?p->free_raw_bufs->next;// free_bufs為1時代表需要盡快釋放緩沖區(qū)中用到內(nèi)存,此時應該調(diào)用ngx_pfree盡快釋放shadow域為空的緩沖區(qū)if?(p->free_bufs && p->buf_to_file?==?NULL)?{for?(cl?=?p->free_raw_bufs;?cl;?cl?=?cl->next)?{if?(cl->buf->shadow?==?NULL)?{ngx_pfree(p->pool,?cl->buf->start);......}.......
}
看完接收響應的處理過程,再來看一下發(fā)送響應的處理流程,對應的函數(shù)是ngx_event_pipe_write_to_downstream
點擊(此處)折疊或打開
static ngx_int_t?ngx_event_pipe_write_to_downstream(ngx_event_pipe_t?*p)
{......for?(?;;?)?{if?(p->downstream_error)?{return ngx_event_pipe_drain_chains(p);}// 檢查與上游的連接是否結(jié)束if?(p->upstream_eof?||?p->upstream_error?||?p->upstream_done)?{// 發(fā)送out鏈表中的緩沖區(qū)給客戶端if?(p->out)?{for?(cl?=?p->out;?cl;?cl?=?cl->next)?{cl->buf->recycled?=?0;}rc?=?p->output_filter(p->output_ctx,?p->out);......}// 發(fā)送in鏈表中的緩沖區(qū)給客戶端if?(p->in)?{for?(cl?=?p->in;?cl;?cl?=?cl->next)?{cl->buf->recycled?=?0;}rc?=?p->output_filter(p->output_ctx,?p->in);......}// 標識需要向下游發(fā)送的響應已經(jīng)完成p->downstream_done?=?1;break;}......// 計算busy緩沖區(qū)中待發(fā)送的響應長度for?(cl?=?p->busy;?cl;?cl?=?cl->next)?{if?(cl->buf->recycled)?{......bsize?+=?cl->buf->end?-?cl->buf->start;prev?=?cl->buf->start;}}......// 檢查是否超過了busy_size的配置,當超過配置值時跳轉(zhuǎn)至flush處檢查和發(fā)送out緩沖區(qū)if?(bsize?>=?(size_t)?p->busy_size)?{flush?=?1;goto?flush;}......for?(?;;?)?{// 先檢查out鏈表是否為NULL,不為空則先發(fā)送out鏈表的緩沖區(qū)if?(p->out)?{cl?=?p->out;p->out?=?p->out->next;// 當out鏈表中的數(shù)據(jù)被處理完成后,開始處理in鏈表中的數(shù)據(jù)}?else?if?(!p->cacheable && p->in)?{cl?=?p->in;.....}?else?{break;}cl->next?=?NULL;if?(out)?{*ll?=?cl;}?else?{out?=?cl;}ll?=?&cl->next;}flush:......// 發(fā)送響應給客戶端rc?=?p->output_filter(p->output_ctx,?out);// 更新free、busy和out緩沖區(qū)ngx_chain_update_chains(p->pool,?&p->free,?&p->busy,?&out,?p->tag);......// 遍歷free鏈表中的緩沖區(qū),釋放緩沖區(qū)中shadow域for?(cl?=?p->free;?cl;?cl?=?cl->next)?{......if?(cl->buf->last_shadow)?{if?(ngx_event_pipe_add_free_buf(p,?cl->buf->shadow)?!=?NGX_OK)?{return NGX_ABORT;}cl->buf->last_shadow?=?0;}cl->buf->shadow?=?NULL;}}return NGX_OK;
}
終于快要結(jié)束了,upstream的流程還是比較復雜的,最后看一下結(jié)束upstream的請求
結(jié)束upstream的請求
upstream請求的結(jié)束的流程,有三個函數(shù)可以進來,ngx_http_upstream_finalize_request、ngx_http_upstream_cleanup、ngx_http_upstream_next。
其中cleanup和next真正終止upstream時還是會調(diào)用到finalize_request函數(shù)。ngx_http_upstream_cleanup函數(shù)在啟動upstream時,會掛在到請求的cleanup
鏈表中,當HTTP框架結(jié)束http請求時一定會調(diào)用到upstream_cleanup函數(shù)。
點擊(此處)折疊或打開
static void?ngx_http_upstream_cleanup(void?*data)
{ngx_http_request_t?*r?=?data;ngx_http_upstream_finalize_request(r,?r->upstream,?NGX_DONE);
}
可以看到upstream_cleanup的實現(xiàn),其實是直接調(diào)用了ngx_http_upstream_finalize_request,這個流程是我們期待的關閉方式。
而ngx_http_upstream_next函數(shù),是在處理請求的的流程中出現(xiàn)錯誤才會主動調(diào)用到,該函數(shù)通過重連服務器、選取新的服務器等策略來提高服務的可用性。目前
nginx的負載均衡的功能就是通過next函數(shù)來實現(xiàn)的,我們后面會進行詳細分析,這里只簡單說明一下。
點擊(此處)折疊或打開
static void?ngx_http_upstream_next(ngx_http_request_t?*r,?ngx_http_upstream_t?*u,?ngx_uint_t ft_type)
{......if?(status)?{u->state->status?=?status;timeout?=?u->conf->next_upstream_timeout;// 當tries為0時,才最終結(jié)束upstream的請求if?(u->peer.tries?==?0||?!(u->conf->next_upstream & ft_type)||?(timeout && ngx_current_msec?-?u->peer.start_time?>=?timeout)){ngx_http_upstream_finalize_request(r,?u,?status);return;}}// 由于要發(fā)起新的連接,所以需要先關閉和上游服務器的已有連接if?(u->peer.connection)?{? ? ?if?(u->peer.connection->pool)?{ngx_destroy_pool(u->peer.connection->pool);}ngx_close_connection(u->peer.connection);u->peer.connection?=?NULL;}// 重新發(fā)起連接ngx_http_upstream_connect(r,?u);
}
最后看一下ngx_http_upstream_finalize_request的具體實現(xiàn)
點擊(此處)折疊或打開
static void?ngx_http_upstream_finalize_request(ngx_http_request_t?*r,?ngx_http_upstream_t?*u,?ngx_int_t rc)
{// 將cleanup指向的清理資源回調(diào)方法設置為NULLif?(u->cleanup)?{*u->cleanup?=?NULL;u->cleanup?=?NULL;}// 釋放解析主機域名時分配的資源if?(u->resolved && u->resolved->ctx)?{ngx_resolve_name_done(u->resolved->ctx);u->resolved->ctx?=?NULL;}......// 調(diào)用http模塊實現(xiàn)的finalize_request方法u->finalize_request(r,?rc);// 釋放與上游的連接if?(u->peer.connection)?{if?(u->peer.connection->pool)?{ngx_destroy_pool(u->peer.connection->pool);}ngx_close_connection(u->peer.connection);}u->peer.connection?=?NULL;// 刪除用于緩存響應的臨時文件if?(u->store && u->pipe && u->pipe->temp_file&& u->pipe->temp_file->file.fd?!=?NGX_INVALID_FILE){if?(ngx_delete_file(u->pipe->temp_file->file.name.data)==?NGX_FILE_ERROR)......}......// 最后還是調(diào)用HTTP框架提供的方法結(jié)束請求ngx_http_finalize_request(r,?rc);
}
至此,大概的梳理了一下upstream的處理流程,后面會針對目前已經(jīng)實現(xiàn)的負載均衡各類算法,以及Nginx cache功能進行分析。。
總結(jié)
以上是生活随笔為你收集整理的Nginx upstream (一) 整体流程分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。