日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > windows >内容正文

windows

现代IM系统中消息推送和存储架构的实现

發布時間:2024/8/23 windows 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 现代IM系统中消息推送和存储架构的实现 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

摘要: 前言 IM全稱是『Instant Messaging』,中文名是即時通訊。在這個高度信息化的移動互聯網時代,生活中IM類產品已經成為必備品,比較有名的如釘釘、微信、QQ等以IM為核心功能的產品。當然目前微信已經成長為一個生態型產品,但其核心功能還是IM。

前言

IM全稱是『Instant Messaging』,中文名是即時通訊。在這個高度信息化的移動互聯網時代,生活中IM類產品已經成為必備品,比較有名的如釘釘、微信、QQ等以IM為核心功能的產品。當然目前微信已經成長為一個生態型產品,但其核心功能還是IM。還有一些非以IM系統為核心的應用,最典型的如一些在線游戲、社交應用,IM也是其重要的功能模塊。可以說,帶有社交屬性的應用,IM功能一定是必不可少的。

IM系統在互聯網初期即存在,其基礎技術架構在這十幾年的發展中更新迭代多次,從早期的CS、P2P架構,到現在后臺已經演變為一個復雜的分布式系統,涉及移動端、網絡、安全和存儲等技術的方方面面。其支撐的規模也從早期的少量日活,到現在微信這個巨頭最新公布的達到9億的日活的體量。

IM系統中最核心的部分是消息系統,消息系統中最核心的功能是消息的同步和存儲:

消息的同步:將消息完整的、快速的從發送方傳遞到接收方,就是消息的同步。消息同步系統最重要的衡量指標就是消息傳遞的實時性、完整性以及能支撐的消息規模。從功能上來說,一般至少要支持在線和離線推送,高級的IM系統還支持『多端同步』。
消息的存儲:消息存儲即消息的持久化保存,這里不是指消息在客戶端本地的保存,而是指云端的保存,功能上對應的就是『消息漫游』。『消息漫游』的好處是可以實現賬號在任意端登陸查看所有歷史消息,這也是高級IM系統特有的功能之一。
本篇文章內容主要涉及IM系統中的消息系統架構,會介紹一種基于TableStore構建的消息同步以及存儲系統的架構實現,能夠支持消息系統中的高級特性『多端同步』以及『消息漫游』。在性能和規模上,能夠做到全量消息云端存儲,百萬TPS以及毫秒級延遲的消息同步能力。

架構設計

本章主要會介紹基于TableStore的現代IM消息系統的架構設計,在詳細介紹架構設計之前,會先介紹一種Timeline邏輯模型,來抽象和簡化對IM消息同步和存儲模型的理解。理解了Timeline模型后,會介紹如何基于此模型對消息的同步以及存儲進行建模。基于Timeline模型,在實現消息同步和存儲時還會有各方面的技術權衡,例如如何對消息同步常見的讀擴散和寫擴散兩種模型進行對比和選擇,以及針對Timeline模型的特征如何來選擇底層數據庫。

傳統架構 vs 現代架構

上圖是消息系統傳統架構與現代架構的簡單對比。

傳統架構下,消息是先同步后存儲。對于在線的用戶,消息會直接實時同步到在線的接收方,消息同步成功后,并不會進行持久化。而對于離線的用戶或者消息無法實時同步成功時,消息會持久化到離線庫,當接收方重新連接后,會從離線庫拉取所有未讀消息。當離線庫中的消息成功同步到接收方后,消息會從離線庫中刪除。傳統的消息系統,服務端的主要工作是維護發送方和接收方的連接狀態,并提供在線消息同步和離線消息緩存的能力,保證消息一定能夠從發送方傳遞到接收方。服務端不會對消息進行持久化,所以也無法支持消息漫游。

