日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

WebSocket通信原理和在Tomcat中实现源码详解(万字爆肝)

發(fā)布時(shí)間:2023/12/18 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 WebSocket通信原理和在Tomcat中实现源码详解(万字爆肝) 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

首發(fā)CSDN:徐同學(xué)呀,原創(chuàng)不易,轉(zhuǎn)載請(qǐng)注明源鏈接。我是徐同學(xué),用心輸出高質(zhì)量文章,希望對(duì)你有所幫助。 本篇基于Tomcat10.0.6。建議收藏起來(lái)慢慢看。

文章目錄

    • 一、前言
    • 二、什么是WebSocket
      • 1、HTTP/1.1的缺陷
      • 2、WebSocket發(fā)展歷史
        • (1)背景
        • (2)歷史
      • 3、WebSocket握手和雙向通信
        • (1)定義
        • (2)握手(建立連接)
        • (3)消息幀
        • (4)揮手(關(guān)閉連接)
      • 4、WebSocket優(yōu)點(diǎn)
    • 三、Java API for WebSocket(JSR356)
      • 1、服務(wù)端API
        • (1)注解方式@ServerEndpoint
        • (2)繼承抽象類(lèi)Endpoint
      • 2、客戶(hù)端API
      • 3、上下文Session
      • 4、HandshakeRequest 和 HandshakeResponse
        • (1)HandshakeRequest
        • (2)HandshakeResponse
      • 5、WebSocketContainer
    • 四、WebSocket基于Tomcat應(yīng)用
      • 1、服務(wù)器端實(shí)現(xiàn)
        • (1)@ServerEndpoint注解方式
        • (2)繼承抽象類(lèi)Endpoint方式
        • (3)早期Tomcat7中Server端實(shí)現(xiàn)對(duì)比
      • 2、客戶(hù)端實(shí)現(xiàn)
        • (1)前端js版
        • (2)@ClientEndpoint注解方式
        • (3)繼承抽象類(lèi)Endpoint方式
      • 3、基于Nginx反向代理注意事項(xiàng)
    • 五、WebSocket在Tomcat中的源碼實(shí)現(xiàn)
      • 1、WsSci初始化
        • (1)WsSci#onStartup
        • (2)WsServerContainer#addEndpoint
        • (3)PojoMethodMapping方法映射和形參解析
      • 2、協(xié)議升級(jí)(握手)
        • (1)WsFilter
        • (2)UpgradeUtil#doUpgrade
        • (3)Request#upgrade
        • (4)回調(diào)機(jī)制ActionHook#action
        • (5)ConnectionHandler#process
        • (6)WsHttpUpgradeHandler#init握手成功
      • 3、數(shù)據(jù)傳輸和解析
        • (1)接收客戶(hù)端消息
        • (2)發(fā)送消息給客戶(hù)端
    • 六、要點(diǎn)回顧
    • 七、參考文獻(xiàn)

一、前言

WebSocket是一種全雙工通信協(xié)議,即客戶(hù)端可以向服務(wù)端發(fā)送請(qǐng)求,服務(wù)端也可以主動(dòng)向客戶(hù)端推送數(shù)據(jù)。這樣的特點(diǎn),使得它在一些實(shí)時(shí)性要求比較高的場(chǎng)景效果斐然(比如微信朋友圈實(shí)時(shí)通知、在線協(xié)同編輯等)。主流瀏覽器以及一些常見(jiàn)服務(wù)端通信框架(Tomcat、netty、undertow、webLogic等)都對(duì)WebSocket進(jìn)行了技術(shù)支持。那么,WebSocket具體是什么?為什么會(huì)出現(xiàn)WebSocket?如何做到全雙工通信?解決了什么問(wèn)題?

二、什么是WebSocket

1、HTTP/1.1的缺陷

HTTP/1.1最初是為網(wǎng)絡(luò)中超文本資源(HTML),請(qǐng)求-響應(yīng)傳輸而設(shè)計(jì)的,后來(lái)支持了傳輸更多類(lèi)型的資源,如圖片、視頻等,但都沒(méi)有改變它單向的請(qǐng)求-響應(yīng)模式。

隨著互聯(lián)網(wǎng)的日益壯大,HTTP/1.1功能使用上已體現(xiàn)捉襟見(jiàn)肘的疲態(tài)。雖然可以通過(guò)某些方式滿(mǎn)足需求(如Ajax、Comet),但是性能上還是局限于HTTP/1.1,那么HTTP/1.1有哪些缺陷呢:

  • 請(qǐng)求-響應(yīng)模式,只能客戶(hù)端發(fā)送請(qǐng)求給服務(wù)端,服務(wù)端才可以發(fā)送響應(yīng)數(shù)據(jù)給客戶(hù)端。
  • 傳輸數(shù)據(jù)為文本格式,且請(qǐng)求/響應(yīng)頭部冗長(zhǎng)重復(fù)。

(為了區(qū)分HTTP/1.1和HTTP/1.2,下面描述中,HTTP均代表HTTP/1.1)

2、WebSocket發(fā)展歷史

(1)背景

在WebSocket出現(xiàn)之前,主要通過(guò)長(zhǎng)輪詢(xún)和HTTP長(zhǎng)連接實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)更新,這種方式有個(gè)統(tǒng)稱(chēng)叫Comet,Tomcat8.5之前有對(duì)Comet基于流的HTTP長(zhǎng)連接做支持,后來(lái)因?yàn)閃ebSocket的成熟和標(biāo)準(zhǔn)化,以及Comet自身依然是基于HTTP,在性能消耗和瓶頸上無(wú)法跳脫HTTP,就把Comet廢棄了。

還有一個(gè)SPDY技術(shù),也對(duì)HTTP進(jìn)行了改進(jìn),多路復(fù)用流、服務(wù)器推送等,后來(lái)演化成HTTP/2.0,因?yàn)檫m用場(chǎng)景和解決的問(wèn)題不同,暫不對(duì)HTTP/2.0做過(guò)多解釋,不過(guò)對(duì)于HTTP/2.0和WebSocket在Tomcat實(shí)現(xiàn)中都是作為協(xié)議升級(jí)來(lái)處理的。

(Comet和SPDY的原理不是本篇重點(diǎn),沒(méi)有展開(kāi)講解,感興趣的同學(xué)可自行百度)

(2)歷史

在這種背景下,HTML5制定了WebSocket

  • 籌備階段,WebSocket被劃分為HTML5標(biāo)準(zhǔn)的一部分,2008年6月,Michael Carter進(jìn)行了一系列討論,最終形成了稱(chēng)為WebSocket的協(xié)議。
  • 2009年12月,Google Chrome 4是第一個(gè)提供標(biāo)準(zhǔn)支持的瀏覽器,默認(rèn)情況下啟用了WebSocket。
  • 2010年2月,WebSocket協(xié)議的開(kāi)發(fā)從W3C和WHATWG小組轉(zhuǎn)移到IETF(TheInternet Engineering Task Force),并在Ian Hickson的指導(dǎo)下進(jìn)行了兩次修訂。
  • 2011年,IETF將WebSocket協(xié)議標(biāo)準(zhǔn)化為RFC 6455起,大多數(shù)Web瀏覽器都在實(shí)現(xiàn)支持WebSocket協(xié)議的客戶(hù)端API。此外,已經(jīng)開(kāi)發(fā)了許多實(shí)現(xiàn)WebSocket協(xié)議的Java庫(kù)。
  • 2013年,發(fā)布JSR356標(biāo)準(zhǔn),Java API for WebSocket。

(為什么要去了解WebSocket的發(fā)展歷史和背景呢?個(gè)人認(rèn)為可以更好的理解某個(gè)技術(shù)實(shí)現(xiàn)的演變歷程,比如Tomcat,早期有Comet沒(méi)有WebSocket時(shí),Tomcat就對(duì)Comet做了支持,后來(lái)有WebSocket了,但是還沒(méi)出JSR356標(biāo)準(zhǔn),Tomcat就對(duì)Websocket做了支持,自定義API,再后來(lái)有了JSR356,Tomcat立馬緊跟潮流,廢棄自定義的API,實(shí)現(xiàn)JSR356那一套,這就使得在Tomcat7使用WebSocket的同學(xué),想升為T(mén)omcat8(其實(shí)Tomcat7.0.47之后就是JSR356標(biāo)準(zhǔn)了),發(fā)現(xiàn)WebSocket接入方式變了,而且一些細(xì)節(jié)也變了。)

3、WebSocket握手和雙向通信

(1)定義

WebSocket全雙工通信協(xié)議,在客戶(hù)端和服務(wù)端建立連接后,可以持續(xù)雙向通信,和HTTP同屬于應(yīng)用層協(xié)議,并且都依賴(lài)于傳輸層的TCP/IP協(xié)議。

雖然WebSocket有別于HTTP,是一種新協(xié)議,但是RFC 6455中規(guī)定:

it is designed to work over HTTP ports 80 and 443 as well as to support HTTP proxies and intermediaries.

  • WebSocket通過(guò)HTTP端口80和443進(jìn)行工作,并支持HTTP代理和中介,從而使其與HTTP協(xié)議兼容。
  • 為了實(shí)現(xiàn)兼容性,WebSocket握手使用HTTP Upgrade頭從HTTP協(xié)議更改為WebSocket協(xié)議。
  • Websocket使用ws或wss的統(tǒng)一資源標(biāo)志符(URI),分別對(duì)應(yīng)明文和加密連接。

(2)握手(建立連接)

在雙向通信之前,必須通過(guò)握手建立連接。Websocket通過(guò) HTTP/1.1 協(xié)議的101狀態(tài)碼進(jìn)行握手,首先客戶(hù)端(如瀏覽器)發(fā)出帶有特殊消息頭(Upgrade、Connection)的請(qǐng)求到服務(wù)器,服務(wù)器判斷是否支持升級(jí),支持則返回響應(yīng)狀態(tài)碼101,表示協(xié)議升級(jí)成功,對(duì)于WebSocket就是握手成功。

客戶(hù)端請(qǐng)求示例:

