浪尖 浪尖聊大數據 java
kafka簡介node
kafka的重要做用:apache
發佈和訂閱bootstrap
像消息傳遞系統同樣讀寫數據流。windows
處理緩存
編寫實時響應事件的可伸縮流處理應用程序安全
存儲系統架構
將數據流安全地存儲在分佈式的,副本的,容錯存儲系統。kafka常見的企業應用。Spark Structured Streaming,kafka Sql併發
企業中常見的kafka架構圖:socket
本文主要內容是講解kafka單節點的安裝,集羣的安裝部署,集羣安裝過程當中的重要配置,錯誤排查監控等內容。但願幫助你們快速入門。
一 下載安裝
因爲kafka依賴於Zookeeper,第一步要先下載安裝Zookeeper
tar -zxvf zookeeper-3.4.5.tar.gz -C /opt/modules/
複製配置文件:
cp conf/zoo_sample.cfg conf/zoo.cfg
配置數據存儲目錄:
dataDir=/opt/modules/zookeeper-3.4.5/data
建立數據存儲目錄:
mkdir /opt/modules/zookpeer-3.4.5/data
啓動
bin/zkServer.sh start
查看狀態
bin/zkServer.sh status
kafka下載版本是0.11.0.1,連接:
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz
tar -xzf kafka_2.11-0.11.0.1.tgz
cd kafka_2.11-0.11.0.1
二 啓動server
啓動在前臺,關閉窗口或者ctrl+c就會關閉。
bin/kafka-server-start.sh config/server.properties
啓動在後臺
nohup /opt/modules/kafka_2.11-0.11.0.1/bin/kafka-server-start.sh /opt/modules/kafka_2.11-0.11.0.1/config/server.properties >/dev/null 2>&1 &
三 建立topic並測試
建立一個名字爲test的topic,僅一個分區,一個副本
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
注意:副本數不能超過kafka Broker數目,不然會報錯。
查看topic信息
bin/kafka-topics.sh --list --zookeeper localhost:2181
另外,上面採用的是顯示的建立topic,也能夠配置Broker在往不存在的Broker發數據的時候自動建立topic。
啓動生產者併發送消息測試:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
啓動消費者,接受消息:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
注意,這條指令會記錄本身的偏移到Zookeeper或者kafka,因此假如每次都原封不動的執行這條指令會重複消費歷史數據,想要從上次斷開處消費消息,只須要執行的時候去掉from-beginning
四 設置多節點集羣
上面測試的例子是單節點,單節點無需作什麼處理直接啓動便可,可是生產中單節點是知足不了咱們的需求的,因此咱們要學會和了解如何部署多節點集羣。
因爲測試機器能用資源有限,就用單節點去部署三個kafka服務。
要注意的是,咱們的數據存儲目錄要不相同,端口也要不一樣,Broker id也要惟一。這三個要求知足以後就能夠去啓動kafka服務了。
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
修改配置文件以下:
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
啓動各個服務實例
nohup bin/kafka-server-start.sh config/server-1.properties >/dev/null 2>&1 &
nohup bin/kafka-server-start.sh config/server-2.properties >/dev/null 2>&1 &
因爲如今咱們有三個Broker實例,雖然在同一臺機器。這時候咱們就能夠建立副本數1<x<4的topic,命令以下:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
查看咱們topic的描述信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
上面的是對輸出的解釋,第一行是一個關於全部分區的概覽。每個額外的行都會顯示一個分區的信息。建立topic時候,因爲咱們只給定了一個分區,因此僅僅會有一行。
1),leader :leader表明給定分區負責讀寫的Broker節點。每一個Broker節點都會是部分隨機選擇分區的leader。
2),replicas: 真正複製給定分區日誌數據的node列表,包括leader和Follower。
3),isr: 在同步副本的集合。Isr是副本集合的子集,表明着當前是存活的而且能跟上leader的步伐。
在例子裏,Brokerid 爲1的是分區的leader。
對於,在單節點安裝的時候建立的topic,咱們依然能夠查看其信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
因爲當時建立的時候,只有Broker id爲0的節點,因此test分區只會存在該節點。假如咱們想將該分區遷移到其它節點,也能夠實現,該內容就不在這裏講了。
Producer幾條數據給my-replicated-topic
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
消費出來
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
測試,kafka的容錯,能夠將Broker id爲1的kafka實例停掉:
ps aux | grep server-1.properties
kill -9 pid
查看topic的信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
消費剛剛已經生產進去的消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
五 重要配置詳解
1, 部署一個kafka集羣要修改的配置點
統一的Zookeeper集羣,逗號隔開的Zookeeper集羣
zookeeper.connect=localhost:2181
惟一的Broker id,大於等於0的整數
broker.id=0
數據存儲目錄,能夠是多目錄,逗號隔開
log.dirs=/tmp/kafka-logs
2, 問題排查及注意事項
常常有人在技術交流羣裏,@我,說浪尖,我代碼在windows,kafka在虛擬機,明明能夠ping通爲什麼就是不能生產發送消息,並且虛擬機裏面kafka的命令行都是正常的。
實際上就是應該配置Listeners參數。
0.8.2.2版本參數
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>
# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>
0.9+版本參數
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = security_protocol://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
3, 配置級別的管理
這個主要是想說的一個參數,就是topic日誌的存儲時間,咱們這個版本默認是
log.retention.hours=168(7天)
log.retention.minutes
log.retention.ms
這樣默認在server.properties配置看起來沒啥問題,實際上,再生成中咱們的數據緩存時間長度要求每每是不同的,好比有些數據量比較大,咱們磁盤優先僅僅想緩存幾個小時,而有些卻想緩存一週或者更久,那麼這個時候就要使用topic級別的配置了。
retention.ms 604800000
修改topic級別配置方式以下
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --add-config max.message.bytes=128000
六 監控系統講解
Kafka監控,也是咱們生產中常見的,在這裏咱們推薦的監控工具備
Kafka manager
Kafkatools
這些監控工具使用起來很簡單,在這裏就不詳細介紹了。