插曲:Kafka的生產者案例和消費者原理解析

前言

······java

1、Kafka的Producer小案例

假設咱們如今有一個電商系統,凡是能登陸系統的用戶都是會員,會員的價值體如今,消費了多少錢,就會累計相應的積分。積分能夠兌換禮品包,優惠券···等等。數據庫

又到了咱們的畫圖時間👌。首先咱們得先來一個訂單系統,那這個訂單系統中確定就會有數據日誌產生,它如今就是把這些日誌寫到Kafka裏面,日誌咱們使用json的方式記錄。圖中的statement表示訂單狀態,此時是已支付。apache

此時擔任咱們消費者的確定就是會員系統了,它要對這個id爲1的會員進行積分累計。固然必需要考慮到的狀況是,這個會員有可能也會進行退款操做,那相應的積分也會減小。statement此時爲cancel取消json

咱們上一講中的設置參數中,提到咱們能夠給每個消息設置一個key,也能夠不指定,這個key跟咱們要把這個消息發送到哪一個主題的哪一個分區是有關係的。好比咱們如今有一個主題叫 tellYourDream,主題下面有兩個分區,兩個分區分別存在兩個副本(此時咱們不關注follower,由於它的數據是同步leader的)bootstrap

Topic:tellYourDream
    p0:leader partition <- follower partition
    p1:leader partition <- follower partition
複製代碼

若是是不指定key的時候,發送的一條消息會以輪詢的方式發送到分區裏面。也就是好比說,我第一條消息是one,那這個one就發送到了p0裏面,第二條是two,就發送到了p1裏面,以後的three就是p0,four就是p1···依次類推。數組

若是指定key,好比個人key爲message1,Kafka就會取得這個key的hash值,取到的數字再對咱們的分區數取模,而後根據取模的值來決定哪一個分區(例如咱們如今是p0,p1兩個分區,取模的值就只會是0,1),取模爲0,就發送到p0,取模爲1,就發送到p1,這樣的作法能夠保證key相同的消息必定會被髮送到同一個分區(也可使用這個特性來規定某些消息必定會發送到指定的分區)。這個作法和MapReduce的shuffle是否是又相似了,因此這些大數據框架,真的互通點不少。網絡

對於咱們剛剛提到的會員系統,若是此時用戶下單時的消息發送到了p0,而退款的消息發送到了p1,不免有時會發生消費者先消費到p1中的消息的狀況,此時用戶的積分尚未增長,就已經扣除1000了,顯示就會出現問題。因此爲了保證同一個用戶的消息發送到同一個分區中,咱們須要將其指定key。session

代碼部分

由於在 Kafka的生產者原理及重要參數說明 中咱們已經把下面的prop.put的全部配置都已經解釋過了,因此此次就直接ctrl+c,ctrl+v上來。其實就是把那時候的建立生產者的代碼抽取出來成爲一個createProducer()方法而已。併發

public class OrderProducer {
	public static KafkaProducer<String, String> createProducer() {
		Properties props = new Properties();
		props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");  
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("buffer.memory", 33554432);
		props.put("compression.type", "lz4");
		props.put("batch.size", 32768);
		props.put("linger.ms", 100);
		props.put("retries", 10);//5 10
		props.put("retry.backoff.ms", 300);
		props.put("request.required.acks", "1");
		KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
		return producer;
	}
複製代碼

這裏就是一段生產JSON格式的消息代碼而已,也抽取成一個方法。負載均衡

public static JSONObject createRecord() {
		JSONObject order=new JSONObject();
		order.put("userId", 12344);
		order.put("amount", 100.0);
		order.put("statement", "pay");
		return order;
	}
複製代碼

這裏就是直接建立生產者和消息,此時key使用userId或者訂單id都行,問題不大。

public static void main(String[] args) throws Exception {
        KafkaProducer<String, String> producer = createProducer();
        JSONObject order=createRecord();
        ProducerRecord<String, String> record = new ProducerRecord<>(
            "tellYourDream",order.getString("userId") ,order.toString());
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if(exception == null) {
                    System.out.println("消息發送成功");  
                } else {
                    //進行處理
                }
            }
        });
        Thread.sleep(10000); 
        producer.close();
        }
    }
複製代碼

此時若是進行太重試機制後,消息還存在異常的話,公司比較嚴謹的項目都會有備用鏈路,好比把數據存到MySQL,Redis···等來保證消息不會丟失。

補充:自定義分區(可自行了解)

