基於Logstash跑通Kafka仍是須要注意不少東西,最重要的就是理解Kafka的原理。bootstrap
因爲Kafka採用解耦的設計思想,並不是原始的發佈訂閱,生產者負責產生消息,直接推送給消費者。而是在中間加入持久化層——broker,生產者把數據存放在broker中,消費者從broker中取數據。這樣就帶來了幾個好處:ruby
另外,因爲broker採用了主題topic-->分區的思想,使得某個分區內部的順序能夠保證有序性,可是分區間的數據不保證有序性。這樣,消費者能夠以分區爲單位,自定義讀取的位置——offset。架構
Kafka採用zookeeper做爲管理,記錄了producer到broker的信息,以及consumer與broker中partition的對應關係。所以,生產者能夠直接把數據傳遞給broker,broker經過zookeeper進行leader-->followers的選舉管理;消費者經過zookeeper保存讀取的位置offset以及讀取的topic的partition分區信息。測試
因爲上面的架構設計,使得生產者與broker相連;消費者與zookeeper相連。有了這樣的對應關係,就容易部署logstash-->kafka-->logstash的方案了。fetch
啓動zookeeper:架構設計
$zookeeper/bin/zkServer.sh start
啓動kafka:debug
$kafka/bin/kafka-server-start.sh $kafka/config/server.properties &
建立主題:設計
$kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic hello --replication-factor 1 --partitions 1
查看主題:code
$kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe
執行生產者腳本:server
$kafka/bin/kafka-console-producer.sh --broker-list 10.0.67.101:9092 --topic hello
執行消費者腳本,查看是否寫入:
$kafka/bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --from-beginning --topic hello
input{ stdin{} } output{ kafka{ topic_id => "hello" bootstrap_servers => "192.168.0.4:9092" # kafka的地址 batch_size => 5 } stdout{ codec => rubydebug } }
logstash配置文件:
input{ kafka { codec => "plain" group_id => "logstash1" auto_offset_reset => "smallest" reset_beginning => true topic_id => "hello" #white_list => ["hello"] #black_list => nil zk_connect => "192.168.0.5:2181" # zookeeper的地址 } } output{ stdout{ codec => rubydebug } }