JMS (Java Message Service) 對消息的發送和接收定義了兩種模式:java
點對點模式:消息的生產和消費者均只有一個,消息由生產者將消息發送到消息隊列(queue)中,而後消息消費者從隊列中取出消息進行消費,消息被取出後,queue中再也不保存該消息。正則表達式
發佈訂閱模式:消息的生產和消費者可能有多個,使用主題(Topic)來對消息進行分類,生產者將消息發送到主題,多個消費者都可以對這個主題進行消費。相似於對多個消費者作廣播。apache
常見的消息中間件Active MQ, Rabbit MQ , Kafka中,只有Active 徹底實現了上述JMS的規範,Kafka則經過消費組和主題分區的方式讓發佈訂閱模型同時也具備了點對點模式的消息收發能力。事實上沒有徹底按上述JMS規範設計Rabbit MQ,和Kafka反而更優秀,其中Kafka在徹底按照分佈式的思想來設計的,在大數據和高可用上有着自然優點。bootstrap
使用Kafka做爲消息中間件,咱們須要涉及到包括 Kafka集羣, 分佈式協調中心(Zookeeper), 生產者, 消費者 在內的四個部分對象。它們協同工做,讓消息高吞吐高可靠的存儲和流通。以下圖ubuntu
左圖簡單來說就是,消息生產者在Kafka集羣上訂閱主題後,能夠併發的向集羣發送消息,Kafka集羣接受到消息會按機制將消息存在不一樣的分區,存哪一個分區能夠由生產者指定,若是生產者未指定則按key來hash或者採用round robin的方式保存保存。服務器
中間的圖是一個左右兩圖整體歸納。網絡
右圖來自kafka官網,旨在說明kafka的消費都是以消費組的方式來消費,即便不指定也會默認建立一個消費組,不一樣的消費組對同一個主題的消費相互獨立,同一消費組內不一樣消費者不能重複消費某一分區,兩種極端的狀況就是:架構
若消費組內消費者數量和分區數量相同,則每一個消費者各自消費一個分區,一個分區一個消費者併發
若消費組內只有一個消費者,則該消費者須要消費全部分區,由於主題的完整消息時各分區消息的總和app
假如主題分區數爲 N,消費組內消費者數量爲 M,且M > N ,能夠確定是組內有 M - N 個消費者沒法消費主題。
消息傳輸:即用做消息中間件
行爲日誌跟蹤:
Kafka 最先就是用於重建用戶行爲數據追蹤系統的。不少網站上的用戶操做都會以消息的形式發送到Kafka 的某個對應的topic 上。這些點擊流蘊含了巨大的商業價值, 事實上,目前就有不少創業公司使用機器學習或其餘實時處理框架來幫助收集並分析用戶的點擊流數據。鑑於這種點擊流數據量是很大的, Kafka 超強的吞吐量特性此時就有了用武之地
審計數據收集:
不少企業和組織都須要對關鍵的操做和運維進行監控和審計。這就須要從各個運維應用程序處實時彙總操做步驟信息進行集中式管理。在這種使用場景下,你會發現Kafka 是很是適合的解決方案,它能夠便捷地對多路消息進行實時收集,同時因爲其持久化的特性,使得後續離線審計成爲可能。
日誌收集:
這多是Kafka 最多見的使用方式了一一日誌收集彙總解決方案。每一個企業都會產生大量的服務日誌,這些日誌分散在不一樣的機器上。咱們可使用Kafka 對它們進行全量收集,井集中送往下游的分佈式存儲中(好比HDF S 等) 。比起其餘主流的日誌抽取框架Kafka 有更好的性能,並且提供了完備的可靠性解決方案,同時還保持了低延時的特色。
流處理:
不少用戶接觸到Kafka 都是由於它的消息隊列功能。自0.10.0.0 版本開始, Kafka 社區推出了一個全新的流式處理組件Kafka Streams 。這標誌着Kafka 正式進入流式處理框架俱樂部。相比老牌流式處理框架Apache Storm 、Apache Samza,或是最近風頭正勁的Spark Strearming,抑或是Apache Flink, Kafka Streams 的競爭力如何?讓咱們拭目以待。
broker: Kafka把服務器的物理機稱爲 broker
topic: 發佈訂閱的消息模式中對消息的分類, 對應某個業務需求的消息。
partition: kakfa在保存主題消息數據時對主題的劃分,每一個partition分別保存主題的一部分數據,全部分區的數據的總和就是主題的完整消息。
leader & follower: 至關於 master 和 slaver的關係,分別表明分佈式系統中的主節點和從節點。當主題的分區有多個副本(replication)時,有且僅有一個replication當選leader,其它的均爲follower, follower的數據的直接來源是leader而不是生產者。
replication:分區的備份,當leader節點掛了後, 從replica中選舉出新的leader。Kafka中消息的讀寫都是分區的leader完成的,replica 只經過向leader fench數據保存備份並在leader宕機後重新當選leader,來保證高可用性。
offset:生產者和消費者在寫和讀數據的時候,對消息寫讀進度的記錄。Kafka服務器將消息數據保存在磁盤log文件上,採用對磁盤的append順序寫讀的方式,offset至關於順序寫讀的偏移量
消費組:消費者使用一個消費者組名(即group.id )來標記本身, topic 的每條消息都只會被髮送到每一個訂閱它的消費者組的一個消費者實例上。kafka默認全部消費都使用消費組來消費。
ISR: ISR 的全稱是in-sync replica,翻譯過來就是與leader replica 保持同步的replica 集合,只有這個集合中的replica 才能被選舉爲leader,也只有該集合中全部replica 都接收到了同一條消息, Kafka 纔會將該消息置於「己提交」狀態。
生產者在鏈接kafka服務器的時候通常都會指定以下參數, 經過以下參數的設定來建立KafkaProducer對象,而後使用該producer對象來發送消息。
1 props.put("bootstrap.servers", "10.118.65.203:9092"); 2 props.put("acks", "all"); 3 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 4 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
其中當 bootstrap.servers 參數用來指定鏈接服務器的 地址與端口號, 一般kafka服務器會有多個broker, 該參數只須要指定其中的一個或者幾個便可,鏈接上kafka的任意broker以後能夠在zookeeper中的 /brokers/ids/ 下找到全部的id 以及 id對應的主機地址及端口號。
生產者鏈接上broker以後, 可以獲得全部的broker 的id 及地址端口, 但一個生產者默認狀況下只能寫該 topic 下的一個partition,這時若是生產者在發送的 ProduceRecord 中指定了消息的 key, kafka會更具該key 來自行計算該寫入的partition編號。若生產者在創建鏈接後發送消息時未指定消息的key 值,能夠經過自定義實現Partitioner接口的自定義類來制定寫partion編號的規則。而後只須要在鏈接broker-list 時指定一個"partitioner.class"參數,該參數傳自定義類的全路徑名,類中覆蓋接口的partition方法便可.一種分區策略以下:
1 @Override 2 public int partition(String topic, Object keyObj, byte[] keyBytes, 3 Object value, byte[] valueBytes, Cluster cluster) { 4 String key = (String) keyObj; 5 List<PartitionInfo> partitionInfos = cluster.availablePartitionsForTopic(topic); 6 7 int partitionCount = partitionInfos.size(); 8 int myPartition = (1 == partitionCount) ? partitionCount : partitionCount - 1; 9 boolean condition = (key == null || key.isEmpty() || !key.contains("my")); 10 return condition ? random.nextInt(partitionCount - 1): myPartition; 11 }
若生產者在創建鏈接時並未指定 partitioner.class 發消息時候也沒有指定key, 這時默認狀況下kafka會以round robin的機制選擇該topic下的分區。
消費者客戶端在鏈接服務器建立consumer對象時,一般須要設置如下四個參數:
1 props.put("bootstrap.servers", "10.118.65.203:9092"); 2 props.put("group.id", "test"); 3 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 4 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
以上參數是沒有默認值的,須要用戶自行指定。其中key,value 的解序列化類要求與生產者指定的序列化類對應。若是消費者不指定groupId,Kafka會自動的爲該消費者實例生成一個groupId。
消費者客戶端在消費消息時會維護一個offset, 該offset就是當前消費者消費到分組-主題-分區下的什麼位置的記錄。例如當消費者A在消費完第N條消息後,自動或者手動的,消費者A會向kafka服務器提交一次位移,(注意這裏是N,由於offset從0開始計數,屬於第N+1條消息了),該offset會提交到log.dirs指定的路徑下中的某一個__consumer_offsets中(以下圖),這裏的__consumer_offsets 其實也是kafka本身建立的一個主題,__consumer_offsets-n 路徑裏面保存的也是index. log 文件。默認狀況下 kafka爲該__consumer_offsets建立了50個分區。用來保存多個主題,多個分區,以及多個組的場景下的消費者位移。以下圖
kafka服務器在__consumer_offsets 主題下,實際保存的是消費者提交過來的offset的鍵值對,其中key是 group.id + topic + 分區號, value 爲offset的實際取值。每當更新一個key的最新的offest時,該topic就會寫入一條含有最新offset的消息,同時kafka也會按期的對topic作清理,即爲每一個消息key只保存含有最新offset。這樣每次消費者在讀取消息以前會先讀取本身的offset,而後再根據offset的值來讀取訂閱主題的topic消息,即便在消費者服務器啓動時沒有指定offset的值也能自動的從上一次消費的地方開始消費。
Kafka採用將每一個分區的消息數據寫入磁盤文件的方式來存儲, 在config/server.properties 文件中log.dir 指定的路徑下,咱們能夠找到 [topic名-分區ID]格式的路徑,選擇任意一個路徑進入能夠看到以下文件列表:
由圖能夠看到四個文件,其中一個log文件, 兩個index文件 和 一個epoch 文件;其中的log文件就是用來記錄消息數據的,兩個index文件用來對log文件的中的數據創建索引,方便消費者快速讀取到須要消費的數據。
Kafka 消息集格式
此外有消息集的格式能夠看出,消息集的實際長度 = 61 + 消息長度 。所以咱們能夠簡單的驗證一下消息的數據存儲是否符合上述描述。
# 建立secondTopic 主題, 設定2個分區
bin/kafka-topics.sh --create --topic secondTopic --zookeeper localhost:2181 --partitions 2 --replication-factor 1
生產者向secondTopic 發送消息前的狀態:
開啓控制檯生產者,發送字符串 "1234567" , 以後再發送 "hello" ,獲得兩個分區的log文件大小截圖以下:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic secondTopic
從發送時間前後來看,顯然第一次發的"1234567" 保存在了分區0,第二次發的保存在分區1,第二次比第一次少了2個字節。根據上面分析的消息集大小計算方式可得"1234567" 保存在剛建立的消息集中的大小爲 = 消息體大小 + 61。
消息體大小 = 1(屬性) + 1(時間戳增量) + 1(位移增量) + 1(key 長度) + 1(value 長度) + 7(value內容) + 1(header個數) +1(消息總字節數,須要計算才能肯定字節數) = 14 , 所以計算的理論消息集的大小就是 14 + 61 = 75. 能夠看到與實際存入log文件字節數一致。
事實上採用消息集在消息併發量較大時能夠有效節省消息存儲空間,而且爲消息的查詢帶來便利。
kafka環境的搭建十分簡單,只須要簡單的配置便可讓服務運行起來;能夠分兩步
1. zookeeper 環境搭建:
① zookeeper下載:https://www.apache.org/dyn/closer.cgi/zookeeper/(鏡像地址)
② zk下載後分別保存到 /opt/bigdata/zookeeper 路徑下,解壓後修改zookeeper 配置文件 zoo_sample.cfg 重命名爲 zoo.cfg
③ 編輯zoo.cfg 文件, 加入如下配置
dataDir=/tmp/data/zookeeper server.1=ubuntu:2888:3888 server.2=ubuntu2:2888:3888 server.3=ubuntu3:2888:3888
在三臺服務上的上述 dataDir 路徑分別保存一個myid文件,文件中分別保存上述配置中主機名對應前面的server的ID即(1,2,3); 而後分別在三臺服務器上啓動zookeeper。
2. Kafka 環境搭建
① 下載地址:http://kafka.apache.org/downloads 選擇 kafka_2.12-1.0.2 版本 (下劃線後面的2.12爲 scala 語言的版本, 1.0.2 爲kafka版本)
② 修改kafka 解壓路徑下 config/server.properties 文件
zookeeper.connect=ubuntu:2181,ubuntu2:2181,ubuntu3:2181
③ 至此就能夠啓動kafka服務器了
./kafka-server-start.sh -daemon ../config/server.properties
④ 指令指定了執行守護進程,所以啓動成功後看不到任何結果, 能夠查看9092端口號是否在監聽,或者適應jps指令查看是否有kafka服務;接下來就是建立kafka主題了(默認副本數不能大於broker數)
bin/kafka-topics.sh --create --topic secondTopic --zookeeper ubuntu:2181 --partitions 2 --replication-factor 1
⑤ 啓動生產者向該主題寫消息, (控制檯生產者只能發送消息的value ,沒法發送key)
bin/kafka-console-producer.sh --broker-list ubuntu:9092 --topic secondTopic
⑥ 啓動消費者消費消息
bin/kafka-console-consumer.sh --topic secondTopic --bootstrap-server ubuntu:9092 --from-beginning
其中控制檯消費組啓動的指令中的參數 --bootstrap-server 在老版本中使用的是 --zookeeper host:post, 可是自從消費組位移offset信息再也不保存到zookeeper以後,消費者不用再鏈接zookeeper,而改成直接鏈接kafka集羣。
下面介紹java 工程鏈接Kafka服務器實現生產與消費的簡單實現。
客戶端鏈接Kafka以及Zookeeper 實現生產者的發送以及消費者的拉取消費,須要引入以下Maven依賴:
1 <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-client --> 2 <dependency> 3 <groupId>org.apache.curator</groupId> 4 <artifactId>curator-client</artifactId> 5 <version>4.0.1</version> 6 </dependency> 7 <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework --> 8 <dependency> 9 <groupId>org.apache.curator</groupId> 10 <artifactId>curator-framework</artifactId> 11 <version>4.0.1</version> 12 </dependency> 13 <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes --> 14 <dependency> 15 <groupId>org.apache.curator</groupId> 16 <artifactId>curator-recipes</artifactId> 17 <version>4.0.1</version> 18 </dependency> 19 20 <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --> 21 <dependency> 22 <groupId>org.apache.kafka</groupId> 23 <artifactId>kafka_2.12</artifactId> 24 <version>${kafka.version}</version> 25 </dependency> 26 27 <!-- https://mvnrepository.com/artifact/org.codehaus.jackson/jackson-mapper-asl --> 28 <dependency> 29 <groupId>org.codehaus.jackson</groupId> 30 <artifactId>jackson-mapper-asl</artifactId> 31 <version>1.9.13</version> 32 </dependency>
生產者端高級API 實現:
1 static private final String TOPIC = "firstTopic"; 2 static private final String BROKER_LIST = "192.168.0.102:9092"; 3 .... 4 5 Properties props = new Properties(); 6 props.put("bootstrap.servers",BROKER_LIST); 7 props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); 8 props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); 9 10 // acks 指定了 partition 中leader broker 在接收到producer 的消息後必須寫入的 副本數; acks 一般可能的取值有 0,1,all(-1) 11 // acks = 0 則表示producer 徹底不理睬 leader broker 的處理結果, 在發送完一條消息後不等待leader broker 的返回結果就開始下一次發送 12 // 因爲不等待發送結果得 一般這種方式能夠有效提升producer的吞吐率;同時若是發送失敗了 producer是不知道的 13 // acks = 1 表示設置 leader broker 在接收到producer 的消息並將消息寫入本地日誌,就能夠發送響應結果給producer 14 // 而無需等待其它ISR中的副本,這樣只要leader broker 一直存活,kafka 就可以保證這一條消息不丟失 15 // acks = -1(all) 表示 leader broker 在接收到producer 的消息以後 不經須要將記錄寫入本地日誌,同時還要將記錄寫入ISR中全部的其它成員 16 // 纔會向 producer發送響應結果; 這樣只要ISR中存在一個存活的副本,消息記錄就不會丟失; 當副本數較多的 producer的吞吐量將變得較低 17 props.put("acks","1"); 18 // 因爲網絡抖動或者leader選舉等緣由, producer 發送的消息可能會失敗,能夠在properties 參數中設置producer的重發次數 19 // retries = 0 表示不作重發; producer 認爲的發送失敗 有可能並非真正的發送失敗,而是在broker提交後發送響應給producer producer因爲某種緣由 20 // 沒有成功接收到, 這將致使producer 向broker 發送重複的消息,所以retries > 0 時須要consumer在消費時對消息採起去重處理 21 props.put("retries","0"); 22 // producer 將發往同一分區的多條消息封裝進一個batch 中,當batch 滿了的時候,producer 會發送batch中的全部消息 23 // 能夠經過 配置batch.size 來設置 batch 容量的大小; batch 過大佔用過多內存,batch 太小 24 props.put("batch.size","323840"); 25 // producer 在向broker發送消息時若是是等到 batch已經滿了再發送 有可能由於 producer的吞吐量比較小,batch須要等較長時間才能滿 26 // 這個時候若是等待就會話較長時間, linger.ms 參數就是用來設置這種消息發送延時的行爲的,linger 設置的較大會讓生產者發送消息的延時變大 27 // linger 設置的較小會讓生產者發送消息的吞吐量變小, 吞吐量和延時之間存在矛盾 須要權衡設置 28 props.put("linger.ms",2); 29 // buffer.memory 指定producer 用戶緩衝消息的內存大小, 30 props.put("buffer.memory",33554432); 31 // 設置單條消息最大大小 32 props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1024*1024); 33 // 設置請求超時時間,producer 向 broker發送消息後 等待時長,若是超過這個時長 producer就會認爲響應超時了 34 props.put("max.block.ms",3000); 35 36 // 指定使用 topic 下的哪個分區 37 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.partitioner.MyPartitioner"); 38 Producer<String,String> producer = new KafkaProducer<>(props); 39 40 // 使用 producer 發送後的回調函數 作後續處理 41 42 // 測試 對topic設定partition 43 ProducerRecord<String,String> record = new ProducerRecord<>(TOPIC,"my non-test","partition setting"); 44 producer.send(record); 45 46 producer.close();
消費者高級API實現:
1 private static final String topicName = "firstTopic"; 2 private static final String groupId = "group1"; 3 .... 4 5 Properties props = new Properties(); 6 // server, group.id, key.deserializer, value.deserializer四個參數無默認值,必須配置 7 // 注意這裏 服務器地址配置的 主機名:端口號, 須要在研發環境修改hosts 文件 8 props.put("bootstrap.servers","ubuntu1:9092"); 9 props.put("group.id",groupId); 10 props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); 11 props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); 12 // 是否容許consumer 位移自動提交 13 props.put("enable.auto.commit","true"); 14 // consumer 位移自動提交時間間隔 15 props.put("auto.commit.interval.ms","1000"); 16 // auto.offset.reset 設置爲 earliest 指定從最先的位移開始消費,可是若是以前有位移提交,則啓動時從位移提交處開始消費 17 // auto.offset.reset 一般還能夠設置爲 latest, 設置爲latest 指的從最新處位移開始消費 18 props.put("auto.offset.reset","earliest"); 19 20 KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(props); 21 consumer.subscribe(Arrays.asList(topicName)); 22 23 try { 24 while(true){ 25 ConsumerRecords<String,String> records = consumer.poll(2000); 26 for(ConsumerRecord<String,String> record : records){ 27 System.out.printf("訂閱消息 offset=%d,key=%s,value=%s%n",record.offset(),record.key(),record.value()); 28 } 29 } 30 } catch (Exception e) { 31 e.printStackTrace(); 32 } finally { 33 consumer.close(); 34 }
以上生產者與消費者端的實現雖然簡單,可是在不少業務場景下是不知足需求的,須要咱們使用更多定製化的開發,譬如生產者如何設定分區規則,消費何時提交位移,這些後續文章再作進一步研究。