日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kafka Connect简介

發(fā)布時間:2024/1/23 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka Connect简介 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一. Kafka Connect簡介

  Kafka是一個使用越來越廣的消息系統(tǒng),尤其是在大數(shù)據(jù)開發(fā)中(實時數(shù)據(jù)處理和分析)。為何集成其他系統(tǒng)和解耦應用,經常使用Producer來發(fā)送消息到Broker,并使用Consumer來消費Broker中的消息。Kafka Connect是到0.9版本才提供的并極大的簡化了其他系統(tǒng)與Kafka的集成。Kafka Connect運用用戶快速定義并實現(xiàn)各種Connector(File,Jdbc,Hdfs等),這些功能讓大批量數(shù)據(jù)導入/導出Kafka很方便。

? ? ? ? ? ? ?

如圖中所示,左側的Sources負責從其他異構系統(tǒng)中讀取數(shù)據(jù)并導入到Kafka中;右側的Sinks是把Kafka中的數(shù)據(jù)寫入到其他的系統(tǒng)中。

二. 各種Kafka Connector

  Kafka Connector很多,包括開源和商業(yè)版本的。如下列表中是常用的開源Connector

ConnectorsReferences
JdbcSource,?Sink
Elastic SearchSink1,?Sink2,?Sink3
CassandraSource1,?Source 2,?Sink1,?Sink2?
MongoDBSource
HBaseSink
SyslogSource
MQTT (Source)Source
Twitter (Source)Source,?Sink
S3Sink1,?Sink2
?

  商業(yè)版的可以通過Confluent.io獲得

三. 示例

3.1 FileConnector Demo

 本例演示如何使用Kafka Connect把Source(test.txt)轉為流數(shù)據(jù)再寫入到Destination(test.sink.txt)中。如下圖所示:

? ? ? ? ??

? ? ? 本例使用到了兩個Connector:

  • FileStreamSource:從test.txt中讀取并發(fā)布到Broker中
  • FileStreamSink:從Broker中讀取數(shù)據(jù)并寫入到test.sink.txt文件中

  其中的Source使用到的配置文件是${KAFKA_HOME}/config/connect-file-source.properties

name=local-file-source connector.class=FileStreamSource tasks.max=1 file=test.txt topic=connect-test

  其中的Sink使用到的配置文件是${KAFKA_HOME}/config/connect-file-sink.properties

name=local-file-sink connector.class=FileStreamSink tasks.max=1 file=test.sink.txt topics=connect-test

  Broker使用到的配置文件是${KAFKA_HOME}/config/connect-standalone.properties

bootstrap.servers=localhost:9092key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=trueinternal.key.converter=org.apache.kafka.connect.json.JsonConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter.schemas.enable=false internal.value.converter.schemas.enable=false offset.storage.file.filename=/tmp/connect.offsets offset.flush.interval.ms=10000

?

3.2 運行Demo

  需要熟悉Kafka的一些命令行,參考本系列之前的文章Apache Kafka系列(二) 命令行工具(CLI)

?3.2.1 啟動Kafka Broker

[root@localhost bin]# cd /opt/kafka_2.11-0.11.0.0/ [root@localhost kafka_2.11-0.11.0.0]# ls bin config libs LICENSE logs NOTICE site-docs [root@localhost kafka_2.11-0.11.0.0]# ./bin/zookeeper-server-start.sh ./config/zookeeper.properties & [root@localhost kafka_2.11-0.11.0.0]# ./bin/kafka-server-start.sh ./config/server.properties &

3.2.2 啟動Source Connector和Sink Connector

[root@localhost kafka_2.11-0.11.0.0]# ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

3.3.3 打開console-consumer

./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic connect-test

3.3.4 寫入到test.txt文件中,并觀察3.3.3中的變化

[root@Server4 kafka_2.12-0.11.0.0]# echo 'firest line' >> test.txt [root@Server4 kafka_2.12-0.11.0.0]# echo 'second line' >> test.txt 3.3.3中打開的窗口輸出如下 {"schema":{"type":"string","optional":false},"payload":"firest line"} {"schema":{"type":"string","optional":false},"payload":"second line"}

3.3.5 查看test.sink.txt

[root@Server4 kafka_2.12-0.11.0.0]# cat test.sink.txt firest line second line

?本例僅僅演示了Kafka自帶的File Connector,后續(xù)文章會完成JndiConnector,HdfsConnector,并且會使用CDC(Changed Data Capture)集成Kafka來完成一個ETL的例子

