Kafka端到端审计
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-end-to-end-audit/
概述
Kafka端到端審計是指生產者生產的消息存入至broker,以及消費者從broker中消費消息這個過程之間消息個數及延遲的審計,以此可以檢測是否有數據丟失,是否有數據重復以及端到端的延遲等。
目前主要調研了3個產品:
對于Kafka端到端的審計主要通過:
內嵌timestamp的方式
主要是通過設置一個審計的時間間隔(這里稱之為time_bucket_interval,可以設置幾秒或者幾分鐘,這個可以自定義), 每個timestamp都會被分配到相應的桶中,算法有:
這樣可以獲得相應time_bucket的起始時間time_bucket_start,一個time_bucket的區間可以記錄為[time_bucket_start, time_bucket_start+time_bucket_interval]。
每發送或者消費一條消息可以根據消息payload內嵌的時間戳,分配到相應桶中,然后對桶進行計數,之后進行存儲,簡單的可以存儲到,比如:Map<long time_bucket_start, long count>之中。
內嵌index的方式
這種方式就更容易理解了,對于每條消息都分配一個全局唯一的index,如果topic及相應的partition固定的話,可以為每一個topic-partition設置一個全局的index,當有消息發送到某個topic-partition中,那么首先獲取其topic-partition對應的index, 然后內嵌到payload中,之后再發送到broker。消費者進行消費審計,可以判斷出哪個消息丟失,哪個消息重復等等。如果要計算端到端延遲的話,還需要在payload中內嵌timestamp以作相應的計算。
下面來簡要分析下三個產品。
Chaperone
github地址:https://github.com/uber/chaperone
官方介紹(中文):http://www.infoq.com/cn/news/2016/12/Uber-Chaperone-Kafka
官方介紹(英文):https://eng.uber.com/chaperone/
Chaperone進行消息端到端的校驗主要是基于message內置timestamp實現的,根據timestamp將message分配到不同的bucket中。之后就是對這個bucket中的消息進行計數等一系列的audit操作,然后將這個audit操作之后的信息auditMessage保存起來,auditMessage的內容:
- topicName:被audit的topic
- time_bucket_start:bucket的起始時間
- time_bucket_end
- metrics_count:time_bucket中的個數
- metrics_mean_latency, metrics_p95_latency, metrics_p99_latency,metrics_max_latency:延遲
- tier
- hostname
- datacenter
- uuid
注意這里的latency的計算規則是:currentTimeMillis - (timestamp*1000)。
Chaperone的架構
Chaperone的整體架構分為:AuditLibrary, ChaperoneService, ChaperoneCollector和WebService, 它們會收集數據,并進行相關計算,自動檢測出丟失和延遲的數據,并展示審計結果。
從Chaperone的github上的源碼來看:
Chaperone分為ChaperoneClient, ChaperoneCollector, ChaperoneDistribution, ChaperoneServiceController, ChaperoneServiceWorker這5個子項目。對比著上面的架構圖來分析。
- ChaperoneClient對應著AuditLibrary,主要是用來audit message的庫(library),并不以實際服務運行,可以在Producer或者Consumer客戶端中調用,默認使用10mins的滾動時間bucket來不斷地從每個主題收集消息。然后發送到kafka的chaperone-audit這個topic中。官方文檔介紹說AuditLibrary會被ChaperoneService, ChaperoneCollector和WebService這三個組件所依賴,但代碼中來看并非完全如此,略有出入。
- ChaperoneDistribution可以忽略
- ChaperoneServiceController和ChaperoneServiceWorker對應架構圖中的ChaperoneService,ChaperoneServiceController主要用來檢測topics并分配topic-partitions給ChaperoneServiceWorker用以審計(audit)。ChaperoneServiceWorker主要是audit message的一個服務。
- ChaperoneServiceWorker采用scala語言編寫,內部又將ChaperoneClient或者說AuditLibrary又重新用Scala實現了一番,并豐富了一下應用,比如采用hsqldb存儲數據,zk存取offsets來實現WAL(預寫式日志,具體可見下段介紹)
- Chaperone認為message中內嵌timestamp是十分必須的,但是從ChaperoneServiceWorker的代碼來看消息沒有timestamp也能運行,當消息沒有時間戳,那么會記錄noTimeMsgCount,Chaperone介紹會有一個牛逼的算法來分析消息中的timestamp(其實就是讀取消息的開頭部分,而不是全部整條消息,類似報文截斷解析,下面也有涉及介紹),如果解析timestamp失敗,會記錄malformedMsgCount。
- ChaperoneCollector對是用來讀取audit的數據,然后持久化操作,默認存入mysql中,看代碼也可選存入redis中。
- 源碼中沒有WebService這個東西,估計是uber內部的web系統,讀取下mysql中的內容展示到頁面而已。
如果程序段內嵌Audit Library(ChaperoneClient),那么整個audit過程如下:
如果producer端或者consumer端需要進行消息審計,那么可以內嵌Audit Library。就以發送端為例,消息先發送到kafka中,然后對這條消息進行審計,將審計的信息存入到kafka中,之后有專門的ChaperoneServiceCollector進行數據消費,之后存入mysql中,也可以選擇存入redis中。頁面系統webService可以查詢mysql(redis)中的數據,之后進而在頁面中展示。
如果使用ChanperoneServiceWork,整個流轉過程如下:
上面是對broker端進行審計的過程。首先從存儲消息的kafka(圖中上面的kafka)中消費數據,之后對收到的消息進行審計操作,之后將審計消息auditmsg以及相應的offset存儲起來(auditmsg存入hsqldb中,offset存到用來存儲審計數據的kafka的zk之中),之后再將審計消息auditmsg存入kafka(圖中下面的kafka)中,最后成功存儲并返回給消費端(Consumer1,即ChaperoneServiceWork),之后再把hsqldb中的auditmsg標記為已統計。之后ChaperoneServiceCollector和producer端(consumer端)內嵌Audit Library時相同。
官方文檔部分介紹如下:
每個消息只被審計一次
為了確保每個消息只被審計一次,ChaperoneService使用了預寫式日志(WAL)。ChaperoneService每次在觸發Kafka審計消息時,會往審計消息里添加一個UUID。這個帶有相關偏移量的消息在發送到Kafka之前被保存在WAL里。在得到Kafka的確認之后,WAL里的消息被標記為已完成。如果ChaperoneService崩潰,在重啟后它可以重新發送WAL里未被標記的審計消息,并定位到最近一次的審計偏移量,然后繼續消費。WAL確保了每個Kafka消息只被審計一次,而且每個審計消息至少會被發送一次。
接下來,ChaperoneCollector使用ChaperoneService之前添加過的UUID來移除重復消息。有了UUID和WAL,我們可以確保審計的一次性。在代理客戶端和服務器端難以實現一次性保證,因為這樣會給它們帶來額外的開銷。我們依賴它們的優雅關閉操作,這樣它們的狀態才會被沖刷出去。
在層間使用一致性的時間戳
因為Chaperone可以在多個層里看到相同的Kafka消息,所以為消息內嵌時間戳是很有必要的。如果沒有這些時間戳,在計數時會發生時間錯位。在Uber,大部分發送到Kafka的數據要么使用avro風格的schema編碼,要么使用JSON格式。對于使用schema編碼的消息,可以直接獲取時間戳。而對于JSON格式的消息,需要對JSON數據進行解碼才能拿到時間戳。為了加快這個過程,我們實現了一個基于流的JSON消息解析器,這個解析器無需預先解碼整個消息就可以掃描到時間戳。這個解析器用在ChaperoneService里是很高效的,不過對代理客戶端和服務器來說仍然需要付出很高代價。所以在這兩個層里,我們使用的是消息的處理時間戳。因為時間戳的不一致造成的層間計數差異可能會觸發錯誤的數據丟失警告。我們正在著手解決時間戳不一致問題,之后也會把解決方案公布出來。
溫馨提示: github上的quickstart中,如果不能根據腳本自動安裝kafka和zk,而是自己安裝kafka和zk的話,需要改動腳本、配置文件甚至源碼才能將服務運行起來。另外需要安裝hsqldb和mysql(redis)。
Confluent Control Center
文檔地址:http://docs.confluent.io/3.0.0/control-center/docs/index.html
這是個收費產品,文檔中介紹的并不多。和Chaperone相同,主要也是根據消息payload內嵌timestamp來實現,計算time_bucket的算法是:floor((timestamp /15)*15)。
架構圖如下:
主要是在producer端或者consumer端內嵌審計程序(相當于Chaperone的Audit Library)繼續審計,最終將審計消息同樣存入kafka中,最后的web系統是直接消費kafka中的審計消息進行內容呈現。
web系統部分呈現如下:
Kafka Monitor
github地址:https://github.com/linkedin/kafka-monitor
Kafka Monitor是基于在消息payload內嵌index和timestamp來實現審計:消息丟失,消息重復以及端到端延遲等。
web系統部分呈現如下:
幾種典型的metrics解釋:
| produce-avaliablility-avg | The average produce availability |
| consume-avaliability-avg | The average consume availability |
| records-produced-total | The total number of records that are produced |
| records-consumed-total | The total number of records that are consumed |
| records-lost-total | The total number of records that are lost |
| records-duplicated-total | The total number of records that are duplicated |
| records-delay-ms-avg | The average latency of records from producer to consumer |
| records-produced-rate | The average number of records per second that are produced |
| produce-error-rate | The average number of errors per second |
| consume-error-rate | The average number of errors per second |
| records-delay-ms-99th | The 99th percentile latency of records from producer to consu |
| records-delay-ms-999th | The 999th percentile latency of records from producer to consumer |
| records-delay-ms-max | The maximum latency of records from producer to consumer |
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-end-to-end-audit/
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
總結
以上是生活随笔為你收集整理的Kafka端到端审计的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 为什么QueueingConsumer会
- 下一篇: Highly Available (Mi