消息隊列之activeMQhtml
消息隊列之RabbitMQjava
kafka是由scala語言開發的一個多分區,多副本的而且居於zookeeper協調的分佈式的發佈-訂閱消息系統。具備高吞吐、可持久化、可水平擴展、支持流處理等特性;可以支撐海量數據的數據傳遞;而且將消息持久化到磁盤中,並對消息建立了備份保證了數據的安全。kafka在保證了較高的處理速度的同時,又能保證數據處理的低延遲和數據的零丟失。node
kafka的特性:shell
kafka的主要應用場景:數據庫
基本流程:apache
kafka的關鍵角色:bootstrap
安裝kafka的前提是安裝zookeeper以及jdk環境。我這裏安裝的版本是jdk1.8.0_20,kafka_2.11-1.0.0,zookeeper-3.4.14。kafka與jdk的版本必定要對應。我以前用的kafka_2.12_2.3.0,就不行緩存
1.將kafka的文件上傳到home目錄下並解壓縮到/usr/local目錄下安全
root@localhost home]# tar -xvzf kafka_2.11-1.0.0.tgz -C /usr/local
2.進入kafka的config服務器
[root@localhost local]# cd /usr/local/kafka_2.11-1.0.0/config
3.編輯server.properties文件
# 若是是集羣環境中,則每一個broker.id要設置爲不一樣 broker.id=0 # 將下面這一行打開,這至關於kafka對外提供服務的入口 listeners=PLAINTEXT://192.168.189.150:9092 # 日誌存儲位置:log.dirs=/tmp/kafka_logs 改成 log.dirs=/usr/local/kafka_2.11-1.0.0/logs # 修改zookeeper的地址 zookeeper.connect=192.168.189.150:2181 # 修改zookeeper的鏈接超時時長,默認爲6000(可能會超時) zookeeper.connection.timeout.ms=10000
3.啓動zookeeper
由於我是配置的zookeeper集羣,因此須要將三臺zookeeper都啓動。只啓動單臺服務器zookeeper在選舉的時候將不可進行(當整個集羣超過半數機器宕機,zookeeper會認爲集羣處於不可用狀態)
[root@localhost ~]# zkServer.sh start # 查看狀態 [root@localhost ~]# zkServer.sh status
4.啓動kafka
[root@localhost kafka_2.11-1.0.0]# bin/kafka-server-start.sh config/server.properties # 也可使用後臺啓動的方式,若是不使用後臺啓動,則在啓動後操做須要新開一個窗口才能操做 [root@localhost kafka_2.11-1.0.0]# bin/kafka-server-start.sh -daemon config/server.properties
5.建立一個主題
# --zookeeper: 指定了kafka所鏈接的zookeeper的服務地址 # --partitions: 指定了分區的個數 # --replication-factor: 指定了副本因子 [root@localhost kafka_2.11-1.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic charon --partitions 2 --replication-factor 1 Created topic "charon".
6.展現全部的主題(驗證建立的主題是否有問題)
[root@localhost kafka_2.11-1.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --list charon
7.查看某個主題的詳情
[root@localhost kafka_2.11-1.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic charon Topic:charon PartitionCount:2 ReplicationFactor:1 Configs: Topic: charon Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: charon Partition: 1 Leader: 0 Replicas: 0 Isr: 0
8.新開一個窗口啓動消費者接收消息.
--bootstrap-server:指定鏈接kafka集羣的地址,9092是kafka服務的端口。由於個人配置文件中配置的是具體地址,因此須要寫明具體地址。不然會報 [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available.的錯
[root@localhost kafka_2.11-1.0.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.189.150:9092 --topic charon
9.新開一個窗口啓動生產者產生消息
--bootstrap-server:指定鏈接kafka集羣的地址,9092是kafka服務的端口。由於個人配置文件中配置的是地址。
[root@localhost kafka_2.11-1.0.0]# bin/kafka-console-producer.sh --broker-list 192.168.189.150:9092 --topic charon
10.產生消息並消費消息
# 生產者生產消息 >hello charon good evening # 消費者這邊接收到的消息 hello charon good evening
固然上面這種方式,只有在同一個網段才能實現。
kafka生產流程:
1)producer先從zookeeper的 "/brokers/.../state"節點找到該partition的leader
2)producer將消息發送給該leader
3)leader將消息寫入本地log
4)followers從leader pull消息,寫入本地log後向leader發送ACK
5)leader收到全部ISR中的replication的ACK後,增長HW(high watermark,最後commit 的offset)並向producer發送ACK
消費組:
kafka消費者是消費組的一部分,當多個消費者造成一個消費組來消費主題的時候,每一個消費者都會收到來自不一樣分區的消息。假如消費者都在同一個消費者組裏面,則是工做-隊列模型。假如消費者在不一樣的消費組裏面,則是發佈-訂閱模型。
當單個消費者沒法跟上數據的生成速度時,就能夠增長更多的消費者來分擔負載,每一個消費者只處理部分分區的消息,從而實現單個應用程序的橫向伸縮。可是千萬不要讓消費者的數量少於分區的數量,由於此時會有多餘的消費者空閒。
當有多個應用程序都須要從kafka獲取消息時,讓每一個應用程序對應一個消費者組,從而使每一個應用程序都能獲取一個或多個topic的所有消息。每一個消費者對應一個線程,若是要在同一個消費者組中運行多個消費者,須要讓每一個消費者運行在本身的線程中。
1.添加依賴:
<!--添加kafka的依賴--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>1.0.0</version> </dependency>
生產者代碼:
package kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; /** * @className: Producer * @description: kafka的生產者 * @author: charon * @create: 2021-01-18 08:52 */ public class Producer { /**topic*/ private static final String topic = "charon"; public static void main(String[] args) { // 配置kafka的屬性 Properties properties = new Properties(); // 設置地址 properties.put("bootstrap.servers","192.168.189.150:9092"); // 設置應答類型,默認值爲0。(0:生產者不會等待kafka的響應;1:kafka的leader會把這條消息寫到本地日誌文件中,但不會等待集羣中其餘機器的成功響應; // -1(all):leader會等待全部的follower同步完成,確保消息不會丟失,除非kafka集羣中的全部機器掛掉,保證可用性) properties.put("acks","all"); // 設置重試次數,大於0,客戶端會在消息發送失敗是從新發送 properties.put("reties",0); // 設置批量大小,當多條消息須要發送到同一個分區時,生產者會嘗試合併網絡請求,提交效率 properties.put("batch.size",10000); // 生產者設置序列化方式,默認爲:org.apache.kafka.common.serialization.StringSerializer properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 建立生產者 KafkaProducer producer = new KafkaProducer(properties); for (int i = 0; i < 5; i++) { String message = "hello,charon message "+ i ; producer.send(new ProducerRecord(topic,message)); System.out.println("生產者發送消息:" + message); } producer.close(); } }
消費者代碼:
package kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Arrays; import java.util.List; import java.util.Properties; /** * @className: Consumer * @description: kafka的消費者 * @author: charon * @create: 2021-01-18 08:53 */ public class Consumer implements Runnable{ /**topic*/ private static final String topic = "charon"; /**kafka消費者*/ private static KafkaConsumer kafkaConsumer; /**消費消息*/ private static ConsumerRecords<String,String> msgList; public static void main(String[] args) { // 配置kafka的屬性 Properties properties = new Properties(); // 設置地址 properties.put("bootstrap.servers","192.168.189.150:9092"); // 消費者設置反序列化方式 properties.put("key.deserializer", StringDeserializer.class.getName()); properties.put("value.deserializer", StringDeserializer.class.getName()); // 設置消費組 properties.put("group.id","test01"); // 設置容許自動提交 properties.put("enable.auto.commit","true"); // 設置自動提交的時間間隔 properties.put("auto.commit.interval.ms","1000"); // 設置鏈接的超時市場 properties.put("session.timeout.ms","30000"); // 建立消費者 kafkaConsumer = new KafkaConsumer(properties); // 指定分區 kafkaConsumer.subscribe(Arrays.asList(topic)); Consumer consumer = new Consumer(); new Thread(consumer).start(); // kafkaConsumer.close(); } @Override public void run() { for (;;){ // 獲取數據的超時1000ms msgList = kafkaConsumer.poll(1000); if(null != msgList && msgList.count() > 0){ for (ConsumerRecord<String,String> consumerRecord: msgList ) { System.out.println("消費者接受到消息,開始消費:" + consumerRecord); System.out.println("topic= "+consumerRecord.topic()+" ,partition= "+consumerRecord.partition()+" ,offset= "+consumerRecord.offset()+" ,value="+consumerRecord.value()+"\n"); } }else{ // 若是沒有接受到數據,則阻塞一段時間 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
kafka不會像activemq那樣須要獲得消費者確認,因此消費者須要追蹤kafka的消息消費到分區中的哪一個位置了,這個位置就叫偏移量。把更新分區當前位置的操做叫作提交。若是消費者發生崩潰或者有新的消費者加入羣組,就會觸發再均衡,完成再均衡以後,每一個消費者可能分配到新的分區上,而不是以前處理的那個,爲了可以繼續以前的工做,消費者須要讀取每一個分區最後一次提交的偏移量,而後從偏移量指定的地方繼續處理。
這樣的話就可能會有如下兩種狀況:
1.提交的偏移量小於客戶端處理的偏移量
若是提交的偏移量小於客戶端處理的最後一個消息的偏移量,那麼處於兩個偏移量之間的消息就會被從新處理。
2.提交的偏移量大於客戶端處理的偏移量
若是提交的偏移量大於客戶端處理的最後一個消息的偏移量,那麼處於兩個偏移量之間的消息就會丟失。
kafka的提交方式:
自動提交模式:消費者拉取數據以後自動提交偏移量,不關心後續對消息的處理是否正確。優勢是:消費快,適用於數據一致性弱的業務場景,缺點爲:消息容易丟失或者重複消費。
將enable.auto.commit被設爲 true
手動提交模式:消費者拉取數據以後作業務處理,並且須要業務處理完成纔算真正消費成功。缺點:在broker對提交請求作出迴應以前,應用程序會一直阻塞,會限制應用程序的吞吐量。
將enable.auto.commit被設爲 false;
在消息處理完成後手動調用consumer.commitSync();
異步提交:只須要發送提交請求,無需等待broker的響應
在消息處理完成後手動調用consumer.commitAsync();這個方法也支持回調,在broker做出響應時會執行回調,回調常常被用於記錄提交失敗將錯誤信息和偏移量記錄下來,若是從新提交,則須要注意提交的順序。
在爲消費者分配新的分區或者移除舊的分區時,能夠經過消費者API執行一些應用程序代碼,在調用subscribe(Pattern pattern, ConsumerRebalanceListener listener)時,能夠傳入一個再均衡監聽器。
須要實現的兩個方法:
public void onPartitionRevoked(Collection
在再均衡開始以前和消費者中止讀取消息以後被調用,若是在這裏提交偏移量,下一個接管分區的消費者就知道從哪裏開始讀取了,要注意提交的是最近處理過的偏移量,而不是批次中還在處理的最後一個偏移量。
public void onPartitionAssigned(Collection
在從新分配分區以後和消費者開始夫區消息以前被調用
首先來看看kafka的應答類型:
若是是單機環境中,三者沒有區別。
kafka的消息重複和丟失可能發生在三個階段:
1.生產者階段的緣由爲:生產者發送的消息沒有收到正確的broker的響應,致使生產者重試。
生產者發送一條消息,broker羅盤之後由於網絡等種種緣由,發送端獲得一個發送失敗的響應或者網絡中斷,而後prodcuer收到一個可恢復的exception重試消息致使消息重試。
重試過程:
解決方式:
1.啓動kafka的冪等性。要啓動kafka的冪等性,須要修改配置文件中的:enable.idempotenmce=true,同時要求ack=all且retries>1。若是要提升數據的可靠性,還須要min.insync.replicas這個參數配合,若是ISR的副本數少於min.insync.replicas則會產生異常,緣由:消息被拒絕,同步副本數量少於所需的數量
冪等性的原理:
每一個生產者都有一個PID,服務端回經過PID關聯記錄每一個生產者的狀態,每一個生產者的每一個消息會帶上一個遞增的序列(sequence),服務端會記錄每一個生產者對應的當前的最大的序列(PID+seq),若是新的消息帶上的序列不大於當前的最大的seq就拒絕這條消息,若是消息落盤會同時更新最大的seq,這個時候重發的消息會唄服務器拒掉從而避免了消息重複。
2.設置ack=0,即不須要確認,不重試。但可能會丟失數據,因此適用於吞吐量指標重要性高於數據丟失,例如:日誌收集。
2.生產者和broker階段的緣由:
ack=0,不重試。生產者發送消息後,無論結果如何,若是發送失敗數據也就丟失了。
ack=1,leader宕機(crash)了,生產者發送消息完,只等待leader寫入成功就返回了,leader宕機了,這是follower還沒來得及同步,那麼消息就丟失了。
unclean.leader.election.enable 配置爲true。容許選舉ISR之外的副本做爲leader,會致使數據丟失,默認爲fase(非ISR中的副本不能參與選舉)。
生產者發送完異步消息,只等待leader寫入成功就返回了,leader宕機了,這時ISR中沒有follower,leader從OSR中選舉,由於OSR中原本落後於leader而形成數據丟失。
解決方式:
1.配置:ack=-1,retries>1,unclean.leader.election.enable=false
生產者發送完消息,等待follower同步完在返回,若是異常則重試,這時副本的數量可能影響吞吐量,最大不超過5個,通常三個就夠了。
2.配置:min.insync.replicas > 1
當生產者將ack設置爲all或-1時,min.insync副本指定必須確認寫操做成功的最小副本數量,若是不能知足這個最小值,則生產者將引起一個異常。當一塊兒使用時,min.insync.replicas和ack容許執行更大的持久性保證。
3.失敗的offset單獨記錄
生產者發送消息,回自動重試,遇到不可恢復的異常會拋出,這時能夠捕獲異常記錄到數據庫或緩存中,進行單獨處理。
3.消費階段的緣由:數據消費完沒有及時提交offset到broker。消息消費端在消費過程當中掛掉沒有及時提交offset到broker,另外一個消費者啓動拿到以前記錄的offset開始消費,因爲offset的滯後性可能會致使新啓動的客戶端有少許重複消費。
解決方式:
1.取消自動提交,每次消費完或者程序退出時手動提交,這也沒有辦法保證不會有重複。
2.作冪等性,儘可能讓下游作冪等或者儘可能每消費一條消息都記錄offset。對於少書嚴格的場景可能須要吧offset或惟一ID和下游狀態更新放在同一個數據庫裏作事務來保證精確的一次更新或者在下游數據庫表裏同時記錄消費的offset。而後更新數據的時候用消費位點作樂觀鎖拒絕掉舊的位點的數據更新。
參考文章:
https://www.cnblogs.com/qingyunzong/p/9004509.html
https://www.cnblogs.com/frankdeng/p/9310704.html