kafka最初是 LinkedIn 的一個內部基礎設施系統。最初開發的原由是,LinkedIn雖然有了數據庫和其餘系統能夠用來存儲數據,可是缺少一個能夠幫助處理持續數據流的組件。因此在設計理念上,開發者不想只是開發一個可以存儲數據的系統,如關係數據庫、Nosql 數據庫、搜索引擎等等,更但願把數據當作一個持續變化和不斷增加的流,並基於這樣的想法構建出一個數據系統,一個數據架構。前端
Kafka外在表現很像消息系統,容許發佈和訂閱消息流,可是它和傳統的消息系統有很大的差別,java
首先,Kafka 是個現代分佈式系統,以集羣的方式運行,能夠自由伸縮。sql
其次,Kafka 能夠按照要求存儲數據,保存多久均可以,數據庫
第三,流式處理將數據處理的層次提示到了新高度,消息系統只會傳遞數據,Kafka 的流式處理能力可讓咱們用不多的代碼就能動態地處理派生流和數據集。因此 Kafka 不只僅是個消息中間件。apache
Kafka不只僅是一個消息中間件,同時它是一個流平臺,這個平臺上能夠發佈和訂閱數據流(Kafka 的流,有一個單獨的包 Stream 的處理),並把他們保存起來,進行處理,這個是 Kafka做者的設計理念。bootstrap
大數據領域,Kafka 還能夠當作實時版的 Hadoop,可是仍是有些區別,Hadoop 能夠存儲和按期處理大量的數據文件,每每以 TB 計數,而 Kafka能夠存儲和持續處理大型的數據流。Hadoop 主要用在數據分析上,而 Kafka由於低延遲,更適合於核心的業務應用上。因此國內的大公司通常會結合使用,好比京東在實時數據計算架構中就使用了到了 Kafka,具體見《張開濤-海量數據下的應用系統架構實踐》windows
常見的大數據處理框架:storm、spark、Flink、(Blink 阿里)數組
Kafka名字的由來:卡夫卡與法國做家馬塞爾·普魯斯特,愛爾蘭做家詹姆斯·喬伊斯並稱爲西方現代主義文學的先驅和大師。《變形記》是卡夫卡的短篇表明做,是卡夫卡的藝術成就中的一座高峯,被認爲是 20 世紀最偉大的小說做品之一(達到管理層的高度應該多看下人文相關的書籍,增加管理知識和人格魅力)。緩存
本文以 kafka_2.11-2.3.0 版本爲主,其他版本不予考慮,而且 Kafka 是 scala 語言寫的,小衆語言,沒有必要研究其源碼,投入和產出比低,除非你的技術級別很是高或者須要去開發單獨的消息中間件。服務器
消息,Kafka 裏的數據單元,也就是咱們通常消息中間件裏的消息的概念(能夠比做數據庫中一條記錄)。消息由字節數組組成。消息還能夠包含鍵(可選元數據,也是字節數組),主要用於對消息選取分區。
做爲一個高效的消息系統,爲了提升效率,消息能夠被分批寫入 Kafka。批次就是一組消息,這些消息屬於同一個主題和分區。若是隻傳遞單個消息,會致使大量的網絡開銷,把消息分紅批次傳輸能夠減小這開銷。可是,這個須要權衡(時間延遲和吞吐量之間),批次裏包含的消息越多,單位時間內處理的消息就越多,單個消息的傳輸時間就越長(吞吐量高延時也高)。若是進行壓縮,能夠提高數據的傳輸和存儲能力,但須要更多的計算處理。
對於 Kafka來講,消息是晦澀難懂的字節數組,通常咱們使用序列化和反序列化技術,格式經常使用的有 JSON 和 XML,還有 Avro(Hadoop 開發的一款序列化框架),具體怎麼使用依據自身的業務來定。
Kafka裏的消息用主題進行分類(主題比如數據庫中的表),主題下有能夠被分爲若干個分區(分表技術)。分區本質上是個提交日誌文件,有新消息,這個消息就會以追加的方式寫入分區(寫文件的形式),而後用先入先出的順序讀取。
可是由於主題會有多個分區,因此在整個主題的範圍內,是沒法保證消息的順序的,單個分區則能夠保證。
Kafka經過分區來實現數據冗餘和伸縮性,由於分區能夠分佈在不一樣的服務器上,那就是說一個主題能夠跨越多個服務器(這是 Kafka 高性能的一個緣由,多臺服務器的磁盤讀寫性能比單臺更高)。
前面咱們說 Kafka 能夠當作一個流平臺,不少時候,咱們會把一個主題的數據當作一個流,無論有多少個分區。
就是通常消息中間件裏生產者和消費者的概念。一些其餘的高級客戶端 API,像數據管道 API 和流式處理的 Kafka Stream,都是使用了最基本的生產者和消費者做爲內部組件,而後提供了高級功能。
生產者默認狀況下把消息均衡分佈到主題的全部分區上,若是須要指定分區,則須要使用消息裏的消息鍵和分區器。
消費者訂閱主題,一個或者多個,而且按照消息的生成順序讀取。消費者經過檢查所謂的偏移量來區分消息是否讀取過。偏移量是一種元數據,一個不斷遞增的整數值,建立消息的時候,Kafka 會把他加入消息。在一個主題中一個分區裏,每一個消息的偏移量是惟一的。每一個分區最後讀取的消息偏移量會保存到 Zookeeper 或者 Kafka 上,這樣分區的消費者關閉或者重啓,讀取狀態都不會丟失。
多個消費者能夠構成一個消費者羣組。怎麼構成?共同讀取一個主題的消費者們,就造成了一個羣組。羣組能夠保證每一個分區只被一個消費者使用。
消費者和分區之間的這種映射關係叫作消費者對分區的全部權關係,很明顯,一個分區只有一個消費者,而一個消費者能夠有多個分區。
(吃飯的故事:一桌一個分區,多桌多個分區,生產者不斷生產消息(消費),消費者就是買單的人,消費者羣組就是一羣買單的人),一個分區只能被消費者羣組中的一個消費者消費(不能重複消費),若是有一個消費者掛掉了<James 跑路了>,另外的消費者接上)
一個獨立的 Kafka 服務器叫 Broker。broker 的主要工做是,接收生產者的消息,設置偏移量,提交消息到磁盤保存;爲消費者提供服務,響應請求,返回消息。在合適的硬件上,單個 broker 能夠處理上千個分區和每秒百萬級的消息量。(要達到這個目的須要作操做系統調優和 JVM 調優)
多個 broker 能夠組成一個集羣。每一個集羣中 broker 會選舉出一個集羣控制器。控制器會進行管理,包括將分區分配給 broker 和監控 broker。
集羣裏,一個分區從屬於一個 broker,這個 broker 被稱爲首領。可是分區能夠被分配給多個 broker,這個時候會發生分區複製。
集羣中 Kafka 內部通常使用管道技術進行高效的複製。
分區複製帶來的好處是,提供了消息冗餘。一旦首領 broker 失效,其餘 broker 能夠接管領導權。固然相關的消費者和生產者都要從新鏈接到新的首領上。
在必定期限內保留消息是 Kafka 的一個重要特性,Kafka broker 默認的保留策略是:要麼保留一段時間(7 天),要麼保留必定大小(好比 1 個 G)。
到了限制,舊消息過時並刪除。可是每一個主題能夠根據業務需求配置本身的保留策略(開發時要注意,Kafka 不像 Mysql 之類的永久存儲)。
多生產者和多消費者
基於磁盤的數據存儲,換句話說,Kafka 的數據天生就是持久化的。
高伸縮性,Kafka 一開始就被設計成一個具備靈活伸縮性的系統,對在線集羣的伸縮絲絕不影響總體系統的可用性。
高性能,結合橫向擴展生產者、消費者和 broker,Kafka 能夠輕鬆處理巨大的信息流(LinkedIn 公司天天處理萬億級數據),同時保證亞秒級的消息延遲。
跟蹤網站用戶和前端應用發生的交互,好比頁面訪問次數和點擊,將這些信息做爲消息發佈到一個或者多個主題上,這樣就能夠根據這些數據爲機器學習提供數據,更新搜素結果等等(頭條、淘寶等總會推送你感興趣的內容,其實在數據分析以前就已經作了活動跟蹤)。
標準消息中間件的功能
收集應用程序和系統的度量監控指標,或者收集應用日誌信息,經過 Kafka路由到專門的日誌搜索系統,好比 ES。(國內用得較多)
收集其餘系統的變更日誌,好比數據庫。能夠把數據庫的更新發布到 Kafka上,應用經過監控事件流來接收數據庫的實時更新,或者經過事件流將數據庫的更新複製到遠程系統。
還能夠當其餘系統發生了崩潰,經過重放日誌來恢復系統的狀態。(異地災備)
操做實時數據流,進行統計、轉換、複雜計算等等。隨着大數據技術的不斷髮展和成熟,不管是傳統企業仍是互聯網公司都已經再也不知足於離線批處理,實時流處理的需求和重要性日益增加 。
近年來業界一直在探索實時流計算引擎和 API,好比這幾年火爆的 Spark Streaming、Kafka Streaming、Beam 和 Flink,其中阿里雙 11 會場展現的實時銷售金額,就用的是流計算,是基於 Flink,而後阿里在其上定製化的 Blink。
Kafka是 Java 生態圈下的一員,用 Scala 編寫,運行在 Java 虛擬機上,因此安裝運行和普通的 Java 程序並無什麼區別。
安裝 Kafka官方說法,Java 環境推薦 Java8。
Kafka須要 Zookeeper 保存集羣的元數據信息和消費者信息。Kafka通常會自帶 Zookeeper,可是從穩定性考慮,應該使用單獨的 Zookeeper,並且構建Zookeeper 集羣。
在 http://kafka.apache.org/downloads 上尋找合適的版本下載,這裏選用的是 kafka_2.11-2.3.0,下載完成後解壓到本地目錄。
啓動 Zookeeper
進入 Kafka目錄下的 bin\windows
執行 kafka-server-start.bat ../../config/server.properties,出現如下畫面表示成功
Linux下與此相似,進入 bin 後,執行對應的 sh 文件便可
##列出全部主題
kafka-topics.bat --zookeeper localhost:2181 --list
##列出全部主題的詳細信息
kafka-topics.bat --zookeeper localhost:2181 --describe
##建立主題 主題名 my-topic,1 副本,8 分區
kafka-topics.bat --zookeeper localhost:2181 --create --topic my-topic --replication-factor 1 --partitions 8
##增長分區,注意:分區沒法被刪除
kafka-topics.bat --zookeeper localhost:2181 --alter --topic my-topic --partitions 16
##刪除主題
kafka-topics.bat --zookeeper localhost:2181 --delete --topic my-topic
##建立生產者(控制檯)
kafka-console-producer.bat --broker-list localhost:9092 --topic my-topic
##建立消費者(控制檯)
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my-topic --from-beginning
##列出消費者羣組(僅 Linux)
kafka-topics.sh --new-consumer --bootstrap-server localhost:9092 --list
##列出消費者羣組詳細信息(僅 Linux)
kafka-topics.sh --new-consumer --bootstrap-server localhost:9092 --describe --group 羣組名
配置文件放在 Kafka目錄下的 config 目錄中,主要是 server.properties 文件
在單機時無需修改,但在集羣下部署時每每須要修改。它是個每個 broker 在集羣中的惟一表示,要求是正數。當該服務器的 IP 地址發生改變時,broker.id 沒有變化,則不會影響 consumers 的消息狀況
監聽列表(以逗號分隔 不一樣的協議(如 plaintext,trace,ssl、不一樣的 IP 和端口)),hostname 若是設置爲 0.0.0.0 則綁定全部的網卡地址;若是 hostname 爲空則綁定默認的網卡。
若是沒有配置則默認爲 java.net.InetAddress.getCanonicalHostName()。
如:PLAINTEXT://myhost:9092,TRACE://:9091 或 PLAINTEXT://0.0.0.0:9092,
zookeeper 集羣的地址,能夠是多個,多個之間用逗號分割。(一組 hostname:port/path 列表,hostname 是 zk 的機器名或 IP、port 是 zk 的端口、/path是可選 zk 的路徑,若是不指定,默認使用根路徑)
Kafka把全部的消息都保存在磁盤上,存放這些數據的目錄經過 log.dirs 指定。可使用多路徑,使用逗號分隔。若是是多路徑,Kafka會根據「最少使用」原則,把同一個分區的日誌片斷保存到同一路徑下。會往擁有最少數據分區的路徑新增分區。
每數據目錄用於日誌恢復啓動和關閉時的線程數量。由於這些線程只是服務器啓動(正常啓動和崩潰後重啓)和關閉時會用到。因此徹底能夠設置大量的線程來達到並行操做的目的。注意,這個參數指的是每一個日誌目錄的線程數,好比本參數設置爲 8,而 log.dirs 設置爲了三個路徑,則總共會啓動24 個線程。
是否容許自動建立主題。若是設爲 true,那麼 produce(生產者往主題寫消息),consume(消費者從主題讀消息)或者 fetch metadata(任意客戶端向主題發送元數據請求時)一個不存在的主題時,就會自動建立。缺省爲 true。
新建主題的默認參數
每一個新建主題的分區個數(分區個數只能增長,不能減小 )。這個參數通常要評估,好比,每秒鐘要寫入和讀取 1000M 數據,若是如今每一個消費者每秒鐘能夠處理 50MB 的數據,那麼須要 20 個分區,這樣就可讓 20 個消費者同時讀取這些分區,從而達到設計目標。(通常經驗,把分區大小限制在25G 以內比較理想)
日誌保存時間,默認爲 7 天(168 小時)。超過這個時間會清理數據。bytes 和 minutes 不管哪一個先達到都會觸發。與此相似還有 log.retention.minutes和log.retention.ms,都設置的話,優先使用具備最小值的那個。(提示:時間保留數據是經過檢查磁盤上日誌片斷文件的最後修改時間來實現的。也就是最後修改時間是指日誌片斷的關閉時間,也就是文件裏最後一個消息的時間戳)
topic 每一個分區的最大文件大小,一個 topic 的大小限制 = 分區數*log.retention.bytes。-1 沒有大小限制。log.retention.bytes 和 log.retention.minutes任意一個達到要求,都會執行刪除。(注意若是是 log.retention.bytes 先達到了,則是刪除多出來的部分數據),通常不推薦使用最大文件刪除策略,而是推薦使用文件過時刪除策略。
分區的日誌存放在某個目錄下諸多文件中,這些文件將分區的日誌切分紅一段一段的,咱們稱爲日誌片斷。這個屬性就是每一個文件的最大尺寸;當尺寸達到這個數值時,就會關閉當前文件,並建立新文件。被關閉的文件就開始等待過時。默認爲 1G。
若是一個主題天天只接受 100MB 的消息,那麼根據默認設置,須要 10 天才能填滿一個文件。並且由於日誌片斷在關閉以前,消息是不會過時的,因此若是 log.retention.hours 保持默認值的話,那麼這個日誌片斷須要 17 天才過時。由於關閉日誌片斷須要 10 天,等待過時又須要 7 天。
做用和 log.segment.bytes 相似,只不過判斷依據是時間。一樣的,兩個參數,以先到的爲準。這個參數默認是不開啓的。
表示一個服務器可以接收處理的消息的最大字節數,注意這個值 producer 和 consumer 必須設置一致,且不要大於 fetch.message.max.bytes 屬性的值(消費者能讀取的最大消息,這個值應該大於或等於 message.max.bytes)。該值默認是 1000000 字節,大概 900KB~1MB。若是啓動壓縮,判斷壓縮後的值。
這個值的大小對性能影響很大,值越大,網絡和 IO 的時間越長,還會增長磁盤寫入的大小。
Kafka 設計的初衷是迅速處理短小的消息,通常 10K 大小的消息吞吐性能最好(LinkedIn 的 kafka性能測試)
爲 Kafka 選擇合適的硬件更像是一門藝術,就跟它的名字同樣,咱們分別從磁盤、內存、網絡和 CPU 上來分析,肯定了這些關注點,就能夠在預算範圍以內選擇最優的硬件配置。
磁盤吞吐量(IOPS 每秒的讀寫次數)會影響生產者的性能。由於生產者的消息必須被提交到服務器保存,大多數的客戶端都會一直等待,直到至少有一個服務器確認消息已經成功提交爲止。也就是說,磁盤寫入速度越快,生成消息的延遲就越低。(SSD固態貴單個速度快,HDD 機械偏移能夠多買幾個,設置多個目錄加快速度,具體狀況具體分析)
磁盤容量的大小,則主要看須要保存的消息數量。若是天天收到 1TB 的數據,並保留 7 天,那麼磁盤就須要 7TB 的數據。
Kafka自己並不須要太大內存,內存則主要是影響消費者性能。在大多數業務狀況下,消費者消費的數據通常會從內存(頁面緩存,從系統內存中分)
中獲取,這比在磁盤上讀取確定要快的多。通常來講運行 Kafka 的 JVM 不須要太多的內存,剩餘的系統內存能夠做爲頁面緩存,或者用來緩存正在使用的日誌片斷,因此咱們通常 Kafka不會同其餘的重要應用系統部署在一臺服務器上,由於他們須要共享頁面緩存,這個會下降 Kafka 消費者的性能。
網絡吞吐量決定了 Kafka可以處理的最大數據流量。它和磁盤是制約 Kafka 拓展規模的主要因素。對於生產者、消費者寫入數據和讀取數據都要瓜分網絡流量。同時作集羣複製也很是消耗網絡。
Kafka對 cpu的要求不高,主要是用在對消息解壓和壓縮上。因此 cpu 的性能不是在使用 Kafka的首要考慮因素。
咱們要爲 Kafka選擇合適的硬件時,優先考慮存儲,包括存儲的大小,而後考慮生產者的性能(也就是磁盤的吞吐量),選好存儲之後,再來選擇CPU 和內存就容易得多。網絡的選擇要根據業務上的狀況來定,也是很是重要的一環。
本地開發,一臺 Kafka足夠使用。在實際生產中,集羣能夠跨服務器進行負載均衡,再則可使用複製功能來避免單獨故障形成的數據丟失。同時集羣能夠提供高可用性。
要估量如下幾個因素:
須要多少磁盤空間保留數據,和每一個 broker 上有多少空間能夠用。好比,若是一個集羣有 10TB 的數據須要保留,而每一個 broker 能夠存儲 2TB,那麼至少須要 5 個 broker。若是啓用了數據複製,則還須要一倍的空間,那麼這個集羣須要 10 個 broker。
集羣處理請求的能力。若是由於磁盤吞吐量和內存不足形成性能問題,能夠經過擴展 broker 來解決。
很是簡單,只須要兩個參數。第一,配置 zookeeper.connect,第二,爲新增的 broker 設置一個集羣內的惟一性 id。
Kafka中的集羣是能夠動態擴容的。