PHP處理kafka消息隊列

安裝php-kafka 擴展後,就能夠開始編寫 php 消費消息的腳本了,php-rdkafka 擴展提供了幾種消息處理的方式php

低級方式(Low level)

這種方式沒有消費組的概念服務器

<?php

$rk = new RdKafka\Consumer();
$rk->setLogLevel(LOG_DEBUG);
// 指定 broker 地址,多個地址用"," 分割
$rk->addBrokers("192.168.33.1:9092");


$topic = $rk->newTopic("test");
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);


while (true) {
    // 第一個參數是分區號
    // 第二個參數是超時時間
    $msg = $topic->consume(0, 1000);
    if ($msg->err) {
        echo $msg->errstr(), "\n";
        break;
    } else {
        echo $msg->payload, "\n";
    }
}

 

高級方式 (High level)

這種方式能夠指定消費組,一個消費組內,一個consumer 進程只能讀取一個分區,函數

<?php

$conf = new RdKafka\Conf();

// Set a rebalance callback to log partition assignments (optional)
// 當有新的消費進程加入或者退出消費組時,kafka 會自動從新分配分區給消費者進程,這裏註冊了一個回調函數,當分區被從新分配時觸發
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            var_dump($partitions);
            $kafka->assign($partitions);
            break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            echo "Revoke: ";
            var_dump($partitions);
            $kafka->assign(NULL);
            break;

        default:
            throw new \Exception($err);
    }
});

// 配置groud.id 具備相同 group.id 的consumer 將會處理不一樣分區的消息,因此同一個組內的消費者數量若是訂閱了一個topic, 那麼消費者進程的數量多於 多於這個topic 分區的數量是沒有意義的。
$conf->set('group.id', 'myConsumerGroup1');

//添加 kafka集羣服務器地址
$conf->set('metadata.broker.list', '192.168.33.1:9092');

$topicConf = new RdKafka\TopicConf();


// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
//當沒有初始偏移量時,從哪裏開始讀取
$topicConf->set('auto.offset.reset', 'smallest');


// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);

// 讓消費者訂閱log 主題
$consumer->subscribe(['log']);


while (true) {
    $message = $consumer->consume(120*1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}

?>
相關文章
相關標籤/搜索