消息生產者生產消息發送到queue中,而後消息消費者從queue中取出並消費消息算法
注意:數據庫 1.消息被消費之後,queue中再也不有存儲,因此消息消費者不可能消費到已經被消費的消息數組 2.Queue支持存在多個消費者,可是對一個消息而言,只會有一個消費者能夠消費緩存 |
消息生產者(發佈)將消息發佈到topic中,同時有多個消息消費者(訂閱)消費該消息。和點對點方式不一樣,發佈到topic的消息會被全部訂閱者消費服務器 |
支持的協議多,很是重量級消息隊列,對路由(Routing),負載均衡(Load balance)或者數據持久化都有很好的支持網絡 |
號稱最快的消息隊列系統,尤爲針對大吞吐量的需求場景,擅長高級/複雜的隊列,可是技術也複雜,而且只提供非持久性隊列數據結構 |
Apache下的一個子項,相似ZeroMQ,可以以代理人和點對點的技術實現隊列架構 |
一個key-value的NoSQL數據庫,但也支持MQ功能,數據量較小,性能優於RabbitMQ,數據超過10K就慢的沒法忍受併發 |
Kafka是分佈式發佈-訂閱消息系統。它最初由Linkedln公司開發,使用Scala語言編寫,以後成爲Apache項目的一部分。Kafka是一個分佈式的,可劃分的(分區處理),多訂閱者,冗餘備份的持久性的日誌服務。它主要用於處理活躍的流式數據app |
1.同時爲發佈和訂閱提供高吞吐量。據瞭解,Kafka每秒能夠生產約25萬消息(50M),每秒處理55萬消息(110M)
2.可進行持久化操做。將消息持久化到磁盤,所以可用於批量消費,例如ETL,以及實時應用程序。經過將數據持久化到硬盤以及replication防止數據丟失
3.分佈式系統,易於向外擴展。全部的producer、broker和consumer都會有多個,均爲分佈式。無需停機便可擴展機器
4.消息被處理的狀態是在consumer端維護,而不是由server端維護
5.支持online(上線)和offline(下線)的場景 |
重要說明: 1.在Kafka的體系中不存在單讀的Conmuser,它會存在一個Conmuser Group,Conmuser Group裏面會有多個Conmuser
2.能夠把Consumer Group當作一個虛擬的Consumer,它消費的是一個具體的Topic的數據,但具體執行是由Consumer Group中的Consumer去執行的,Consumer是一個邏輯上的概念,是不存在的,而存在的是Consumer Group當中的Consumer, 一個Consumer Group對應的是Topic,Consumer Group中的Consumer對應的是Topic中的partition
3.一個消費者組裏面的多個消費者對應的是什麼呢? Topic組裏面不一樣Partition的數據,一個Partition裏面的數據交給一個Consumer來處理,另外一個Partition裏面的數據交給另外一個Consumer來處理,固然它們必須是同一個Consumer Group裏面的Consumer,這就達到了並行的消費(每個Consumer對應的是一個Partition裏面的數據)
4.Kafka爲何會有Partition的概念? 帶來的好處就是處理的速度更快,不一樣的Conmuser去消費不一樣Partition的消息,數據的消費就變成了並行的 |
特指消息的生產者 |
特指消息的消費者 |
消費者組,能夠並行消費Topic中Partition的消息 |
緩存代理,Kafka集羣中的一臺或多臺服務器統稱爲broker
1.message在broker中經過log追加的方式進行持久化存儲。並進行分區(patitions)
2.爲了減小磁盤寫入的次數,broker會將消息展現buffer起來,當消息的個數(或尺寸)達到必定閥值時,再flush到磁盤,這樣減小了磁盤IO調用的次數
3.Broker沒有副本機制,一旦broker宕機,該broker的消息將不可用(可是消息是有副本的,能夠把消息的副本同步到其它的broker中)
4.Broker不保存訂閱者的狀態,由訂閱者本身保存
5.無狀態致使消息的刪除成爲難題(可能刪除的消息正在被訂閱)Kafka採用基於時間的SLA(服務水平保證),消息保存必定的時間(一般爲7天)後會被刪除
6.消息訂閱者能夠rewind back到任意位置從新進行消費,當訂閱者故障時,能夠選擇最小的offset(id)進行從新讀取消費消息 |
特指Kafka處理的消息源(feeds of messages)的不一樣分類 |
1.Topic物理上的分組,一個topic能夠分爲多個partition,每一個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)
2.Kafka的Partitions分區的目的 2.1 kafka基於文件存儲,經過分區,能夠將日誌內容分線到多個server上,來避免文件尺寸達到單擊磁盤的上線,每一個partition都會被當前server(kafka實例)保存
2.2 能夠將一個topic切分任意多個partitions來提升消息保存/消費的效率
2.3 越多的partitions意味着能夠容納更多的consumer,有效提高併發消費的能力 |
1.消息,是通訊的基本單位,每一個producer能夠向一個topic(主題)發佈一些消息
2.Kafka中的Message是以topic爲基本單位組織的,不一樣的topic之間是相互獨立的。每一個topic有能夠分紅幾個不一樣的partition(每一個topic有幾個partition是在建立topic時指定的)每一個partition存儲一部分Message
3.partition中的每條Message包含了一下三個屬性 屬性名稱 數據類型 offset long MessageSize int32 data mssage的具體內容 |
消息和數據生產者,向kafka的一個topic發佈消息的過程叫作producers
1.producer將消息發佈到指定的topic中,同時producer也能決定將此消息歸屬於哪一個partition。好比基於「round-robin」方式或者經過其餘的一些算法等
2.異步發送:批量發送能夠頗有效的提升發送效率。kafka producer的異步發送模式容許進行批量發送,先將消息緩存在內存中,而後一次請求批量發送出去 |
1.消息和數據消費者,訂閱topics並處理其發佈的消息的過程叫作consumers
2.在Kafka中,咱們能夠認爲一個group是一個「訂閱者」,一個Topic中的每一個partition只會被一個「訂閱者」中的一個consumer消費,不過一個consumer能夠消費多個partition中的消息(消費者數據小於Partition的數量時)
3.注意:kafka的設計原理決定,對於一個topic,同一個group中不能有多於partitions個數的consumer同時消費,不然將意味着某些consumer將沒法獲得消息 |
1.發現線性的訪問磁盤,不少時候比隨機的內存訪問快的多
2.傳統的使用內存做爲磁盤的緩存
3.Kafka直接將數據寫入到日誌文件
|
1.寫操做:經過將數據追加到文件中實現
2.讀操做:讀的時候從文件讀就行了 |
1.讀操做不會阻塞寫稻做和其它操做,數據大小不對性能產生影響
2.沒有容量限制(相對於內存來講)的硬盤空間創建消息系統
3.線性訪問磁盤,速度快,能夠保存任意一段時間 |
1.一個Tipic能夠認爲是一個類消息,每一個topic將被分紅多個partition,每一個partition在存儲層面是append log文件。任何發佈到此partition的消息都會被直接追加到log文件的尾部,每條消息在文件中的位置稱爲offset(偏移量),partition是以文件的形式存儲在文件系統中
2.Logs文件根據broker中的配置要求,保留必定時間後刪除來釋放磁盤空間(默認是7天)
說明:Partition是Topic物理上的分組,一個topic能夠分爲多個partition,每一個partition是一個有序的隊列。Partition中每條消息都會被分配一個有序的id(offset) |
爲數據文件創建索引:稀疏存儲,每隔必定字節的數據創建一條索引。下圖是一個partition的索引示意圖
注意: 1. 如今對一、三、六、8 創建了索引,若是要查找7,則會先查找到8而後,再找到8後的一個索引6,而後兩個索引之間作二分法,找到7的位置
2. 日誌文件也會進行segement(分割),分而治之 |
注意: 1.當生產者將消息發送到Kafka後,就會去馬上通知ZooKeeper,zookeeper中會watch到相關的動做,當watch到相關的數據變化後,會通知消費者去消費消息
2.消費者是主動去Pull(拉)kafka中的消息,這樣能夠下降Broker的壓力,由於Broker中的消息是無狀態的,Broker也不知道哪一個消息是能夠消費的
3.當消費者消費了一條消息後,也必需要去通知ZooKeeper。zookeeper會記錄下消費的數據,這樣但系統出現問題後就能夠還原,能夠知道哪些消息已經被消費了 |
說明: 1.Name Server集羣指的是Zookeeper集羣 |
1.Kafka的通信協議主要說的是,consumer去拉數據使用的通信協議
2.Kafka的Producer、Broker和Consumer採用的是一套自行設計基於TCP層的協議,根據業務需求定製,而非實現一套相似於Protocol Buffer的通信協議
3.基本數據類型 3.1定長數據類型:int8,int16,int32和int64,對應到Java中就是byte,short,int和long 3.2變長數據類型:bytes和string。變長的數據類型由兩部分組成,分別是一個有符號整數N(標識內容的長度)和N個字節的內容。其中N爲-1標識內容爲null。Bytes的長度由int32標識,string的長度由int16表示 3.3數組:數組由兩個部分組成,分別是一個有int32類型的數字標識的數組長度N和N個元素 |
1.Kafka通信的基本單位是Request/Response
2.基本結構: RequestOrResponse ---> MessageSize(RequestMessage | ResponseMessage)
3.通信過程: 3.1客戶端打開與服務端的Socket 3.2往Socket寫入一個int32的數字(數字標識此次發送的Request有多少字節) 3.3服務器端先讀出一個int32的整數從而獲取此次Request的大小 3.4而後讀取對應字節數的數據從而獲得Request的具體內容 3.5服務器端處理了請求以後也用一樣的發送發誓來發送響應
4.RequestMessage結構 4.1RequestMessage ---> ApiKey ApiVersion CorrelationId ClientId Request
5.ResponseMessage 5.1ResponseMessage ---> CorrelationId Response
Kafka採用是經典的Reactor(同步IO)模式,也就是1個Acceptor響應客戶端的鏈接請求,N個Processor來讀取數據,這種模式能夠構建出高性能的服務器
6.Message:Producer生產的消息,鍵-值對 6.1Message --- > Crc MagicByte Attributes Key Value
說明: CRC是一種消息檢驗方式,在Consumer拿到數據之後,CRC會獲取MessageSize和MessageData的大小作比較,若是不一致則,那麼這個操做的數據Consumer就不接收了,若是一直則才作處理。防止消息在傳輸過程當中損壞,丟失的一種校驗方式
7.MessageSet:用來組合多條Message,它在每條Message的基礎上加上offset和MessageSize 7.1MessageSet --> [offset MessageSize Message]
8.Request/Response和Message/messageSet的關係 8.1 Request/Response是通信層的結構,和網絡的7層模型對比的話,它相似於TCP 8.2 Message/MessageSet定義的是業務層的結構,相似於網絡7層模型中的HTTP層。Message/MessageSet只是Request/Response的payload中的一種數據結構 備註:Kafka的通信協議中不包含Schema,格式也比較簡單,這樣設計的好處是協議自身的Overhead小,再加上把多條Message放到一期作壓縮,提升壓縮比率,從而在網絡上傳輸的數據量會少一些 |
1.at most once:最多一次,這個和JMS中「非持久化」消息相似,發送一次,不管成敗,將不會重發 消費者fetch(獲得)消息,而後保存offset,而後處理消息; 當client保存offset以後,可是在消息處理過程當中出現了異常,致使部分消息未能繼續處理,那麼伺候「未處理」的消息將不能被fetch到,這就是「at most once」
2.at least once:消息至少發送一次,若是消息未能接收成功,可能會重發,知道接收成功 消費者fetch消息,而後處理消息,而後保存offset,若是消息處理成功以後,可是在保存offset階段zookeeper異常致使保存操做未能執行成功,這就致使接下來再次fetch時可能得到上次已經處理過的消息,這就是「at least once」,緣由offset沒有即便的提交給zookeeper,zookeeper恢復正常仍是以前offset狀態。 注:一般狀況下「at least once」是咱們的首選(相比at most once而言,重複接收數據總比丟失數據要好)
3.exactly once:消息只會發送一次 Kafka中並無嚴格的去實現(基於2階段提交,事務),咱們認爲這種策略在kafka中是沒有必要的 |
1.下載並上傳kafka到服務器
2.解壓縮並移動到/usr/local目錄下
3.啓動服務 3.1啓動zookeeper服務 # ./zookeeper-server-start.sh ../config/zookeeper.properties > /dev/null 2>&1 &
3.2啓動kafka服務 # ./kafka-server-start.sh ../config/server.properties > /dev/null 2>&1 &
3.3建立topic: ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
3.4查看主題 ./kafka-topics.sh --list --zookeeper localhost:2181
3.5查看主題詳情 ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
3.6刪除主題 ./kafka-run-class.sh kafka.admin.TopicCommand --delete --topic test --zookeeper 192.168.31.220:2181 |
./kafka-console-producer.sh --broker-list localhost:9092 --topic test1 |
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test1 --from-beginning |
生產者參數查看:./kafka-console-producer.sh 消費者參數查看:./kafka-console-consumer.sh |