使用PHP处理Kafka消息
Kafka 是一種高吞吐的分布式消息系統,能夠替代傳統的消息隊列用于解耦合數據處理,緩存未處理消息等,同時具有更高的吞吐率,支持分區、多副本、冗余,因此被廣泛用于大規模消息數據處理應用。
Kafka的特點:
- 以時間復雜度為O(1)的方式提供消息持久化能力,即使對TB級以上數據也能保證常數時間復雜度的訪問性能。
- 高吞吐率。即使在非常廉價的商用機器上也能做到單機支持每秒100K條以上消息的傳輸?!緭私?#xff0c;Kafka每秒可以生產約25萬消息(50 MB),每秒處理55萬消息(110 MB)】
- 支持Kafka Server間的消息分區,同時保證每個Partition內的消息順序傳輸。
- 分布式系統,易于向外擴展。所有的producer、broker和consumer都會有多個,均為分布式的。無需停機即可擴展機器。
- 消息被處理的狀態是在consumer端維護,而不是由server端維護。當失敗時能自動平衡。
- 同時支持離線數據處理和實時數據處理。
Kafka的架構:
kafka
Kafka的整體架構非常簡單,producer、broker(kafka)和consumer都可以有多個。Producer,consumer實現Kafka注冊的接口,數據從producer發送到broker,broker承擔一個中間緩存和分發的作用。broker分發注冊到系統中的consumer。broker的作用類似于緩存,即活躍的數據和離線處理系統之間的緩存??蛻舳撕头掌鞫说耐ㄐ?#xff0c;是基于簡單,高性能,且與編程語言無關的TCP協議。
Kafka基本概念:
- Topic:特指Kafka處理的消息源(feeds of messages)的不同分類。
- Partition:Topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。
- Message:消息,是通信的基本單位,每個producer可以向一個topic(主題)發布一些消息。
- Producers:消息和數據生產者,向Kafka的一個topic發布消息的過程叫做producers。
- Consumers:消息和數據消費者,訂閱topics并處理其發布的消息的過程叫做consumers。
- Broker:緩存代理,Kafa集群中的一臺或多臺服務器統稱為broker。
Kafka消息發送的流程:
Kafka-Message
?
下面是PHP生產、消費Kafka消息的例子(假設已經配置好Kafka):
1.從zookeeper源碼src/c/src安裝zookeeper c client
cd zookeeper-3.4.8/src/c./configuremake && make install2.編譯php libzookper擴展
git clone https://github.com/Timandes/libzookeeper.git cd libzookeeper phpize ./configure --with-libzookeeper=/usr/local/bin/cli_mt make && make install3.編譯php zookeeper擴展
git clone https://github.com/andreiz/php-zookeeper.git cd php-zookeeper phpize ./configure make && make install4.修改php.ini配置,添加libzookeeper和php-zookeeper擴展
extension=libzookeeper.so extension=zookeeper.soPHP處理Kafka消息:
1.啟動zookeeper和kafka
kafka_2.11-0.10.0.0/bin/zookeeper-server-start.sh --daemon kafka_2.11-0.10.0.0/config/zookeeper.propertieskafka_2.11-0.10.0.0/bin/kafka-server-start.sh kafka_2.11-0.10.0.0/config/server.properties2.創建由2個partition組成的、名為testtopic的topic
kafka_2.11-0.10.0.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic testtopic3.composer安裝nmred/kafka-php
composer require "nmred/kafka-php"4.producer.php代碼
<?phprequire_once('./vendor/autoload.php');$produce = \Kafka\Produce::getInstance('localhost:2181', 3000);$produce->setRequireAck(-1); $topicName = 'testtopic'; //獲取到topic下可用的partitions $partitions = $produce->getAvailablePartitions($topicName); $partitionCount = count($partitions); $count = 1; while(true){$message = json_encode(array('uid' => $count, 'age' => $count%100, 'datetime' => date('Y-m-d H:i:s')));//發送消息到不同的partition$partitionId = $count%$partitionCount;$produce->setMessages('testtopic', $partitionId, array($message));$result = $produce->send();var_dump($result);$count++;echo "producer sleeping\n";sleep(1); }5.consumer.php代碼
<?phprequire_once('./vendor/autoload.php');//獲取需要處理的partitionId$partitionId = isset($argv[1]) ? intval($argv[1]) : 0;$consumer = \Kafka\Consumer::getInstance('localhost:2181');$consumer->setGroup('test-consumer-group');$consumer->setPartition('testtopic', $partitionId);$consumer->setFromOffset(true);$consumer->setMaxBytes(102400);while(true){$topic = $consumer->fetch();foreach ($topic as $topicName => $partition) {foreach ($partition as $partId => $messageSet) {foreach ($messageSet as $message) {var_dump($message);}}}echo "consumer sleeping\n";sleep(1);}6.運行php代碼
在3個終端界面分別運行
php producer.phpphp consumer.php 0php consumer.php 17.結果
兩個consumer腳本依次收到producer發送的消息
?
php-kafka-consumer-output
轉自:https://aiddroid.com/kafka-introduction-and-php-kafka-usage/
作者:daos
鏈接:https://www.jianshu.com/p/b9d06f33f060
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯系作者獲得授權并注明出處。
?
總結
以上是生活随笔為你收集整理的使用PHP处理Kafka消息的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: kafka删除主题数据和删除主题
- 下一篇: elasticsearch与PHP版本要