接上一篇文章,上次沒有解決的一個問題就是在作一個隊列的時候,存在多消費者消費到同一個消息的狀況,今天終於解決了這個問題,問題的本質是由於運維給我建立的topic是有問題的,他建立的分區數量是0,我今天上容器看了一下,終於發現了,而後刪了本身從新建了一個,具體容器操做kafka的topic教程能夠看我另外一個文檔基於kafka容器操做topic。php
在這裏,咱們從頭開始介紹一下topic(主題),partition(分區),group(分組),consumer(消費者),producer(生產者)的關係app
下面上一下下代碼,修改以後topic的partition是3個,依舊是基於 enqueue/rdkafka 這個包負載均衡
生產者,不指定partition,kafka會自動分配運維
$connFactory = new RdKafkaConnectionFactory([
'global' => [
'metadata.broker.list' => '127.0.0.1:9092',
'socket.timeout.ms' => '50'
]
]);
$context = $connFactory->createContext();
$topic = $context->createQueue('app');
for ($i = 0; $i <= 5; $i++) {
$message = $context->createMessage('hello world!' . $i);
$context->createProducer()->send($topic, $message);
}
複製代碼
消費者socket
$config = [
'global' => [
'group.id' => date('Ymd'), // 指定一個分區,分區名自定義,作隊列分區名必須同樣
'metadata.broker.list' => '127.0.0.1:9092',
'enable.auto.commit' => 'false',
],
'topic' => [
'auto.offset.reset' => 'latest',
],
];
$connFactory = new RdKafkaConnectionFactory($config);
$context = $connFactory->createContext();
$queue = $context->createQueue('app');
$consumer = $context->createConsumer($queue);
while (true) {
$message = $consumer->receive(30 * 1000);
if (!is_object($message)) {
continue;
}
var_dump($message->getBody());
$consumer->acknowledge($message);
$consumer->reject($message);
}
複製代碼
啓動三個消費者,運行一次生產者,能夠獲得如下結果spa
消費者1.net
string(13) "hello world!1"
string(13) "hello world!3"
string(13) "hello world!4"
string(13) "hello world!5"
複製代碼
消費者2code
string(13) "hello world!2"
複製代碼
消費者3blog
string(13) "hello world!0"
複製代碼
到此就實現了kafka作隊列的需求了,本文內容就到這裏,相關kafka知識能夠看這篇文章。教程