zookeeper與Kafka集羣搭建及python代碼測試

Kafka初識

一、Kafka使用背景javascript

在咱們大量使用分佈式數據庫、分佈式計算集羣的時候,是否會遇到這樣的一些問題:
  1. 咱們想分析下用戶行爲(pageviews),以便咱們設計出更好的廣告位
  2. 我想對用戶的搜索關鍵詞進行統計,分析出當前的流行趨勢
  3. 有些數據,存儲數據庫浪費,直接存儲硬盤效率又低 
這些場景都有一個共同點
數據是由上游模塊產生,上游模塊,使用上游模塊的數據計算、統計、分析,這個時候就可使用消息系統,尤爲是分佈式消息系統!
二、Kafka的定義
What is Kafka:它是一個分佈式消息系統,由linkedin使用scala編寫,用做LinkedIn的活動流(Activity Stream)和運營數據處理管道(Pipeline)的基礎。具備高水平擴展和高吞吐量。
三、Kafka和其餘主流分佈式消息系統的對比 
定義解釋:
一、Java 和 scala都是運行在JVM上的語言。
二、erlang和最近比較火的和go語言同樣是從代碼級別就支持高併發的一種語言,因此RabbitMQ天生就有很高的併發性能,可是 有RabbitMQ嚴格按照AMQP進行實現,受到了不少限制。kafka的設計目標是高吞吐量,因此kafka本身設計了一套高性能可是不通用的協議,他也是仿照AMQP( Advanced Message Queuing Protocol   高級消息隊列協議)設計的。 
三、事物的概念:在數據庫中,多個操做一塊兒提交,要麼操做所有成功,要麼所有失敗。舉個例子, 在轉帳的時候付款和收款,就是一個事物的例子,你給一我的轉帳,你轉成功,而且對方正常行收到款項後,這個操做纔算成功,有一方失敗,那麼這個操做就是失敗的。 
對應消在息隊列中,就是多條消息一塊兒發送,要麼所有成功,要麼所有失敗。3箇中只有ActiveMQ支持,這個是由於,RabbitMQ和Kafka爲了更高的性能,而放棄了對事物的支持 。
四、集羣:多臺服務器組成的總體叫作集羣,這個總體對生產者和消費者來講,是透明的。其實對消費系統組成的集羣添加一臺服務器減小一臺服務器對生產者和消費者都是無感之的。
五、負載均衡,對消息系統來講負載均衡是大量的生產者和消費者向消息系統發出請求消息,系統必須均衡這些請求使得每一臺服務器的請求達到平衡,而不是大量的請求,落到某一臺或幾臺,使得這幾臺服務器高負荷或超負荷工做,嚴重狀況下會中止服務或宕機。
六、動態擴容是不少公司要求的技術之一,不支持動態擴容就意味着中止服務,這對不少公司來講是不能夠接受的。 
注:
阿里巴巴的Metal,RocketMQ都有Kafka的影子,他們要麼改造了Kafka或者借鑑了Kafka,最後Kafka的動態擴容是經過Zookeeper來實現的。 
 
Zookeeper是一種在分佈式系統中被普遍用來做爲:分佈式狀態管理、分佈式協調管理、分佈式配置管理、和分佈式鎖服務的集羣。kafka增長和減小服務器都會在Zookeeper節點上觸發相應的事件kafka系統會捕獲這些事件,進行新一輪的負載均衡,客戶端也會捕獲這些事件來進行新一輪的處理。

Kafka相關概念

一、 AMQP協議html

Advanced Message Queuing Protocol (高級消息隊列協議)
The Advanced Message Queuing Protocol (AMQP): 是一個標準開放的應用層的消息中間件(Message Oriented Middleware)協議。AMQP定義了經過網絡發送的字節流的數據格式。所以兼容性很是好,任何實現AMQP協議的程序均可以和與AMQP協議兼容的其餘程序交互,能夠很容易作到跨語言,跨平臺。
 
