日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

分布式实时计算—从霍普金大学数据错误谈谈如何保证实时计算数据准确性

發(fā)布時(shí)間:2024/4/15 编程问答 44 豆豆
生活随笔 收集整理的這篇文章主要介紹了 分布式实时计算—从霍普金大学数据错误谈谈如何保证实时计算数据准确性 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

原文作者:實(shí)時(shí)流式計(jì)算

原文地址:從霍普金大學(xué)數(shù)據(jù)錯(cuò)誤談?wù)勅绾伪WC實(shí)時(shí)計(jì)算數(shù)據(jù)準(zhǔn)確性

目錄

一、Kafka

1、Produce端消息傳遞

1、Consumer端消息傳遞

3、精確一次


作為全球新冠疫情數(shù)據(jù)的實(shí)時(shí)統(tǒng)計(jì)的權(quán)威,約翰斯·霍普金斯大學(xué)的實(shí)時(shí)數(shù)據(jù)一直是大家實(shí)時(shí)關(guān)注的,也是各大媒體的主要數(shù)據(jù)來(lái)源。在今天早上的相當(dāng)一段長(zhǎng)的時(shí)間,霍普金斯大學(xué)的全球疫情分布大屏中顯示,全球確診人數(shù)已經(jīng)突破200萬(wàn)。有圖有真相

隨后相關(guān)媒體也進(jìn)行了轉(zhuǎn)發(fā),不過(guò)這個(gè)數(shù)據(jù)明顯波動(dòng)太大,隨后該網(wǎng)站也修改了數(shù)據(jù)

約翰斯·霍普金斯大學(xué)系統(tǒng)科學(xué)與工程中心就制作了“全球新冠病毒擴(kuò)散地圖”,用于實(shí)時(shí)可視化和跟蹤報(bào)告的病例。于1月22日首次公開(kāi)。為了提高數(shù)據(jù)的實(shí)時(shí)性,數(shù)據(jù)的來(lái)源通過(guò)手動(dòng)和自動(dòng)獲取的方式。手動(dòng)的方式出錯(cuò)的概率還是很大的,如果我們可以通過(guò)實(shí)時(shí)流獲取數(shù)據(jù)的方式,就可以避免數(shù)據(jù)錯(cuò)誤的問(wèn)題,這其實(shí)是數(shù)據(jù)從一方到達(dá)另一方的數(shù)據(jù)是否準(zhǔn)確的問(wèn)題,也就是端到端的一致性。這種消息傳遞的定義叫做消息傳遞語(yǔ)義,我們要了解的是message delivery semantic?也就是消息傳遞語(yǔ)義。這是一個(gè)通用的概念,也就是消息傳遞過(guò)程中消息傳遞的保證性。分為三種:

