http://www.jasongj.com/2015/03/10/KafkaColumn1/java
kafka的簡單介紹:算法
1. kafka是一個流平臺,所謂流平臺:數據庫
2.kafka相關必須明白一下幾個概念: apache
1.producer:
生產者,發佈消息到 kafka 集羣中的服務或程序。bootstrap
Producer負責決定將數據發送到Topic的那個分區上。這能夠經過簡單的循環方式來平衡負載,或則能夠根據某些語義來決定分區(例如基於數據中一些關鍵字)。
2.broker:
kafka 集羣中包含的服務器,一個broker表明一臺服務器。
3.topic:
每條發佈到 kafka 集羣的消息屬於的類別,即 kafka 是面向 topic 的。服務器
對於每一個主題,Kafka會會維護一個以下所示的分區日誌:app
每一個分區是一個有序的,以不可變的記錄順序追加的Commit Log。分區中的每一個記錄都有一個連續的ID,稱爲Offset,惟一標識分區內的記錄。負載均衡
Kafka集羣使用記錄保存時間的配置來保存全部已發佈的記錄(不管他們是否被消費)。例如,配置策略爲兩天,那麼在一條記錄發佈兩天內,這條記錄是能夠被消費的,以後將被丟棄以騰出空間。Kafka的性能和數據量無關,因此存儲長時間的數據並不會成爲問題。異步
4.partition:
partition 是物理上的概念,每一個 topic 包含一個或多個 partition。kafka 分配的單位是 partition。
5.consumer:
從 kafka 集羣中消費消息的程序或服務。分佈式
實際上惟一須要保存的元數據是消費者的消費進度,即消費日誌的偏移量(Offset)。這個Offset是由Consumer控制的:一般消費者會在讀取記錄時以線性方式提高Offset,可是事實上,因爲Offset由Consumer控制,所以它能夠以任何順序消費記錄。例如一個Consumer能夠經過重置Offset來處理過去的數據或者跳過部分數據。
這個特徵意味着Kafka的Consumer能夠消費「過去」和「未來」的數據而不對集羣和其餘Consumer不形成太大的影響。例如,可使用命令行工具tail來獲取Topic尾部的內容而不對已經在消費Consumer形成影響。
6.Consumer group:
總括:high-level consumer API 中,每一個 consumer 都屬於一個 consumer group,每條消息只能被 consumer group 中的一個 Consumer 消費,但能夠被多個 consumer group 消費。
詳解:Consumer使用一個group name來標識本身的身份,每條被髮送到一個Topic的消息都將被分發到屬於同一個group的Consumer的一個實例中(group name相同的Consumer屬於一個組,一個Topic的一條消息會被這個組中的一個Consumer實例消費)。Consumer實例能夠在單獨的進程中或者單獨的機器上。
若是全部的Consumer實例都是屬於一個group的,那麼全部的消息將被均衡的分發給每一個實例。
若是全部的Consumer都屬於不一樣的group,那麼每條消息將被廣播給全部的Consumer。
上圖介紹:一個包含兩個Server的Kafka集羣,擁有四個分區(P0-P3),有兩個Consumer group:Group A和Group B。Group有C一、C2兩個Consumer,GroupB有C三、C四、C五、C6四個Consumer。
更常見的是,Topic有少許的Consumer group,每個都是「一個邏輯上的訂閱者」。每一個group包含多個Consumer實例,爲了可伸縮性和容錯性。這就是一個發佈-訂閱模式,只是訂閱方是一個集羣。
Kafka中消費的實現方式是「公平」的將分區分配給Consumer,每個時刻分區都擁有它惟一的消費者。Consumer成員關係有Kafka程度動態維護。若是新的Consumer加入了分區,那麼它會從這個分區其餘的Consumer中分配走一部分分區;若是部分Consumer實例宕機,它的分區會被其餘Consumer實例接管。
Kafka只保證同一個分區內記錄的順序,而不是同一個Topic的不一樣分區間數據的順序。每一個分區順序結合按Key分配分區的能力,能知足大多數程序的需求。若是須要全局的順序,可使用只有一個分區的Topic,這意味着每一個group只能有一個Consumer實例(由於一個分區同一時刻只能被一份Consumer消費——多加的Consumer只能用於容錯)
7.replica:
partition 的副本,保障 partition 的高可用。
8.leader:
replica 中的一個角色, producer 和 consumer 只跟 leader 交互。
9.follower:
replica 中的一個角色,從 leader 中複製數據。
10.controller:
kafka 集羣中的其中一個服務器,用來進行 leader election 以及 各類 failover。
12.zookeeper:
kafka 經過 zookeeper 來存儲集羣的 meta 信息。
13.Kafka as a Messaging System(消息系統)
消息傳統上有兩種模式:隊列和發佈-訂閱。
在隊列中,一羣Consumer從一個Server讀取數據,每條消息被其中一個Consumer讀取。
在發佈-訂閱中,消息被廣播給全部的Consumer。這兩種模式有各自的優缺點。
模式區別:隊列模式的優勢是你能夠在多個消費者實例上分配數據處理,從而容許你對程序進行「伸縮」。肯定是隊列不是多用戶的,一旦消息被一個Consumer讀取就不會再給其餘Consumer。發佈訂閱模式容許廣播數據到多個Consumer,那麼就沒辦法對單個Consumer進行伸縮。
Kafka的Consumer group包含兩個概念。與隊列同樣,消費組容許經過一些進程來劃分處理(每一個進程處理一部分)。與發佈訂閱同樣,Kafka容許廣播消息到不一樣的Consumer group。
Kafka模式的優點是每一個Topic都擁有隊列和發佈-訂閱兩種模式。
Kafka比傳統的消息系統有更強的順序保證。
傳統的消息系統在服務器上按順序保存消息,若是多個Consumer從隊列中消費消息,服務器按照存儲的順序輸出消息。而後服務器雖然按照順序輸出消息,可是消息將被異步的傳遞給Consumer,因此他們將以不肯定的順序到達Consumer。這意味着在並行消費中將丟失消息順序。傳統消息系統一般採用「惟一消費者」的概念只讓一個Consumer進行消費,但這就丟失了並行處理的能力。
Kafka作的更好一些。經過提供分區的概念,Kafka能提供消費集羣順序和負載的平衡。這是經過將分區分配個一個Consumer group中惟一的一個Consumer而實現的,一個分區只會被一個分組中的一個Consumer進行消費。經過這麼實現,能讓一個Consumer消費一個分區並按照順序處理消息。由於存在多個分區,全部能夠在多個Consumer實例上實現負載均衡。注意,一個分組內的Consumer實例數不能超過度區數。
14. Kafka as a Storage System(存儲系統)
任何將發送消息和消費結構的消息隊列都有效的用做一個消息的存儲系統。不一樣的是Kafka是一個更好的存儲系統。
被寫入到Kafka的數據將被寫入磁盤並複製以保證容錯。Kafka容許Producer等待肯定,以保證Producer能夠確認消息被成功持久化並複製完成。
Kafka使用的存儲結構,使其提供相同的能力,不管是存儲50KB或者50TB持久化數據。
由於容許客戶端控制讀取的位置,能夠將Kafka視爲高性能,低延遲的日誌存儲、複製、傳播的分佈式系統。
15:Kafka for Stream Processing
僅僅是讀寫和存儲流數據是不夠的,Kafka的目標是對流失數據的實時處理。
在Kafka中,Stream Producer從輸入的Topic中讀取數據,執行一些操做,生成輸出流到輸出的Topic中。
例如,零售的應用程序將收到銷售和出貨的輸入流,並輸出根據該數據計算的重排序和價格調整後的數據流。
可使用Producer和Consumer實現簡單的處理。對於更復雜的轉換,Kafka提供的完成的Stream API,容許構建將流中數據聚合或將流鏈接到一塊兒的應用。
這用於解決如下的一些困難:處理無需的數據,執行有狀態的計算等。
Stream API基於Kafka的核心函數古劍:使用Producer和Consumer API用於輸入,使用Kafka做爲有狀態的存儲,使用group機制來實現Stream處理器的容錯。
16:Putting the Pieces Together
消息、存儲和流處理這種組合看是不尋常,可是Kafka做爲流式平臺這是必須的。
相似HDFS的分佈式文件系統存儲靜態的文件用於批處理。這種的系統容許存儲和處理歷史數據。
傳統的企業消息系統容許處理在你訂閱以後的將來的數據。以這種方式構建的應用程序在將來數據到達時進行處理。
Kafka組合這些能力,而且組合這些對Kafka做爲流應用平臺和流數據通道相當重要。
經過組合存儲和低延遲的訂閱,流應用程序能以相同的方式處理過去和將來的數據。一個單一的程序能夠處理過去的歷史數據,而且不會在達到一個位置時中止,而是能繼續處理未來到達的數據。這是一個普遍的流處理的概念,其中包含批處理和消息驅動的應用程序。
一樣,對於數據流通道,組合訂閱機制和實時事件使Kafka成爲很是低延遲的管道;數據的存儲能力使其能和可能會進行停機維護的週期性處理數據的離線系統集成,或用於必須保證數據被確認交付的場景。流處理程序能夠在數據到達後進行處理。
3.kafka的應用場景
KafKa能夠代替傳統的消息隊列軟件(阿里的隊列軟件RocketMQ就是基於KafKa實現的),在隊列軟件的選擇上KafKa已經成了不二之選,使用KafKa來實現隊列有以下優勢
就以上幾點和筆者以前使用的Redis來承載隊列服務要優秀的多,在後續文章的比較中會一一說明
在不少時候咱們須要對一些龐大的數據進行存留,一些業務型公司可能永不上應爲基本能夠依靠數據庫解決日誌的問題,可是服務型公司好比jpush,雲監控此類服務,日誌存儲這塊會遇到巨大的問題,日誌不能丟,日誌存文件很差找,定位一條消息成本高(遍歷當天日誌文件),實時顯示給用戶難,這幾類問題KafKa都能遊刃有餘
kafka特有的offset機制可以保證消息至少被獲取一次,當程序在獲取途中死亡這條消息會被認定爲未被消費,下次會繼續消費這條消息,此特性使得kafka能夠做爲一個保障數據傳輸的通道來使用,可是kafka並無提供JMS中的"事務性""消息傳輸擔保(消息確認機制)""消息分組"等企業級特性;因此kafka只能使用做爲"常規"的消息系統
聲明:採用三臺服務器安裝kafka集羣
第一步:安裝前準備,就不詳細的說了
首先保準三臺機器的主機名能夠ping通(配置主機名)
安裝java環境
sudo add-apt-repository ppa:openjdk-r/ppa sudo apt-get update sudo apt-get install openjdk-8-jdk
每臺機器要安裝上supervisor(防止掛掉)
apt-get install supervisor -y
第二步:安裝
下載kafka安裝包,並將安裝包(kafka_2.11-0.10.2.0.tgz)拷貝到三臺機器,解壓縮,移動解壓縮後的文件夾到/usr/local/kafka(能夠隨意指定)目錄
校驗和 051e5e16050c85ebdc40f3bbbc188317 kafka_2.11-0.10.2.0.tgz
tar xvf kafka_2.11-0.10.2.0.tgz mv kafka_2.11-0.10.2.0 /usr/local/kafka
第三步:修改zk配置(配置詳細解釋見下文)
2. 修改/usr/local/kafka/config/zookeeper.properties。其中server.1,server.2,server.3根據部署的集羣數量和 ip地址能夠作調整的。(一下是須要修改的參數)
tickTime=2000 #這個時間是Zookeeper服務集羣之間的相互檢查或客戶端鏈接服務器之間的檢查,也就是每一個 tickTime 時間就會發送一個心跳。 initLimit=10 #這個配置項是用來配置 Zookeeper 接受客戶端(這裏所說的客戶端不是用戶鏈接 Zookeeper 服務器的客戶端,而是 Zookeeper 服務器集羣中鏈接到 Leader 的 Follower 服務器)初始化鏈接時最長能忍受多少個心跳時間間隔數。當已經超過 5個心跳的時間(也就是 tickTime)長度後 Zookeeper 服務器尚未收到客戶端的返回信息,那麼代表這個客戶端鏈接失敗。總的時間長度就是 5*2000=10 秒 syncLimit=5 #這個配置項標識 Leader 與Follower 之間發送消息,請求和應答時間長度,最長不能超過多少個 tickTime 的時間長度,總的時間長度就是2*2000=4 秒 # the directory where the snapshot is stored. dataDir=/data/zookeeper # zk的數據目錄 dataLogDir=/data/log/zookeeper #zk的日誌目錄 # the port at which the clients will connect clientPort=2181 #客戶端鏈接的端口,也就是zk監聽的端口 # disable the per-ip limit on the number of connections since this is a non-production config maxClientCnxns=0 #0或者不設置,則每一個ip鏈接zookeeper時的鏈接數沒有限制。若是設置maxClientCnxns的值時須要把kafka server的鏈接數考慮進去,由於啓動kafka server時,kafka server也會鏈接zookeeper的 server.1=ip1:2888:3888 server.2=ip2:2888:3888 server.3=ip3:2888:3888
#註解:
數字:第幾號服務器,
ip:該服務器的ip地址
2888:表示的是這個服務器與集羣中的 Leader 服務器交換信息的端口
3888:表示的是萬一集羣中的 Leader 服務器掛了,須要一個端口來從新進行選舉,選出一個新的 Leader,而這個端口就是用來執行選舉時服務器相互通訊的端口
配置id號:
1,2,3必需要對應zookeeper裏的sever1的ip echo '1' > /var/zookeeper/myid (ip1服務器上) echo '2' > /var/zookeeper/myid (ip2服務器上) echo '3' > /var/zookeeper/myid (ip3服務器上)
第四步: 修改/usr/local/kafka/config/server.properties。其中broker.id配置ip1上面爲0,ip2上面爲1,ip3上面爲2,listeners的ip替換成機器的ip。zookeeper.connect配置爲三個節點的內網ip,端口號都保持默認便可(配置詳細解釋見下文)
broker.id=0 #broker的標識,而且集羣中不得重複 delete.topic.enable=true #打開這個參數是能夠有選擇的刪除topic的,若是不開啓,刪除的動做是不會執行(根據本身的需求是否須要打開) listeners=PLAINTEXT://ip1:9093 # listeners必定要配置成爲IP地址;若是配置爲localhost或服務器的hostname,在使用java發送數據時就會拋出異 常:org.apache.kafka.common.errors.TimeoutException: Batch Expired 。由於在沒有配置advertised.host.name 的狀況下,Kafka並無像官方文檔宣稱的那樣改成廣播咱們配置的host.name,而是廣播了主機配置的hostname。遠端的客戶端並無配置 hosts,因此天然是鏈接不上這個hostname的 log.dirs=/data/kafka/kafka-logs #kafka的數據目錄文件 zookeeper.connect=ip1:2181,ip2:2181,ip3:2181 #鏈接zk的地址,這裏能夠寫多個,也能夠寫一個
第五步:配置supervisor,添加兩個配置
[program:zookeeper] command=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties user=root autostart=true autorestart=true startsecs=3 [program:kafka-server] command=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties user=root autostart=true autorestart=true startsecs=3
把三臺機器都配置完成後,先啓動三臺機器的zk,而後啓動kafka
第六步:建立topic
建立topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic br0
查看建立是否成功:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic br0 輸出以下,注意Replicas有0,1,2,Irs有0,1,2,對應三個節點的broker.id Topic:br0 PartitionCount:1 ReplicationFactor:3 Configs: Topic:br0 Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
第一行是對全部分區的一個描述,而後每一個分區都會對應一行,由於咱們只有一個分區就只加了一行
*Leader:負責處理消息的讀和寫,leader是從全部節點中隨機選擇的
*Replicas:列出了所喲副本的節點,無論節點是否在服務中
*Isr:正在服務中的節點
檢查通訊:
在ip2上面啓動「消費者」,localhost換成ip2,回車以後會掛住,有消息來的時候會打印出來
bin/kafka-console-consumer.sh --bootstrap-server ip2:9092 --from-beginning --topic br0
在ip3上面啓動「生產者」,localhost換成ip3,回車以後輸入一些文字,敲回車
bin/kafka-console-producer.sh --broker-list ip3 --topic br0
test1
觀察ip2機器的「消費者」終端將會有消息打印
到此部署結束
http://blog.csdn.net/u013035314/article/details/46741377