上面說的3種比較流行的消息隊列協議,要麼支持AMQP協議,要麼借鑑了AMQP協議的思想進行了開發、實現、設計。
二、 一些基本的概念
一、消費者:(Consumer):從消息隊列中請求消息的客戶端應用程序
二、生產者:(Producer)  :向broker發佈消息的應用程序
三、AMQP服務端(broker):用來接收生產者發送的消息並將這些消息路由給服務器中的隊列,便於fafka將生產者發送的消息,動態的添加到磁盤並給每一條消息一個偏移量,因此對於kafka一個broker就是一個應用程序的實例
kafka支持的客戶端語言:Kafka客戶端支持當前大部分主流語言,包括:C、C++、Erlang、Java、.net、perl、PHP、Python、Ruby、Go、Javascript
可使用以上任何一種語言和kafka服務器進行通訊(即辨析本身的consumer從kafka集羣訂閱消息也能夠本身寫producer程序) 
三、Kafka架構
生產者生產消息、kafka集羣、消費者獲取消息這樣一種架構,以下圖:
kafka集羣中的消息,是經過Topic(主題)來進行組織的,以下圖:
一些基本的概念:
一、主題(Topic):一個主題相似新聞中的體育、娛樂、教育等分類概念,在實際工程中一般一個業務一個主題。
二、分區(Partition):一個Topic中的消息數據按照多個分區組織,分區是kafka消息隊列組織的最小單位,一個分區能夠看做是一個FIFO( First Input First Output的縮寫,先入先出隊列)的隊列。
kafka分區是提升kafka性能的關鍵所在,當你發現你的集羣性能不高時,經常使用手段就是增長Topic的分區,分區裏面的消息是按照重新到老的順序進行組織,消費者從隊列頭訂閱消息,生產者從隊列尾添加消息。
備份(Replication):爲了保證分佈式可靠性,kafka0.8開始對每一個分區的數據進行備份(不一樣的Broker上),防止其中一個Broker宕機形成分區上的數據不可用。
kafka0.7是一個很大的改變:一、增長了備份二、增長了控制借點概念,增長了集羣領導者選舉 。

Zookeeper集羣搭建

Kafka集羣是把狀態保存在Zookeeper中的,首先要搭建Zookeeper集羣。
一、軟件環境
(3臺服務器-個人測試)
192.168.7.100 server1
192.168.7.101 server2
192.168.7.107 server3
一、Linux服務器一臺、三臺、五臺、( 2*n+1),Zookeeper集羣的工做是超過半數才能對外提供服務,3臺中超過兩臺超過半數,容許1臺掛掉 ,是否能夠用偶數,其實不必。
若是有四臺那麼掛掉一臺還剩下三臺服務器,若是在掛掉一個就不行了,這裏記住是超過半數。
二、Java jdk1.7 zookeeper是用java寫的因此他的須要JAVA環境,java是運行在java虛擬機上的
三、Zookeeper的穩定版本Zookeeper 3.4.6版本 
二、配置&安裝Zookeeper
下面的操做是:3臺服務器統一操做
一、安裝Java
yum list java*
yum -y install java-1.7.0-openjdk*

二、下載Zookeeperjava

首先要注意在生產環境中目錄結構要定義好,防止在項目過多的時候找不到所需的項目linux

#個人目錄統一放在/opt下面
#首先建立Zookeeper項目目錄
mkdir zookeeper #項目目錄
cd zookeeper 
mkidr -p {data,logs}#存放快照,事物日誌

  

下載Zookeeper數據庫

#下載軟件
cd /opt/zookeeper/

wget http://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz

#解壓軟件
tar -zxf zookeeper-3.4.6.tar.gz

 

三、修改配置文件apache