?解釋特征
最多一次(at most once消息可能丟失也可能被處理,但最多只會(huì)被處理一次可能丟失 不會(huì)重復(fù)
至少一次(at least once消息不會(huì)丟失,但可能被處理多次。可能重復(fù) 不會(huì)丟失
精確傳遞一次(exactly once消息被處理且只會(huì)被處理一次。不丟失 不重復(fù) 就一次

那么我們希望能做到精確傳遞一次(exactly once),雖然可能會(huì)付出一些性能的代價(jià)。我們從幾個(gè)常見(jiàn)的流計(jì)算框架中,看一看都是如何解決端到端的一致性的問(wèn)題。

一、Kafka

Kafka是最初由Linkedin公司開(kāi)發(fā),是一個(gè)分布式、支持分區(qū)的(partition)、多副本的(replica),基于zookeeper協(xié)調(diào)的分布式消息系統(tǒng),它的最大的特性就是可以實(shí)時(shí)的處理大量數(shù)據(jù)以滿足各種需求場(chǎng)景:比如基于hadoop的批處理系統(tǒng)、低延遲的實(shí)時(shí)系統(tǒng)、storm/Spark流式處理引擎,web/nginx日志、訪問(wèn)日志,消息服務(wù)等等,用scala語(yǔ)言編寫,Linkedin于2010年貢獻(xiàn)給了Apache基金會(huì)并成為頂級(jí)開(kāi)源項(xiàng)目。

而kafka其實(shí)有兩次消息傳遞,一次生產(chǎn)者發(fā)送消息給kafka,一次消費(fèi)者去kafka消費(fèi)消息。兩次傳遞都會(huì)影響最終結(jié)果,兩次都是精確一次,最終結(jié)果才是精確一次。兩次中有一次會(huì)丟失消息,或者有一次會(huì)重復(fù),那么最終的結(jié)果就是可能丟失或者重復(fù)的。

1、Produce端消息傳遞

這是producer端的代碼:

Properties properties = new Properties();properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");properties.put("acks", "all");properties.put("retries", 0);properties.put("batch.size", 16384);properties.put("linger.ms", 1);properties.put("buffer.memory", 33554432);properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);for (int i = 1; i <= 600; i++) {kafkaProducer.send(new ProducerRecord<String, String>("z_test_20190430", "testkafka0613"+i));System.out.println("testkafka"+i);}kafkaProducer.close();

其中指定了一個(gè)參數(shù)acks 可以有三個(gè)值選擇:

  • 0:producer完全不管broker的處理結(jié)果,回調(diào)也就沒(méi)有用了,并不能保證消息成功發(fā)送 但是這種吞吐量最高
  • all或者-1:leader broker會(huì)等消息寫入 并且ISR都寫入后才會(huì)響應(yīng),這種只要ISR有副本存活就肯定不會(huì)丟失,但吞吐量最低。
  • 1:默認(rèn)的值 leader broker自己寫入后就響應(yīng),不會(huì)等待ISR其他的副本寫入,只要leader broker存活就不會(huì)丟失,即保證了不丟失,也保證了吞吐量。

所以設(shè)置為0時(shí),實(shí)現(xiàn)了at most once,而且從這邊看只要保證集群穩(wěn)定的情況下,不設(shè)置為0,消息不會(huì)丟失。但是還有一種情況就是消息成功寫入,而這個(gè)時(shí)候由于網(wǎng)絡(luò)問(wèn)題producer沒(méi)有收到寫入成功的響應(yīng),producer就會(huì)開(kāi)啟重試的操作,直到網(wǎng)絡(luò)恢復(fù),消息就發(fā)送了多次。這就是at least once了。kafka producer 的參數(shù)acks 的默認(rèn)值為1,所以默認(rèn)的producer級(jí)別是at least once。并不能exactly once。

1、Consumer端消息傳遞

consumer是靠offset保證消息傳遞的。consumer消費(fèi)的代碼如下:

Properties props = new Properties();props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");props.put("group.id", "test");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset","earliest");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("foo", "bar"));try{while (true) {ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}}finally{consumer.close();}

其中有一個(gè)參數(shù)是 enable.auto.commit。若設(shè)置為true consumer在消費(fèi)之前提交位移 就實(shí)現(xiàn)了at most once。若是消費(fèi)后提交 就實(shí)現(xiàn)了 at least once 默認(rèn)的配置就是這個(gè)。kafka consumer的參數(shù)enable.auto.commit的默認(rèn)值為true ,所以默認(rèn)的consumer級(jí)別是at least once。也并不能exactly once。

圖 consumer-groups

3、精確一次

通過(guò)了解producer端與consumer端的設(shè)置,我們發(fā)現(xiàn)kafka在兩端的默認(rèn)配置都是at least once,肯能重復(fù),通過(guò)配置的話呢也不能做到exactly once,好像kafka的消息一定會(huì)丟失或者重復(fù)的,是不是沒(méi)有辦法做到exactly once了呢?確實(shí)在kafka 0.11.0.0版本之前producer端確實(shí)是不可能的,但是在kafka 0.11.0.0版本之后,kafka正式推出了idempotent producer。也就是冪等的producer還有對(duì)事務(wù)的支持

冪等的producer:kafka 0.11.0.0版本引入了idempotent producer機(jī)制,在這個(gè)機(jī)制中同一消息可能被producer發(fā)送多次,但是在broker端只會(huì)寫入一次,他為每一條消息編號(hào)去重,而且對(duì)kafka開(kāi)銷影響不大。如何設(shè)置開(kāi)啟呢? 需要設(shè)置producer端的新參數(shù) enable.idempotent 為true。而多分區(qū)的情況,我們需要保證原子性的寫入多個(gè)分區(qū),即寫入到多個(gè)分區(qū)的消息要么全部成功,要么全部回滾。

這時(shí)候就需要使用事務(wù),在producer端設(shè)置 transcational.id為一個(gè)指定字符串。這樣冪等producer只能保證單分區(qū)上無(wú)重復(fù)消息;事務(wù)可以保證多分區(qū)寫入消息的完整性。

這樣producer端實(shí)現(xiàn)了exactly once,那么consumer端呢?consumer端由于可能無(wú)法消費(fèi)事務(wù)中所有消息,并且消息可能被刪除,所以事務(wù)并不能解決consumer端exactly once的問(wèn)題,我們可能還是需要自己處理這方面的邏輯。比如自己管理offset的提交,不要自動(dòng)提交,也是可以實(shí)現(xiàn)exactly once的。還有一個(gè)選擇就是使用kafka自己的流處理引擎,也就是Kafka Streams,設(shè)置processing.guarantee=exactly_once,就可以輕松實(shí)現(xiàn)exactly once了。

Apache Flink是由Apache軟件基金會(huì)開(kāi)發(fā)的開(kāi)源流處理框架,其核心是用Java和Scala編寫的分布式流數(shù)據(jù)流引擎。Flink以數(shù)據(jù)并行和流水線方式執(zhí)行任意流數(shù)據(jù)程序,Flink的流水線運(yùn)行時(shí)系統(tǒng)可以執(zhí)行批處理和流處理程序。此外,Flink的運(yùn)行時(shí)本身也支持迭代算法的執(zhí)行。我們從flink消費(fèi)并寫入kafka的例子是如何通過(guò)兩部提交來(lái)保證exactly-once語(yǔ)義的為了保證exactly-once,所有寫入kafka的操作必須是事務(wù)的。在兩次checkpiont之間要批量提交數(shù)據(jù),這樣在任務(wù)失敗后就可以將沒(méi)有提交的數(shù)據(jù)回滾。

兩部提交協(xié)議的第一步是預(yù)提交。flink的jobmanager會(huì)在數(shù)據(jù)流中插入一個(gè)檢查點(diǎn)的標(biāo)記(這個(gè)標(biāo)記可以用來(lái)區(qū)別這次checkpoint的數(shù)據(jù)和下次checkpoint的數(shù)據(jù))。這個(gè)標(biāo)記會(huì)在整個(gè)dag中傳遞。每個(gè)dag中的算子遇到這個(gè)標(biāo)記就會(huì)觸發(fā)這個(gè)算子狀態(tài)的快照。

讀取kafka的算子,在遇到檢查點(diǎn)標(biāo)記時(shí)會(huì)存儲(chǔ)kafka的offset。之后,會(huì)把這個(gè)檢查點(diǎn)標(biāo)記傳到下一個(gè)算子。接下來(lái)就到了flink的內(nèi)存操作算子。這些內(nèi)部算子就不用考慮兩部提交協(xié)議了,因?yàn)樗麄兊臓顟B(tài)會(huì)隨著flink整體的狀態(tài)來(lái)更新或者回滾。

