PyFlink + 区块链?揭秘行业领头企业 BTC.com 如何实现实时计算
大家好,我們是 BTC.com 團隊。2020 年,我們有幸接觸到了 Flink 和 PyFlink 生態(tài),從團隊自身需求出發(fā),完善了團隊內(nèi)實時計算的任務(wù)和需求,搭建了流批一體的計算環(huán)境。
在實現(xiàn)實時計算的過程中,我們在實踐中收獲了一些經(jīng)驗,在此分享一些這方面的心路歷程。主要分享的大綱如下:
- 困惑 ? 描述 ? 思考 ? 行動
- 流批一體的架構(gòu)
- 架構(gòu)
- 效果
- Zeppelin、PyFlink on K8S 等實踐
- Zeppelin
- PyFlink on K8S
- 區(qū)塊鏈領(lǐng)域?qū)嵺`
- 展望 ? 總結(jié)
01 困惑 ? 描述 ? 思考 ? 行動
作為工程師,我們每天都在不斷地了解需求,研發(fā)業(yè)務(wù)。
有一天,我們被拉到了一次團隊總結(jié)會議上,收到了以下的需求:
銷售總監(jiān) A:
我們想要知道銷售的歷史和實時轉(zhuǎn)化率、銷售額,能不能統(tǒng)計一下實時的 TOP5 的商品,還有就是大促時候,用戶實時訪問、商品實時瀏覽量 TOP5 的情況呢,可以根據(jù)他歷史訪問的記錄實時推薦相關(guān)的嗎?
市場總監(jiān) B:
我們想要知道市場推廣的效果,每次活動的實時數(shù)據(jù),不然我們的市場投放無法準(zhǔn)確評估效果,及時反饋啊。
研發(fā)總監(jiān) C:
有些用戶的 Bug 無法復(fù)現(xiàn),日志可以再實時一點嗎?傳統(tǒng)日志分析,需要一定的梳理,可不可以直接清洗 / 處理相關(guān)的數(shù)據(jù)?
采購總監(jiān) D:
這些年是不是流行數(shù)字化,采購這邊想預(yù)測采購需求,做一下實時分類和管理支出,預(yù)測未來供應(yīng)來源,完善一下成本。這個有辦法做嗎?還有有些供應(yīng)商不太穩(wěn)定啊,能監(jiān)控到他們的情況嗎?
運維總監(jiān) E:
網(wǎng)站有時候訪問比較慢,沒有地方可以看到實時的機器情況,搞個什么監(jiān)控大屏,這個有辦法解決嗎?
部門領(lǐng)導(dǎo) F:
可以實現(xiàn)上面的人的需求嗎。
做以上的了解之后,才發(fā)現(xiàn),大家對于數(shù)據(jù)需求的渴望程度,使用方不僅需要歷史的數(shù)據(jù),而且還需要實時性的數(shù)據(jù)。
在電商、金融、制造等行業(yè),數(shù)據(jù)有著迅猛的增長,諸多的企業(yè)面臨著新的挑戰(zhàn),數(shù)據(jù)分析的實時處理框架,比如說做一些實時數(shù)據(jù)分析報表、實時數(shù)據(jù)處理計算等。
和大多數(shù)企業(yè)類似,在此之前,我們是沒有實時計算這方面的經(jīng)驗和積累的。這時,就開始困惑了,怎樣可以更好地做上面的需求,在成本和效果之間取得平衡,如何設(shè)計相關(guān)的架構(gòu)?
窮則思變,在有了困惑以后,我們就開始準(zhǔn)備梳理已有的條件和我們到底需要什么。
我們的業(yè)務(wù)范圍主要在區(qū)塊鏈瀏覽器與數(shù)據(jù)服務(wù)、區(qū)塊鏈礦池、多幣種錢包等。在區(qū)塊鏈瀏覽器的業(yè)務(wù)里,BTC.com 目前已是全球領(lǐng)先的區(qū)塊鏈數(shù)據(jù)服務(wù)平臺,礦池業(yè)務(wù)在業(yè)內(nèi)排行第一,區(qū)塊鏈瀏覽器也是全球前三大瀏覽器之一。
首先,我們通過 parser 解析區(qū)塊鏈上的數(shù)據(jù),得到各方面的數(shù)據(jù)信息,可以分析出每個幣種的地址活躍度、地址交易情況、交易流向、參與程度等內(nèi)容。目前,BTC.com 區(qū)塊鏈瀏覽器與行業(yè)內(nèi)各大礦池和交易所等公司都有相關(guān)合作,可以更好地實現(xiàn)一些數(shù)據(jù)的統(tǒng)計、整理、歸納、輸出等。
面向的用戶,不僅有專業(yè)的區(qū)塊鏈開發(fā)人員,也有各樣的 B 端和 C 端用戶,C 端用戶可以進(jìn)行區(qū)塊鏈地址的標(biāo)注,智能合約的運行,查看智能合約相關(guān)內(nèi)容等,以及鏈上數(shù)據(jù)的檢索和查看。B 端用戶則有更專業(yè)的支持和指導(dǎo),提供 API、區(qū)塊鏈節(jié)點等一些的定制以及交易加速、鏈上的業(yè)務(wù)合作、數(shù)據(jù)定制等。
從數(shù)據(jù)量級來講,截至目前,比特幣大概有 5 億筆交易,3000 多萬地址,22 億輸出(output:每筆交易的輸出),并且還在不斷增長中。以太坊的話,則更多。而 BTC.com 的礦池和區(qū)塊鏈瀏覽器都支持多幣種,各幣種的總數(shù)據(jù)量級約為幾十 T。
礦池是礦工購買礦機設(shè)備后連接到的服務(wù)平臺,礦工可以通過連接礦池從而獲取更穩(wěn)定的收益。這是一個需要保證 7 * 24 小時穩(wěn)定的服務(wù),里面有礦機不斷地提交其計算好的礦池下發(fā)的任務(wù)的解,礦池將達(dá)到網(wǎng)絡(luò)難度的解進(jìn)行廣播。這個過程也可以認(rèn)為是近乎是實時的,礦機通過提交到服務(wù)器,服務(wù)器內(nèi)部再提交到 Kafka 消息隊列,同時有一些組件監(jiān)聽這些消息進(jìn)行消費。而這些提交上來的解可以從中分析出礦機的工作狀態(tài)、算力、連接情況等。
在業(yè)務(wù)上,我們需要進(jìn)行歷史數(shù)據(jù)和實時數(shù)據(jù)的計算。
歷史數(shù)據(jù)要關(guān)聯(lián)一些幣價,歷史交易信息,而這些交易信息需要一直保存,是一種典型的批處理任務(wù)。
每當(dāng)有新區(qū)塊的確認(rèn),就有一些數(shù)據(jù)可以得到處理和分析,比如某個地址在這個區(qū)塊里發(fā)生了一筆交易,那么可以從其交易流向去分析是什么樣的交易,挖掘交易相關(guān)性?;蛘呤窃谶@個區(qū)塊里有一些特殊的交易,比如 segwit 的交易、比如閃電網(wǎng)絡(luò)的交易,就是有一些這個幣種特有的東西可以進(jìn)行解析分析和統(tǒng)計。并且在新區(qū)塊確認(rèn)時的難度預(yù)測也有所變化。
還有就是大額交易的監(jiān)控,通過新區(qū)塊的確認(rèn)和未確認(rèn)交易,鎖定一些大額交易,結(jié)合地址的一些標(biāo)注,鎖定交易流向,更好地進(jìn)行數(shù)據(jù)分析。
還有是一些區(qū)塊鏈方面的 OLAP 方面的需求。
總結(jié)了在數(shù)據(jù)統(tǒng)計方面的需求和問題以后,我們就開始進(jìn)行思考:什么是最合適的架構(gòu),如何讓人員參與少、成本低?
解決問題,無非就是提出假設(shè),通過度量,然后刷新認(rèn)知。
在瀏覽了一些資料以后,我們認(rèn)為,大部分的計算框架都是通過輸入,進(jìn)行處理,然后得到輸出。首先,我們要獲取到數(shù)據(jù),這里數(shù)據(jù)可以從 MySQL 也可以從 Kafka,然后進(jìn)行計算,這里計算可以是聚合,也可以是 TOP 5 類型的,在實時的話,可能還會有窗口類型的。在計算完之后,將結(jié)果做下發(fā),下發(fā)到消息渠道和存儲,發(fā)送到微信或者釘釘,落地到 MySQL 等。
團隊一開始嘗試了 Spark,搭建了 Yarn,使用了 Airflow 作為調(diào)度框架,通過做 MySQL 的集成導(dǎo)入,開發(fā)了一些批處理任務(wù),有著離線任務(wù)的特點,數(shù)據(jù)固定、量大、計算周期長,需要做一些復(fù)雜操作。
在一些批處理任務(wù)上,這種架構(gòu)是穩(wěn)定的,但是隨著業(yè)務(wù)的發(fā)展,有了越來越多的實時的需求,并且實時的數(shù)據(jù)并不能保證按順序到達(dá),按時間戳排序,消息的時間字段是允許前后有差距的。在數(shù)據(jù)模型上,需求驅(qū)動式的開發(fā),成本相對來說,Spark 的方式對于當(dāng)時來說較高,對于狀態(tài)的處理不是很好,導(dǎo)致影響一部分的效率。
其實在 2019 年的時候,就有在調(diào)研一些實時計算的事情,關(guān)注到了 Flink 框架,當(dāng)時還是以 Java 為主,整體框架概念上和 Spark 不同,認(rèn)為批處理是一種特殊的流,但是因為團隊沒有 Java 方面的基因和沉淀,使用 Flink 作為實時計算的架構(gòu),在當(dāng)時就暫告一個段落。
在 2020 年初的時候,不管是 Flink 社區(qū) 還是 InfoQ,還是 B 站,都有在推廣 PyFlink,而且當(dāng)時尤其是程鶴群[1]和孫金城[2]的視頻以及孫金城老師的博客[3]的印象深刻。于是就想嘗試 PyFlink,其有著流批一體的優(yōu)勢,而且還支持 Python 的一些函數(shù),支持 pandas,甚至以后還可以支持 Tensorflow、Keras,這對我們的吸引力是巨大的。在之后,就在構(gòu)思我們的在 PyFlink 上的流批一體的架構(gòu)。
02 流批一體的架構(gòu)
架構(gòu)
首先我們要梳理數(shù)據(jù),要清楚數(shù)據(jù)從哪里來。在以 Spark 為主的時期,數(shù)據(jù)是定期從數(shù)據(jù)源加載(增量)數(shù)據(jù),通過一定的轉(zhuǎn)換邏輯,然后寫入目的地,由于數(shù)據(jù)量和業(yè)務(wù)需要,延遲通常在小時級別,而實時的話,需要盡可能短的延遲,因此將數(shù)據(jù)源進(jìn)行了分類,整體分成了幾部分,一部分是傳統(tǒng)的數(shù)據(jù)我們存放在 MySQL 持久化做保存,這部分之后可以直接作為批處理的計算,也可以導(dǎo)入 Hive,做進(jìn)一步的計算。實時的部分,實際上是有很多思路,一種方式是通過 MySQL 的 Binlog 做解析,還有就是 MySQL 的 CDC 功能,在多方考量下,最后我們選擇了 Kafka,不僅是因為其是優(yōu)秀的分布式流式平臺,而且團隊也有對其的技術(shù)沉淀。
并且實際上在本地開發(fā)的時候,安裝 Kafka 也比較方便,只需要 Brew Install Kafka,而且通過 Conduktor 客戶端,也可以方便的看到每個 Topic 的情況。于是就對現(xiàn)有的 Parser 進(jìn)行改造,使其支持 Kafka,在當(dāng)收到新的區(qū)塊時,會立即向 Kafka 發(fā)送一個消息,然后進(jìn)行處理。
大概是在 2018 年的時候,團隊將整體的業(yè)務(wù)遷移到了 Kubernetes 上,在業(yè)務(wù)不斷發(fā)展的過程中,其對開發(fā)和運維上來說,減輕了很多負(fù)擔(dān),所以建議有一定規(guī)模的業(yè)務(wù),最好是遷移到 Kubernetes,其對成本的優(yōu)化,DevOps,以及高可用的支持,都是其他平臺和傳統(tǒng)方式無法比擬的。
在開發(fā)作業(yè)的過程中,我們在盡可能的使用 Flink SQL,同時結(jié)合一些 Java、Python 的 UDF、UDAF、UDTF。每個作業(yè)通過初始化類似于以下的語句,形成一定的模式:
self.source_ddl = '''CREATE TABLE SourceTable (xxx int) WITH ''' self.sink_ddl = '''CREATE TABLE SinkTable (xxx int) WITH ''' self.transform_ddl = '''INSERT INTO SinkTableSELECT udf(xxx)FROM SourceTableGROUP BY FROM_UNIXTIME(`timestamp`, 'yyyyMMdd') '''在未來的話,會針對性地將數(shù)據(jù)進(jìn)行分層,按照業(yè)界通用的 ODS、DWD、DWS、ADS,分出原始層,明細(xì)層和匯總層,進(jìn)一步做好數(shù)據(jù)的治理。
效果
最終我們團隊基于 PyFlink 開發(fā)快速地完成了已有的任務(wù),部分是批處理作業(yè),處理過去幾天的數(shù)據(jù),部分是實時作業(yè),根據(jù) Kafka 的消息進(jìn)行消費,目前還算比較穩(wěn)定。
部署時選擇了 Kubernetes,具體下面會進(jìn)行分享。在 K8S 部署了 Jobmanager 和 Taskmanager,并且使用 Kubernetes 的 job 功能作為批處理作業(yè)的部署,之后考慮接入一些監(jiān)控平臺,比如 Prometheus 之類的。
在成本方面,由于是使用的 Kubernetes 集群,因此在機器上只有擴展主機的成本,在這種方式上,成本要比傳統(tǒng)的 Yarn 部署方式要低,并且之后 Kuberntes 會支持原生部署,在擴展 Jobmanager 和 Taskmanager 上面會更加方便。
03 Zeppelin、PyFlink on K8S 等實踐
Zeppelin 是我們用來進(jìn)行數(shù)據(jù)探索和邏輯驗證,有些數(shù)據(jù)在本地不是真實數(shù)據(jù),利用 Zeppelin 連接實際的鏈上數(shù)據(jù),進(jìn)行計算的邏輯驗證,當(dāng)驗證完成后,便可轉(zhuǎn)換成生產(chǎn)需要的代碼進(jìn)行部署。
一、Kubernetes 上搭建 PyFlink 和 Zeppelin
1. 整理后的部署 Demo 在 github,可以參閱相關(guān)鏈接[4]。
2. 關(guān)于配置文件,修改以下配置的作用。
(1). 修改 configmap 的 flink-conf.yaml 文件的 taskmanager 配置。
taskmanager.numberOfTaskSlots: 10
調(diào)整 Taskmanager 可以調(diào)整運行的 job 的數(shù)量。
(2). 在 Zeppelin 的 dockerfile 中修改 zeppelin-site.xml 文件。
cp conf/zeppelin-site.xml.template conf/zeppelin-site.xml; \ sed -i 's#<value>127.0.0.1</value>#<value>0.0.0.0</value>#g' conf/zeppelin-site.xml; \ sed -i 's#<value>auto</value>#<value>local</value>#g' conf/zeppelin-site.xml- 修改請求來源為 0.0.0.0,如果是線上環(huán)境,建議開啟白名單,加上 auth 認(rèn)證。
- 修改 interpreter 的啟動模式為 local,auto 會導(dǎo)致在集群啟動時,以 K8s 的模式啟動,目前 K8s 模式只支持 Spark,local 模式可以理解為,Zeppelin 將在本地啟動一個連接 Flink 的 interpreter 進(jìn)程。
- Zeppelin 和在本地提交 Flink 作業(yè)類似,也需要 PyFlink 的基礎(chǔ)環(huán)境,所以需要將 Flink 對應(yīng)版本的 jar 包放入鏡像內(nèi)。
3. Zeppelin 的 ingress 中添加 websocket 配置。
nginx.ingress.kubernetes.io/configuration-snippet: |proxy_set_header Upgrade "websocket";proxy_set_header Connection "Upgrade";Zeppelin 在瀏覽器需要和 server 端建立 socket 連接,需要在 ingress 添加 websocket 配置。
4.Flink 和 Zeppelin 數(shù)據(jù)持久化的作用。
volumeMounts: - mountPath: /zeppelin/notebook/name: data volumes: - name: datapersistentVolumeClaim:claimName: zeppelin-pvc --- apiVersion: v1 kind: PersistentVolumeClaim metadata:name: zeppelin-pvc spec:storageClassName: efs-scaccessModes:- ReadWriteOnceresources:requests:storage: 1Gi- 對 Flink 的 /opt/flink/lib 目錄做持久化的目的,是當(dāng)我們需要新的 jar 包時,可以直接進(jìn)入 Flink 的 pod 進(jìn)行下載,并存放到 lib 目錄,保證 jobmanager 和 taskmanager 的 jar 版本一致,同時也無需更換鏡像。
- Zeppelin 的任務(wù)作業(yè)代碼會存放在 /zeppelin/notebook/ 目錄下,目的是方便保存編寫好的代碼。
5. Flink 命令提交 job 作業(yè)的方式。
(1). 本地安裝 PyFlink,Python 需要3.5及以上版本。
$ pip3 install apache-flink==1.11.1
(2). 測試 Demo
def word_count():env = StreamExecutionEnvironment.get_execution_environment()t_env = StreamTableEnvironment.create(env,environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())sink_ddl = """create table Results (word VARCHAR, `count` BIGINT) with ( 'connector' = 'print')"""t_env.sql_update(sink_ddl)elements = [(word, 1) for word in content.split(" ")]# 這里也可以通過 Flink SQLt_env.from_elements(elements, ["word", "count"]) \.group_by("word") \.select("word, count(1) as count") \.insert_into("Results")t_env.execute("word_count")if __name__ == '__main__':logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")word_count()或者是實時處理的 Demo:
def handle_kafka_message():s_env = StreamExecutionEnvironment.get_execution_environment()# s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)s_env.set_parallelism(1)st_env = StreamTableEnvironment \.create(s_env, environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build())source_ddl = '''CREATE TABLE SourceTable (word string) WITH ('connector.type' = 'kafka','connector.version' = 'universal','connector.topic' = 'Topic','connector.properties.bootstrap.servers' = 'localhost:9092','connector.properties.zookeeper.connect' = 'localhost:2121','format.type' = 'json','format.derive-schema' = 'true')'''sink_ddl = """create table Results (word VARCHAR) with ('connector' = 'print')"""st_env.sql_update(sink_ddl)st_env.sql_update(source_ddl)st_env.from_path("source").insert_into("sink")st_env.execute("KafkaTest")if __name__ == '__main__':handle_kafka_message()(3). 本地測試 Flink 命令提交 job 作業(yè)。
$ flink run -m localhost:8081 -py word_count.py python/table/batch/word_count.py Job has been submitted with JobID 0a31b61c2f974bcc3f344f57829fc5d5 Program execution finished Job with JobID 0a31b61c2f974bcc3f344f57829fc5d5 has finished. Job Runtime: 741 ms(4). 如果存在多個 Python 文件,可以先 zip 打包后再進(jìn)行提交作業(yè)。
$ zip -r flinkdemo.zip ./* $ flink run -m localhost:8081 -pyfs flinkdemo.zip -pym main(5). Kubernetes 通過集群的 CronJob 定時調(diào)度來提交 Job,之后會做自研一些 UI 后臺界面做作業(yè)管理與監(jiān)控。
04 在區(qū)塊鏈領(lǐng)域?qū)嵺`
隨著區(qū)塊鏈技術(shù)的越來越成熟,應(yīng)用越來越多,行業(yè)標(biāo)準(zhǔn)化、規(guī)范化的趨勢也開始顯現(xiàn),也越來越依賴于云計算、大數(shù)據(jù),畢竟是數(shù)字經(jīng)濟的產(chǎn)物。BTC.com 也在扎根于區(qū)塊鏈技術(shù)基礎(chǔ)設(shè)施,為各類公司各類應(yīng)用提供數(shù)據(jù)和業(yè)務(wù)上的支持。
近些年,有個詞火遍了 IT 業(yè)界,中臺,不管是大公司還是創(chuàng)業(yè)公司,都喜歡扯上這個概念,號稱自己業(yè)務(wù)中臺,數(shù)據(jù)中臺等。我們的理解中,中臺是一種整合各方面資源的能力,從傳統(tǒng)的單兵作戰(zhàn),到提升武器裝備后勤保障,提升作戰(zhàn)能力。在數(shù)據(jù)上打破數(shù)據(jù)孤島,在需求快速變化的前臺和日趨穩(wěn)定的后臺中取得平衡。而中臺更重要的是服務(wù),最終還是要回饋到客戶,回饋到合作伙伴。
在區(qū)塊鏈領(lǐng)域,BTC.com 有著深厚的行業(yè)技術(shù)積累,可以提供各方面數(shù)據(jù)化的能力。比如在利用機器學(xué)習(xí)進(jìn)行鏈上數(shù)據(jù)的預(yù)估,預(yù)估 eth 的 gas price,還有最佳手續(xù)費等,利用 keras 深度學(xué)習(xí)的能力,進(jìn)行一些回歸計算,在之后也會將 Flink、機器學(xué)習(xí)和區(qū)塊鏈結(jié)合起來,對外提供更多預(yù)測類和規(guī)范化分類的數(shù)據(jù)樣本,之前是在用定時任務(wù)不斷訓(xùn)練模型,與 Flink 結(jié)合之后,會更加實時。在這方面,以后也會提供更多的課題,比如幣價與 Defi,輿情,市場等的關(guān)系,區(qū)塊鏈地址與交易的標(biāo)注和分類。甚至于將機器學(xué)習(xí)訓(xùn)練的模型,放于 IPFS 網(wǎng)絡(luò)中,通過去中心化的代幣進(jìn)行訓(xùn)練,提供方便調(diào)用樣本和模型的能力。
在目前,BTC.com 推出了一些通過數(shù)據(jù)挖掘?qū)崿F(xiàn)的能力,包括交易推送、OLAP 鏈上分析報表等,改善和提升相關(guān)行業(yè)和開發(fā)者實際的體驗。我們在各種鏈上都有監(jiān)控節(jié)點,監(jiān)控各區(qū)塊鏈網(wǎng)絡(luò)的可用性、去中心化程度,監(jiān)控智能合約。在接入一些聯(lián)盟鏈、隱私加密貨幣,可以為聯(lián)盟鏈、隱私加密貨幣提供這方面的數(shù)據(jù)能力。
BTC.com 將為區(qū)塊鏈產(chǎn)業(yè)生態(tài)發(fā)展做出更多努力,以科技公司的本質(zhì),以技術(shù)發(fā)展為第一驅(qū)動力,以市場和客戶為導(dǎo)向,開發(fā)創(chuàng)新和融合應(yīng)用,做好基礎(chǔ)設(shè)施。
05 展望與總結(jié)
從實時計算的趨勢,到流批一體的架構(gòu),通過對 PyFlink 和 Flink 的學(xué)習(xí),穩(wěn)定在線上運行了多種作業(yè)任務(wù),對接了實際業(yè)務(wù)需求。并且搭建了 Zeppelin 平臺,使得業(yè)務(wù)開發(fā)上更加方便。在計算上盡可能地依賴 SQL,方便各方面的集成與調(diào)試。
在社區(qū)方面,PyFlink 也是沒有令我們失望的,較快的響應(yīng)能力,不斷完善的文檔。在 Confluence[5]上也可以看到一些 Flink Improvement Proposals,其中也有一些是 PyFlink 相關(guān)的,在不遠(yuǎn)的將來,還會支持 Pandas UDAF,DataStream API,ML API,也期望在之后可以支持 Joblistener,總之,在這里也非常感謝相關(guān)團隊。
未來的展望,總結(jié)起來就是,通過業(yè)務(wù)實現(xiàn)數(shù)據(jù)的價值化。而數(shù)據(jù)中臺的終局,是將數(shù)據(jù)變現(xiàn)。
?
?
?
原文鏈接
本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的PyFlink + 区块链?揭秘行业领头企业 BTC.com 如何实现实时计算的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark On MaxCompute如
- 下一篇: 一种通用整形数组压缩方法