進入到解壓好的目錄裏面的conf目錄中,查看
複製代碼
#進入conf目錄
/opt/zookeeper/zookeeper-3.4.6/conf
#查看
[root@192.168.7.107]$ ll
-rw-rw-r--. 1 1000 1000  535 Feb 20  2014 configuration.xsl
-rw-rw-r--. 1 1000 1000 2161 Feb 20  2014 log4j.properties
-rw-rw-r--. 1 1000 1000  922 Feb 20  2014 zoo_sample.cfg
複製代碼

#zoo_sample.cfg  這個文件是官方給咱們的zookeeper的樣板文件,給他複製一份命名爲zoo.cfg,zoo.cfg是官方指定的文件命名規則json

3臺服務器的配置文件bootstrap

 

tickTime=2000
initLimit=5
syncLimit=2
dataDir=/opt/zookeeper/data
dataLogDir=/opt/zookeeper/logs
clientPort=2181
# The number of snapshots to retain in dataDir
autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
autopurge.purgeInterval=1
server.1=10.0.0.201:2888:3888
server.2=10.0.0.202:2888:3888
server.3=10.0.0.203:2888:3888
#server.1 這個1是服務器的標識也能夠是其餘的數字, 表示這個是第幾號服務器,用來標識服務器,這個標識要寫到快照目錄(data)下面myid文件裏
#第一個端口是master和slave之間的通訊端口,默認是2888,第二個端口是leader選舉的端口,集羣剛啓動的時候選舉或者leader掛掉以後進行新的選舉的端口默認是3888

 

 

 

配置文件解釋:bash

複製代碼
#tickTime:
這個時間是做爲 Zookeeper 服務器之間或客戶端與服務器之間維持心跳的時間間隔,也就是每一個 tickTime 時間就會發送一個心跳。
#initLimit:
這個配置項是用來配置 Zookeeper 接受客戶端(這裏所說的客戶端不是用戶鏈接 Zookeeper 服務器的客戶端,而是 Zookeeper 服務器集羣中鏈接到 Leader 的 Follower 服務器)初始化鏈接時最長能忍受多少個心跳時間間隔數。當已經超過 5個心跳的時間(也就是 tickTime)長度後 Zookeeper 服務器尚未收到客戶端的返回信息,那麼代表這個客戶端鏈接失敗。總的時間長度就是 5*2000=10#syncLimit:
這個配置項標識 Leader 與Follower 之間發送消息,請求和應答時間長度,最長不能超過多少個 tickTime 的時間長度,總的時間長度就是5*2000=10秒
#dataDir:
快照日誌的存儲路徑
#dataLogDir:
事物日誌的存儲路徑,若是不配置這個那麼事物日誌會默認存儲到dataDir制定的目錄,這樣會嚴重影響zk的性能,當zk吞吐量較大的時候,產生的事物日誌、快照日誌太多
#clientPort:
這個端口就是客戶端鏈接 Zookeeper 服務器的端口,Zookeeper 會監聽這個端口,接受客戶端的訪問請求。修改他的端口改大點
複製代碼

建立myid文件服務器

 

#server1
echo "1" > /opt/zookeeper/data/myid
#server2
echo "2" > /opt/zookeeper/data/myid
#server3
echo "3" > /opt/zookeeper/data/myid

 

 

 

 四、重要配置說明

一、myid文件和server.myid  在快照目錄下存放的標識本臺服務器的文件,他是整個zk集羣用來發現彼此的一個重要標識。

二、zoo.cfg 文件是zookeeper配置文件 在conf目錄裏。

三、log4j.properties文件是zk的日誌輸出文件 在conf目錄裏用java寫的程序基本上有個共同點日誌都用log4j,來進行管理。

# Define some default values that can be overridden by system properties
zookeeper.root.logger=INFO, CONSOLE  #日誌級別
zookeeper.console.threshold=INFO  #使用下面的console來打印日誌
zookeeper.log.dir=.    #日誌打印到那裏,是我們啓動zookeeper的目錄 (建議設置統一的日誌目錄路徑)
zookeeper.log.file=zookeeper.log
zookeeper.log.threshold=DEBUG
zookeeper.tracelog.dir=.
zookeeper.tracelog.file=zookeeper_trace.log

