30.kafka数据同步Elasticsearch深入详解(ES与Kafka同步)
1、kafka同步到Elasticsearch方式?
目前已知常用的方式有四種:?
1)logstash_input_kafka插件;?
缺點:不穩(wěn)定(ES中文社區(qū)討論)?
2)spark stream同步;?
缺點:太龐大?
3)kafka connector同步;?
4)自寫程序讀取、解析、寫入?
?
本文主要基于kafka connector實現(xiàn)kafka到Elasticsearch全量、增量同步。
2、從confluenct說起
LinkedIn有個三人小組出來創(chuàng)業(yè)了—正是當(dāng)時開發(fā)出Apache Kafka實時信息列隊技術(shù)的團隊成員,基于這項技術(shù)Jay Kreps帶頭創(chuàng)立了新公司Confluent。Confluent的產(chǎn)品圍繞著Kafka做的。?
Confluent Platform簡化了連接數(shù)據(jù)源到Kafka,用Kafka構(gòu)建應(yīng)用程序,以及安全,監(jiān)控和管理您的Kafka的基礎(chǔ)設(shè)施。?
confluent組成如下所示:?
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
默認(rèn)端口對應(yīng)表:
組件 | 端口
Apache Kafka brokers (plain text):9092
Confluent Control Center:9021
Kafka Connect REST API:8083
REST Proxy:8082
Schema Registry REST API:8081
ZooKeeper:2181
3、kafka connector介紹。
Kafka 0.9+增加了一個新的特性 Kafka Connect,可以更方便的創(chuàng)建和管理數(shù)據(jù)流管道。它為Kafka和其它系統(tǒng)創(chuàng)建規(guī)??蓴U展的、可信賴的流數(shù)據(jù)提供了一個簡單的模型。
通過 connectors可以將大數(shù)據(jù)從其它系統(tǒng)導(dǎo)入到Kafka中,也可以從Kafka中導(dǎo)出到其它系統(tǒng)。
Kafka Connect可以將完整的數(shù)據(jù)庫注入到Kafka的Topic中,或者將服務(wù)器的系統(tǒng)監(jiān)控指標(biāo)注入到Kafka,然后像正常的Kafka流處理機制一樣進(jìn)行數(shù)據(jù)流處理。
而導(dǎo)出工作則是將數(shù)據(jù)從Kafka Topic中導(dǎo)出到其它數(shù)據(jù)存儲系統(tǒng)、查詢系統(tǒng)或者離線分析系統(tǒng)等,比如數(shù)據(jù)庫、 Elastic Search、 Apache Ignite等。
KafkaConnect有兩個核心概念:Source和Sink。 Source負(fù)責(zé)導(dǎo)入數(shù)據(jù)到Kafka,Sink負(fù)責(zé)從Kafka導(dǎo)出數(shù)據(jù),它們都被稱為Connector。
kafkaConnect通過Jest實現(xiàn)Kafka對接Elasticsearch。
4、kafka connector安裝
實操非研究性的目的,不建議源碼安裝。?
直接從官網(wǎng)down confluent安裝即可。地址:https://www.confluent.io/download/
如下,解壓后既可以使用。
[root@kafka_no1 confluent-3.3.0]# pwd /home/confluent/confluent-3.3.0[root@kafka_no1 confluent-3.3.0]# ls -al total 32 drwxrwxr-x. 7 root root 4096 Dec 16 10:08 . drwxr-xr-x. 3 root root 4096 Dec 20 15:34 .. drwxr-xr-x. 3 root root 4096 Jul 28 08:30 bin drwxr-xr-x. 18 root root 4096 Jul 28 08:30 etc drwxr-xr-x. 2 root root 4096 Dec 21 15:34 logs -rw-rw-r--. 1 root root 871 Jul 28 08:45 README drwxr-xr-x. 10 root root 4096 Jul 28 08:30 share drwxrwxr-x. 2 root root 4096 Jul 28 08:45 src- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
5、kafka connector模式
Kafka connect 有兩種工作模式?
1)standalone:在standalone模式中,所有的worker都在一個獨立的進(jìn)程中完成。
2)distributed:distributed模式具有高擴展性,以及提供自動容錯機制。你可以使用一個group.ip來啟動很多worker進(jìn)程,在有效的worker進(jìn)程中它們會自動的去協(xié)調(diào)執(zhí)行connector和task,如果你新加了一個worker或者掛了一個worker,其他的worker會檢測到然后在重新分配connector和task。
6、kafka connector同步步驟
前提:
$ confluent start- 1
如下的服務(wù)都需要啟動:
Starting zookeeper zookeeper is [UP] ——對應(yīng)端口:2181 Starting kafka kafka is [UP]——對應(yīng)端口:9092 Starting schema-registry schema-registry is [UP]——對應(yīng)端口:8081 Starting kafka-rest kafka-rest is [UP] Starting connect connect is [UP]- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
可以,netstat -natpl 查看端口是否監(jiān)聽ok。
步驟1:創(chuàng)建topic
./kafka-topics.sh --create --zookeeper 110.118.7.11 :2181 --replication-factor 3 --partitions 1 --topic test-elasticsearch-sink- 1
步驟2:生產(chǎn)者發(fā)布消息
假定avrotest topic已經(jīng)創(chuàng)建。
./kafka-avro-console-producer --broker-list 110.118.7.11:9092 --topic test-elasticsearch-sink \--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'{"f1": "value1"} {"f1": "value2"} {"f1": "value3"}- 1
- 2
- 3
- 4
- 5
- 6
步驟3:消費者訂閱消息測試(驗證生產(chǎn)者消息可以接收到)
./kafka-avro-console-consumer --bootstrap-server 110.118.7.11:9092 :9092 --topic test-elasticsearch-sink --from-beginning- 1
步驟4:connector傳輸數(shù)據(jù)操作到ES
./connect-standalone ../etc/schema-registry/connect-avro-standalone.properties \ ../etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties- 1
- 2
注意此處: connect-standalone模式,對應(yīng) connect-avro-standalone.properties要修改;?
如果使用connect-distribute模式,對應(yīng)的connect-avro-distribute.properties要修改。?
這里 quickstart-elasticsearch.properties :啟動到目的Elasticsearch配置。
quickstart-elasticsearch.properties**設(shè)置**:
name=elasticsearch-sink connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector tasks.max=1 #kafka主題名稱,也是對應(yīng)Elasticsearch索引名稱 topics= test-elasticsearch-sinkkey.ignore=true #ES url信息 connection.url=http://110.18.6.20:9200 #ES type.name固定 type.name=kafka-connect- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
7、同步效果。
curl -XGET 'http:// 110.18.6.20 :9200/test-elasticsearch-sink/_search?pretty'
8、連接信息查詢REST API
- -
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
9、小結(jié)。
他山之石,可以攻玉。?
kafka上的小學(xué)生,繼續(xù)加油!
參考:
[1]kafka-connect部署及簡介:http://t.cn/RiUCaWx?
[2]connector介紹:http://orchome.com/344?
[3]英文-同步介紹http://t.cn/RYeZm7P?
[4]部署&開發(fā)http://t.cn/RTeyOEl?
[5]confluent生態(tài)鏈http://t.cn/RTebVyL?
[6]快速啟動參考:https://docs.confluent.io/3.3.0/quickstart.html?
[7]ES-connector:http://t.cn/RTecXmc
總結(jié)
以上是生活随笔為你收集整理的30.kafka数据同步Elasticsearch深入详解(ES与Kafka同步)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 基于Go语言Gin+Xorm+Layui
- 下一篇: Meta拟裁撤Instagram伦敦员工