日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Clickhouse Engine kafka 将kafka数据同步clickhouse

發布時間:2024/9/27 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Clickhouse Engine kafka 将kafka数据同步clickhouse 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

本篇文章轉自:https://blog.csdn.net/weixin_41461992/article/details/106790507

起因

由于需要做各種數據庫擺渡到kafka的組件研究。
其中clickhouse和kafka間的數據擺渡,根據官方給出的kafka引擎文檔,便有了我這篇實踐記錄。
相應的,該配置也非常簡單。

官方傳送門: kafka engine clickhouse

這邊對數據庫和kafka環境不再累述。

一、開發環境

kafka 2.4
zookeeper 3.4.5
clickhouse 20.4.5.36
centos7

二、 介紹

clickhouse支持kafka的表雙向同步,其中提供的為Kafka引擎。

其大致情況為如下情況:Kafka主題中存在對應的數據格式,Clickhouse創建一個Kafka引擎表(即相當于一個消費者),當主題有消息進入時,獲取該消息,將其進行消費,然后物化視圖同步插入到MergeTree表中。

該引擎還支持反向寫入到Kafka中,即往Kafka引擎表中插入數據,可以同步到Kafka中(同樣可以使用物化視圖將不同引擎需要的表數據同步插入到Kafka引擎表中)。

下面為Kafka Engine的一些配置:
老版本格式為:

Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format[, kafka_row_delimiter, kafka_schema, kafka_num_consumers])

新版本格式為:

Kafka SETTINGSkafka_broker_list = 'localhost:9092',kafka_topic_list = 'topic1,topic2',kafka_group_name = 'group1',kafka_format = 'JSONEachRow',kafka_row_delimiter = '\n',kafka_schema = '',kafka_num_consumers = 2

必填參數(例如topic、kafka集群、消費者組等):

  • kafka_broker_list – 以逗號分隔的 brokers 列表 (例如kafka1:9092,kafka2:9092,kafka3:9092)。
  • kafka_topic_list – topic 列表 (你的topic名字,也可以多個)。
  • kafka_group_name – Kafka 消費組名稱 (group1)。如果不希望消息在集群中重復,請在每個分片中使用相同的組名。
  • kafka_format – 消息體格式。使用與 SQL 部分的 FORMAT 函數相同表示方法,例如 JSONEachRow、CSV、XML等。Formats格式傳送門
  • 非必填的參數:

  • kafka_row_delimiter - 每個消息體(記錄)之間的分隔符。
  • kafka_schema – 如果解析格式需要一個 schema 時,此參數必填。
  • kafka_num_consumers – 單個表的消費者數量。默認值是:1,如果一個消費者的吞吐量不足,則指定更多的消費者。眾所周知消費者的總數不應該超過 topic 中分區的數量,因為每個分區只能分配一個消費者。
  • 三、實踐

  • 創建,使用引擎創建一個Kafka消費者并作為一條數據流。
  • CREATE TABLE queue (q_date Date,level String,message String) ENGINE = Kafka SETTINGS kafka_broker_list = 'k1:9092,k2:9092,k3:9092',kafka_topic_list = 'my_topic',kafka_group_name = 'kafka_group_test',kafka_format = 'CSV',kafka_num_consumers = 4;

    消費的消息會被自動追蹤,因此每個消息在不同的消費組里只會記錄一次。如果希望獲得兩次數據,則使用另一個組名創建副本。

    消費組可以靈活配置并且在集群之間同步。例如,如果集群中有10個主題和5個表副本,則每個副本將獲得2個主題。如果副本數量發生變化,主題將自動在副本中重新分配。

  • 創建一個結構表
  • CREATE TABLE daily (day Date,level String,message String ) ENGINE = MergeTree(day, (day, level), 8192);
  • 創建物化視圖,該視圖會在后臺轉換引擎中的數據并將其放入之前創建的表中。
  • CREATE MATERIALIZED VIEW consumer TO dailyAS SELECT q_date AS day, level, messageFROM queue;

    其中AS后面的語句是自己根據實際需求進行調整的。
    為了提高性能,接受的消息被分組為max_insert_block_size大小的塊。如果未在stream_flush_interval_ms毫秒內形成塊,則不關心塊的完整性,都會將數據刷新到表中。

  • 停止或者修改轉換邏輯
    detach物化視圖
  • DETACH TABLE consumer; ATTACH TABLE consumer;
  • 在創建表前,需要在kafka創建一個topic用于測試。
  • ./bin/kafka-topic.sh --create --topic my_topic --partitions 3 --replication-factor 3

    此處的topic需要與之前的kafka_topic_list對應,既然該參數為list,則可以配置多個topic

  • 生產者往topic中生產數據
  • ./bin/kafka-console-producer.sh --topic my_topic --broker-list k1:9092

    按照CSV格式,生產數據輸入

    2020-06-28 level1 message 2020-06-28 level2 message 2020-06-28 level3 message

    此時查看表daily,數據已同步

    select * from daily |--- day --- level --- message ---| |--- 2020-06-28 --- level1 --- message ---| |--- 2020-06-28 --- level2 --- message ---| |--- 2020-06-28 --- level3 --- message ---|

    同理,向kafka引擎表中插入數據,也可以在my_topic中可以消費到插入數據。

  • 數據量測試
    在該版本,我在kafka生產者客戶端進行for循環2000w數據進行測試,延遲不高,基本在10秒內同步完成,不知道在表字段數量和復雜sql語句時情況如何。還需要進一步進行實際使用
  • 所遇問題

  • 表結構變更
    對于數據同步問題,其中一個就是表同步之間的結構對應問題。
    由于表創建的時候,已經固定,所以ck的kafka引擎在遇到字段改變的時候,依然需要刪表重建,或者修改物化視圖進行不同的sql操作。

  • 延遲問題
    在之前的版本中,社區有人提出該同步延遲太高,特別是數據量大的時候,但是在我實際測試中,大約2000w簡單單表同步延遲可以接受。具體性能需要進一步測試。

  • format格式
    對于自己規定的格式,一定要正確,比如csv就是csv,json就是json格式,不然會報錯。

  • 消費問題
    前面說過,kafka引擎其實是一個或者多個消費者進行topic的消費,那必然就涉及到消費問題,如何重新消費,如何在需要修改業務的時候重新連接。重置偏移量。
    場景:丟失數據,重新消費

  • 首先,我們可以將daily中的消息干掉,手動導致消息丟失(自己整一個消息丟失)TRUNCATE TABLE daily此時,daily數據被刪除了,同步的數據丟失。 接下來,我們停止kafka引擎表,在clickhouse中執行DETACH TABLE queue最后在kafka中,執行重置偏移量的命令./bin/kafka-consumer-groups --bootstrap-server k1:9092 --topic my_topic --group kafka_group_test --reset-offsets --to-earliest --excute然后,重連queue表ATTACH TABLE queue

    這樣,就開始重新消費啦

  • 錯誤日志
    clickhouse日志查看為ck目錄下的log文件夾的clickhouse-server.log中。
  • 與50位技術專家面對面20年技術見證,附贈技術全景圖

    總結

    以上是生活随笔為你收集整理的Clickhouse Engine kafka 将kafka数据同步clickhouse的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。