GET /test HTTP/1.1 Host: server.example.com Upgrade: websocket Connection: Upgrade Sec-WebSocket-Key: tFGdnEL/5fXMS9yKwBjllg== Origin: http://example.com Sec-WebSocket-Protocol: v10.stomp, v11.stomp, v12.stomp Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits Sec-WebSocket-Version: 13
  • Connection必須設(shè)置Upgrade,表示客戶(hù)端希望連接升級(jí)。
  • Upgrade: websocket表明協(xié)議升級(jí)為websocket。
  • Sec-WebSocket-Key字段內(nèi)記錄著握手過(guò)程中必不可少的鍵值,由客戶(hù)端(瀏覽器)生成,可以盡量避免普通HTTP請(qǐng)求被誤認(rèn)為Websocket協(xié)議。
  • Sec-WebSocket-Version 表示支持的Websocket版本。RFC6455要求使用的版本是13。
  • Origin字段是必須的。如果缺少origin字段,WebSocket服務(wù)器需要回復(fù)HTTP 403 狀態(tài)碼(禁止訪問(wèn)),通過(guò)Origin可以做安全校驗(yàn)。

服務(wù)端響應(yīng)示例:

HTTP/1.1 101 Switching Protocols Upgrade: websocket Connection: Upgrade Sec-WebSocket-Accept: HaA6EjhHRejpHyuO0yBnY4J4n3A= Sec-WebSocket-Extensions: permessage-deflate;client_max_window_bits=15 Sec-WebSocket-Protocol: v12.stomp

Sec-WebSocket-Accept的字段值是由握手請(qǐng)求中的Sec-WebSocket-Key的字段值生成的。成功握手確立WebSocket連接之后,通信時(shí)不再使用HTTP的數(shù)據(jù)幀,而采用WebSocket獨(dú)立的數(shù)據(jù)幀。

(3)消息幀

WebSocket使用二進(jìn)制消息幀作為雙向通信的媒介。何為消息幀?發(fā)送方將每個(gè)應(yīng)用程序消息拆分為一個(gè)或多個(gè)幀,通過(guò)網(wǎng)絡(luò)將它們傳輸?shù)侥康牡?#xff0c;并重新組裝解析出一個(gè)完整消息。

有別于HTTP/1.1文本消息格式(冗長(zhǎng)的消息頭和分隔符等),WebSocket消息幀規(guī)定一定的格式,以二進(jìn)制傳輸,更加短小精悍。二者相同之處就是都是基于TCP/IP流式協(xié)議(沒(méi)有規(guī)定消息邊界)。

如下是消息幀的基本結(jié)構(gòu)圖:

  • FIN: 1 bit,表示該幀是否為消息的最后一幀。1-是,0-否。
  • RSV1,RSV2,RSV3: 1 bit each,預(yù)留(3位),擴(kuò)展的預(yù)留標(biāo)志。一般情況為0,除非協(xié)商的擴(kuò)展定義為非零值。如果接收到非零值且不為協(xié)商擴(kuò)展定義,接收端必須使連接失敗。
  • Opcode: 4 bits,定義消息幀的操作類(lèi)型,如果接收到一個(gè)未知Opcode,接收端必須使連接失敗。(0x0-延續(xù)幀,0x1-文本幀,0x2-二進(jìn)制幀,0x8-關(guān)閉幀,0x9-PING幀,0xA-PONG幀(在接收到PING幀時(shí),終端必須發(fā)送一個(gè)PONG幀響應(yīng),除非它已經(jīng)接收到關(guān)閉幀),0x3-0x7保留給未來(lái)的非控制幀,0xB-F保留給未來(lái)的控制幀)
  • Mask: 1 bit,表示該幀是否為隱藏的,即被加密保護(hù)的。1-是,0-否。Mask=1時(shí),必須傳一個(gè)Masking-key,用于解除隱藏(客戶(hù)端發(fā)送消息給服務(wù)器端,Mask必須為1)。
  • Payload length: 7 bits, 7+16 bits, or 7+64 bits,有效載荷數(shù)據(jù)的長(zhǎng)度(擴(kuò)展數(shù)據(jù)長(zhǎng)度+應(yīng)用數(shù)據(jù)長(zhǎng)度,擴(kuò)展數(shù)據(jù)長(zhǎng)度可以為0)。

if 0-125, that is the payload length. If 126, the following 2 bytes interpreted as a 16-bit unsigned integer are the payload length. If 127, the following 8 bytes interpreted as a 64-bit unsigned integer (the most significant bit MUST be 0) are the payload length.

  • Masking-key: 0 or 4 bytes,用于解除幀隱藏(加密)的key,Mask=1時(shí)不為空,Mask=0時(shí)不用傳。
  • Payload data: (x+y) bytes,有效載荷數(shù)據(jù)包括擴(kuò)展數(shù)據(jù)(x bytes)和應(yīng)用數(shù)據(jù)(y bytes)。有效載荷數(shù)據(jù)是用戶(hù)真正要傳輸?shù)臄?shù)據(jù)。

這樣的二進(jìn)制消息幀設(shè)計(jì),與HTTP協(xié)議相比,WebSocket協(xié)議可以提供約500:1的流量減少和3:1的延遲減少。

(4)揮手(關(guān)閉連接)

揮手相對(duì)于握手要簡(jiǎn)單很多,客戶(hù)端和服務(wù)器端任何一方都可以通過(guò)發(fā)送關(guān)閉幀來(lái)發(fā)起揮手請(qǐng)求。發(fā)送關(guān)閉幀的一方,之后不再發(fā)送任何數(shù)據(jù)給對(duì)方;接收到關(guān)閉幀的一方,如果之前沒(méi)有發(fā)送過(guò)關(guān)閉幀,則必須發(fā)送一個(gè)關(guān)閉幀作為響應(yīng)。關(guān)閉幀中可以攜帶關(guān)閉原因。

在發(fā)送和接收一個(gè)關(guān)閉幀消息之后,就認(rèn)為WebSocket連接已關(guān)閉,且必須關(guān)閉底層TCP連接。

除了通過(guò)關(guān)閉握手來(lái)關(guān)閉連接外,WebSocket連接也可能在另一方離開(kāi)或底層TCP連接關(guān)閉時(shí)突然關(guān)閉。

4、WebSocket優(yōu)點(diǎn)

  • 較少的控制開(kāi)銷(xiāo)。在連接建立后,服務(wù)器和客戶(hù)端之間交換數(shù)據(jù)時(shí),用于協(xié)議控制的數(shù)據(jù)包頭部相對(duì)于HTTP請(qǐng)求每次都要攜帶完整的頭部,顯著減少。

  • 更強(qiáng)的實(shí)時(shí)性。由于協(xié)議是全雙工的,所以服務(wù)器可以隨時(shí)主動(dòng)給客戶(hù)端下發(fā)數(shù)據(jù)。相對(duì)于HTTP請(qǐng)求需要等待客戶(hù)端發(fā)起請(qǐng)求服務(wù)端才能響應(yīng),延遲明顯更少。

  • 保持連接狀態(tài)。與HTTP不同的是,Websocket需要先建立連接,這就使得其成為一種有狀態(tài)的協(xié)議,之后通信時(shí)可以省略部分狀態(tài)信息。而HTTP請(qǐng)求可能需要在每個(gè)請(qǐng)求都攜帶狀態(tài)信息(如身份認(rèn)證等)。

  • 更好的二進(jìn)制支持。Websocket定義了二進(jìn)制幀,相對(duì)HTTP,可以更輕松地處理二進(jìn)制內(nèi)容。

  • 支持?jǐn)U展。Websocket定義了擴(kuò)展,用戶(hù)可以擴(kuò)展協(xié)議、實(shí)現(xiàn)部分自定義的子協(xié)議。

  • 更好的壓縮效果。相對(duì)于HTTP壓縮,Websocket在適當(dāng)?shù)臄U(kuò)展支持下,可以沿用之前內(nèi)容的上下文,在傳遞類(lèi)似的數(shù)據(jù)時(shí),可以顯著提高壓縮率。

三、Java API for WebSocket(JSR356)

JSR356在Java EE7時(shí)歸為Java EE標(biāo)準(zhǔn)的一部分(后來(lái)Java EE更名為Jakarta EE,世上再無(wú)Java EE,以下統(tǒng)一稱(chēng)Jakarta EE),所有兼容Jakarta EE的應(yīng)用服務(wù)器,都必須遵循JSR356標(biāo)準(zhǔn)的WebSocket協(xié)議API。

根據(jù)JSR356規(guī)定, 建立WebSocket連接的服務(wù)器端和客戶(hù)端,兩端對(duì)稱(chēng),可以互相通信,差異性較小,抽象成API,就是一個(gè)個(gè)Endpoint(端點(diǎn)),只不過(guò)服務(wù)器端的叫ServerEndpoint,客戶(hù)端的叫ClientEndpoint。客戶(hù)端向服務(wù)端發(fā)送WebSocket握手請(qǐng)求,建立連接后就創(chuàng)建一個(gè)ServerEndpoint對(duì)象。(這里的Endpoint和Tomcat連接器里的AbstractEndpoint名稱(chēng)上有點(diǎn)像,但是兩個(gè)毫不相干的東西,就像周杰倫和周杰的關(guān)系。)

ServerEndpoint和ClientEndpoint在API上差異也很小,有相同的生命周期事件(OnOpen、OnClose、OnError、OnMessage),不同之處是ServerEndpoint作為服務(wù)器端點(diǎn),可以指定一個(gè)URI路徑供客戶(hù)端連接,ClientEndpoint沒(méi)有。

1、服務(wù)端API

服務(wù)器端的Endpoint有兩種實(shí)現(xiàn)方式,一種是注解方式@ServerEndpoint,一種是繼承抽象類(lèi)Endpoint。

(1)注解方式@ServerEndpoint

首先看看@ServerEndpoint有哪些要素:

  • value,可以指定一個(gè)URI路徑標(biāo)識(shí)一個(gè)Endpoint。
  • subprotocols,用戶(hù)在WebSocket協(xié)議下自定義擴(kuò)展一些子協(xié)議。
  • decoders,用戶(hù)可以自定義一些消息解碼器,比如通信的消息是一個(gè)對(duì)象,接收到消息可以自動(dòng)解碼封裝成消息對(duì)象。
  • encoders,有解碼器就有編碼器,定義解碼器和編碼器的好處是可以規(guī)范使用層消息的傳輸。
  • configurator,ServerEndpoint配置類(lèi),主要提供ServerEndpoint對(duì)象的創(chuàng)建方式擴(kuò)展(如果使用Tomcat的WebSocket實(shí)現(xiàn),默認(rèn)是反射創(chuàng)建ServerEndpoint對(duì)象)。

