日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

flink sql client讀取kafka數據的timestamp(DDL方式)

發布時間:2023/12/31 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 flink sql client讀取kafka數據的timestamp(DDL方式) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

實驗目的

Kafka的數據能讓Flink SQL Client讀取到

本文是對[1]的詳細記載

具體操作步驟

①啓動hadoop集羣,離開安全模式

②各個節點都關閉防火墻:

service firewalld status(查看防火墻狀態)

service firewalld stop(關閉防火墻)

各個節點分別啟動zookeeper

③啟動kafka集群

startkafka

startkafka2

startkafka3

flink-connector-kafka_2.12-1.12.0.jar

flink-json-1.12.0.jar

flink-jdbc_2.12-1.10.2.jar

放入$FLINK_HOME/lib中

啟動flink集群

?

?

操作命令備注
查看topic$KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181

如果想刪除topic,可以是:

?


?

往 order_sql 這個 topic發送 json 消息$KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic order_sql

這里可能碰到[2]中的報錯,注意檢查命令中端口與配置文件server.properties中的listeners的端口嚴格保持一致

[2]中的報錯還可能是某個節點的kafka掛掉導致的.

?

可能碰到[3]

注意關閉防火墻

?

?

使用kafka自帶消費端測試下消費$KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic order_sql

如果kafka自帶消費者測試有問題,那么就不用繼續往下面做了,

此時如果使用Flink SQL Client來消費也必然會出現問題

清除topic中所有數據[6](因為,萬一你輸錯了呢?對吧)$KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --delete --topic order_sql

需要$KAFKA/config/server.properties設置

delete.topic.enable=true

?


下面的需要手動輸入(發送json消息,注意下面的信息千萬不要一次性復制全部內容,必須一條一條手動拷貝)

{"order_id": "1","shop_id": "AF18","member_id": "3410211","trade_amt": "100.00","pay_time": "1556420980000"}
{"order_id": "2","shop_id": "AF20","member_id": "3410213","trade_amt": "130.00","pay_time": "1556421040000"}
{"order_id": "3","shop_id": "AF18","member_id": "3410212","trade_amt": "120.00","pay_time": "1556421100000"}
{"order_id": "4","shop_id": "AF19","member_id": "3410212","trade_amt": "100.00","pay_time": "1556421120000"}
{"order_id": "5","shop_id": "AF18","member_id": "3410211","trade_amt": "150.00","pay_time": "1556421480000"}
{"order_id": "6","shop_id": "AF18","member_id": "3410211","trade_amt": "110.00","pay_time": "1556421510000"}
{"order_id": "7","shop_id": "AF19","member_id": "3410213","trade_amt": "110.00","pay_time": "1556421570000"}
{"order_id": "8","shop_id": "AF20","member_id": "3410211","trade_amt": "100.00","pay_time": "1556421630000"}
{"order_id": "9","shop_id": "AF17","member_id": "3410212","trade_amt": "110.00","pay_time": "1556421655000"}
?

$FLINK_HOME/bin/sql-client.sh embedded -d $FLINK_HOME/conf/sql.my.yaml -l sql-libs/

注意這個實驗涉及到兩個.yaml文件,修改如下:

?

?

$FLINK_HOME/bin/sql-client.sh embedded -d $FLINK_HOME/conf/sql.my.yaml -l /home/appleyuchi/bigdata/flink-1.12/lib

⑦FLINK SQL執行

具體操作具體FLINK SQL
顯示orders內容select * from orders;
1分鐘固定窗口計算SELECT
? shop_id
? , TUMBLE_START(payment_time, INTERVAL '1' MINUTE) AS tumble_start
? , TUMBLE_END(payment_time, INTERVAL '1' MINUTE) ? AS tumble_end
? , sum(trade_amt) ? ? ? ? ? ? ? ? ? ? ? ? ? ? AS amt
FROM orders
GROUP BY shop_id, TUMBLE(payment_time, INTERVAL '1' MINUTE);

?

?

--------------------------------------------------------------------------------------------------------實驗效果截圖--------------------------------------------------------------------------------------------------------------------------------------------------------------------

解決方案:

$FLINK_HOME/conf/flink-conf.yaml中修改為:

classloader.resolve-order: parent-first

盡量集群中的每個節點都要修改

繼續往下,碰到問題:

問了花名雪盡

根據提示:

放棄采用.yaml的方式,改用DDL方式(下面的直接拷貝到Flink SQL Client中然后按下回車即可):

CREATE TABLE orders (order_id BIGINT, member_id BIGINT, trade_amt DOUBLE, pay_time BIGINT,ts AS TO_TIMESTAMP(FROM_UNIXTIME(pay_time/ 1000, 'yyyy-MM-dd HH:mm:ss')), -- 定義事件時間WATERMARK FOR ts AS ts - INTERVAL '5' SECOND -- 在ts上定義5 秒延遲的 watermark ) WITH ('connector.type' = 'kafka','connector.version' = 'universal','connector.topic' = 'order_sql','connector.startup-mode' = 'earliest-offset','connector.properties.group.id' = 'testGroup','connector.properties.zookeeper.connect' = 'Desktop:2181,Laptop:2181,Laptop:2183','connector.properties.bootstrap.servers' = 'Desktop:9091','format.type' = 'json' );

操作如下:

實驗效果如下:

----------------------------------------------------------------------------附錄(其他可能用到的常規操作)-----------------------------------------------------------------------------------------------------------------------------------------

?

刪除topic:

$KAFKA/bin/kafka-topics.sh --delete --zookeeper Desktop:2181 --topic order_sql

或者:

deleteall /brokers/topics/order_sql
deleteall /config/topics/order_sql
deleteall /admin/delete_topics/order_sql

?


?

?

根據官方文檔[4]用的是0.11的kafka,可能是版本存在不兼容的問題

Reference:

[1]Flink SQL-Client 的使用

[2]Kafka連接服務器出現:Connection to node 1 (localhost/127.0.0.1:9092) could not be established.

[3]kafka出現Unable to read additional data from server sessionid 0x0, likely server has closed socket

[4]SQL Client

[5]Flink 1.9 實戰:使用 SQL 讀取 Kafka 并寫入 MySQL

[6]Is there a way to delete all the data from a topic or delete the topic before every run?

?

?

?

?

?

總結

以上是生活随笔為你收集整理的flink sql client讀取kafka數據的timestamp(DDL方式)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。