?

四. kafka 0.9 connect JDBC測試

kafka 0.9的connect功能,測試過程如下:

1.創(chuàng)建容器(本次采用docker容器構建kafka環(huán)境)

docker run -p 10924:9092 -p 21814:2181 --name?confluent -i -t -d java /bin/bash

2.將confluent安裝程序拷貝進容器;

docker cp ?confluent.zip confluent:/root

3.進入到confluent容器

docker exec -it confluent /bin/bash

4.解壓confluent壓縮包

unzip confluent.zip

5.啟動kafka

/root/confluent/bin/zookeeper-server-start? /root/confluent/etc/kafka/zookeeper.properties ?& > zookeeper.log

/root/confluent/bin/kafka-server-start? /root/confluent/etc/kafka/server.properties?& > server.log

/root/confluent/bin/schema-registry-start ?/root/confluent/etc/schema-registry/schema-registry.properties?& > schema.log

6.測試kafka 是否正常

開兩個docker窗口,一個跑producer,一個跑consumer,

/root/confluent/bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

?

/root/confluent/bin/kafka-avro-console-consumer --topic test ?--zookeeper localhost:2181 --from-beginning

在producer端依次輸入以下記錄,確認consumer能正確顯示;

{"f1":?"value1"}

{"f1":?"value2"}

{"f1":?"value3"}

以上為安裝kafka過程,接下來開始測試jdbc接口;

測試之前,需要獲取mysql JDBC的驅動并將獲放在kafka環(huán)境對應的jre/lib文件夾里

測試jdbc connect

1.創(chuàng)建配置文件quickstart-mysql.properties,內容如下:?

name=test-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://192.168.99.100:33061/test1?user=root&password=welcome1
mode=incrementing
incrementing.column.name=id
topic.prefix=test-mysql-jdbc-

注:mysql是我在另一個容器里運行的,jdbc:mysql://192.168.99.100:33061/test1?user=root&password=welcome1是連接容器里的mysql的連接串

2.執(zhí)行./bin/connect-standalone?etc/schema-registry/connect-avro-standalone.properties?etc/kafka-connect-jdbc/quickstart-mysql.properties

3.執(zhí)行./bin/kafka-avro-console-consumer?--new-consumer?--bootstrap-server?192.168.99.100:10924 --topic test-mysql-jdbc-accounts?--from-beginning

然后在數(shù)據(jù)庫里增加一條記錄

然后就會在consumer端顯示新增記錄

?

五. 配置連接器

?

Connector的配置是簡單的key-value映射。對于獨立模式,這些都是在屬性文件中定義,并通過在命令行上的Connect處理。在分布式模式,JSON負載connector的創(chuàng)建(或修改)請求。大多數(shù)配置都是依賴的connector,有幾個常見的選項:

  • name?- 連接器唯一的名稱,不能重復。
  • connector.calss?- 連接器的Java類。
  • tasks.max?- 連接器創(chuàng)建任務的最大數(shù)。
  • connector.class配置支持多種格式:全名或連接器類的別名。比如連接器是org.apache.kafka.connect.file.FileStreamSinkConnector,你可以指定全名,也可以使用FileStreamSink或FileStreamSinkConnector。Sink connector也有一個額外的選項來控制它們的輸入:
  • topics - 作為連接器的輸入的topic列表。

對于其他的選項,你可以查看連接器的文檔。

六、rest api

kafka connect的目的是作為一個服務運行,默認情況下,此服務運行于端口8083。它支持rest管理,用來獲取 Kafka Connect 狀態(tài),管理 Kafka Connect 配置,Kafka Connect 集群內部通信,常用命令如下:

GET /connectors?返回一個活動的connect列表
POST /connectors?創(chuàng)建一個新的connect;請求體是一個JSON對象包含一個名稱字段和連接器配置參數(shù)

GET /connectors/{name}?獲取有關特定連接器的信息
GET /connectors/{name}/config?獲得特定連接器的配置參數(shù)
PUT /connectors/{name}/config?更新特定連接器的配置參數(shù)
GET /connectors/{name}/tasks 獲得正在運行的一個連接器的任務的列表

DELETE /connectors/{name}?刪除一個連接器,停止所有任務,并刪除它的配置

GET /connectors?返回一個活動的connect列表

POST /connectors?創(chuàng)建一個新的connect;請求體是一個JSON對象包含一個名稱字段和連接器配置參數(shù)