@ServerEndpoint可以注解到任何類(lèi)上,但是想實(shí)現(xiàn)服務(wù)端的完整功能,還需要配合幾個(gè)生命周期的注解使用,這些生命周期注解只能注解在方法上:

  • @OnOpen 建立連接時(shí)觸發(fā)。
  • @OnClose 關(guān)閉連接時(shí)觸發(fā)。
  • @OnError 發(fā)生異常時(shí)觸發(fā)。
  • @OnMessage 接收到消息時(shí)觸發(fā)。

(2)繼承抽象類(lèi)Endpoint

繼承抽象類(lèi)Endpoint,重寫(xiě)幾個(gè)生命周期方法。

怎么沒(méi)有onMessage方法,實(shí)現(xiàn)onMessage還需要繼承實(shí)現(xiàn)一個(gè)接口jakarta.websocket.MessageHandler,MessageHandler接口又分為Partial和Whole,實(shí)現(xiàn)的MessageHandler需要在onOpen觸發(fā)時(shí)注冊(cè)到j(luò)akarta.websocket.Session中。

繼承抽象類(lèi)Endpoint的方式相對(duì)于注解方式要麻煩的多,除了繼承Endpoint和實(shí)現(xiàn)接口MessageHandler外,還必須實(shí)現(xiàn)一個(gè)jakarta.websocket.server.ServerApplicationConfig來(lái)管理Endpoint,比如給Endpoint分配URI路徑。

而encoders、decoders、configurator等配置信息由jakarta.websocket.server.ServerEndpointConfig管理,默認(rèn)實(shí)現(xiàn)jakarta.websocket.server.DefaultServerEndpointConfig。

所以如果使用 Java 版WebSocket服務(wù)器端實(shí)現(xiàn)首推注解方式。

2、客戶(hù)端API

對(duì)于客戶(hù)端API,也是有注解方式和繼承抽象類(lèi)Endpoint方式。

  • 注解方式,只需要將@ServerEndpoint換成@ClientEndpoint。
  • 繼承抽象類(lèi)Endpoint方式,需要一個(gè)jakarta.websocket.ClientEndpointConfig來(lái)管理encoders、decoders、configurator等配置信息,默認(rèn)實(shí)現(xiàn)jakarta.websocket.DefaultClientEndpointConfig。

3、上下文Session

WebSocket是一個(gè)有狀態(tài)的連接,建立連接后的通信都是通過(guò)jakarta.websocket.Session保持狀態(tài),一個(gè)連接一個(gè)Session,每一個(gè)Session有一個(gè)唯一標(biāo)識(shí)Id。

Session的主要職責(zé)涉及:

  • 基礎(chǔ)信息管理(request信息(getRequestURI、getRequestParameterMap、getPathParameters等)、協(xié)議版本getProtocolVersion、子協(xié)議getNegotiatedSubprotocol等)。
  • 連接管理(狀態(tài)判斷isOpen、接收消息的MessageHandler、發(fā)送消息的異步遠(yuǎn)程端點(diǎn)RemoteEndpoint.Async和同步遠(yuǎn)程端點(diǎn)RemoteEndpoint.Basic等)。

4、HandshakeRequest 和 HandshakeResponse

HandshakeRequest 和 HandshakeResponse了解即可,這兩個(gè)接口主要用于WebScoket握手升級(jí)過(guò)程中握手請(qǐng)求響應(yīng)的封裝,如果只是單純使用WebSocket,不會(huì)接觸到這兩個(gè)接口。

(1)HandshakeRequest

(2)HandshakeResponse

Sec-WebSocket-Accept根據(jù)客戶(hù)端傳的Sec-WebSocket-Key生成,如下是Tomcat10.0.6 WebSocket源碼實(shí)現(xiàn)中生成Sec-WebSocket-Accept的算法:

private static String getWebSocketAccept(String key) {byte[] digest = ConcurrentMessageDigest.digestSHA1(key.getBytes(StandardCharsets.ISO_8859_1), WS_ACCEPT);return Base64.encodeBase64String(digest); }

5、WebSocketContainer

jakarta.websocket.WebSocketContainer顧名思義,就是WebSocket的容器,集大成者。其主要職責(zé)包括但不限于connectToServer,客戶(hù)端連接服務(wù)器端,基于瀏覽器的WebSocket客戶(hù)端連接服務(wù)器端,由瀏覽器支持,但是基于Java版的WebSocket客戶(hù)端就可以通過(guò)WebSocketContainer#connectToServer向服務(wù)端發(fā)起連接請(qǐng)求。


四、WebSocket基于Tomcat應(yīng)用

(如下使用的是javax.websocket包,未使用最新的jakarta.websocket,主要是測(cè)試項(xiàng)目基于SpringBoot+Tomcat9.x的,Java API for WebSocket版本需要保持一致。)

1、服務(wù)器端實(shí)現(xiàn)

(1)@ServerEndpoint注解方式

