kafka netty_惠而浦:使用Netty和Kafka的微服务
kafka netty
介紹
在上一個博客中 ,我介紹了Netty用作Web服務(wù)器。 該示例運行良好……只要需要廣播服務(wù)器即可。
大多數(shù)情況下不是很有用。 更有可能的是,每個客戶端僅接收針對其的數(shù)據(jù),并保留了特殊情況下的廣播,例如“服務(wù)器在15分鐘內(nèi)停機!” 關(guān)于該特定服務(wù)器示例的另一件事是,一切都是獨立的。 例如,單片應(yīng)用程序很好,但是在當今環(huán)境中,分布式微服務(wù)要好得多。 可伸縮性和可靠性至關(guān)重要。
Netty和Kafka在一起很棒。 Netty擅長處理大量客戶,Kafka擅長使大量服務(wù)協(xié)同工作。 結(jié)合起來,它們是開發(fā)中的最佳選擇。 但是,有些“陷阱”可能會使其變得繁瑣。 該博客以及示例微服務(wù)/ Netty體系結(jié)構(gòu)和功能全面的代碼將有望幫助減輕煩惱并實現(xiàn)甜味。
第一要務(wù)
示例代碼位于此處 。
有詳細的自述文件,描述了設(shè)置環(huán)境所需的內(nèi)容。 我試圖將需求降到最低,僅Java 8和Maven 。 SLF4J和Logback用于記錄日志。 我為Mac OSX和Ubuntu設(shè)置了腳本(我在Parallels容器中運行的14.04版本是我測試過的腳本),因此如果您在Windows上進行開發(fā),則表示歉意。 該代碼全是Java,并且我在Windows上看到過Kafka教程,因此所有內(nèi)容都應(yīng)在此處運行。 Maven構(gòu)建也應(yīng)該產(chǎn)生可以啟動的目標,因此,在安裝Zookeeper / Kafka的時候加了一點肘油(您可以按照腳本查看需要的設(shè)置),手動運行它并不重要。視窗。
注:如README.md中所述,該腳本將刪除任何現(xiàn)有的Zookeeper / Kafka安裝和數(shù)據(jù)。 如果您已有設(shè)置,請不要使用腳本!
安裝和配置必備mvn package如果不使用腳本,請運行mvn package如果是,則運行maclocal_run.sh (或linuxlocal_run.sh )。 該腳本將下載Zk / Kafka(如果尚未下載),進行安裝,配置,啟動它們,運行mvn package ,啟動服務(wù)并最終啟動服務(wù)器。 一旦啟動,就抵制離開外殼的沖動,因為它會自動為架構(gòu)的每個部分彈出新的選項卡。 啟動Whirlpool服務(wù)器之后,就可以開始了。
我強烈建議創(chuàng)建一個腳本,以在本地安裝,配置,構(gòu)建和啟動微服務(wù)環(huán)境。 創(chuàng)建每個單獨的服務(wù)是一個很大的痛苦。 必要時也可以使用Docker,但我發(fā)現(xiàn)只需本地運行所有內(nèi)容,下載所需的內(nèi)容就少得多。
作為一個預(yù)告片,這里是UI(您也可以從GitHub上的README.md看到它)。
- 要添加股票代碼,請輸入它(即“ GOOG”),然后單擊“股票”下的A按鈕。 要刪除它,請單擊X。
- 要添加一個網(wǎng)站來測試它是打開還是關(guān)閉,請鍵入完全限定的URL(即http://facebook.com ),然后單擊“ UpDown”下的A按鈕。 要刪除它,請單擊X。
- 要添加天氣檢查,請在中鍵入城市,州(即“芝加哥,il”),然后單擊“城市,州”下的A按鈕。 要刪除它,請單擊X。
- 由于訂閱與每個服務(wù)一起存儲在內(nèi)存中,因此訂閱在頁面刷新甚至登錄/注銷(具有相同的用戶ID)后都不會丟失。 當然,“真實”系統(tǒng)將使用數(shù)據(jù)庫。
- 訂閱每10秒鐘更新一次,因此我不會壓倒Yahoo API,因此添加數(shù)據(jù)后請耐心等待。
建筑
在此示例中,我試圖考慮可能有用的良好通用服務(wù)。 我最終選擇了股票報價服務(wù),“此網(wǎng)站是否正常運轉(zhuǎn)”服務(wù)以及氣象服務(wù)。 這些中的每一個都獨立于各自具有Kafka主題的其他主題運行。
我選擇配置Kafka的方式是每個服務(wù)使用一個命令主題,每個服務(wù)使用一個數(shù)據(jù)主題。 一切都可以只使用一個全局主題,讀者可以決定要處理的內(nèi)容,但將其分離出來可以使其更加清晰和整潔。
這是數(shù)據(jù)如何通過Kafka流動的示意圖。 它是通過一個免費的基于Keyhole的基于Web的實用程序Mockola完成的 。 請注意,服務(wù)器知道所有主題,但是服務(wù)僅知道它們自己的主題。 cmd主題用于將命令發(fā)送到服務(wù),而數(shù)據(jù)主題(在其上沒有-cmd主題)用于從服務(wù)發(fā)送數(shù)據(jù)。 同樣,所有這些都可以在一個bus主題上進行處理,但是通過將它們分離出來,可以更輕松地了解發(fā)生了什么。
服務(wù)
現(xiàn)在讓我們談?wù)劮?wù)。 這三者非常相似,因此有一項基本服務(wù)可以完成大部分工作。 每個服務(wù)都有三個線程,由Java ExecutorService處理。 關(guān)于Executor服務(wù)的一件好事是,如果出現(xiàn)問題,它將自動重新啟動線程。 這有助于彈性。
每個服務(wù)通過告訴基類使用什么主題和命令主題來啟動自己。 然后,基類啟動三個線程:一個用于從cmd主題讀取命令,一個用于定期為客戶端收集數(shù)據(jù),一個用于在數(shù)據(jù)主題上發(fā)送數(shù)據(jù)。 這些線程使用非阻塞Java并發(fā)類ConcurrentLinkedQueue和ConcurrentHashMap 。 哈希映射存儲每個用戶的訂閱集,隊列存儲準備發(fā)送給數(shù)據(jù)主題的響應(yīng)。
每個服務(wù)的流程是同時工作的三個線程。 閱讀器使用Kafka使用者從其命令主題讀取命令。 根據(jù)命令,添加或刪除訂閱。 該線程非常笨拙,因為它不要求服務(wù)對請求進行任何驗證,而只是盲目地將發(fā)送給訂閱的內(nèi)容添加進去。 生產(chǎn)代碼顯然會添加一個調(diào)用,要求服務(wù)在允許成功訂閱之前驗證命令。 創(chuàng)建一個響應(yīng)以放置主題,然后等待下一個命令。
注意 :關(guān)于數(shù)據(jù)的一些話題。 我使用JSON作為傳輸格式,但是XML或您想要的其他任何內(nèi)容也可以使用。 重要的是,每個人都同意數(shù)據(jù)格式并堅持使用。 通用模塊具有POJO類,這些類定義了數(shù)據(jù)將遵循的協(xié)定。 通常對所有消息有用的是時間戳,消息類型和客戶端的ID。
另一個有用的東西是到期時間戳。 這些示例消息永遠存在。 Message類僅查看Message的類型和ID。 服務(wù)器使用它來確定需要處理哪種類型的消息以及誰對該消息感興趣。 沒有這些,就很難甚至不可能處理數(shù)據(jù)。 現(xiàn)在,消息格式可以涉及很多,其中一些格式使用標題和部分來描述復(fù)雜的數(shù)據(jù)。 本示例嘗試使所有內(nèi)容盡可能簡單。
凈值服務(wù)器
讓我們一次上一堂課。
NettyHttpFileHandler
與以前的博客相比,該類幾乎沒有變化。 可重用的片段已移至WebSocketHelper類。 該文件的主要用途是提供瀏覽器要求的文件。
WebSocket助手
可能令人困惑的第一項是類變量clientAttr 。 在Netty Channel中存儲數(shù)據(jù)要求將其附加到AttributeKey 。 這類似于Java并發(fā)類中的Atomic實例-它提供??了數(shù)據(jù)容器。 我們將存儲客戶端ID(在本例中為用戶名,但也可以很容易地作為會話ID),以便我們確定哪個Channel需要接收消息。
realWriteAndFlush()方法設(shè)置適當?shù)臉祟},內(nèi)容長度和cookie。 然后,它寫入并刷新HTTP響應(yīng)。 線
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);告訴Netty這是需要寫入客戶端的數(shù)據(jù)的結(jié)尾,因此Netty會將其發(fā)送出去。
特別說明 :關(guān)于cookie的創(chuàng)建,請確保未設(shè)置HTTP Only標志。 如果是,則JavaScript無法看到Cookie,也不會與WebSocket升級請求一起發(fā)送。 這樣一來,您就必須創(chuàng)建自己的頁面刷新管理和會話管理方法。
關(guān)于cookie的另一件事是使用Netty cookie編碼器的STRICT版本,因此它將不允許多個具有相同名稱的cookie。 我不確定何時允許這種情況發(fā)生。
WebSocketMessageHandler
這個類只是定義了一個接口WhirlpoolServerHandler使用交談的WhirlpoolMessageHandler 。
WhirlpoolMessageHandler
這是Netty和Kafka之間存在連接的地方。 兩個執(zhí)行器處理一個讀取器線程和一個寫入器線程。
編寫器線程在請求??隊列中查找消息(有關(guān)這些消息在一分鐘內(nèi)來自何處的更多信息),并將消息放置在適當?shù)腒afka命令主題上。
閱讀器線程在Kafka數(shù)據(jù)主題上查找傳入消息,為每個主題查找正確的Channel,然后將消息寫入這些主題。
當客戶端通過WebSockets發(fā)送消息時, WhirlpoolServerHandler將確保已handleMessage()完整的消息,然后調(diào)用handleMessage() 。 該方法確定是否為有效消息,然后將請求添加到請求隊列中,以便讀取器線程可以將其提取并提供給Kafka。
WhirlpoolServerHandler
這堂課有幾件有趣的事。 首先,它可以區(qū)分HTTP,REST和WebSocket消息之間的區(qū)別。 執(zhí)行此操作的Netty重寫方法是channelRead0 。 這是Netty用來告訴我們消息何時到達以及消息是哪種類型的方法。 對于HTTP和REST調(diào)用, handleHttpRequest調(diào)用handleHttpRequest ,對于handleWebSocketFrame將調(diào)用handleWebSocketFrame 。
如果存在cookie方法,則handleHttpRequest方法handleHttpRequest讀取該cookie。 在POST上,它會查找登錄和注銷信息。 對于登錄,它將找出用戶名/密碼,創(chuàng)建cookie,并防止多次使用相同的名稱登錄。 所有這些代碼將在應(yīng)用程序的生產(chǎn)版本中添加額外的安全性進行拆分。 要注銷,它會查找Channel,清理,關(guān)閉它并使cookie過期。
對于WebSocketUpgrade ,它要求Netty處理啟動websocket所需的復(fù)雜握手。 完成此操作后,會將用戶添加到握手期間創(chuàng)建的Channel。 這是用戶連接到Channel的地方,如果cookie沒有在請求中出現(xiàn),那將不是一件容易的事。
在此唯一需要注意的另一件事是,此類設(shè)置為處理為SPA(單頁應(yīng)用程序)編碼的客戶端,因為它將將所有無法識別的調(diào)用重定向到index.html 。
該類中的其他方法更多地是為了提供信息,將在高級情況下使用。
漩渦服務(wù)器
此類啟動Netty服務(wù)器并創(chuàng)建通道管道。 這是Netty的一個標準類,緊隨Netty示例。
最后的想法
顯然,此代碼中還有更多內(nèi)容。 每個服務(wù)和服務(wù)器的多個實例可以同時運行,并且Zk / Kafka可以集群以幫助提高彈性。 一個測試微服務(wù)應(yīng)用程序彈性的強大實用程序是另一個名為TroubleMaker的免費開源Keyhole實用程序。 我還沒有機會測試這個例子,但是我很期待這個機會。
我們沒有涉及安全性,盡管我以前希望展示Netty與Shiro的集成,但這是一個非常復(fù)雜的話題。 我只能說這是有可能的,但是我還沒有將所有內(nèi)容都包裹在腦海中,以至于無法形成一個連貫的博客。
希望您喜歡該博客,并找到有用的代碼。 通過博客或Twitter與我聯(lián)系( @johnwboardman ,在這里我總是很欣賞新的關(guān)注)。
翻譯自: https://www.javacodegeeks.com/2016/05/whirlpool-microservices-using-netty-kafka.html
kafka netty
總結(jié)
以上是生活随笔為你收集整理的kafka netty_惠而浦:使用Netty和Kafka的微服务的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: PHP环境变量配置(php环境 linu
- 下一篇: 现在的ddos一般流量多大(现在的DDO