日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

14_clickhouse,kafka引擎,kafka消息到ClickHouse的MergeTree引擎

發布時間:2024/9/27 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 14_clickhouse,kafka引擎,kafka消息到ClickHouse的MergeTree引擎 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

19.Kafka引擎

19.1.Kafka引擎

Kafka引擎結合Kafka使用,可實現訂閱或發布數據流。

指定表引擎:

ENGINE = Kafka() SETTINGSkafka_broker_list = 'host:port', kafka_topic_list = 'topic1,topic2,...', kafka_group_name = 'group_name', kafka_format = 'data_format'[,][kafka_row_delimiter = 'delimiter_symbol',][kafka_schema = '',][kafka_num_consumers = N,][kafka_skip_broken_messages = N]

必選參數:
kafka_broker_list :以逗號分隔的brokers列表。
kafka_topic_list :以逗號分隔的kafka的topic列表。
kafka_group_name :Kafka消費組。
kafka_format :消息的格式,例如JSONEachRow。

可選參數:
kafka_row_delimiter :行之間的分隔符。
kafka_schema :按需定義schema,例如Cap’n Proto格式需指定。
kafka_num_consumers :消費者數量,默認1,最多不超過Topic的分區數。
kafka_skip_broken_messages :每個block中,Kafka的消息解析器容忍schema不兼容消息的數量。默認值:0。

創建Kafka引擎表示例
示例1:

CREATE TABLE queue (timestamp UInt64, level String, message String ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

示例2:

CREATE TABLE queue2 (timestamp UInt64, level String, message String ) ENGINE = Kafka('localhost:9092', 'topic', 'group1') SETTINGS kafka_format ='JSONEachRow', kafka_num_consumers = 4;

示例3:

CREATE TABLE queue2 (timestamp UInt64,level String,message String ) ENGINE = Kafka SETTINGSkafka_broker_list = 'localhost:9092',kafka_topic_list = 'topic',kafka_group_name = 'group1',kafka_format = 'JSONEachRow',kafka_num_consumers = 4;

SELECT 查詢對于讀取消息并不是很有用(除了調試),因為每個消息只能讀取一次。

通常,將該引擎結合物化視圖一起使用,使用方法:
(1)、使用Kafka引擎創建一個Kafka的消費者,并將其視為一個數據流。
(2)、創建所需結構的表。
(3)、創建一個物化視圖,該視圖轉換來自引擎的數據并將其放入上一步創建的表中。

當物化視圖添加至該引擎,它將會在后臺收集數據。這就允許你從Kafka持續接收消息并使用SELECT將數據轉換為所需的格式。它們不直接從Kafka中讀取數據,而是接收新記錄,以block為單位,這樣就可以寫入具有不同詳細信息級別的多個表(分組聚合或無聚合)中。

為了提高性能,接收到的消息將被分組為大小為max_insert_block_size的block(塊)。如果block沒有在stream_flush_interval_ms時間內形成,則不管block的完整性如何,數據將刷新到表中。

要停止接收topic數據或更改轉換邏輯,需detach物化視圖。
DETACH TABLE consumer;
ATTACH MATERIALIZED VIEW consumer;

如果要使用ALTER更改目標表,建議禁用物化視圖,以避免目標表和該視圖中的數據之間出現差異。

Kafka的擴展配置
Kafka引擎支持使用ClickHouse配置文件擴展配置。
用戶可以使用兩個配置key,全局的kafka和topic級別的kafka_*。首先應用全局配置,然后應用topic級別的配置。

<!-- Global configuration options for all tables of Kafka engine type --> <kafka> <debug>cgrp</debug> <auto_offset_reset>smallest</auto_offset_reset> </kafka><!-- Configuration specific for topic “logs” kafka下劃線后面是topic的名稱 --> <kafka_logs> <retry_backoff_ms>250</retry_backoff_ms> <fetch_min_bytes>100000</fetch_min_bytes> </kafka_logs>

有關可能的配置選項的列表,參見librdkafka配置,鏈接:
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md。

ClickHouse配置中使用下劃線(_)代替點,例如,check.crcs=true將配置為

<check_crcs>true</check_crcs>

最終在自己的機器上的配置如下:

[root@middleware config.d]# vim kafka.xml [root@middleware config.d]# pwd /etc/clickhouse-server/config.d [root@middleware config.d]# ls kafka.xml [root@middleware config.d]# ls kafka.xml [root@middleware config.d]# cat kafka.xml <yandex><kafka><debug>cgrp</debug><auto_offset_reset>smallest</auto_offset_reset></kafka><!-- Configuration specific for topic "topic_ch" --><kafka_topic_ch><auto_offset_reset>latest</auto_offset_reset><retry_backoff_ms>250</retry_backoff_ms><fetch_min_bytes>100000</fetch_min_bytes></kafka_topic_ch><kafka_my_topic><auto_offset_reset>latest</auto_offset_reset><retry_backoff_ms>250</retry_backoff_ms><fetch_min_bytes>100000</fetch_min_bytes></kafka_my_topic></yandex> [root@middleware config.d]#

19.2.示例1

示例1:通過兩張表分別保存Kafka的清單數據和分組聚合數據。
創建Kafka的topic:
參考地址:https://kafka.apachecn.org/quickstart.html

[root@hadoop4 kafka-broker]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic_ch WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both. Created topic "topic_ch". [root@hadoop4 kafka-broker]#

可以運行list(列表)命令來查看這個topic:

[root@middleware kafka_2.12-2.6.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181

(1)、創建topic的數據流

drop table if exists topic_ch_kafka; CREATE TABLE topic_ch_kafka (timestamp UInt64,level String,message String ) ENGINE = Kafka('localhost:9092', 'topic_ch', 'group_ch', 'JSONEachRow');

效果圖:

middleware :) CREATE TABLE topic_ch_kafka ( :-] timestamp UInt64, :-] level String, :-] message String :-] ) ENGINE = Kafka('localhost:9092', 'topic_ch', 'group_ch', 'JSONEachRow');CREATE TABLE topic_ch_kafka (`timestamp` UInt64,`level` String,`message` String ) ENGINE = Kafka('localhost:9092', 'topic_ch', 'group_ch', 'JSONEachRow')Ok.0 rows in set. Elapsed: 0.007 sec. middleware :)

