php的kafka踩坑(二)

接上一篇文章,上次沒有解決的一個問題就是在作一個隊列的時候,存在多消費者消費到同一個消息的狀況,今天終於解決了這個問題,問題的本質是由於運維給我建立的topic是有問題的,他建立的分區數量是0,我今天上容器看了一下,終於發現了,而後刪了本身從新建了一個,具體容器操做kafka的topic教程能夠看我另外一個文檔基於kafka容器操做topicphp

在這裏,咱們從頭開始介紹一下topic(主題),partition(分區),group(分組),consumer(消費者),producer(生產者)的關係app

  • producer,生產者,生產數據
  • consumer,消費者,消費數據
  • topic,簡單點說就是一個隊列,生產者生產數據和消費者消費數據都必須指定一個Topic,就是生產的數據要放到哪一個隊列去給消費者消費
  • partition和group,一個topic能夠配置多個partition,consumer消費數據時是按照group來消費的,kafka確保每一個partition只能由同一個group中的同一個consumer消費,若是想要重複消費,那麼須要其餘的組來消費,因此同一個group的消費者數量應當小於等於partition數量。Zookeerper中保存這每一個topic下的每一個partition在每一個group中消費的offset。(此段介紹引用這篇文章
    • consumer讀取時,會指定讀取的group,同一個消息在同一個group下只會讀取到一次,若是要重複消費數據,須要新建group
    • 若是group只有一個,而且有多個partition,一個consumer時,全部partition裏的消息都會發往該consumer,若是consumer不止一個時,可能會存在有的consumer裏面數據消費的多,有的消費的少,作多消費者的隊列就是用這個特性,當partition數量=consuerm時,消息能夠達到負載均衡。

下面上一下下代碼,修改以後topic的partition是3個,依舊是基於 enqueue/rdkafka 這個包負載均衡

生產者,不指定partition,kafka會自動分配運維

$connFactory = new RdKafkaConnectionFactory([
    'global' => [
        'metadata.broker.list' => '127.0.0.1:9092',
        'socket.timeout.ms' => '50'
    ]
]);
$context = $connFactory->createContext();
$topic = $context->createQueue('app');
for ($i = 0; $i <= 5; $i++) {
    $message = $context->createMessage('hello world!' . $i);
    $context->createProducer()->send($topic, $message);
}
複製代碼

消費者socket

$config = [
    'global' => [
        'group.id' => date('Ymd'), // 指定一個分區,分區名自定義,作隊列分區名必須同樣
        'metadata.broker.list' => '127.0.0.1:9092',
        'enable.auto.commit' => 'false',
    ],
    'topic' => [
        'auto.offset.reset' => 'latest',
    ],
];
$connFactory = new RdKafkaConnectionFactory($config);
$context = $connFactory->createContext();
$queue = $context->createQueue('app');
$consumer = $context->createConsumer($queue);
while (true) {
    $message = $consumer->receive(30 * 1000);
    if (!is_object($message)) {
        continue;
    }
    var_dump($message->getBody());
    $consumer->acknowledge($message);
    $consumer->reject($message);
}
複製代碼

啓動三個消費者,運行一次生產者,能夠獲得如下結果spa

消費者1.net

string(13) "hello world!1"
string(13) "hello world!3"
string(13) "hello world!4"
string(13) "hello world!5"
複製代碼

消費者2code

string(13) "hello world!2"
複製代碼

消費者3blog

string(13) "hello world!0"
複製代碼

到此就實現了kafka作隊列的需求了,本文內容就到這裏,相關kafka知識能夠看這篇文章教程

相關文章
相關標籤/搜索