在安裝php-kafka 擴展後,就能夠開始編寫 php 消費消息的腳本了,php-rdkafka 擴展提供了幾種消息處理的方式php
這種方式沒有消費組的概念服務器
<?php $rk = new RdKafka\Consumer(); $rk->setLogLevel(LOG_DEBUG); // 指定 broker 地址,多個地址用"," 分割 $rk->addBrokers("192.168.33.1:9092"); $topic = $rk->newTopic("test"); $topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING); while (true) { // 第一個參數是分區號 // 第二個參數是超時時間 $msg = $topic->consume(0, 1000); if ($msg->err) { echo $msg->errstr(), "\n"; break; } else { echo $msg->payload, "\n"; } }
這種方式能夠指定消費組,一個消費組內,一個consumer 進程只能讀取一個分區,函數
<?php $conf = new RdKafka\Conf(); // Set a rebalance callback to log partition assignments (optional) // 當有新的消費進程加入或者退出消費組時,kafka 會自動從新分配分區給消費者進程,這裏註冊了一個回調函數,當分區被從新分配時觸發 $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) { switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: echo "Assign: "; var_dump($partitions); $kafka->assign($partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: echo "Revoke: "; var_dump($partitions); $kafka->assign(NULL); break; default: throw new \Exception($err); } }); // 配置groud.id 具備相同 group.id 的consumer 將會處理不一樣分區的消息,因此同一個組內的消費者數量若是訂閱了一個topic, 那麼消費者進程的數量多於 多於這個topic 分區的數量是沒有意義的。 $conf->set('group.id', 'myConsumerGroup1'); //添加 kafka集羣服務器地址 $conf->set('metadata.broker.list', '192.168.33.1:9092'); $topicConf = new RdKafka\TopicConf(); // Set where to start consuming messages when there is no initial offset in // offset store or the desired offset is out of range. // 'smallest': start from the beginning //當沒有初始偏移量時,從哪裏開始讀取 $topicConf->set('auto.offset.reset', 'smallest'); // Set the configuration to use for subscribed/assigned topics $conf->setDefaultTopicConf($topicConf); $consumer = new RdKafka\KafkaConsumer($conf); // 讓消費者訂閱log 主題 $consumer->subscribe(['log']); while (true) { $message = $consumer->consume(120*1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: var_dump($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } } ?>