import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong;@ServerEndpoint(value = "/ws/test/{userId}", encoders = {MessageEncoder.class}, decoders = {MessageDecoder.class}, configurator = MyServerConfigurator.class) public class WebSocketServerEndpoint {private Session session;private String userId;@OnOpenpublic void OnOpen(Session session, @PathParam(value = "userId") String userId) {this.session = session;this.userId = userId;// 建立連接后,將連接存到一個(gè)map里endpointMap.put(userId, this);Message message = new Message(0, "connected, hello " + userId);sendMsg(message);}@OnClosepublic void OnClose() {// 關(guān)閉連接時(shí)觸發(fā),從map中刪除連接endpointMap.remove(userId);System.out.println("server closed...");}@OnMessagepublic void onMessage(Message message) {System.out.println("server recive message=" + message.toString());}@OnErrorpublic void onError(Throwable t) throws Throwable {this.session.close(new CloseReason(CloseReason.CloseCodes.CLOSED_ABNORMALLY, "系統(tǒng)異常"));t.printStackTrace();}/*** 群發(fā)* @param data*/public void sendAllMsg(Message data) {for (WebSocketServerEndpoint value : endpointMap.values()) {value.sendMsgAsync(data);}}/*** 推送消息給指定 userId* @param data* @param userId*/public void sendMsg(Message data, String userId) {WebSocketServerEndpoint endpoint = endpointMap.get(userId);if (endpoint == null) {System.out.println("not conected to " + userId);return;}endpoint.sendMsgAsync(data);}private void sendMsg(Message data) {try {this.session.getBasicRemote().sendObject(data);} catch (IOException ioException) {ioException.printStackTrace();} catch (EncodeException e) {e.printStackTrace();}}private void sendMsgAsync(Message data) {this.session.getAsyncRemote().sendObject(data);}// 存儲(chǔ)建立連接的Endpointprivate static ConcurrentHashMap<String, WebSocketServerEndpoint> endpointMap = new ConcurrentHashMap<String, WebSocketServerEndpoint>(); }

每一個(gè)客戶(hù)端與服務(wù)器端建立連接后,都會(huì)生成一個(gè)WebSocketServerEndpoint,可以通過(guò)一個(gè)Map將其與userId對(duì)應(yīng)存起來(lái),為后續(xù)群發(fā)廣播和單獨(dú)推送消息給某個(gè)客戶(hù)端提供便利。

注意:@ServerEndpoint的encoders、decoders、configurator等配置信息在實(shí)際使用中可以不定義,如果項(xiàng)目簡(jiǎn)單,完全可以用默認(rèn)的。

如果通信消息被封裝成一個(gè)對(duì)象,如示例的Message(因?yàn)樵创a過(guò)于簡(jiǎn)單就不展示了,屬性主要有code、msg、data),就必須提供編碼器和解碼器。也可以在每次發(fā)送消息時(shí)硬編碼轉(zhuǎn)為字符串,在接收到消息時(shí)轉(zhuǎn)為Message。有了編碼器和解碼器,顯得比較規(guī)范,轉(zhuǎn)為字符串由編碼器做,字符串轉(zhuǎn)為對(duì)象由解碼器做,但也使得架構(gòu)變復(fù)雜了,視項(xiàng)目需求而定。

Configurator的用處就是自定義Endpoint對(duì)象創(chuàng)建方式,默認(rèn)Tomcat提供的是通過(guò)反射。WebScoket是每個(gè)連接都會(huì)創(chuàng)建一個(gè)Endpoint對(duì)象,如果連接比較多,很頻繁,通過(guò)反射創(chuàng)建,用后即毀,可能不是一個(gè)好主意,所以可以搞一個(gè)對(duì)象池,用過(guò)回收,用時(shí)先從對(duì)象池中拿,有就重置,省去實(shí)例化分配內(nèi)存等消耗過(guò)程。

如果使用SpringBoot內(nèi)置Tomcat、undertow、Netty等,接入WebSocket時(shí)除了加@ServerEndpoint還需要加一個(gè)@Component,再給Spring注冊(cè)一個(gè)ServerEndpointExporter類(lèi),這樣,服務(wù)端Endpoint就交由Spring去掃描注冊(cè)了。

@Configuration public class WebSocketConfig {@Beanpublic ServerEndpointExporter serverEndpointExporter() {ServerEndpointExporter serverEndpointExporter = new ServerEndpointExporter();return serverEndpointExporter;} }

外置Tomcat就不需要這么麻煩,Tomcat會(huì)默認(rèn)掃描classpath下帶有@ServerEndpoint注解的類(lèi)。(SpringBoot接入Websocket后續(xù)會(huì)單獨(dú)出文章講解,也挺有意思的)

(2)繼承抽象類(lèi)Endpoint方式

import javax.websocket.*; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap;public class WebSocketServerEndpoint extends Endpoint {private Session session;private String userId;@Overridepublic void onOpen(Session session, EndpointConfig endpointConfig) {this.session = session;this.userId = session.getPathParameters().get("userId");session.addMessageHandler(new MessageHandler());endpointMap.put(userId, this);Message message = new Message(0, "connected, hello " + userId);sendMsg(message);}@Overridepublic void onClose(Session session, CloseReason closeReason) {endpointMap.remove(userId);}@Overridepublic void onError(Session session, Throwable throwable) {throwable.printStackTrace();}/*** 群發(fā)* @param data*/public void sendAllMsg(Message data) {for (WebSocketServerEndpoint value : endpointMap.values()) {value.sendMsgAsync(data);}}/*** 推送消息給指定 userId* @param data* @param userId*/public void sendMsg(Message data, String userId) {WebSocketServerEndpoint endpoint = endpointMap.get(userId);if (endpoint == null) {System.out.println("not conected to " + userId);return;}endpoint.sendMsgAsync(data);}private void sendMsg(Message data) {try {this.session.getBasicRemote().sendObject(data);} catch (IOException ioException) {ioException.printStackTrace();} catch (EncodeException e) {e.printStackTrace();}}private void sendMsgAsync(Message data) {this.session.getAsyncRemote().sendObject(data);}private class MessageHandler implements javax.websocket.MessageHandler.Whole<Message> {@Overridepublic void onMessage(Message message) {System.out.println("server recive message=" + message.toString());}}private static ConcurrentHashMap<String, WebSocketServerEndpoint> endpointMap = new ConcurrentHashMap<String, WebSocketServerEndpoint>();}

繼承抽象類(lèi)Endpoint方式比加注解@ServerEndpoint方式麻煩的很,主要是需要自己實(shí)現(xiàn)MessageHandler和ServerApplicationConfig。@ServerEndpoint的話(huà)都是使用默認(rèn)的,原理上差不多,只是注解更自動(dòng)化,更簡(jiǎn)潔。

MessageHandler做的事情,一個(gè)@OnMessage就搞定了,ServerApplicationConfig做的URI映射、decoders、encoders,configurator等,一個(gè)@ServerEndpoint就可以了。

import javax.websocket.Decoder; import javax.websocket.Encoder; import javax.websocket.Endpoint; import javax.websocket.server.ServerApplicationConfig; import javax.websocket.server.ServerEndpointConfig; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set;public class MyServerApplicationConfig implements ServerApplicationConfig {@Overridepublic Set<ServerEndpointConfig> getEndpointConfigs(Set<Class<? extends Endpoint>> set) {Set<ServerEndpointConfig> result = new HashSet<ServerEndpointConfig>();List<Class<? extends Decoder>> decoderList = new ArrayList<Class<? extends Decoder>>();decoderList.add(MessageDecoder.class);List<Class<? extends Encoder>> encoderList = new ArrayList<Class<? extends Encoder>>();encoderList.add(MessageEncoder.class);if (set.contains(WebSocketServerEndpoint3.class)) {ServerEndpointConfig serverEndpointConfig = ServerEndpointConfig.Builder.create(WebSocketServerEndpoint3.class, "/ws/test3").decoders(decoderList).encoders(encoderList).configurator(new MyServerConfigurator()).build();result.add(serverEndpointConfig);}return result;}@Overridepublic Set<Class<?>> getAnnotatedEndpointClasses(Set<Class<?>> set) {return set;} }

如果使用SpringBoot內(nèi)置Tomcat,則不需要ServerApplicationConfig了,但是需要給Spring注冊(cè)一個(gè)ServerEndpointConfig。

@Bean public ServerEndpointConfig serverEndpointConfig() {List<Class<? extends Decoder>> decoderList = new ArrayList<Class<? extends Decoder>>();decoderList.add(MessageDecoder.class);List<Class<? extends Encoder>> encoderList = new ArrayList<Class<? extends Encoder>>();encoderList.add(MessageEncoder.class);ServerEndpointConfig serverEndpointConfig = ServerEndpointConfig.Builder.create(WebSocketServerEndpoint3.class, "/ws/test3/{userId}").decoders(decoderList).encoders(encoderList).configurator(new MyServerConfigurator()).build();return serverEndpointConfig; }

(3)早期Tomcat7中Server端實(shí)現(xiàn)對(duì)比

Tomcat7早期版本7.0.47之前還沒(méi)有出JSR 356時(shí),自己搞了一套接口,其實(shí)就是一個(gè)Servlet。

和遵循JSR356標(biāo)準(zhǔn)的版本對(duì)比,有一個(gè)比較大的變化是,createWebSocketInbound創(chuàng)建生命周期事件處理器StreamInbound的時(shí)機(jī)是WebSocket協(xié)議升級(jí)之前,此時(shí)還可以通過(guò)用戶(hù)線程緩存(ThreadLocal等)的HttpServletRequest對(duì)象,獲取一些請(qǐng)求頭等信息。

而遵循JSR356標(biāo)準(zhǔn)的版本實(shí)現(xiàn),創(chuàng)建生命周期事件處理的Endpoint是在WebSocket協(xié)議升級(jí)完成(經(jīng)過(guò)HTTP握手)之后創(chuàng)建的,而WebSocket握手成功給客戶(hù)端響應(yīng)101前,會(huì)結(jié)束銷(xiāo)毀HttpServletRequest對(duì)象,此時(shí)是獲取不到請(qǐng)求頭等信息的。

import org.apache.catalina.websocket.StreamInbound; import org.apache.catalina.websocket.WebSocketServlet;import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServletRequest;@WebServlet(urlPatterns = "/ws/test") public class MyWeSocketServlet extends WebSocketServlet {@Overrideprotected StreamInbound createWebSocketInbound(String subProtocol, HttpServletRequest request) {MyMessageInbound messageInbound = new MyMessageInbound(subProtocol, request);return messageInbound;}} import org.apache.catalina.websocket.MessageInbound; import org.apache.catalina.websocket.WsOutbound;import javax.servlet.http.HttpServletRequest; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.CharBuffer;public class MyMessageInbound extends MessageInbound {private String subProtocol;private HttpServletRequest request;public MyMessageInbound(String subProtocol, HttpServletRequest request) {this.subProtocol = subProtocol;this.request = request;}@Overrideprotected void onOpen(WsOutbound outbound) {String msg = "connected, hello";ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());try {outbound.writeBinaryMessage(byteBuffer);} catch (IOException e) {e.printStackTrace();}}@Overrideprotected void onClose(int status) {}@Overrideprotected void onBinaryMessage(ByteBuffer byteBuffer) throws IOException {// 接收到客戶(hù)端信息}@Overrideprotected void onTextMessage(CharBuffer charBuffer) throws IOException {// 接收到客戶(hù)端信息} }

2、客戶(hù)端實(shí)現(xiàn)

(1)前端js版

js版的客戶(hù)端主要依托瀏覽器對(duì)WebScoket的支持,在生命周期事件觸發(fā)上和服務(wù)器端的差不多,這也應(yīng)證了建立WebSocket連接的兩端是對(duì)等的。

編寫(xiě)WebSocket客戶(hù)端需要注意以下幾點(diǎn):

  • 和服務(wù)器端商議好傳輸?shù)南⒌母袷?#xff0c;一般為json字符串,比較直觀,編碼解碼都很簡(jiǎn)單,也可以是其他商定的格式。
  • 需要心跳檢測(cè),定時(shí)給服務(wù)器端發(fā)送消息,保持連接正常。
  • 正常關(guān)閉連接,即關(guān)閉瀏覽器窗口前主動(dòng)關(guān)閉連接,以免服務(wù)器端拋異常。
  • 如果因?yàn)楫惓嚅_(kāi)連接,支持重連。
// 對(duì)websocket進(jìn)行簡(jiǎn)單封裝 WebSocketOption.prototype = {// 創(chuàng)建websocket操作createWebSocket: function () {try {if('WebSocket' in window) {this.ws = new WebSocket(this.wsUrl);} else if('MozWebSocket' in window) { this.ws = new MozWebSocket(this.wsUrl);} else {alert("您的瀏覽器不支持websocket協(xié)議,建議使用新版谷歌、火狐等瀏覽器,請(qǐng)勿使用IE10以下瀏覽器,360瀏覽器請(qǐng)使用極速模式,不要使用兼容模式!"); }this.lifeEventHandle();} catch(e) {this.reconnect(this.wsUrl);console.log(e);} },// 生命周期事件操作lifeEventHandle: function() {var self = this;this.ws.onopen = function (event) {self.connectCount = 1;//心跳檢測(cè)重置if (self.heartCheck == null) {self.heartCheck = new HeartCheckObj(self.ws);}self.sendMsg(5, "")self.heartCheck.reset().start(); console.log("websocket連接成功!" + new Date().toUTCString());};this.ws.onclose = function (event) {// 全部設(shè)置為初始值self.heartCheck = null;self.reconnect(self.wsUrl); console.log("websocket連接關(guān)閉!" + new Date().toUTCString());};this.ws.onerror = function () {self.reconnect(self.wsUrl);console.log("websocket連接錯(cuò)誤!");};//如果獲取到消息,心跳檢測(cè)重置this.ws.onmessage = function (event) { //心跳檢測(cè)重置if (self.heartCheck == null) {self.heartCheck = new HeartCheckObj(self.ws);}self.heartCheck.reset().start(); console.log("websocket收到消息啦:" + event.data);// 業(yè)務(wù)處理// 接收到的消息可以放到localStorage里,然后在其他地方取出來(lái)}},// 斷線重連操作reconnect: function() {var self = this;if (this.lockReconnect) return;console.log(this.lockReconnect)this.lockReconnect = true;//沒(méi)連接上會(huì)一直重連,設(shè)置延遲避免請(qǐng)求過(guò)多,重連時(shí)間設(shè)置按倍數(shù)增加setTimeout(function () { self.createWebSocket(self.wsUrl);self.lockReconnect = false;self.connectCount++;}, 10000 * (self.connectCount));},// 發(fā)送消息操作sendMsg: function(cmd, data) {var sendData = {"cmd": cmd, "msg": data};try {this.ws.send(JSON.stringify(sendData));} catch(err) {console.log("發(fā)送數(shù)據(jù)失敗, err=" + err)}},// 關(guān)閉websocket接口操作closeWs: function() {this.ws.close();} }/*** 封裝心跳檢測(cè)對(duì)象<p>*/ function HeartCheckObj(ws) {this.ws = ws;// 心跳時(shí)間this.timeout = 10000;// 定時(shí)事件this.timeoutObj = null;// 自動(dòng)斷開(kāi)事件this.serverTimeoutObj = null; } HeartCheckObj.prototype = {setWs: function(ws) {this.ws = ws;},reset: function() {clearTimeout(this.timeoutObj);clearTimeout(this.serverTimeoutObj);return this;},// 開(kāi)始心跳檢測(cè)start: function() {var self = this;this.timeoutObj = setTimeout(function() {//這里發(fā)送一個(gè)心跳,后端收到后,返回一個(gè)心跳消息,//onmessage拿到返回的心跳就說(shuō)明連接正常var ping = {"cmd":1, "msg": "ping"};self.ws.send(JSON.stringify(ping));//如果onmessage那里超過(guò)一定時(shí)間還沒(méi)重置,說(shuō)明后端主動(dòng)斷開(kāi)了self.serverTimeoutObj = setTimeout(function() {//如果onclose會(huì)執(zhí)行reconnect,我們執(zhí)行ws.close()就行了.如果直接執(zhí)行reconnect 會(huì)觸發(fā)onclose導(dǎo)致重連兩次self.ws.close(); }, self.timeout)}, self.timeout)} }/*** -------------------------* 創(chuàng)建websocket的主流程 ** -------------------------*/ var currentDomain = document.domain; var wsUrl = "ws://" + currentDomain + "/test"var webSocketOption = new WebSocketOption(wsUrl) webSocketOption.createWebSocket()// 監(jiān)聽(tīng)窗口關(guān)閉事件,當(dāng)窗口關(guān)閉時(shí),主動(dòng)去關(guān)閉websocket連接,防止連接還沒(méi)斷開(kāi)就關(guān)閉窗口,server端會(huì)拋異常。 window.onbeforeunload = function() {webSocketOption.closeWs(); }