現代架構下,消息是先存儲后同步。先存儲后同步的好處是,如果接收方確認接收到了消息,那這條消息一定是已經在云端保存了。并且消息會有兩個庫來保存,一個是消息存儲庫,用于全量保存所有會話的消息,主要用于支持消息漫游。另一個是消息同步庫,主要用于接收方的多端同步。消息從發送方發出后,經過服務端轉發,服務端會先將消息保存到消息存儲庫,后保存到消息同步庫。完成消息的持久化保存后,對于在線的接收方,會直接選擇在線推送。但在線推送并不是一個必須路徑,只是一個更優的消息傳遞路徑。對于在線推送失敗或者離線的接收方,會有另外一個統一的消息同步方式。接收方會主動的向服務端拉取所有未同步消息,但接收方何時來同步以及會在哪些端來同步消息對服務端來說是未知的,所以要求服務端必須保存所有需要同步到接收方的消息,這是消息同步庫的主要作用。對于新的同步設備,會有消息漫游的需求,這是消息存儲庫的主要作用,在消息存儲庫中,可以拉取任意會話的全量歷史消息。

以上是傳統架構和現代架構的一個簡單的對比,現代架構上整個消息的同步和存儲流程,并沒有變復雜太多,但是其能實現多端同步以及消息漫游。現代架構中最核心的就是兩個消息庫『消息同步庫』和『消息存儲庫』,是消息同步和存儲最核心的基礎。而本篇文章接下來的部分,都是圍繞這兩個庫的設計和實現來展開。

Timeline模型

在分析『消息同步庫』和『消息存儲庫』的設計和實現之前,在本章會先介紹一個邏輯模型-Timeline。Timeline模型會幫助我們簡化對消息同步和存儲模型的理解,而消息庫的設計和實現也是圍繞Timeline的特性和需求來展開。

如圖是Timeline模型的一個抽象表述,Timeline可以簡單理解為是一個消息隊列,但這個消息隊列有如下特性:

每個消息擁有一個順序ID(SeqId),在隊列后面的消息的SeqId一定比前面的消息的SeqId大,也就是保證SeqId一定是增長的,但是不要求嚴格遞增。
新的消息永遠在尾部添加,保證新的消息的SeqId永遠比已經存在隊列中的消息都大。
可根據SeqId隨機定位到具體的某條消息進行讀取,也可以任意讀取某個給定范圍內的所有消息。
有了這些特性后,消息的同步可以拿Timeline來很簡單的實現。圖中的例子中,消息發送方是A,消息接收方是B,同時B存在多個接收端,分別是B1、B2和B3。A向B發送消息,消息需要同步到B的多個端,待同步的消息通過一個Timeline來進行交換。A向B發送的所有消息,都會保存在這個Timeline中,B的每個接收端都是獨立的從這個Timeline中拉取消息。每個接收端同步完畢后,都會在本地記錄下最新同步到的消息的SeqId,即最新的一個位點,作為下次消息同步的起始位點。服務端不會保存各個端的同步狀態,各個端均可以在任意時間從任意點開始拉取消息。

消息漫游也是基于Timeline,和消息同步唯一的區別是,消息漫游要求服務端能夠對Timeline內的所有數據進行持久化。

基于Timeline,從邏輯模型上能夠很簡單的理解在服務端如何去實現消息同步和存儲,并支持多端同步和消息漫游這些高級功能。落地到實現的難點主要在如何將邏輯模型映射到物理模型,Timeline的實現對數據庫會有哪些要求?我們應該選擇何種數據庫去實現?這些是接下來會討論到的問題。

消息存儲模型

如圖是基于Timeline的消息存儲模型,消息存儲要求每個會話都對應一個獨立的Timeline。如圖例子所示,A與B/C/D/E/F均發生了會話,每個會話對應一個獨立的Timeline,每個Timeline內存有這個會話中的所有消息,服務端會對每個Timeline進行持久化。服務端能夠對所有會話Timeline中的全量消息進行持久化,也就擁有了消息漫游的能力。

消息同步模型

消息同步模型會比消息存儲模型稍復雜一些,消息的同步一般有讀擴散和寫擴散兩種不同的方式,分別對應不同的Timeline物理模型。