(2)、創建保存清單的表以及以及相應的物化視圖:

DROP TABLE topic_ch_list; CREATE TABLE topic_ch_list (timestamp UInt64,level String,message String ) ENGINE = MergeTree() order by (timestamp);DROP TABLE topic_ch_list_view; CREATE MATERIALIZED VIEW topic_ch_list_view TO topic_ch_listAS SELECT timestamp, level, messageFROM topic_ch_kafka;

效果圖:

middleware :) DROP TABLE topic_ch_list;DROP TABLE topic_ch_listReceived exception from server (version 20.9.3): Code: 60. DB::Exception: Received from localhost:9000. DB::Exception: Table default.topic_ch_list doesn't exist.. 0 rows in set. Elapsed: 0.015 sec. middleware :) CREATE TABLE topic_ch_list ( :-] timestamp UInt64, :-] level String, :-] message String :-] ) ENGINE = MergeTree() :-] order by (timestamp);CREATE TABLE topic_ch_list (`timestamp` UInt64,`level` String,`message` String ) ENGINE = MergeTree() ORDER BY timestampOk.0 rows in set. Elapsed: 0.006 sec. middleware :) DROP TABLE topic_ch_list_view;DROP TABLE topic_ch_list_viewReceived exception from server (version 20.9.3): Code: 60. DB::Exception: Received from localhost:9000. DB::Exception: Table default.topic_ch_list_view doesn't exist.. 0 rows in set. Elapsed: 0.002 sec. middleware :) CREATE MATERIALIZED VIEW topic_ch_list_view TO topic_ch_list :-] AS SELECT timestamp, level, message :-] FROM topic_ch_kafka;CREATE MATERIALIZED VIEW topic_ch_list_view TO topic_ch_list AS SELECT timestamp,level,message FROM topic_ch_kafkaOk.0 rows in set. Elapsed: 0.005 sec. middleware :)

(3)、創建統計聚合的表以及相應的物化視圖:

DROP TABLE topic_ch_daily;CREATE TABLE topic_ch_daily (day Date,level String,total UInt64) ENGINE = SummingMergeTree(day)ORDER BY (day, level);DROP TABLE topic_ch_daily_view; CREATE MATERIALIZED VIEW topic_ch_daily_view TO topic_ch_dailyAS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as totalFROM topic_ch_kafka GROUP BY day, level;

2、生產數據

[root@middleware ~]# source /etc/profile [root@middleware ~]# $ZOOKEEPER_HOME/bin/zkServer.sh start ZooKeeper JMX enabled by default Using config: /root/apache-zookeeper-3.6.2-bin/bin/../conf/zoo.cfg Starting zookeeper ... STARTED [root@middleware ~]# # 啟動kafka [root@middleware ~]# cd $KAFKA_HOME [root@middleware kafka_2.12-2.6.0]# bin/kafka-server-start.sh -daemon config/server.properties [root@middleware kafka_2.12-2.6.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181 __consumer_offsets my_topic test topic topic_ch topic_ch2 [root@middleware kafka_2.12-2.6.0]# bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_ch >{"timestamp":1542426134, "level":"high", "message":"hehe"} >{"timestamp":1542427132, "level":"high", "message":"hehe"} >{"timestamp":1542428133, "level":"mid", "message":"hehe"} >{"timestamp":1542429134, "level":"low", "message":"hehe"} >{"timestamp":1542430134, "level":"high", "message":"hehe"} >{"timestamp":1542423134, "level":"low", "message":"hehe"} >{"timestamp":1542434434, "level":"low", "message":"hehe"} >{"timestamp":1542444134, "level":"low", "message":"hehe"} >{"timestamp":1542454136, "level":"high", "message":"hehe"} >{"timestamp":1542464134, "level":"high", "message":"hehe"} >{"timestamp":1542474134, "level":"high", "message":"hehe"} >{"timestamp":1542484134, "level":"low", "message":"hehe"} >{"timestamp":1542494134, "level":"high", "message":"hehe"} >{"timestamp":1542424194, "level":"mid", "message":"hehe"} >

