安裝kafka
# Install Kafka 1.0.0 on a yum-based host, start ZooKeeper and a broker, then
# smoke-test with a topic, a console producer and a console consumer.
# These are interactive steps; run them one at a time.

# Find the available JDK packages, then install the Java 8 runtime and JDK.
yum search jdk
yum -y install java-1.8.0-openjdk.i686 java-1.8.0-openjdk-devel.i686

# Download and unpack Kafka 1.0.0 (Scala 2.11 build). Old releases are served
# from the Apache archive rather than from the download mirrors.
wget https://archive.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz
tar -xzvf kafka_2.11-1.0.0.tgz
cd kafka_2.11-1.0.0/bin

# Start ZooKeeper in the background.
./zookeeper-server-start.sh ../config/zookeeper.properties &

# Start the Kafka broker in the background.
./kafka-server-start.sh ../config/server.properties &

# Check that the Kafka processes are running.
ps -ef | grep kafka

# Create a topic named "test1234" with one partition and one replica.
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1234

# List the existing topics (the original write-up notes 4 topics at this point).
./kafka-topics.sh --list --zookeeper localhost:2181

# Send messages: run the console producer and type some lines; each line is
# sent to the broker.
./kafka-console-producer.sh --broker-list localhost:9092 --topic test1234

# Start a console consumer; it reads the messages sent above and prints them.
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1234 --from-beginning
安裝可視化工具
# Install Maven 3.5.4, then build and run the Kafdrop 2.0.0 web UI.
# These are interactive steps; run them one at a time.

# Download and unpack Maven. Old releases are served from the Apache archive
# rather than from the download mirrors.
wget https://archive.apache.org/dist/maven/maven-3/3.5.4/binaries/apache-maven-3.5.4-bin.zip
unzip apache-maven-3.5.4-bin.zip
mv apache-maven-3.5.4 apache-maven

# Put Maven on PATH: add the two export lines below to /etc/profile, then
# reload it. MAVEN_HOME must name the renamed directory from the mv above.
# NOTE(review): /root/dev is assumed to be the directory the archive was
# unpacked in -- adjust if yours differs.
vim /etc/profile
export MAVEN_HOME=/root/dev/apache-maven
export PATH="$PATH:$MAVEN_HOME/bin"
source /etc/profile

# Download and build Kafdrop.
wget https://github.com/HomeAdvisor/Kafdrop/archive/kafdrop-2.0.0.zip
unzip kafdrop-2.0.0.zip
# GitHub tag archives normally unpack to <repo>-<tag>; adjust the name if the
# extracted directory differs.
cd Kafdrop-kafdrop-2.0.0
mvn clean package

# Run Kafdrop against the local ZooKeeper; the UI listens on port 9999.
java -jar ./target/kafdrop-2.0.0.jar --zookeeper.connect=127.0.0.1:2181 --server.port=9999
安裝擴展
# Build and install librdkafka, then the php-rdkafka extension on top of it.
# These are interactive steps; run them one at a time.

# Build and install the librdkafka C library from the master branch archive.
wget https://github.com/edenhill/librdkafka/archive/master.zip
unzip master.zip
cd librdkafka-master/
./configure
make
sudo make install
# Return to the starting directory so php-rdkafka is not cloned inside the
# librdkafka source tree.
cd ..

# Build and install the PHP extension against the PHP under /usr/local/php.
git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure --with-php-config=/usr/local/php/bin/php-config
make all -j 5
sudo make install

# Enable the extension: open php.ini and add the following line to it:
#   extension=rdkafka.so
vi /usr/local/php/etc/php.ini
生產者
// Producer example for php-rdkafka: configure the client, register delivery
// and error callbacks, produce a single message and wait for its report.
// Every setting goes on $conf, the object that is passed to the Producer.
$conf = new \RdKafka\Conf();
$conf->set('message.send.max.retries', 3);
// Network request timeout, lowered to 50 ms.
$conf->set('socket.timeout.ms', 50);
// Producer request timeout: 20000 ms (20 s).
$conf->set('request.timeout.ms', 20000);
// Only fetch metadata for the topics actually used, to reduce bandwidth.
$conf->set('topic.metadata.refresh.sparse', true);
// Refresh metadata every 600 s, to reduce bandwidth.
$conf->set('topic.metadata.refresh.interval.ms', 600000);
$conf->set('log.connection.close', 'false');