到了和外部系統(tǒng)打交道的時(shí)候,就需要兩步提交協(xié)議來(lái)保證數(shù)據(jù)不丟失不重復(fù)了。在預(yù)提交這個(gè)步驟下,所有向kafka提交的數(shù)據(jù)都是預(yù)提交。

當(dāng)所有算子的快照完成,也就是這次的checkpoint完成時(shí),flink的jobmanager會(huì)向所有算子發(fā)通知說(shuō)這次checkpoint完成,flink負(fù)責(zé)向kafka寫入數(shù)據(jù)的算子也會(huì)正式提交之前寫操作的數(shù)據(jù)。在任務(wù)運(yùn)行中的任何階段失敗,都會(huì)從上一次的狀態(tài)恢復(fù),所有沒(méi)有正式提交的數(shù)據(jù)也會(huì)回滾。

總結(jié)一下flink的兩步提交:

  • 當(dāng)所有算子都完成他們的快照時(shí),進(jìn)行正式提交操作
  • 當(dāng)任意子任務(wù)在預(yù)提交階段失敗時(shí),其他任務(wù)立即停止,并回滾到上一次成功快照的狀態(tài)。
  • 在預(yù)提交狀態(tài)成功后,外部系統(tǒng)需要完美支持正式提交之前的操作。如果有提交失敗發(fā)生,整個(gè)flink應(yīng)用會(huì)進(jìn)入失敗狀態(tài)并重啟,重啟后將會(huì)繼續(xù)從上次狀態(tài)來(lái)嘗試進(jìn)行提交操作。

這樣flink就通過(guò)狀態(tài)和兩次提交協(xié)議來(lái)保證了端到端的exactly-once語(yǔ)義。

更多Flink,Kafka,Spark等相關(guān)技術(shù)博文,科技資訊,歡迎關(guān)注實(shí)時(shí)流式計(jì)算 公眾號(hào)后臺(tái)回復(fù) “電子書(shū)” 下載300頁(yè)Flink實(shí)戰(zhàn)電子書(shū)

?

總結(jié)

以上是生活随笔為你收集整理的分布式实时计算—从霍普金大学数据错误谈谈如何保证实时计算数据准确性的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。