如圖是讀擴散和寫擴散兩種不同同步模式下對應的不同的Timeline模型,按圖中的示例,A作為消息接收者,其與B/C/D/E/F發生了會話,每個會話中的新的消息都需要同步到A的某個端,看下讀擴散和寫擴散兩種模式下消息如何做同步。

讀擴散:消息存儲模型中,每個會話的Timeline中保存了這個會話的全量消息。讀擴散的消息同步模式下,每個會話中產生的新的消息,只需要寫一次到其用于存儲的Timeline中,接收端從這個Timeline中拉取新的消息。優點是消息只需要寫一次,相比寫擴散的模式,能夠大大降低消息寫入次數,特別是在群消息這種場景下。但其缺點也比較明顯,接收端去同步消息的邏輯會相對復雜和低效。接收端需要對每個會話都拉取一次才能獲取全部消息,讀被大大的放大,并且會產生很多無效的讀,因為并不是每個會話都會有新消息產生。
寫擴散:寫擴散的消息同步模式,需要有一個額外的Timeline來專門用于消息同步,通常是每個接收端都會擁有一個獨立的同步Timeline,用于存放需要向這個接收端同步的所有消息。每個會話中的消息,會產生多次寫,除了寫入用于消息存儲的會話Timeline,還需要寫入需要同步到的接收端的同步Timeline。在個人與個人的會話中,消息會被額外寫兩次,除了寫入這個會話的存儲Timeline,還需要寫入參與這個會話的兩個接收者的同步Timeline。而在群這個場景下,寫入會被更加的放大,如果這個群擁有N個參與者,那每條消息都需要額外的寫N次。寫擴散同步模式的優點是,在接收端消息同步邏輯會非常簡單,只需要從其同步Timeline中讀取一次即可,大大降低了消息同步所需的讀的壓力。其缺點就是消息寫入會被放大,特別是針對群這種場景。
在IM這種應用場景下,通常會選擇寫擴散這種消息同步模式。IM場景下,一條消息只會產生一次,但是會被讀取多次,是典型的讀多寫少的場景,消息的讀寫比例大概是10:1。若使用讀擴散同步模式,整個系統的讀寫比例會被放大到100:1。一個優化的好的系統,必須從設計上去平衡這種讀寫壓力,避免讀或寫任意一維觸碰到天花板。所以IM系統這類場景下,通常會應用寫擴散這種同步模式,來平衡讀和寫,將100:1的讀寫比例平衡到30:30。當然寫擴散這種同步模式,還需要處理一些極端場景,例如萬人大群。針對這種極端寫擴散的場景,會退化到使用讀擴散。一個簡單的IM系統,通常會在產品層面限制這種大群的存在,而對于一個高級的IM系統,會采用讀寫擴散混合的同步模式,來滿足這類產品的需求。

消息庫設計

基于Timeline模型,以及Timeline模型在消息存儲和消息同步的應用,我們看下消息同步庫和消息存儲庫的設計。

如圖是基于Timeline的消息庫設計。

消息同步庫:消息同步庫用于存儲所有用于消息同步的Timeline,每個Timeline對應一個接收端,主要用作寫擴散模式的消息同步。這個庫不需要永久保留所有需要同步的消息,因為消息在同步到所有端后其生命周期就可以結束,就可以被回收。但是如前面所介紹的,一個實現簡單的多端同步消息系統,在服務端不會保存有所有端的同步狀態,而是依賴端自己主動來做同步。所以服務端不知道消息何時可以回收,通常的做法是為這個庫里的消息設定一個固定的生命周期,例如一周或者一個月,生命周期結束可被淘汰。
消息存儲庫:消息存儲庫用于存儲所有會話的Timeline,每個Timeline包含了一個會話中的所有消息。這個庫主要用于消息漫游時拉取某個會話的所有歷史消息,也用于讀擴散模式的消息同步。
消息同步庫和消息存儲庫,對數據庫有不同的要求,如何對數據庫做選型,在下面會討論。

數據庫選型

消息系統最核心的兩個庫是消息同步庫和消息存儲庫,兩個庫對數據庫有不同的要求:


總結下來,對數據庫的要求有如下幾點:

表結構設計能夠滿足Timeline模型的功能要求:不要求關系模型,能夠實現隊列模型,并能夠支持生成自增的SeqId。
能夠支持高并發寫和范圍讀,規模在十萬級TPS。
能夠保存海量數據,百TB級。
能夠為數據定義生命周期。
阿里云表格存儲(TableStore)是基于LSM存儲引擎的分布式NoSQL數據庫,支持百萬TPS高并發讀寫,PB級數據存儲,數據支持TTL,能夠很好的滿足以上需求,并且支持自增列,能夠非常完美的設計和實現Timeline的物理模型。

架構實現

本章會以一段非常精簡的代碼,來展示如何基于TableStore實現Timeline模型,并基于Timeline模型進行消息存儲和推送。
這篇文章中給出的代碼,主要目的是為了演示如何能夠實現一個精簡Timeline的最基本功能。馬上我們會推出一個完整的Timeline Library,來將基于Timeline進行消息存儲和推送的代碼的開發變得無比簡單。

所有示例代碼基于如下SDK版本:

<dependency><groupId>com.aliyun.openservices</groupId><artifactId>tablestore</artifactId><version>4.3.1</version> </dependency>

表結構設計

public static void main(String[] args) {String endpoint = "<endpoint>";String accessId = "<access_id>";String accessKey = "<access_key>";String instanceName = "<instance_name>";SyncClient client = new SyncClient(endpoint, accessId, accessKey, instanceName);String pushTable = "PushTable";String storeTable = "StoreTable";createTimelineTable(client, pushTable);createTimelineTable(client, storeTable);client.shutdown();}private static void createTimelineTable(SyncClient client, String tableName) {TableMeta tableMeta = new TableMeta(tableName);tableMeta.addPrimaryKeyColumn("timeline_id", PrimaryKeyType.STRING);tableMeta.addAutoIncrementPrimaryKeyColumn("seq_id");TableOptions options = new TableOptions();options.setMaxVersions(1);options.setTimeToLive(-1); // 配置消息永久保留CreateTableRequest request = new CreateTableRequest(tableMeta, options);client.createTable(request);}

以上是創建Timeline表的示例代碼,總共需要創建兩張表,一張表作為消息同步庫,名稱為『PushTable』,另一張表作為消息存儲庫,名稱為『StoreTable』。

推送和存儲實現

public static void main(String[] args) {String endpoint = "<endpoint>";String accessId = "<access_id>";String accessKey = "<access_key>";String instanceName = "<instance_name>";SyncClient client = new SyncClient(endpoint, accessId, accessKey, instanceName);String pushTable = "PushTable";String storeTable = "StoreTable";String groupName = "TableStore(釘釘號:11789671)";List<String> groupMembers = Arrays.asList("A", "B", "C", "D", "E");// 群產生新的消息,并且推送給所有的群成員pushGroupMessages(client, pushTable, storeTable, groupName, groupMembers, "Hello World!");pushGroupMessages(client, pushTable, storeTable, groupName, groupMembers, "Hello Alibaba!");pushGroupMessages(client, pushTable, storeTable, groupName, groupMembers, "Hello Aliyun!");pushGroupMessages(client, pushTable, storeTable, groupName, groupMembers, "Hello TableStore!");pushGroupMessages(client, pushTable, storeTable, groupName, groupMembers, "Bye!");client.shutdown();}private static void pushGroupMessages(SyncClient client, String pushTable, String storeTable, String groupName, List<String> groupMembers, String message) {// 先將群消息持久化到存儲TimelinewriteMessage(client, storeTable, groupName, message);// 通過寫擴散的模式將群消息同步到所有的群成員for (String groupMember : groupMembers) {writeMessage(client, pushTable, groupMember, message);}}private static void writeMessage(SyncClient client, String timelineTable, String timelineId, String message) {PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder().addPrimaryKeyColumn("timeline_id", PrimaryKeyValue.fromString(timelineId)).addPrimaryKeyColumn("seq_id", PrimaryKeyValue.AUTO_INCREMENT).build();RowPutChange rowChange = new RowPutChange(timelineTable, primaryKey);rowChange.addColumn("message", ColumnValue.fromString(message));PutRowRequest request = new PutRowRequest(rowChange);client.putRow(request);}

以上是模擬一個群內消息同步和存儲的示例代碼。群名稱為『TableStore(釘釘號:11789671)』,群內成員有『A, B, C, D, E』。群內新的消息,需要先存儲到群的存儲Timeline(Timeline ID為群名稱),之后需要以寫擴散的模式推送到群內每個成員的同步Timeline(以群成員名稱作為Timeline ID)。

public static void main(String[] args) {String endpoint = "<endpoint>";String accessId = "<access_id>";String accessKey = "<access_key>";String instanceName = "<instance_name>";SyncClient client = new SyncClient(endpoint, accessId, accessKey, instanceName);String pushTable = "PushTable";String storeTable = "StoreTable";String groupName = "TableStore(釘釘號:11789671)";List<String> groupMembers = Arrays.asList("A", "B", "C", "D", "E");// 某個群成員同步群消息List<String> messages = syncMessages(client, pushTable, "A", 0);for (String message : messages) {System.out.println(message);}// 某個群成員查看該群所有的歷史消息messages = syncMessages(client, storeTable, groupName, 0);for (String message : messages) {System.out.println(message);}client.shutdown();}private static List<String> syncMessages(SyncClient client, String timelineTable, String timelineId, long seqId) {RangeIteratorParameter param = new RangeIteratorParameter(timelineTable);PrimaryKey startKey = PrimaryKeyBuilder.createPrimaryKeyBuilder().addPrimaryKeyColumn("timeline_id", PrimaryKeyValue.fromString(timelineId)).addPrimaryKeyColumn("seq_id", PrimaryKeyValue.fromLong(seqId)).build();param.setInclusiveStartPrimaryKey(startKey);PrimaryKey endKey = PrimaryKeyBuilder.createPrimaryKeyBuilder().addPrimaryKeyColumn("timeline_id", PrimaryKeyValue.fromString(timelineId)).addPrimaryKeyColumn("seq_id", PrimaryKeyValue.INF_MAX).build();param.setExclusiveEndPrimaryKey(endKey);param.setMaxVersions(1);Iterator<Row> iter = client.createRangeIterator(param);List<String> messages = new ArrayList<String>();while (iter.hasNext()) {Row row = iter.next();messages.add(row.getLatestColumn("message").getValue().asString());}return messages;}

以上是拉取群內歷史消息以及某個群成員進行消息同步的示例代碼,主要邏輯在syncMessages函數內。示例代碼中,拉取消息都是從seq_id為0開始,0為TableStore自增列中最小值,所以代表了從最小的一個位點開始拉取消息,即拉取全量消息。

后記

這篇文章主要介紹了現代IM系統中消息推送和存儲架構的實現,基于邏輯的Timeline模型,我們可以很清晰明了的理解整個消息推送和存儲的架構。基于TableStore,可以非常簡單的實現Timeline模型,其中自增列功能,完美的匹配了Timeline模型中所需要的最關鍵的SeqId自增。

TableStore(表格存儲)是阿里云自主研發的專業級分布式NoSQL數據庫,是基于共享存儲的高性能、低成本、易擴展、全托管的半結構化數據存儲平臺,支撐互聯網和物聯網數據的高效計算與分析。IM系統的消息推送和存儲場景,是TableStore在社交領域的重要應用之一。

基于Timeline的消息存儲和推送模型,將不光應用在IM消息系統中,還可應用在例如Feeds流、實時消息同步、直播彈幕等場景。在Feeds流場景下,我們也有了比較深入的研究,可以參考《如何打造千萬級Feeds流系統》這篇文章。而在其他更多的場景下,我們將會有更多的深入研究。

作者:木洛

創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

總結

以上是生活随笔為你收集整理的现代IM系统中消息推送和存储架构的实现的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。