查看結果(數據有歷史消息):

middleware :) select * from topic_ch_list;SELECT * FROM topic_ch_list┌──timestamp─┬─level─┬─message─┐ │ 1542474134 │ high │ hehe │ └────────────┴───────┴─────────┘ ┌──timestamp─┬─level─┬─message─┐ │ 1542423134 │ low │ hehe │ │ 1542424132 │ high │ hehe │ │ 1542424132 │ high │ hehe │ │ 1542424133 │ mid │ hehe │ │ 1542424133 │ mid │ hehe │ │ 1542424134 │ high │ hehe │ │ 1542424134 │ low │ hehe │ │ 1542424134 │ low │ hehe │ │ 1542424134 │ low │ hehe │ │ 1542424134 │ high │ hehe │ │ 1542424134 │ high │ hehe │ │ 1542424134 │ low │ hehe │ │ 1542424134 │ high │ hehe │ │ 1542424134 │ high │ hehe │ │ 1542424134 │ high │ hehe │ │ 1542424134 │ high │ hehe │ │ 1542424134 │ low │ hehe │ │ 1542424136 │ high │ hehe │ │ 1542424434 │ low │ hehe │ │ 1542426134 │ high │ hehe │ │ 1542427132 │ high │ hehe │ │ 1542428133 │ mid │ hehe │ │ 1542429134 │ low │ hehe │ │ 1542430134 │ high │ hehe │ │ 1542434134 │ high │ hehe │ │ 1542434434 │ low │ hehe │ │ 1542444134 │ low │ hehe │ │ 1542454136 │ high │ hehe │ │ 1542464134 │ high │ hehe │ └────────────┴───────┴─────────┘ ┌──timestamp─┬─level─┬─message─┐ │ 1542424194 │ mid │ hehe │ └────────────┴───────┴─────────┘ ┌──timestamp─┬─level─┬─message─┐ │ 1542484134 │ low │ hehe │ └────────────┴───────┴─────────┘ ┌──timestamp─┬─level─┬─message─┐ │ 1542494134 │ high │ hehe │ └────────────┴───────┴─────────┘33 rows in set. Elapsed: 0.058 sec. middleware :)

聚合統計表:

SELECT level, sum(total) FROM topic_ch_daily GROUP BY level;

結果類似:

┌─level─┬─sum(total)─┐ │ mid │ 3 │ │ low │ 5 │ │ high │ 8 │ └───────┴────────────┘

如果要停止接收主題或更改轉換邏輯,可以使用下面的命令分離物化視圖(這個是在clickhouse-client -m中執行的):

DETACH TABLE consumer; ATTACH TABLE consumer;

19.3.示例2:Kafka的配置

通過使用ClickHouse配置文件,Kafka引擎支持擴展配置。有兩個配置key,你可以使用:全局(kafka)和topic-level(kafka_*)。全局的配置在最前面,接著是topic-level的配置。
在目錄/etc/clickhouse-server/config.d/新建配置文件,配資文件名稱任意指定,這里命名為kafka.xml。如下:

[root@middleware config.d]# pwd /etc/clickhouse-server/config.d [root@middleware config.d]# ls kafka.xml [root@middleware config.d]#

Kafka.xml的具體內容如下:

<yandex><!-- 下面是通用的配置,支持所有的kafka的topic中的消息 --><kafka><debug>cgrp</debug><auto_offset_reset>smallest</auto_offset_reset></kafka><!-- Configuration specific for topic "topic_ch" 只是針對topic_ch這個topic的 --><kafka_topic_ch><auto_offset_reset>latest</auto_offset_reset><retry_backoff_ms>250</retry_backoff_ms><fetch_min_bytes>100000</fetch_min_bytes></kafka_topic_ch><!-- 這個是針對my_topic這個topic的 --><kafka_my_topic><auto_offset_reset>latest</auto_offset_reset><retry_backoff_ms>250</retry_backoff_ms><fetch_min_bytes>100000</fetch_min_bytes></kafka_my_topic></yandex>

如果想了解更多的關于這些的可選配置,參見librdkafka configuration reference(https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)。使用下劃線(_)代替ClickHouse配置文件中的點。例如:check.crcs=true將會寫成<check_crcs>true</check_crcs>。

總結

以上是生活随笔為你收集整理的14_clickhouse,kafka引擎,kafka消息到ClickHouse的MergeTree引擎的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。