php+kafka+zookeeper+logstash

本文主要實現的目標是php鏈接kafka而且成功發送消息給kafka。爲了驗證這個鏈接和發送,另外配置了logstash監聽kafka相對應的消息,而後轉發到redis,原來我不知道對kafka比較陌生,不知道怎麼看裏面的消息內容(我知道安裝包裏有個consumer和producer的腳本) ^ _ ^

消息發送路徑:php->kafka->logstash->redisphp

1.安裝kafkagit

下載地址:https://kafka.apache.org/github

下載解壓後進入根目錄,redis

bin/zookeeper-server-start.sh config/zookeeper.properties &  開啓zookeeper
bin/kafka-server-start.sh config/server.properties & 開啓kafka

另開一個終端而後apache

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka

這樣就建立了一個topic爲kafka的消息通道json

若是這個步驟成功的話,能夠經過另開終端發送消息bootstrap

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka

執行以後就能夠輸入消息發送了。ubuntu

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka --from-beginning

來接受上面終端發送的消息composer

2.安裝php zookeeper擴展,而且用php發送消息ui

安裝擴展以前須要安裝zookeeper的c client,擴展有依賴,步驟以下

1.經過apt-get來安裝(樓主用的是ubuntu)

2.經過源碼安裝,地址:https://github.com/apache/zoo...

下載下來

cd zookeeper

./configure

make && sudo make install

按照如上步驟,/usr/local/bin目錄下就會多一個cli_mt

php的zookeeper的源碼能夠去pecl.php.net下載,而後老步驟

phpize

./configure --with-libzookeeper=/usr/local/bin/cli_mt (若是你安裝擴展的php不是默認的php,則須要帶上--with-php-config參數)

make && sudo make install

最後別忘了添加extension=zookeeper.so到php.ini
3.配置logstash

下載地址:https://www.elastic.co/downlo...

修改配置文件,因爲樓主的logstash版本已是5.2的了,因此又是一陣谷歌,才發現不少網上的配置都是1.2版本的,已經不兼容了。

input{

    kafka{
    
        bootstrap_servers=>"localhost:9092"
        
        topics=>["kafka"]
    
    }

}

output{

    redis{
    
        host=>"127.0.0.1"
        
        port=>6379
        
        key=>"kafka"
        
        data_type=>"list"
        
        password=>"123456"
    
    }

}

4.php發送消息

網上找了一圈,終於找到一個能夠用的

https://github.com/nmred/kafk...

也能夠用

composer require "nmred/kafka-php"

php代碼以下:

require "./vendor/autoload.php";

$produce = \Kafka\Produce::getInstance('10.37.129.2:2181', 3000);

$produce->setRequireAck(-1);

$topicName = "kafka";

$partitions = $produce->getAvailablePartitions($topicName);

$partCount = count($partitions);

var_dump('$partCount:'.$partCount);

$count = 0;

$message = json_encode(array('uid' => $count, 'age' => $count%100, 'datetime' => date('Y-m-d H:i:s')));

//發送消息到不一樣的partition

$partitionId = $count%$partCount;

$produce->setMessages($topicName, $partitionId, array($message));

$result = $produce->send();

var_dump($result);

參考文章:http://blog.kazaff.me/2015/06...

最後附一張截圖
5131116-f950050278fff347.jpeg?

相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息