由於其實Kafka自身提供的機制已經基本知足生產環境中的使用了,因此這塊就不展開詳細的說明了。此外還有自定義序列化,自定義攔截器,這些在工做當中使用得頻率不高,若是用到大概能夠進行百度自行學習。

例如,通話記錄中,給客服打電話的記錄要存到一個分區中,其他的記錄均分的分佈到剩餘的分區中。咱們就這個案例來進行演示,要自定義的狀況就要實現Partition接口,而後實現3個方法,說是實現3個,其實主要也就實現partition()這個方法而已。

package com.bonc.rdpe.kafka110.partitioner;
import java.util.List;import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

/**
* @Title PhonenumPartitioner.java 
* @Description 自定義分區器
* @Date 2018-06-25 14:58:14
*/
public class PhonenumPartitioner implements Partitioner{
    @Override
    public void configure(Map<String, ?> configs) {
        // TODO nothing
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 獲得 topic 的 partitions 信息
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // 模擬某客服
        if(key.toString().equals("10000") || key.toString().equals("11111")) {
            // 放到最後一個分區中
            return numPartitions - 1;
        }
        String phoneNum = key.toString();
        return phoneNum.substring(0, 3).hashCode() % (numPartitions - 1);
    }

    @Override
    public void close() {
        // TODO nothing
    }

}
複製代碼

使用自定義分區器

package com.bonc.rdpe.kafka110.producer;
import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * @Title PartitionerProducer.java 
 * @Description 測試自定義分區器
 * @Date 2018-06-25 15:10:04
 */public class PartitionerProducer {
        private static final String[] PHONE_NUMS = new String[]{
            "10000", "10000", "11111", "13700000003", "13700000004",
            "10000", "15500000006", "11111", "15500000008", 
            "17600000009", "10000", "17600000011" 
        };

        public static void main(String[] args) throws Exception {
            
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
            // 設置分區器
            props.put("partitioner.class", "com.bonc.rdpe.kafka110.partitioner.PhonenumPartitioner");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            Producer<String, String> producer = new KafkaProducer<>(props);
    
            int count = 0;
            int length = PHONE_NUMS.length;
            
            while(count < 10) {
                Random rand = new Random();
                String phoneNum = PHONE_NUMS[rand.nextInt(length)];
                ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", phoneNum, phoneNum);
                RecordMetadata metadata = producer.send(record).get();
                String result = "phonenum [" + record.value() + "] has been sent to partition " + metadata.partition();
                System.out.println(result);
                Thread.sleep(500);
                count++;
            }
            producer.close();
        }
}
複製代碼

自定義分區結果:

2、Kafka消費者原理解析

1.offset 偏移量

此時再次請出咱們的kafka集羣,有多個消費者同時去消費集羣中的信息

若是程序一直在穩定執行,那咱們的整個流程是不會出現啥問題的,但是如今若是程序中止執行了呢?有多是程序出現了bug,也有多是由於咱們進行修改手動中止了程序。那下一次恢復的時候,消費者又該從哪一個地方開始消費?

Topic:tellYourDream   ConsumerA
    tellYourDream:p0(10000)
    tellYourDream:p1(10001)
複製代碼

offset就相似於數組下標的那種理解相似,好比數組的下標爲何要從0開始,基於數組的內存模型。就是所處數組位置離首地址的距離而定。array[0]就是偏移爲0的位置,也就是首地址。array[k]也就是偏移爲k的位置。kafka中的offset也是一樣的理解,這個偏移量其實就是記錄一個位置而使用的。用來標識消費者此次消費到了這個位置。

在kafka裏面,kafka是不幫忙維護這個offset偏移量的,這個offset須要consumer自行維護。kafka提供了兩個關於offset的參數,一個是enable_auto_commit,當這個參數設置爲true的時候,每次重啓kafka都會把全部的數據從新消費一遍。再一個是auto_commit_interval_ms,這個是每次提交offset的一個時間間隔。

這個offset的存儲位置在0.8版本(再次劃重點,0.8以前的kafka儘可能不要使用)以前,是存放在zookeeper裏面的。這個設計明顯是存在問題的,整個kafka集羣有衆多的topic,而系統中又有成千上萬的消費者去消費它們,若是offset存放在zookeeper上,消費者每次都要提交給zookeeper這個值,這樣zookeeper能頂得住嗎?若是這時候以爲沒啥問題的同窗,那你就是沒認真去讀 插曲:Kafka的集羣部署實踐及運維相關 中的 3.4---消費信息 啦,趕快去複習一下🤣。

