vaex 处理海量数据_核心业务“瘦身”进行时!手把手带你搭建海量数据实时处理架构...
2.1 如何構(gòu)建數(shù)據(jù)庫日志文件實(shí)時(shí)采集系統(tǒng)
該平臺需要從銀行多個交易系統(tǒng)中,實(shí)時(shí)獲取客戶余額變動和交易明細(xì)數(shù)據(jù)。該過程要求數(shù)據(jù)采集組件能夠提供高性能、高可用性、高安全可靠性的實(shí)時(shí)采集、傳輸功能,因此我們采用了具備這些特性的 OGG 和 CDC 采集框架。CDC(Change Data Capture):基于數(shù)據(jù)庫日志實(shí)現(xiàn)對數(shù)據(jù)源變化的實(shí)時(shí)捕獲,并且實(shí)時(shí)傳輸?shù)侥繕?biāo)端。CDC組件通過讀取各個業(yè)務(wù)生產(chǎn)系統(tǒng)數(shù)據(jù)庫的日志文件捕獲得到更新(插入、刪除、更新)的交易記錄信息數(shù)據(jù),經(jīng)過行列過濾,字符編碼轉(zhuǎn)換后由 TCP/IP 發(fā)送給目標(biāo)端,目標(biāo)端接收到源端數(shù)據(jù)后,經(jīng)過數(shù)值轉(zhuǎn)換,字符編碼轉(zhuǎn)換,沖突檢測后將變更數(shù)據(jù)通過 Confluent Rest API 把數(shù)據(jù)傳送到 Kafka,將數(shù)據(jù)直接進(jìn)行持久化之前進(jìn)行消息隊(duì)列的數(shù)據(jù)緩存。OGG(Oracle GoldenGate)是一種基于日志的挖掘的技術(shù),它通過解析源數(shù)據(jù)庫在線日志或歸檔日志獲得數(shù)據(jù)的增量變化后,再將這些變化的數(shù)據(jù)傳輸?shù)?Kafka 中,Kafka將數(shù)據(jù)直接進(jìn)行持久化之前進(jìn)行消息隊(duì)列的數(shù)據(jù)緩存。2.2 如何保證對海量數(shù)據(jù)的實(shí)時(shí)處理
相比其他實(shí)時(shí)處理框架如 Spark 來說,Storm 的實(shí)時(shí)性較高,延時(shí)低,而在線交易服務(wù)平臺實(shí)時(shí)性要求比較高,要求毫秒級的數(shù)據(jù)處理。Storm 作為純實(shí)時(shí)的計(jì)算框架,其實(shí)時(shí)計(jì)算能力能達(dá)到毫秒級。Storm 是基于數(shù)據(jù)流的實(shí)時(shí)處理系統(tǒng),提供了大吞吐量的實(shí)時(shí)計(jì)算能力。在一條數(shù)據(jù)到達(dá)系統(tǒng)的時(shí)候,系統(tǒng)會立即在內(nèi)存中進(jìn)行相應(yīng)的計(jì)算,因此 Storm 適合要求實(shí)時(shí)性較高的數(shù)據(jù)分析場景。此外,Storm 支持分布式并行計(jì)算,即使海量數(shù)據(jù)大量涌入,也能得到實(shí)時(shí)處理。Storm 還具備以下幾個優(yōu)點(diǎn):低延遲、高可用、分布式、可擴(kuò)展、數(shù)據(jù)不丟失,并且提供簡單容易理解的接口,便于開發(fā)。2.3 如何實(shí)現(xiàn)采集層與實(shí)時(shí)處理層的對接
在采集層和實(shí)時(shí)處理層之間,往往需要加一個消息隊(duì)列機(jī)制,用于實(shí)現(xiàn)采集層與實(shí)時(shí)處理層的解耦,并緩存需要實(shí)時(shí)處理的數(shù)據(jù),保證所有數(shù)據(jù)都能被有序的正確的處理。此外,從源端采集的數(shù)據(jù)流并不是均勻的,而是時(shí)而多時(shí)而少的數(shù)據(jù)流。特別是在高并發(fā)的條件下,數(shù)據(jù)庫日志的數(shù)據(jù)會出現(xiàn)井噴式增長,如果 Storm 的消費(fèi)速度(即使 Storm ?的實(shí)時(shí)計(jì)算能力已經(jīng)很快了)慢于日志的產(chǎn)生速度,必然會導(dǎo)致大量數(shù)據(jù)滯后和丟失,因此我們加上 Kafka 消息系統(tǒng)作為數(shù)據(jù)緩沖區(qū),Kafka 可以將不均勻的數(shù)據(jù)轉(zhuǎn)換成均勻的消息流,從而與 Storm 結(jié)合起來,實(shí)現(xiàn)穩(wěn)定的流式計(jì)算。Kafka 是一個分布式的、分區(qū)化、可復(fù)制提交的日志服務(wù)。作為一個可擴(kuò)展、高可靠的消息系統(tǒng),在流處理中,經(jīng)常用來保存收集流數(shù)據(jù),提供給之后對接的 Storm 流數(shù)據(jù)框架進(jìn)行處理。作為一個消息隊(duì)列系統(tǒng),與大多數(shù)消息系統(tǒng)比較,Kafka 具有更好的吞吐量、內(nèi)置分區(qū)、副本和故障轉(zhuǎn)移等功能,這有利于及時(shí)處理大規(guī)模的消息。03?SequoiaDB?作為存儲層的優(yōu)勢在線交易服務(wù)平臺需要滿足實(shí)時(shí)處理之后海量數(shù)據(jù)的高速存儲和高效檢索,并且需要保證數(shù)據(jù)的可用性與可靠性。SequoiaDB 是一款優(yōu)秀的分布式數(shù)據(jù)庫,可以被用來存儲海量的數(shù)據(jù),其底層主要基于分布式、高可用、高性能與動態(tài)數(shù)據(jù)類型設(shè)計(jì),同時(shí)兼顧了關(guān)系型數(shù)據(jù)庫中眾多的優(yōu)秀設(shè)計(jì)如事務(wù)、多索引、動態(tài)查詢和更新、SQL等。利用巨杉數(shù)據(jù)庫自身的分布式存儲機(jī)制與多索引功能,能夠很好地為應(yīng)用提供高并發(fā)、低延時(shí)的查詢、更新、寫入和刪除操作服務(wù)。
SequoiaDB 使用 MPP(海量并行處理)架構(gòu),整個集群主要由三個角色構(gòu)成,分別是協(xié)調(diào)節(jié)點(diǎn),編目節(jié)點(diǎn)和數(shù)據(jù)節(jié)點(diǎn)。其中,編目節(jié)點(diǎn)存儲元數(shù)據(jù),協(xié)調(diào)節(jié)點(diǎn)負(fù)責(zé)分布式系統(tǒng)的任務(wù)分發(fā),數(shù)據(jù)節(jié)點(diǎn)負(fù)責(zé)數(shù)據(jù)存儲和操作。當(dāng)有應(yīng)用程序向協(xié)調(diào)節(jié)點(diǎn)發(fā)送訪問請求時(shí),協(xié)調(diào)節(jié)點(diǎn)首先通過與編目節(jié)點(diǎn)通信,了解底層數(shù)據(jù)存儲的結(jié)構(gòu)與規(guī)則,再將查詢?nèi)蝿?wù)分發(fā)給不同的數(shù)據(jù)節(jié)點(diǎn),然后聚合所有數(shù)據(jù)節(jié)點(diǎn)上的結(jié)果,并將結(jié)果排序作為合適的查詢結(jié)果。SequoiaDB 具備以下幾點(diǎn)優(yōu)勢:1)?具備豐富的查詢模型:SequoiaDB 適合于各種各樣的應(yīng)用程序。它提供了豐富的索引和查詢支持,包括二級索引,聚合框架等。2)?具有常用驅(qū)動:開發(fā)者整合了系統(tǒng)環(huán)境和代碼庫的原生驅(qū)動庫,通過原生驅(qū)動庫與數(shù)據(jù)庫交互,使得 SequoiaDB 的使用變得簡單和自然。3)?支持水平可擴(kuò)展:開發(fā)人員能夠利用通過服務(wù)器和云基礎(chǔ)架構(gòu)來增加 SequoiaDB 系統(tǒng)的容量,以應(yīng)對數(shù)據(jù)量和吞吐量的增長。4)?高可用性:數(shù)據(jù)的多份副本通過遠(yuǎn)程復(fù)制來維護(hù)。遇到故障系統(tǒng)會自動轉(zhuǎn)移到輔助節(jié)點(diǎn)、機(jī)架和數(shù)據(jù)中心上,使得企業(yè)不需要自定義和優(yōu)化代碼,就能讓系統(tǒng)正常運(yùn)行。5)?內(nèi)存級的性能:數(shù)據(jù)在內(nèi)存中直接讀取和寫入。并且為了系統(tǒng)的持久性,系統(tǒng)會在后臺持續(xù)把數(shù)據(jù)寫入磁盤。這些都為系統(tǒng)提供了快速的性能,使得系統(tǒng)無需使用單獨(dú)的緩存層。04?技術(shù)架構(gòu)實(shí)時(shí)處理架構(gòu)主要分為數(shù)據(jù)實(shí)時(shí)采集,實(shí)時(shí)處理,實(shí)時(shí)存儲三個模塊。其中 CDC,OGG用來獲取數(shù)據(jù),Kafka 用來臨時(shí)保存數(shù)據(jù),Strom 用來進(jìn)行數(shù)據(jù)實(shí)時(shí)計(jì)算,SequoiaDB是分布式數(shù)據(jù)庫,用來保存數(shù)據(jù)。
系統(tǒng)結(jié)構(gòu)整個實(shí)時(shí)分析系統(tǒng)的架構(gòu)先由 OGG/CDC 實(shí)時(shí)捕獲數(shù)據(jù)庫日志文件,提取其中數(shù)據(jù)的變化,如增、刪、改等操作,并存進(jìn) Kafka 消息系統(tǒng)中。然后由 Storm 系統(tǒng)消費(fèi) Kafka 中的消息,消費(fèi)記錄由 Zookeeper 集群管理,這樣即使 Kafka 宕機(jī)重啟后也能找到上次的消費(fèi)記錄。接著從上次宕機(jī)點(diǎn)繼續(xù)從 Kafka 的 Broker 中進(jìn)行消費(fèi),并使用定義好的 Storm Topology 去進(jìn)行日志信息的分析,輸出到 SequoiaDB 分布式數(shù)據(jù)庫中進(jìn)行持久化,最后提供在線實(shí)時(shí)查詢接口供用戶進(jìn)行查詢。4.1 數(shù)據(jù)采集
在日志收集流程方面,針對不同的系統(tǒng)環(huán)境,我們設(shè)計(jì)了不同的采集流程。外圍系統(tǒng)采用實(shí)時(shí)數(shù)據(jù)同步工具 OGG 進(jìn)行數(shù)據(jù)實(shí)時(shí)采集。OGG 通過捕捉進(jìn)程在源系統(tǒng)端讀取數(shù)據(jù)庫日志文件進(jìn)行解析,提取其中數(shù)據(jù)的變化如增、刪、改等操作,并將相關(guān)信息轉(zhuǎn)換為自定義的中間格式存放在隊(duì)列文件中,再利用傳送進(jìn)程將隊(duì)列文件通過 TCP/IP 傳送到 Kafka 隊(duì)列中。OGG數(shù)據(jù)采集流程圖
而對于核心系統(tǒng),通過在核心系統(tǒng)源端部署 InfoSphere CDC 實(shí)時(shí)采集數(shù)據(jù)庫日志及其文件以捕獲源端數(shù)據(jù)庫產(chǎn)生的更新(插入、刪除、更新)交易記錄信息,通過連續(xù)鏡像運(yùn)行模式,不間斷地把最新交易數(shù)據(jù)傳送到目標(biāo)端。在目標(biāo)系統(tǒng)上同樣運(yùn)行? InfoSphere ?CDC,接收來自于不同源系統(tǒng)傳過來的數(shù)據(jù),再通過 Confluent Rest API把數(shù)據(jù)傳送到 Kafka,在對數(shù)據(jù)進(jìn)行計(jì)算或者直接進(jìn)行持久化之前進(jìn)行消息隊(duì)列的數(shù)據(jù)緩存。4.2 實(shí)時(shí)處理
這里采用 Storm 進(jìn)行實(shí)時(shí)處理,Storm 作為實(shí)時(shí)處理框架具備低延遲、高可用、分布式、可擴(kuò)展、數(shù)據(jù)不丟失等特點(diǎn)。這些特點(diǎn)促使 Storm 在保證數(shù)據(jù)不丟失的前提下,依然具備快速的處理速度。在 Storm 集群中 Master 節(jié)點(diǎn)上運(yùn)行的一個守護(hù)進(jìn)程叫“Nimbus”,負(fù)責(zé)集群中計(jì)算程序的分發(fā)、任務(wù)的分發(fā)、監(jiān)控任務(wù)和工作節(jié)點(diǎn)的運(yùn)行情況等;Worker 節(jié)點(diǎn)上運(yùn)行的守護(hù)進(jìn)程叫“Supervisor”,負(fù)責(zé)接收 Nimbus 分發(fā)的任務(wù)并運(yùn)行,每一個 Worker 上都會運(yùn)行著 Topology 程序的一部分,而一個 Topology 程序的運(yùn)行就是由集群上多個 Worker 一起協(xié)同工作的。Nimubs 和 Supervisor 之間的協(xié)調(diào)工作通過 Zookeeper 來管理,Nimbus 和 Supervisor 自己本身在集群上是無狀態(tài)的,它們的狀態(tài)都保存在 Zookeeper 上,所以任何節(jié)點(diǎn)的宕機(jī)和動態(tài)擴(kuò)容都不會影響整個集群的工作運(yùn)行,并且支持 fast-fail 機(jī)制。在 Storm 上做實(shí)時(shí)計(jì)算,需要自定義一個計(jì)算程序“Topology”,一個 Topology 程序由 Spout 和 Bolt 共同組成,Storm 就是通過 Topology 程序?qū)?shù)據(jù)流 Stream 通過可靠(ACK機(jī)制)的分布式計(jì)算生成我們的目標(biāo)數(shù)據(jù)流 Stream。我們使用 Kafkaspout從 Kafka 的 queue 中不間斷地獲得對應(yīng)的 topic 數(shù)據(jù),然后通過自定義 bolt 來做數(shù)據(jù)處理,分別區(qū)分出增、刪、改記錄,再通過自定義 bolt 來調(diào)用 SequoiaDB API 對SequoiaDB 數(shù)據(jù)庫進(jìn)行對應(yīng)的增,刪,改操作,從而達(dá)到對源數(shù)據(jù)實(shí)時(shí)復(fù)制的目的。實(shí)時(shí)處理
4.3 數(shù)據(jù)存儲
數(shù)據(jù)源獲取數(shù)據(jù)經(jīng)過 Kafka 和 Storm實(shí)時(shí)處理之后,通過調(diào)用 SequoiaDB API 接口將實(shí)時(shí)解析后的數(shù)據(jù)存儲到 SequoiaDB 中。通過 SQL 查詢 SequoiaDB 為 OLAP 場景提供支持,也可通過 JDBC 為在線應(yīng)用提供 OLTP 服務(wù)。將海量數(shù)據(jù)保存在 SequoiaDB 分布式數(shù)據(jù)庫中,利用其數(shù)據(jù)庫自身的分布式存儲機(jī)制與多索引功能,能夠很好地為應(yīng)用提供高并發(fā)、低延時(shí)的查詢,以及更新、寫入和刪除操作等服務(wù)。數(shù)據(jù)實(shí)時(shí)處理流程示意圖SequoiaDB 數(shù)據(jù)庫底層采用多維分區(qū)的方式將海量數(shù)據(jù)分散到多個數(shù)據(jù)分區(qū)組上進(jìn)行存儲。該方式通過結(jié)合了 Hash 分布方式和 Partition 分布方式的優(yōu)點(diǎn),讓集合中的數(shù)據(jù)以更小的顆粒度分布到數(shù)據(jù)庫多個數(shù)據(jù)分區(qū)組上,從而提升數(shù)據(jù)庫的性能。多維分區(qū)架構(gòu)圖
采用分區(qū)的目的主要是為了解決單臺服務(wù)器硬件資源受限問題,如內(nèi)存或者磁盤 I/O 瓶頸問題,使得機(jī)器能夠得到橫向擴(kuò)展;此外還能將系統(tǒng)壓力分散到多臺機(jī)器上,從而提高系統(tǒng)性能,并且不會增加應(yīng)用程序復(fù)雜性。同時(shí)結(jié)合 SequoiaDB 的副本模式,保證系統(tǒng)的高可用性。05 實(shí)現(xiàn)價(jià)值5.1 商業(yè)價(jià)值越來越多的企業(yè)不再滿足于通過夜間運(yùn)行批量任務(wù)作業(yè)的方式來處理信息,更傾向于實(shí)時(shí)地獲取數(shù)據(jù)的價(jià)值。他們認(rèn)為數(shù)據(jù)的價(jià)值只有在剛產(chǎn)生時(shí)才是最大的,認(rèn)為在數(shù)據(jù)剛產(chǎn)生時(shí)就移動、處理和使用才是最有意義的。在線交易服務(wù)平臺作為實(shí)時(shí)處理架構(gòu)的最佳實(shí)踐,將各個系統(tǒng)的數(shù)據(jù)進(jìn)行實(shí)時(shí)處理,整合得到有價(jià)值的數(shù)據(jù),并將其保存到 SequoiaDB 數(shù)據(jù)庫中供用戶實(shí)時(shí)查詢使用。數(shù)據(jù)實(shí)時(shí)處理系統(tǒng)不僅提高了用戶的滿意度,還將實(shí)時(shí)處理技術(shù)與實(shí)際業(yè)務(wù)應(yīng)用有效地結(jié)合了起來。在未來,將會有更多的業(yè)務(wù)場景需要該技術(shù)的支持。5.2 技術(shù)價(jià)值一個穩(wěn)定可靠且高效的實(shí)時(shí)處理架構(gòu)是將實(shí)時(shí)數(shù)據(jù)轉(zhuǎn)化為價(jià)值的基礎(chǔ)。在線交易服務(wù)平臺作為由數(shù)據(jù)實(shí)時(shí)處理架構(gòu)搭建起來的平臺,能夠穩(wěn)定的在生成環(huán)境中運(yùn)行,提供高效的服務(wù),在技術(shù)上具有很高的參考價(jià)值。該數(shù)據(jù)實(shí)時(shí)處理架構(gòu)實(shí)現(xiàn)了 SequoiaDB 與其他數(shù)據(jù)庫的實(shí)時(shí)對接,能夠方便從其他數(shù)據(jù)庫中遷移和備份數(shù)據(jù),可以作為 SequoiaDB ?與其他數(shù)據(jù)庫實(shí)時(shí)對接的中間件。點(diǎn)擊閱讀原文,獲取更多精彩內(nèi)容~
總結(jié)
以上是生活随笔為你收集整理的vaex 处理海量数据_核心业务“瘦身”进行时!手把手带你搭建海量数据实时处理架构...的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: u盘fat32怎么转换fat U盘FAT
- 下一篇: service注入为null_如何解决q