這個系列主要是講解關於分佈式消息中間件的一些心得html
關於分佈式系統、中間件是什麼、消息中間件能作什麼、分佈式消息中間件長什麼樣諸如此類基礎概念在上一篇文章——
分佈式消息中間件(1):Rabbitmq入門到高可用實戰!都已經講過,這裏就不贅述了,感興趣的朋友能夠本身去看一下。java
這是本系列的第二篇,準備寫kafka,web
Kakfa 普遍應用於國內外大廠,例如 BAT、字節跳動、美團、Netflix、Airbnb、Twitter 等等。其重要性不言而喻。面試
kafka與其餘三個主流中間件相比,優點有兩個:spring
今天咱們經過這篇文章深刻了解一下 Kafka的工做原理。因爲篇幅所限,確定不會徹底寫到,只能挑比較重要的幾個點來跟你們分析一下,面試題的話也不會在這篇文章裏解析了,單獨整理了一份kafka學習筆記PDF以及定經典高頻面試題解析,須要的朋友能夠自行領取apache
好了,話很少說,坐穩扶好,發車嘍!編程
kafka官網圖bootstrap
有中文官網,能夠詳細看看。瀏覽器
地址:http://kafka.apachecn.org/intro.html緩存
地址:http://kafka.apache.org/downloads
(1)由於kafka要依賴於zookeeper作調度,kafka中實際自帶的有kafka,可是通常建議使用獨立的zookeeper,方便後續升級及公用。
(2)下載地址:
文件都不大,zk是9m多,kafka是50多兆
說明:北遊在本地弄了三臺虛擬機,ip分別爲:
192.168.85.158 192.168.85.168 192.168.85.178
(1)上傳jar包,就再也不新建用戶了,直接在root帳戶下執行,將kafka和zookeeper的tar包上傳到/root/tools目錄下。
(2)解壓
[root@ruanjianlaowang158 tools]# tar -zxvf kafka_2.12-2.4.1.tgz [root@ruanjianlaowang158 tools]# tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
(3)配置zookeeper及啓動
[root@ruanjianlaowang158 apache-zookeeper-3.5.7-bin]# cd /root/tools/apache-zookeeper-3.5.7-bin #北遊,首先建立個空文件夾,在接下來的配置文件中配置 [root@ruanjianlaowang158 apache-zookeeper-3.5.7-bin]# mkdir data [root@ruanjianlaowang158 conf]# cd /root/tools/apache-zookeeper-3.5.7-bin/conf [root@ruanjianlaowang158 conf]# cp zoo_sample.cfg zoo.cfg [root@ruanjianlaowang158 conf]# vi zoo.cfg #單機只改一個值,保存退出。 #dataDir=/tmp/zookeeper dataDir=/root/tools/apache-zookeeper-3.5.7-bin/data #啓動zookeeper [root@ruanjianlaowang158 bin]# cd /root/tools/apache-zookeeper-3.5.7-bin/bin [root@ruanjianlaowang158 bin]# ./zkServer.sh start
(4)配置kafka及啓動
[root@ruanjianlaowang158 kafka_2.12-2.4.1]# cd /root/tools/kafka_2.12-2.4.1 #北遊,新建個空文件夾 [root@ruanjianlaowang158 kafka_2.12-2.4.1]# mkdir data #北遊,更改配置文件 [root@ruanjianlaowang158 config]# cd /root/tools/kafka_2.12-2.4.1/config [root@ruanjianlaowang158 config]# vi server.properties #須要改3個值 #log.dirs=/tmp/kafka-logs log.dirs=/root/tools/kafka_2.12-2.4.1/data #listeners = PLAINTEXT://your.host.name:9092 listeners=PLAINTEXT://192.168.85.158:9092 #zookeeper.connect=localhost:2181 zookeeper.connect=192.168.85.158:2181 #啓動kafka [root@ruanjianlaowang158 bin]# cd /root/tools/kafka_2.12-2.4.1/bin [root@ruanjianlaowang158 bin]# ./zookeeper-server-start.sh ../config/server.properties &
啓動完畢,單機驗證就不驗證了,直接在集羣中進行驗證。
(1)集羣方式,首先把上面的單機模式,再在192.168.85.168和192.168.85.178服務器上先解壓配置一遍。
(2)zookeeper是仍是更改zoo.cfg
158,168,178三臺服務器同樣:
[root@ruanjianlaowang158 conf]# cd /root/tools/apache-zookeeper-3.5.7-bin/conf [root@ruanjianlaowang158 conf]# vi zoo.cfg #其餘不變,最後面新加,三行,三臺服務器配置同樣,北遊 server.1=192.168.85.158:2888:3888 server.2=192.168.85.168:2888:3888 server.3=192.168.85.178:2888:3888 158服務器執行: echo "1" > /root/tools/apache-zookeeper-3.5.7-bin/data/myid 168服務器執行: echo "2" > /root/tools/apache-zookeeper-3.5.7-bin/data/myid 178服務器執行: echo "3" > /root/tools/apache-zookeeper-3.5.7-bin/data/myid
(3)kafka集羣配置
[root@ruanjianlaowang158 config]# cd /root/tools/kafka_2.12-2.4.1/config [root@ruanjianlaowang158 config]# vi server.properties #broker.id 三臺服務器不同,158服務器設置爲1,168服務器設置爲2,178服務器設置爲3 broker.id=1 #三個服務器配置同樣 zookeeper.connect=192.168.85.158:2181,192.168.85.168:2181,192.168.85.178:2181
Kafka經常使用Broker配置說明:
配置項 | 默認值/示例值 | 說明 |
---|---|---|
broker.id | 0 | Broker惟一標識 |
listeners | PLAINTEXT://192.168.85.158:9092 | 監聽信息,PLAINTEXT表示明文傳輸 |
log.dirs | /root/tools/apache-zookeeper-3.5.7-bin/data | kafka數據存放地址,能夠填寫多個。用","間隔 |
message.max.bytes | message.max.bytes | 單個消息長度限制,單位是字節 |
num.partitions | 1 | 默認分區數 |
log.flush.interval.messages | Long.MaxValue | 在數據被寫入到硬盤和消費者可用前最大累積的消息的數量 |
log.flush.interval.ms | Long.MaxValue | 在數據被寫入到硬盤前的最大時間 |
log.flush.scheduler.interval.ms | Long.MaxValue | 檢查數據是否要寫入到硬盤的時間間隔。 |
log.retention.hours | 24 | 控制一個log保留時間,單位:小時 |
zookeeper.connect | 192.168.85.158:2181, |
192.168.85.168:2181,
192.168.85.178:2181 | ZooKeeper服務器地址,多臺用","間隔 |
(4)集羣啓動
啓動方式跟單機同樣:
#啓動zookeeper [root@ruanjianlaowang158 bin]# cd /root/tools/apache-zookeeper-3.5.7-bin/bin [root@ruanjianlaowang158 bin]# ./zkServer.sh start #啓動kafka [root@ruanjianlaowang158 bin]# cd /root/tools/kafka_2.12-2.4.1/bin [root@ruanjianlaowang158 bin]# ./zookeeper-server-start.sh ../config/server.properties &
(5)注意點
集羣啓動的時候,單機那臺服務器(158)可能會報:Kafka:Configured broker.id 2 doesn't match stored broker.id 0 in meta.properties. 方案:在158服務器data中有個文件:meta.properties,文件中的broker.id也須要修改爲與server.properties中的broker.id同樣,因此形成了這個問題。
(6)建立個topic,後面springboot項目測試使用。
[root@ruanjianlaowang158 bin]# cd /root/tools/kafka_2.12-2.4.1/bin [root@ruanjianlaowang158 bin]# ./kafka-topics.sh --create --zookeeper 192.168.85.158:2181,192.168.85.168:2181,192.168.85.178:2181 --replication-factor 3 --partitions 5 --topic aaaa
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.0.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.itany</groupId> <artifactId>kafka</artifactId> <version>0.0.1-SNAPSHOT</version> <name>kafka</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
說明:
主要就兩個gav,一個是spring-boot-starter-web,啓動web服務使用;一個是spring-kafka,這個是springboot集成額kafka核心包。
spring: kafka: # 北遊,kafka集羣服務器地址 bootstrap-servers: 192.168.85.158:9092,192.168.85.168:9092,192.168.85.178:9092 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: test key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
@RestController public class KafkaProducer { @Autowired private KafkaTemplate template; //北遊,topic使用上測試建立的aaaa @RequestMapping("/sendMsg") public String sendMsg(String topic, String message){ template.send(topic,message); return "success"; } }
@Component public class KafkaConsumer { //北遊,這裏是監控aaaa這個topic,直接打印到idea中,北遊 @KafkaListener(topics = {"aaaa"}) public void listen(ConsumerRecord record){ System.out.println(record.topic()+":"+record.value()); } }
(1)瀏覽器上輸入
http://localhost:8080/sendMsg?topic=aaaa&message=bbbb
(2)北遊的idea控制檯打印信息
一般是指分佈式系統在多臺網絡互聯的機器上保存有相同的數據拷貝
系統部分組件失效,系統依然可以繼續運轉,於是增長了總體可用性以及數據持久性
支持橫向擴展,可以經過增長機器的方式來提高讀性能,進而提升讀操做吞吐量
容許將數據放入與用戶地理位置相近的地方,從而下降系統延時。
(1)、 本質就是一個只能追加寫消息的日誌文件
(2)、同一個分區下的全部副本保存有相同的消息序列
(3)、副本分散保存在不一樣的 Broker 上,從而可以對抗部分 Broker 宕機帶來的數據不可用(Kafka 是有若干主題概,每一個主題可進一步劃分紅若干個分區。每一個分區配置有若干個副本)
以下:有 3 臺 Broker 的 Kafka 集羣上的副本分佈狀況
基於領導者(Leader-based)的副本機制
工做原理如圖:
(1)、Kafka 中分紅兩類副本:領導者副本(Leader Replica)和追隨者副本(Follower Replica)。每一個分區在建立時都要選舉一個副本,稱爲領導者副本,其他的副本自動稱爲追隨者副本。
(2)、Kafka 中,追隨者副本是不對外提供服務的。追隨者副本不處理客戶端請求,它惟一的任務就是從領導者副本,全部的讀寫請求都必須發往領導者副本所在的 Broker,由該 Broker 負責處理。(所以目前kafka只能享受到副本機制帶來的第 1 個好處,也就是提供數據冗餘實現高可用性和高持久性)
(3)、領導者副本所在的 Broker 宕機時,Kafka 依託於 ZooKeeper 提供的監控功能可以實時感知到,並當即開啓新一輪的領導者選舉,從追隨者副本中選一個做爲新的領導者。老 Leader 副本重啓回來後,只能做爲追隨者副本加入到集羣中。
Kafka 引入了 In-sync Replicas,也就是所謂的 ISR 副本集合。ISR 中的副本都是與 Leader 同步的副本,相反,不在 ISR 中的追隨者副本就被認爲是與 Leader 不一樣步的
(1)、ISR不僅是追隨者副本集合,它必然包括 Leader 副本。甚至在某些狀況下,ISR 只有 Leader 這一個副本
(2)、經過Broker 端replica.lag.time.max.ms 參數(Follower 副本可以落後 Leader 副本的最長時間間隔)值來控制哪一個追隨者副本與 Leader 同步?只要一個 Follower 副本落後 Leader 副本的時間不連續超過 10 秒,那麼 Kafka 就認爲該 Follower 副本與 Leader 是同步的,即便此時 Follower 副本中保存的消息明顯少於 Leader 副本中的消息。
(3)、ISR 是一個動態調整的集合,而非靜態不變的。
某個追隨者副本從領導者副本中拉取數據的過程持續慢於 Leader 副本的消息寫入速度,那麼在 replica.lag.time.max.ms
時間後,此 Follower 副本就會被認爲是與 Leader 副本不一樣步的,所以不能再放入 ISR 中。此時,Kafka 會自動收縮 ISR 集合,將該副本「踢出」ISR。
假若該副本後面慢慢地追上了 Leader 的進度,那麼它是可以從新被加回 ISR 的。
(4)、ISR集合爲空則leader副本也掛了,這個分區就不可用了,producer也沒法向這個分區發送任何消息了。(反之leader副本掛了能夠從ISR集合中選舉leader副本)
(1)、ISR不爲空,從ISR中選舉
(2)、ISR爲空,Kafka也能夠從不在 ISR 中的存活副本中選舉,這個過程稱爲Unclean 領導者選舉,經過Broker 端參數unclean.leader.election.enable
控制是否容許 Unclean 領導者選舉。
開啓 Unclean 領導者選舉可能會形成數據丟失,但好處是,它使得分區 Leader 副本一直存在,不至於中止對外提供服務,所以提高了高可用性。反之,禁止 Unclean 領導者選舉的好處在於維護了數據的一致性,避免了消息丟失,但犧牲了高可用性。
一個分佈式系統一般只能同時知足一致性(Consistency)、可用性(Availability)、分區容錯性(Partition tolerance)中的兩個。顯然,在這個問題上,Kafka 賦予你選擇 C 或 A 的權利。
強烈建議不要開啓unclean leader election,畢竟咱們還能夠經過其餘的方式來提高高可用性。若是爲了這點兒高可用性的改善,犧牲了數據一致性,那就很是不值當了。
ps1:leader副本的選舉也能夠理解爲分區leader的選舉
ps2:broker的leader選舉與分區leader的選舉不一樣,
Kafka的Leader選舉是經過在zookeeper上建立/controller臨時節點來實現leader選舉,並在該節點中寫入當前broker的信息
{「version」:1,」brokerid」:1,」timestamp」:」1512018424988」}
利用Zookeeper的強一致性特性,一個節點只能被一個客戶端建立成功,建立成功的broker即爲leader,即先到先得原則,leader也就是集羣中的controller,負責集羣中全部大小事務。
當leader和zookeeper失去鏈接時,臨時節點會刪除,而其餘broker會監聽該節點的變化,當節點刪除時,其餘broker會收到事件通知,從新發起leader選舉
再給大家留個小問題:若是容許 Follower 副本對外提供讀服務,你以爲應該如何避免或緩解因 Follower 副本與 Leader 副本不一樣步而致使的數據不一致的情形?
在整合這套方案的時候,項目組也是通過一番討論,在討論中,觀點不少,有人認爲直接使用Storm進行實時處理,去掉Kafka環節;也有認爲直接使用Kafka的API去消費,去掉Storm的消費環節等等,可是最終組內仍是一致決定使用這套方案,緣由有以下幾點:
咱們認爲,Kafka在整個環節中充當的職責應該單一,這項目的整個環節她就是一箇中間件,下面用一個圖來講明這個緣由,以下圖所示:
整個項目流程如上圖所示,這樣劃分使得各個業務模塊化,功能更加的清晰明瞭。
負責從各個節點上實時收集用戶上報的日誌數據,咱們選用的是Apache的Flume NG來實現。
因爲收集的數據的速度和數據處理的速度不必定是一致的,所以,這裏添加了一箇中間件來作處理,所使用的是Apache的Kafka,關於Kafka集羣部署。另外,有一部分數據是流向HDFS分佈式文件系統了的,方便於爲離線統計業務提供數據源。
在收集到數據後,咱們須要對這些數據作實時處理,所選用的是Apache的Storm。關於Storm的集羣搭建部署博客後面補上,較爲簡單。
在使用Storm對數據作處理後,咱們須要將處理後的結果作持久化,因爲對響應速度要求較高,這裏採用Redis+MySQL來作持久化。整個項目的流程架構圖,以下圖所示:
Flume是一個分佈式的、高可用的海量日誌收集、聚合和傳輸日誌收集系統,支持在日誌系統中定製各種數據發送方(如:Kafka,HDFS等),便於收集數據。Flume提供了豐富的日誌源收集類型,有:Console、RPC、Text、Tail、Syslog、Exec等數據源的收集,在咱們的日誌系統中目前咱們所使用的是spooldir方式進行日誌文件採集,配置內容信息以下所示:
producer.sources.s.type = spooldir producer.sources.s.spoolDir = /home/hadoop/dir/logdfs
固然,Flume的數據發送方類型也是多種類型的,有:Console、Text、HDFS、RPC等,這裏咱們系統所使用的是Kafka中間件來接收,配置內容以下所示:
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink producer.sinks.r.metadata.broker.list=dn1:9092,dn2:9092,dn3:9092 producer.sinks.r.partition.key=0 producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition producer.sinks.r.serializer.class=kafka.serializer.StringEncoder producer.sinks.r.request.required.acks=0 producer.sinks.r.max.message.size=1000000 producer.sinks.r.producer.type=sync producer.sinks.r.custom.encoding=UTF-8 producer.sinks.r.custom.topic.name=test
Kafka是一種提供高吞吐量的分佈式發佈訂閱消息系統,她的特性以下所示:
Kafka的目的是提供一個發佈訂閱解決方案,他能夠處理Consumer網站中的全部流動數據,在網頁瀏覽,搜索以及用戶的一些行爲,這些動做是較爲關鍵的因素。這些數據一般是因爲吞吐量的要求而經過處理日誌和日誌聚合來解決。對於Hadoop這樣的日誌數據和離線計算系統,這樣的方案是一個解決實時處理較好的一種方案。
關於Kafka集羣的搭建部署和使用,上面已經寫了,不會的朋友翻上去再看一下,這裏就不贅述了。
Twitter將Storm開源了,這是一個分佈式的、容錯的實時計算系統,已被貢獻到Apache基金會,下載地址以下所示:
http://storm.apache.org/downloads.html
Storm的主要特色以下:
Storm集羣由一個主節點和多個工做節點組成。主節點運行了一個名爲「Nimbus」的守護進程,用於分配代碼、佈置任務及故障檢測。每一個工做節 點都運行了一個名爲「Supervisor」的守護進程,用於監聽工做,開始並終止工做進程。
Nimbus和Supervisor都能快速失敗,並且是無 狀態的,這樣一來它們就變得十分健壯,二者的協調工做是由Apache的ZooKeeper來完成的。
Storm的術語包括Stream
、Spout
、Bolt
、Task
、Worker
、Stream Grouping
和Topology
。
關於Storm集羣的搭建部署,博客在下一篇中更新,到時候會將更新地址附在這裏,這裏就先不對Storm集羣的搭建部署作過多的贅述了。
Kafka 日誌消息保存時間總結
Kafka 做爲一個高吞吐的消息中間件和傳統的消息中間件一個很大的不一樣點就在於它的日誌其實是以日誌的方式默認保存在/kafka-logs文件夾中的。雖然默認有7天清楚的機制,可是在數據量大,而磁盤容量不足的狀況下,常常出現沒法寫入的狀況。如何調整Kafka的一些默認參數就顯得比較關鍵了。這裏筆者整理了一些常見的配置參數供你們參考:
分段策略屬性 屬性名 |
含義 | 默認值 |
---|---|---|
log.roll.{hours,ms} | 日誌滾動的週期時間,到達指定週期時間時,強制生成一個新的segment | 168(7day) |
log.segment.bytes | 每一個segment的最大容量。到達指定容量時,將強制生成一個新的segment | 1G(-1爲不限制) |
log.retention.check.interval.ms | 日誌片斷文件檢查的週期時間 | 60000 |
日誌刷新策略
Kafka的日誌其實是開始是在緩存中的,而後根據策略按期一批一批寫入到日誌文件中去,以提升吞吐率。
屬性名 | 含義 | 默認值 |
---|---|---|
log.flush.interval.messages | 消息達到多少條時將數據寫入到日誌文件 | 10000 |
log.flush.interval.ms | 當達到該時間時,強制執行一次flush | null |
log.flush.scheduler.interval.ms | 週期性檢查,是否須要將信息flush | 很大的值 |
日誌保存清理策略
屬性名 | 含義 | 默認值 |
---|---|---|
log.cleanup.polict | 日誌清理保存的策略只有delete和compact兩種 | delete |
log.retention.hours | 日誌保存的時間,能夠選擇hours,minutes和ms | 168(7day) |
log.retention.bytes | 刪除前日誌文件容許保存的最大值 | -1 |
log.segment.delete.delay.ms | 日誌文件被真正刪除前的保留時間 | 60000 |
log.cleanup.interval.mins | 每隔一段時間多久調用一次清理的步驟 | 10 |
log.retention.check.interval.ms | 週期性檢查是否有日誌符合刪除的條件(新版本使用 | ) 300000 |
這裏特別說明一下,日誌的真正清楚時間。當刪除的條件知足之後,日誌將被「刪除」,可是這裏的刪除其實只是將該日誌進行了「delete」標註,文件只是沒法被索引到了而已。
可是文件自己,仍然是存在的,只有當過了log.segment.delete.delay.ms 這個時間之後,文件纔會被真正的從文件系統中刪除。
文章寫到這裏差很少了,比我預計要寫得短一些,由於還有一些東西要寫出來不免長篇大論,篇幅不容許,想更透徹的掌握kafka的同窗能夠領取我整理的完整版kafka學習筆記,最近要準備面試的同窗能夠看看我這份kafka高頻面試題整理。
後面我會把另外兩個中間件也分別寫文章分析,能夠給我點個關注第一時間接到通知
而後,能夠點個贊嗎兄弟們!
end