這里推薦一個(gè)在線測(cè)試WebSocket連接和發(fā)送消息的網(wǎng)站easyswoole.com/wstool.html:

真的很牛逼,很方便,很簡(jiǎn)單。還有源碼github:https://github.com/easy-swoole/wstool,感興趣可以看看。

(2)@ClientEndpoint注解方式

Java版客戶(hù)端不用多說(shuō),把@ServerEndpoint換成@ClientEndpoint就可以了,其他都一樣。@ClientEndpoint比@ServerEndpoint就少了一個(gè)value,不需要設(shè)置URI。

@ClientEndpoint(encoders = {MessageEncoder.class}, decoders = {MessageDecoder.class}) public class WebSocketClientEndpoint {private Session session;@OnOpenpublic void OnOpen(Session session) {this.session = session;Message message = new Message(0, "connecting...");sendMsg(message);}@OnClosepublic void OnClose() {Message message = new Message(0, "client closed...");sendMsg(message);System.out.println("client closed");}@OnMessagepublic void onMessage(Message message) {System.out.println("client recive message=" + message.toString());}@OnErrorpublic void onError(Throwable t) throws Throwable {t.printStackTrace();}public void sendMsg(Message data) {try {this.session.getBasicRemote().sendObject(data);} catch (IOException ioException) {ioException.printStackTrace();} catch (EncodeException e) {e.printStackTrace();}}public void sendMsgAsync(Message data) {this.session.getAsyncRemote().sendObject(data);} }

連接服務(wù)器端:

WebSocketContainer container = ContainerProvider.getWebSocketContainer(); container.connectToServer(WebSocketClientEndpoint.class,new URI("ws://localhost:8080/ws/test"));

(3)繼承抽象類(lèi)Endpoint方式

繼承抽象類(lèi)Endpoint方式也和服務(wù)器端的差不多,但是不需要實(shí)現(xiàn)ServerApplicationConfig,需要實(shí)例化一個(gè)ClientEndpointConfig。Endpoint實(shí)現(xiàn)類(lèi)和服務(wù)器端的一樣,就省略了,如下是連接服務(wù)器端的代碼:

ClientEndpointConfig clientEndpointConfig = ClientEndpointConfig.Builder.create().build(); container.connectToServer(new WebSocketClientEndpoint(),clientEndpointConfig,new URI("ws://localhost:8080/websocket/hello"));

3、基于Nginx反向代理注意事項(xiàng)

一般web服務(wù)器會(huì)用Nginx做反向代理,經(jīng)過(guò)Nginx反向轉(zhuǎn)發(fā)的HTTP請(qǐng)求不會(huì)帶上Upgrade和Connection消息頭,所以需要在Nginx配置里顯式指定需要升級(jí)為WebSocket的URI帶上這兩個(gè)頭:

