
Writing to Elasticsearch and ClickHouse with Confluent Connect


1 Testing the Elasticsearch connection

1.1 Start Confluent

/home/kafka/.local/confluent/bin/confluent start

This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html
Using CONFLUENT_CURRENT: /tmp/confluent.swpIapNw
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]

1.2 Add the configuration

vim /home/kafka/.local/confluent/etc/kafka-connect-elasticsearch/iot-elasticsearch-sink.properties

name=iot-elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=road_traffic
key.ignore=true
connection.url=http://10.0.165.8:9200
type.name=iot-kafka-connect
batch.size=1
flush.timeout.ms=200000
topic.schema.ignore=road_traffic
schema.ignore=true
retry.backoff.ms=3000

1.3 Add the connector

bin/confluent load iot-elasticsearch-sink -d etc/kafka-connect-elasticsearch/iot-elasticsearch-sink.properties

$ bin/confluent load iot-elasticsearch-sink -d etc/kafka-connect-elasticsearch/iot-elasticsearch-sink.properties
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html
Warning: Install 'jq' to add support for parsing JSON
{"name":"iot-elasticsearch-sink","config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","tasks.max":"1","topics":"road_traffic","key.ignore":"true","connection.url":"http://10.0.165.8:9200","type.name":"iot-kafka-connect","batch.size":"1","flush.timeout.ms":"200000","topic.schema.ignore":"road_traffic","schema.ignore":"true","retry.backoff.ms":"3000","name":"iot-elasticsearch-sink"},"tasks":[],"type":"sink"}

Check the status:

$ bin/confluent status connectors
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html
["iot-elasticsearch-sink"]

$ bin/confluent status iot-elasticsearch-sink
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html
{"name":"iot-elasticsearch-sink","connector":{"state":"RUNNING","worker_id":"10.0.165.9:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"10.0.165.8:8083"}],"type":"sink"}
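The confluent load command is a thin wrapper around the Kafka Connect REST API. As a sketch of the equivalent direct call (assuming the worker's REST port is the default 8083, as shown in the status output above), the same connector can be created without the CLI:

curl -X PUT -H "Content-Type: application/json" \
  http://10.0.165.8:8083/connectors/iot-elasticsearch-sink/config \
  -d '{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "road_traffic",
    "key.ignore": "true",
    "connection.url": "http://10.0.165.8:9200",
    "type.name": "iot-kafka-connect",
    "batch.size": "1",
    "flush.timeout.ms": "200000",
    "topic.schema.ignore": "road_traffic",
    "schema.ignore": "true",
    "retry.backoff.ms": "3000"
  }'

PUT /connectors/<name>/config creates the connector if it does not exist and reconfigures it if it does, so the call is idempotent.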

1.4 Create the Kafka topic

/home/kafka/.local/confluent/bin/kafka-topics --zookeeper fbi-local-08:2181,fbi-local-09:2181 --create --replication-factor 2 --partitions 2 --topic road_traffic

Verify that the topic was created successfully:

/home/kafka/.local/confluent/bin/kafka-topics --zookeeper fbi-local-08:2181,fbi-local-09:2181 --list

1.5 Produce data

(1) Add the following dependencies:

<groupId>org.example</groupId>
<artifactId>Manufacturing_data</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
    <scala.binary.version>2.11</scala.binary.version>
    <kafka.version>1.0.0</kafka.version>
    <avro.version>1.8.0</avro.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_${scala.binary.version}</artifactId>
        <version>${kafka.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.66</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>${avro.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-tools</artifactId>
        <version>${avro.version}</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

(2) The Confluent artifacts cannot be found on Maven Central, so they have to be added by hand; otherwise the build fails with a missing io.confluent.kafka.serializers.KafkaAvroSerializer.

After unpacking confluent-4.0.0, the share/java/ directory contains the jars of the individual Confluent components. From the confluent-common directory we need common-config-4.0.0.jar, common-utils-4.0.0.jar and every jar whose name starts with jackson; from the kafka-serde-tools directory we need kafka-schema-registry-client-4.0.0.jar and kafka-avro-serializer-4.0.0.jar.
Copy them into a new lib directory under the module, then right-click it and choose Add as Library…

common-config-4.0.0.jar
common-utils-4.0.0.jar
jackson-annotations-2.9.0.jar
jackson-core-2.9.1.jar
jackson-core-asl-1.9.13.jar
jackson-databind-2.9.1.jar
jackson-jaxrs-1.9.13.jar
jackson-mapper-asl-1.9.13.jar
jackson-xc-1.9.13.jar
kafka-avro-serializer-4.0.0.jar
kafka-schema-registry-client-4.0.0.jar
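As an alternative to copying jars by hand (an option not used in this setup, and assuming the build machine can reach Confluent's public Maven repository), the serializer can be resolved directly in the pom:

<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>4.0.0</version>
</dependency>

This pulls in the schema-registry client and the common jars transitively.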

The producer code is as follows:

import java.io.File
import java.util.Properties

import com.alibaba.fastjson.JSON
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

case class RoadTraffic(status: String, avgMeasuredTime: Int, avgSpeed: Int,
                       extID: String, medianMeasuredTime: Int, timestamp: Long,
                       vehicleCount: Int, id: Long, perort_id: String, process_time: Long)

object KafkaToTraffic {

  def main(args: Array[String]): Unit = {
    // Kafka producer configuration
    val props = new Properties()
    props.put("bootstrap.servers", "10.0.165.8:9092,10.0.165.9:9092")
    props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
    props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
    props.put("schema.registry.url", "http://10.0.165.8:8081")

    // Parse the Avro schema
    val schema: Schema = new Schema.Parser().parse(new File("E:\\working\\ideaWorking\\iot_road_traffic\\src\\main\\resources\\RoadTraffic.avsc"))
    //val recordInjection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
    val avroRecord: GenericData.Record = new GenericData.Record(schema)

    // Create a Kafka producer
    val producer: KafkaProducer[String, GenericRecord] = new KafkaProducer(props)

    val str = "{\"status\":\"OK\",\"avgMeasuredTime\":\"53\",\"avgSpeed\":\"58\",\"extID\":\"724\",\"medianMeasuredTime\":\"53\",\"TIMESTAMP\":\"2014-04-25T19:35:00\",\"vehicleCount\":\"1\",\"id\":\"8961146\",\"perort_id\":\"179444\",\"process_time\":\"1593386110\"}"
    val roadTraffic = JSON.parseObject(str, classOf[RoadTraffic])
    System.out.println(roadTraffic)

    // Copy the parsed fields into the Avro record
    avroRecord.put("status", roadTraffic.status)
    avroRecord.put("avgMeasuredTime", roadTraffic.avgMeasuredTime)
    avroRecord.put("avgSpeed", roadTraffic.avgSpeed)
    avroRecord.put("extID", roadTraffic.extID)
    avroRecord.put("medianMeasuredTime", roadTraffic.medianMeasuredTime)
    avroRecord.put("timestamp", roadTraffic.timestamp)
    avroRecord.put("vehicleCount", roadTraffic.vehicleCount)
    avroRecord.put("id", roadTraffic.id)
    avroRecord.put("perort_id", roadTraffic.perort_id)
    avroRecord.put("process_time", roadTraffic.process_time)

    try {
      val record = new ProducerRecord[String, GenericRecord]("road_traffic", avroRecord)
      System.out.println(record.toString)
      producer.send(record).get()
    } catch {
      case e: Exception => e.printStackTrace()
    }
    producer.close()
  }
}

RoadTraffic.avsc

{"type": "record","name": "traffic","fields": [{"name": "status", "type": "string"},{"name": "avgMeasuredTime", "type": "int"},{"name": "avgSpeed", "type": "int"},{"name": "extID", "type": "string"},{"name": "medianMeasuredTime", "type": "int"},{"name": "timestamp", "type": "long"},{"name": "vehicleCount", "type": "int"},{"name": "id", "type": "long"},{"name": "perort_id", "type": "string"},{"name": "process_time", "type": "long"}] }

1.6 Check the result

Once iot-elasticsearch-sink is loaded and the producer has run, an index with the same name as the topic is created automatically in Elasticsearch. Query it:

curl -XGET http://10.0.165.8:9200/road_traffic/_search

$ curl -XGET http://10.0.165.8:9200/road_traffic/_search
{"took":1,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":1,"max_score":1.0,"hits":[{"_index":"road_traffic","_type":"iot-kafka-connect","_id":"road_traffic+0+0","_score":1.0,"_source":{"status":"OK","avgMeasuredTime":53,"avgSpeed":58,"extID":"724","medianMeasuredTime":53,"timestamp":1398425700000,"vehicleCount":1,"id":8961146,"perort_id":"179444","process_time":1593386110}}]}}

2 Testing the ClickHouse connection

The ClickHouse connection goes through JDBC.

2.1 Add the configuration

vim /home/kafka/.local/confluent/etc/kafka-connect-jdbc/iot-clickhouse-sink.properties

name=iot-clickhouse-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=road_traffic
connection.url=jdbc:clickhouse://10.0.50.1:8123/iot
connection.user=default
auto.create=false
insert.mode=INSERT
table.name.format=traffic_all
errors.log.enable=true
db.timezone=Asia/Shanghai

2.2 Add the JDBC driver jar

To connect to ClickHouse over JDBC, the ClickHouse driver jar, clickhouse-jdbc-0.2.4.jar, must be placed in the /home/kafka/.local/confluent/share/java/kafka-connect-jdbc directory:

$ ll
total 10952
-rw-r--r-- 1 kafka kafka   20437 Mar 27 08:37 audience-annotations-0.5.0.jar
-rw-r--r-- 1 root  root   211574 Jun 29 12:30 clickhouse-jdbc-0.2.4.jar
-rw-r--r-- 1 kafka kafka   20903 Mar 27 08:37 common-utils-5.2.4.jar
-rw-r--r-- 1 kafka kafka   87325 Mar 27 08:37 jline-0.9.94.jar
-rw-r--r-- 1 kafka kafka  317816 Mar 27 08:37 jtds-1.3.1.jar
-rw-r--r-- 1 kafka kafka  223878 Mar 27 08:37 kafka-connect-jdbc-5.2.4.jar
-rw-r--r-- 1 kafka kafka 1292696 Mar 27 08:37 netty-3.10.6.Final.jar
-rw-r--r-- 1 kafka kafka  927447 Mar 27 08:37 postgresql-42.2.10.jar
-rw-r--r-- 1 kafka kafka   41203 Mar 27 08:37 slf4j-api-1.7.25.jar
-rw-r--r-- 1 kafka kafka 7064881 Mar 27 08:37 sqlite-jdbc-3.25.2.jar
-rw-r--r-- 1 kafka kafka   74798 Mar 27 08:37 zkclient-0.10.jar
-rw-r--r-- 1 kafka kafka  906708 Mar 27 08:37 zookeeper-3.4.13.jar

Note: Confluent must be restarted afterwards, otherwise the connector fails with: java.sql.SQLException: No suitable driver found for jdbc:clickhouse://10.0.50.1:8123/iot
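A sketch of the restart with the development CLI used above, stopping and starting all services (restarting only the connect service with confluent stop connect / confluent start connect should also suffice):

/home/kafka/.local/confluent/bin/confluent stop
/home/kafka/.local/confluent/bin/confluent start

After the restart, the connect worker scans share/java/kafka-connect-jdbc again and the ClickHouse driver becomes visible to DriverManager.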

2.3 Create the database and tables in ClickHouse
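The original post does not show the DDL. A minimal sketch, assuming a MergeTree local table traffic and a Distributed table traffic_all on top of it; the column types are derived from the Avro schema above, while the cluster name iot_cluster and the sharding key are assumptions:

CREATE DATABASE IF NOT EXISTS iot;

-- local table, one per shard
CREATE TABLE iot.traffic
(
    status             String,
    avgMeasuredTime    Int32,
    avgSpeed           Int32,
    extID              String,
    medianMeasuredTime Int32,
    timestamp          Int64,
    vehicleCount       Int32,
    id                 Int64,
    perort_id          String,
    process_time       Int64
)
ENGINE = MergeTree()
ORDER BY id;

-- distributed table referenced by table.name.format in the sink config
CREATE TABLE iot.traffic_all AS iot.traffic
ENGINE = Distributed(iot_cluster, iot, traffic, rand());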

2.4 Add the connector

Because a record was already written to the road_traffic topic during the Elasticsearch test, we can test the sink directly:

bin/confluent load iot-clickhouse-sink -d etc/kafka-connect-jdbc/iot-clickhouse-sink.properties

[kafka@fbi-local-08 confluent]$ bin/confluent load iot-clickhouse-sink -d etc/kafka-connect-jdbc/iot-clickhouse-sink.properties
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html
Warning: Install 'jq' to add support for parsing JSON
{"name":"iot-clickhouse-sink","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max":"1","topics":"road_traffic","connection.url":"jdbc:clickhouse://10.0.50.1:8123/iot","connection.user":"default","auto.create":"false","insert.mode":"INSERT","table.name.format":"traffic_all","errors.log.enable":"true","db.timezone":"Asia/Shanghai","name":"iot-clickhouse-sink"},"tasks":[],"type":"sink"}

$ bin/confluent status iot-clickhouse-sink
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html
{"name":"iot-clickhouse-sink","connector":{"state":"RUNNING","worker_id":"10.0.165.8:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"10.0.165.8:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Table \"traffic_all\" is missing and auto-creation is disabled\n\tat io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:88)\n\tat io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:61)\n\tat io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:85)\n\tat io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)\n\t... 10 more\n"}],"type":"sink"}

The task stops failing when table.name.format in iot-clickhouse-sink.properties is changed to the local table traffic.

The fix: patch the kafka-connect-jdbc-5.2.4 source to add ClickHouse support, upload the recompiled jar to /home/kafka/.local/confluent/share/java/kafka-connect-jdbc, and delete the original kafka-connect-jdbc-5.2.4.jar. A sketch of such a patch is shown below.
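The post does not include the patch itself. The following is a rough sketch of what the added dialect could look like, assuming the dialect SPI of kafka-connect-jdbc 5.2.x (a GenericDatabaseDialect subclass whose nested Provider is registered through META-INF/services/io.confluent.connect.jdbc.dialect.DatabaseDialectProvider); the exact class and constructor signatures should be checked against the 5.2.4 source, and a complete dialect may additionally need to override type-mapping and table-metadata methods:

package io.confluent.connect.jdbc.dialect;

import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.SubprotocolBasedProvider;
import org.apache.kafka.common.config.AbstractConfig;

// Hypothetical minimal dialect: reuses the generic SQL generation and only
// claims the jdbc:clickhouse: subprotocol so the ClickHouse driver is matched.
public class ClickHouseDatabaseDialect extends GenericDatabaseDialect {

  // Discovered by DatabaseDialects through the ServiceLoader entry.
  public static class Provider extends SubprotocolBasedProvider {
    public Provider() {
      super(ClickHouseDatabaseDialect.class.getSimpleName(), "clickhouse");
    }

    @Override
    public DatabaseDialect create(AbstractConfig config) {
      return new ClickHouseDatabaseDialect(config);
    }
  }

  public ClickHouseDatabaseDialect(AbstractConfig config) {
    super(config);
  }
}

After rebuilding, replace the jar under share/java/kafka-connect-jdbc, restart Confluent, and re-test the sink against traffic_all.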