GET /connectors/{name}?獲取有關特定連接器的信息
GET /connectors/{name}/config?獲得特定連接器的配置參數(shù)
PUT /connectors/{name}/config?更新特定連接器的配置參數(shù)
GET /connectors/{name}/tasks 獲得正在運行的一個連接器的任務的列表

DELETE /connectors/{name}?刪除一個連接器,停止所有任務,并刪除它的配置

curl -s <Kafka Connect Worker URL>:8083/ | jq???獲取 Connect Worker 信息

curl -s <Kafka Connect Worker URL>:8083/connector-plugins | jq?列出 Connect Worker 上所有 Connector

curl -s <Kafka Connect Worker URL>:8083/connectors/<Connector名字>/tasks | jq?獲取 Connector 上 Task 以及相關配置的信息

curl -s <Kafka Connect Worker URL>:8083/connectors/<Connector名字>/status | jq?獲取 Connector 狀態(tài)信息

curl -s <Kafka Connect Worker URL>:8083/connectors/<Connector名字>/config | jq?獲取 Connector 配置信息

curl -s -X PUT <Kafka Connect Worker URL>:8083/connectors/<Connector名字>/pause?暫停 Connector

curl -s -X PUT <Kafka Connect Worker URL>:8083/connectors/<Connector名字>/resume?重啟 Connector

curl -s -X DELETE <Kafka Connect Worker URL>:8083/connectors/<Connector名字>?刪除 Connector

