最近項目上有一個須要用到消息隊列的功能,從網上找了一些php相關的kafka使用的教程和博客,大抵都是安裝php的拓展librdkafka(這裏就不講這個拓展的安裝方法了,搜一下仍是有不少教程的),而後直接用這個拓展進行開發,可是我直接用這個拓展開發的時候,不知道爲啥運行不起來,一直報錯(應該是我太菜了,哈哈哈哈哈哈)......我從github上找了一些相關的包想直接用一下,可是發現不少包都是幾年前的了,基本上都是kafka 0.x 版本的。php
我看了一下最多star的一個包,連接是:github.com/weiboad/kaf…git
後面我用了enqueue/rdkafka 這個包,連接:github.com/php-enqueue…github
下面展現一下我測試的代碼,只是能運行起來,可是還沒達到我想要的預期app
生產者composer
$connFactory = new RdKafkaConnectionFactory([
'global' => [
'metadata.broker.list' => '127.0.0.1:9092',
'socket.timeout.ms' => '50'
]
]);
$context = $connFactory->createContext();
$message = $context->createMessage('hello world!');
$topic = $context->createTopic('app');
$context->createProducer()->send($topic, $message);
複製代碼
消費者socket
$config = [
'global' => [
'group.id' => uniqid('', true),
'metadata.broker.list' => '127.0.0.1:9092',
'enable.auto.commit' => 'false',
],
'topic' => [
// 設置從最後一個offset開始讀取消息,不會讀取到以前的消息
'auto.offset.reset' => 'latest',
],
];
$connFactory = new RdKafkaConnectionFactory($config);
$context = $connFactory->createContext();
$topic = $context->createTopic('app');
$consumer = $context->createConsumer($topic);
while (true) {
$message = $consumer->receive(30 * 1000);
if (!$message instanceof RdKafkaMessage && !$message instanceof Message) {
var_dump($message);
continue;
}
$consumer->acknowledge($message);
var_dump($message->getBody());
}
複製代碼
這樣是能運行起來的,能正常發送和接受數據,我實際狀況是想作一個隊列,生產者有多個,往一個topic進行數據生產,而後有多個消費者在消費,可是這樣寫有個問題,我在啓動了多個消費者的時候,每一個消費者都會接受到生產者發送過來的消息,更像是羣體發佈羣體訂閱的形式,不是我想要的結果,我去網上找了其餘的教程,有人說只要設置group_id不一樣就能夠了,但個人group_id所有都是隨機的,不太可能同樣,按理來講是能實現的,可是就是不行,也試過用Queue去操做,可是仍是會全部消費者都收到消息。測試
目前就只瞭解到了這個地方,還須要花點時間再看看,若是有大神看到的話,求指點一下!!spa