酷狗音乐的大数据实践
此文是根據酷狗音樂大數據架構師王勁在【QCON高可用架構群】中的分享內容整理而成,轉發請注明出處。
王勁:目前就職酷狗音樂,大數據架構師,負責酷狗大數據技術規劃、建設、應用。 11年的IT從業經驗,2年分布式應用開發,3年大數據技術實踐經驗,主要研究方向流式計算、大數據存儲計算、分布式存儲系統、NoSQL、搜索引擎等。
編輯整理:陳剛@北京智識
本次分享的主要內容包括:什么是大數據,大數據技術架構,大數據技術實現,持續改進四個方面。
大數據平臺是一個龐大的系統工程,整個建設周期很長,涉及的生態鏈很長(包括:數據采集、接入,清洗、存儲計算、數據挖掘,可視化等環節,每個環節都當做一個系統建設),風險也很大。
1.什么是大數據
所謂“大數據”(big data) 指的是這樣一種現象:一個公司日常運營所生成和積累用戶行為數據“增長”如此之快, 以至于難以使用現有的數據庫管理工具來駕馭,困難存在于數據的獲取、存儲、搜索、共享、分析和可視化等方面。這些數據量是如此之大,已經不是以我們所熟知的多少G和多少T為單位來衡量,而是以P(1000個T), E(一百萬個T)或Z(10億個T)為計量單位,所以稱之為大數據。
(插圖1)
大數據來源:半個世紀以來,隨著計算機技術全面融入社會生活,信息爆炸已經積累到了一個開始引發變革的程度。它不僅使世界充斥著比以往更多的信息,而且其增長速度也在加快。信息爆炸的學科如天文學和基因學,創造出了“大數據”這個概念。如今,這個概念幾乎應用到了所有人類智力與發展的領域中。21世紀是數據信息大發展的時代,移動互聯、社交網絡、電子商務等極大拓展了互聯網的邊界和應用范圍,各種數據正在迅速膨脹并變大。 互聯網(社交、搜索、電商)、移動互聯網(微博)、物聯網(傳感器,智慧地球)、車聯網、GPS、醫學影像、安全監控、金融(銀行、股市、保險)、電信(通話、短信)都在瘋狂產生著數據。
(插圖2)
大數據特征:“大量化(Volume)、多樣化(Variety)、快速化(Velocity)、價值密度低(Value)”就是“大數據”顯著的4V特征,或者說,只有具備這些特點的數據,才是大數據。
(插圖3)
?
大數據技術要解決的問題:大數據技術被設計用于在成本可承受的條件下,通過非常快速(velocity)地采集、發現和分析,從大量(volumes)、多類別(variety)的數據中提取價值(value),將是IT領域新一代的技術與架構。
(插圖4)
大數據的應用前景:從應用方向上看,通過對大數據的儲存、挖掘與分析,大數據在營銷、企業管理、數據標準化與情報分析等領域大有作為。從應用行業來看,大數據一方面可以應用于客戶服務水平提升及營銷方式的改進,另一方面可以助力行業內企業降低成本,提升運營效益,同時還能幫助企業進行商業模式的創新及發現新的市場商機。從對整個社會的價值來看,大數據在智慧城市、智慧交通及災難預警等方面都有巨大的潛在應用價值。專業機構預測,隨著互聯網技術的高速發展,云計算、物聯網應用的日益豐富,大數據未來發展前景將更為廣闊。
(插圖5)
大數據現在在生活中無處不在了,那作為IT技術人員,酷狗是怎么通過技術來怎么解決大數據的問題呢?
2.酷狗數據中心大數據技術架構
第一代大數據架構:
(插圖6)
主要基于Hadoop1.x+hive做離線計算(T+1),缺點:等所有數據到達后才開始計算,集群資源使用率分布不均衡,凌晨1點到中午12點,集群資源最繁忙;中午12點到晚上12點,集群資源屬于空閑狀態。
在大數據中,數據的時效性越高,數據越有價值(如:實時個性化推薦系統,RTB系統,實時預警系統等),因此,又研發了第二代技術架構。目前數據中心并行運行兩套集群(hadoop1.x,hadoop2.6),新業務直接接入新集群,舊集群的業務數據正在遷移到新集群中;新集群的結果與舊集群的結果對比(很快會全部切換到新集群)。
第二代大數據技術架構
從數據處理流程看,分數據源、數據接入、數據清洗、存儲計算、數據服務、數據消費等環節,大數據處理流程,如下圖:
(插圖7)
第二代大數據技術整體架構圖如下:
(插圖8)
大數據計算分實時計算與離線計算,在整個集群中,奔著能實時計算的,一定走實時計算流處理,通過實時計算流來減輕集群的資源使用率集中現象。
離線計算(批處理):通過spark,spark SQL實現【關于Spark就不詳細介紹了,近幾年發展最快的大數據處理框架】,整體性能比hive提高5—10倍,hive腳本都在轉換為Spark SQL。
實時計算:基于Storm,Drools,Esper。【關于storm的詳細內容,這里不做介紹了,想要查看介紹,請關注本公眾號,并查看歷史消息即可。Drools(詳解見:http://www.drools.org/),Esper(詳解見:http://blog.csdn.net/luonanqin/article/category/1557469)】
HBase/MySQL:用于實時計算,離線計算結果存儲服務。
Redis:用于中間計算結果存儲或字典數據等。
ElasticSearch:用于明細數據實時查詢及HBase的二級索引存儲。
通過新的技術架構實現比原有的數據時效性明顯提高,現在能實時提供DAU,PV等數據,整體離線計算的整體時長縮短了50%。
3.酷狗大數據技術實現
下面給大家講解下,數據采集接入、數據清洗、實時監控系統、明細查詢的實現細節。
組件之數據采集
從數據處理流圖中,可以知道,數據源分為前端日志,服務端日志,業務數據。下面講解數據是怎么采集接入的。
a.前端日志采集接入:
前端日志采集要求實時,可靠性,高可用性等特性。技術選型時,對開源的數據采集工具flume,scribe,chukwa測試對比,發現基本滿足不了業務場景需求。
(插圖9)
詳細的對比,大家可以看看(http://www.ttlsa.com/log-system/scribe-chukwa-kafka-flume-log-system-contrast/) 。所以,選擇基于kafka,自己開發前置kafka代理網關,來完成數據采集需求。前置代理網關的開發過程中走了一些彎路,最后采用nginx+lua開發,基于lua實現了kafka生產者協議。kafka的生產者協議實現,有興趣可以去https://github.com/doujiang24/lua-resty-kafka看看,另一同事實現的,現在的github上還比較活躍,提需求的人也有不少。
采集網關的具體實現如下:
[數據可靠性]
采集網關,對數據可靠性,提供以下兩點保障:
數據由sdk 發往采集網關,網關收到數據即返回成功,由網關確保數據發往kafka。 1)sdk 未收到成功,就會重試(安全:帶上上次task id); 2)js,flash 端沒有重試邏輯
網關與kafka通訊,提供兩種不同類別保障: 1)at most once (最多一次發送成功) 2) at least once (至少一次發送成功)
[網關與kafka通訊可靠性保障細節]
前提:由于kafka 本身,在整體設計以及通訊協議上,并不提供強一致性保證(exactly once)[http://kafka.apache.org/documentation.html#semantics] 所以,網關與kafka通訊中,每次發送數據區分為三種狀態: 1)kafka寫入成功(兩份) 2)可以安全重試(kafka肯定沒收到) 3)不可安全重試(kakfa可能寫入了或者沒有)
網關為保障數據的準確性,采用以下策略: 1)可安全重試的,由網關本地緩存,再發送kafka(優先redis,再磁盤) 2)不可安全重試的,將進入指定容錯topic
通過以上策略,確保在正常topic 里提供[at most once] 保障; 算上容錯topic 提供[at least once] 保障;
不可安全重試的狀態包括: 1)網絡錯誤(發送超時,等待響應超時) 2)RequestTimedOut 狀態(該錯誤碼,在動態遷移partition 時,數據并未被成功寫入;其他則會被成功寫入)
有了兩層的保障,再配合網關上報的監控(收到的量,成功發送量,本地緩存量),可以 1)從每個topic 的量,監控整個系統的運行情況 2) 對于普通業務場景,只需要正常使用topic 3)如果網絡有長時間的抖動,或者kafka出現宕機(都將導致出現較多不可安全重試內容),可能需要特殊處理容錯topic內的數據
b.后端日志采集接入:
FileCollect,考慮到很多線上環境的環境變量不能改動,為減少侵入式,目前是采用Go語言實現文件采集。
前端,服務端的數據整體架構如下圖:
(插圖10)
c.業務數據接入
Canal:利用Canal通過MySQL的binlog機制實時同步業務增量數據。(有關canal的介紹,參考:http://agapple.iteye.com/blog/1796633)
組件之數據清洗(ETL)
上面介紹了,數據采集接入的實現方式,接下來介紹的都是基于Storm框架實現的,先介紹數據清洗(ETL), 此處的ETL只是做簡單的數據轉義,補全,異常數據處理。
(插圖11)
Storm(數據清洗) Kafka Spout 負責消費Kafka數據
IsDecode Bolt 負責判斷數據是否解碼
Decode Bolt 負責數據解碼
Rules Bolt 負責數據規則解析,引入規則引擎(Drools),解決數據變更需求。
FormatRule 負責數據格式化規則,適應不同的格式。
DataAdapter 負責數據存儲適配,需要適配HDFS,HBase,Spark,數據庫等。
Error Bolt 負責異常數據寫入HDFS,方便異常數據明細查詢。
Stat Bolt 統計從kafka消費數據量
在使用Storm中遇到了,業務配置需要變更時,怎么實現動態變更的問題?基于事件驅動的方式解決配置動態變更需求,最初考慮kafka創建事件隊列,通過監控隊列的事件數據來實現。后來在萬能的GitHub找到了Storm-Signal,使用Storm-Signal實現,詳細介紹見:https://github.com/ptgoetz/storm-signals?;贒rools實現規則引擎時,需要解決怎么不重啟topology的情況下,讓修改的規則文件生效,也是基于Storm-Signal組件和把drl文件轉換為流存儲在redis中,這樣動態獲取drl流文件。
組件之實時監控
接下來,給大家介紹下實時監控系統基于Storm的實現。在介紹之前,先介紹下OpenTSDB,實時監控的存儲是采用OpenTSDB來處理的。
a.OpenTSDB
OpenTSDB是基于HBase存儲時間序列數據的一個開源數據庫,確切地說,它只是一個HBase的應用而已,其對于時間序列數據的處理可以供其他系統參考和借鑒。OpenTSDB使用HBase作為存儲中心,它無須采樣,可以完整的收集和存儲上億的數據點,支持秒級別的數據監控,得益于HBase的分布式列式存儲,HBase可以靈活的支持metrics的增加,可以支持上萬機器和上億數據點的采集。在openTSDB中,TSD是HBase對外通信的daemon程序,沒有master/slave之分,也沒有共享狀態,因此利用這點和HBase集群的特點就可以消除單點。用戶可以通過telnet或者http協議直接訪問TSD接口,也可以通過rpc訪問TSD。每一個需要獲取metrics的Servers都需要設置一個Collector用來收集時間序列數據。這個Collector就是你收集數據的腳本。
(插圖12)
如果想快速地展示mysql中在一段時間內執行delete子句的數量,慢查詢的數量,創建的臨時文件數量以及99%的延遲數量等等。OpenTSDB則可以非常容易存儲和處理百萬級別以上的數據點,并能實時動態的生成對應的圖,如下圖:
(插圖13)
OpenTSDB使用async HBase,這是個完全異步、非阻塞、線程安全、HBase api,使用更少的線程、鎖以及內存可以提供更高的吞吐量,特別對于大量的寫操作。下圖為讀寫流程:
(插圖14)
在HBase中,表結構的設計對性能具有很大的影響,其中tsdb-uid表和tsdb表見表1和表2
b.實時監控系統
總體架構圖:
(插圖17)
Kafka Spout 負責讀Kafka數據
Decode Bolt 負責日志解碼
Detail Bolt 負責原始數據存儲ES集群,提示實時原始日志查詢。
Stat Rules Bolt負責日志格式解析,引入規則引擎(Drools),解決數據格式變更需求。
TSD Bolt 負責多維度統計結果存儲,通過TSD服務建立統計指標與Rowkey的映射關系
Alarm Bolt 負責多維度統計結果寫入kafka Alarm Queue。
在實時監控系統的瓶頸不是在實時計算上,而是在結果存儲方面?在存儲方面,花了大量的時間去調優測試,其中也參考了攜程對OpenTSDB的一些建議(攜程的OpenTSDB使用的很好,好像還申請了專利)。存儲這塊,我們主要對它做了以下改進和優化,跟我們的需求進行定制修改源代碼。OpenTSDB的改進和優化
(1)去除聚合時的分組插值,直接聚合
(2)修改了startkey和endkey中時間戳,只查詢需要的行,對查詢結果時間戳添加時區的偏移。
(3)添加降采樣表,供采樣粒度較大的查詢使用,減少了rowkey數,提升了查詢性能。
(4)添加協處理器支持,減少io和序列化/反序列化開銷。
第4點的改動比較大,把OpenTSDB的查詢處理功能有客戶端搬遷到服務端(協處理器),大大減少了減少io和序列化/反序列化開銷,性能提升明顯。還有點,我們通過降維,添加分區標識來提高性能,這點主要利用HBase的分區特性。
組件之明細查詢
接下再給大家介紹下,明細數據查詢方案,引入了搜索引擎,架構如下圖:
(插圖18)
目前實現了實時監控系統的明細日志查詢功能。基于Storm把明細數據轉換成相應的格式,通過ElasticSearch-Storm組件實現日志實時寫入ES,再通過ES實時查詢。我們現在還利用ES解決HBase的二級索引問題。
關于Storm,我們抽出了一些公共組件,提高開發效率。
(插圖19)
4.持續改進
目前我們數據中心存在的問題:
1、業務代碼開發量大
2、海量數據查詢問題
3、數據的解讀問題
針對以上問題,從以下方面改進:
1、基于SummingBird實現Lambda架構
2、大數據存儲與查詢優化
3、數據可視化應用。大家對SummingBird有興趣的,可以參考:http://clockfly.diandian.com/post/2014-05-19/summingbirdintroductionapplication
Q&A
Q1、想問下王總:spark目前主要使用在什么場景下,批處理,流處理,數據倉庫sql這種all in one的模式比較起來有優勢嗎?還有就是Spark是如何和其他平臺共享集群資源的?感謝!
spark我們目前的大部分新業務都是用spark與spark SQL實現,例如:用戶畫像。批處理,流處理這塊主要是針對海量數據,在大數據中sql的支持度還不是很完善,spark主要基于yarn,資源管理由yarn來負責,我們通過公平調度的策略管理資源。
Q2、數據可視化的時候,不同角色的數據權限控制是怎樣做的?
我們通過角色對應的功能點,目前還沒做到數據權限,后續會通過acl去實現,在hadoop2.6已經提供了,我們集群的版本已經是最新的了
Q3、提個問題,我們在使用 spark sql 中發現其很不穩定,經常崩潰,而hive目前還是很穩定的,請問您是如何解決的呢?
我們采用的是spark 1.3版本,使用spark sql也是也會遇到一些問題,通過調整內存參數解決,一般通過spark原始api。
Q4、收集日志的sdk,與gateway之間是通過什么通信的?長連接,還是http?怎么協調多個不同的client?
通過http協議,定義了自己的通信協議,不是長連接。手機,pc不同的客戶端,都是標識
Q5、storm和spark都有實時計算特性,各適合什么場景?
對穩定性,實時性要求高的,我們會采用storm,spark Streaming目前版本時延比較大,還有對kafka的支持不是很完善,坑多
Q6、這個架構多少人在維護,感覺語言,框架都用的挺多;修改過框架后,如果遇到版本升級,在升級版本的時候,如何處理修改過的內容
十多人維護,語言主要是java,scala。我們的改動比較大,所以后續會自己添加一些新的特性,機會成熟也可以考慮開源
Q7、關于大數據實現用戶畫像方面的成果、大概方案,能再介紹下不?感興趣。
其實畫像這塊,目前還是第一版本,數據訓練與分類是分開的,通過spark利用訓練好的模型通過行為數據跑用戶標簽,二期,會基于spark Streaming spark mllib HBase完善標簽系統,方案已定了。
Q8、spark目前批處理,流處理,數據倉庫sql這種all in one的模式和hadoop生態比較,在實際使用中起來有優勢嗎?還有就是Spark是如何和其他平臺共享集群資源?
相比mr,hive,作業的執行效率高,在相同的時間內處理的任務都要多,但hive的穩定性目前比spark SQL要好。spark部署在yarn上,由yarn負責集群資源調度。
補充:就是說spark在批處理的效率仍然高于mr,而all in one仍然還是各自處理數據,存儲到外部。并沒有在spark內直接實現數據互通吧?
是的,最終還是存儲在hdfs,引入了tachyon,tachyon提高數據的讀取。
補充:我本人現在做一個小項目,目的想實現在spark多應用之間的內存緩存數據的互通,例如streaming形成一個history不落地直接給sql使用。不知道有沒有適合場景?
kafka Spark streaming有很多應用場景,后期,我們會畫像會采用該方案
轉載于:https://www.cnblogs.com/davidwang456/articles/9273376.html
總結
以上是生活随笔為你收集整理的酷狗音乐的大数据实践的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 服务容错模式
- 下一篇: 基于 Spring Cloud 的服务治