#
# ZooKeeper Logging Configuration
#

# Format is "<default threshold> (, <appender>)+

# DEFAULT: console appender only
log4j.rootLogger=${zookeeper.root.logger}

# Example with rolling log file
#log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE

# Example with rolling log file and tracing
#log4j.rootLogger=TRACE, CONSOLE, ROLLINGFILE, TRACEFILE

#
# Log INFO level and above messages to the console
#
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.Threshold=${zookeeper.console.threshold}
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n


# Add ROLLINGFILE to rootLogger to get log file output
#    Log DEBUG level and above messages to a log file
log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender
log4j.appender.ROLLINGFILE.Threshold=${zookeeper.log.threshold}
log4j.appender.ROLLINGFILE.File=${zookeeper.log.dir}/${zookeeper.log.file}

# Max log file size of 10MB
log4j.appender.ROLLINGFILE.MaxFileSize=10MB
# uncomment the next line to limit number of backup files
#log4j.appender.ROLLINGFILE.MaxBackupIndex=10

log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n


#
# Add TRACEFILE to rootLogger to get log file output
#    Log DEBUG level and above messages to a log file
log4j.appender.TRACEFILE=org.apache.log4j.FileAppender
log4j.appender.TRACEFILE.Threshold=TRACE
log4j.appender.TRACEFILE.File=${zookeeper.tracelog.dir}/${zookeeper.tracelog.file}

log4j.appender.TRACEFILE.layout=org.apache.log4j.PatternLayout
### Notice we are including log4j's NDC here (%x)
log4j.appender.TRACEFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L][%x] - %m%n
configuration for log4j

四、zkEnv.sh和zkServer.sh文件

zkServer.sh 主的管理程序文件
zkEnv.sh 是主要配置,zookeeper集羣啓動時配置環境變量的文件
五、還有一個須要注意
ZooKeeper server  will not remove old snapshots and log files when using the default configuration (see autopurge below), this is the responsibility of the operator
zookeeper不會主動的清除舊的快照和日誌文件,這個是操做者的責任。

可是能夠經過命令去按期的清理。

複製代碼
#!/bin/bash 
 
#snapshot file dir 
dataDir=/opt/zookeeper/zkdata/version-2
#tran log dir 
dataLogDir=/opt/zookeeper/zkdatalog/version-2

#Leave 66 files 
count=66 
count=$[$count+1] 
ls -t $dataLogDir/log.* | tail -n +$count | xargs rm -f 
ls -t $dataDir/snapshot.* | tail -n +$count | xargs rm -f 

#以上這個腳本定義了刪除對應兩個目錄中的文件,保留最新的66個文件,能夠將他寫到crontab中,設置爲天天凌晨2點執行一次就能夠了。


#zk log dir   del the zookeeper log
#logDir=
#ls -t $logDir/zookeeper.log.* | tail -n +$count | xargs rm -f
複製代碼

其餘方法:

第二種:使用ZK的工具類PurgeTxnLog,它的實現了一種簡單的歷史文件清理策略,能夠在這裏看一下他的使用方法 http://zookeeper.apache.org/doc/r3.4.6/zookeeperAdmin.html 

第三種:對於上面這個執行,ZK本身已經寫好了腳本,在bin/zkCleanup.sh中,因此直接使用這個腳本也是能夠執行清理工做的。

第四種:從3.4.0開始,zookeeper提供了自動清理snapshot和事務日誌的功能,經過配置 autopurge.snapRetainCount 和 autopurge.purgeInterval 這兩個參數可以實現定時清理了。這兩個參數都是在zoo.cfg中配置的:

autopurge.purgeInterval  這個參數指定了清理頻率,單位是小時,須要填寫一個1或更大的整數,默認是0,表示不開啓本身清理功能。
autopurge.snapRetainCount 這個參數和上面的參數搭配使用,這個參數指定了須要保留的文件數目。默認是保留3個。
 
