如何保证消息队列里的数据顺序执行?
使用MQ的時候,經常會有按順序消費的需求,比如大數據團隊為了做數據分析,會把數據庫里數據同步到其他系統做一些數據統計分析。同步MySQL的時候,為了保證數據同步的實時性,會在中間加一個MQ,多個線程來消費MQ里的數據。
這種同步一般是讀取binlog數據,你在MySQL里增改刪了數據,對應出來就是3條增改刪binlog日志發送到MQ里面,消費的時候肯定必須要按照增改刪的順序執行。如果你換成刪除、修改、增加,就導致數據亂套了。
圖1?binlog同步
我們以kafka舉例,看下哪些環節會出現數據順序不一致情況,又怎么解決。
假設kafka分配了3個partition,kafka的一個特性就是,能保證寫入一個partition中的數據一定是有順序的。
生產者寫的時候,可以指定一個key,比如是訂單id作為key,這個id對應的數據一定會寫到同一個partition中去,而且這個partition中的數據都是有順序的。
圖2?kafka partition
kafka的消費者開始消費partition中的數據,一個消費消費一個partition,一個partition只能被一個消費者消費,不會出現一個消費者同時消費多個partition的情況。假如現在有3個partition,你啟動4個消費者,那么就會有一個消費者消費不到數據。
圖3 一個消費者消費一個partition
到目前為止,每個消費者消費到的數據都是有順序性的。但消費者內部如果是單線程的,效率就會比較低,如果生產者寫入kafka的數據量比較大,消費不及時,就會出現消息堆積的情況,所以消費者需要多線程的方式運行。
假如消費者里啟動了3個線程,并發的來消費數據,線程之間如果不做同步控制,還是會導致數據亂掉。
圖4 消費者多線程消費MQ
那如何保證kafka消費者多線程按順序消費數據呢?
多個線程不能直接拿數據去處理,此時我們可以在同步系統中搞多個內存隊列,消費者拿到數據之后,根據每條數據的key做hash取模,把相同id的數據分配到同一個內存隊列中去。
每個內存隊列里的數據都是有順序性的,給每個內存隊列都對應一個線程,去消費內存隊列中的數據。
假如有3條增改刪的數據,都是對同一個id的處理,那么hash取模后就會寫入到同一個內存隊列里去,由同一個線程去消費,然后按順序寫入數據庫中。
如果消費者按照單線程消費處理,一條數據耗費幾十毫秒,1秒鐘只能處理十幾條數據,吞吐量就會非常低。如果開啟多線程的方式處理,就會幾倍的提高吞吐量,同時也保證了數據的順序性。
整個流程按這樣的設計方案來處理,就可以保證數據的順序性。
有道無術,術可成;有術無道,止于術
歡迎大家關注Java之道公眾號
好文章,我在看??
總結
以上是生活随笔為你收集整理的如何保证消息队列里的数据顺序执行?的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python管理系统web版_【程序源代
- 下一篇: 垃圾oracle_第 14 章 垃圾回收