基于 Kafka 技术栈构建和部署实时搜索引擎的实践
在 Koverhoop,我們正在保險、醫療、房地產和離線分析領域建立一些大型項目。在我們其中一個多租戶團體保險經紀平臺 klient.ca,我們計劃構建一個強大的搜索功能,希望能在用戶輸入內容的同時同步呈現搜索結果。下面是我們能夠實現的效果,我將在這篇文章討論這一功能的核心基礎設施,包括如何完全自動化部署及如何快速完成構建工作。
?
來自作者的動圖: 搜索能力
?
這個系列文章分為兩部分,我將分別討論以下內容:
-
第 1 部分:了解用于支持此搜索能力的技術棧,并使用 Docker 和 Docker-compose 進行部署(本文)
-
第 2 部分:使用 Kubernetes 對這些服務進行可伸縮的生產部署(待發布)
問題定義和決策
為了構建一個快速、實時的搜索引擎,我們必須做出某些設計決策。我們使用 Postgres 作為主數據庫,因此有以下選項可以使用:
?
直接在 Postgres 數據庫中查詢我們在搜索欄中鍵入的每個字符。😐
使用一個高效的搜索數據庫,如 Elasticsearch。🤔
?
考慮到我們已經是一個多租戶應用程序,同時被搜索的實體可能需要大量的關聯操作(如果我們使用 Postgres)且預計規模也相當大,因此我們決定不使用以前直接查詢數據庫的方案。
?
因此,我們必須決定一種可靠、高效的方式,將數據從 Postgres 實時遷移到 Elasticsearch。接下來需要作出以下決定:
使用 Logstash 定期查詢 Postgres 數據庫并將數據發送到 Elasticsearch。😶
在我們的應用程序中使用 Elasticsearch 客戶端,在 Postgres 和 Elasticsearch 中同時對數據進行 CRUD 操作。🧐?
使用基于事件的流引擎,從 Postgres 的預寫日志中提取事件,將它們導入到流處理服務器,并將其接收到 Elasticsearch。🤯
?
選項 1 因為不是實時的,所以很快就被排除了,而且即使我們以較短的間隔進行查詢,也會給 Postgres 服務器帶來明顯的壓力。對于其他兩種選擇,不同的公司做出的決定可能不一樣。在我們的場景里如果選擇選項 2,我們可以預見到一些問題:如果 Elasticsearch 在確認更新時速度很慢,這可能會減慢我們應用程序的速度,或者在不一致的情況下,我們要如何對單個或一組事件的插入進行重試?
?
因此,我們決定構建一個基于事件隊列的基礎設施。還因為我們已經計劃了一些適合基于事件的未來場景和服務,比如通知服務、數據倉庫、微服務架構等。事不宜遲,讓我們直接開始解決方案及所使用服務的基本介紹吧。
服務簡介
為了實現基于事件的流基礎設施,我們決定使用 Confluent Kafka 技術棧。
?
以下是我們整合的服務:
來源:Confluent?公司
?
Apache Kafka:Kafka 是 Confluent 平臺的核心。它是一個基于開源的分布式事件流平臺。它將是數據庫事件(插入、更新和刪除)的主存儲區域。
?
Kafka Connect:我們使用 Kafka-Connect 從 Debezium 的 Postgres 連接器獲取 Kafka 的數據,該連接器從 Postgres WAL 文件中獲取事件。
?
在接收端,我們使用 ElasticSearch 連接器處理數據并將其加載到 ElasticSearch 中。Connect 既可以作為一個獨立軟件運行,也可以作為一個生產環境容錯且可伸縮的服務運行。
?
ksqlDB:ksqlDB 允許在 Kafka 之上構建一個流處理應用程序。它在內部使用 Kafka-streams 并在事件進來時進行轉換,我們使用它來豐富特定流的事件,其中包括已經在 Kafka 持久存在的其他表的事件,這些事件可能與搜索功能相關,例如 root 表中的tenant_id。
自作者的圖片:基于 Apache Kafka 的 ksqlDB
?
使用 ksqlDB,只需編寫 SQL 查詢來過濾、聚合、關聯和填充數據即可。例如,假設我們正在接收一個關于兩個主題的事件流,其中包括與brands和brand_products相關的信息??紤]到這是一個多租戶數據源,我們需要使用 tenant_id 來填充 brand_product,而 tenant_id目前只與brands相關聯。然后,我們可以使用這些填充后的記錄,并將它們以非標準化的形式保存在 Elasticsearch 中(以便進行搜索)。
?
我們可以使用一個主題來設置 KStream:
CREATE STREAM "brands"WITH ( kafka_topic = 'store.public.brands', value_format = 'avro');復制代碼
?
為了只使用其中幾列并按 id 對數據流分區,我們可以創建一個名為 enriched_brands 的新數據流:
CREATE STREAM "enriched_brands"WITH ( kafka_topic = 'enriched_brands') AS SELECT CAST(brand.id AS VARCHAR) as "id", brand.tenant_id as "tenant_id", brand.name as "name" FROM "brands" brand PARTITION BY CAST(brand.id AS VARCHAR) EMIT CHANGES;復制代碼
?
然后可以通過 KTable 中的最新偏移量來實現事件集合。我們使用這個功能是為了將brand事件的當前狀態與其他流關聯起來。
CREATE TABLE "brands_table"AS SELECT id as "id", latest_by_offset(tenant_id) as "tenant_id" FROM "brands" group by id EMIT CHANGES;復制代碼
現在我們添加了一個含有brand_id 字段的 brand_products 的新流,但沒有tenant_id 字段。
CREATE STREAM "brand_products" WITH ( kafka_topic = 'store.public.brand_products', value_format = 'avro' );復制代碼
我們可以使用以下關聯查詢向 brand_products填充 tenant_id。
CREATE STREAM "enriched_brand_products" WITH ( kafka_topic = 'enriched_brand_products’ ) AS SELECT "brand"."id" as "brand_id", "brand"."tenant_id" as "tenant_id", CAST(brand_product.id AS VARCHAR) as "id", brand_product.name AS "name" FROM "brand_products" AS brand_product INNER JOIN "brands_table" "brand" ON brand_product.brand_id = "brand"."id" PARTITION BY CAST(brand_product.id AS VARCHAR) EMIT CHANGES;復制代碼
?
Schema 注冊表:它在 Kafka 的上層,用于存儲你在 Kafka 中提取的事件的元數據。它基于 AVRO 模式,并提供 REST 接口來存儲和查詢它們。它有助于確保一些 Schema 兼容性檢查及其隨時間發生的演變。
配置技術棧
我們使用 Docker 和 Docker-compose 來配置和部署服務。下面是準備用于構建服務所寫的 docker-compose 文件,將運行 Postgres,Elasticsearch,和 Kafka 相關的服務。下面我還將解釋提到的每一種服務。
Postgres 和 Elasticsearch
postgres: build: services/postgres container_name: oeso_postgres volumes: - database:/var/lib/postgresql/data env_file: - .env ports: - 5432:5432 networks: - project_network復制代碼
用于 Postgres 的 Docker-compose 服務
?
elasticsearch: image: docker.elastic.co/elasticsearch/elasticsearch:7.10.0 container_name: elasticsearch volumes: - ./services/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml:ro - elasticsearch-database:/usr/share/elasticsearch/data env_file: - .env ports: - "9200:9200" - "9300:9300" networks: - project_network復制代碼
用于 Elasticsearch 的 Docker-compose 服務
?
為了從源數據庫中流式的導出事件,我們需要啟用邏輯解碼以便從其日志中進行復制。在 Postgres 的例子中,這些日志被稱為 Write-Ahead Logs (WAL) ,它們被寫入一個文件中。我們需要一個邏輯解碼插件,在我們的例子中,wal2json 用來提取關于持久數據庫更改的易于閱讀的信息,以便它可以被作為事件發送到 Kafka。
?
為了配置所需的擴展,你可以參考這個 Postgres Dockerfile文件。
?
對于 Elasticsearch 和 Postgres,我們需要在環境文件中指定一些必要的變量來設置它們,如用戶名、密碼等。
Zookeeper
zookeeper: image: confluentinc/cp-zookeeper:6.0.0 hostname: zookeeper container_name: zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 networks: - project_network復制代碼
總的來說,Zookeeper 扮演 Kafka 這樣的分布式平臺的中心服務,它存儲所有元數據,如 Kafka 節點狀態,并持續跟蹤主題或分區。
?
即便已經有了在無 zookeeper 的情況下運行 Kafka的替代計劃,但是目前它還是管理集群所必須的。
Kafka Broker
broker: image: confluentinc/cp-enterprise-kafka:6.0.0 hostname: broker container_name: broker depends_on: - zookeeper ports: - "29092:29092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 networks: - project_network復制代碼
為了簡單起見,我們將配置一個單節點 Kafka 集群。我將在本系列的第 2 部分中討論關于多階段集群的更多內容。
?
了解我們為 Kafka Broker 所做的一些配置尤其重要。
?
監聽器(Listeners)
?
因為 Kafka 被設計成一個分布式平臺,我們需要提供一些明確的方式來允許 Kafka Broker 彼此在內部通信,并基于您的網絡結構與其他客戶端進行外部通信。因此我們使用監聽器來完成這個任務,監聽器是主機、端口和協議的組合。
?
-
KAFKA_LISTENERS
這是一個可以由 KAFKA 綁定的網絡端口列表,由主機、端口和協議組合成。默認情況下,它被設置為?0.0.0.0,即監聽所有端口。
?
-
KAFKA_ADVERTISED_LISTENERS
這個值同樣是主機和端口的組合,客戶端將使用它來連接 KAFKA Broker。因此,如果客戶端在 docker 中,它可以使用 broker:9092連接到 broker,如果在 docker 外,則返回 localhost:9092來建立和 broker 的連接。我們還需要提到監聽器名稱,其才能被映射到恰當的協議以建立連接。
?
-
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
這里我們將用戶定義的監聽器名稱映射到希望用于通信的協議;它可以是PLAINTEXT(未加密)或 SSL (加密的)。這些名字在 KAFKA_LISTENERS 和 KAFKA_ADVERTISED_LISTENERS 中被進一步與 host/ip 一起使用,以便使用恰當的協議。
由于我們只配置了單節點的 Kafka 集群,因此返回的或者說發送給任何客戶端的推薦地址都將是自身這同一 broker。
Schema 注冊(Schema-Registry)
schema-registry: image: confluentinc/cp-schema-registry:6.0.0 hostname: schema-registry container_name: schema-registry depends_on: - zookeeper - broker ports: - "8081:8081" environment: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181" networks: - project_network復制代碼
對于單節點 schema 注冊,我們指定用來連接 zookeeper 的字符串,Kafka 用它存儲與 schema 相關的數據。
Kafka-Connect
connect: image: confluentinc/cp-kafka-connect:6.0.0 hostname: connect container_name: connect volumes: - "./producers/debezium-debezium-connector-postgresql/:/usr/share/confluent-hub-components/debezium-debezium-connector-postgresql/" - "./consumers/confluentinc-kafka-connect-elasticsearch/:/usr/share/confluent-hub-components/confluentinc-kafka-connect-elasticsearch/" depends_on: - zookeeper - broker - schema-registry ports: - "8083:8083" environment: CONNECT_BOOTSTRAP_SERVERS: "broker:9092" KAFKA_HEAP_OPTS: "-Xms256M -Xmx512M" CONNECT_REST_ADVERTISED_HOST_NAME: connect CONNECT_REST_PORT: 8083 CONNECT_GROUP_ID: compose-connect-group CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000 CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081 CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" CONNECT_ZOOKEEPER_CONNECT: "zookeeper:2181" CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.5.1.jar CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor" CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor" CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components" CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR networks: - project_network復制代碼
我們看到一些新的參數,比如:
-
CONNECT_BOOTSTRAP_SERVERS:一組主機和端口組合,用于建立到 Kafka 集群的初始連接
-
CONNECT_KEY_CONVERTER:用于將鍵(key)從connect格式序列化為與 Kafka 兼容的格式。類似地,對于 CONNECT_VALUE_CONVERTER,我們使用 AvroConverter 進行序列化。
?
映射大量 source 和 sink 連接器插件并在 CONNECT_PLUGIN_PATH 中指定它們是非常的重要。
ksqlDB
ksqldb-server: image: confluentinc/cp-ksqldb-server:6.0.0 hostname: ksqldb-server container_name: ksqldb-server depends_on: - broker - schema-registry ports: - "8088:8088" volumes: - "./producers/debezium-debezium-connector-postgresql/:/usr/share/kafka/plugins/debezium-debezium-connector-postgresql/" - "./consumers/confluentinc-kafka-connect-elasticsearch/:/usr/share/kafka/plugins/confluentinc-kafka-connect-elasticsearch/" environment: KSQL_LISTENERS: "http://0.0.0.0:8088" KSQL_BOOTSTRAP_SERVERS: "broker:9092" KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true" KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true" KSQL_KSQL_STREAMS_MAX_TASK_IDLE_MS: 2000 KSQL_CONNECT_GROUP_ID: "ksql-connect-cluster" KSQL_CONNECT_BOOTSTRAP_SERVERS: "broker:9092" KSQL_CONNECT_KEY_CONVERTER: "io.confluent.connect.avro.AvroConverter" KSQL_CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter" KSQL_CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" KSQL_CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false" KSQL_CONNECT_CONFIG_STORAGE_TOPIC: "ksql-connect-configs" KSQL_CONNECT_OFFSET_STORAGE_TOPIC: "ksql-connect-offsets" KSQL_CONNECT_STATUS_STORAGE_TOPIC: "ksql-connect-statuses" KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 KSQL_CONNECT_PLUGIN_PATH: "/usr/share/kafka/plugins" networks: - project_network復制代碼
如果不打算使用 Kafka-Connect,并且不需要獨立于 ksql擴展 Kafka-Connect,那么可以為 ksql設置 embedded-connect配置,這將暴露來自 ksqldb-server的連接點。
?
除此之外,還有一個環境變量需要考慮:
-
KSQL_KSQL_STREAMS_MAX_TASK_IDLE_MS:在當前版本的 ksqlDB,對于流式表關聯,關聯的結果可能變成不確定的,即如果在流事件之前還沒有創建或更新被關聯的表中的實時事件,那您可能無法關聯成功。當流中的某個事件在某個特定時間戳到達時,配置這個環境變量可以做一些等待讓這個事件加載到表中。這提高了關聯的可預測性,但可能會導致某些性能下降。在這里我們正在努力改善這一點。
?
實際上,如果你不能清楚地理解上面的內容,我建議你現在就使用這個配置,因為它很有效;它實際上需要另一篇文章來詳細討論時間同步,或者如果你仍然好奇,你可以觀看這個由來自 Confluent 的 Matthias j. Sax 制作的視頻。
ksqldb-cli: image: confluentinc/cp-ksqldb-cli:6.0.0 container_name: ksqldb-cli depends_on: - broker - ksqldb-server entrypoint: /bin/sh tty: true networks: - project_network復制代碼
在測試或開發環境中,使用 ksqldb-cli服務來嘗試和測試流非常方便。即使在生產環境中,如果您想探索事件流或 Ktables,或者手動創建或過濾流,也可以這樣做。盡管如此,還是建議您使用 ksql 或 kafka 客戶端或其 REST 端點自動創建流、表或主題,這些我們將在下面進行討論。
圖片由作者提供:目前為止對我們的架構進行的更詳細觀察
初始化數據
流
streams-init: build: jobs/streams-init container_name: streams-init depends_on: - zookeeper - broker - schema-registry - ksqldb-server - ksqldb-cli - postgres - elasticsearch - connect env_file: - .env environment: ZOOKEEPER_HOSTS: "zookeeper:2181" KAFKA_TOPICS: "brands, brand_products" networks: - project_network復制代碼
這個服務的目的是進行流初始化和 Kafka 內部配置,以及我們正在使用的其他服務。在部署時,我們不希望在服務器上手動創建主題、流、連接等。因此,我們使用為每個服務提供的 REST 服務,并編寫 shell 腳本來自動化這個過程。
?
我們的配置腳本如下所示:
#!/bin/bash# Setup ENV variables in connectors json filessed -i "s/POSTGRES_USER/${POSTGRES_USER}/g" connectors/postgres.jsonsed -i "s/POSTGRES_PASSWORD/${POSTGRES_PASSWORD}/g" connectors/postgres.jsonsed -i "s/POSTGRES_DB/${POSTGRES_DB}/g" connectors/postgres.jsonsed -i "s/ELASTIC_PASSWORD/${ELASTIC_PASSWORD}/g" connectors/elasticsearch.json# Simply wait until original kafka container and zookeeper are started.export WAIT_HOSTS=zookeeper:2181,broker:9092,schema-registry:8081,ksqldb-server:8088,elasticsearch:9200,connect:8083export WAIT_HOSTS_TIMEOUT=300/wait# Parse string of kafka topics into an array# https://stackoverflow.com/a/10586169/4587961kafkatopicsArrayString="$KAFKA_TOPICS"IFS=', ' read -r -a kafkaTopicsArray <<< "$kafkatopicsArrayString"# A separate variable for zookeeper hosts.zookeeperHostsValue=$ZOOKEEPER_HOSTS# Terminate all queriescurl -s -X "POST" "http://ksqldb-server:8088/ksql" \ -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \ -d '{"ksql": "SHOW QUERIES;"}' | \ jq '.[].queries[].id' | \ xargs -Ifoo curl -X "POST" "http://ksqldb-server:8088/ksql" \ -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \ -d '{"ksql": "TERMINATE 'foo';"}' # Drop All Tablescurl -s -X "POST" "http://ksqldb-server:8088/ksql" \ -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \ -d '{"ksql": "SHOW TABLES;"}' | \ jq '.[].tables[].name' | \ xargs -Ifoo curl -X "POST" "http://ksqldb-server:8088/ksql" \ -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \ -d '{"ksql": "DROP TABLE \"foo\";"}'# Drop All Streamscurl -s -X "POST" "http://ksqldb-server:8088/ksql" \ -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \ -d '{"ksql": "SHOW STREAMS;"}' | \ jq '.[].streams[].name' | \ xargs -Ifoo curl -X "POST" "http://ksqldb-server:8088/ksql" \ -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \ -d '{"ksql": "DROP STREAM \"foo\";"}' # Create kafka topic for each topic item from split array of topics.for newTopic in "${kafkaTopicsArray[@]}"; do # https://kafka.apache.org/quickstart curl -X DELETE http://elasticsearch:9200/enriched_$newTopic --user elastic:${ELASTIC_PASSWORD} curl -X DELETE http://schema-registry:8081/subjects/store.public.$newTopic-value kafka-topics --create --topic "store.public.$newTopic" --partitions 1 --replication-factor 1 --if-not-exists --zookeeper "$zookeeperHostsValue" curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data @schemas/$newTopic.json http://schema-registry:8081/subjects/store.public.$newTopic-value/versionsdonecurl -X "POST" "http://ksqldb-server:8088/ksql" -H "Accept: application/vnd.ksql.v1+json" -d { "ksql": "CREATE STREAM \\"brands\\" WITH (kafka_topic = \'store.public.brands\', value_format = \'avro\');", "streamsProperties": {} }'curl -X "POST" "http://ksqldb-server:8088/ksql" -H "Accept: application/vnd.ksql.v1+json" -d { "ksql": "CREATE STREAM \\"enriched_brands\\" WITH ( kafka_topic = \'enriched_brands\' ) AS SELECT CAST(brand.id AS VARCHAR) as \\"id\\", brand.tenant_id as \\"tenant_id\\", brand.name as \\"name\\" from \\"brands\\" brand partition by CAST(brand.id AS VARCHAR) EMIT CHANGES;", "streamsProperties": {} }'curl -X "POST" "http://ksqldb-server:8088/ksql" -H "Accept: application/vnd.ksql.v1+json" -d { "ksql": "CREATE STREAM \\"brand_products\\" WITH ( kafka_topic = \'store.public.brand_products\', value_format = \'avro\' );", "streamsProperties": {} }'curl -X "POST" "http://ksqldb-server:8088/ksql" -H "Accept: application/vnd.ksql.v1+json" -d { "ksql": "CREATE TABLE \\"brands_table\\" AS SELECT id as \\"id\\", latest_by_offset(tenant_id) as \\"tenant_id\\" FROM \\"brands\\" group by id EMIT CHANGES;", "streamsProperties": {} }'curl -X "POST" "http://ksqldb-server:8088/ksql" -H "Accept: application/vnd.ksql.v1+json" -d { "ksql": "CREATE STREAM \\"enriched_brand_products\\" WITH ( kafka_topic = \'enriched_brand_products\' ) AS SELECT \\"brand\\".\\"id\\" as \\"brand_id\\", \\"brand\\".\\"tenant_id\\" as \\"tenant_id\\", CAST(brand_product.id AS VARCHAR) as \\"id\\", brand_product.name AS \\"name\\" FROM \\"brand_products\\" AS brand_product INNER JOIN \\"brands_table\\" \\"brand\\" ON brand_product.brand_id = \\"brand\\".\\"id\\" partition by CAST(brand_product.id AS VARCHAR) EMIT CHANGES;", "streamsProperties": {} }'curl -X DELETE http://connect:8083/connectors/enriched_writercurl -X "POST" -H "Content-Type: application/json" --data @connectors/elasticsearch.json http://connect:8083/connectorscurl -X DELETE http://connect:8083/connectors/event_readercurl -X "POST" -H "Content-Type: application/json" --data @connectors/postgres.json http://connect:80復制代碼
這就是我們目前的工作方式:
-
在運行任何任務之前,我們確保所有的服務都準備好了;
-
我們需要確保主題在 Kafka 上已存在,或者我們創建新的主題;
-
即使有 schema 更新,我們的數據流也應該是可用的;
-
當底層數據 srouce 或 sink 的密碼或版本更改,需要再次創建連接。
?
共享這個配置腳本的目的只是為了演示一種自動化這些 pipeline 的方法。完全相同的配置可能并不適合您,但是自動化工作流和避免在任何環境中的進行手工部署的想法始終是一樣的。
?
為了讓這個數據基礎設施能夠真正快速地運行起來,請參考 Github 倉庫:
behindthescenes-group/oesophagus
?
在你的終端中克隆代碼庫并執行以下操作:
cp default.env .envdocker-compose up -d復制代碼
?
在 Postgres 數據庫 store中創建 brands 和 brand_products 表:
CREATE TABLE brands ( id serial PRIMARY KEY, name VARCHAR (50), tenant_id INTEGER);CREATE TABLE brand_products ( id serial PRIMARY KEY, brand_id INTEGER, name VARCHAR(50));復制代碼
在brands表中插入一些記錄:
INSERT INTO brands VALUES(1, 'Brand Name 1', 1);INSERT INTO brands VALUES(2, 'Brand Name 2', 1);INSERT INTO brands VALUES(3, 'Brand Name 3', 2);INSERT INTO brands VALUES(4, 'Brand Name 4', 2);復制代碼
然后brand_products表中的一些記錄:
INSERT INTO brand_products VALUES(1, 1, 'Product Name 1');INSERT INTO brand_products VALUES(2, 2, 'Product Name 2');INSERT INTO brand_products VALUES(3, 3, 'Product Name 3');INSERT INTO brand_products VALUES(4, 4, 'Product Name 4');INSERT INTO brand_products VALUES(5, 1, 'Product Name 5');復制代碼
在 Elasticsearch 的中查看填充了tenant_id 的brand_products :
curl localhost:9200/enriched_brand_products/_search --user elastic:your_password復制代碼
我將持續為上述代碼庫做出貢獻:添加在 Kubernetes 部署多節點 Kafka 基礎設施的配置,編寫更多連接器,使用期望的服務實現即插即用架構的框架。請在這里自由的提交貢獻,或讓我知道在你在當前配置中所遇到的任何數據工程問題。
下一步
我希望這篇文章能給你一個關于部署和運行完整 Kafka 技術棧的清晰思路,這是一個構建實時流處理應用程序的基礎且有效的示例。
?
根據產品或公司的自身特點,部署過程根據需要可能會有所不同。我還計劃在本系列的下一部分中就這樣一個系統在可伸縮性方面進行探討,那將是關于在相同使用場景下如何在 Kubernetes 上部署這樣的基礎設施的討論。
?
英文原文鏈接:https://towardsdatascience.com/enabling-a-powerful-search-capability-building-and-deploying-a-real-time-stream-processing-etl-a27ecb0ab0ae
總結
以上是生活随笔為你收集整理的基于 Kafka 技术栈构建和部署实时搜索引擎的实践的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 使用Spring特性优雅书写业务代码
- 下一篇: HTTP访问一个网站的过程详解