推薦使用第一種方法,對於運維人員來講,將日誌清理工做獨立出來,便於統一管理也更可控。畢竟zk自帶的一些工具並不怎麼給力。
五、啓動服務並查看
一、啓動服務
#進入到Zookeeper的bin目錄下#啓動服務(3臺都須要操做)
/opt/zookeeper/bin/zkServer.sh  start

 

二、檢查服務狀態

# 檢查zookeeper狀態
/opt/zookeeper/bin/zkServer.sh  status

經過status就能看到狀態:

/opt/zookeeper/bin/zkServer.sh status
JMX enabled by default
Using config: /opt/zookeeper/zookeeper-3.4.6/bin/../conf/zoo.cfg  #配置文件
Mode: follower  #這臺服務器角色

zk集羣通常只有一個leader,多個follower,主通常是相應客戶端的讀寫請求,而從主同步數據,當主掛掉以後就會從follower裏投票選舉一個leader出來。

能夠用「jps」查看zk的進程,這個是zk的整個工程的main

#執行命令jps
20348 Jps
4233 QuorumPeerMain 

Kafka集羣搭建

一、軟件環境
一、linux一臺或多臺,大於等於2
二、已經搭建好的zookeeper集羣
三、軟件版本 kafka_2.10-0.9.0.0.tgz
二、建立目錄並下載安裝軟件
#下載軟件
wget  -P /usr/local/src http://archive.apache.org/dist/kafka/0.9.0.0/kafka_2.10-0.9.0.0.tgz

#解壓軟件
tar -zxf kafka_2.11-0.9.0.1.tgz / -C /opt

#建立目錄
mv /opt/kafka_2.11-0.9.0 /opt/kafka#建立項目目錄
mkdir  /opt/kafka/logs #建立kafka消息目錄,主要存放kafka消息

 

三、修改配置文件

進入到config目錄
cd /opt/kafka/config/

主要關注:server.properties 這個文件便可,咱們能夠發如今目錄下:

有不少文件,這裏能夠發現有Zookeeper文件,咱們能夠根據Kafka內帶的zk集羣來啓動,可是建議使用獨立的zk集羣

複製代碼
-rw-r--r--. 1 root root  906 Feb 12 08:37 connect-console-sink.properties
-rw-r--r--. 1 root root  909 Feb 12 08:37 connect-console-source.properties
-rw-r--r--. 1 root root 2110 Feb 12 08:37 connect-distributed.properties
-rw-r--r--. 1 root root  922 Feb 12 08:38 connect-file-sink.properties
-rw-r--r--. 1 root root  920 Feb 12 08:38 connect-file-source.properties
-rw-r--r--. 1 root root 1074 Feb 12 08:37 connect-log4j.properties
-rw-r--r--. 1 root root 2055 Feb 12 08:37 connect-standalone.properties
-rw-r--r--. 1 root root 1199 Feb 12 08:37 consumer.properties
-rw-r--r--. 1 root root 4369 Feb 12 08:37 log4j.properties
-rw-r--r--. 1 root root 2228 Feb 12 08:38 producer.properties
-rw-r--r--. 1 root root 5699 Feb 15 18:10 server.properties
-rw-r--r--. 1 root root 3325 Feb 12 08:37 test-log4j.properties
-rw-r--r--. 1 root root 1032 Feb 12 08:37 tools-log4j.properties
-rw-r--r--. 1 root root 1023 Feb 12 08:37 zookeeper.properties
複製代碼

修改配置文件:

broker.id=0  #當前機器在集羣中的惟一標識,和zookeeper的myid性質同樣
listeners=PLAINTEXT://10.0.0.201:9092

