ClickHouse表引擎之Integration系列
? Integration系統表引擎主要用于將外部數據導入到ClickHouse中,或者在ClickHouse中直接操作外部數據源。
1 Kafka
1.1 Kafka引擎
? 將Kafka Topic中的數據直接導入到ClickHouse。
? 語法如下:
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] (name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],... ) ENGINE = Kafka() SETTINGSkafka_broker_list = 'host:port',kafka_topic_list = 'topic1,topic2,...',kafka_group_name = 'group_name',kafka_format = 'data_format'[,][kafka_row_delimiter = 'delimiter_symbol',][kafka_schema = '',][kafka_num_consumers = N,][kafka_max_block_size = 0,][kafka_skip_broken_messages = N,][kafka_commit_every_batch = 0]? 參數說明:
? ①必需的參數
| kafka_broker_list | Kafka broker列表,以逗號分隔 |
| kafka_topic_list | Kafka topic列表 |
| kafka_group_name | Kafka消費者組,如果不希望消息在集群中重復,使用相同的組名 |
| kafka_format | 消息格式。使用與SQL格式函數相同的符號,例如JSONEachRow |
? ②可選參數
| kafka_row_delimiter | 分隔符字符,用于一行的結束標識符號 |
| kafka_schema | 如果kafka_format參數需要schema定義,則通過該參數來支持 |
| kafka_num_consumers | 每張表的消費者個數。默認值:1。如果一個使用者的吞吐量不足,則指定更多使用者。使用者的總數不應該超過主題中的分區數,因為每個分區只能分配一個使用者。 |
| kafka_max_block_size | 輪詢的最大批處理大小 |
| kafka_skip_broken_messages | 忽略無效記錄的條數。默認值:0 |
| kafka_commit_every_batch | 在編寫整個塊之后提交每個使用和處理的批而不是單個提交(默認值:0) |
? 測試:(1)建表
CREATE TABLE test_kafka (\timestamp UInt64,\level String,\message String\) ENGINE = Kafka() SETTINGS kafka_broker_list = 'ambari01:6667,ambari02:6667,ambari03:6667',\kafka_topic_list = 'test',\kafka_group_name = 'group1',\kafka_format = 'JSONEachRow',\kafka_row_delimiter = '\n'? 注意:如果后面在查詢過程中報如下錯誤。是因為有些引擎版本存在的,消息中數據之間的分割符號未指定,導致無法處理。解決辦法: 添加 kafka_row_delimiter = ‘\n’。
Cannot parse input: expected { before: \0: (at row 2)? (2)在kafka建立一個新的topic
sh kafka-topics.sh --create --zookeeper ambari01:2181,ambari02:2181,ambari03:2181 --replication-factor 1 --partitions 3 --topic test? (3)在kafka建立發布者console-producer
sh kafka-console-producer.sh --broker-list ambari01:6667,ambari02:6667,ambari03:6667 --topic test? (4)發送數據
{"timestamp":1515897460,"level":"one","message":"aa"}? 注意:由于一個kafka的partition 只能由一個 group consumer 消費,所以clickhouse 節點數需要大于 topic 的 partition 數。
? (5)第一次查詢
SELECT * FROM test_kafka ┌──timestamp─┬─level─┬─message─┐ │ 1515897460 │ one │ aa │ └────────────┴───────┴─────────┘? (6)第二次查詢
SELECT * FROM test_kafka Ok.? 發現第二次查詢的時候沒有數據了,因為 Kafka引擎 表只是 kafka 流的一個視圖而已,當數據被 select 了一次之后,這個數據就會被認為已經消費了,下次 select 就不會再出現。所以Kafka表單獨使用是沒什么用的,一般是用來和 MaterialView 配合,將Kafka表里面的數據自動導入到 MaterialView 里面。
? (7)與 MaterialView 集成
? 我們現在每一節點建一個 MaterialView 保存 Kafka 里面的數據, 再建一個全局的Distributed表。
CREATE MATERIALIZED VIEW test_kafka_view ENGINE = SummingMergeTree() PARTITION BY day ORDER BY (day, level) AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as count FROM test_kafka GROUP BY day, level;? (6)再次發送數據
{"timestamp":1515897461,"level":"2","message":'bb'} {"timestamp":1515897462,"level":"3","message":'cc'} {"timestamp":1515897462,"level":"3","message":'ee'} {"timestamp":1515897463,"level":"4","message":'dd'}? (7)查詢數據
SELECT * FROM test_kafka Ok.0 rows in set. Elapsed: 2.686 sec. --------------------------------------- SELECT * FROM test_kafka_view Ok.0 rows in set. Elapsed: 0.002 sec.? 發現沒有數據,原因:kafka 引擎默認消費根據條數與時間進行入庫,不然肯定是沒效率的。其中對應的參數有兩個。 max_insert_block_size(默認值為: 1048576),stream_flush_interval_ms(默認值為: 7500)這兩個參數都是全局性的。
? 業務系統需要從kafka讀取數據,按照官方文檔建好表后,也能看到數據,但是延時很高。基本要延時15分鐘左右。kafka的數據大約每秒50條左右。基本規律是累計到65535行以后(最小的塊大小)才會在表中顯示數據。嘗試更改stream_flush_interval_ms 沒有作用,但是有不想改max_block_size,因為修改以后影響到全局所有表,并且影響搜索效率。希望能每N秒保證不管block有沒有寫滿都flush一次。
? 雖然ClickHouse和 Kafka的配合可以說是十分的便利,只有配置好,但是相當的局限性對 kafka 數據格式的支持也有限。下面介紹WaterDrop這個中間件將Kafka的數據接入ClickHouse。
?
1.2 WaterDrop
? WaterDrop: 是一個非常易用,高性能、支持實時流式和離線批處理的海量數據處理產品,架構于Apache Spark和 Apache Flink之上。github地址:https://github.com/InterestingLab/waterdrop
? ①下載并解壓
wget https://github.com/InterestingLab/waterdrop/releases/download/v1.4.3/waterdrop-1.4.3.zip unzip waterdrop-1.4.3.zip? ②修改配置文件waterdrop-env.sh
vim /opt/module/waterdrop-1.4.3/config/waterdrop-env.sh SPARK_HOME=/usr/jdp/3.2.0.0-108/spark2 #配置為spark的路徑? ③增加配置文件test.conf
spark {spark.streaming.batchDuration = 5spark.app.name = "test_waterdrop"spark.ui.port = 14020spark.executor.instances = 3spark.executor.cores = 1spark.executor.memory = "1g" }input {kafkaStream {topics = "test_wd"consumer.bootstrap.servers = "10.0.0.50:6667,10.0.0.52:6667,10.0.0.53:6667"consumer.zookeeper.connect = "10.0.0.50:2181,10.0.0.52:2181,10.0.0.53:2181"consumer.group.id = "group1"consumer.failOnDataLoss = falseconsumer.auto.offset.reset = latestconsumer.rebalance.max.retries = 100} } filter {json{source_field = "raw_message"} }output {clickhouse {host = "10.0.0.50:8123"database = "test"table = "test_wd"fields = ["act","b_t","s_t"]username = "admin"password = "admin"retry_codes = [209, 210 ,1002]retry = 10bulk_size = 1000} }? ④創建Clickhouse表
create table test.test_wd( act String, b_t String, s_t Date) ENGINE = MergeTree() partition by s_t order by s_t;? ⑤啟動寫入程序
cd /data/work/waterdrop-1.4.1 sh /opt/module/waterdrop-1.4.3/bin/start-waterdrop.sh --master yarn --deploy-mode client --config /opt/module/waterdrop-1.4.3/config/test.conf? ⑥插入數據
{"act":"aaaa","b_t":"100","s_t":"2019-12-22"} {"act":"bxc","b_t":"200","s_t":"2020-01-01"} {"act":"dd","b_t":"50","s_t":"2020-02-01"}? ⑦查看表數據
SELECT * FROM test_wd ┌─act─┬─b_t─┬────────s_t─┐ │ dd │ 50 │ 2020-02-01 │ └─────┴─────┴────────────┘ ┌─act──┬─b_t─┬────────s_t─┐ │ aaaa │ 100 │ 2019-12-22 │ └──────┴─────┴────────────┘ ┌─act─┬─b_t─┬────────s_t─┐ │ bxc │ 200 │ 2020-01-01 │ └─────┴─────┴────────────┘2 MySQL
? 將Mysql作為存儲引擎,可以對存儲在遠程 MySQL 服務器上的數據執行 select查詢
? 語法:
MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);? 參數說明
| host:port | MySQL 服務器地址 |
| database | 數據庫的名稱 |
| table | 表名稱 |
| user | 數據庫用戶 |
| password | 用戶密碼 |
| replace_query | 將 INSERT INTO 查詢是否替換為 REPLACE INTO 的標志。如果 replace_query=1,則替換查詢 |
| on_duplicate_clause | 將 ON DUPLICATE KEY UPDATE on_duplicate_clause 表達式添加到 INSERT 查詢語句中。 |
? 測試:
? 在Mysql中建表,并插入數據
CREATE TABLE `user` (`id` int(11) NOT NULL,`username` varchar(50) DEFAULT NULL,`sex` varchar(5) DEFAULT NULL )INSERT INTO user values(11,"zs","0"); INSERT INTO user values(12,"ls","0"); INSERT INTO user values(13,"ww","0"); INSERT INTO user values(14,"ll","1");? 創建ClickHouse表,insert_time字段為默認字段
CREATE TABLE test.from_mysql(\id UInt64,\username String,\sex String,\insert_time Date DEFAULT toDate(now())\ ) ENGINE = MergeTree()\ PARTITION BY insert_time \ ORDER BY (id,username)? 插入數據
INSERT INTO test.from_mysql (id,username,sex) SELECT id, username,sex FROM mysql('10.0.0.42:3306','test', 'user', 'root', 'admin');? 查詢數據
SELECT * FROM from_mysql ┌─id─┬─username─┬─sex─┬─insert_time─┐ │ 11 │ zs │ 0 │ 2020-05-24 │ │ 12 │ ls │ 0 │ 2020-05-24 │ │ 13 │ ww │ 0 │ 2020-05-24 │ │ 14 │ ll │ 1 │ 2020-05-24 │ └────┴──────────┴─────┴─────────────┘4 rows in set. Elapsed: 0.003 sec.3 HDFS
? 用戶通過執行SQL語句,可以在ClickHouse中直接讀取HDFS的文件,也可以將讀取的數據導入到ClickHouse本地表。
? HDFS引擎:ENGINE = HDFS(URI, format)。URI:HDFS的URI,format:存儲格式,格式鏈接https://clickhouse.tech/docs/en/interfaces/formats/#formats
3.1 查詢文件
? 這種使用場景相當于把HDFS做為ClickHouse的外部存儲,當查詢數據時,直接訪問HDFS的文件,而不是把HDFS文件導入到ClickHouse再進行查詢。相對于ClickHouse的本地存儲查詢,速度較慢。
? 在HDFS上新建一個數據文件:user.csv,上傳hadoop fs -cat /user/test/user.csv,內容如下:
1,zs,18 2,ls,19 4,wu,25 3,zl,22? 在ClickHouse上創建一個訪問user.csv文件的表:
CREATE TABLE test_hdfs_csv(\id UInt64,\name String,\age UInt8\ )ENGINE = HDFS('hdfs://ambari01:8020/user/test/user.csv', 'CSV')? 查詢hdfs_books_csv表
SELECT * FROM test_hdfs_csv ┌─id─┬─name─┬─age─┐ │ 1 │ zs │ 18 │ │ 2 │ ls │ 19 │ │ 4 │ wu │ 25 │ │ 3 │ zl │ 22 │ └────┴──────┴─────┘3.2 從HDFS導入數據
? 從HDFS導入數據,數據在ClickHouse本地表,建本地表
CREATE TABLE test_hdfs_local(\id UInt64,\name String,\age UInt8\ )ENGINE = Log? 在數據存儲目錄下可以找到這個表的文件夾
/data/clickhouse/data/test/test_hdfs_local? 從HDFS導入數據
INSERT INTO test_hdfs_local SELECT * FROM test_hdfs_csv? 查詢
SELECT * FROM test_hdfs_local ┌─id─┬─name─┬─age─┐ │ 1 │ zs │ 18 │ │ 2 │ ls │ 19 │ │ 4 │ wu │ 25 │ │ 3 │ zl │ 22 │ └────┴──────┴─────┘總結
以上是生活随笔為你收集整理的ClickHouse表引擎之Integration系列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: linux 跨服务器备份,用Backup
- 下一篇: 清华镜像源安装 NGboost XGbo