在0.8版本以後,kafka就把這個offset存在了內部的一個主題裏面,這個主題的名字叫作 consumer_offset。這個內部主題默認有50個分區,咱們知道,消費者組是有它們的一個group.id的。提交過去的時候,key是group.id+topic+分區號(這是爲了保證Kakfa集羣中同分區的數據偏移量都提交到consumer_offset的同一個分區下)。這句話有點繞口,不過請務必讀懂。

value就是當前offset的值,每隔一段時間,kafka內部會對這個topic進行compact。也就是每一個group.id+topic+分區號就保留最新的那條數據便可。並且由於這個 consumer_offsets可能會接收高併發的請求,因此默認分區50個,這樣若是你的kafka部署了一個大的集羣,好比有50臺機器,就能夠用50臺機器來抗offset提交的請求壓力,就好不少。

2.Coordinator

每一個consumer group都會選擇一個broker做爲本身的coordinator,負責監控這個消費組裏的各個消費者的心跳,以及判斷是否宕機,而後開啓rebalance, 根據內部的一個選擇機制,會挑選一個對應的Broker,Kafka會把各個消費組均勻分配給各個Broker做爲coordinator來進行管理,consumer group中的每一個consumer剛剛啓動就會跟選舉出來的這個consumer group對應的coordinator所在的broker進行通訊,而後由coordinator分配分區給這個consumer來進行消費。coordinator會盡量均勻的分配分區給各個consumer來消費。

2.1 如何選擇哪臺是coordinator?

首先對消費組的groupId進行hash,接着對consumer_offsets的分區數量取模,默認是50,能夠經過offsets.topic.num.partitions來設置,找到你的這個consumer group的offset要提交到consumer_offsets的哪一個分區。好比說:groupId,「membership-consumer-group」 -> hash值(數字)-> 對50取模(結果只能是0~49,又是以往的那個套路) -> 就知道這個consumer group下的全部的消費者提交offset的時候是往哪一個分區去提交offset,找到consumer_offsets的一個分區(這裏consumer_offset的分區的副本數量默認來講1,只有一個leader),而後對這個分區找到對應的leader所在的broker,這個broker就是這個consumer group的coordinator了,consumer接着就會維護一個Socket鏈接跟這個Broker進行通訊

其實簡單點解釋,就是找到consumer_offsets中編號和它對應的一個分區而已。取模後是2,那就找consumer_offsets那50個分區中的第二個分區,也就是p1。取模後是10,那就找consumer_offsets那50個分區中的第十個分區,也就是p9.

2.2 coordinator完成了什麼工做

而後這個coordinator會選出一個leader consumer(誰先註冊上來,誰就是leader),這時候coordinator也會把整個Topic的狀況彙報給leader consumer,,由leader consumer來制定消費方案。以後會發送一個SyncGroup請求把消費方案返回給coordinator。

用一小段話再總結一遍吧:

首先有一個消費者組,這個消費者組會有一個它們的group.id號,根據這個能夠計算出哪個broker做爲它們的coodinator,肯定了coordinator以後,全部的consumer都會發送一個join group請求註冊。以後coordinator就會默認把第一個註冊上來的consumer選擇成爲leader consumer,把整個Topic的狀況彙報給leader consumer。以後leader consumer就會根據負載均衡的思路制定消費方案,返回給coordinator,coordinator拿到方案以後再下發給全部的consumer,完成流程。

consumer都會向coordinator發送心跳,能夠認爲consumer是從節點,coordinator是主節點。當有consumer長時間再也不和coordinator保持聯繫,就會從新把分配給這個consumer的任務從新執行一遍。若是斷掉的是leader consumer,就會從新選舉新的leader,再執行剛剛提到的步驟。

2.3 分區方案的負載均衡

若是臨時有consumer加入或退出,leader consumer就須要從新制定消費方案。

好比咱們消費的一個主題有12個分區: p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11

假設咱們的消費者組裏面有三個消費者

2.3.1 range策略

range策略就是按照partiton的序號範圍

p0~3             consumer1
p4~7             consumer2
p8~11            consumer3
複製代碼
2.3.2.round-robin策略
consumer1:0,3,6,9
consumer2:1,4,7,10
consumer3:2,5,8,11
複製代碼

