Kafka 入門 and kafka+logstash 實戰應用

1、基礎理論nginx


這塊是整個kafka的核心不管你是先操做在來看仍是先看在操做都須要多看幾遍。

算法

首先來了解一下Kafka所使用的基本術語apache

Topic
Kafka將消息種子(Feed)分門別類 每一類的消息稱之爲話題(Topic).
Producer
發佈消息的對象稱之爲話題生產者(Kafka topic producer)
Consumer
訂閱消息並處理髮布的消息的種子的對象稱之爲話題消費者(consumers)
Broker
已發佈的消息保存在一組服務器中稱之爲Kafka集羣。集羣中的每個服務器都是一個代理(Broker). 消費者能夠訂閱一個或多個話題並從Broker拉數據從而消費這些已發佈的消息。

讓咱們站的高一點從高的角度來看Kafka集羣的業務處理就像這樣子

wKiom1fhP-vRkftcAAAfOoQwTtk506.png編程

Client和Server之間的通信是經過一條簡單、高性能而且和開發語言無關的TCP協議。除了Java Client外還有很是多的其它編程語言的Client。
bootstrap


話題和日誌  (Topic和Log)

讓咱們更深刻的瞭解Kafka中的Topic。bash

Topic是發佈的消息的類別或者種子Feed名。對於每個TopicKafka集羣維護這一個分區的log就像下圖中的示例服務器

wKioL1fhQHSQG62WAAA-QB83hq8979.png

每個分區都是一個順序的、不可變的消息隊列 而且能夠持續的添加。分區中的消息都被分配了一個序列號稱之爲偏移量(offset)在每一個分區中此偏移量都是惟一的。 Kafka集羣保持全部的消息直到它們過時 不管消息是否被消費了。 實際上消費者所持有的僅有的元數據就是這個偏移量也就是消費者在這個log中的位置。 這個偏移量由消費者控制正常狀況當消費者消費消息的時候偏移量也線性的的增長。可是實際偏移量由消費者控制消費者能夠將偏移量重置爲更老的一個偏移量從新讀取消息。 能夠看到這種設計對消費者來講操做自如 一個消費者的操做不會影響其它消費者對此log的處理。 再說說分區。Kafka中採用分區的設計有幾個目的。一是能夠處理更多的消息不受單臺服務器的限制。Topic擁有多個分區意味着它能夠不受限的處理更多的數據。第二分區能夠做爲並行處理的單元。併發

分佈式(Distribution)

Log的分區被分佈到集羣中的多個服務器上。每一個服務器處理它分到的分區。 根據配置每一個分區還能夠複製到其它服務器做爲備份容錯。 每一個分區有一個leader零或多個follower。Leader處理此分區的全部的讀寫請求而follower被動的複製數據。若是leader宕機其它的一個follower會被推舉爲新的leader。 一臺服務器可能同時是一個分區的leader另外一個分區的follower。 這樣能夠平衡負載避免全部的請求都只讓一臺或者某幾臺服務器處理。異步

生產者(Producers)

生產者往某個Topic上發佈消息。生產者也負責選擇發佈到Topic上的哪個分區。最簡單的方式從分區列表中輪流選擇。也能夠根據某種算法依照權重選擇分區。開發者負責如何選擇分區的算法。
elasticsearch

消費者(Consumers)

一般來說消息模型能夠分爲兩種 隊列和發佈-訂閱式。 隊列的處理方式是 一組消費者從服務器讀取消息一條消息只有其中的一個消費者來處理。在發佈-訂閱模型中消息被廣播給全部的消費者接收到消息的消費者均可以處理此消息。Kafka爲這兩種模型提供了單一的消費者抽象模型 消費者組 consumer group。 消費者用一個消費者組名標記本身。 一個發佈在Topic上消息被分發給此消費者組中的一個消費者。 假如全部的消費者都在一個組中那麼這就變成了queue模型。 假如全部的消費者都在不一樣的組中那麼就徹底變成了發佈-訂閱模型。 更通用的 咱們能夠建立一些消費者組做爲邏輯上的訂閱者。每一個組包含數目不等的消費者 一個組內多個消費者能夠用來擴展性能和容錯。正以下圖所示

