Apache Kafka最先是由LinkedIn開源出來的分佈式消息系統,如今是Apache旗下的一個子項目,而且已經成爲開源領域應用最普遍的消息系統之一。Kafka社區很是活躍,從0.9版本開始,Kafka的標語已經從「一個高吞吐量,分佈式的消息系統」改成"一個分佈式流平臺"。java
Kafka和傳統的消息系統不一樣在於:git
kafka和其餘消息隊列的對比:github
producer面試
import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; public class UserKafkaProducer extends Thread { private final KafkaProducer<Integer, String> producer; private final String topic; private final Properties props = new Properties(); public UserKafkaProducer(String topic) { props.put("metadata.broker.list", "localhost:9092"); props.put("bootstrap.servers", "master2:6667"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<Integer, String>(props); this.topic = topic; } @Override public void run() { int messageNo = 1; while (true) { String messageStr = new String("Message_" + messageNo); System.out.println("Send:" + messageStr); //返回的是Future<RecordMetadata>,異步發送 producer.send(new ProducerRecord<Integer, String>(topic, messageStr)); messageNo++; try { sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
Properties props = new Properties(); /* 定義kakfa 服務的地址,不須要將全部broker指定上 */ props.put("bootstrap.servers", "localhost:9092"); /* 制定consumer group */ props.put("group.id", "test"); /* 是否自動確認offset */ props.put("enable.auto.commit", "true"); /* 自動確認offset的時間間隔 */ props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); /* key的序列化類 */ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); /* value的序列化類 */ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); /* 定義consumer */ KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); /* 消費者訂閱的topic, 可同時訂閱多個 */ consumer.subscribe(Arrays.asList("foo", "bar")); /* 讀取數據,讀取超時時間爲100ms */ while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); }
對於kafka的架構原理咱們先提出幾個問題?apache
1.Kafka的topic和分區內部是如何存儲的,有什麼特色?bootstrap
2.與傳統的消息系統相比,Kafka的消費模型有什麼優勢?服務器
3.Kafka如何實現分佈式的數據存儲與數據讀取?網絡
在一套kafka架構中有多個Producer,多個Broker,多個Consumer,每一個Producer能夠對應多個Topic,每一個Consumer只能對應一個ConsumerGroup。session
整個Kafka架構對應一個ZK集羣,經過ZK管理集羣配置,選舉Leader,以及在consumer group發生變化時進行rebalance。多線程
在Kafka中的每一條消息都有一個topic。通常來講在咱們應用中產生不一樣類型的數據,均可以設置不一樣的主題。一個主題通常會有多個消息的訂閱者,當生產者發佈消息到某個主題時,訂閱了這個主題的消費者均可以接收到生產者寫入的新消息。
kafka爲每一個主題維護了分佈式的分區(partition)日誌文件,每一個partition在kafka存儲層面是append log。任何發佈到此partition的消息都會被追加到log文件的尾部,在分區中的每條消息都會按照時間順序分配到一個單調遞增的順序編號,也就是咱們的offset,offset是一個long型的數字,咱們經過這個offset能夠肯定一條在該partition下的惟一消息。在partition下面是保證了有序性,可是在topic下面沒有保證有序性。
在上圖中在咱們的生產者會決定發送到哪一個Partition。
1.若是沒有Key值則進行輪詢發送。
2.若是有Key值,對Key值進行Hash,而後對分區數量取餘,保證了同一個Key值的會被路由到同一個分區,若是想隊列的強順序一致性,可讓全部的消息都設置爲同一個Key。
消息由生產者發送到kafka集羣后,會被消費者消費。通常來講咱們的消費模型有兩種:推送模型(psuh)和拉取模型(pull)
基於推送模型的消息系統,由消息代理記錄消費狀態。消息代理將消息推送到消費者後,標記這條消息爲已經被消費,可是這種方式沒法很好地保證消費的處理語義。好比當咱們把已經把消息發送給消費者以後,因爲消費進程掛掉或者因爲網絡緣由沒有收到這條消息,若是咱們在消費代理將其標記爲已消費,這個消息就永久丟失了。若是咱們利用生產者收到消息後回覆這種方法,消息代理須要記錄消費狀態,這種不可取。若是採用push,消息消費的速率就徹底由消費代理控制,一旦消費者發生阻塞,就會出現問題。
Kafka採起拉取模型(poll),由本身控制消費速度,以及消費的進度,消費者能夠按照任意的偏移量進行消費。好比消費者能夠消費已經消費過的消息進行從新處理,或者消費最近的消息等等。
單線程模式適用於併發連接數小,邏輯簡單,數據量小。
在kafka中,consumer和producer都是使用的上面的單線程模式。這種模式不適合kafka的服務端,在服務端中請求處理過程比較複雜,會形成線程阻塞,一旦出現後續請求就會沒法處理,會形成大量請求超時,引發雪崩。而在服務器中應該充分利用多線程來處理執行邏輯。
在kafka服務端採用的是多線程的Selector模型,Acceptor運行在一個單獨的線程中,對於讀取操做的線程池中的線程都會在selector註冊read事件,負責服務端讀取請求的邏輯。成功讀取後,將請求放入message queue共享隊列中。而後在寫線程池中,取出這個請求,對其進行邏輯處理,即便某個請求線程阻塞了,還有後續的縣城從消息隊列中獲取請求並進行處理,在寫線程中處理完邏輯處理,因爲註冊了OP_WIRTE事件,因此還須要對其發送響應。
在Kafka中保證高可靠模型的依靠的是副本機制,有了副本機制以後,就算機器宕機也不會發生數據丟失。
kafka一個topic下面的全部消息都是以partition的方式分佈式的存儲在多個節點上。同時在kafka的機器上,每一個Partition其實都會對應一個日誌目錄,在目錄下面會對應多個日誌分段(LogSegment)。LogSegment文件由兩部分組成,分別爲「.index」文件和「.log」文件,分別表示爲segment索引文件和數據文件。這兩個文件的命令規則爲:partition全局的第一個segment從0開始,後續每一個segment文件名爲上一個segment文件最後一條消息的offset值,數值大小爲64位,20位數字字符長度,沒有數字用0填充,以下,假設有1000條消息,每一個LogSegment大小爲100,下面展示了900-1000的索引和Log:
因爲kafka消息數據太大,若是所有創建索引,即佔了空間又增長了耗時,因此kafka選擇了稀疏索引的方式,這樣的話索引能夠直接進入內存,加快偏查詢速度。
簡單介紹一下如何讀取數據,若是咱們要讀取第911條數據首先第一步,找到他是屬於哪一段的,根據二分法查找到他屬於的文件,找到0000900.index和00000900.log以後,而後去index中去查找 (911-900) =11這個索引或者小於11最近的索引,在這裏經過二分法咱們找到了索引是[10,1367]而後咱們經過這條索引的物理位置1367,開始日後找,直到找到911條數據。
上面講的是若是要找某個offset的流程,可是咱們大多數時候並不須要查找某個offset,只須要按照順序讀便可,而在順序讀中,操做系統會對內存和磁盤之間添加page cahe,也就是咱們日常見到的預讀操做,因此咱們的順序讀操做時速度很快。可是kafka有個問題,若是分區過多,那麼日誌分段也會不少,寫的時候因爲是批量寫,其實就會變成隨機寫了,隨機I/O這個時候對性能影響很大。因此通常來講Kafka不能有太多的partition。針對這一點,RocketMQ把全部的日誌都寫在一個文件裏面,就能變成順序寫,經過必定優化,讀也能接近於順序讀。
能夠思考一下:1.爲何須要分區,也就是說主題只有一個分區,難道不行嗎?2.日誌爲何須要分段
1.分區是爲了水平擴展
2.日誌若是在同一個文件太大會影響性能。若是日誌無限增加,查詢速度會減慢
Kafka的副本機制是多個服務端節點對其餘節點的主題分區的日誌進行復制。當集羣中的某個節點出現故障,訪問故障節點的請求會被轉移到其餘正常節點(這一過程一般叫Reblance),kafka每一個主題的每一個分區都有一個主副本以及0個或者多個副本,副本保持和主副本的數據同步,當主副本出故障時就會被替代。
在Kafka中並非全部的副本都能被拿來替代主副本,因此在kafka的leader節點中維護着一個ISR(In sync Replicas)集合,翻譯過來也叫正在同步中集合,在這個集合中的須要知足兩個條件:
另外還有個AR(Assigned Replicas)用來標識副本的全集,OSR用來表示因爲落後被剔除的副本集合,因此公式以下:ISR = leader + 沒有落後太多的副本; AR = OSR+ ISR;
這裏先要說下兩個名詞:HW(高水位)是consumer可以看到的此partition的位置,LEO是每一個partition的log最後一條Message的位置。HW能保證leader所在的broker失效,該消息仍然能夠重新選舉的leader中獲取,不會形成消息丟失。
當producer向leader發送數據時,能夠經過request.required.acks參數來設置數據可靠性的級別:
在分佈式系統中通常有三種處理語義:
至少一次,有可能會有屢次。若是producer收到來自ack的確認,則表示該消息已經寫入到Kafka了,此時恰好是一次,也就是咱們後面的exactly-once。可是若是producer超時或收到錯誤,而且request.required.acks配置的不是-1,則會重試發送消息,客戶端會認爲該消息未寫入Kafka。若是broker在發送Ack以前失敗,但在消息成功寫入Kafka以後,這一次重試將會致使咱們的消息會被寫入兩次,因此消息就不止一次地傳遞給最終consumer,若是consumer處理邏輯沒有保證冪等的話就會獲得不正確的結果。
在這種語義中會出現亂序,也就是當第一次ack失敗準備重試的時候,可是第二消息已經發送過去了,這個時候會出現單分區中亂序的現象,咱們須要設置Prouducer的參數max.in.flight.requests.per.connection,flight.requests是Producer端用來保存發送請求且沒有響應的隊列,保證Producer端未響應的請求個數爲1。
若是在ack超時或返回錯誤時producer不重試,也就是咱們講request.required.acks=-1,則該消息可能最終沒有寫入kafka,因此consumer不會接收消息。
恰好一次,即便producer重試發送消息,消息也會保證最多一次地傳遞給consumer。該語義是最理想的,也是最難實現的。在0.10以前並不能保證exactly-once,須要使用consumer自帶的冪等性保證。0.11.0使用事務保證了
要實現exactly-once在Kafka 0.11.0中有兩個官方策略:
每一個producer在初始化的時候都會被分配一個惟一的PID,對於每一個惟一的PID,Producer向指定的Topic中某個特定的Partition發送的消息都會攜帶一個從0單調遞增的sequence number。
在咱們的Broker端也會維護一個維度爲<PID,Topic,Partition>,每次提交一次消息的時候都會對齊進行校驗:
上面所說的解決了兩個問題:
1.當Prouducer發送了一條消息以後失敗,broker並無保存,可是第二條消息卻發送成功,形成了數據的亂序。
2.當Producer發送了一條消息以後,broker保存成功,ack回傳失敗,producer再次投遞重複的消息。
上面所說的都是在同一個PID下面,意味着必須保證在單個Producer中的同一個seesion內,若是Producer掛了,被分配了新的PID,這樣就沒法保證了,因此Kafka中又有事務機制去保證。
在kafka中事務的做用是
事務能夠保證就算跨多個<Topic, Partition>,在本次事務中的對消費隊列的操做都當成原子性,要麼所有成功,要麼所有失敗。而且,有狀態的應用也能夠保證重啓後從斷點處繼續處理,也即事務恢復。在kafka的事務中,應用程序必須提供一個惟一的事務ID,即Transaction ID,而且宕機重啓以後,也不會發生改變,Transactin ID與PID可能一一對應。區別在於Transaction ID由用戶提供,而PID是內部的實現對用戶透明。爲了Producer重啓以後,舊的Producer具備相同的Transaction ID失效,每次Producer經過Transaction ID拿到PID的同時,還會獲取一個單調遞增的epoch。因爲舊的Producer的epoch比新Producer的epoch小,Kafka能夠很容易識別出該Producer是老的Producer並拒絕其請求。爲了實現這一點,Kafka 0.11.0.0引入了一個服務器端的模塊,名爲Transaction Coordinator,用於管理Producer發送的消息的事務性。該Transaction Coordinator維護Transaction Log,該log存於一個內部的Topic內。因爲Topic數據具備持久性,所以事務的狀態也具備持久性。Producer並不直接讀寫Transaction Log,它與Transaction Coordinator通訊,而後由Transaction Coordinator將該事務的狀態插入相應的Transaction Log。Transaction Log的設計與Offset Log用於保存Consumer的Offset相似。
關於消息隊列或者Kafka的一些常見的面試題,經過上面的文章能夠提煉出如下幾個比較經典的問題:
大部分問題均可以從上面總結後找到答案,若是還不會的話就關注個人公衆號,讓我爲你解答吧。
最後這篇文章被我收錄於JGrowing-中間件篇,一個全面,優秀,由社區一塊兒共建的Java學習路線,若是您想參與開源項目的維護,能夠一塊兒共建,github地址爲:https://github.com/javagrowin...
麻煩給個小星星喲。
打個廣告,若是你以爲這篇文章對你有文章,能夠關注個人技術公衆號,你的關注和轉發是對我最大的支持,O(∩_∩)O