可是前面的這兩個方案有個問題: 假設consuemr1掛了:p0-5分配給consumer2,p6-11分配給consumer3 這樣的話,本來在consumer2上的的p6,p7分區就被分配到了 consumer3上。

2.3.3.sticky策略

最新的一個sticky策略,就是說盡量保證在rebalance的時候,讓本來屬於這個consumer 的分區仍是屬於他們, 而後把多餘的分區再均勻分配過去,這樣儘量維持原來的分區分配的策略

consumer1:0-3
consumer2:  4-7
consumer3:  8-11 
假設consumer3掛了
consumer1:0-3,+8,9
consumer2: 4-7,+10,11
複製代碼
2.3.4 Rebalance分代機制

在rebalance的時候,可能你原本消費了partition3的數據,結果有些數據消費了還沒提交offset,結果此時rebalance,把partition3分配給了另一個consumer了,此時你若是提交partition3的數據的offset,能行嗎?必然不行,因此每次rebalance會觸發一次consumer group generation,分代,每次分代會加1,而後你提交上一個分代的offset是不行的,那個partiton可能已經不屬於你了,你們所有按照新的partiton分配方案從新消費數據。

以上就是比較重要的事情了,以後到了輕鬆愉快的代碼時間。

3、消費者代碼部分

其實和生產者不能說它們如出一轍但是結構徹底就是同樣的,因此會比生產者的時候更加簡短點。由於已經知道有這些東西了,不少東西經過搜索引擎就不難解決了。

public class ConsumerDemo {
	private static ExecutorService threadPool = Executors.newFixedThreadPool(20);
	
	public static void main(String[] args) throws Exception {
		KafkaConsumer<String, String> consumer = createConsumer();
		
		//指定消費的主題
		consumer.subscribe(Arrays.asList("order-topic"));  
		try {
			while(true) {  
			    
			    //這裏設置的是一個超時時間
				ConsumerRecords<String, String> records = consumer.poll(Integer.MAX_VALUE); 
				
				//對消費到的數據進行業務處理
				for(ConsumerRecord<String, String> record : records) {
					JSONObject order = JSONObject.parseObject(record.value()); 
					threadPool.submit(new CreditManageTask(order));
				}
			}
		} catch(Exception e) {
			e.printStackTrace();
			consumer.close();
		}
	}
	
	private static KafkaConsumer<String, String> createConsumer() {
	
	    //設置參數的環節
		Properties props = new Properties();
		props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
		props.put("group.id", "test-group");
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("heartbeat.interval.ms", 1000); // 這個儘可能時間能夠短一點
		props.put("session.timeout.ms", 10 * 1000); // 若是說kafka broker在10秒內感知不到一個consumer心跳
		props.put("max.poll.interval.ms", 30 * 1000); // 若是30秒纔去執行下一次poll
		// 就會認爲那個consumer掛了,此時會觸發rebalance
		// 若是說某個consumer掛了,kafka broker感知到了,會觸發一個rebalance的操做,就是分配他的分區
		// 給其餘的cosumer來消費,其餘的consumer若是要感知到rebalance從新分配分區,就須要經過心跳來感知
		// 心跳的間隔通常不要太長,1000,500
		props.put("fetch.max.bytes", 10485760);
		props.put("max.poll.records", 500); // 若是說你的消費的吞吐量特別大,此時能夠適當提升一些
		props.put("connection.max.idle.ms", -1); // 不要去回收那個socket鏈接
		// 開啓自動提交,他只會每隔一段時間去提交一次offset
		// 若是你每次要重啓一下consumer的話,他必定會把一些數據從新消費一遍
		props.put("enable.auto.commit", "true");
		// 每次自動提交offset的一個時間間隔
		props.put("auto.commit.ineterval.ms", "1000");
		// 每次重啓都是從最先的offset開始讀取,不是接着上一次
		props.put("auto.offset.reset", "earliest"); 

		KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
		return consumer;
	}
	
	static class CreditManageTask implements Runnable {
		private JSONObject order;
		public CreditManageTask(JSONObject order) {
			this.order = order;
		}
		@Override
		public void run() {
			System.out.println("對訂單進行積分的維護......" + order.toJSONString());    
			// 就能夠作一系列的數據庫的增刪改查的事務操做
		}
	}
}
複製代碼

3.1 消費者的核心參數

3.1.1 【heartbeat.interval.ms】

consumer心跳時間,必須得保持心跳才能知道consumer是否故障了,而後若是故障以後,就會經過心跳下發rebalance的指令給其餘的consumer通知他們進行rebalance的操做