if (function_exists('pcntl_sigprocmask')) {
    // Block SIGIO in PHP and hand it to librdkafka as its termination signal
    // so the client threads terminate promptly once they are done.
    pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
    $conf->set('internal.termination.signal', SIGIO);
} else {
    // Without pcntl, keep buffering minimal so messages are sent right away.
    $conf->set('queue.buffering.max.ms', 1);
}

// Delivery report callback: invoked once per produced message.
$conf->setDrMsgCb(function ($kafka, $message) {
    if ($message->err) {
        // Message permanently failed to be delivered; report why instead of
        // silently discarding the error text.
        $error_info = rd_kafka_err2str($message->err);
        echo $error_info;
    } else {
        // Message successfully delivered.
        print_r($message);
    }
});

// Error callback: invoked when the client reports an error.
$conf->setErrorCb(function ($kafka, $err, $reason) {
    printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);
});

$rk = new \RdKafka\Producer($conf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers('IP:PORT');

$topicConf = new \RdKafka\TopicConf();
// Short message timeout so an undeliverable message does not block for long.
$topicConf->set('message.timeout.ms', 100);
$topic = $rk->newTopic('TOPIC_ID', $topicConf);

$topic->produce(RD_KAFKA_PARTITION_UA, 0, 'message');

// Serve callbacks until the outbound queue is drained. A single poll(0) right
// after produce() usually returns before the delivery report exists, so the
// DrMsgCb above would never fire.
while ($rk->getOutQLen() > 0) {
    $rk->poll(50);
}
消費者
<?php
// Low-level consumer example for php-rdkafka: reads partition 0 of the
// configured topic, resuming from the offset stored in a local file.
$config = require('config.php');

if (!extension_loaded('rdkafka')) {
    // The extension is not loaded: record it and stop, because every call
    // below needs the RdKafka classes.
    file_put_contents("./extension.txt", "RdKafka extension is not load", FILE_APPEND);
    exit(1);
}

$conf = new \RdKafka\Conf();
// No delivery-report callback is registered here: delivery reports are a
// producer-side feature and have no effect on a consumer.
$conf->setErrorCb(function ($kafka, $err, $reason) {
    printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);
});

$rk = new \RdKafka\Consumer($conf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers($config['kafka']['consumer']['brokers']);

$topicConf = new \RdKafka\TopicConf();
// With no stored offset, start from the oldest available message.
$topicConf->set('auto.offset.reset', 'smallest');
// Persist consumed offsets automatically, in a file under ./kafka_offset.
$topicConf->set('auto.commit.enable', true);
$topicConf->set('offset.store.path', './kafka_offset');
$topicConf->set('offset.store.method', 'file');
$topicConf->set('offset.store.sync.interval.ms', 0);

$topic = $rk->newTopic($config['kafka']['consumer']['topic'], $topicConf);

// Start consuming partition 0 once, from the stored offset. Starting and
// stopping the partition on every iteration restarts the fetcher each time.
// NOTE(review): offsets are now persisted by the periodic auto-commit rather
// than on every stop; tune auto.commit.interval.ms if tighter persistence is
// required.
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

while (true) {
    // Wait up to 1000 ms for the next message; null means the wait timed out.
    $msg = $topic->consume(0, 1000);
    if ($msg !== null && $msg->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
        // Placeholder: $data holds the message body for application handling.
        $data = $msg->payload;
    }
}
<?php
// Kafka connection settings, returned as an array to the script that
// require()s this file. Presumably this is the config.php loaded by the
// consumer example -- confirm the file name. The opening PHP tag is required:
// without it the file is emitted as text and require() does not yield the array.
return array(
    'kafka' => array(
        // Broker list and topic for producing.
        'produce' => array(
            'brokers' => '127.0.0.1:9092',
            'topic' => '1'
        ),
        // Broker list and topic for consuming.
        'consumer' => array(
            'brokers' => '127.0.0.1:9092',
            'topic' => '1'
        )
    ),
);
可配置屬性
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
文檔
https://arnaud-lb.github.io/php-rdkafka/phpdoc/book.rdkafka.html
可視化工具 https://github.com/HomeAdvisor/Kafdrop
shell命令 http://www.cnblogs.com/xiaodf/p/6093261.html#3