Reading Kafka data with a Flink DDL embedded in Scala
Steps:
1. Stop the firewall: `service firewalld stop`
2. Start Hadoop
3. Take HDFS out of safe mode (`hdfs dfsadmin -safemode leave`)
4. Start the ZooKeeper and Kafka clusters
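The startup sequence above can be sketched as a shell script. The paths and the use of `$HADOOP_HOME`, `$ZOOKEEPER_HOME`, and `$KAFKA` environment variables are assumptions for a typical CentOS 7 cluster; adjust them to your layout, and run the ZooKeeper/Kafka lines on every node of the respective cluster:

```shell
# Sketch of the startup sequence; paths are assumptions -- adjust to your setup.
service firewalld stop                         # stop the firewall
$HADOOP_HOME/sbin/start-dfs.sh                 # start HDFS
$HADOOP_HOME/sbin/start-yarn.sh                # start YARN
hdfs dfsadmin -safemode leave                  # take HDFS out of safe mode
$ZOOKEEPER_HOME/bin/zkServer.sh start          # run on every ZooKeeper node
$KAFKA/bin/kafka-server-start.sh -daemon \
    $KAFKA/config/server.properties            # run on every Kafka broker
```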
| Operation | Command | Notes |
| --- | --- | --- |
| List topics | `$KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181` | — |
| Send JSON messages to the `kafka_ddl` topic | `$KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic kafka_ddl` | You may hit the error in [2]; check that the port in the command matches the `listeners` port in `server.properties` exactly. The error in [2] can also be caused by a dead Kafka broker on one of the nodes. You may also hit [3]; make sure the firewall is stopped. |
| Test consumption with Kafka's built-in console consumer | `$KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic kafka_ddl` | If the built-in console consumer fails, stop here: consuming from the Flink SQL Client will inevitably fail too. |
| Clear all data in the topic [6] (in case you mistyped something) | `$KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --delete --topic kafka_ddl` | Requires `delete.topic.enable=true` in `$KAFKA/config/server.properties` |
Data entered at the Kafka producer:
{"name":"apple1","age":"18","city":"NingBo","address":"100.00","ts":"1556420980000"}
{"name":"apple2","age":"20","city":"JiaXing","address":"130.00","ts":"1556421040000"}
{"name":"apple3","age":"18","city":"JiangXi","address":"120.00","ts":"1556421100000"}
{"name":"apple4","age":"19","city":"JiangXi","address":"100.00","ts":"1556421120000"}
{"name":"apple5","age":"18","city":"NingBo","address":"150.00","ts":"1556421480000"}
{"name":"apple6","age":"18","city":"NingBo","address":"110.00","ts":"1556421510000"}
{"name":"apple7","age":"19","city":"JiaXing","address":"110.00","ts":"1556421570000"}
{"name":"apple8","age":"20","city":"NingBo","address":"100.00","ts":"1556421630000"}
{"name":"apple9","age":"17","city":"JiangXi","address":"110.00","ts":"1556421655000"}
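The `ts` field in each record is an epoch timestamp in milliseconds; the DDL derives an event-time column from it with `TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss'))`. A minimal Scala sketch of the same conversion (UTC is used here for reproducibility; Flink's `FROM_UNIXTIME` actually uses the session time zone, so the wall-clock value may differ on your machine):

```scala
import java.time.format.DateTimeFormatter
import java.time.{Instant, ZoneId}

// Mirrors TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')):
// epoch milliseconds -> epoch seconds -> formatted event time.
def formatEventTime(tsMillis: Long): String = {
  val fmt = DateTimeFormatter
    .ofPattern("yyyy-MM-dd HH:mm:ss")
    .withZone(ZoneId.of("UTC"))
  fmt.format(Instant.ofEpochSecond(tsMillis / 1000))
}

println(formatEventTime(1556420980000L)) // ts of the first sample record
```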
The corresponding DDL is:

```sql
CREATE TABLE PERSON (
    name    VARCHAR COMMENT 'name',
    age     VARCHAR COMMENT 'age',
    city    VARCHAR COMMENT 'city',
    address VARCHAR COMMENT 'home address',
    ts      BIGINT  COMMENT 'timestamp',
    pay_time AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')),  -- define the event time
    WATERMARK FOR pay_time AS pay_time - INTERVAL '0' SECOND
) WITH (
    'connector.type' = 'kafka',                         -- use the kafka connector
    'connector.version' = 'universal',                  -- kafka version
    'connector.topic' = 'kafka_ddl',                    -- kafka topic
    'connector.startup-mode' = 'earliest-offset',       -- read from the earliest offset
    'connector.properties.0.key' = 'zookeeper.connect', -- connection info
    'connector.properties.0.value' = 'Desktop:2181',
    'connector.properties.1.key' = 'bootstrap.servers',
    'connector.properties.1.value' = 'Desktop:9091',
    'update-mode' = 'append',
    'format.type' = 'json',                             -- source format is json
    'format.derive-schema' = 'true'                     -- derive the json parsing rules from the DDL schema
)
```

The complete project is here:
https://gitee.com/appleyuchi/Flink_Code/blob/master/flink讀kafka/Scala/src/main/scala/FlinkKafkaDDLDemo.scala
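Embedding the DDL in Scala boils down to handing the statement to a `StreamTableEnvironment`. Below is a sketch under the assumption of Flink 1.10 with the Blink planner and the Kafka/JSON connector jars on the classpath; the `SELECT` query is illustrative, not copied from the linked project:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object FlinkKafkaDDLDemo {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tEnv = StreamTableEnvironment.create(env, settings)

    // The CREATE TABLE statement from the article, embedded as a string.
    val ddl: String =
      """CREATE TABLE PERSON (
        |    name VARCHAR, age VARCHAR, city VARCHAR, address VARCHAR, ts BIGINT,
        |    pay_time AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')),
        |    WATERMARK FOR pay_time AS pay_time - INTERVAL '0' SECOND
        |) WITH (
        |    'connector.type' = 'kafka',
        |    'connector.version' = 'universal',
        |    'connector.topic' = 'kafka_ddl',
        |    'connector.startup-mode' = 'earliest-offset',
        |    'connector.properties.0.key' = 'zookeeper.connect',
        |    'connector.properties.0.value' = 'Desktop:2181',
        |    'connector.properties.1.key' = 'bootstrap.servers',
        |    'connector.properties.1.value' = 'Desktop:9091',
        |    'update-mode' = 'append',
        |    'format.type' = 'json',
        |    'format.derive-schema' = 'true'
        |)""".stripMargin

    tEnv.sqlUpdate(ddl)  // Flink 1.10 API (replaced by executeSql in later versions)

    // Illustrative query over the Kafka-backed table; print rows as they arrive.
    val result = tEnv.sqlQuery("SELECT name, age, city, address, pay_time FROM PERSON")
    result.toAppendStream[Row].print()

    env.execute("FlinkKafkaDDLDemo")
  }
}
```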
This article already covers almost everything in [7], so there is no need to read [7] separately.
Reference:
[1] FlinkSQL: creating a Kafka source table with a DDL statement
[2] Kafka connection error: Connection to node 1 (localhost/127.0.0.1:9092) could not be established.
[3] Flink SQL in depth
[6] Is there a way to delete all the data from a topic or delete the topic before every run?
[7] How to define watermarks and computed columns in Flink 1.10.0 SQL DDL