location /chat/ {proxy_pass http://backend;proxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "upgrade";proxy_connect_timeout 4s; proxy_read_timeout 7200s; proxy_send_timeout 12s; }

默認(rèn)情況下,如果代理服務(wù)器在60秒內(nèi)沒(méi)有傳輸任何數(shù)據(jù),連接將被關(guān)閉。這個(gè)超時(shí)可以通過(guò)proxy_read_timeout指令來(lái)增加。或者,可以將代理服務(wù)器配置為定期發(fā)送WebSocket PING幀以重置超時(shí)并檢查連接是否仍然活躍。

具體可參考:http://nginx.org/en/docs/http/websocket.html

五、WebSocket在Tomcat中的源碼實(shí)現(xiàn)

所有兼容Java EE的應(yīng)用服務(wù)器,必須遵循JSR356 WebSocket Java API標(biāo)準(zhǔn),Tomcat也不例外。而且Tomcat也是支持WebSocket最早的Web應(yīng)用服務(wù)器框架(之一),在還沒(méi)有出JSR356標(biāo)準(zhǔn)時(shí),就已經(jīng)自定義了一套WebSocket API,但是JSR356一出,不得不改弦更張。

通過(guò)前面的講解,在使用上完全沒(méi)有問(wèn)題,但是有幾個(gè)問(wèn)題完全是黑盒的:

  • Server Endpoint 是如何被掃描加載的?
  • WebSocket是如何借助HTTP 進(jìn)行握手升級(jí)的?
  • WebSocket建立連接后如何保持連接不斷,互相通信的?

(如下源碼解析,需要對(duì)Tomcat連接器源碼有一定了解)

1、WsSci初始化

Tomcat 提供了一個(gè)org.apache.tomcat.websocket.server.WsSci類(lèi)來(lái)初始化、加載WebSocket。從類(lèi)名上顧名思義,利用了Sci加載機(jī)制,何為Sci加載機(jī)制?就是實(shí)現(xiàn)接口 jakarta.servlet.ServletContainerInitializer,在Tomcat部署裝載Web項(xiàng)目(org.apache.catalina.core.StandardContext#startInternal)時(shí)主動(dòng)觸發(fā)ServletContainerInitializer#onStartup,做一些擴(kuò)展的初始化操作。

WsSci主要做了一件事,就是掃描加載Server Endpoint,并將其加到WebSocket容器里jakarta.websocket.WebSocketContainer。

WsSci主要會(huì)掃描三種類(lèi):

  • 加了@ServerEndpoint的類(lèi)。
  • Endpoint的子類(lèi)。
  • ServerApplicationConfig的子類(lèi)。

(1)WsSci#onStartup

@HandlesTypes({ServerEndpoint.class, ServerApplicationConfig.class,Endpoint.class}) public class WsSci implements ServletContainerInitializer {@Overridepublic void onStartup(Set<Class<?>> clazzes, ServletContext ctx)throws ServletException {WsServerContainer sc = init(ctx, true);if (clazzes == null || clazzes.size() == 0) {return;}// Group the discovered classes by typeSet<ServerApplicationConfig> serverApplicationConfigs = new HashSet<>();Set<Class<? extends Endpoint>> scannedEndpointClazzes = new HashSet<>();Set<Class<?>> scannedPojoEndpoints = new HashSet<>();try {// wsPackage is "jakarta.websocket."String wsPackage = ContainerProvider.class.getName();wsPackage = wsPackage.substring(0, wsPackage.lastIndexOf('.') + 1);for (Class<?> clazz : clazzes) {JreCompat jreCompat = JreCompat.getInstance();int modifiers = clazz.getModifiers();if (!Modifier.isPublic(modifiers) ||Modifier.isAbstract(modifiers) ||Modifier.isInterface(modifiers) ||!jreCompat.isExported(clazz)) {// Non-public, abstract, interface or not in an exported// package (Java 9+) - skip it.continue;}// Protect against scanning the WebSocket API JARs// 防止掃描WebSocket API jarif (clazz.getName().startsWith(wsPackage)) {continue;}if (ServerApplicationConfig.class.isAssignableFrom(clazz)) {// 1、clazz是ServerApplicationConfig子類(lèi)serverApplicationConfigs.add((ServerApplicationConfig) clazz.getConstructor().newInstance());}if (Endpoint.class.isAssignableFrom(clazz)) {// 2、clazz是Endpoint子類(lèi)@SuppressWarnings("unchecked")Class<? extends Endpoint> endpoint =(Class<? extends Endpoint>) clazz;scannedEndpointClazzes.add(endpoint);}if (clazz.isAnnotationPresent(ServerEndpoint.class)) {// 3、clazz是加了注解ServerEndpoint的類(lèi)scannedPojoEndpoints.add(clazz);}}} catch (ReflectiveOperationException e) {throw new ServletException(e);}// Filter the resultsSet<ServerEndpointConfig> filteredEndpointConfigs = new HashSet<>();Set<Class<?>> filteredPojoEndpoints = new HashSet<>();if (serverApplicationConfigs.isEmpty()) {// 從這里看出@ServerEndpoint的服務(wù)器端是可以不用ServerApplicationConfig的filteredPojoEndpoints.addAll(scannedPojoEndpoints);} else {// serverApplicationConfigs不為空,for (ServerApplicationConfig config : serverApplicationConfigs) {Set<ServerEndpointConfig> configFilteredEndpoints =config.getEndpointConfigs(scannedEndpointClazzes);if (configFilteredEndpoints != null) {filteredEndpointConfigs.addAll(configFilteredEndpoints);}// getAnnotatedEndpointClasses 對(duì)于 scannedPojoEndpoints起到一個(gè)過(guò)濾作用// 不滿(mǎn)足條件的后面不加到WsServerContainer里Set<Class<?>> configFilteredPojos =config.getAnnotatedEndpointClasses(scannedPojoEndpoints);if (configFilteredPojos != null) {filteredPojoEndpoints.addAll(configFilteredPojos);}}}try {// 繼承抽象類(lèi)Endpoint的需要使用者手動(dòng)封裝成ServerEndpointConfig// 而加了注解@ServerEndpoint的類(lèi) Tomcat會(huì)自動(dòng)封裝成ServerEndpointConfig// Deploy endpointsfor (ServerEndpointConfig config : filteredEndpointConfigs) {sc.addEndpoint(config);}// Deploy POJOsfor (Class<?> clazz : filteredPojoEndpoints) {sc.addEndpoint(clazz, true);}} catch (DeploymentException e) {throw new ServletException(e);}}static WsServerContainer init(ServletContext servletContext,boolean initBySciMechanism) {WsServerContainer sc = new WsServerContainer(servletContext);servletContext.setAttribute(Constants.SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE, sc);// 注冊(cè)監(jiān)聽(tīng)器WsSessionListener給servletContext,// 在http session銷(xiāo)毀時(shí)觸發(fā) ws session的關(guān)閉銷(xiāo)毀servletContext.addListener(new WsSessionListener(sc));// Can't register the ContextListener again if the ContextListener is// calling this methodif (initBySciMechanism) {// 注冊(cè)監(jiān)聽(tīng)器WsContextListener給servletContext,// 在 servletContext初始化時(shí)觸發(fā)WsSci.init// 在 servletContext銷(xiāo)毀時(shí)觸發(fā)WsServerContainer的銷(xiāo)毀// 不過(guò)呢,只在WsSci.onStartup時(shí)注冊(cè)一次servletContext.addListener(new WsContextListener());}return sc;} }

從上述源碼中可以看出ServerApplicationConfig起到一個(gè)過(guò)濾的作用:

  • 當(dāng)沒(méi)有ServerApplicationConfig時(shí),加了@ServerEndpoint的類(lèi)會(huì)默認(rèn)全部加到一個(gè)Set集合(filteredPojoEndpoints),所以加了@ServerEndpoint的類(lèi)可以不需要自定義實(shí)現(xiàn)ServerApplicationConfig。
  • 當(dāng)有ServerApplicationConfig時(shí),ServerApplicationConfig#getEndpointConfigs用來(lái)過(guò)濾Endpoint子類(lèi),并且Endpoint子類(lèi)必須封裝成一個(gè)ServerEndpointConfig。
  • ServerApplicationConfig#getAnnotatedEndpointClasses用來(lái)過(guò)濾加了注解@ServerEndpoint的類(lèi),一般空實(shí)現(xiàn)就行了(如果不想某個(gè)類(lèi)被加到WsServerContainer里,那不加@ServerEndpoint不就可以了)。

過(guò)濾之后的Endpoint子類(lèi)和加了注解@ServerEndpoint的類(lèi)會(huì)分別調(diào)用不同形參的WsServerContainer#addEndpoint,將其加到WsServerContainer里。

(2)WsServerContainer#addEndpoint

  • 將Endpoint子類(lèi)加到WsServerContainer里,調(diào)用的是形參為ServerEndpointConfig的addEndpoint:
public void addEndpoint(ServerEndpointConfig sec) throws DeploymentException {addEndpoint(sec, false); }

因?yàn)镋ndpoint子類(lèi)需要使用者封裝成ServerEndpointConfig,不需要Tomcat來(lái)封裝。

  • 將加了注解@ServerEndpoint的類(lèi)加到WsServerContainer,調(diào)用的是形參為Class<?>的addEndpoint(fromAnnotatedPojo參數(shù)暫時(shí)在這個(gè)方法里沒(méi)什么用處):

該方法主要職責(zé)就是解析@ServerEndpoint,獲取path、decoders、encoders、configurator等構(gòu)建一個(gè)ServerEndpointConfig對(duì)象

最終調(diào)用的都是如下這個(gè)比較復(fù)雜的方法,fromAnnotatedPojo表示是否是加了@ServerEndpoint的類(lèi)。主要做了兩件事:

  • 對(duì)加了@ServerEndpoint類(lèi)的生命周期方法(@OnOpen、@OnClose、@OnError、@OnMessage)的掃描和映射封裝。

  • 對(duì)path的有效性檢查和path param解析。

(3)PojoMethodMapping方法映射和形參解析

PojoMethodMapping構(gòu)造函數(shù)比較長(zhǎng),主要是對(duì)加了@OnOpen、@OnClose、@OnError、@OnMessage的方法進(jìn)行校驗(yàn)和映射,以及對(duì)每個(gè)方法的形參進(jìn)行解析和校驗(yàn),主要邏輯總結(jié)如下:

  • 對(duì)當(dāng)前類(lèi)以及其父類(lèi)中的方法進(jìn)行掃描。
  • 當(dāng)前類(lèi)中不能存在多個(gè)相同注解的方法,否則會(huì)拋出Duplicate annotation異常。
  • 父類(lèi)和子類(lèi)中存在相同注解的方法,子類(lèi)必須重寫(xiě)該方法,否則會(huì)拋出Duplicate annotation異常。
  • 對(duì)于@OnMessage,可以有多個(gè),但是接收消息的類(lèi)型必須不同,消息類(lèi)型大概分為三種:PongMessage心跳消息、字節(jié)型、字符型。
  • 如果掃描到對(duì)的注解都是父類(lèi)的方法,子類(lèi)重寫(xiě)了該方法,但是沒(méi)有加響應(yīng)的注解,則會(huì)被清除。
  • 形參解析。
public PojoMethodMapping(Class<?> clazzPojo, List<Class<? extends Decoder>> decoderClazzes, String wsPath,InstanceManager instanceManager) throws DeploymentException {this.wsPath = wsPath;List<DecoderEntry> decoders = Util.getDecoders(decoderClazzes, instanceManager);Method open = null;Method close = null;Method error = null;Method[] clazzPojoMethods = null;Class<?> currentClazz = clazzPojo;while (!currentClazz.equals(Object.class)) {Method[] currentClazzMethods = currentClazz.getDeclaredMethods();if (currentClazz == clazzPojo) {clazzPojoMethods = currentClazzMethods;}for (Method method : currentClazzMethods) {if (method.isSynthetic()) {// Skip all synthetic methods.// They may have copies of annotations from methods we are// interested in and they will use the wrong parameter type// (they always use Object) so we can't used them here.continue;}if (method.getAnnotation(OnOpen.class) != null) {checkPublic(method);if (open == null) {open = method;} else {if (currentClazz == clazzPojo ||!isMethodOverride(open, method)) {// Duplicate annotation// 拋出Duplicate annotation異常的兩種情況:// 1. 當(dāng)前的類(lèi)有多個(gè)相同注解的方法,如有兩個(gè)@OnOpen// 2. 當(dāng)前類(lèi)時(shí)父類(lèi),有相同注解的方法,但是其子類(lèi)沒(méi)有重寫(xiě)這個(gè)方法// 即 父類(lèi)和子類(lèi)有多個(gè)相同注解的方法,且沒(méi)有重寫(xiě)關(guān)系throw new DeploymentException(sm.getString("pojoMethodMapping.duplicateAnnotation",OnOpen.class, currentClazz));}}} else if (method.getAnnotation(OnClose.class) != null) {checkPublic(method);if (close == null) {close = method;} else {if (currentClazz == clazzPojo ||!isMethodOverride(close, method)) {// Duplicate annotationthrow new DeploymentException(sm.getString("pojoMethodMapping.duplicateAnnotation",OnClose.class, currentClazz));}}} else if (method.getAnnotation(OnError.class) != null) {checkPublic(method);if (error == null) {error = method;} else {if (currentClazz == clazzPojo ||!isMethodOverride(error, method)) {// Duplicate annotationthrow new DeploymentException(sm.getString("pojoMethodMapping.duplicateAnnotation",OnError.class, currentClazz));}}} else if (method.getAnnotation(OnMessage.class) != null) {checkPublic(method);MessageHandlerInfo messageHandler = new MessageHandlerInfo(method, decoders);boolean found = false;// 第一次掃描OnMessage時(shí),onMessage為空,不會(huì)走下面的for,然后就把messageHandler加到onMessage里// 如果非首次掃描到這里,即向上掃描父類(lèi),允許有多個(gè)接收消息類(lèi)型完全不同的onmessagefor (MessageHandlerInfo otherMessageHandler : onMessage) {// 如果多個(gè)onmessage接收的消息類(lèi)型有相同的,則可能會(huì)拋出Duplicate annotation// 1. 同一個(gè)類(lèi)中多個(gè)onmessage有接收相同類(lèi)型的消息// 2. 父子類(lèi)中多個(gè)onmessage有接收相同類(lèi)型的消息,但不是重寫(xiě)關(guān)系if (messageHandler.targetsSameWebSocketMessageType(otherMessageHandler)) {found = true;if (currentClazz == clazzPojo ||!isMethodOverride(messageHandler.m, otherMessageHandler.m)) {// Duplicate annotationthrow new DeploymentException(sm.getString("pojoMethodMapping.duplicateAnnotation",OnMessage.class, currentClazz));}}}if (!found) {onMessage.add(messageHandler);}} else {// Method not annotated}}currentClazz = currentClazz.getSuperclass();}// If the methods are not on clazzPojo and they are overridden// by a non annotated method in clazzPojo, they should be ignoredif (open != null && open.getDeclaringClass() != clazzPojo) {// open 有可能是父類(lèi)的,子類(lèi)即clazzPojo有重寫(xiě)該方法,但是沒(méi)有加OnOpen注解// 則 open置為nullif (isOverridenWithoutAnnotation(clazzPojoMethods, open, OnOpen.class)) {open = null;}}if (close != null && close.getDeclaringClass() != clazzPojo) {if (isOverridenWithoutAnnotation(clazzPojoMethods, close, OnClose.class)) {close = null;}}if (error != null && error.getDeclaringClass() != clazzPojo) {if (isOverridenWithoutAnnotation(clazzPojoMethods, error, OnError.class)) {error = null;}}List<MessageHandlerInfo> overriddenOnMessage = new ArrayList<>();for (MessageHandlerInfo messageHandler : onMessage) {if (messageHandler.m.getDeclaringClass() != clazzPojo&& isOverridenWithoutAnnotation(clazzPojoMethods, messageHandler.m, OnMessage.class)) {overriddenOnMessage.add(messageHandler);}}// 子類(lèi)重寫(xiě)了的onmessage方法,但沒(méi)有加OnMessage注解的需要從onMessage list 中刪除for (MessageHandlerInfo messageHandler : overriddenOnMessage) {onMessage.remove(messageHandler);}this.onOpen = open;this.onClose = close;this.onError = error;// 參數(shù)解析onOpenParams = getPathParams(onOpen, MethodType.ON_OPEN);onCloseParams = getPathParams(onClose, MethodType.ON_CLOSE);onErrorParams = getPathParams(onError, MethodType.ON_ERROR); }

雖然方法名可以隨意,但是形參卻有著強(qiáng)制限制:

  • @onOpen方法,可以有的參數(shù)Session、EndpointConfig、@PathParam,不能有其他參數(shù)。
  • @onError方法,可以有的參數(shù)Session、@PathParam, 必須有Throwable,不能有其他參數(shù)。
  • @onClose方法,可以有的參數(shù)Session, CloseReason, @PathParam,不能有其他參數(shù)。

2、協(xié)議升級(jí)(握手)

Tomcat中WebSocket是通過(guò)UpgradeToken機(jī)制實(shí)現(xiàn)的,其具體的升級(jí)處理器為WsHttpUpgradeHandler。WebSocket協(xié)議升級(jí)的過(guò)程比較曲折,首先要通過(guò)過(guò)濾器WsFilter進(jìn)行升級(jí)判斷,然后調(diào)用org.apache.catalina.connector.Request#upgrade進(jìn)行UpgradeToken的構(gòu)建,最后通過(guò)org.apache.catalina.connector.Request#coyoteRequest回調(diào)函數(shù)action將UpgradeToken回傳給連接器為后續(xù)升級(jí)處理做準(zhǔn)備。

(1)WsFilter

WebSocket協(xié)議升級(jí)的過(guò)程比較曲折。帶有WebSocket握手的請(qǐng)求會(huì)平安經(jīng)過(guò)Tomcat的Connector,被轉(zhuǎn)發(fā)到Servlet容器中,在業(yè)務(wù)處理之前經(jīng)過(guò)過(guò)濾器WsFilter判斷是否需要升級(jí)(WsFilter 在 org.apache.catalina.core.ApplicationFilterChain過(guò)濾鏈中觸發(fā)):

  • 首先判斷WsServerContainer是否有進(jìn)行Endpoint的掃描和注冊(cè)以及請(qǐng)頭中是否有Upgrade: websocket。
  • 獲取請(qǐng)求path即uri在WsServerContainer中找對(duì)應(yīng)的ServerEndpointConfig。
  • 調(diào)用UpgradeUtil.doUpgrade進(jìn)行升級(jí)。

(2)UpgradeUtil#doUpgrade

UpgradeUtil#doUpgrade主要做了如下幾件事情:

  • 檢查HttpServletRequest的一些請(qǐng)求頭的有效性,如Connection: upgrade、Sec-WebSocket-Version:13、Sec-WebSocket-Key等。
  • 給HttpServletResponse設(shè)置一些響應(yīng)頭,如Upgrade:websocket、Connection: upgrade、根據(jù)Sec-WebSocket-Key的值生成響應(yīng)頭Sec-WebSocket-Accept的值。
  • 封裝WsHandshakeRequest和WsHandshakeResponse。
  • 調(diào)用HttpServletRequest#upgrade進(jìn)行升級(jí),并獲取WsHttpUpgradeHandler(具體的升級(jí)流程處理器)。
// org.apache.tomcat.websocket.server.UpgradeUtil#doUpgrade public static void doUpgrade(WsServerContainer sc, HttpServletRequest req,HttpServletResponse resp, ServerEndpointConfig sec,Map<String,String> pathParams)throws ServletException, IOException {// Validate the rest of the headers and reject the request if that// validation failsString key;String subProtocol = null;// 檢查請(qǐng)求頭中是否有 Connection: upgradeif (!headerContainsToken(req, Constants.CONNECTION_HEADER_NAME,Constants.CONNECTION_HEADER_VALUE)) {resp.sendError(HttpServletResponse.SC_BAD_REQUEST);return;}// 檢查請(qǐng)求頭中的 Sec-WebSocket-Version:13if (!headerContainsToken(req, Constants.WS_VERSION_HEADER_NAME,Constants.WS_VERSION_HEADER_VALUE)) {resp.setStatus(426);resp.setHeader(Constants.WS_VERSION_HEADER_NAME,Constants.WS_VERSION_HEADER_VALUE);return;}// 獲取 Sec-WebSocket-Keykey = req.getHeader(Constants.WS_KEY_HEADER_NAME);if (key == null) {resp.sendError(HttpServletResponse.SC_BAD_REQUEST);return;}// Origin check,校驗(yàn) Origin 是否有權(quán)限String origin = req.getHeader(Constants.ORIGIN_HEADER_NAME);if (!sec.getConfigurator().checkOrigin(origin)) {resp.sendError(HttpServletResponse.SC_FORBIDDEN);return;}// Sub-protocolsList<String> subProtocols = getTokensFromHeader(req,Constants.WS_PROTOCOL_HEADER_NAME);subProtocol = sec.getConfigurator().getNegotiatedSubprotocol(sec.getSubprotocols(), subProtocols);// Extensions// Should normally only be one header but handle the case of multiple// headersList<Extension> extensionsRequested = new ArrayList<>();Enumeration<String> extHeaders = req.getHeaders(Constants.WS_EXTENSIONS_HEADER_NAME);while (extHeaders.hasMoreElements()) {Util.parseExtensionHeader(extensionsRequested, extHeaders.nextElement());}// Negotiation phase 1. By default this simply filters out the// extensions that the server does not support but applications could// use a custom configurator to do more than this.List<Extension> installedExtensions = null;if (sec.getExtensions().size() == 0) {installedExtensions = Constants.INSTALLED_EXTENSIONS;} else {installedExtensions = new ArrayList<>();installedExtensions.addAll(sec.getExtensions());installedExtensions.addAll(Constants.INSTALLED_EXTENSIONS);}List<Extension> negotiatedExtensionsPhase1 = sec.getConfigurator().getNegotiatedExtensions(installedExtensions, extensionsRequested);// Negotiation phase 2. Create the Transformations that will be applied// to this connection. Note than an extension may be dropped at this// point if the client has requested a configuration that the server is// unable to support.List<Transformation> transformations = createTransformations(negotiatedExtensionsPhase1);List<Extension> negotiatedExtensionsPhase2;if (transformations.isEmpty()) {negotiatedExtensionsPhase2 = Collections.emptyList();} else {negotiatedExtensionsPhase2 = new ArrayList<>(transformations.size());for (Transformation t : transformations) {negotiatedExtensionsPhase2.add(t.getExtensionResponse());}}// Build the transformation pipelineTransformation transformation = null;StringBuilder responseHeaderExtensions = new StringBuilder();boolean first = true;for (Transformation t : transformations) {if (first) {first = false;} else {responseHeaderExtensions.append(',');}append(responseHeaderExtensions, t.getExtensionResponse());if (transformation == null) {transformation = t;} else {transformation.setNext(t);}}// Now we have the full pipeline, validate the use of the RSV bits.if (transformation != null && !transformation.validateRsvBits(0)) {throw new ServletException(sm.getString("upgradeUtil.incompatibleRsv"));}// 設(shè)置resp的響應(yīng)頭Upgrade:websocket、 Connection: upgrade 、Sec-WebSocket-Accept:// If we got this far, all is good. Accept the connection.resp.setHeader(Constants.UPGRADE_HEADER_NAME,Constants.UPGRADE_HEADER_VALUE);resp.setHeader(Constants.CONNECTION_HEADER_NAME,Constants.CONNECTION_HEADER_VALUE);// 通過(guò)Sec-WebSocket-Key生成Sec-WebSocket-Accept的值resp.setHeader(HandshakeResponse.SEC_WEBSOCKET_ACCEPT,getWebSocketAccept(key));if (subProtocol != null && subProtocol.length() > 0) {// RFC6455 4.2.2 explicitly states "" is not valid hereresp.setHeader(Constants.WS_PROTOCOL_HEADER_NAME, subProtocol);}if (!transformations.isEmpty()) {resp.setHeader(Constants.WS_EXTENSIONS_HEADER_NAME, responseHeaderExtensions.toString());}WsHandshakeRequest wsRequest = new WsHandshakeRequest(req, pathParams);WsHandshakeResponse wsResponse = new WsHandshakeResponse();WsPerSessionServerEndpointConfig perSessionServerEndpointConfig =new WsPerSessionServerEndpointConfig(sec);sec.getConfigurator().modifyHandshake(perSessionServerEndpointConfig,wsRequest, wsResponse);wsRequest.finished();// Add any additional headersfor (Entry<String,List<String>> entry :wsResponse.getHeaders().entrySet()) {for (String headerValue: entry.getValue()) {resp.addHeader(entry.getKey(), headerValue);}}// 調(diào)用 request.upgrade 進(jìn)行升級(jí)WsHttpUpgradeHandler wsHandler =req.upgrade(WsHttpUpgradeHandler.class);wsHandler.preInit(perSessionServerEndpointConfig, sc, wsRequest,negotiatedExtensionsPhase2, subProtocol, transformation, pathParams,req.isSecure());}

(3)Request#upgrade

Request#upgrade主要做了三件事:

  • 實(shí)例化WsHttpUpgradeHandler并構(gòu)建UpgradeToken。
  • 回調(diào)coyoteRequest.action,將UpgradeToken回傳給連接器。
  • 設(shè)置響應(yīng)碼101。
// org.apache.catalina.connector.Request#upgrade public <T extends HttpUpgradeHandler> T upgrade(Class<T> httpUpgradeHandlerClass) throws java.io.IOException, ServletException {T handler;InstanceManager instanceManager = null;try {// Do not go through the instance manager for internal Tomcat classes since they don't// need injectionif (InternalHttpUpgradeHandler.class.isAssignableFrom(httpUpgradeHandlerClass)) {handler = httpUpgradeHandlerClass.getConstructor().newInstance();} else {instanceManager = getContext().getInstanceManager();handler = (T) instanceManager.newInstance(httpUpgradeHandlerClass);}} catch (ReflectiveOperationException | NamingException | IllegalArgumentException |SecurityException e) {throw new ServletException(e);}// 構(gòu)建 UpgradeToken,UpgradeToken主要包含WsHttpUpgradeHandler、context、協(xié)議名稱(chēng)protocolUpgradeToken upgradeToken = new UpgradeToken(handler, getContext(), instanceManager,getUpgradeProtocolName(httpUpgradeHandlerClass));// 回調(diào)action 進(jìn)行升級(jí)coyoteRequest.action(ActionCode.UPGRADE, upgradeToken);// Output required by RFC2616. Protocol specific headers should have// already been set.// 設(shè)置響應(yīng)101response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);return handler; }

(4)回調(diào)機(jī)制ActionHook#action

一些發(fā)生在Servlet容器的動(dòng)作可能需要回傳給連接器做處理,比如WebSocket的握手升級(jí),所以連接器就給org.apache.coyote.Request設(shè)置了一個(gè)動(dòng)作鉤子``ActionHook#action。一些動(dòng)作表示定義在枚舉類(lèi)ActionCode中,ActionCode.UPGRADE就代表協(xié)議升級(jí)動(dòng)作。org.apache.coyote.AbstractProcessor實(shí)現(xiàn)了ActionHook接口,ActionCode.UPGRADE動(dòng)作會(huì)調(diào)用org.apache.coyote.http11.Http11Processor#doHttpUpgrade,只是簡(jiǎn)單將upgradeToken設(shè)置給Http11Processor`。

(5)ConnectionHandler#process

Tomcat連接器是同步調(diào)用容器業(yè)務(wù)處理,容器中的業(yè)務(wù)處理結(jié)束后還是回到連接器繼續(xù)往下執(zhí)行。

連接器將請(qǐng)求轉(zhuǎn)發(fā)給容器處理是在適配器里完成的,容器中流程處理結(jié)束返回到org.apache.catalina.connector.CoyoteAdapter#service,繼續(xù)往下執(zhí)行,最終結(jié)束并回收HttpServletrequest、HttpServletreponse對(duì)象

org.apache.catalina.connector.CoyoteAdapter#service是在org.apache.coyote.http11.Http11Processor#service中調(diào)用的,

Http11Processor#service是HTTP請(qǐng)求處理主流程,通過(guò)upgradeToken != null來(lái)判斷是否為升級(jí)操作,s是則返回SocketState.UPGRADING。

最后來(lái)到org.apache.coyote.AbstractProtocol.ConnectionHandler#process一個(gè)連接處理的主流程,根據(jù)Http11Processor#service返回SocketState.UPGRADING來(lái)進(jìn)行升級(jí)操作,如下只截取了和WebSocket協(xié)議升級(jí)相關(guān)流程的代碼:

  • 獲取UpgradeToken,從中取出HttpUpgradeHandler,對(duì)于WebSocket來(lái)說(shuō)是WsHttpUpgradeHandler。
  • 調(diào)用WsHttpUpgradeHandler#init啟動(dòng)協(xié)議升級(jí)處理。

(6)WsHttpUpgradeHandler#init握手成功

走到這里,基本上就是握手成功了,接下來(lái)就是創(chuàng)建WsSession和觸發(fā)onOpen。

WsSession的構(gòu)建中會(huì)實(shí)例化Endpoint,如果實(shí)例化出來(lái)的對(duì)象不是Endpoint類(lèi)型,即加了@ServerEndpoint的實(shí)例對(duì)象,則用一個(gè)PojoEndpointServer進(jìn)行包裝,而PojoEndpointServer是繼承了抽象類(lèi)Endpoint的。

觸發(fā)onOpen時(shí)會(huì)將WsSession傳進(jìn)去,對(duì)于加PojoEndpointServer,因?yàn)橛脩?hù)自定義的方法名和形參不確定,所以通過(guò)反射調(diào)用用戶(hù)自定義的onopen形式的方法,并且會(huì)將通過(guò)@onMessage解析出的MessageHandler設(shè)置給WsSession。

3、數(shù)據(jù)傳輸和解析

握手成功之后就建立了雙向通信的連接,該連接有別于HTTP/1.1長(zhǎng)連接(應(yīng)用服務(wù)器中工作線程循環(huán)占用),而是占用一條TCP連接。在連接建立是進(jìn)行TCP三次握手,之后全雙工互相通信,將不需要再進(jìn)行耗時(shí)的TCP的三次握手和四次揮手,一方需要關(guān)閉WebSocket連接時(shí),發(fā)送關(guān)閉幀,另一方接收到關(guān)閉幀之后,也發(fā)送個(gè)關(guān)閉幀作為響應(yīng),之后就認(rèn)為WebSocket連接關(guān)閉了,并且關(guān)閉底層TCP連接(四次揮手)。

實(shí)則WebSocket全雙工是建立在TCP的長(zhǎng)鏈接上的,TCP長(zhǎng)鏈接長(zhǎng)時(shí)間沒(méi)有消息通信,會(huì)定時(shí)保活,一般WebSocket會(huì)通過(guò)代理如nginx等進(jìn)行連接通信,nginx有一個(gè)連接超時(shí)沒(méi)有任何信息傳輸時(shí),會(huì)斷開(kāi),所以需要WebSocket一端定時(shí)發(fā)送心跳保活。

(1)接收客戶(hù)端消息

客戶(hù)端來(lái)了消息,由連接器的Poller輪詢(xún)監(jiān)測(cè)socket底層是否有數(shù)據(jù)到來(lái),有數(shù)據(jù)可讀,則封裝成一個(gè)SocketProcessor扔到線程池里處理,org.apache.coyote.http11.upgrade.UpgradeProcessorInternal#dispatch具有處理升級(jí)協(xié)議連接,org.apache.tomcat.websocket.server.WsHttpUpgradeHandler#upgradeDispatch是專(zhuān)門(mén)處理WebSocket連接的處理器。

org.apache.tomcat.websocket.server.WsFrameServer是對(duì)服務(wù)器端消息幀處理的封裝,包括讀取底層數(shù)據(jù),按消息幀格式解析、拼裝出有效載荷數(shù)據(jù),觸發(fā)onMessage。

因?yàn)樵创a篇幅較多,只展示具體源碼調(diào)用流程:

(2)發(fā)送消息給客戶(hù)端

一般,客戶(hù)端發(fā)送WebSocket握手請(qǐng)求,和服務(wù)器端建立連接后,服務(wù)器端需要將連接(Endpoint+WsSession)保存起來(lái),為后續(xù)主動(dòng)推送消息給客戶(hù)端提供方便。

Tomcat提供了可以發(fā)送三種數(shù)據(jù)類(lèi)型(文本、二進(jìn)制、Object對(duì)象)和兩種發(fā)送方式(同步、異步)的發(fā)送消息的方法。

  • org.apache.tomcat.websocket.WsRemoteEndpointAsync異步發(fā)送。
  • org.apache.tomcat.websocket.WsRemoteEndpointBasic 同步發(fā)送。

發(fā)送消息也同樣需要按消息幀格式封裝,然后通過(guò)socket寫(xiě)到網(wǎng)絡(luò)里即可。

六、要點(diǎn)回顧

WebSocket的出現(xiàn)不是空穴來(lái)風(fēng),起初在HTTP/1.1基礎(chǔ)上通過(guò)輪詢(xún)和長(zhǎng)連接達(dá)到信息實(shí)時(shí)同步的功能,但是這并沒(méi)有跳出HTTP/1.1自身的缺陷。HTTP/1.1明顯的兩個(gè)缺陷:消息頭冗長(zhǎng)且為文本傳輸,請(qǐng)求響應(yīng)模式。為此,WebSocket誕生了,跳出HTTP/1.1,建立一個(gè)新的真正全雙工通信協(xié)議。

不僅僅要會(huì)在項(xiàng)目中使用WebSocket,還要知道其通信原理和在應(yīng)用服務(wù)器中的實(shí)現(xiàn)原理,很多注意事項(xiàng)都是在查閱了官方資源和源碼之后恍然大悟的。

  • 在Tomcat中使用WebSocket不可以在Endpoint里獲取緩存的HttpServletRequest對(duì)象,因?yàn)樵赪ebSocket握手之前,HTTP/1.1請(qǐng)求就算結(jié)束了(HttpServletRequest對(duì)象被回收),建立連接之后就更是獨(dú)立于HTTP/1.1了。
  • 建立連接的WebSocket,會(huì)生成新的Endpoint和WsSession。
  • 使用內(nèi)置Tomcat需要注意,WsSci做的事情交給了Spring做。
  • WebSocket全雙工是建立在TCP長(zhǎng)連接的基礎(chǔ)之上。
  • … …

七、參考文獻(xiàn)

  • https://datatracker.ietf.org/doc/html/rfc6455(可能需要翻墻)
  • https://www.oracle.com/technical-resources/articles/java/jsr356.html
  • https://medium.com/swlh/websockets-with-spring-part-1-http-and-websocket-36c69df1c2ee(可能需要翻墻)
  • http://nginx.org/en/docs/http/websocket.html
  • https://zh.wikipedia.org/wiki/WebSocket
  • 書(shū)籍:《Tomcat架構(gòu)解析》劉光瑞(Tomcat8.5)11.3.4 Tomcat的WebSocket實(shí)現(xiàn)
  • 書(shū)籍:《Tomcat內(nèi)核設(shè)計(jì)剖析》汪建(Tomcat7)10.6 WebSocket協(xié)議的支持
  • 書(shū)籍:《圖解HTTP》9.3 使用瀏覽器進(jìn)行全雙工通信的WebSocket
  • 極客時(shí)間:《深入拆解Tomcat & Jetty》李號(hào)雙(Tomcat9.x)18.新特性:Tomcat如何支持WebSocket?
  • Tomcat注釋源碼:https://gitee.com/stefanpy/tomcat-source-code-learning
  • 如若文章有錯(cuò)誤理解,歡迎批評(píng)指正,同時(shí)非常期待你的留言和點(diǎn)贊。如果覺(jué)得有用,不妨點(diǎn)個(gè)在看,讓更多人受益。

    總結(jié)

    以上是生活随笔為你收集整理的WebSocket通信原理和在Tomcat中实现源码详解(万字爆肝)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。