多个客户端抢夺命名管道_使用Kafka构建数据管道
目標:使用Kafka和使用Redis的服務層編寫數據管道。
先決條件
請根據您的操作系統安裝以下組件:
· Kafka
· Zookeeper
· Redis
· Java
目標觀眾
本文針對的是正在構建第一個數據管道的工程師。 但是,已建立數據管道的工程師可以快速瀏覽它。
期待什么
關于如何建立數據管道的概念證明。 這不包括數據管道可操作,具有彈性和可擴展性(即生產就緒)所需的工作。
大綱
假設我們有一種只能通過語音訪問的產品(類似于Google Home或Amazon Alexa)。 在產品(以硬件設備的形式)處于打開狀態的整個會話中,我們有權聽取用戶的對話。
Photo by Ben White on Unsplash
目的
在每個家庭(或設備通過WiFi連接的任何位置)中查找當前正在討論的有趣話題。 這些信息將幫助我們更好地為客戶提供服務。注意:我們要找到的有趣主題與Kafka主題不同。 以下各節中有關Kafka主題的更多信息。
方法
· 語音對話應轉換為文本。 語音朗讀等技術可為我們提供幫助。 我們可以建立自己的模型或購買第三方服務。 對于此帖子,此模塊超出范圍。
· 轉換后的文本應實時發送到后端系統。 這些系統/過程稱為生產者。
· 數據的保存方式應使過程(生產者除外)可以要求將來的數據,也可以等待將來的數據生成。 我們稱這種系統為遵循先進先出語義的隊列。
· 應該部署稱為使用者的進程,以從隊列中讀取數據,將數據在內存中保留一定時間(通常以分鐘為單位),并執行所需的業務邏輯計算。
· 根據需要,可以將上述計算的結果匯總的數據發送到另一個Queue或將其持久化到數據庫中。
注意:某些硬件設備能夠執行業務邏輯計算(這類設備稱為邊緣設備)。 使用邊緣設備構建數據管道不在當前文章的討論范圍之內。
假設條件
電影說明用于模擬用戶的對話。 在下一篇文章中,我們將用電影字幕代替描述。 每部電影都被視為家。
Kafka概念
在動手構建第一個數據管道之前,讓我們清晰地了解以下Kafka的基本構建基塊。 經紀人:經紀人是生態系統的切入點。 它們允許生產者將數據寫入主題分區中的Queue中,并允許使用者以特定偏移量讀取數據在主題分區中。 主題:主題是Kafka存儲中的邏輯標識符。 它類似于數據庫中的表名。 生產者和消費者必須指定主題名稱以分別寫入或讀取Kafka存儲。
分區:分區是Kafka分布式系統中容錯能力和吞吐量的基礎。
分區是Kafka中并行性的一個單元。 更多分區將允許消費者并行使用消息。 但是,這樣做的代價是要寫入分區需要更多的文件指針,客戶端(生產者和使用者)都需要更多的內存。 有關此的更多信息,請查看有關分區的融合博客。
當Kafka作為多節點群集啟動時,其中一個節點被分配為[特定主題]每個分區的領導者。 每個分區都復制到多個節點以實現容錯功能。 領導者節點負責將寫入其中的數據發送到其復制節點。 如果領導者節點停止服務,則將復制的節點之一選作分區的領導者。
設計分區:設計編號 以下主題中的分區和分區鍵的重要性應給予重視:
· 順序保證-應該按時間順序閱讀所有生成的事件。 可以說,消耗模式是按順序讀取所有事件(從我們的示例中讀取),然后分區鍵可以是唯一標識符。 這將確保同一家庭的所有事件都將進入同一分區。 這并不意味著我們需要為每個房屋創建一個分區,該分區可能以百萬計。 一個分區將包含來自多個家庭的對話。 但是在2個分區中不會出現家庭對話。
· 并發性—可以同時讀取分區中的消息。
· 邏輯上的數據分離—能夠同時讀取數據是好的。 分區鍵的設計方式應使我們的消費模式所需的所有數據都駐留在分區中。
偏移量:分區中的每條消息都用一個偏移量標識,該數量不斷增加。 偏移量類似于表中的索引或數組中的索引。 這些偏移量用作檢索數據的參考。
Anatomy of Kafka Topic
生產者:生產者是使用生產者API與經紀人建立聯系的任何客戶端。 它必須提到一個主題名稱才能將數據寫入Kafka。 如果尚未創建主題,則將自動創建新主題(可以從屬性中關閉自動創建新主題的配置)
消費者:消費者是使用消費者API與經紀人建立聯系的任何客戶端。 它必須提到一個主題名稱才能從Kafka中讀取數據。
Producer writes and consumer(s) read simultaneously
使用者組:一組使用者(可能在同一臺計算機上或在不同計算機上運行)可以具有唯一的使用者組ID [字符串或整數]。 使用消費者API擴展新消費者時,可以使用此消費者組ID。 消費者組的消費者訂閱一個主題(或多個主題)。 Kafka負責將下一條消息傳遞給訂閱該主題的消費者。 Zookeeper用于記錄每個消費者組讀取的偏移量。 一旦服務器確認消費者已經閱讀了該消息,就下一條消息被傳遞。
示例:消費者組的示例是具有組ID為emailnotif的電子郵件通知發送組,具有組ID為smsnotif的SMS通知發送組,具有組ID為appnotif的App通知發送組等。我們代碼庫中的消費者組配置可以包含-消費者的名稱 消費者組的組ID消費者組中的消費者數量。 該信息可以另存為配置文件或數據庫中,并用于啟動足夠數量的使用者線程。
發布者-訂閱者(Pubsub)模型
Pubsub是產生和使用消息的唯一方法,即生產者將發布消息,而消費者將訂閱主題分區以讀取消息。 Kafka主題僅用作消息的存儲系統。 在郵件的保留期之前,郵件將保留在系統中。 之后,Kafka將清除消息。
消息隊列模式:Pubsub模型也可以用于使用應用程序邏輯將主題轉換為消息隊列。 當所有使用者組中的一個使用者已讀取消息時,通常應使用此模式。一旦使用者組讀取消息,應用程序邏輯應從主題中刪除消息。
可以說,我們在隊列中有一條消息流,對于隊列中的每條消息,我們都需要發送電子郵件,應用程序和SMS通知[您可以假定該消息將具有所有必需的詳細信息]。 在這種情況下,我們將一組消費者分配給消費者組ID,Kafka將確保僅將消息發送給其中一個消費者。
讓我們(實際上)開始
運行Zookeeper和Kafka服務器
制作人:讓我們將有關主題語音的消息寫到Kafka Server。
創建一個線程安全的KafkaProducer實例。 實例可以在多個線程之間共享,而沒有任何額外的開銷。
Properties producerProps = new Properties();producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");producerProps.put("acks", "all");producerProps.put("retries", 0);producerProps.put("batch.size", 16384);producerProps.put("buffer.memory", 33554432);producerProps.put("linger.ms", 1);producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//KafkaProducer is thread-safelistenProducer = new KafkaProducer(producerProps);為什么使用帶有鍵值對的KafkaProducer?Key可以是任何數據類型(例如,在大多數情況下為Integer或String),可用于在主題中指定分區。 值可以是任何數據類型(Avro或Json等),最終將由Kafka編碼為字節數組,然后再由生產者發送給代理,并在消費者收到消息后由Kafka解碼為原始dataype。
通過使用包含電影ID和電影摘要作為制表符分隔行的文件來模擬文本對話
//simulate: read a file and send its content to kafkatry (Stream lines = Files.lines(Paths.get("plot_summaries.txt"))) {lines.forEach(p::action);} catch (IOException e) { //additional logging as needed. e.printStackTrace();}對于上述文件plot_summaries.txt中的每一行,將執行以下操作,該操作還涉及將數據發送到代理
void action(String s) { String[] split = s.split(""); s = split[1]; listenProducer.send(new ProducerRecord<>("voice", split[0], s));}操作.send不會立即調用網絡調用。 這由batch.size和buffer.memory配置平衡。
· 郵件是批量發送的。 批次取決于batch.size(以字節為單位)。 此設置適用于每個分區。
· 客戶端調用.send方法時,將使用buffer.memory對消息進行緩沖以累積到batch.size。 buffer.memory適用于整個生產者
· 如果buffer.memory已填滿,.producer客戶端將阻止.send調用。 如果在max.block.ms之前客戶端可以將消息添加到緩沖區,則客戶端將被阻止。 發布max.block.ms異常將被引發
制作人的完整代碼可以在ListenProducer上找到
使用者:允許從主題語音中讀取數據。
consumerProps = new Properties();consumerProps.put("bootstrap.servers", "localhost:9092");consumerProps.put("group.id", "text-reading-group");consumerProps.put("enable.auto.commit", true);consumerProps.put("auto.commit.interval.ms", 1000);consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer不是線程安全的。 因此,不能在多個線程之間共享KafkaConsumer的單個實例。 創建子線程之后,應該啟動新的使用者。 生產者的完整代碼在ListenConsumer中。
ListenFrom l = new ListenFrom();//KafkaConsumer is not thread-safeKafkaConsumer listenConsumer = new KafkaConsumer<>(l.consumerProps);listenConsumer.subscribe(Arrays.asList("voice"));有兩種方法可以確定線程數。
· 使用分配方法:當需要對分配給使用者線程的分區進行更好的控制時,將使用分配。 應當注意,屬于同一使用者組的多個使用者線程不會讀取同一分區。
· 使用訂閱方法:使用訂閱,kafka可以確保使用者線程之間分區的公平平衡。 它使消費者工作更靈活地分叉適當的編號。 消費者線程。 這將是創建使用者線程的理想方法。
注意:
· 還有其他方法可以確定編號。 消費者,但總的來說太復雜而無法編碼和維護(在我看來)。
· 為了建立數據管道,如果使用了任何流處理框架(例如Spark或Flink),則創建線程[或內核]的開銷將由框架本身來處理。
讓我們應用業務邏輯:從文件中讀取停用詞并從中創建列表。 此列表用于刪除無關緊要的單詞(例如I,you,a,an等單詞)
Stream lines = Files.lines(Paths.get("stopwords.txt"));List stopwords = lines.map(String::trim).mapString::toLowerCase).distinct().collect(Collectors.toList());對于服務器上每分鐘的輪詢,以獲取最新消息,根據需要處理記錄,并將聚合的數據持久保存到Redis。
while (true) { Duration d = Duration.ofMillis(1000*60); ConsumerRecords records = listenConsumer.poll(d); l.consume(records, stopwords);}private void consume(ConsumerRecords records, List stopwords) { for (ConsumerRecord r : records) { String[] tokens = ((String) r.value()).split("s"); HashSet keys = new HashSet<>(); for (String t : tokens) { if (!stopwords.contains(t.toLowerCase())) { String key = (String) r.key(); //TODO: change to pipeline commands j.zincrby(key, 1.0, t); j.zincrby("global-token-count", 1.0, t); keys.add(key); } } if(keys.size()>0) { System.out.println(keys); String[] a = new String[keys.size()]; j.sadd("movies-list", keys.toArray(a)); } } }數據消耗層:通過我們的數據管道持久化到Redis的數據可以被多個下游客戶端使用。 一個這樣的客戶端每5分鐘查詢一次Redis,并檢查對話次數最多的最新10個主題。
public ViewLayer() { //TODO: change to pool j = new Jedis("127.0.0.1", 6379); }public static void main(String args[]) throws InterruptedException { ViewLayer v = new ViewLayer(); while (true) { Set tokens = v.j .zrevrangeByScoreWithScores("global-token-count", "+inf", "-inf", 10, 10); for (Tuple token : tokens) { System.out.println(token.getElement() + "," + token.getScore()); } Thread.sleep(1000*60*5); //TODO: repeat the same for each movie// for (String s : j.smembers("movieslist")) { } } }后續步驟:盡管標題中未提及,但我們現在構建的是實時數據管道,但沒有任何分布式處理框架(如Spark或Flink)的支持。
由于以下原因,我們現在構建的管道難以擴展:
· 數據管道開發人員應編寫和維護膠合代碼,以使多個使用者根據負載擴展規模。 這與應該編寫的處理邏輯代碼一起將是額外的負擔。
· 實時數據管道有兩種。 一種讀取偏移量的數據應用所需的業務邏輯,然后寫入目標。 第二種將在一定時間內以分鐘為單位從一個或多個流中累積相關數據,以執行業務邏輯。 業務邏輯可以包括與我們的示例類似的轉換和聚合。 這稱為微批處理。 分布式框架(如Spark)具有處理這些多個微批處理的能力。 對于第一種方法,要使管道具有可伸縮性,容錯性和彈性,最好使用分布式處理框架。
因此,在我們的下一篇文章中,讓我們將當前的實時數據管道轉換為使用Flink和Spark等分布式框架。
資源:
1.plot_summaries.txt在CMU電影摘要語料庫中。
2.stopwords.txt在NLTK停用詞列表中。
(本文翻譯自Sarath Jiguru的文章《Building Data Pipelines With Kafka》,參考:https://medium.com/@sarathjiguru/building-data-pipelines-with-kafka-735ea0b739b4)
總結
以上是生活随笔為你收集整理的多个客户端抢夺命名管道_使用Kafka构建数据管道的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 一年的第几周怎么算_外企必备,算某一天是
- 下一篇: nginx 负载均衡 404_nginx