wKiom1fhQUrTOmDcAABf0AAq7-s668.png

  2個kafka集羣託管4個分區P0-P32個消費者組消費組A有2個消費者實例消費組B有4個。


正像傳統的消息系統同樣Kafka保證消息的順序不變。 再詳細扯幾句。傳統的隊列模型保持消息而且保證它們的前後順序不變。可是 儘管服務器保證了消息的順序消息仍是異步的發送給各個消費者消費者收到消息的前後順序不能保證了。這也意味着並行消費將不能保證消息的前後順序。用過傳統的消息系統的同窗確定清楚消息的順序處理很讓人頭痛。若是隻讓一個消費者處理消息又違背了並行處理的初衷。 在這一點上Kafka作的更好儘管並無徹底解決上述問題。 Kafka採用了一種分而治之的策略分區。 由於Topic分區中消息只能由消費者組中的惟一一個消費者處理因此消息確定是按照前後順序進行處理的。可是它也僅僅是保證Topic的一個分區順序處理不能保證跨分區的消息前後處理順序。 因此若是你想要順序的處理Topic的全部消息那就只提供一個分區。

Kafka的保證(Guarantees)

生產者發送到一個特定的Topic的分區上的消息將會按照它們發送的順序依次加入


消費者收到的消息也是此順序


若是一個Topic配置了複製因子( replication facto)爲N 那麼能夠容許N-1服務器宕機而不丟失任何已經增長的消息



Kafka官網

http://kafka.apache.org/


做者半獸人
連接http://orchome.com/5
來源OrcHome
著做權歸做者全部。商業轉載請聯繫做者得到受權非商業轉載請註明出處。


2、安裝和啓動


一、下載二進制安裝包直接解壓

tar xf kafka_2.11-0.10.0.1.tgz
cd kafka_2.11-0.10.0.1


二、啓動服務

Kafka須要用到ZooKeepr因此須要先啓動一個ZooKeepr服務端若是沒有單獨的ZooKeeper服務端可使用Kafka自帶的腳本快速啓動一個單節點ZooKeepr實例

bin/zookeeper-server-start.sh config/zookeeper.properties  # 啓動zookeeper服務端實例

bin/kafka-server-start.sh config/server.properties  # 啓動kafka服務端實例


3、基本操做指令


一、新建一個主題topic

建立一個名爲「test」的Topic只有一個分區和一個備份

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test


二、建立好以後能夠經過運行如下命令查看已建立的topic信息

bin/kafka-topics.sh --list  --zookeeper localhost:2181


三、發送消息

Kafka提供了一個命令行的工具能夠從輸入文件或者命令行中讀取消息併發送給Kafka集羣。每一行是一條消息。

運行producer生產者,而後在控制檯輸入幾條消息到服務器。

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
This is a message
This is another message


四、消費消息

Kafka也提供了一個消費消息的命令行工具,將存儲的信息輸出出來。

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message


五、查看topic詳細狀況

bin/kafka-topics.sh --describe --zookeeper localhost:2181  --topic peiyinlog

wKiom1fh5DiCDyFHAABNw6sr0ok754.png

Topic: 主題名稱

Partition: 分片編號

Leader: 該分區的leader節點

Replicas: 該副本存在於哪一個broker節點

Isr: 活躍狀態的broker


六、給Topic添加分區

bin/kafka-topics.sh --zookeeper 192.168.90.201:2181 --alter --topic test2 --partitions 20


七、刪除Topic

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name


主題(Topic)刪除選項默認是關閉的,須要服務器配置開啓它。

delete.topic.enable=true


注:若是須要在其餘節點做爲客戶端使用指令鏈接kafka broker,則須要注意如下兩點(二選一便可)

另 : ( 使用logstash input 鏈接kafka也須要注意 )


一、設置kafka broker 配置文件中 host.name 參數爲監聽的IP地址


