日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

大数据乘(tu)风(tou)破(bian)浪(qiang)之路

發(fā)布時(shí)間:2024/3/12 编程问答 62 豆豆
生活随笔 收集整理的這篇文章主要介紹了 大数据乘(tu)风(tou)破(bian)浪(qiang)之路 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

QUESTION1大數(shù)據(jù)能做什么?

大數(shù)據(jù)無(wú)處不在,大數(shù)據(jù)應(yīng)用于各個(gè)行業(yè),包括金融、汽車(chē)、餐飲、電信、能源、體能和娛樂(lè)等在內(nèi)的社會(huì)各行各業(yè)都已經(jīng)融入了大數(shù)據(jù)的印跡。

  • 電商大數(shù)據(jù)——精準(zhǔn)營(yíng)銷(xiāo)法寶
  • 電商是最早利用大數(shù)據(jù)進(jìn)行精準(zhǔn)營(yíng)銷(xiāo)的行業(yè),除了精準(zhǔn)營(yíng)銷(xiāo),電商可以依據(jù)客戶(hù)消費(fèi)習(xí)慣來(lái)提前為客戶(hù)備貨,并利用便利店作為貨物中轉(zhuǎn)點(diǎn),在客戶(hù)下單15分鐘內(nèi)將貨物送上門(mén),提高客戶(hù)體驗(yàn)。

    例如:馬云的菜鳥(niǎo)網(wǎng)絡(luò)宣稱(chēng)的24小時(shí)完成在中國(guó)境內(nèi)的送貨;以及劉強(qiáng)東宣傳未來(lái)京東將在15分鐘完成送貨上門(mén)都是基于客戶(hù)消費(fèi)習(xí)慣的大數(shù)據(jù)分析和預(yù)測(cè)。

  • 金融大數(shù)據(jù)——財(cái)源滾滾來(lái)
  • 隨著大數(shù)據(jù)技術(shù)的應(yīng)用,越來(lái)越多的金融企業(yè)也開(kāi)始投身到大數(shù)據(jù)應(yīng)用實(shí)踐中。

    麥肯錫的一份研究顯示,金融業(yè)在大數(shù)據(jù)價(jià)值潛力指數(shù)中排名第一。

    典型的案例有:花旗銀行利用IBM沃森電腦為財(cái)富管理客戶(hù)推薦產(chǎn)品;美國(guó)銀行利用客戶(hù)點(diǎn)擊數(shù)據(jù)集為客戶(hù)提供特色服務(wù),如有競(jìng)爭(zhēng)的信用額度;招商銀行利用客戶(hù)刷卡、存取款、電子銀行轉(zhuǎn)帳、微信評(píng)論等行為數(shù)據(jù)進(jìn)行分析,每周給客戶(hù)發(fā)送針對(duì)性廣告信息,里面有顧客可能感興趣的產(chǎn)品和優(yōu)惠信息。

    可見(jiàn),大數(shù)據(jù)在金融行業(yè)的應(yīng)用可以總結(jié)為以下五個(gè)方面:精準(zhǔn)營(yíng)銷(xiāo)、風(fēng)險(xiǎn)管控、決策支持、效率提升以及產(chǎn)品設(shè)計(jì)。

  • 醫(yī)療大數(shù)據(jù)——看病更高效
  • 大數(shù)據(jù)讓就醫(yī)、看病更簡(jiǎn)單。隨著大數(shù)據(jù)在醫(yī)療行業(yè)的深度融合,大數(shù)據(jù)平臺(tái)積累了海量的病例、病例報(bào)告、治愈方案、藥物報(bào)告等信息資源,所有常見(jiàn)的病例、既往病例等都記錄在案,醫(yī)生通過(guò)有效、連續(xù)的診療記錄,能夠給病人優(yōu)質(zhì)、合理的診療方案。這樣不僅提高醫(yī)生的看病效率,而且能夠降低誤診率,從而讓患者在最短的時(shí)間接受最好的治療。

  • 零售大數(shù)據(jù)——最懂消費(fèi)者
  • 零售行業(yè)大數(shù)據(jù)應(yīng)用有兩個(gè)層面,一個(gè)層面是零售行業(yè)可以了解客戶(hù)消費(fèi)喜好和趨勢(shì),進(jìn)行商品的精準(zhǔn)營(yíng)銷(xiāo),降低營(yíng)銷(xiāo)成本。另一層面是依據(jù)客戶(hù)購(gòu)買(mǎi)產(chǎn)品,為客戶(hù)提供可能購(gòu)買(mǎi)的其它產(chǎn)品,擴(kuò)大銷(xiāo)售額,也屬于精準(zhǔn)營(yíng)銷(xiāo)范疇。例如:美國(guó)零售業(yè)的傳奇故事——“啤酒與尿布”。

  • 交通大數(shù)據(jù)——暢通出行
  • 交通作為人類(lèi)行為的重要組成和重要條件之一,對(duì)于大數(shù)據(jù)的感知也是最急迫的。目前,交通的大數(shù)據(jù)應(yīng)用主要在兩個(gè)方面,一方面可以利用大數(shù)據(jù)傳感器數(shù)據(jù)來(lái)了解車(chē)輛通行密度,合理進(jìn)行道路規(guī)劃包括單行線路規(guī)劃。另一方面可以利用大數(shù)據(jù)來(lái)實(shí)現(xiàn)即時(shí)信號(hào)燈調(diào)度,提高已有線路運(yùn)行能力。

  • 輿情監(jiān)控大數(shù)據(jù)——名偵探柯南
  • 《黑貓警長(zhǎng)》大家都很熟悉,它講述的是“黑貓警長(zhǎng)”如何精明能干、對(duì)壞人窮追不舍、跌宕起伏的故事情節(jié)。拿到大數(shù)據(jù)時(shí)代背景下的話,雖然它也能體現(xiàn)“黑貓警長(zhǎng)”的盡職盡責(zé)、聰明能干,但更多的會(huì)歸結(jié)到一個(gè)問(wèn)題:為何還是如此的被動(dòng)、低效?疾病可以預(yù)防,難道犯罪不能預(yù)防么?

    答案是肯定的。國(guó)家正在將大數(shù)據(jù)技術(shù)用于輿情監(jiān)控,其收集到的數(shù)據(jù)除了解民眾訴求,降低群體事件之外,還可以用于犯罪管理。

    ?

    以上內(nèi)容來(lái)自百度,舉個(gè)身邊的例子,ping哥在淘寶購(gòu)買(mǎi)了一罐奶粉,淘寶會(huì)基于海量下單記錄推送他一些之前買(mǎi)過(guò)奶粉的用戶(hù)后續(xù)也會(huì)下單的商品,比如紙尿布,沒(méi)準(zhǔn)還會(huì)推送酒。為什么會(huì)推送酒呢,結(jié)合他的性別和購(gòu)買(mǎi)奶粉這種行為對(duì)他分析,極大概率是已婚男士,那關(guān)于酒的故事還要從一個(gè)女人開(kāi)始說(shuō)起。。。

    ?

    ?

    QUESTION2大數(shù)據(jù)的數(shù)據(jù)從哪來(lái)?

    1、埋點(diǎn)產(chǎn)生的用戶(hù)行為數(shù)據(jù):用戶(hù)在使用產(chǎn)品過(guò)程中,與客戶(hù)端產(chǎn)品交互過(guò)程中產(chǎn)生的數(shù)據(jù),比如頁(yè)面瀏覽、點(diǎn)擊、停留、評(píng)論、點(diǎn)贊、收藏等 , 互聯(lián)網(wǎng)核心指標(biāo)PV、UV的統(tǒng)計(jì)基礎(chǔ).

    2、JavaEE后臺(tái)產(chǎn)生的業(yè)務(wù)數(shù)據(jù).

    3、爬蟲(chóng):就是模擬客戶(hù)端發(fā)送網(wǎng)絡(luò)請(qǐng)求,接收請(qǐng)求響應(yīng),一種按照一定的規(guī)則,自動(dòng)地抓取互聯(lián)網(wǎng)信息的程序。只要瀏覽器能夠做的事情,原則上,爬蟲(chóng)都能夠做到。簡(jiǎn)單來(lái)說(shuō),爬蟲(chóng)就是自動(dòng)從網(wǎng)絡(luò)上收集信息的一種程序,復(fù)雜點(diǎn)來(lái)說(shuō),就是一整套關(guān)于數(shù)據(jù)請(qǐng)求、處理、存儲(chǔ)的程序

    ?俗話說(shuō)的好,爬蟲(chóng)學(xué)的好,監(jiān)獄進(jìn)的早。。。

    QUESTION3 數(shù)據(jù)有了,怎么樣采集到分布式集群中呢?

    多圖帶你認(rèn)知采集通道

    業(yè)務(wù)系統(tǒng)數(shù)據(jù)處理鏈路

    ?

    ?

    前端埋點(diǎn)數(shù)據(jù)處理鏈路

    ?

    1、Apache Flume

    1、1 flume是干什么的?

    flume 是一個(gè)分布式的數(shù)據(jù)收集系統(tǒng),具有高可靠、高可用、事務(wù)管理、失敗重啟、聚合和傳輸?shù)裙δ堋?shù)據(jù)處理速度快,完全可以用于生產(chǎn)環(huán)境。

    Flume 的使用不只限于日志數(shù)據(jù)。因?yàn)閿?shù)據(jù)源可以定制,flume 可以被用來(lái)傳輸大量事件數(shù)據(jù),這些數(shù)據(jù)不僅僅包括網(wǎng)絡(luò)通訊數(shù)據(jù)、社交媒體產(chǎn)生的數(shù)據(jù)、電子郵件信息等等。

    flume如何搜集日志?

    我們把flume比作情報(bào)人員

    (1)搜集信息

    (2)獲取記憶信息

    (3)傳遞報(bào)告間諜信息

    flume是怎么完成上面三件事情的,三個(gè)組件:

    agent

    flume 的核心是 agent。agent 是一個(gè) java 進(jìn)程,運(yùn)行在日志收集端,通過(guò) agent 接收日志,然后暫存起來(lái),再發(fā)送到目的地。 每臺(tái)機(jī)器運(yùn)行一個(gè) agent。 agent 里面可以包含多個(gè) source,channel,sink。

    source

    source 是數(shù)據(jù)的收集端,負(fù)責(zé)將數(shù)據(jù)捕獲后進(jìn)行特殊的格式化,將數(shù)據(jù)封裝到 event 里,然后將事件推入 channel 中。flume 提供了很多內(nèi)置的 source,支持 avro,log4j,syslog 等等。如果內(nèi)置的 source 無(wú)法滿(mǎn)足環(huán)境的需求,flume 還支持自定義 source。

    channel

    channel 是連接 source 和 sink 的組件,大家可以將它看做一個(gè)數(shù)據(jù)的緩沖區(qū)(數(shù)據(jù)隊(duì)列),它可以將事件暫存到內(nèi)存中也可以持久化到本地磁盤(pán)上, 直到 sink 處理完該事件。兩個(gè)較為常用的 channel,MemoryChannel 和 FileChannel。

    sink

    sink 從 channel 中取出事件,然后將數(shù)據(jù)發(fā)到別處,可以向文件系統(tǒng)、數(shù)據(jù)庫(kù)、hadoop、kafka,也可以是其他 agent 的 source。

    配置單個(gè)組件

    流中的每一個(gè)組件(source、channel、sink)都有自己的名稱(chēng)、類(lèi)型以及一系列配置屬性。例如,一個(gè) Avro source 需要配置 hostname (或者 IP 地址)以及端口號(hào)來(lái)接收數(shù)據(jù)。一個(gè)內(nèi)存模式 channel 可以有最大隊(duì)列長(zhǎng)度的屬性("capacity": channel 中最大容納多少事件)。一個(gè) HDFS slink 則需要知道文件系統(tǒng)的 URL(hdfs://**)、文件落地的路徑、文件回滾的評(píng)率("hdfs.rollInterval": 每隔多少秒將零時(shí)文件回滾成最終文件保存到 HDFS 中)。所有這些關(guān)于各個(gè)組件的屬性需要在配置文件中進(jìn)行指定。

    ?

    ?

    Flume還有多種拓?fù)淠J?方便靈活搭建,下圖實(shí)現(xiàn)了復(fù)制和多路復(fù)用的功能

    ?

    ?

    一個(gè)簡(jiǎn)單的測(cè)試

    這里,我們給出一個(gè)配置文件的示例,該示例為 flume 單節(jié)點(diǎn)部署的配置方式。

    ?# example.conf: A single-node Flume configuration

    ?# Name the components on this agent

    ?a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

    # Describe/configure the source

    a1.sources.r1.type = netcat

    ?a1.sources.r1.bind = localhost

    a1.sources.r1.port =?44444

    # Describe the sink

    ?a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory

    a1.channels.c1.type = memory

    ?a1.channels.c1.capacity =?1000

    a1.channels.c1.transactionCapacity =?100

    # Bind the source and sink to the channel

    ?a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

    bin/flume-ng agent -n a1 -c conf/ -f job/exp.conf -Dflume.root.logger=info,console

    看看這個(gè)配置文件,我們可以發(fā)現(xiàn)這個(gè) agent 的名稱(chēng)是 a1。其中該 agent 的 source 監(jiān)聽(tīng) 44444 端口。channel 采用內(nèi)存模式,而 slink 直接輸出數(shù)據(jù)到 控制臺(tái)上(logger)。配置文件指定了各個(gè)組件的名稱(chēng),并描述了它們的類(lèi)型以及其他屬性。當(dāng)然,一個(gè)配置文件可以配置多個(gè) agent 屬性,當(dāng)希望運(yùn)行指定 agent 進(jìn)程時(shí),我們需要在命令行中顯示的給出該 agent 的名稱(chēng)。

    ?

    2、Apache Kafka

    初識(shí) Kafka

    ?

    Kafka是啥?用Kafka官方的話來(lái)說(shuō)就是:

    Kafka is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant,?wicked fast, and runs in production in thousands of companies.

    大致的意思就是,這是一個(gè)實(shí)時(shí)數(shù)據(jù)處理系統(tǒng),可以橫向擴(kuò)展、高可靠,而且還變態(tài)快,已經(jīng)被很多公司使用。

    那么什么是實(shí)時(shí)數(shù)據(jù)處理系統(tǒng)呢?顧名思義,實(shí)時(shí)數(shù)據(jù)處理系統(tǒng)就是數(shù)據(jù)一旦產(chǎn)生,就要能快速進(jìn)行處理的系統(tǒng)。

    對(duì)于實(shí)時(shí)數(shù)據(jù)處理,我們最常見(jiàn)的,就是消息中間件了,也叫MQ(Message Queue,消息隊(duì)列),也有叫Message Broker的。

    首先,我將從消息中間件的角度,帶大家看看Kafka的內(nèi)部結(jié)構(gòu),看看它是如何做到橫向擴(kuò)展、高可靠的同時(shí),還能變態(tài)快的。

    為什么需要消息中間件

    消息中間件的作用主要有兩點(diǎn):

    ·?解耦消息的生產(chǎn)和消費(fèi)。

    ·?緩沖。

    想象一個(gè)場(chǎng)景,你的一個(gè)創(chuàng)建訂單的操作,在訂單創(chuàng)建完成之后,需要觸發(fā)一系列其他的操作,比如進(jìn)行用戶(hù)訂單數(shù)據(jù)的統(tǒng)計(jì)、給用戶(hù)發(fā)送短信、給用戶(hù)發(fā)送郵件等等,就像這樣:

    createOrder(...){

    ?...

    ?statOrderData(...);

    ?sendSMS();

    ?sendEmail();}

    代碼這樣寫(xiě)似乎沒(méi)什么問(wèn)題,可是過(guò)了一段時(shí)間,你給系統(tǒng)引進(jìn)了一個(gè)用戶(hù)行為分析服務(wù),它也需要在訂單創(chuàng)建完成之后,進(jìn)行一個(gè)分析用戶(hù)行為的操作,而且隨著系統(tǒng)的逐漸壯大,創(chuàng)建訂單之后要觸發(fā)的操作也就越來(lái)越多,代碼也漸漸膨脹成這樣:

    createOrder(...){

    ?...

    ?statOrderData(...);

    ?sendSMS();

    ?sendEmail();

    ?// new operation?statUserBehavior(...);

    ?doXXX(...);

    ?doYYY(...);

    ?// more and more operations?...}

    導(dǎo)致代碼越來(lái)越膨脹的癥結(jié)在于,消息的生產(chǎn)和消費(fèi)耦合在一起了。createOrder方法不僅僅要負(fù)責(zé)生產(chǎn)“訂單已創(chuàng)建”這條消息,還要負(fù)責(zé)處理這條消息。

    這就好比espn的記者,在知道勇士拿到NBA冠軍之后,拿起手機(jī),翻開(kāi)勇士球迷通訊錄,給球迷一個(gè)一個(gè)打電話,告訴他們,勇士奪冠了。

    事實(shí)上,espn的記者只需要在他們官網(wǎng)發(fā)布這條消息,然后球迷自行訪問(wèn)espn,去上面獲取這條新聞;又或者球迷訂閱了espn,那么訂閱系統(tǒng)會(huì)主動(dòng)把發(fā)布在官網(wǎng)的消息推送給球迷。

    同樣,createOrder也需要一個(gè)像espn官網(wǎng)那樣的載體,也就是消息中間件,在訂單創(chuàng)建完成之后,把一條主題為“orderCreated”的消息,放到消息中間件去就ok了,不必關(guān)心需要把這條消息發(fā)給誰(shuí)。這就完成了消息的生產(chǎn)。

    至于需要在訂單創(chuàng)建完成之后觸發(fā)操作的服務(wù),則只需要訂閱主題為“orderCreated”的消息,在消息中間件出現(xiàn)新的“orderCreated”消息時(shí),就會(huì)收到這條消息,然后進(jìn)行相應(yīng)的處理。

    因此,通過(guò)使用消息中間件,上面的代碼也就簡(jiǎn)化成了:

    createOrder(...){

    ?...

    ?sendOrderCreatedMessage(...);}

    以后如果在訂單創(chuàng)建之后有新的操作需要執(zhí)行,這串代碼也不需要修改,只需要給對(duì)消息進(jìn)行訂閱即可。

    另外,通過(guò)這樣的解耦,消費(fèi)者在消費(fèi)數(shù)據(jù)時(shí)更加的靈活,不必每次消息一產(chǎn)生就要馬上去處理(雖然通常消費(fèi)者側(cè)也會(huì)有線程池等緩沖機(jī)制),可以等自己有空了的時(shí)候,再過(guò)來(lái)消息中間件這里取數(shù)據(jù)進(jìn)行處理。這就是消息中間件帶來(lái)的緩沖作用。

    kafka 的設(shè)計(jì)目標(biāo)

  • 提供優(yōu)秀的消息持久化能力,對(duì) TB 級(jí)以上數(shù)據(jù)也能保證常數(shù)時(shí)間的訪問(wèn)性能。
  • 高吞吐率。即使在非常廉價(jià)的機(jī)器上也能做到每臺(tái)機(jī)每秒 100000 條消息的傳輸。
  • 支持 kafka server 間的消息分區(qū),及分布式消費(fèi),同時(shí)保證每個(gè) partition 內(nèi)的消息順序傳輸。
  • 同時(shí)支持離線數(shù)據(jù)處理和實(shí)時(shí)數(shù)據(jù)處理。
  • Kafka一代 - 消息隊(duì)列

    從上面的描述,我們可以看出,消息中間件之所以可以解耦消息的生產(chǎn)和消費(fèi),主要是它提供了一個(gè)存放消息的地方——生產(chǎn)者把消息放進(jìn)來(lái),消費(fèi)者在從中取出消息進(jìn)行處理。

    那么這個(gè)存放消息的地方,應(yīng)該采用什么數(shù)據(jù)結(jié)構(gòu)呢?

    在絕大多數(shù)情況下,我們都希望先發(fā)送進(jìn)來(lái)的消息,可以先被處理(FIFO),這符合大多數(shù)的業(yè)務(wù)邏輯,少數(shù)情況下我們會(huì)給消息設(shè)置優(yōu)先級(jí)。不管怎樣,對(duì)于消息中間件來(lái)說(shuō),一個(gè)先進(jìn)先出的隊(duì)列,是非常合適的數(shù)據(jù)結(jié)構(gòu):

    ?

    ?

    那么要怎樣保證消息可以被順序消費(fèi)呢?

    消費(fèi)者過(guò)來(lái)獲取消息時(shí),每次都把index=0的數(shù)據(jù)返回過(guò)去,然后再刪除index=0的那條數(shù)據(jù)?

    很明顯不行,因?yàn)橛嗛喠诉@條消息的消費(fèi)者數(shù)量,可能是0,也可能是1,還可能大于1。如果每次消費(fèi)完就刪除了,那么其他訂閱了這條消息的消費(fèi)者就獲取不到這條消息了。

    事實(shí)上,Kafka會(huì)對(duì)數(shù)據(jù)進(jìn)行持久化存儲(chǔ)(至于存放多長(zhǎng)時(shí)間,這是可以配置的),消費(fèi)者端會(huì)記錄一個(gè)offset,表明該消費(fèi)者當(dāng)前消費(fèi)到哪條數(shù)據(jù),所以下次消費(fèi)者想繼續(xù)消費(fèi),只需從offset+1的位置繼續(xù)消費(fèi)就好了。

    消費(fèi)者甚至可以通過(guò)調(diào)整offset的值,重新消費(fèi)以前的數(shù)據(jù)。

    那么這就是Kafka了嗎?不,這只是一條非常普通的消息隊(duì)列,我們姑且叫它為Kafka一代吧。

    這個(gè)Kafka一代用一條消息隊(duì)列實(shí)現(xiàn)了消息中間件,這樣的簡(jiǎn)單實(shí)現(xiàn)存在不少問(wèn)題:

    ·?吞吐量低。我們把全部消息都放在一條隊(duì)列了,請(qǐng)求一多,它肯定應(yīng)付不過(guò)來(lái)。

    由此就引申出了Kafka二代

    Kafka二代 - Partition

    二代Kafka引入了Partition的概念,也就是采用多條隊(duì)列, 每條隊(duì)列里面的消息都是相同的topic:

    ?

    我們可以看到,每個(gè)Partition中的消息都是有序的,生產(chǎn)的消息被不斷追加到Partition log上,其中的每一個(gè)消息都被賦予了一個(gè)唯一的offset值。

    發(fā)布到Kafka主題的每條消息包括鍵值和時(shí)間戳。消息到達(dá)服務(wù)器端的指定分區(qū)后,都會(huì)分配到一個(gè)自增的偏移量。原始的消息內(nèi)容和分配的偏移量以及其他一些元數(shù)據(jù)信息最后都會(huì)存儲(chǔ)到分區(qū)日志文件中。消息的鍵也可以不用設(shè)置,這種情況下消息會(huì)均衡地分布到不同的分區(qū)。

    1) 分區(qū)的原因?

    (1)方便在集群中擴(kuò)展,每個(gè)Partition可以通過(guò)調(diào)整以適應(yīng)它所在的機(jī)器,而一個(gè)topic又可以有多個(gè)Partition組成,因此整個(gè)集群就可以適應(yīng)任意大小的數(shù)據(jù)了;?

    (2)可以提高并發(fā),因?yàn)榭梢砸訮artition為單位讀寫(xiě)了。

    傳統(tǒng)消息系統(tǒng)在服務(wù)端保持消息的順序,如果有多個(gè)消費(fèi)者消費(fèi)同一個(gè)消息隊(duì)列,服務(wù)端會(huì)以消費(fèi)存儲(chǔ)的順序依次發(fā)送給消費(fèi)者。但由于消息是異步發(fā)送給消費(fèi)者的,消息到達(dá)消費(fèi)者的順序可能是無(wú)序的,這就意味著在并行消費(fèi)時(shí),傳統(tǒng)消息系統(tǒng)無(wú)法很好地保證消息被順序處理。雖然我們可以設(shè)置一個(gè)專(zhuān)用的消費(fèi)者只消費(fèi)一個(gè)隊(duì)列,以此來(lái)解決消息順序的問(wèn)題,但是這就使得消費(fèi)處理無(wú)法真正執(zhí)行。

    Kafka比傳統(tǒng)消息系統(tǒng)有更強(qiáng)的順序性保證,它使用主題的分區(qū)作為消息處理的并行單元。Kafka以分區(qū)作為最小的粒度,將每個(gè)分區(qū)分配給消費(fèi)者組中不同的而且是唯一的消費(fèi)者,并確保一個(gè)分區(qū)只屬于一個(gè)消費(fèi)者,即這個(gè)消費(fèi)者就是這個(gè)分區(qū)的唯一讀取線程。那么,只要分區(qū)的消息是有序的,消費(fèi)者處理的消息順序就有保證。每個(gè)主題有多個(gè)分區(qū),不同的消費(fèi)者處理不同的分區(qū),所以Kafka不僅保證了消息的有序性,也做到了消費(fèi)者的負(fù)載均衡。

    2)分區(qū)的原則?

    (1)指定了patition,則直接使用;?

    (2)未指定patition但指定key,通過(guò)對(duì)key的value進(jìn)行hash出一個(gè)patition?

    (3)patition和key都未指定,使用輪詢(xún)選出一個(gè)patition。

    Kafka二代足夠完美了嗎?當(dāng)然不是,我們雖然通過(guò)Partition提升了性能,但是我們忽略了一個(gè)很重要的問(wèn)題——高可用

    萬(wàn)一機(jī)器掛掉了怎么辦?單點(diǎn)系統(tǒng)總是不可靠的。我們必須考慮備用節(jié)點(diǎn)和數(shù)據(jù)備份的問(wèn)題。

    Kafka三代 - Broker集群

    很明顯,為了解決HA問(wèn)題,我們需要集群

    Kafka對(duì)集群的支持也是非常友好的。在Kafka中,集群里的每個(gè)實(shí)例叫做Broker,就像這樣:

    ?

    每個(gè)partition不再只有一個(gè),而是有一個(gè)leader(紅色)和多個(gè)replica(藍(lán)色),生產(chǎn)者根據(jù)消息的topic和key值,確定了消息要發(fā)往哪個(gè)partition之后(假設(shè)是p1),會(huì)找到partition對(duì)應(yīng)的leader(也就是broker2里的p1),然后將消息發(fā)給leader,leader負(fù)責(zé)消息的寫(xiě)入,并與其余的replica進(jìn)行同步。

    一旦某一個(gè)partition的leader掛掉了,那么只需提拔一個(gè)replica出來(lái),讓它成為leader就ok了,系統(tǒng)依舊可以正常運(yùn)行。

    通過(guò)Broker集群的設(shè)計(jì),我們不僅解決了系統(tǒng)高可用的問(wèn)題,還進(jìn)一步提升了系統(tǒng)的吞吐量,因?yàn)閞eplica同樣可以為消費(fèi)者提供數(shù)據(jù)查找的功能。

    Kafka沒(méi)那么簡(jiǎn)單

    以上只是帶大家初步認(rèn)識(shí)一下Kafka,很多細(xì)節(jié)并沒(méi)有深入討論,比如:

    1、Kafka的消息結(jié)構(gòu)?
    我們只知道Kafka內(nèi)部是一個(gè)消息隊(duì)列,但是隊(duì)列里的元素長(zhǎng)什么樣,包含了哪些消息呢?

    2、Zookeeper和Kafka的關(guān)系?
    如果玩過(guò)Kafka的,就會(huì)發(fā)現(xiàn),我們?cè)谑褂肒afka時(shí),需要先啟動(dòng)一個(gè)ZK,那么這個(gè)ZK的作用到底是什么呢?

    3、數(shù)據(jù)可靠性和重復(fù)消費(fèi)
    生產(chǎn)者把消息發(fā)給Kafka,發(fā)送過(guò)程中掛掉、或者Kafka保存消息時(shí)發(fā)送異常怎么辦?

    同理,消費(fèi)者獲取消費(fèi)時(shí)發(fā)生異常怎么辦?

    甚至,如果消費(fèi)者已經(jīng)消費(fèi)了數(shù)據(jù),但是修改offset時(shí)失敗了,導(dǎo)致重復(fù)消費(fèi)怎么辦?

    等等這些異常場(chǎng)景,都是Kafka需要考慮的。

    4、 pull or push
    消費(fèi)者側(cè)在獲取消息時(shí),是通過(guò)主動(dòng)去pull消息呢?還是由Kafka給消費(fèi)者push消息?

    這兩種方式各自有什么優(yōu)劣?

    5、 如何提高消費(fèi)者處理性能
    還是之前的訂單創(chuàng)建的例子,訂單創(chuàng)建后,你要給用戶(hù)發(fā)送短信,現(xiàn)在你發(fā)現(xiàn)由于你只有一個(gè)消費(fèi)者在發(fā)送短信,忙不過(guò)來(lái),怎么辦?這就有了Kafka里頭的消費(fèi)者組(Consumer Group)的設(shè)計(jì)。

    ……

    終極問(wèn)題:一條消息從生產(chǎn),到被消費(fèi),完整流程是怎樣的?

    如果能詳盡透徹地回答這個(gè)問(wèn)題,那你對(duì)Kafka的理解也就非常深入了。

    ?

    QUESTION4數(shù)據(jù)采集到了,如何進(jìn)行存儲(chǔ)及計(jì)算呢??

    離線數(shù)據(jù)開(kāi)發(fā)

    1、APACHE Hadoop

    初識(shí)Hadoop

     hadoop是什么?Hadoop是一種分析和處理大數(shù)據(jù)的軟件平臺(tái),是Apache的一個(gè)用Java語(yǔ)言所實(shí)現(xiàn)的開(kāi)源軟件的框架,在大量計(jì)算機(jī)組成的集群當(dāng)中實(shí)現(xiàn)了對(duì)于海量的數(shù)據(jù)進(jìn)行的分布式計(jì)算。

    1.1MapReduce是什么

    1、概念理解

    ??????Hadoop Map/Reduce是一個(gè)使用簡(jiǎn)易的軟件框架,基于它寫(xiě)出來(lái)的應(yīng)用程序能夠運(yùn)行在由上千個(gè)商用機(jī)器組成的大型集群上,并以一種可靠容錯(cuò)的方式并行處理上T級(jí)別的數(shù)據(jù)集。

    2、Map(映射)

    ??????“Map”:主結(jié)點(diǎn)讀入輸入數(shù)據(jù),把它分成可以用相同方法解決的小數(shù)據(jù)塊(這里是一個(gè)分而治之的思想),然后把這些小數(shù)據(jù)塊分發(fā)到不同的工作節(jié)點(diǎn)上(worder nodes)上,每一個(gè)工作節(jié)點(diǎn)(worder node)循環(huán)做同樣的事,這就行成了一個(gè)樹(shù)行結(jié)構(gòu),而每一個(gè)葉子節(jié)點(diǎn)有來(lái)處理每一個(gè)具體的小數(shù)據(jù)塊,再把這些處理結(jié)果返回給父節(jié)點(diǎn)。

    3、Reduce(歸約)

    ??????“Reduce”:主結(jié)節(jié)得到所有子節(jié)點(diǎn)的處理結(jié)果,然后把所有結(jié)果組合并且返回到輸出。

    4、個(gè)人理解

    ??????簡(jiǎn)單的來(lái)講,map就是分,reduce就是合。怎么理解呢?我們來(lái)看個(gè)例子。?
    ??????我們將100噸磚,從山東運(yùn)到北京,如果我們用一輛能裝1噸的大卡車(chē)來(lái)運(yùn),一天跑一個(gè)來(lái)回,那么我們需要100天,可是如果我們用10輛這樣的車(chē)來(lái)做同樣的事情,那么我們10天就可以完成了。雖然在現(xiàn)實(shí)生活中,我們?cè)黾恿塑?chē)費(fèi)等一系列支出,可能不太劃算,但是對(duì)于計(jì)算機(jī)來(lái)說(shuō),我們的成本是相當(dāng)?shù)偷摹K栽谟哟髷?shù)據(jù)的到來(lái)時(shí),MapReduce將大大提高的計(jì)算的速度,特別方便。

    ?

    ?

    ?

    1.2Shuffle - 奇跡發(fā)生的地方




    ?

    上面的流程是整個(gè)MapReduce最全工作流程,但是Shuffle過(guò)程只是從第7步開(kāi)始到第16步結(jié)束,具體Shuffle過(guò)程詳解,如下:
    1)MapTask收集我們的map()方法輸出的kv對(duì),放到內(nèi)存緩沖區(qū)中
    2)從內(nèi)存緩沖區(qū)不斷溢出本地磁盤(pán)文件,可能會(huì)溢出多個(gè)文件
    3)多個(gè)溢出文件會(huì)被合并成大的溢出文件
    4)在溢出過(guò)程及合并的過(guò)程中,都要調(diào)用Partitioner進(jìn)行分區(qū)和針對(duì)key進(jìn)行排序
    5)ReduceTask根據(jù)自己的分區(qū)號(hào),去各個(gè)MapTask機(jī)器上取相應(yīng)的結(jié)果分區(qū)數(shù)據(jù)
    6)ReduceTask會(huì)取到同一個(gè)分區(qū)的來(lái)自不同MapTask的結(jié)果文件,ReduceTask會(huì)將這些文件再進(jìn)行合并(歸并排序)
    7)合并成大文件后,Shuffle的過(guò)程也就結(jié)束了,后面進(jìn)入ReduceTask的邏輯運(yùn)算過(guò)程(從文件中取出一個(gè)一個(gè)的鍵值對(duì)Group,調(diào)用用戶(hù)自定義的reduce()方法)
    3.注意
    Shuffle中的緩沖區(qū)大小會(huì)影響到MapReduce程序的執(zhí)行效率,原則上說(shuō),緩沖區(qū)越大,磁盤(pán)io的次數(shù)越少,執(zhí)行速度就越快。
    緩沖區(qū)的大小可以通過(guò)參數(shù)調(diào)整,參數(shù):io.sort.mb默認(rèn)100M。

    ?

    ?

    2、基于Apache Hive 的離線數(shù)倉(cāng)

    2.1 什么是Hive

    Hive是Facebook開(kāi)源的基于Hadoop的一個(gè)數(shù)據(jù)倉(cāng)庫(kù)工具,可以將結(jié)構(gòu)化的數(shù)據(jù)文件映射為一張數(shù)據(jù)庫(kù)表,并提供完整的sql查詢(xún)功能,可以將sql語(yǔ)句轉(zhuǎn)換為MapReduce任務(wù)進(jìn)行運(yùn)行。其優(yōu)點(diǎn)是學(xué)習(xí)成本低,可以通過(guò)類(lèi)SQL語(yǔ)句快速實(shí)現(xiàn)簡(jiǎn)單的MapReduce統(tǒng)計(jì),不必開(kāi)發(fā)專(zhuān)門(mén)的MapReduce應(yīng)用,十分適合數(shù)據(jù)倉(cāng)庫(kù)的統(tǒng)計(jì)分析。

    Hive是建立在 Hadoop 上的數(shù)據(jù)倉(cāng)庫(kù)基礎(chǔ)構(gòu)架。它提供了一系列的工具,可以用來(lái)進(jìn)行數(shù)據(jù)提取轉(zhuǎn)化加載(ETL),這是一種可以存儲(chǔ)、查詢(xún)和分析存儲(chǔ)在 Hadoop 中的大規(guī)模數(shù)據(jù)的機(jī)制。Hive 定義了簡(jiǎn)單的類(lèi) SQL 查詢(xún)語(yǔ)言,稱(chēng)為 HQL,它允許熟悉 SQL 的用戶(hù)查詢(xún)數(shù)據(jù)。同時(shí),這個(gè)語(yǔ)言也允許熟悉 MapReduce 開(kāi)發(fā)者的開(kāi)發(fā)自定義的 mapper 和 reducer 來(lái)處理內(nèi)建的 mapper 和 reducer 無(wú)法完成的復(fù)雜的分析工作。

    ?

    2.2、Hive本質(zhì)

    將HQL轉(zhuǎn)換為MapReduce程序

    體現(xiàn)在:

    1、處理數(shù)據(jù)在HDFS上

    2、通過(guò)MR實(shí)現(xiàn)

    3、執(zhí)行程序運(yùn)行在Yarn上

    ?

    2.3、數(shù)倉(cāng)建模

    ODS:通常而言,原始數(shù)據(jù)的種類(lèi)是非常豐富的,我們可能從幾十個(gè)業(yè)務(wù)方把數(shù)據(jù)拉回來(lái),然后格式化放到HDFS上。但很多時(shí)候,情況并不這么簡(jiǎn)單,雖然有很多的損壞數(shù)據(jù)、臟數(shù)據(jù)等是不需要統(tǒng)計(jì)的,但是我們需要來(lái)看為什么會(huì)產(chǎn)生臟數(shù)據(jù),這時(shí)候原始數(shù)據(jù)就會(huì)提供很好的樣板。再有些時(shí)候,針對(duì)一些流量作弊的數(shù)據(jù),如果按照統(tǒng)一規(guī)則,很容易就給過(guò)濾掉了,然后運(yùn)營(yíng)就問(wèn)過(guò)來(lái)為什么對(duì)方提供的數(shù)據(jù)與我們的差異這么多大,這時(shí)候同樣需要去看原始日志。因而,ODS的意義,在于保存最完整的數(shù)據(jù)現(xiàn)場(chǎng),便于一些特殊場(chǎng)景下的問(wèn)題排查使用。

    ?

    DWD:如果采集的數(shù)據(jù)沒(méi)有問(wèn)題了,我們這里就需要做數(shù)據(jù)的預(yù)處理了。例如存在HDFS上的標(biāo)準(zhǔn)格式,我們就用字符串的格式來(lái)統(tǒng)一存儲(chǔ)。還有時(shí)候因?yàn)閳?chǎng)景要求,需要直接轉(zhuǎn)成ParquentORC等列存格式,也需要在這里做轉(zhuǎn)換。但預(yù)處理并不是簡(jiǎn)單的轉(zhuǎn)換格式,還需要處理一些臟數(shù)據(jù),例如字段缺失、格式錯(cuò)誤、亂碼、空值,等等,在這一層處理好之后,后續(xù)的計(jì)算便不需要再擔(dān)心各種各樣的異常情況,對(duì)于開(kāi)發(fā)效率的提升有著極大的幫助。有些時(shí)候還要發(fā)揮一些特定作用,因?yàn)闃I(yè)務(wù)的意外導(dǎo)致各種各樣的錯(cuò)誤數(shù)據(jù)進(jìn)來(lái),也是時(shí)有發(fā)生的。比如客戶(hù)消費(fèi)了,金額總得是正的吧,但如果業(yè)務(wù)那邊產(chǎn)生了一些錯(cuò)誤,需要將金額設(shè)置成負(fù)值,雖然業(yè)務(wù)那邊好處理了,但數(shù)據(jù)這里就頭疼了。所以還需要經(jīng)常打一些補(bǔ)丁,來(lái)處理金額負(fù)值這種異常情況。所以看起來(lái)DWD像是多余的一層,但當(dāng)業(yè)務(wù)場(chǎng)景足夠復(fù)雜之后,它所發(fā)揮的作用還是很大的。這里數(shù)據(jù)預(yù)處理主要采用MR來(lái)進(jìn)行,基本上遇不到數(shù)據(jù)傾斜等問(wèn)題。

    ?

    DWS:當(dāng)所有的數(shù)據(jù)都存好了,處理完臟數(shù)據(jù)之后,下一步我們就需要考慮如何處理和組織統(tǒng)計(jì)邏輯了。數(shù)據(jù)倉(cāng)庫(kù)之所以叫數(shù)據(jù)倉(cāng)庫(kù),正是因?yàn)镈WS層的重要。數(shù)據(jù)模型有很多,如:3NF范式模型、維度模型星座、雪花、星型、Data Vault等,但最常用的還是星型模型。通常我們會(huì)根據(jù)主題來(lái)進(jìn)行表數(shù)據(jù)的統(tǒng)計(jì),這里還有一個(gè)常用的說(shuō)法,叫“中間層”。例如我們數(shù)據(jù)層次自上往下分別是:用戶(hù)、廣告投放計(jì)劃、計(jì)劃詳情,用戶(hù)本身有行業(yè)、主體公司等屬性,廣告投放計(jì)劃包括了單元、創(chuàng)意等屬性,計(jì)劃詳情包括了投放類(lèi)型、投放地域等屬性。那么我們?cè)谶@個(gè)DWS層,就需要針對(duì)所有可能的維度,包括用戶(hù)、行業(yè)、主體公司、廣告投放計(jì)劃、單元、創(chuàng)意、計(jì)劃詳情、投放類(lèi)型、投放地域做統(tǒng)計(jì),每個(gè)類(lèi)型都盡可能的冗余維度信息,例如用戶(hù)維度的統(tǒng)計(jì)要把行業(yè)、主體公司等維度冗余進(jìn)來(lái),放到一張表里。這么做雖然特別違反三范式的原則,也違反很多模型,但是冗余盡可能多的信息,對(duì)于提高下游計(jì)算的速度、減少運(yùn)算數(shù)據(jù)量、簡(jiǎn)化業(yè)務(wù)邏輯、合并計(jì)算單元等具有特別大的好處。

    ?

    ADS:當(dāng)需求足夠多時(shí),我們要提供的報(bào)表就不是幾十張的概念了,而是成百上千張,這么多的表怎么保證數(shù)據(jù)的一致性呢?怎么保證需求響應(yīng)的速度呢?基本上都是ADS層需要面臨的問(wèn)題。在前一個(gè)層次DWS中,我們把所有的主題都盡可能多的冗余了維度信息,因此這里需要盡量從單一中間層表中進(jìn)行數(shù)據(jù)統(tǒng)計(jì),中間層的數(shù)據(jù)一致性,就代表了最終業(yè)務(wù)數(shù)據(jù)的一致性。響應(yīng)速度同理,在某些不得不關(guān)聯(lián)的業(yè)務(wù)場(chǎng)景下,因?yàn)橹虚g層的存在,使得數(shù)據(jù)量減少了很多,需求響應(yīng)速度也就提升了很多。

    ?

    DIM:維度信息

    3、實(shí)時(shí)數(shù)據(jù)開(kāi)發(fā)組件-Flink

    ?1.1 ?初識(shí)Flink

    Flink起源于Stratosphere項(xiàng)目,Stratosphere是在2010~2014年由3所地處柏林的大學(xué)和歐洲的一些其他的大學(xué)共同進(jìn)行的研究項(xiàng)目,2014年4月Stratosphere的代碼被復(fù)制并捐贈(zèng)給了Apache軟件基金會(huì),參加這個(gè)孵化項(xiàng)目的初始成員是Stratosphere系統(tǒng)的核心開(kāi)發(fā)人員,2014年12月,Flink一躍成為Apache軟件基金會(huì)的頂級(jí)項(xiàng)目。

    在德語(yǔ)中,Flink一詞表示快速和靈巧,項(xiàng)目采用一只松鼠的彩色圖案作為logo,這不僅是因?yàn)樗墒缶哂锌焖俸挽`巧的特點(diǎn),還因?yàn)榘亓值乃墒笥幸环N迷人的紅棕色,而Flink的松鼠logo擁有可愛(ài)的尾巴,尾巴的顏色與Apache軟件基金會(huì)的logo顏色相呼應(yīng),也就是說(shuō),這是一只Apache風(fēng)格的松鼠。

    Flink Logo

    Flink項(xiàng)目的理念是:“Apache Flink是為分布式、高性能、隨時(shí)可用以及準(zhǔn)確的流處理應(yīng)用程序打造的開(kāi)源流處理框架”。

    Apache Flink是一個(gè)框架和分布式處理引擎,用于對(duì)無(wú)界和有界數(shù)據(jù)流進(jìn)行有狀態(tài)計(jì)算。Flink被設(shè)計(jì)在所有常見(jiàn)的集群環(huán)境中運(yùn)行,以?xún)?nèi)存執(zhí)行速度和任意規(guī)模來(lái)執(zhí)行計(jì)算。

    1.2 ?Flink的重要特點(diǎn)

    1.2.1 ?事件驅(qū)動(dòng)型(Event-driven)

    事件驅(qū)動(dòng)型應(yīng)用是一類(lèi)具有狀態(tài)的應(yīng)用,它從一個(gè)或多個(gè)事件流提取數(shù)據(jù),并根據(jù)到來(lái)的事件觸發(fā)計(jì)算、狀態(tài)更新或其他外部動(dòng)作。比較典型的就是以kafka為代表的消息隊(duì)列幾乎都是事件驅(qū)動(dòng)型應(yīng)用。

    與之不同的就是SparkStreaming微批次,如圖:

    事件驅(qū)動(dòng)型:

    1.2.2 流與批的世界觀

    批處理的特點(diǎn)是有界、持久、大量,非常適合需要訪問(wèn)全套記錄才能完成的計(jì)算工作,一般用于離線統(tǒng)計(jì)。

    流處理的特點(diǎn)是無(wú)界、實(shí)時(shí), ?無(wú)需針對(duì)整個(gè)數(shù)據(jù)集執(zhí)行操作,而是對(duì)通過(guò)系統(tǒng)傳輸?shù)拿總€(gè)數(shù)據(jù)項(xiàng)執(zhí)行操作,一般用于實(shí)時(shí)統(tǒng)計(jì)。

    在spark的世界觀中,一切都是由批次組成的,離線數(shù)據(jù)是一個(gè)大批次,而實(shí)時(shí)數(shù)據(jù)是由一個(gè)一個(gè)無(wú)限的小批次組成的。

    而在flink的世界觀中,一切都是由流組成的,離線數(shù)據(jù)是有界限的流,實(shí)時(shí)數(shù)據(jù)是一個(gè)沒(méi)有界限的流,這就是所謂的有界流和無(wú)界流。

    無(wú)界數(shù)據(jù)流:無(wú)界數(shù)據(jù)流有一個(gè)開(kāi)始但是沒(méi)有結(jié)束,它們不會(huì)在生成時(shí)終止并提供數(shù)據(jù),必須連續(xù)處理無(wú)界流,也就是說(shuō)必須在獲取后立即處理event。對(duì)于無(wú)界數(shù)據(jù)流我們無(wú)法等待所有數(shù)據(jù)都到達(dá),因?yàn)檩斎胧菬o(wú)界的,并且在任何時(shí)間點(diǎn)都不會(huì)完成。處理無(wú)界數(shù)據(jù)通常要求以特定順序(例如事件發(fā)生的順序)獲取event,以便能夠推斷結(jié)果完整性。

    有界數(shù)據(jù)流:有界數(shù)據(jù)流有明確定義的開(kāi)始和結(jié)束,可以在執(zhí)行任何計(jì)算之前通過(guò)獲取所有數(shù)據(jù)來(lái)處理有界流,處理有界流不需要有序獲取,因?yàn)榭梢允冀K對(duì)有界數(shù)據(jù)集進(jìn)行排序,有界流的處理也稱(chēng)為批處理。

    這種以流為世界觀的架構(gòu),獲得的最大好處就是具有極低的延遲。

    2?Flink運(yùn)行時(shí)的組件

    Flink運(yùn)行時(shí)架構(gòu)主要包括四個(gè)不同的組件,它們會(huì)在運(yùn)行流處理應(yīng)用程序時(shí)協(xié)同工作:作業(yè)管理器(JobManager)、資源管理器(ResourceManager)、任務(wù)管理器(TaskManager),以及分發(fā)器(Dispatcher)。因?yàn)镕link是用Java和Scala實(shí)現(xiàn)的,所以所有組件都會(huì)運(yùn)行在Java虛擬機(jī)上。每個(gè)組件的職責(zé)如下:

    l?作業(yè)管理器(JobManager)

    控制一個(gè)應(yīng)用程序執(zhí)行的主進(jìn)程,也就是說(shuō),每個(gè)應(yīng)用程序都會(huì)被一個(gè)不同的JobManager所控制執(zhí)行。JobManager會(huì)先接收到要執(zhí)行的應(yīng)用程序,這個(gè)應(yīng)用程序會(huì)包括:作業(yè)圖(JobGraph)、邏輯數(shù)據(jù)流圖(logical dataflow graph)和打包了所有的類(lèi)、庫(kù)和其它資源的JAR包。JobManager會(huì)把JobGraph轉(zhuǎn)換成一個(gè)物理層面的數(shù)據(jù)流圖,這個(gè)圖被叫做“執(zhí)行圖”(ExecutionGraph),包含了所有可以并發(fā)執(zhí)行的任務(wù)。JobManager會(huì)向資源管理器(ResourceManager)請(qǐng)求執(zhí)行任務(wù)必要的資源,也就是任務(wù)管理器(TaskManager)上的插槽(slot)。一旦它獲取到了足夠的資源,就會(huì)將執(zhí)行圖分發(fā)到真正運(yùn)行它們的TaskManager上。而在運(yùn)行過(guò)程中,JobManager會(huì)負(fù)責(zé)所有需要中央?yún)f(xié)調(diào)的操作,比如說(shuō)檢查點(diǎn)(checkpoints)的協(xié)調(diào)。

    l?資源管理器(ResourceManager)

    主要負(fù)責(zé)管理任務(wù)管理器(TaskManager)的插槽(slot),TaskManger插槽是Flink中定義的處理資源單元。Flink為不同的環(huán)境和資源管理工具提供了不同資源管理器,比如YARN、Mesos、K8s,以及standalone部署。當(dāng)JobManager申請(qǐng)插槽資源時(shí),ResourceManager會(huì)將有空閑插槽的TaskManager分配給JobManager。如果ResourceManager沒(méi)有足夠的插槽來(lái)滿(mǎn)足JobManager的請(qǐng)求,它還可以向資源提供平臺(tái)發(fā)起會(huì)話,以提供啟動(dòng)TaskManager進(jìn)程的容器。另外,ResourceManager還負(fù)責(zé)終止空閑的TaskManager,釋放計(jì)算資源。

    l?任務(wù)管理器(TaskManager)

    Flink中的工作進(jìn)程。通常在Flink中會(huì)有多個(gè)TaskManager運(yùn)行,每一個(gè)TaskManager都包含了一定數(shù)量的插槽(slots)。插槽的數(shù)量限制了TaskManager能夠執(zhí)行的任務(wù)數(shù)量。啟動(dòng)之后,TaskManager會(huì)向資源管理器注冊(cè)它的插槽;收到資源管理器的指令后,TaskManager就會(huì)將一個(gè)或者多個(gè)插槽提供給JobManager調(diào)用。JobManager就可以向插槽分配任務(wù)(tasks)來(lái)執(zhí)行了。在執(zhí)行過(guò)程中,一個(gè)TaskManager可以跟其它運(yùn)行同一應(yīng)用程序的TaskManager交換數(shù)據(jù)。

    l?分發(fā)器(Dispatcher)

    可以跨作業(yè)運(yùn)行,它為應(yīng)用提交提供了REST接口。當(dāng)一個(gè)應(yīng)用被提交執(zhí)行時(shí),分發(fā)器就會(huì)啟動(dòng)并將應(yīng)用移交給一個(gè)JobManager。由于是REST接口,所以Dispatcher可以作為集群的一個(gè)HTTP接入點(diǎn),這樣就能夠不受防火墻阻擋。Dispatcher也會(huì)啟動(dòng)一個(gè)Web UI,用來(lái)方便地展示和監(jiān)控作業(yè)執(zhí)行的信息。Dispatcher在架構(gòu)中可能并不是必需的,這取決于應(yīng)用提交運(yùn)行的方式。

    ?

    3.1 Window及基于體溫?cái)?shù)據(jù)的Demo

    3.1.1?Window概述

    streaming流式計(jì)算是一種被設(shè)計(jì)用于處理無(wú)限數(shù)據(jù)集的數(shù)據(jù)處理引擎,而無(wú)限數(shù)據(jù)集是指一種不斷增長(zhǎng)的本質(zhì)上無(wú)限的數(shù)據(jù)集,而window是一種切割無(wú)限數(shù)據(jù)為有限塊進(jìn)行處理的手段。

    Window是無(wú)限數(shù)據(jù)流處理的核心,Window將一個(gè)無(wú)限的stream拆分成有限大小的”buckets”桶,我們可以在這些桶上做計(jì)算操作。

    3.1.2 Window類(lèi)型

    Window可以分成兩類(lèi):

    ??CountWindow:按照指定的數(shù)據(jù)條數(shù)生成一個(gè)Window,與時(shí)間無(wú)關(guān)。

    ??TimeWindow:按照時(shí)間生成Window。

    對(duì)于TimeWindow,可以根據(jù)窗口實(shí)現(xiàn)原理的不同分成三類(lèi):滾動(dòng)窗口(Tumbling Window)、滑動(dòng)窗口(Sliding Window)和會(huì)話窗口(Session Window)。

    1.?滾動(dòng)窗口(Tumbling Windows)

    將數(shù)據(jù)依據(jù)固定的窗口長(zhǎng)度對(duì)數(shù)據(jù)進(jìn)行切片。

    特點(diǎn):時(shí)間對(duì)齊,窗口長(zhǎng)度固定,沒(méi)有重疊。

    滾動(dòng)窗口分配器將每個(gè)元素分配到一個(gè)指定窗口大小的窗口中,滾動(dòng)窗口有一個(gè)固定的大小,并且不會(huì)出現(xiàn)重疊。例如:如果你指定了一個(gè)5分鐘大小的滾動(dòng)窗口,窗口的創(chuàng)建如下圖所示:

    圖 滾動(dòng)窗口

    適用場(chǎng)景:適合做BI統(tǒng)計(jì)等(做每個(gè)時(shí)間段的聚合計(jì)算)。

    2.?滑動(dòng)窗口(Sliding Windows)

    滑動(dòng)窗口是固定窗口的更廣義的一種形式,滑動(dòng)窗口由固定的窗口長(zhǎng)度和滑動(dòng)間隔組成。

    特點(diǎn):時(shí)間對(duì)齊,窗口長(zhǎng)度固定,可以有重疊。

    滑動(dòng)窗口分配器將元素分配到固定長(zhǎng)度的窗口中,與滾動(dòng)窗口類(lèi)似,窗口的大小由窗口大小參數(shù)來(lái)配置,另一個(gè)窗口滑動(dòng)參數(shù)控制滑動(dòng)窗口開(kāi)始的頻率。因此,滑動(dòng)窗口如果滑動(dòng)參數(shù)小于窗口大小的話,窗口是可以重疊的,在這種情況下元素會(huì)被分配到多個(gè)窗口中。

    例如,你有10分鐘的窗口和5分鐘的滑動(dòng),那么每個(gè)窗口中5分鐘的窗口里包含著上個(gè)10分鐘產(chǎn)生的數(shù)據(jù),如下圖所示:

    圖 滑動(dòng)窗口

    適用場(chǎng)景:對(duì)最近一個(gè)時(shí)間段內(nèi)的統(tǒng)計(jì)(求某接口最近5min的失敗率來(lái)決定是否要報(bào)警)。

    3.?會(huì)話窗口(Session Windows)

    由一系列事件組合一個(gè)指定時(shí)間長(zhǎng)度的timeout間隙組成,類(lèi)似于web應(yīng)用的session,也就是一段時(shí)間沒(méi)有接收到新數(shù)據(jù)就會(huì)生成新的窗口。

    特點(diǎn):時(shí)間無(wú)對(duì)齊。

    session窗口分配器通過(guò)session活動(dòng)來(lái)對(duì)元素進(jìn)行分組,session窗口跟滾動(dòng)窗口和滑動(dòng)窗口相比,不會(huì)有重疊和固定的開(kāi)始時(shí)間和結(jié)束時(shí)間的情況,相反,當(dāng)它在一個(gè)固定的時(shí)間周期內(nèi)不再收到元素,即非活動(dòng)間隔產(chǎn)生,那個(gè)這個(gè)窗口就會(huì)關(guān)閉。一個(gè)session窗口通過(guò)一個(gè)session間隔來(lái)配置,這個(gè)session間隔定義了非活躍周期的長(zhǎng)度,當(dāng)這個(gè)非活躍周期產(chǎn)生,那么當(dāng)前的session將關(guān)閉并且后續(xù)的元素將被分配到新的session窗口中去。

    Window-demo ?實(shí)時(shí)輸出五秒內(nèi)體溫最高的人的信息

    package com.morant.apitest

    ?

    import org.apache.flink.api.scala.ExecutionEnvironment

    import org.apache.flink.streaming.api.TimeCharacteristic

    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor

    import org.apache.flink.streaming.api.scala._

    import org.apache.flink.streaming.api.windowing.time.Time

    ?

    ?

    object WindowTest {

    ? def main(args: Array[String]): Unit = {

    ?

    ? ? val env = StreamExecutionEnvironment.getExecutionEnvironment
    ? ?env.setParallelism(1)

    ? ? env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    ? ? env.getConfig.setAutoWatermarkInterval(100)

    ?

    ?

    ? ? ? ? val inputStream: DataStream[String] = env.readTextFile("D:\\DEVELOP_CODE\\workspace\\FlinkTest\\src\\main\\resources\\Temperature.txt")

    // ? ?val inputStream = env.socketTextStream("172.19.177.124", 7777)
    ? ?// Transform操作 ? ?val dataStream: DataStream[Temperature] = inputStream

    ? ? ? .map(data => {

    ? ? ? ? val dataArray = data.split(",")

    ? ? ? ? Temperature(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)

    ? ? ? })

    ? ? ? // ? ? ?.assignAscendingTimestamps(_.timestamp * 1000L) ? ? ?// 對(duì)亂序數(shù)據(jù)分配時(shí)間戳和watermark ? ? ?.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[Temperature](Time.seconds(1)) {

    ? ? ? ? override def extractTimestamp(t: Temperature): Long = t.timestamp * 1000L ? ? ?} )

    ?

    ? ? // 統(tǒng)計(jì)每個(gè)溫度計(jì)器每5秒內(nèi)的溫度最大值 ? ?val processedStream = dataStream

    ? ? ? .keyBy(_.id)

    ? ? ? // ? ? ?.window(TumblingProcessingTimeWindows.of(Time.hours(1))) ? ? ?.timeWindow(Time.seconds(5)) // 定義長(zhǎng)度為5秒的滾動(dòng)窗口 ? ? ?.reduce((curMaxTempData, newData) =>

    ? ? ? ? Temperature(curMaxTempData.id, ? ? ? ? ?newData.timestamp, ? ? ? ? ?newData.temperature.max(curMaxTempData.temperature)

    ? ? ? ? )

    ? ? ? )

    ? ? // ? ? ?.reduce()
    ? ?processedStream.print()

    ? ? env.execute("window test")

    ? }

    }

    4.1?側(cè)輸出流(SideOutput)及體溫大于37.5放入測(cè)輸出流的Demo

    大部分的DataStream API的算子的輸出是單一輸出,也就是某種數(shù)據(jù)類(lèi)型的流。除了split算子,可以將一條流分成多條流,這些流的數(shù)據(jù)類(lèi)型也都相同。process function的side outputs功能可以產(chǎn)生多條流,并且這些流的數(shù)據(jù)類(lèi)型可以不一樣。一個(gè)side output可以定義為OutputTag[X]對(duì)象,X是輸出流的數(shù)據(jù)類(lèi)型。process function可以通過(guò)Context對(duì)象發(fā)射一個(gè)事件到一個(gè)或者多個(gè)side outputs。

    ?

    SideOutPut ?將體溫高于37.5的信息再測(cè)輸出流中輸出

    case class Temperature( id: String, timestamp: Long, temperature: Double )

    object SideOutputTest {

    ? def main(args: Array[String]): Unit = {

    ? ? val env = StreamExecutionEnvironment.getExecutionEnvironment ? ?env.setParallelism(1)

    ?

    ? ? ? ? val inputStream: DataStream[String] = env.readTextFile("D:\\DEVELOP_CODE\\workspace\\FlinkTest\\src\\main\\resources\\Temperature.txt")

    // ? ?val inputStream = env.socketTextStream("192.168.1.101", 7777)
    ? ?// Transform操作 ? ?val dataStream: DataStream[Temperature] = inputStream

    ? ? ? .map(data => {

    ? ? ? ? val dataArray = data.split(",")

    ? ? ? ? Temperature(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)

    ? ? ? })

    ?

    ? ? val highTempStream = dataStream

    ? ? ? .process(new SplitTempMonitor())

    ?

    ? ? highTempStream.print("low")

    ? ? highTempStream.getSideOutput(new OutputTag[String]("high-temp")).print("high")

    ?

    ? ? env.execute("process function test")

    ? }

    }

    ?

    // 自定義process function,實(shí)現(xiàn)分流操作class SplitTempMonitor() extends ProcessFunction[Temperature, Temperature]{

    ? override def processElement(value: Temperature, ctx: ProcessFunction[Temperature, Temperature]#Context, out: Collector[Temperature]): Unit = {

    ? ? // 判斷當(dāng)前數(shù)據(jù)的溫度值,如果在37.5以上,輸出到側(cè)輸出流 ? ?if( value.temperature > 37.5 ){

    ? ? ? ctx.output(new OutputTag[String]("high-temp"), value.id+"->有病")

    ? ? } else{

    ? ? ? // 37.5度以下的數(shù)據(jù),輸出到主流 ? ? ?out.collect(value)

    ? ? }

    ? }

    }

    ?

    ?

    ?

    總結(jié)

    以上是生活随笔為你收集整理的大数据乘(tu)风(tou)破(bian)浪(qiang)之路的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。