創(chuàng)建新 Connector (以FileStreamSourceConnector舉例)

  • curl -s -X POST -H "Content-Type: application/json" --data

  • '{"name": "<Connector名字>",

  • "config":

  • {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",

  • "key.converter.schemas.enable":"true",

  • "file":"demo-file.txt",

  • "tasks.max":"1",

  • "value.converter.schemas.enable":"true",

  • "name":"file-stream-demo-distributed",

  • "topic":"demo-distributed",

  • "value.converter":"org.apache.kafka.connect.json.JsonConverter",

  • "key.converter":"org.apache.kafka.connect.json.JsonConverter"}

  • }'

  • http://<Kafka Connect Worker URL>:8083/connectors | jq

  • ?

    更新 Connector配置 (以FileStreamSourceConnector舉例)

  • curl -s -X PUT -H "Content-Type: application/json" --data

  • '{"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",

  • "key.converter.schemas.enable":"true",

  • "file":"demo-file.txt",

  • "tasks.max":"2",

  • "value.converter.schemas.enable":"true",

  • "name":"file-stream-demo-distributed",

  • "topic":"demo-2-distributed",

  • "value.converter":"org.apache.kafka.connect.json.JsonConverter",

  • "key.converter":"org.apache.kafka.connect.json.JsonConverter"}'

  • <Kafka Connect Worker URL>:8083/connectors/file-stream-demo-distributed/config | jq

  • 七、kafka connect + debezium,解析binlog至kafka

    在已知kafka connect和debezium作用,會使用kafka的基礎上,學會使用debezium來讀取binlog,并通過kafka connect將讀取的內容放入kafka topic中。?

    基于kafka0.10.0和Debezium0.6,mysql5.6

    kafka connect

    • Kafka Connect是一種用于Kafka和其他數(shù)據(jù)系統(tǒng)之間進行數(shù)據(jù)傳輸?shù)墓ぞ摺?/li>
    • 僅關注數(shù)據(jù)的復制,并且不處理其他任務
    • Kafka connect有兩個概念,一個source,另一個是sink。source是把數(shù)據(jù)從一個系統(tǒng)拷貝到kafka里,sink是從kafka拷貝到另一個系統(tǒng)里。
    • 可使用插件,獲取不同系統(tǒng)的數(shù)據(jù)。例如通過Debezium插件解析mysql的日志,獲取數(shù)據(jù)。
    • 支持集群,可以通過REST API管理Kafka Connect。
    • 對數(shù)據(jù)的傳輸進行管理和監(jiān)控。

    Debezium

    • Debezium是一個分布式平臺,可將現(xiàn)有數(shù)據(jù)庫轉換為事件流,因此應用程序可以立即查看并立即響應數(shù)據(jù)庫中每一行的更改。
    • Debezium建立在Apache Kafka之上,并提供用于監(jiān)視特定數(shù)據(jù)庫管理系統(tǒng)的Kafka Connect兼容連接器。
    • 本教程使用Debezium監(jiān)控binlog。

    準備操作

    • mysql需開啟binlog
  • [mysqld]

  • log-bin=mysql-bin #添加這一行就ok

  • binlog-format=ROW #選擇row模式

  • server_id=1 #配置mysql replaction需要定義,不能和canal的slaveId重復

    • mysql需創(chuàng)建一個有mysql slave相關權限的賬號,若mysql不在本機,則需要遠程權限,防火墻放行。
  • //mysql slave相關權限

  • CREATE USER canal IDENTIFIED BY 'debezium';

  • GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';

  • -- GRANT ALL PRIVILEGES ON *.* TO 'debezium'@'%' ;

  • FLUSH PRIVILEGES;

    • 操作概述
    • 安裝并啟動kafka
    • 安裝并啟動mysql
    • 下載Debezium的mysql連接器http://debezium.io/docs/install/并解壓
    • 安裝debezium,即將解壓目錄寫入classpath變量,例如:export classpath=/root/debezium-connector-mysql/*?
      只在當前shell有效
    • 參考http://debezium.io/docs/connectors/mysql/的配置文件示例,寫好配置文件。
    • 以獨立模式啟動kafka connect,此時debezium會對數(shù)據(jù)庫中的每一個表創(chuàng)建一個topic,消費相應的topic,即可獲取binlog解析信息。
  • //啟動kafka connect

  • bin/connect-standalone.sh config/connect-standalone.properties mysql.properties

  • //查看topic列表

  • bin/kafka-topics.sh --list --zookeeper localhost:2181

  • //消費該主題

  • bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

    • 配置文件
  • //mysql.properties

  • name=inventory-connector

  • connector.class=io.debezium.connector.mysql.MySqlConnector

  • database.hostname=192.168.99.100

  • database.port=3306

  • database.user=debezium

  • database.password=dbz

  • database.server.id=184054

  • database.server.name=fullfillment

  • database.whitelist=inventory

  • database.history.kafka.bootstrap.servers=192.168.30.30:9092

  • database.history.kafka.topic=dbhistory.fullfillment

  • include.schema.changes=true

    • 索引

    debezium官網(wǎng)?http://debezium.io/?

    kafka文檔?http://kafka.apache.org/0100/documentation.html

    ?

    八、Kafka Connect的優(yōu)點

    1.對開發(fā)者提供了統(tǒng)一的實現(xiàn)接口
    2.開發(fā),部署和管理都非常方便,統(tǒng)一?
    3.使用分布式模式進行水平擴展,毫無壓力
    4.在分布式模式下可以通過Rest Api提交和管理Connectors
    5.對offset自動管理,只需要很簡單的配置,而不像Consumer中需要開發(fā)者處理
    6.流式/批式處理的支持

    九、第三方資源

    這是已經得到支持的組件,不需要做額外的開發(fā):?https://www.confluent.io/product/connectors/
    括號中的Source表示將數(shù)據(jù)從其他系統(tǒng)導入Kafka,Sink表示將數(shù)據(jù)從Kafka導出到其他系統(tǒng)。
    其他的我沒看,但是JDBC的實現(xiàn)比較的坑爹,是通過primary key(如id)和時間戳(如updateTime)字段,

    來判斷數(shù)據(jù)是否更新,這樣的話應用范圍非常受局限。

    ?

    十、Connector Development Guide

    ?

    在kafka與其他系統(tǒng)間復制數(shù)據(jù)需要創(chuàng)建kafka connect,他們將數(shù)復制到kafka或者從kafka復制到其他系統(tǒng)

    連接器有兩種形式:sourceconnectors將另一個系統(tǒng)數(shù)據(jù)導入kafka,sinkconnectors將數(shù)據(jù)導出到另一個系統(tǒng)

    連接器不執(zhí)行任何數(shù)據(jù)復制:它們的描述復制的數(shù)據(jù),并且負責將工作分配給多個task

    task分為sourcetask與sinktask

    每個task從kafka復制數(shù)據(jù),connect會保證record與schema的一致性完成任務分配,通常record與schema的映射是明顯的,每一個文件對應一個流,流中的每一條記錄利用schema解析并且保存對應的offset,另外一種情況是我們需要自己完成這種映射,比如數(shù)據(jù)庫,表的offset不是很明確(沒有自增id),一種可能的選擇是利用時間(timestamp)來完成增量查詢。

    Streams and Records


    每一個stream是包含key value對的記錄的序列,key value可以是原始類型,可以支持復雜結構,除了array,object,嵌套等。數(shù)據(jù)轉換是框架來完成的,record中包含stream id與offset,用于定時offset提交,幫助當處理失敗時恢復避免重復處理。

    Dynamic Connectors

    所有的job不是靜態(tài)的,它需要監(jiān)聽外部系統(tǒng)的變化,比如數(shù)據(jù)庫表的增加刪除,當一個新table創(chuàng)建時,它必須發(fā)現(xiàn)并且更新配置由框架來分配給該表一個task去處理,當通知發(fā)布后框架會更新對應的task.

    Developing a Simple Connector

    例子很簡單
    在standalone模式下實現(xiàn) SourceConnector/SourceTask 讀取文件并且發(fā)布record給SinkConnector/SinkTask 由sink寫入文件

    Connector Example

    我們將實現(xiàn)SourceConnector,SinkConnector實現(xiàn)與它非常類似,它包括兩個私有字段存放配置信息(讀取的文件名與topic名稱)
    public class FileStreamSourceConnector extends SourceConnector {
    ? ? private String filename;
    ? ? private String topic;
    getTaskClass()方法定義實現(xiàn)執(zhí)行處理的task
    @Override
    public Class getTaskClass() {
    ? ? return FileStreamSourceTask.class;
    }
    下面定義FileStreamSourceTask,它包括兩個生命周期方法start,stop
    @Override
    public void start(Map<String, String> props) {
    ? ? // The complete version includes error handling as well.
    ? ? filename = props.get(FILE_CONFIG);
    ? ? topic = props.get(TOPIC_CONFIG);
    }
    @Override
    public void stop() {
    ? ? // Nothing to do since no background monitoring is required.
    }
    最后是真正核心的方法getTaskConfigs()在這里我們僅處理一個文件,所以我們雖然定義了max task(在配置文件里)但是只會返回一個包含一條entry的list
    @Override
    public List<Map<String, String>> getTaskConfigs(int maxTasks) {
    ? ? ArrayList>Map<String, String>> configs = new ArrayList<>();
    ? ? // Only one input stream makes sense.
    ? ? Map<String, String> config = new Map<>();
    ? ? if (filename != null)
    ? ? ? ? config.put(FILE_CONFIG, filename);
    ? ? config.put(TOPIC_CONFIG, topic);
    ? ? configs.add(config);
    ? ? return configs;
    }
    即使有多個任務,這種方法的執(zhí)行通常很簡單。它只是要確定輸入任務的數(shù)量,這可能需要拉取數(shù)據(jù)從遠程服務,然后分攤。請注意,這個簡單的例子不包括動態(tài)輸入。在下一節(jié)中看到討論如何觸發(fā)任務的配置更新。

    Task Example - Source Task

    實現(xiàn)task,我們使用偽代碼描述核心代碼
    public class FileStreamSourceTask extends SourceTask<Object, Object> {
    ? ? String filename;
    ? ? InputStream stream;
    ? ? String topic;
    ? ? public void start(Map<String, String> props) {
    ? ? ? ? filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
    ? ? ? ? stream = openOrThrowError(filename);
    ? ? ? ? topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
    ? ? }
    ? ? @Override
    ? ? public synchronized void stop() {
    ? ? ? ? stream.close()
    ? ? }
    start方法讀取之前的offset,并且處理新的數(shù)據(jù),stop方法停止stream,下面實現(xiàn)poll方法
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
    ? ? try {
    ? ? ? ? ArrayList<SourceRecord> records = new ArrayList<>();
    ? ? ? ? while (streamValid(stream) && records.isEmpty()) {
    ? ? ? ? ? ? LineAndOffset line = readToNextLine(stream);
    ? ? ? ? ? ? if (line != null) {
    ? ? ? ? ? ? ? ? Map sourcePartition = Collections.singletonMap("filename", filename);
    ? ? ? ? ? ? ? ? Map sourceOffset = Collections.singletonMap("position", streamOffset);
    ? ? ? ? ? ? ? ? records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line));
    ? ? ? ? ? ? } else {
    ? ? ? ? ? ? ? ? Thread.sleep(1);
    ? ? ? ? ? ? }
    ? ? ? ? }
    ? ? ? ? return records;
    ? ? } catch (IOException e) {
    ? ? ? ? // Underlying stream was killed, probably as a result of calling stop. Allow to return
    ? ? ? ? // null, and driving thread will handle any shutdown if necessary.
    ? ? }
    ? ? return null;
    }
    該方法重復執(zhí)行讀取操作,跟蹤file offset,并且利用上述信息創(chuàng)建SourceRecord,它需要四個字段:source partition,source offset,topic name,output value(包括value及value的schema)

    Sink Tasks

    之前描述了sourcetask實現(xiàn),sinktask與它完全不同,因為前者是拉取數(shù)據(jù),后者是推送數(shù)據(jù)
    public abstract class SinkTask implements Task {
    public void initialize(SinkTaskContext context) { ... }
    public abstract void put(Collection<SinkRecord> records);
    public abstract void flush(Map<TopicPartition, Long> offsets);
    put方法是最重要的方法,接收sinkrecords,執(zhí)行任何需要的轉換,并將其存儲在目標系統(tǒng)。此方法不需要確保數(shù)據(jù)已被完全寫入目標系統(tǒng),然后返回。事實上首先放入緩沖,因此,批量數(shù)據(jù)可以被一次發(fā)送,減少對下游存儲的壓力。sourcerecords中保存的信息與sourcesink中的相同。flush提交offset,它接受任務從故障中恢復,沒有數(shù)據(jù)丟失。該方法將數(shù)據(jù)推送至目標系統(tǒng),并且block直到寫入已被確認。的offsets參數(shù)通常可以忽略不計,但在某些情況保存偏移信息到目標系統(tǒng)確保一次交貨。例如,一個HDFS連接器可以確保flush()操作自動提交數(shù)據(jù)和偏移到HDFS中的位置。

    Resuming from Previous Offsets

    kafka connect是為了bulk 數(shù)據(jù)拷貝工作,它拷貝整個db而不是拷貝某個表,這樣會使用connnect的input或者output隨時改變,source connector需要監(jiān)聽source系統(tǒng)的改變,當改變時通知框架(通過ConnectorContext對象)
    舉例
    if (inputsChanged())
    ? ? this.context.requestTaskReconfiguration();
    當接收到通知框架會即時的更新配置,并且在更新前確保優(yōu)雅完成當前任務
    如果一個額外的線程來執(zhí)行此監(jiān)控,該線程必須存在于連接器中。該線程不會影響connector。然而,其他變化也會影響task,最常見的是輸入流失敗在輸入系統(tǒng)中,例如如果一個表被從數(shù)據(jù)庫中刪除。這時連接器需要進行更改,任務將需要處理這種異常。sinkconnectors只能處理流的加入,可以分配新的數(shù)據(jù)到task(例如,一個新的數(shù)據(jù)庫表)??蚣軙幚砣魏蝛afka輸入的改變,例如當組輸入topic的變化因為一個正則表達式的訂閱。sinktasks應該期待新的輸入流,可能需要在下游系統(tǒng)創(chuàng)造新的資源,如數(shù)據(jù)庫中的一個新的表。在這些情況下,可能會出現(xiàn)輸入流之間的沖突(同時創(chuàng)建新資源),其他時候,一般不需要特殊的代碼處理一系列動態(tài)流??

    Dynamic Input/Output Streams

    FileStream連接器是很好的例子,因為他們很簡單的,每一行是一個字符串。實際連接器都需要具有更復雜的數(shù)據(jù)格式。要創(chuàng)建更復雜的數(shù)據(jù),你需要使用kafka connector數(shù)據(jù)接口:Schema,Struct
    Schema schema = SchemaBuilder.struct().name(NAME)
    ? ? ? ? ? ? ? ? ? ? .field("name", Schema.STRING_SCHEMA)
    ? ? ? ? ? ? ? ? ? ? .field("age", Schema.INT_SCHEMA)
    ? ? ? ? ? ? ? ? ? ? .field("admin", new SchemaBuilder.boolean().defaultValue(false).build())
    ? ? ? ? ? ? ? ? ? ? .build();
    Struct struct = new Struct(schema)
    ? ? ? ? ? ? ? ? ? ? ? ? ? ?.put("name", "Barbara Liskov")
    ? ? ? ? ? ? ? ? ? ? ? ? ? ?.put("age", 75)
    ? ? ? ? ? ? ? ? ? ? ? ? ? ?.build();
    如果上游數(shù)據(jù)與schema數(shù)據(jù)格式不一致應該在sinktask中拋出異常

    總結

    以上是生活随笔為你收集整理的Kafka Connect简介的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內容還不錯,歡迎將生活随笔推薦給好友。