port=9092 #當前kafka對外提供服務的端口默認是9092
host.name=10.0.0.201 #這個參數默認是關閉的,在0.8.1有個bug,DNS解析問題,失敗率的問題。
num.network.threads=3 #這個是borker進行網絡處理的線程數
num.io.threads=8 #這個是borker進行I/O處理的線程數
log.dirs=/opt/kafka/logs/ #消息存放的目錄,這個目錄能夠配置爲「,」逗號分割的表達式,上面的num.io.threads要大於這個目錄的個數這個目錄,若是配置多個目錄,新建立的topic他把消息持久化的地方是,當前以逗號分割的目錄中,那個分區數最少就放那一個
socket.send.buffer.bytes=102400 #發送緩衝區buffer大小,數據不是一會兒就發送的,先回存儲到緩衝區了到達必定的大小後在發送,能提升性能
socket.receive.buffer.bytes=102400 #kafka接收緩衝區大小,當數據到達必定大小後在序列化到磁盤
socket.request.max.bytes=104857600 #這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小
num.partitions=1 #默認的分區數,一個topic默認1個分區數
log.retention.hours=168 #默認消息的最大持久化時間,168小時,7天
message.max.byte=5242880  #消息保存的最大值5M
default.replication.factor=2  #kafka保存消息的副本數,若是一個副本失效了,另外一個還能夠繼續提供服務
replica.fetch.max.bytes=5242880  #取消息的最大直接數
log.segment.bytes=1073741824 #這個參數是:由於kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 ),到目錄查看是否有過時的消息若是有,刪除
log.cleaner.enable=false #是否啓用log壓縮,通常不用啓用,啓用的話能夠提升性能
zookeeper.connect=10.0.0.201:2181,10.0.0.202:2181,10.0.0.203:2181 #設置zookeeper的鏈接端口

 

上面是參數的解釋,實際的修改項爲:

broker.id=201  每臺服務器的broker.id都不能相同

listeners=PLAINTEXT://10.0.0.201:9092

#hostname
host.name=10.0.0.201

# Log Basics 
log.dirs=/opt/kafka/logs

#在log.retention.hours=168 下面新增下面三項
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880

#設置zookeeper的鏈接端口
zookeeper.connect=10.0.0.201:2181,10.0.0.202:2181,10.0.0.203:2181

 

五、異常情況

啓動kafka過一會進程自動掛掉問題緣由
這是由於kafka logs目錄下的meta.properties文件中的broker.id與server.properties中的broker.id不一致所致使,只需把二者改成一致啓動kafka後就不會自動掛掉了

kafka linux系統上使用時異常:

錯誤信息:
WARN Error while fetching metadata with correlation id xxx
異常截圖

解決辦法
修改config下的 server.properties 文件
將 listteners=PLAINTEXT://:9092
修改爲listteners=PLAINTEXT://ip:9092

 

一、啓動服務

#從後臺啓動Kafka集羣(3臺都須要啓動)
/opt/kafka/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

二、檢查服務是否啓動

#執行命令jps
20348 Jps
4233 QuorumPeerMain
18991 Kafka

 

四、啓動Kafka集羣並測試

一、建立Topic來驗證是否建立成功

更多請看官方文檔:http://kafka.apache.org/documentation.html

#建立Topic
/opt/kafka/bin/kafka-topics.sh --create --zookeeper 10.0.0.201:2181 --replication-factor 2 --partitions 1 --topic stock  
#解釋
--replication-factor 2   #複製兩份
--partitions 1 #建立1個分區
--topic #主題爲stock  

'''在一臺服務器上建立一個發佈者'''
#建立一個broker,發佈者
/opt/kafka/bin/kafka-console-producer.sh --broker-list 10.0.0.201:9092,10.0.0.202:9092,10.0.0.203:9092 --topic stock

'''在一臺服務器上建立一個訂閱者'''
/opt/kafka/bin/kafka-console-consumer.sh --zookeeper 10.0.0.201:2181,10.0.0.202:2181,10.0.0.203:2181 --topic stock  --from-beginning

測試(在發佈者那裏發佈消息看看訂閱者那裏是否能正常收到~):

四、其餘命令

