WebSocket通信原理和在Tomcat中实现源码详解(万字爆肝)
首發(fā)CSDN:徐同學(xué)呀,原創(chuàng)不易,轉(zhuǎn)載請(qǐng)注明源鏈接。我是徐同學(xué),用心輸出高質(zhì)量文章,希望對(duì)你有所幫助。 本篇基于Tomcat10.0.6。建議收藏起來(lái)慢慢看。
文章目錄
- 一、前言
- 二、什么是WebSocket
- 1、HTTP/1.1的缺陷
- 2、WebSocket發(fā)展歷史
- (1)背景
- (2)歷史
- 3、WebSocket握手和雙向通信
- (1)定義
- (2)握手(建立連接)
- (3)消息幀
- (4)揮手(關(guān)閉連接)
- 4、WebSocket優(yōu)點(diǎn)
- 三、Java API for WebSocket(JSR356)
- 1、服務(wù)端API
- (1)注解方式@ServerEndpoint
- (2)繼承抽象類(lèi)Endpoint
- 2、客戶(hù)端API
- 3、上下文Session
- 4、HandshakeRequest 和 HandshakeResponse
- (1)HandshakeRequest
- (2)HandshakeResponse
- 5、WebSocketContainer
- 四、WebSocket基于Tomcat應(yīng)用
- 1、服務(wù)器端實(shí)現(xiàn)
- (1)@ServerEndpoint注解方式
- (2)繼承抽象類(lèi)Endpoint方式
- (3)早期Tomcat7中Server端實(shí)現(xiàn)對(duì)比
- 2、客戶(hù)端實(shí)現(xiàn)
- (1)前端js版
- (2)@ClientEndpoint注解方式
- (3)繼承抽象類(lèi)Endpoint方式
- 3、基于Nginx反向代理注意事項(xiàng)
- 五、WebSocket在Tomcat中的源碼實(shí)現(xiàn)
- 1、WsSci初始化
- (1)WsSci#onStartup
- (2)WsServerContainer#addEndpoint
- (3)PojoMethodMapping方法映射和形參解析
- 2、協(xié)議升級(jí)(握手)
- (1)WsFilter
- (2)UpgradeUtil#doUpgrade
- (3)Request#upgrade
- (4)回調(diào)機(jī)制ActionHook#action
- (5)ConnectionHandler#process
- (6)WsHttpUpgradeHandler#init握手成功
- 3、數(shù)據(jù)傳輸和解析
- (1)接收客戶(hù)端消息
- (2)發(fā)送消息給客戶(hù)端
- 六、要點(diǎn)回顧
- 七、參考文獻(xiàn)
一、前言
WebSocket是一種全雙工通信協(xié)議,即客戶(hù)端可以向服務(wù)端發(fā)送請(qǐng)求,服務(wù)端也可以主動(dòng)向客戶(hù)端推送數(shù)據(jù)。這樣的特點(diǎn),使得它在一些實(shí)時(shí)性要求比較高的場(chǎng)景效果斐然(比如微信朋友圈實(shí)時(shí)通知、在線協(xié)同編輯等)。主流瀏覽器以及一些常見(jiàn)服務(wù)端通信框架(Tomcat、netty、undertow、webLogic等)都對(duì)WebSocket進(jìn)行了技術(shù)支持。那么,WebSocket具體是什么?為什么會(huì)出現(xiàn)WebSocket?如何做到全雙工通信?解決了什么問(wèn)題?
二、什么是WebSocket
1、HTTP/1.1的缺陷
HTTP/1.1最初是為網(wǎng)絡(luò)中超文本資源(HTML),請(qǐng)求-響應(yīng)傳輸而設(shè)計(jì)的,后來(lái)支持了傳輸更多類(lèi)型的資源,如圖片、視頻等,但都沒(méi)有改變它單向的請(qǐng)求-響應(yīng)模式。
隨著互聯(lián)網(wǎng)的日益壯大,HTTP/1.1功能使用上已體現(xiàn)捉襟見(jiàn)肘的疲態(tài)。雖然可以通過(guò)某些方式滿(mǎn)足需求(如Ajax、Comet),但是性能上還是局限于HTTP/1.1,那么HTTP/1.1有哪些缺陷呢:
- 請(qǐng)求-響應(yīng)模式,只能客戶(hù)端發(fā)送請(qǐng)求給服務(wù)端,服務(wù)端才可以發(fā)送響應(yīng)數(shù)據(jù)給客戶(hù)端。
- 傳輸數(shù)據(jù)為文本格式,且請(qǐng)求/響應(yīng)頭部冗長(zhǎng)重復(fù)。
(為了區(qū)分HTTP/1.1和HTTP/1.2,下面描述中,HTTP均代表HTTP/1.1)
2、WebSocket發(fā)展歷史
(1)背景
在WebSocket出現(xiàn)之前,主要通過(guò)長(zhǎng)輪詢(xún)和HTTP長(zhǎng)連接實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)更新,這種方式有個(gè)統(tǒng)稱(chēng)叫Comet,Tomcat8.5之前有對(duì)Comet基于流的HTTP長(zhǎng)連接做支持,后來(lái)因?yàn)閃ebSocket的成熟和標(biāo)準(zhǔn)化,以及Comet自身依然是基于HTTP,在性能消耗和瓶頸上無(wú)法跳脫HTTP,就把Comet廢棄了。
還有一個(gè)SPDY技術(shù),也對(duì)HTTP進(jìn)行了改進(jìn),多路復(fù)用流、服務(wù)器推送等,后來(lái)演化成HTTP/2.0,因?yàn)檫m用場(chǎng)景和解決的問(wèn)題不同,暫不對(duì)HTTP/2.0做過(guò)多解釋,不過(guò)對(duì)于HTTP/2.0和WebSocket在Tomcat實(shí)現(xiàn)中都是作為協(xié)議升級(jí)來(lái)處理的。
(Comet和SPDY的原理不是本篇重點(diǎn),沒(méi)有展開(kāi)講解,感興趣的同學(xué)可自行百度)
(2)歷史
在這種背景下,HTML5制定了WebSocket
- 籌備階段,WebSocket被劃分為HTML5標(biāo)準(zhǔn)的一部分,2008年6月,Michael Carter進(jìn)行了一系列討論,最終形成了稱(chēng)為WebSocket的協(xié)議。
- 2009年12月,Google Chrome 4是第一個(gè)提供標(biāo)準(zhǔn)支持的瀏覽器,默認(rèn)情況下啟用了WebSocket。
- 2010年2月,WebSocket協(xié)議的開(kāi)發(fā)從W3C和WHATWG小組轉(zhuǎn)移到IETF(TheInternet Engineering Task Force),并在Ian Hickson的指導(dǎo)下進(jìn)行了兩次修訂。
- 2011年,IETF將WebSocket協(xié)議標(biāo)準(zhǔn)化為RFC 6455起,大多數(shù)Web瀏覽器都在實(shí)現(xiàn)支持WebSocket協(xié)議的客戶(hù)端API。此外,已經(jīng)開(kāi)發(fā)了許多實(shí)現(xiàn)WebSocket協(xié)議的Java庫(kù)。
- 2013年,發(fā)布JSR356標(biāo)準(zhǔn),Java API for WebSocket。
(為什么要去了解WebSocket的發(fā)展歷史和背景呢?個(gè)人認(rèn)為可以更好的理解某個(gè)技術(shù)實(shí)現(xiàn)的演變歷程,比如Tomcat,早期有Comet沒(méi)有WebSocket時(shí),Tomcat就對(duì)Comet做了支持,后來(lái)有WebSocket了,但是還沒(méi)出JSR356標(biāo)準(zhǔn),Tomcat就對(duì)Websocket做了支持,自定義API,再后來(lái)有了JSR356,Tomcat立馬緊跟潮流,廢棄自定義的API,實(shí)現(xiàn)JSR356那一套,這就使得在Tomcat7使用WebSocket的同學(xué),想升為T(mén)omcat8(其實(shí)Tomcat7.0.47之后就是JSR356標(biāo)準(zhǔn)了),發(fā)現(xiàn)WebSocket接入方式變了,而且一些細(xì)節(jié)也變了。)
3、WebSocket握手和雙向通信
(1)定義
WebSocket全雙工通信協(xié)議,在客戶(hù)端和服務(wù)端建立連接后,可以持續(xù)雙向通信,和HTTP同屬于應(yīng)用層協(xié)議,并且都依賴(lài)于傳輸層的TCP/IP協(xié)議。
雖然WebSocket有別于HTTP,是一種新協(xié)議,但是RFC 6455中規(guī)定:
it is designed to work over HTTP ports 80 and 443 as well as to support HTTP proxies and intermediaries.
- WebSocket通過(guò)HTTP端口80和443進(jìn)行工作,并支持HTTP代理和中介,從而使其與HTTP協(xié)議兼容。
- 為了實(shí)現(xiàn)兼容性,WebSocket握手使用HTTP Upgrade頭從HTTP協(xié)議更改為WebSocket協(xié)議。
- Websocket使用ws或wss的統(tǒng)一資源標(biāo)志符(URI),分別對(duì)應(yīng)明文和加密連接。
(2)握手(建立連接)
在雙向通信之前,必須通過(guò)握手建立連接。Websocket通過(guò) HTTP/1.1 協(xié)議的101狀態(tài)碼進(jìn)行握手,首先客戶(hù)端(如瀏覽器)發(fā)出帶有特殊消息頭(Upgrade、Connection)的請(qǐng)求到服務(wù)器,服務(wù)器判斷是否支持升級(jí),支持則返回響應(yīng)狀態(tài)碼101,表示協(xié)議升級(jí)成功,對(duì)于WebSocket就是握手成功。
客戶(hù)端請(qǐng)求示例:
GET /test HTTP/1.1 Host: server.example.com Upgrade: websocket Connection: Upgrade Sec-WebSocket-Key: tFGdnEL/5fXMS9yKwBjllg== Origin: http://example.com Sec-WebSocket-Protocol: v10.stomp, v11.stomp, v12.stomp Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits Sec-WebSocket-Version: 13- Connection必須設(shè)置Upgrade,表示客戶(hù)端希望連接升級(jí)。
- Upgrade: websocket表明協(xié)議升級(jí)為websocket。
- Sec-WebSocket-Key字段內(nèi)記錄著握手過(guò)程中必不可少的鍵值,由客戶(hù)端(瀏覽器)生成,可以盡量避免普通HTTP請(qǐng)求被誤認(rèn)為Websocket協(xié)議。
- Sec-WebSocket-Version 表示支持的Websocket版本。RFC6455要求使用的版本是13。
- Origin字段是必須的。如果缺少origin字段,WebSocket服務(wù)器需要回復(fù)HTTP 403 狀態(tài)碼(禁止訪問(wèn)),通過(guò)Origin可以做安全校驗(yàn)。
服務(wù)端響應(yīng)示例:
HTTP/1.1 101 Switching Protocols Upgrade: websocket Connection: Upgrade Sec-WebSocket-Accept: HaA6EjhHRejpHyuO0yBnY4J4n3A= Sec-WebSocket-Extensions: permessage-deflate;client_max_window_bits=15 Sec-WebSocket-Protocol: v12.stompSec-WebSocket-Accept的字段值是由握手請(qǐng)求中的Sec-WebSocket-Key的字段值生成的。成功握手確立WebSocket連接之后,通信時(shí)不再使用HTTP的數(shù)據(jù)幀,而采用WebSocket獨(dú)立的數(shù)據(jù)幀。
(3)消息幀
WebSocket使用二進(jìn)制消息幀作為雙向通信的媒介。何為消息幀?發(fā)送方將每個(gè)應(yīng)用程序消息拆分為一個(gè)或多個(gè)幀,通過(guò)網(wǎng)絡(luò)將它們傳輸?shù)侥康牡?#xff0c;并重新組裝解析出一個(gè)完整消息。
有別于HTTP/1.1文本消息格式(冗長(zhǎng)的消息頭和分隔符等),WebSocket消息幀規(guī)定一定的格式,以二進(jìn)制傳輸,更加短小精悍。二者相同之處就是都是基于TCP/IP流式協(xié)議(沒(méi)有規(guī)定消息邊界)。
如下是消息幀的基本結(jié)構(gòu)圖:
- FIN: 1 bit,表示該幀是否為消息的最后一幀。1-是,0-否。
- RSV1,RSV2,RSV3: 1 bit each,預(yù)留(3位),擴(kuò)展的預(yù)留標(biāo)志。一般情況為0,除非協(xié)商的擴(kuò)展定義為非零值。如果接收到非零值且不為協(xié)商擴(kuò)展定義,接收端必須使連接失敗。
- Opcode: 4 bits,定義消息幀的操作類(lèi)型,如果接收到一個(gè)未知Opcode,接收端必須使連接失敗。(0x0-延續(xù)幀,0x1-文本幀,0x2-二進(jìn)制幀,0x8-關(guān)閉幀,0x9-PING幀,0xA-PONG幀(在接收到PING幀時(shí),終端必須發(fā)送一個(gè)PONG幀響應(yīng),除非它已經(jīng)接收到關(guān)閉幀),0x3-0x7保留給未來(lái)的非控制幀,0xB-F保留給未來(lái)的控制幀)
- Mask: 1 bit,表示該幀是否為隱藏的,即被加密保護(hù)的。1-是,0-否。Mask=1時(shí),必須傳一個(gè)Masking-key,用于解除隱藏(客戶(hù)端發(fā)送消息給服務(wù)器端,Mask必須為1)。
- Payload length: 7 bits, 7+16 bits, or 7+64 bits,有效載荷數(shù)據(jù)的長(zhǎng)度(擴(kuò)展數(shù)據(jù)長(zhǎng)度+應(yīng)用數(shù)據(jù)長(zhǎng)度,擴(kuò)展數(shù)據(jù)長(zhǎng)度可以為0)。
if 0-125, that is the payload length. If 126, the following 2 bytes interpreted as a 16-bit unsigned integer are the payload length. If 127, the following 8 bytes interpreted as a 64-bit unsigned integer (the most significant bit MUST be 0) are the payload length.
- Masking-key: 0 or 4 bytes,用于解除幀隱藏(加密)的key,Mask=1時(shí)不為空,Mask=0時(shí)不用傳。
- Payload data: (x+y) bytes,有效載荷數(shù)據(jù)包括擴(kuò)展數(shù)據(jù)(x bytes)和應(yīng)用數(shù)據(jù)(y bytes)。有效載荷數(shù)據(jù)是用戶(hù)真正要傳輸?shù)臄?shù)據(jù)。
這樣的二進(jìn)制消息幀設(shè)計(jì),與HTTP協(xié)議相比,WebSocket協(xié)議可以提供約500:1的流量減少和3:1的延遲減少。
(4)揮手(關(guān)閉連接)
揮手相對(duì)于握手要簡(jiǎn)單很多,客戶(hù)端和服務(wù)器端任何一方都可以通過(guò)發(fā)送關(guān)閉幀來(lái)發(fā)起揮手請(qǐng)求。發(fā)送關(guān)閉幀的一方,之后不再發(fā)送任何數(shù)據(jù)給對(duì)方;接收到關(guān)閉幀的一方,如果之前沒(méi)有發(fā)送過(guò)關(guān)閉幀,則必須發(fā)送一個(gè)關(guān)閉幀作為響應(yīng)。關(guān)閉幀中可以攜帶關(guān)閉原因。
在發(fā)送和接收一個(gè)關(guān)閉幀消息之后,就認(rèn)為WebSocket連接已關(guān)閉,且必須關(guān)閉底層TCP連接。
除了通過(guò)關(guān)閉握手來(lái)關(guān)閉連接外,WebSocket連接也可能在另一方離開(kāi)或底層TCP連接關(guān)閉時(shí)突然關(guān)閉。
4、WebSocket優(yōu)點(diǎn)
-
較少的控制開(kāi)銷(xiāo)。在連接建立后,服務(wù)器和客戶(hù)端之間交換數(shù)據(jù)時(shí),用于協(xié)議控制的數(shù)據(jù)包頭部相對(duì)于HTTP請(qǐng)求每次都要攜帶完整的頭部,顯著減少。
-
更強(qiáng)的實(shí)時(shí)性。由于協(xié)議是全雙工的,所以服務(wù)器可以隨時(shí)主動(dòng)給客戶(hù)端下發(fā)數(shù)據(jù)。相對(duì)于HTTP請(qǐng)求需要等待客戶(hù)端發(fā)起請(qǐng)求服務(wù)端才能響應(yīng),延遲明顯更少。
-
保持連接狀態(tài)。與HTTP不同的是,Websocket需要先建立連接,這就使得其成為一種有狀態(tài)的協(xié)議,之后通信時(shí)可以省略部分狀態(tài)信息。而HTTP請(qǐng)求可能需要在每個(gè)請(qǐng)求都攜帶狀態(tài)信息(如身份認(rèn)證等)。
-
更好的二進(jìn)制支持。Websocket定義了二進(jìn)制幀,相對(duì)HTTP,可以更輕松地處理二進(jìn)制內(nèi)容。
-
支持?jǐn)U展。Websocket定義了擴(kuò)展,用戶(hù)可以擴(kuò)展協(xié)議、實(shí)現(xiàn)部分自定義的子協(xié)議。
-
更好的壓縮效果。相對(duì)于HTTP壓縮,Websocket在適當(dāng)?shù)臄U(kuò)展支持下,可以沿用之前內(nèi)容的上下文,在傳遞類(lèi)似的數(shù)據(jù)時(shí),可以顯著提高壓縮率。
三、Java API for WebSocket(JSR356)
JSR356在Java EE7時(shí)歸為Java EE標(biāo)準(zhǔn)的一部分(后來(lái)Java EE更名為Jakarta EE,世上再無(wú)Java EE,以下統(tǒng)一稱(chēng)Jakarta EE),所有兼容Jakarta EE的應(yīng)用服務(wù)器,都必須遵循JSR356標(biāo)準(zhǔn)的WebSocket協(xié)議API。
根據(jù)JSR356規(guī)定, 建立WebSocket連接的服務(wù)器端和客戶(hù)端,兩端對(duì)稱(chēng),可以互相通信,差異性較小,抽象成API,就是一個(gè)個(gè)Endpoint(端點(diǎn)),只不過(guò)服務(wù)器端的叫ServerEndpoint,客戶(hù)端的叫ClientEndpoint。客戶(hù)端向服務(wù)端發(fā)送WebSocket握手請(qǐng)求,建立連接后就創(chuàng)建一個(gè)ServerEndpoint對(duì)象。(這里的Endpoint和Tomcat連接器里的AbstractEndpoint名稱(chēng)上有點(diǎn)像,但是兩個(gè)毫不相干的東西,就像周杰倫和周杰的關(guān)系。)
ServerEndpoint和ClientEndpoint在API上差異也很小,有相同的生命周期事件(OnOpen、OnClose、OnError、OnMessage),不同之處是ServerEndpoint作為服務(wù)器端點(diǎn),可以指定一個(gè)URI路徑供客戶(hù)端連接,ClientEndpoint沒(méi)有。
1、服務(wù)端API
服務(wù)器端的Endpoint有兩種實(shí)現(xiàn)方式,一種是注解方式@ServerEndpoint,一種是繼承抽象類(lèi)Endpoint。
(1)注解方式@ServerEndpoint
首先看看@ServerEndpoint有哪些要素:
- value,可以指定一個(gè)URI路徑標(biāo)識(shí)一個(gè)Endpoint。
- subprotocols,用戶(hù)在WebSocket協(xié)議下自定義擴(kuò)展一些子協(xié)議。
- decoders,用戶(hù)可以自定義一些消息解碼器,比如通信的消息是一個(gè)對(duì)象,接收到消息可以自動(dòng)解碼封裝成消息對(duì)象。
- encoders,有解碼器就有編碼器,定義解碼器和編碼器的好處是可以規(guī)范使用層消息的傳輸。
- configurator,ServerEndpoint配置類(lèi),主要提供ServerEndpoint對(duì)象的創(chuàng)建方式擴(kuò)展(如果使用Tomcat的WebSocket實(shí)現(xiàn),默認(rèn)是反射創(chuàng)建ServerEndpoint對(duì)象)。
@ServerEndpoint可以注解到任何類(lèi)上,但是想實(shí)現(xiàn)服務(wù)端的完整功能,還需要配合幾個(gè)生命周期的注解使用,這些生命周期注解只能注解在方法上:
- @OnOpen 建立連接時(shí)觸發(fā)。
- @OnClose 關(guān)閉連接時(shí)觸發(fā)。
- @OnError 發(fā)生異常時(shí)觸發(fā)。
- @OnMessage 接收到消息時(shí)觸發(fā)。
(2)繼承抽象類(lèi)Endpoint
繼承抽象類(lèi)Endpoint,重寫(xiě)幾個(gè)生命周期方法。
怎么沒(méi)有onMessage方法,實(shí)現(xiàn)onMessage還需要繼承實(shí)現(xiàn)一個(gè)接口jakarta.websocket.MessageHandler,MessageHandler接口又分為Partial和Whole,實(shí)現(xiàn)的MessageHandler需要在onOpen觸發(fā)時(shí)注冊(cè)到j(luò)akarta.websocket.Session中。
繼承抽象類(lèi)Endpoint的方式相對(duì)于注解方式要麻煩的多,除了繼承Endpoint和實(shí)現(xiàn)接口MessageHandler外,還必須實(shí)現(xiàn)一個(gè)jakarta.websocket.server.ServerApplicationConfig來(lái)管理Endpoint,比如給Endpoint分配URI路徑。
而encoders、decoders、configurator等配置信息由jakarta.websocket.server.ServerEndpointConfig管理,默認(rèn)實(shí)現(xiàn)jakarta.websocket.server.DefaultServerEndpointConfig。
所以如果使用 Java 版WebSocket服務(wù)器端實(shí)現(xiàn)首推注解方式。
2、客戶(hù)端API
對(duì)于客戶(hù)端API,也是有注解方式和繼承抽象類(lèi)Endpoint方式。
- 注解方式,只需要將@ServerEndpoint換成@ClientEndpoint。
- 繼承抽象類(lèi)Endpoint方式,需要一個(gè)jakarta.websocket.ClientEndpointConfig來(lái)管理encoders、decoders、configurator等配置信息,默認(rèn)實(shí)現(xiàn)jakarta.websocket.DefaultClientEndpointConfig。
3、上下文Session
WebSocket是一個(gè)有狀態(tài)的連接,建立連接后的通信都是通過(guò)jakarta.websocket.Session保持狀態(tài),一個(gè)連接一個(gè)Session,每一個(gè)Session有一個(gè)唯一標(biāo)識(shí)Id。
Session的主要職責(zé)涉及:
- 基礎(chǔ)信息管理(request信息(getRequestURI、getRequestParameterMap、getPathParameters等)、協(xié)議版本getProtocolVersion、子協(xié)議getNegotiatedSubprotocol等)。
- 連接管理(狀態(tài)判斷isOpen、接收消息的MessageHandler、發(fā)送消息的異步遠(yuǎn)程端點(diǎn)RemoteEndpoint.Async和同步遠(yuǎn)程端點(diǎn)RemoteEndpoint.Basic等)。
4、HandshakeRequest 和 HandshakeResponse
HandshakeRequest 和 HandshakeResponse了解即可,這兩個(gè)接口主要用于WebScoket握手升級(jí)過(guò)程中握手請(qǐng)求響應(yīng)的封裝,如果只是單純使用WebSocket,不會(huì)接觸到這兩個(gè)接口。
(1)HandshakeRequest
(2)HandshakeResponse
Sec-WebSocket-Accept根據(jù)客戶(hù)端傳的Sec-WebSocket-Key生成,如下是Tomcat10.0.6 WebSocket源碼實(shí)現(xiàn)中生成Sec-WebSocket-Accept的算法:
private static String getWebSocketAccept(String key) {byte[] digest = ConcurrentMessageDigest.digestSHA1(key.getBytes(StandardCharsets.ISO_8859_1), WS_ACCEPT);return Base64.encodeBase64String(digest); }5、WebSocketContainer
jakarta.websocket.WebSocketContainer顧名思義,就是WebSocket的容器,集大成者。其主要職責(zé)包括但不限于connectToServer,客戶(hù)端連接服務(wù)器端,基于瀏覽器的WebSocket客戶(hù)端連接服務(wù)器端,由瀏覽器支持,但是基于Java版的WebSocket客戶(hù)端就可以通過(guò)WebSocketContainer#connectToServer向服務(wù)端發(fā)起連接請(qǐng)求。
四、WebSocket基于Tomcat應(yīng)用
(如下使用的是javax.websocket包,未使用最新的jakarta.websocket,主要是測(cè)試項(xiàng)目基于SpringBoot+Tomcat9.x的,Java API for WebSocket版本需要保持一致。)
1、服務(wù)器端實(shí)現(xiàn)
(1)@ServerEndpoint注解方式
import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong;@ServerEndpoint(value = "/ws/test/{userId}", encoders = {MessageEncoder.class}, decoders = {MessageDecoder.class}, configurator = MyServerConfigurator.class) public class WebSocketServerEndpoint {private Session session;private String userId;@OnOpenpublic void OnOpen(Session session, @PathParam(value = "userId") String userId) {this.session = session;this.userId = userId;// 建立連接后,將連接存到一個(gè)map里endpointMap.put(userId, this);Message message = new Message(0, "connected, hello " + userId);sendMsg(message);}@OnClosepublic void OnClose() {// 關(guān)閉連接時(shí)觸發(fā),從map中刪除連接endpointMap.remove(userId);System.out.println("server closed...");}@OnMessagepublic void onMessage(Message message) {System.out.println("server recive message=" + message.toString());}@OnErrorpublic void onError(Throwable t) throws Throwable {this.session.close(new CloseReason(CloseReason.CloseCodes.CLOSED_ABNORMALLY, "系統(tǒng)異常"));t.printStackTrace();}/*** 群發(fā)* @param data*/public void sendAllMsg(Message data) {for (WebSocketServerEndpoint value : endpointMap.values()) {value.sendMsgAsync(data);}}/*** 推送消息給指定 userId* @param data* @param userId*/public void sendMsg(Message data, String userId) {WebSocketServerEndpoint endpoint = endpointMap.get(userId);if (endpoint == null) {System.out.println("not conected to " + userId);return;}endpoint.sendMsgAsync(data);}private void sendMsg(Message data) {try {this.session.getBasicRemote().sendObject(data);} catch (IOException ioException) {ioException.printStackTrace();} catch (EncodeException e) {e.printStackTrace();}}private void sendMsgAsync(Message data) {this.session.getAsyncRemote().sendObject(data);}// 存儲(chǔ)建立連接的Endpointprivate static ConcurrentHashMap<String, WebSocketServerEndpoint> endpointMap = new ConcurrentHashMap<String, WebSocketServerEndpoint>(); }每一個(gè)客戶(hù)端與服務(wù)器端建立連接后,都會(huì)生成一個(gè)WebSocketServerEndpoint,可以通過(guò)一個(gè)Map將其與userId對(duì)應(yīng)存起來(lái),為后續(xù)群發(fā)廣播和單獨(dú)推送消息給某個(gè)客戶(hù)端提供便利。
注意:@ServerEndpoint的encoders、decoders、configurator等配置信息在實(shí)際使用中可以不定義,如果項(xiàng)目簡(jiǎn)單,完全可以用默認(rèn)的。
如果通信消息被封裝成一個(gè)對(duì)象,如示例的Message(因?yàn)樵创a過(guò)于簡(jiǎn)單就不展示了,屬性主要有code、msg、data),就必須提供編碼器和解碼器。也可以在每次發(fā)送消息時(shí)硬編碼轉(zhuǎn)為字符串,在接收到消息時(shí)轉(zhuǎn)為Message。有了編碼器和解碼器,顯得比較規(guī)范,轉(zhuǎn)為字符串由編碼器做,字符串轉(zhuǎn)為對(duì)象由解碼器做,但也使得架構(gòu)變復(fù)雜了,視項(xiàng)目需求而定。
Configurator的用處就是自定義Endpoint對(duì)象創(chuàng)建方式,默認(rèn)Tomcat提供的是通過(guò)反射。WebScoket是每個(gè)連接都會(huì)創(chuàng)建一個(gè)Endpoint對(duì)象,如果連接比較多,很頻繁,通過(guò)反射創(chuàng)建,用后即毀,可能不是一個(gè)好主意,所以可以搞一個(gè)對(duì)象池,用過(guò)回收,用時(shí)先從對(duì)象池中拿,有就重置,省去實(shí)例化分配內(nèi)存等消耗過(guò)程。
如果使用SpringBoot內(nèi)置Tomcat、undertow、Netty等,接入WebSocket時(shí)除了加@ServerEndpoint還需要加一個(gè)@Component,再給Spring注冊(cè)一個(gè)ServerEndpointExporter類(lèi),這樣,服務(wù)端Endpoint就交由Spring去掃描注冊(cè)了。
@Configuration public class WebSocketConfig {@Beanpublic ServerEndpointExporter serverEndpointExporter() {ServerEndpointExporter serverEndpointExporter = new ServerEndpointExporter();return serverEndpointExporter;} }外置Tomcat就不需要這么麻煩,Tomcat會(huì)默認(rèn)掃描classpath下帶有@ServerEndpoint注解的類(lèi)。(SpringBoot接入Websocket后續(xù)會(huì)單獨(dú)出文章講解,也挺有意思的)
(2)繼承抽象類(lèi)Endpoint方式
import javax.websocket.*; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap;public class WebSocketServerEndpoint extends Endpoint {private Session session;private String userId;@Overridepublic void onOpen(Session session, EndpointConfig endpointConfig) {this.session = session;this.userId = session.getPathParameters().get("userId");session.addMessageHandler(new MessageHandler());endpointMap.put(userId, this);Message message = new Message(0, "connected, hello " + userId);sendMsg(message);}@Overridepublic void onClose(Session session, CloseReason closeReason) {endpointMap.remove(userId);}@Overridepublic void onError(Session session, Throwable throwable) {throwable.printStackTrace();}/*** 群發(fā)* @param data*/public void sendAllMsg(Message data) {for (WebSocketServerEndpoint value : endpointMap.values()) {value.sendMsgAsync(data);}}/*** 推送消息給指定 userId* @param data* @param userId*/public void sendMsg(Message data, String userId) {WebSocketServerEndpoint endpoint = endpointMap.get(userId);if (endpoint == null) {System.out.println("not conected to " + userId);return;}endpoint.sendMsgAsync(data);}private void sendMsg(Message data) {try {this.session.getBasicRemote().sendObject(data);} catch (IOException ioException) {ioException.printStackTrace();} catch (EncodeException e) {e.printStackTrace();}}private void sendMsgAsync(Message data) {this.session.getAsyncRemote().sendObject(data);}private class MessageHandler implements javax.websocket.MessageHandler.Whole<Message> {@Overridepublic void onMessage(Message message) {System.out.println("server recive message=" + message.toString());}}private static ConcurrentHashMap<String, WebSocketServerEndpoint> endpointMap = new ConcurrentHashMap<String, WebSocketServerEndpoint>();}繼承抽象類(lèi)Endpoint方式比加注解@ServerEndpoint方式麻煩的很,主要是需要自己實(shí)現(xiàn)MessageHandler和ServerApplicationConfig。@ServerEndpoint的話(huà)都是使用默認(rèn)的,原理上差不多,只是注解更自動(dòng)化,更簡(jiǎn)潔。
MessageHandler做的事情,一個(gè)@OnMessage就搞定了,ServerApplicationConfig做的URI映射、decoders、encoders,configurator等,一個(gè)@ServerEndpoint就可以了。
import javax.websocket.Decoder; import javax.websocket.Encoder; import javax.websocket.Endpoint; import javax.websocket.server.ServerApplicationConfig; import javax.websocket.server.ServerEndpointConfig; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set;public class MyServerApplicationConfig implements ServerApplicationConfig {@Overridepublic Set<ServerEndpointConfig> getEndpointConfigs(Set<Class<? extends Endpoint>> set) {Set<ServerEndpointConfig> result = new HashSet<ServerEndpointConfig>();List<Class<? extends Decoder>> decoderList = new ArrayList<Class<? extends Decoder>>();decoderList.add(MessageDecoder.class);List<Class<? extends Encoder>> encoderList = new ArrayList<Class<? extends Encoder>>();encoderList.add(MessageEncoder.class);if (set.contains(WebSocketServerEndpoint3.class)) {ServerEndpointConfig serverEndpointConfig = ServerEndpointConfig.Builder.create(WebSocketServerEndpoint3.class, "/ws/test3").decoders(decoderList).encoders(encoderList).configurator(new MyServerConfigurator()).build();result.add(serverEndpointConfig);}return result;}@Overridepublic Set<Class<?>> getAnnotatedEndpointClasses(Set<Class<?>> set) {return set;} }如果使用SpringBoot內(nèi)置Tomcat,則不需要ServerApplicationConfig了,但是需要給Spring注冊(cè)一個(gè)ServerEndpointConfig。
@Bean public ServerEndpointConfig serverEndpointConfig() {List<Class<? extends Decoder>> decoderList = new ArrayList<Class<? extends Decoder>>();decoderList.add(MessageDecoder.class);List<Class<? extends Encoder>> encoderList = new ArrayList<Class<? extends Encoder>>();encoderList.add(MessageEncoder.class);ServerEndpointConfig serverEndpointConfig = ServerEndpointConfig.Builder.create(WebSocketServerEndpoint3.class, "/ws/test3/{userId}").decoders(decoderList).encoders(encoderList).configurator(new MyServerConfigurator()).build();return serverEndpointConfig; }(3)早期Tomcat7中Server端實(shí)現(xiàn)對(duì)比
Tomcat7早期版本7.0.47之前還沒(méi)有出JSR 356時(shí),自己搞了一套接口,其實(shí)就是一個(gè)Servlet。
和遵循JSR356標(biāo)準(zhǔn)的版本對(duì)比,有一個(gè)比較大的變化是,createWebSocketInbound創(chuàng)建生命周期事件處理器StreamInbound的時(shí)機(jī)是WebSocket協(xié)議升級(jí)之前,此時(shí)還可以通過(guò)用戶(hù)線程緩存(ThreadLocal等)的HttpServletRequest對(duì)象,獲取一些請(qǐng)求頭等信息。
而遵循JSR356標(biāo)準(zhǔn)的版本實(shí)現(xiàn),創(chuàng)建生命周期事件處理的Endpoint是在WebSocket協(xié)議升級(jí)完成(經(jīng)過(guò)HTTP握手)之后創(chuàng)建的,而WebSocket握手成功給客戶(hù)端響應(yīng)101前,會(huì)結(jié)束銷(xiāo)毀HttpServletRequest對(duì)象,此時(shí)是獲取不到請(qǐng)求頭等信息的。
import org.apache.catalina.websocket.StreamInbound; import org.apache.catalina.websocket.WebSocketServlet;import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServletRequest;@WebServlet(urlPatterns = "/ws/test") public class MyWeSocketServlet extends WebSocketServlet {@Overrideprotected StreamInbound createWebSocketInbound(String subProtocol, HttpServletRequest request) {MyMessageInbound messageInbound = new MyMessageInbound(subProtocol, request);return messageInbound;}} import org.apache.catalina.websocket.MessageInbound; import org.apache.catalina.websocket.WsOutbound;import javax.servlet.http.HttpServletRequest; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.CharBuffer;public class MyMessageInbound extends MessageInbound {private String subProtocol;private HttpServletRequest request;public MyMessageInbound(String subProtocol, HttpServletRequest request) {this.subProtocol = subProtocol;this.request = request;}@Overrideprotected void onOpen(WsOutbound outbound) {String msg = "connected, hello";ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());try {outbound.writeBinaryMessage(byteBuffer);} catch (IOException e) {e.printStackTrace();}}@Overrideprotected void onClose(int status) {}@Overrideprotected void onBinaryMessage(ByteBuffer byteBuffer) throws IOException {// 接收到客戶(hù)端信息}@Overrideprotected void onTextMessage(CharBuffer charBuffer) throws IOException {// 接收到客戶(hù)端信息} }2、客戶(hù)端實(shí)現(xiàn)
(1)前端js版
js版的客戶(hù)端主要依托瀏覽器對(duì)WebScoket的支持,在生命周期事件觸發(fā)上和服務(wù)器端的差不多,這也應(yīng)證了建立WebSocket連接的兩端是對(duì)等的。
編寫(xiě)WebSocket客戶(hù)端需要注意以下幾點(diǎn):
- 和服務(wù)器端商議好傳輸?shù)南⒌母袷?#xff0c;一般為json字符串,比較直觀,編碼解碼都很簡(jiǎn)單,也可以是其他商定的格式。
- 需要心跳檢測(cè),定時(shí)給服務(wù)器端發(fā)送消息,保持連接正常。
- 正常關(guān)閉連接,即關(guān)閉瀏覽器窗口前主動(dòng)關(guān)閉連接,以免服務(wù)器端拋異常。
- 如果因?yàn)楫惓嚅_(kāi)連接,支持重連。
這里推薦一個(gè)在線測(cè)試WebSocket連接和發(fā)送消息的網(wǎng)站easyswoole.com/wstool.html:
真的很牛逼,很方便,很簡(jiǎn)單。還有源碼github:https://github.com/easy-swoole/wstool,感興趣可以看看。
(2)@ClientEndpoint注解方式
Java版客戶(hù)端不用多說(shuō),把@ServerEndpoint換成@ClientEndpoint就可以了,其他都一樣。@ClientEndpoint比@ServerEndpoint就少了一個(gè)value,不需要設(shè)置URI。
@ClientEndpoint(encoders = {MessageEncoder.class}, decoders = {MessageDecoder.class}) public class WebSocketClientEndpoint {private Session session;@OnOpenpublic void OnOpen(Session session) {this.session = session;Message message = new Message(0, "connecting...");sendMsg(message);}@OnClosepublic void OnClose() {Message message = new Message(0, "client closed...");sendMsg(message);System.out.println("client closed");}@OnMessagepublic void onMessage(Message message) {System.out.println("client recive message=" + message.toString());}@OnErrorpublic void onError(Throwable t) throws Throwable {t.printStackTrace();}public void sendMsg(Message data) {try {this.session.getBasicRemote().sendObject(data);} catch (IOException ioException) {ioException.printStackTrace();} catch (EncodeException e) {e.printStackTrace();}}public void sendMsgAsync(Message data) {this.session.getAsyncRemote().sendObject(data);} }連接服務(wù)器端:
WebSocketContainer container = ContainerProvider.getWebSocketContainer(); container.connectToServer(WebSocketClientEndpoint.class,new URI("ws://localhost:8080/ws/test"));(3)繼承抽象類(lèi)Endpoint方式
繼承抽象類(lèi)Endpoint方式也和服務(wù)器端的差不多,但是不需要實(shí)現(xiàn)ServerApplicationConfig,需要實(shí)例化一個(gè)ClientEndpointConfig。Endpoint實(shí)現(xiàn)類(lèi)和服務(wù)器端的一樣,就省略了,如下是連接服務(wù)器端的代碼:
ClientEndpointConfig clientEndpointConfig = ClientEndpointConfig.Builder.create().build(); container.connectToServer(new WebSocketClientEndpoint(),clientEndpointConfig,new URI("ws://localhost:8080/websocket/hello"));3、基于Nginx反向代理注意事項(xiàng)
一般web服務(wù)器會(huì)用Nginx做反向代理,經(jīng)過(guò)Nginx反向轉(zhuǎn)發(fā)的HTTP請(qǐng)求不會(huì)帶上Upgrade和Connection消息頭,所以需要在Nginx配置里顯式指定需要升級(jí)為WebSocket的URI帶上這兩個(gè)頭:
location /chat/ {proxy_pass http://backend;proxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "upgrade";proxy_connect_timeout 4s; proxy_read_timeout 7200s; proxy_send_timeout 12s; }默認(rèn)情況下,如果代理服務(wù)器在60秒內(nèi)沒(méi)有傳輸任何數(shù)據(jù),連接將被關(guān)閉。這個(gè)超時(shí)可以通過(guò)proxy_read_timeout指令來(lái)增加。或者,可以將代理服務(wù)器配置為定期發(fā)送WebSocket PING幀以重置超時(shí)并檢查連接是否仍然活躍。
具體可參考:http://nginx.org/en/docs/http/websocket.html
五、WebSocket在Tomcat中的源碼實(shí)現(xiàn)
所有兼容Java EE的應(yīng)用服務(wù)器,必須遵循JSR356 WebSocket Java API標(biāo)準(zhǔn),Tomcat也不例外。而且Tomcat也是支持WebSocket最早的Web應(yīng)用服務(wù)器框架(之一),在還沒(méi)有出JSR356標(biāo)準(zhǔn)時(shí),就已經(jīng)自定義了一套WebSocket API,但是JSR356一出,不得不改弦更張。
通過(guò)前面的講解,在使用上完全沒(méi)有問(wèn)題,但是有幾個(gè)問(wèn)題完全是黑盒的:
- Server Endpoint 是如何被掃描加載的?
- WebSocket是如何借助HTTP 進(jìn)行握手升級(jí)的?
- WebSocket建立連接后如何保持連接不斷,互相通信的?
(如下源碼解析,需要對(duì)Tomcat連接器源碼有一定了解)
1、WsSci初始化
Tomcat 提供了一個(gè)org.apache.tomcat.websocket.server.WsSci類(lèi)來(lái)初始化、加載WebSocket。從類(lèi)名上顧名思義,利用了Sci加載機(jī)制,何為Sci加載機(jī)制?就是實(shí)現(xiàn)接口 jakarta.servlet.ServletContainerInitializer,在Tomcat部署裝載Web項(xiàng)目(org.apache.catalina.core.StandardContext#startInternal)時(shí)主動(dòng)觸發(fā)ServletContainerInitializer#onStartup,做一些擴(kuò)展的初始化操作。
WsSci主要做了一件事,就是掃描加載Server Endpoint,并將其加到WebSocket容器里jakarta.websocket.WebSocketContainer。
WsSci主要會(huì)掃描三種類(lèi):
- 加了@ServerEndpoint的類(lèi)。
- Endpoint的子類(lèi)。
- ServerApplicationConfig的子類(lèi)。
(1)WsSci#onStartup
@HandlesTypes({ServerEndpoint.class, ServerApplicationConfig.class,Endpoint.class}) public class WsSci implements ServletContainerInitializer {@Overridepublic void onStartup(Set<Class<?>> clazzes, ServletContext ctx)throws ServletException {WsServerContainer sc = init(ctx, true);if (clazzes == null || clazzes.size() == 0) {return;}// Group the discovered classes by typeSet<ServerApplicationConfig> serverApplicationConfigs = new HashSet<>();Set<Class<? extends Endpoint>> scannedEndpointClazzes = new HashSet<>();Set<Class<?>> scannedPojoEndpoints = new HashSet<>();try {// wsPackage is "jakarta.websocket."String wsPackage = ContainerProvider.class.getName();wsPackage = wsPackage.substring(0, wsPackage.lastIndexOf('.') + 1);for (Class<?> clazz : clazzes) {JreCompat jreCompat = JreCompat.getInstance();int modifiers = clazz.getModifiers();if (!Modifier.isPublic(modifiers) ||Modifier.isAbstract(modifiers) ||Modifier.isInterface(modifiers) ||!jreCompat.isExported(clazz)) {// Non-public, abstract, interface or not in an exported// package (Java 9+) - skip it.continue;}// Protect against scanning the WebSocket API JARs// 防止掃描WebSocket API jarif (clazz.getName().startsWith(wsPackage)) {continue;}if (ServerApplicationConfig.class.isAssignableFrom(clazz)) {// 1、clazz是ServerApplicationConfig子類(lèi)serverApplicationConfigs.add((ServerApplicationConfig) clazz.getConstructor().newInstance());}if (Endpoint.class.isAssignableFrom(clazz)) {// 2、clazz是Endpoint子類(lèi)@SuppressWarnings("unchecked")Class<? extends Endpoint> endpoint =(Class<? extends Endpoint>) clazz;scannedEndpointClazzes.add(endpoint);}if (clazz.isAnnotationPresent(ServerEndpoint.class)) {// 3、clazz是加了注解ServerEndpoint的類(lèi)scannedPojoEndpoints.add(clazz);}}} catch (ReflectiveOperationException e) {throw new ServletException(e);}// Filter the resultsSet<ServerEndpointConfig> filteredEndpointConfigs = new HashSet<>();Set<Class<?>> filteredPojoEndpoints = new HashSet<>();if (serverApplicationConfigs.isEmpty()) {// 從這里看出@ServerEndpoint的服務(wù)器端是可以不用ServerApplicationConfig的filteredPojoEndpoints.addAll(scannedPojoEndpoints);} else {// serverApplicationConfigs不為空,for (ServerApplicationConfig config : serverApplicationConfigs) {Set<ServerEndpointConfig> configFilteredEndpoints =config.getEndpointConfigs(scannedEndpointClazzes);if (configFilteredEndpoints != null) {filteredEndpointConfigs.addAll(configFilteredEndpoints);}// getAnnotatedEndpointClasses 對(duì)于 scannedPojoEndpoints起到一個(gè)過(guò)濾作用// 不滿(mǎn)足條件的后面不加到WsServerContainer里Set<Class<?>> configFilteredPojos =config.getAnnotatedEndpointClasses(scannedPojoEndpoints);if (configFilteredPojos != null) {filteredPojoEndpoints.addAll(configFilteredPojos);}}}try {// 繼承抽象類(lèi)Endpoint的需要使用者手動(dòng)封裝成ServerEndpointConfig// 而加了注解@ServerEndpoint的類(lèi) Tomcat會(huì)自動(dòng)封裝成ServerEndpointConfig// Deploy endpointsfor (ServerEndpointConfig config : filteredEndpointConfigs) {sc.addEndpoint(config);}// Deploy POJOsfor (Class<?> clazz : filteredPojoEndpoints) {sc.addEndpoint(clazz, true);}} catch (DeploymentException e) {throw new ServletException(e);}}static WsServerContainer init(ServletContext servletContext,boolean initBySciMechanism) {WsServerContainer sc = new WsServerContainer(servletContext);servletContext.setAttribute(Constants.SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE, sc);// 注冊(cè)監(jiān)聽(tīng)器WsSessionListener給servletContext,// 在http session銷(xiāo)毀時(shí)觸發(fā) ws session的關(guān)閉銷(xiāo)毀servletContext.addListener(new WsSessionListener(sc));// Can't register the ContextListener again if the ContextListener is// calling this methodif (initBySciMechanism) {// 注冊(cè)監(jiān)聽(tīng)器WsContextListener給servletContext,// 在 servletContext初始化時(shí)觸發(fā)WsSci.init// 在 servletContext銷(xiāo)毀時(shí)觸發(fā)WsServerContainer的銷(xiāo)毀// 不過(guò)呢,只在WsSci.onStartup時(shí)注冊(cè)一次servletContext.addListener(new WsContextListener());}return sc;} }從上述源碼中可以看出ServerApplicationConfig起到一個(gè)過(guò)濾的作用:
- 當(dāng)沒(méi)有ServerApplicationConfig時(shí),加了@ServerEndpoint的類(lèi)會(huì)默認(rèn)全部加到一個(gè)Set集合(filteredPojoEndpoints),所以加了@ServerEndpoint的類(lèi)可以不需要自定義實(shí)現(xiàn)ServerApplicationConfig。
- 當(dāng)有ServerApplicationConfig時(shí),ServerApplicationConfig#getEndpointConfigs用來(lái)過(guò)濾Endpoint子類(lèi),并且Endpoint子類(lèi)必須封裝成一個(gè)ServerEndpointConfig。
- ServerApplicationConfig#getAnnotatedEndpointClasses用來(lái)過(guò)濾加了注解@ServerEndpoint的類(lèi),一般空實(shí)現(xiàn)就行了(如果不想某個(gè)類(lèi)被加到WsServerContainer里,那不加@ServerEndpoint不就可以了)。
過(guò)濾之后的Endpoint子類(lèi)和加了注解@ServerEndpoint的類(lèi)會(huì)分別調(diào)用不同形參的WsServerContainer#addEndpoint,將其加到WsServerContainer里。
(2)WsServerContainer#addEndpoint
- 將Endpoint子類(lèi)加到WsServerContainer里,調(diào)用的是形參為ServerEndpointConfig的addEndpoint:
因?yàn)镋ndpoint子類(lèi)需要使用者封裝成ServerEndpointConfig,不需要Tomcat來(lái)封裝。
- 將加了注解@ServerEndpoint的類(lèi)加到WsServerContainer,調(diào)用的是形參為Class<?>的addEndpoint(fromAnnotatedPojo參數(shù)暫時(shí)在這個(gè)方法里沒(méi)什么用處):
該方法主要職責(zé)就是解析@ServerEndpoint,獲取path、decoders、encoders、configurator等構(gòu)建一個(gè)ServerEndpointConfig對(duì)象
最終調(diào)用的都是如下這個(gè)比較復(fù)雜的方法,fromAnnotatedPojo表示是否是加了@ServerEndpoint的類(lèi)。主要做了兩件事:
-
對(duì)加了@ServerEndpoint類(lèi)的生命周期方法(@OnOpen、@OnClose、@OnError、@OnMessage)的掃描和映射封裝。
-
對(duì)path的有效性檢查和path param解析。
(3)PojoMethodMapping方法映射和形參解析
PojoMethodMapping構(gòu)造函數(shù)比較長(zhǎng),主要是對(duì)加了@OnOpen、@OnClose、@OnError、@OnMessage的方法進(jìn)行校驗(yàn)和映射,以及對(duì)每個(gè)方法的形參進(jìn)行解析和校驗(yàn),主要邏輯總結(jié)如下:
- 對(duì)當(dāng)前類(lèi)以及其父類(lèi)中的方法進(jìn)行掃描。
- 當(dāng)前類(lèi)中不能存在多個(gè)相同注解的方法,否則會(huì)拋出Duplicate annotation異常。
- 父類(lèi)和子類(lèi)中存在相同注解的方法,子類(lèi)必須重寫(xiě)該方法,否則會(huì)拋出Duplicate annotation異常。
- 對(duì)于@OnMessage,可以有多個(gè),但是接收消息的類(lèi)型必須不同,消息類(lèi)型大概分為三種:PongMessage心跳消息、字節(jié)型、字符型。
- 如果掃描到對(duì)的注解都是父類(lèi)的方法,子類(lèi)重寫(xiě)了該方法,但是沒(méi)有加響應(yīng)的注解,則會(huì)被清除。
- 形參解析。
雖然方法名可以隨意,但是形參卻有著強(qiáng)制限制:
- @onOpen方法,可以有的參數(shù)Session、EndpointConfig、@PathParam,不能有其他參數(shù)。
- @onError方法,可以有的參數(shù)Session、@PathParam, 必須有Throwable,不能有其他參數(shù)。
- @onClose方法,可以有的參數(shù)Session, CloseReason, @PathParam,不能有其他參數(shù)。
2、協(xié)議升級(jí)(握手)
Tomcat中WebSocket是通過(guò)UpgradeToken機(jī)制實(shí)現(xiàn)的,其具體的升級(jí)處理器為WsHttpUpgradeHandler。WebSocket協(xié)議升級(jí)的過(guò)程比較曲折,首先要通過(guò)過(guò)濾器WsFilter進(jìn)行升級(jí)判斷,然后調(diào)用org.apache.catalina.connector.Request#upgrade進(jìn)行UpgradeToken的構(gòu)建,最后通過(guò)org.apache.catalina.connector.Request#coyoteRequest回調(diào)函數(shù)action將UpgradeToken回傳給連接器為后續(xù)升級(jí)處理做準(zhǔn)備。
(1)WsFilter
WebSocket協(xié)議升級(jí)的過(guò)程比較曲折。帶有WebSocket握手的請(qǐng)求會(huì)平安經(jīng)過(guò)Tomcat的Connector,被轉(zhuǎn)發(fā)到Servlet容器中,在業(yè)務(wù)處理之前經(jīng)過(guò)過(guò)濾器WsFilter判斷是否需要升級(jí)(WsFilter 在 org.apache.catalina.core.ApplicationFilterChain過(guò)濾鏈中觸發(fā)):
- 首先判斷WsServerContainer是否有進(jìn)行Endpoint的掃描和注冊(cè)以及請(qǐng)頭中是否有Upgrade: websocket。
- 獲取請(qǐng)求path即uri在WsServerContainer中找對(duì)應(yīng)的ServerEndpointConfig。
- 調(diào)用UpgradeUtil.doUpgrade進(jìn)行升級(jí)。
(2)UpgradeUtil#doUpgrade
UpgradeUtil#doUpgrade主要做了如下幾件事情:
- 檢查HttpServletRequest的一些請(qǐng)求頭的有效性,如Connection: upgrade、Sec-WebSocket-Version:13、Sec-WebSocket-Key等。
- 給HttpServletResponse設(shè)置一些響應(yīng)頭,如Upgrade:websocket、Connection: upgrade、根據(jù)Sec-WebSocket-Key的值生成響應(yīng)頭Sec-WebSocket-Accept的值。
- 封裝WsHandshakeRequest和WsHandshakeResponse。
- 調(diào)用HttpServletRequest#upgrade進(jìn)行升級(jí),并獲取WsHttpUpgradeHandler(具體的升級(jí)流程處理器)。
(3)Request#upgrade
Request#upgrade主要做了三件事:
- 實(shí)例化WsHttpUpgradeHandler并構(gòu)建UpgradeToken。
- 回調(diào)coyoteRequest.action,將UpgradeToken回傳給連接器。
- 設(shè)置響應(yīng)碼101。
(4)回調(diào)機(jī)制ActionHook#action
一些發(fā)生在Servlet容器的動(dòng)作可能需要回傳給連接器做處理,比如WebSocket的握手升級(jí),所以連接器就給org.apache.coyote.Request設(shè)置了一個(gè)動(dòng)作鉤子``ActionHook#action。一些動(dòng)作表示定義在枚舉類(lèi)ActionCode中,ActionCode.UPGRADE就代表協(xié)議升級(jí)動(dòng)作。org.apache.coyote.AbstractProcessor實(shí)現(xiàn)了ActionHook接口,ActionCode.UPGRADE動(dòng)作會(huì)調(diào)用org.apache.coyote.http11.Http11Processor#doHttpUpgrade,只是簡(jiǎn)單將upgradeToken設(shè)置給Http11Processor`。
(5)ConnectionHandler#process
Tomcat連接器是同步調(diào)用容器業(yè)務(wù)處理,容器中的業(yè)務(wù)處理結(jié)束后還是回到連接器繼續(xù)往下執(zhí)行。
連接器將請(qǐng)求轉(zhuǎn)發(fā)給容器處理是在適配器里完成的,容器中流程處理結(jié)束返回到org.apache.catalina.connector.CoyoteAdapter#service,繼續(xù)往下執(zhí)行,最終結(jié)束并回收HttpServletrequest、HttpServletreponse對(duì)象。
org.apache.catalina.connector.CoyoteAdapter#service是在org.apache.coyote.http11.Http11Processor#service中調(diào)用的,
Http11Processor#service是HTTP請(qǐng)求處理主流程,通過(guò)upgradeToken != null來(lái)判斷是否為升級(jí)操作,s是則返回SocketState.UPGRADING。
最后來(lái)到org.apache.coyote.AbstractProtocol.ConnectionHandler#process一個(gè)連接處理的主流程,根據(jù)Http11Processor#service返回SocketState.UPGRADING來(lái)進(jìn)行升級(jí)操作,如下只截取了和WebSocket協(xié)議升級(jí)相關(guān)流程的代碼:
- 獲取UpgradeToken,從中取出HttpUpgradeHandler,對(duì)于WebSocket來(lái)說(shuō)是WsHttpUpgradeHandler。
- 調(diào)用WsHttpUpgradeHandler#init啟動(dòng)協(xié)議升級(jí)處理。
(6)WsHttpUpgradeHandler#init握手成功
走到這里,基本上就是握手成功了,接下來(lái)就是創(chuàng)建WsSession和觸發(fā)onOpen。
WsSession的構(gòu)建中會(huì)實(shí)例化Endpoint,如果實(shí)例化出來(lái)的對(duì)象不是Endpoint類(lèi)型,即加了@ServerEndpoint的實(shí)例對(duì)象,則用一個(gè)PojoEndpointServer進(jìn)行包裝,而PojoEndpointServer是繼承了抽象類(lèi)Endpoint的。
觸發(fā)onOpen時(shí)會(huì)將WsSession傳進(jìn)去,對(duì)于加PojoEndpointServer,因?yàn)橛脩?hù)自定義的方法名和形參不確定,所以通過(guò)反射調(diào)用用戶(hù)自定義的onopen形式的方法,并且會(huì)將通過(guò)@onMessage解析出的MessageHandler設(shè)置給WsSession。
3、數(shù)據(jù)傳輸和解析
握手成功之后就建立了雙向通信的連接,該連接有別于HTTP/1.1長(zhǎng)連接(應(yīng)用服務(wù)器中工作線程循環(huán)占用),而是占用一條TCP連接。在連接建立是進(jìn)行TCP三次握手,之后全雙工互相通信,將不需要再進(jìn)行耗時(shí)的TCP的三次握手和四次揮手,一方需要關(guān)閉WebSocket連接時(shí),發(fā)送關(guān)閉幀,另一方接收到關(guān)閉幀之后,也發(fā)送個(gè)關(guān)閉幀作為響應(yīng),之后就認(rèn)為WebSocket連接關(guān)閉了,并且關(guān)閉底層TCP連接(四次揮手)。
實(shí)則WebSocket全雙工是建立在TCP的長(zhǎng)鏈接上的,TCP長(zhǎng)鏈接長(zhǎng)時(shí)間沒(méi)有消息通信,會(huì)定時(shí)保活,一般WebSocket會(huì)通過(guò)代理如nginx等進(jìn)行連接通信,nginx有一個(gè)連接超時(shí)沒(méi)有任何信息傳輸時(shí),會(huì)斷開(kāi),所以需要WebSocket一端定時(shí)發(fā)送心跳保活。
(1)接收客戶(hù)端消息
客戶(hù)端來(lái)了消息,由連接器的Poller輪詢(xún)監(jiān)測(cè)socket底層是否有數(shù)據(jù)到來(lái),有數(shù)據(jù)可讀,則封裝成一個(gè)SocketProcessor扔到線程池里處理,org.apache.coyote.http11.upgrade.UpgradeProcessorInternal#dispatch具有處理升級(jí)協(xié)議連接,org.apache.tomcat.websocket.server.WsHttpUpgradeHandler#upgradeDispatch是專(zhuān)門(mén)處理WebSocket連接的處理器。
org.apache.tomcat.websocket.server.WsFrameServer是對(duì)服務(wù)器端消息幀處理的封裝,包括讀取底層數(shù)據(jù),按消息幀格式解析、拼裝出有效載荷數(shù)據(jù),觸發(fā)onMessage。
因?yàn)樵创a篇幅較多,只展示具體源碼調(diào)用流程:
(2)發(fā)送消息給客戶(hù)端
一般,客戶(hù)端發(fā)送WebSocket握手請(qǐng)求,和服務(wù)器端建立連接后,服務(wù)器端需要將連接(Endpoint+WsSession)保存起來(lái),為后續(xù)主動(dòng)推送消息給客戶(hù)端提供方便。
Tomcat提供了可以發(fā)送三種數(shù)據(jù)類(lèi)型(文本、二進(jìn)制、Object對(duì)象)和兩種發(fā)送方式(同步、異步)的發(fā)送消息的方法。
- org.apache.tomcat.websocket.WsRemoteEndpointAsync異步發(fā)送。
- org.apache.tomcat.websocket.WsRemoteEndpointBasic 同步發(fā)送。
發(fā)送消息也同樣需要按消息幀格式封裝,然后通過(guò)socket寫(xiě)到網(wǎng)絡(luò)里即可。
六、要點(diǎn)回顧
WebSocket的出現(xiàn)不是空穴來(lái)風(fēng),起初在HTTP/1.1基礎(chǔ)上通過(guò)輪詢(xún)和長(zhǎng)連接達(dá)到信息實(shí)時(shí)同步的功能,但是這并沒(méi)有跳出HTTP/1.1自身的缺陷。HTTP/1.1明顯的兩個(gè)缺陷:消息頭冗長(zhǎng)且為文本傳輸,請(qǐng)求響應(yīng)模式。為此,WebSocket誕生了,跳出HTTP/1.1,建立一個(gè)新的真正全雙工通信協(xié)議。
不僅僅要會(huì)在項(xiàng)目中使用WebSocket,還要知道其通信原理和在應(yīng)用服務(wù)器中的實(shí)現(xiàn)原理,很多注意事項(xiàng)都是在查閱了官方資源和源碼之后恍然大悟的。
- 在Tomcat中使用WebSocket不可以在Endpoint里獲取緩存的HttpServletRequest對(duì)象,因?yàn)樵赪ebSocket握手之前,HTTP/1.1請(qǐng)求就算結(jié)束了(HttpServletRequest對(duì)象被回收),建立連接之后就更是獨(dú)立于HTTP/1.1了。
- 建立連接的WebSocket,會(huì)生成新的Endpoint和WsSession。
- 使用內(nèi)置Tomcat需要注意,WsSci做的事情交給了Spring做。
- WebSocket全雙工是建立在TCP長(zhǎng)連接的基礎(chǔ)之上。
- … …
七、參考文獻(xiàn)
如若文章有錯(cuò)誤理解,歡迎批評(píng)指正,同時(shí)非常期待你的留言和點(diǎn)贊。如果覺(jué)得有用,不妨點(diǎn)個(gè)在看,讓更多人受益。
總結(jié)
以上是生活随笔為你收集整理的WebSocket通信原理和在Tomcat中实现源码详解(万字爆肝)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: XIO: fatal IO error
- 下一篇: Android ORC文字识别之识别身份