3.1.2 【session.timeout.ms】

kafka多長時間感知不到一個consumer就認爲他故障了,默認是10秒

3.1.3 【max.poll.interval.ms】

若是在兩次poll操做之間,超過了這個時間,那麼就會認爲這個consume處理能力太弱了,會被踢出消費組,分區分配給別人去消費,一遍來講結合你本身的業務處理的性能來設置就能夠了

3.1.4【fetch.max.bytes】

獲取一條消息最大的字節數,通常建議設置大一些

3.1.5 【max.poll.records】

一次poll返回消息的最大條數,默認是500條

3.1.6 【connection.max.idle.ms】

consumer跟broker的socket鏈接若是空閒超過了必定的時間,此時就會自動回收鏈接,可是下次消費就要從新創建socket鏈接,這個建議設置爲-1,不要去回收

3.1.7 【auto.offset.reset】

earliest:
	當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
	topicA -> partition0:1000   
		  partitino1:2000  			  
latest:
	當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從當前位置開始消費
none:
	topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常
複製代碼

注:咱們生產裏面通常設置的是latest

3.1.8 【enable.auto.commit】

這個就是開啓自動提交惟一

3.1.9 【auto.commit.ineterval.ms】

這個指的是多久條件一次偏移量

4、加餐時間:補充第一篇沒提到的內容

日誌二分查找

其實這也能夠被稱做稀鬆索引。也是一個相似跳錶的結構。打開某主題下的分區,咱們能看到這樣的一些文件

00000000000000000000.index(偏移量的索引)
00000000000000000000.log(日誌文件)
00000000000000000000.timeindex(時間的索引)
複製代碼

日誌段文件,.log文件會對應一個.index和.timeindex兩個索引文件。kafka在寫入日誌文件的時候,同時會寫索引文件,就是.index和.timeindex,一個是位移索引,一個是時間戳索引。

默認狀況下,有個參數log.index.interval.bytes限定了在日誌文件寫入多少數據,就要在索引文件寫一條索引,默認是4KB,寫4kb的數據而後在索引裏寫一條索引,因此索引自己是稀疏格式的索引,不是每條數據對應一條索引的。並且索引文件裏的數據是按照位移和時間戳升序排序的,因此kafka在查找索引的時候,會用二分查找,時間複雜度是O(logN),找到索引,就能夠在.log文件裏定位到數據了。

上面的0,2039···這些表明的是物理位置。爲何稀鬆索引會比直接一條條讀取速度快,它不是每一條數據都會記錄,是相隔幾條數據的記錄方式,可是就好比如今要消費偏移量爲7的數據,就直接先看這個稀鬆索引上的記錄,找到一個6時,7比6大,而後直接看後面的數據,找到8,8比7大,再看回來,肯定7就是在6~8之間,而6的物理位置在9807,8的物理位置在12345,直接從它們中間去找。就提高了查找物理位置的速度。就相似於普通狀況下的二分查找。

ISR機制

光是依靠多副本機制能保證Kafka的高可用性,可是能保證數據不丟失嗎?不行,由於若是leader宕機,可是leader的數據還沒同步到follower上去,此時即便選舉了follower做爲新的leader,當時剛纔的數據已經丟失了。

ISR是:in-sync replica,就是跟leader partition保持同步的follower partition的數量,只有處於ISR列表中的follower才能夠在leader宕機以後被選舉爲新的leader,由於在這個ISR列表裏表明他的數據跟leader是同步的。

若是要保證寫入kafka的數據不丟失,首先須要保證ISR中至少有一個follower,其次就是在一條數據寫入了leader partition以後,要求必須複製給ISR中全部的follower partition,才能說表明這條數據已提交,絕對不會丟失,這是Kafka給出的承諾

那什麼狀況下副本會被踢出出ISR呢,若是一個副本超過10s沒有去和leader同步數據的話,那麼它就會被踢出ISR列表。可是這個問題若是解決了(網絡抖動或者full gc···等),follower再次和leader同步了,leader會有一個判斷,若是數據差別小就會讓follower從新加入,那麼怎麼纔算差別大,怎麼纔算小呢,我們留到源碼時說明。

finally

此次的篇幅很是很是長,並且須要理解的地方也很多,後面其實原本在kafka的內核裏還有個HW&LEO原理的,可本身都懶得繼續寫了hhh。下次源碼篇的時候我們再聊吧。

相關文章
相關標籤/搜索