大部分命令能夠去官方文檔查看

4.一、查看topic

/opt/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
#就會顯示咱們建立的全部topic

4.二、查看topic狀態

/kafka-topics.sh --describe --zookeeper localhost:2181 --topic stock
#下面是顯示信息
Topic:ssports    PartitionCount:1    ReplicationFactor:2    Configs:
    Topic: stock Partition: 0    Leader: 1    Replicas: 0,1    Isr: 1
#分區爲爲1  複製因子爲2     stock的分區爲0 
#Replicas: 0,1   複製的爲0,1

 

kafka集羣搭建完畢

五、其餘說明標註

5.一、日誌說明

默認kafka的日誌是保存在/opt/kafka/kafka_2.10-0.9.0.0/logs目錄下的,這裏說幾個須要注意的日誌

server.log #kafka的運行日誌
state-change.log  #kafka他是用zookeeper來保存狀態,因此他可能會進行切換,切換的日誌就保存在這裏

controller.log #kafka選擇一個節點做爲「controller」,當發現有節點down掉的時候它負責在游泳分區的全部節
點中選擇新的leader,這使得Kafka能夠批量的高效的管理全部分區節點的主從關係。若是controller down掉了,活着的節點中的一個會備切換爲新的controller.

5.二、上面的你們你完成以後能夠登陸zk來查看zk的目錄狀況

複製代碼
#使用客戶端進入zk
./zkCli.sh -server 127.0.0.1:2181  #默認是不用加’-server‘參數的由於咱們修改了他的端口

#查看目錄狀況 執行「ls /」
[zk: 127.0.0.1:12181(CONNECTED) 0] ls /

#顯示結果:[consumers, config, controller, isr_change_notification, admin, brokers, zookeeper, controller_epoch]
'''
上面的顯示結果中:只有zookeeper是,zookeeper原生的,其餘都是Kafka建立的
'''

#標註一個重要的
[zk: 127.0.0.1:12181(CONNECTED) 1] get /brokers/ids/0
{"jmx_port":-1,"timestamp":"1456125963355","endpoints":["PLAINTEXT://10.0.0.201:9092"],"host":"192.168.7.100","version":2,"port":9092}
cZxid = 0x1000001c1
ctime = Mon Feb 22 15:26:03 CST 2016
mZxid = 0x1000001c1
mtime = Mon Feb 22 15:26:03 CST 2016
pZxid = 0x1000001c1
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x152e40aead20016
dataLength = 139
numChildren = 0
[zk: 127.0.0.1:2181(CONNECTED) 2] 

#還有一個是查看partion
[zk: 127.0.0.1:2181(CONNECTED) 7] get /brokers/topics/shuaige/partitions/0
null
cZxid = 0x100000029
ctime = Mon Feb 22 10:05:11 CST 2016
mZxid = 0x100000029
mtime = Mon Feb 22 10:05:11 CST 2016
pZxid = 0x10000002a
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
[zk: 127.0.0.1:2181(CONNECTED) 8] 
複製代碼

 

 Python 集羣測試代碼

生產者

# encoding:utf-8
# Author:Richie
# Date:2019/5/13
import json
from kafka import KafkaProducer

producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'),bootstrap_servers=['10.0.0.201:9092','10.0.0.202:9092','10.0.0.203:9092'])

msg_dict = {
    "sleep_time": 10,
    "db_config": {
        "database": "test_1",
        "host": "localhost",
        "user": "root",
        "password": "root"
    },
    "table": "msg",
    "msg": "Hello World"
}

producer.send('stock', msg_dict, partition=0)
producer.close()

消費者

# encoding:utf-8
# Author:Richie
# Date:2019/5/13

from kafka import KafkaConsumer

consumer = KafkaConsumer('stock', bootstrap_servers=['10.0.0.201:9092','10.0.0.202:9092','10.0.0.203:9092'])
for msg in consumer:
    recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    print(recv)
相關文章
相關標籤/搜索