依賴 librdkafka
yum install rdkafka rdkafka-devel
pecl install rdkafka
php --ri rdkafka
可參閱 http://pecl.php.net/package/rdkafka 瞭解擴展所支持的 kafka 版本
生產者:連接集羣,建立 topic 並發送消息
<?php
/**
 * Minimal producer example.
 *
 * Sends a timestamped "hello kafka" message to the topic "topic_1" roughly
 * every two seconds and echoes the same text to stdout. Runs until killed.
 */

$rk = new Rdkafka\Producer();
$rk->setLogLevel(LOG_DEBUG);

// Connect to the Kafka cluster.
// NOTE(review): the broker list below is only a "," placeholder — replace it
// with the real comma-separated host:port list before running.
$rk->addBrokers(",");

// Create the topic handle.
$topic = $rk->newTopic("topic_1");

while (true) {
    // Build the payload once so the echoed line is exactly what is produced
    // (calling date() twice could straddle a second boundary).
    $message = "hello kafka " . date("Y-m-d H:i:s");
    echo $message . PHP_EOL;

    try {
        // RD_KAFKA_PARTITION_UA: no explicit partition is chosen by this script.
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, $message);

        // Serve queued producer events without blocking.
        $rk->poll(0);
    } catch (\Exception $e) {
        echo $e->getMessage() . PHP_EOL;
    }

    // Pause outside the try block so a failing produce() also backs off
    // instead of spinning in a tight loop.
    sleep(2);
}
高階消費:consumer group
<?php
/**
 * High-level (consumer group) example.
 *
 * Joins the group "group_1", subscribes to "topic_1" and dumps every message
 * received. Runs until killed or until an unexpected error is reported.
 */

$conf = new RdKafka\Conf();

// Set a rebalance callback to log partition assignments (optional)
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            var_dump($partitions);
            $kafka->assign($partitions);
            break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            echo "Revoke: ";
            var_dump($partitions);
            $kafka->assign(null);
            break;

        default:
            // Report a readable message and keep the numeric code as the
            // exception code, instead of using the bare integer as the message.
            throw new \Exception(rd_kafka_err2str($err), $err);
    }
});

// Configure the group.id. All consumers with the same group.id will consume
// different partitions.
$conf->set('group.id', 'group_1');

// Initial list of Kafka brokers.
// NOTE(review): ',' is only a placeholder — replace it with the real
// comma-separated host:port list before running.
$conf->set('metadata.broker.list', ',');

$topicConf = new RdKafka\TopicConf();

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');

// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);

// Subscribe to topic 'topic_1'
$consumer->subscribe(['topic_1']);

echo "Waiting for partition assignment... (make take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";

while (true) {
    // Timeout is in milliseconds; pass an int rather than the float literal 3e3.
    $message = $consumer->consume(3 * 1000);

    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;

        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            sleep(2);
            // Falls through so the status text is printed as well.
            // NOTE(review): fall-through kept from the original; confirm it is intended.
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo $message->errstr() . PHP_EOL;
            break;

        default:
            throw new \Exception($message->errstr(), $message->err);
    }
}
低階消費。執行方式:php consumer_lowlevel.php [partitionNum]
<?php
/**
 * Low-level consumer example.
 *
 * Consumes a single partition of "topic_1" and dumps every message received.
 * The partition number is taken from the first CLI argument, or read from
 * stdin when no argument is given. Runs until killed or until an unexpected
 * error is reported.
 */

if (!isset($argv[1])) {
    fwrite(STDOUT, "請指定消費分區:");
    $partition = (int) fread(STDIN, 1024);
} else {
    $partition = (int) $argv[1];
}

$topicName = "topic_1";

$conf = new RdKafka\Conf();

// Set the group id. This is required when storing offsets on the broker
$conf->set('group.id', 'group_2');

$rk = new RdKafka\Consumer($conf);

// NOTE(review): ',' is only a placeholder — replace it with the real
// comma-separated host:port list before running.
$rk->addBrokers(',');

$topicConf = new RdKafka\TopicConf();
// Configuration values are passed as strings.
$topicConf->set('auto.commit.interval.ms', '2000');

// Set the offset store method to 'file'
// $topicConf->set('offset.store.method', 'file');
// $topicConf->set('offset.store.path', sys_get_temp_dir());

// Alternatively, set the offset store method to 'broker'
$topicConf->set('offset.store.method', 'broker');

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');

// Use a separate variable for the topic handle so the topic name string is
// not overwritten by an object.
$topic = $rk->newTopic($topicName, $topicConf);

// Start consuming the selected partition from the stored offset.
$topic->consumeStart($partition, RD_KAFKA_OFFSET_STORED);

while (true) {
    $message = $topic->consume($partition, 3 * 1000);

    // consume() may return null when nothing arrived within the timeout;
    // skip this round instead of dereferencing null below.
    if ($message === null) {
        continue;
    }

    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;

        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo $message->errstr() . PHP_EOL;
            break;

        default:
            throw new \Exception($message->errstr(), $message->err);
    }
}