二、給broker設置一個惟一的主機名,而後在本機/etc/hosts文件配置解析到本身的IP(固然若是有內網的DNS服務器也行),同時還須要在zk server 和 客戶端的 /etc/hosts 添加broker主機名的解析。 


緣由詳解:


場景假設

broker_server ip 主機名 zookeeper ip 客戶端 ip
192.168.1.2  默認 localhost 192.168.1.4 192.168.1.5
# 此時客戶端向broker發起一些消費:

bin/kafka-console-consumer.sh --zookeeper 192.168.1.4:2181 --topic test2 --from-beginning


這時客戶端鏈接到zookeeper要求消費數據,zk則返回broker的ip地址和端口給客戶端,可是若是broker沒有設置host.name 和 advertised.host.name  broker默認返回的是本身的主機名,默認就是localhost和端口9092,這時客戶端拿到這個主機名解析到本身,操做失敗。


因此,須要配置broker 的host.name參數爲監聽的IP,這時broker就會返回IP。 客戶端就能正常鏈接了。


或者也能夠設置好broker的主機名,而後分別給雙方配置好解析。


4、broker基本配置

#  server.properties

broker.id=0  # broker節點的惟一標識 ID 不能重複。
host.name=10.10.4.1  # 監聽的地址,若是不設置默認返回主機名給zk_server
log.dirs=/u01/kafka/kafka_2.11-0.10.0.1/data  # 消息數據存放路徑
num.partitions=6  # 默認主題(Topic)分片數
log.retention.hours=24  # 消息數據的最大保留時長
zookeeper.connect=10.160.4.225:2181  # zookeeper server 鏈接地址和端口



5、Logstash + Kafka 實戰應用


Logstash-1.51纔開始內置Kafka插件,也就是說用以前的logstash版本是須要手動編譯Kafka插件的,相信也不多人用了。建議使用2.3以上的logstash版本。


一、使用logstash向kafka寫入一些數據


軟件版本:

logstash 2.3.2 

kafka_2.11-0.10.0.1


logstash output 部分配置

output {
  kafka {
    workers => 2
    bootstrap_servers => "10.160.4.25:9092,10.160.4.26:9092,10.160.4.27:9092"
    topic_id => "xuexilog"

}

}


參數解釋 : 

workers:用於寫入時的工做線程

bootstrap_servers:指定可用的kafka broker實例列表

topic_id:指定topic名稱,能夠在寫入前手動在broker建立定義好分片數和副本數,也能夠不提早建立,那麼在logstash寫入時會自動建立topic,分片數和副本數則默認爲broker配置文件中設置的。



二、使用logstash消費一些數據,並寫入到elasticsearch


軟件版本:

logstash 2.3.2 

elasticsearch-2.3.4


logstash 配置文件

input{
    kafka {
        zk_connect => "112.100.6.1:2181,112.100.6.2:2181,112.100.6.3:2181"
        group_id => "logstash"
        topic_id => "xuexilog"
        reset_beginning => false
        consumer_threads => 5
        decorate_events => true

}

}

# 這裏group_id 須要解釋一下,在Kafka中,相同group的Consumer能夠同時消費一個topic,不一樣group的Consumer工做則互不干擾。
# 補充: 在同一個topic中的同一個partition同時只能由一個Consumer消費,當同一個topic同時須要有多個Consumer消費時,則能夠建立更多的partition。

output {
    if [type] == "nginxacclog" {
        elasticsearch {
            hosts => ["10.10.1.90:9200"]
            index => "logstash-nginxacclog-%{+YYYY.MM.dd}"
            manage_template => true
            flush_size => 50000
            idle_flush_time => 10
            workers => 2
}

}

}


三、經過group_id 查看當前詳細的消費狀況

bin/kafka-consumer-groups.sh --group logstash --describe --zookeeper 127.0.0.1:2181

wKiom1fiTL-xhYo5AABDZmsbids038.png


輸出解釋:

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
消費者組 話題id 分區id 當前已消費的條數 總條數 未消費的條數
相關文章
相關標籤/搜索