kafka工做流程| 命令行操做

1.  概述

數據層:結構化數據+非結構化數據+日誌信息(大部分爲結構化)算法

傳輸層:flume(採集日誌--->存儲性框架(如HDFS、kafka、Hive、Hbase))+sqoop(關係型數據性數據庫裏數據--->hadoop)+kafka(將實時日誌在線--->sparkstream在數據進行實時處理分析)sql

存儲層:HDFS+Hbase(非關係型數據庫)+kafka(節點上默認存儲1G數據數據庫

資源調度層:Yarnapache

計算層:MapReduce+ Hive(計算+存儲型框架:sql-->mapreduce)+  spark +Fink + stombootstrap

圖表展示層:vim

 消息隊列原理

同步通訊|
異步通訊:
  消費者只有一個即點對點(消費者主動拉取數據,消息收到後消息清除)緩存

點對點模型一般是一個基於拉取或者輪詢(輪詢調度算法:每一次把來自用戶的請求輪流分配給內部中的服務器,從1開始,直到N而後從新開始循環。輪(循環)着詢問(訪問)數據!的消息傳送模型,這種模型從隊列中請求信息,而不是將消息推送到客戶端。這個模型的特色是發送到隊列的消息被一個且只有一個接收者接收處理,即便有多個消息監聽者也是如此。服務器

  消費者有多個即發佈/訂閱模式(一對多)網絡

發佈訂閱模型則是另外一個消息傳送模型。發佈訂閱模型能夠有多種不一樣的訂閱者,臨時訂閱者只在主動監聽主題時才接收消息,而持久訂閱者則監聽主題的全部消息,即便當前訂閱者不可用,處於離線狀態。架構

kafka--->>分佈式消息隊列,把兩種模式結合起來(點對點+ 發佈)

消費者組中只有一個消費者--一對一;
多個消費者組中有多個消費者去消費同一個主題的數據,即發佈/訂閱模式;

拉模式:保證數據不至於丟失


消息隊列好處:
發接消息解耦了;冗餘(備份);擴展性;靈活性、峯值處理;可恢復性;有順序的;緩衝

什麼是Kafka 

在流式計算中,Kafka通常用來緩存數據,Spark經過消費Kafka的數據進行計算。

Apache Kafka是一個開源消息系統,由Scala寫成。爲處理實時數據提供一個統1、高通量、低等待的平臺;

Kafka是一個分佈式消息隊列。Kafka對消息保存是根據Topic進行歸類,發送消息者稱爲Producer,消息接受者稱爲Consumer,此外kafka集羣有多個kafka實例組成,每一個實例(server)稱爲broker。

不管是kafka集羣,仍是consumer都依賴於zookeeper集羣保存一些meta信息,來保證系統可用性。

2. 架構

客戶端:producer、cluster、consumer

 

producer:-->TopicA在某個節點上,節點存儲空間有限,主題中消息不能無限存儲
可指定分區:消息的分佈式存儲,一個主題分紅多個分區;一個分區即一個消息隊列;分區中的消息要備份ReplicationA/0;一個分區中可能備份有多個,選出一個leader; 數據的一致性,其餘節點去同步消息時速度可能不同 Leader和Follower都是針對分區中的多個副本,分區下面有多個副本,在副本中選一個leader leader接收發送數據,讀寫數據;follower只負責數據的備份 zk中的leader和follower是針對節點的 分區中的消息都是有序的,每個消息要進行編號,即偏移量(從0開始編),如消費者讀取到1號message,把1保存zk;下次讀取時從1開始,防止數據被重複被消費; 
有些消費者消費能力有限
----引入--->消費者組(多個消費者)多個消費者去消費某一個主題 每個消費者是消費主題下面的分區,而不能消費同一個分區的數據,至關於同時消費了; 分區數=消費者數,速度是最快的,才能保證資源的最大化利用; 分區:①實現對消息的分佈式存儲;②實現消費者消費消息時的併發且互不干擾,提升消費者消費的效率; 消費者只能去找leader(讀寫)去消費,follower只是做爲存儲備份數據; zk-->①主題、節點分區信息都會存儲在zk;②消費者消費消息的offset也會存在zk,但0.9版本以後偏移量offset存在本地; Topic主題是對消息的分類;Topic主題中的容量>節點broker1容量時,會進行分區;
  誤區:並非0分區滿了就去存儲到1分區,1分區滿了就去存儲到2分區;往分區裏邊生成數據是有規則的,見下; 數據的備份數
<=節點數 建立主題是要建立分區,每一個分區的leader要分到不一樣節點實現負載均衡;
消費者去進行消費時是一個一個分區來的;有前後順序;而在消費者組中的消費者是併發去消費各個分區中的數據

 3. 安裝kafka集羣

tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/ [kris@hadoop101 module]$ mv kafka_2.11-0.11.0.0/ kafka 在/opt/module/kafka目錄下建立logs文件夾 [kris@hadoop101 kafka]$ mkdir logs [kris@hadoop101 kafka]$ cd config/ [kris@hadoop101 config]$ vim server.properties #修改配置文件
#broker的全局惟一編號,不能重複 broker.id=0 #刪除topic功能使能 delete.topic.enable=true #處理網絡請求的線程數量 num.network.threads=3 #用來處理磁盤IO的現成數量 num.io.threads=8 #發送套接字的緩衝區大小 socket.send.buffer.bytes=102400 #接收套接字的緩衝區大小 socket.receive.buffer.bytes=102400 #請求套接字的緩衝區大小 socket.request.max.bytes=104857600 #kafka運行日誌存放的路徑 log.dirs=/opt/module/kafka/logs #topic在當前broker上的分區個數; num.partitions=1 #用來恢復和清理data下數據的線程數量 num.recovery.threads.per.data.dir=1 #segment文件保留的最長時間,超時將被刪除 log.retention.hours=168 #配置鏈接Zookeeper集羣地址 zookeeper.connect=hadoop101:2181,hadoop102:2181,hadoop103:2181

在建立主題時,要指定分區數,而在配置文件中已經配置了;若是主題不存在,它會自動建立主題,這時用的分區數即配置文件裏邊的分區數;

分發安裝包

[kris@hadoop101 module]$ xsync kafka/

分別在hadoop102和hadoop103上修改配置文件/opt/module/kafka/config/server.properties中的broker.id=一、broker.id=2

       注:broker.id不得重複

啓動集羣/ 要先啓動zookeeper

依次在hadoop10一、hadoop10二、hadoop103節點上啓動kafka [kris@hadoop101 kafka]$ bin/kafka-server-start.sh config/server.properties & ##後臺啓動加& [kris@hadoop102 kafka]$ bin/kafka-server-start.sh config/server.properties & [kris@hadoop103 kafka]$ bin/kafka-server-start.sh config/server.properties & 關閉集羣 [kris@hadoop101 kafka]$ bin/kafka-server-stop.sh stop [kris@hadoop102 kafka]$ bin/kafka-server-stop.sh stop [kris@hadoop103 kafka]$ bin/kafka-server-stop.sh stop jps -l會顯示進程的詳細信息 [kris@hadoop101 ~]$ jps -l 3444 org.apache.zookeeper.server.quorum.QuorumPeerMain 3524 kafka.Kafka 3961 sun.tools.jps.Jps

4. Kafka命令行操做

zookeeper有三個端口:
  2181:對cline端提供服務
  3888:選舉leader使用
  2888:集羣內機器通信使用(Leader監聽此端口)

zookeeper中的節點:

cluster(集羣的版本,id)

brokers(ids節點信息如在哪一個機器上,建立的topics主題/其中__consumer_offsets存儲本地的offsets,每一個主題下面有/partitions/各個分區的信息如它的leader、version、isr等信息

consumers(消費者組id/ids--subscription:訂閱的主題:在哪一個節點、version、pattern、timestamp/ owners--哪一個主題,哪一個分區/ offsets--哪一個主題哪一個分區--記錄偏移量)

 

① 建立Topic主題

###建立Topics,指定名字,分區數,副本數 [kris@hadoop101 bin]$ ./kafka-topics.sh --zookeeper hadoop101:2181, hadoop102:2181, hadoop103:2181 --create --topic first --partitions 3 --replication-factor 3 Created topic "first".  #建立主題 [kris@hadoop101 bin]$ ./kafka-topics.sh --zookeeper hadoop101:2181 --list ##查看有多少個主題 first [zk: localhost:2181(CONNECTED) 1] ls /cluster ##cluster是有關集羣版本version、id [id] [zk: localhost:2181(CONNECTED) 2] ls /cluster/id [] [zk: localhost:2181(CONNECTED) 3] get /cluster/id {"version":"1","id":"ujFrs7F7SVuO2JwXw62vow"} cZxid = 0x700000014 ctime = Wed Feb 27 11:51:23 CST 2019 mZxid = 0x700000014 mtime = Wed Feb 27 11:51:23 CST 2019 pZxid = 0x700000014 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 45 numChildren = 0 
[zk: localhost:2181(CONNECTED) 4] ls /brokers #集羣中節點信息存儲在brokers [ids, topics, seqid] ##集羣中節點的ids;[0, 1, 2] 如節點1的機器名字、timestamp、port、version [zk: localhost:2181(CONNECTED) 5] ls /brokers/topics [first]

 [zk: localhost:2181(CONNECTED) 15] ls /brokers/topics    ##集羣主題topics
 [first, __consumer_offsets, second]

[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics/first [partitions] [zk: localhost:2181(CONNECTED) 7] ls /brokers/topics/first/partitions [0, 1, 2] [zk: localhost:2181(CONNECTED) 8] ls /brokers/topics/first/partitions/0 [state] [zk: localhost:2181(CONNECTED) 9] ls /prokers/topics/first/partitions/0/state ## [ ]

 [zk: localhost:2181(CONNECTED) 22] get /brokers/topics/first/partitions/0/state
  {"controller_epoch":10,"leader":0,"version":1,"leader_epoch":8,"isr":[0,2,1]}

 

② 生產者生成| 消費者消費

控制檯生產者執行腳本; --broker-list指定節點地址 [kris@hadoop101 bin]$ ./kafka-console-producer.sh --broker-list hadoop101:9092 --topic first >Hello kafka!
>kris >smile ① 控制檯消費者 :默認從最大的offset消費開始消費,下次就是後邊的數據,前邊接收不到;再生產數據才能收到 [kris@hadoop101 bin]$ ./kafka-console-consumer.sh --zookeeper hadoop101:2181 --topic first Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]. alex ② 控制檯消費者 --from-beginning 從頭消費 [kris@hadoop101 bin]$ ./kafka-console-consumer.sh --zookeeper hadoop101:2181 --topic first --from-beginning  Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]. smile kris Hello kafka! alex a消費者是對3個分區一個個分區消費的,因此總的順序不同

 

第一個消費者:從最大的offset開始消費 [zk: localhost:2181(CONNECTED) 12] ls /consumers 要保證消費者是在線的,由於偏移量是臨時的,消費者一退出就看不到偏移量了; [console-consumer-27938, console-consumer-90053] 控制檯消費者id;ids、owners屬於哪一個主題哪一個分區、 [zk: localhost:2181(CONNECTED) 13] ls /consumers/console-consumer-27938 [ids, owners, offsets] [zk: localhost:2181(CONNECTED) 14] ls /consumers/console-consumer-27938/offsets [first] [zk: localhost:2181(CONNECTED) 15] ls /consumers/console-consumer-27938/offsets/first [0, 1, 2] [zk: localhost:2181(CONNECTED) 16] ls /consumers/console-consumer-27938/offsets/first/0 [] [zk: localhost:2181(CONNECTED) 17] get /consumers/console-consumer-27938/offsets/first/0 1 cZxid = 0x80000003f ctime = Wed Feb 27 18:26:11 CST 2019 mZxid = 0x80000003f mtime = Wed Feb 27 18:26:11 CST 2019 pZxid = 0x80000003f cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 1 numChildren = 0 [zk: localhost:2181(CONNECTED) 18] get /consumers/console-consumer-27938/offsets/first/1 1 [zk: localhost:2181(CONNECTED) 19] get /consumers/console-consumer-27938/offsets/first/2 2 第二個消費者的狀況:從頭開始消費 [zk: localhost:2181(CONNECTED) 26] get /consumers/console-consumer-90053/offsets/first/0 1 [zk: localhost:2181(CONNECTED) 27] get /consumers/console-consumer-90053/offsets/first/1 1 [zk: localhost:2181(CONNECTED) 28] get /consumers/console-consumer-90053/offsets/first/2 2  

 

若是是把offset存儲在本地,加--bootstarp-server,就不會在zookeeper上的 /consumers上建立消費者組了;

③ 控制檯消費者 偏移量存儲在本地的消費者,不存儲在zookeeper上就加--bootstrap-server存儲在__consumer_offsets主題下面 [kris@hadoop101 bin]$ ./kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic first --from-beginning Hello kafka! alex kris smile --bootstrap-server是再也不把偏移量存儲在zookeeper上,而是存儲在本地;數據仍是存儲在分區的first-0/first-1/first-2 drwxrwxr-x. 2 kris kris 4096 2月 27 19:02 __consumer_offsets-0 drwxrwxr-x. 2 kris kris 4096 2月 27 19:02 __consumer_offsets-12 drwxrwxr-x. 2 kris kris 4096 2月 27 19:02 __consumer_offsets-15 drwxrwxr-x. 2 kris kris 4096 2月 27 19:02 __consumer_offsets-18 drwxrwxr-x. 2 kris kris 4096 2月 27 19:02 __consumer_offsets-21 drwxrwxr-x. 2 kris kris 4096 2月 27 19:02 __consumer_offsets-24 drwxrwxr-x. 2 kris kris 4096 2月 27 19:02 __consumer_offsets-27 drwxrwxr-x. 2 kris kris 4096 2月 27 19:02 __consumer_offsets-3 drwxrwxr-x. 2 kris kris 4096 2月 27 19:02 __consumer_offsets-30 drwxrwxr-x. 2 kris kris 4096 2月 27 19:02 __consumer_offsets-33 drwxrwxr-x. 2 kris kris 4096 2月 27 19:02 __consumer_offsets-36 drwxrwxr-x. 2 kris kris 4096 2月 27 19:02 __consumer_offsets-39 drwxrwxr-x. 2 kris kris 4096 2月 27 19:02 __consumer_offsets-42 drwxrwxr-x. 2 kris kris 4096 2月 27 19:02 __consumer_offsets-45 drwxrwxr-x. 2 kris kris 4096 2月 27 19:02 __consumer_offsets-48 drwxrwxr-x. 2 kris kris 4096 2月 27 19:02 __consumer_offsets-6 drwxrwxr-x. 2 kris kris 4096 2月 27 19:02 __consumer_offsets-9 [kris@hadoop101 __consumer_offsets-0]$ ll 總用量 0 -rw-rw-r--. 1 kris kris 10485760 2月 27 19:02 00000000000000000000.index -rw-rw-r--. 1 kris kris 0 2月 27 19:02 00000000000000000000.log -rw-rw-r--. 1 kris kris 10485756 2月 27 19:02 00000000000000000000.timeindex -rw-rw-r--. 1 kris kris 0 2月 27 19:02 leader-epoch-checkpoint [kris@hadoop101 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 --list __consumer_offsets first [zk: localhost:2181(CONNECTED) 41] ls /brokers/topics [first, __consumer_offsets] [zk: localhost:2181(CONNECTED) 47] ls /brokers/topics/__consumer_offsets/partitions [44, 45, 46, 47, 48, 49, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43] [zk: localhost:2181(CONNECTED) 48] ls /brokers/topics/__consumer_offsets/partitions/0 [state] [zk: localhost:2181(CONNECTED) 49] ls /brokers/topics/__consumer_offsets/partitions/0/state [] [zk: localhost:2181(CONNECTED) 50] get /brokers/topics/__consumer_offsets/partitions/0/state {"controller_epoch":2,"leader":0,"version":1,"leader_epoch":0,"isr":[0]} cZxid = 0x80000008c ctime = Wed Feb 27 19:02:06 CST 2019 mZxid = 0x80000008c mtime = Wed Feb 27 19:02:06 CST 2019 pZxid = 0x80000008c cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 72 numChildren = 0

 生產者生產的數據是存儲在logs裏邊的,命名是:主題名-分區號 ,無論是把offset存儲在zookeeper仍是存儲在本地,它的數據都是存儲在logs裏邊的主題名-分區號裏邊;

會有一個index,便於下次的查找;

主題topic是邏輯上的概念,分區是物理上的概念; [kris@hadoop101 kafka]$ cd logs/ [kris@hadoop101 logs]$ ll drwxrwxr-x. 2 kris kris 4096 2月 27 18:21 first-0 drwxrwxr-x. 2 kris kris 4096 2月 27 18:21 first-1 drwxrwxr-x. 2 kris kris 4096 2月 27 18:21 first-2 [kris@hadoop101 logs]$ cd first-0 [kris@hadoop101 first-0]$ ll 總用量 8 -rw-rw-r--. 1 kris kris 10485760 2月 27 18:07 00000000000000000000.index -rw-rw-r--. 1 kris kris 73 2月 27 18:21 00000000000000000000.log -rw-rw-r--. 1 kris kris 10485756 2月 27 18:07 00000000000000000000.timeindex -rw-rw-r--. 1 kris kris 8 2月 27 18:21 leader-epoch-checkpoint 數據序列化到磁盤而不是內存first-0,索引 [kris@hadoop101 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 --describe --topic first Topic:first PartitionCount:3 ReplicationFactor:3 Configs: Topic: first Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2 ##Isr是同步副本隊列 Topic: first Partition: 1 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0 Topic: first Partition: 2 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1 測試Kafka集羣一共3個節點, Topic爲first, 編號爲0的Partition, Leader在broker.id=0這個節點上,副本在broker.id爲0 1 2這3個節點,而且全部副本都存活,並跟broker.id=0這個節點同步 leader是在給出的全部partitons中負責讀寫的節點,每一個節點都有可能成爲leader replicas 顯示給定partiton全部副本所存儲節點的節點列表,無論該節點是不是leader或者是否存活。 isr 副本都已同步的的節點集合,這個集合中的全部節點都是存活狀態,而且跟leader同步;若是沒有同步數據,則會從這個Isr中移除; 寫入的順序: >Hello kafka! >kris >smile >alex 節點0-broker.id=0, smile在分區first-0 1 節點1-broker.id=1, kris在分區first-1 1 節點2-broker.id=2, Hello kafka!和alex在分區first-2 2(在zk中get /consumers/console-consumer-90053/offsets/first/2) 同一個partition可能會有多個replication,而這時須要在這些replication之間選出一個leader, producer和consumer只與這個leader交互,其它replication做爲follower從leader 中複製數據。

 5. 工做流程

見上架構流程

1)數據生產

  寫入方式

producer採用推(push)模式將消息發佈到broker,每條消息都被追加(append)到分區(patition)中,屬於順序寫磁盤(順序寫磁盤效率比隨機寫內存要高,保障kafka吞吐率)。

flume和kafka(它的消費者)都是進行數據;

不一樣消費者組中消費者可消費同一個分區數據;
消費者組只有一個消費者它消費數據是一個個分區依次進行消費的;而若是一個消費者組中有多個消費者,它們是並行的;

2)分區(Partition)

消息發送時都被髮送到一個topic,其本質就是一個目錄,而topic是由一些Partition Logs(分區日誌)組成 

咱們能夠看到,每一個Partition中的消息(以k,v的形式)都是有序的,生產的消息被不斷追加到Partition log上,其中的每個消息都被賦予了一個惟一的offset值。

  1)分區的緣由

(1)方便在集羣中擴展,每一個Partition能夠經過調整以適應它所在的機器,而一個topic又能夠有多個Partition組成,所以整個集羣就能夠適應任意大小的數據了;

(2)能夠提升併發,由於能夠以Partition爲單位讀寫了。

  2)分區的原則

(1)指定了patition,則直接使用;

(2)未指定patition但指定key,經過對key的value進行hash出一個patition; 對key的hashcode % 分區數,取餘獲得對應的分區; 

(3)patition和key都未指定,使用輪詢選出一個patition。

3) 副本(Replication)

同一個partition可能會有多個replication(對應server.properties 配置中的 default.replication.factor=N)。沒有replication的狀況下,一旦broker宕機,其上全部patition 的數據都不可被消費,同時producer也不能再將數據存於其上的patition。引入replication以後,同一個partition可能會有多個replication,而這時須要在這些replication之間選出一個leader,producer和consumer只與這個leader交互,其它replication做爲follower從leader 中複製數據。

4) producer寫入消息流程

  1)producer先從broker-list節點中找到該partition的leader;

  2)producer將消息發送給該leader;

  3)leader將消息寫入本地log;

  4)followers從leader pull消息,寫入本地log後向leader發送ACK;

  5)leader收到全部的replication的ACK後,向producer發送ACK

kafka的ack機制(request.requred.acks):

  0:producer不等待broker的ack,broker一接收到尚未寫入磁盤就已經返回,當broker故障時有可能丟失數據;producer不等待節點返回的ack,它只管發

   1:producer等待broker的ack,partition的leader落盤成功後返回ack,若是在follower同步成功以前leader故障,那麼將會丟失數據;ack=1,leader落盤就返回ack給producer;

  -1:producer等待broker的ack,partition的leader和follower所有落盤成功後才返回ack,數據通常不會丟失,延遲時間長可是可靠性高。leader收到全部的follow(都同步完數據)的ack後才向producer發送ack


數據持久化到磁盤而不是內存;kafka從本地讀取磁盤數據比從內存還快,kafka作了優化
消息的保存以k,v對的形式
讀取本地保存的offset 1)修改配置文件consumer.properties exclude.internal.topics=false 2)讀取offset bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper hadoop101:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning 

  第二次執行時把--from-beging去掉;

偏移量以k,v對形式:
k(控制檯消費者組id
| 主題名| 分區),k(偏移量,提交時間,過時時間) [console-consumer-81371,first,1]::[OffsetMetadata[1,NO_METADATA],CommitTime 1551272323753,ExpirationTime 1551358723753] [console-consumer-81371,first,0]::[OffsetMetadata[1,NO_METADATA],CommitTime 1551272323753,ExpirationTime 1551358723753] [console-consumer-81371,first,2]::[OffsetMetadata[2,NO_METADATA],CommitTime 1551272328754,ExpirationTime 1551358728754]

 

消費者組

[kris@hadoop101 config]$ vi consumer.properties
group.id=kris


分發到其餘機器:
[kris@hadoop101 config]$ xsync consumer.properties 
##啓動一個生產者
[kris@hadoop101 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop101:9092 --topic first
在hadoop10二、hadoop103上分別啓動消費者
[kris@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic first --consumer.config config/consumer.properties
[kris@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic first --consumer.config config/consumer.properties 

查看hadoop102和hadoop103的接收者。
同一時刻只有一個消費者接收到消息。

 

存儲策略

不管消息是否被消費,kafka都會保留全部消息。有兩種策略能夠刪除舊數據:

1)基於時間:log.retention.hours=168

2)基於大小:log.retention.bytes=1073741824

須要注意的是,由於Kafka讀取特定消息的時間複雜度爲O(1),即與文件大小無關,因此這裏刪除過時文件與提升 Kafka 性能無關。

 

 kafka的balance是怎麼作的?

   總體的負載均衡;leader在3個節點平均分佈

  Kafka的數據是分區存儲的。以集羣形式運行的Kafka,這些分區是分佈在不一樣的Kafka服務器中。當消費者消費的數據分佈在不一樣的分區時,會訪問不一樣的服務器,這樣就完成了負載均衡。因此,Kafka的負載均衡是經過分區機制實現的。

Kafka的偏移量Offset存放在哪兒,爲何?

   offset從zookeeper遷移到本地  , ZKClient的API寫是很低效的  

  Kafka0.9版本之前,offset默認保存在Zookeeper中。從kafka-0.9版本及之後,kafka的消費者組和offset信息就不存zookeeper了,而是存到broker服務器上。

這個變更的緣由在於:以前版本,Kafka其實存在一個比較大的隱患,就是利用 Zookeeper 來存儲記錄每一個消費者/組的消費進度。雖然,在使用過程中,JVM幫助咱們完成了一些優化,可是消費者須要頻繁的去與 Zookeeper 進行交互,而利用ZKClient的API操做Zookeeper頻繁的Write其自己就是一個比較低效的Action,對於後期水平擴展也是一個比較頭疼的問題。若是期間 Zookeeper 集羣發生變化,那 Kafka 集羣的吞吐量也跟着受影響。

爲何kafka能夠實現高吞吐?( 吞:寫,土:讀)單節點kafka的吞吐量也比其餘消息隊列大,爲何?

 分區機制提供它的高吞吐; 分佈式存儲,消費者組併發消費 ;

 分區中的數據是順序讀取;Disk-->Read Buffer-->Socket Buffer-->NIC Buffer;senfile作了優化省去了中間的Application Buffer

 磁盤--內核區 -用戶區--內核區

 日誌分段讀取:存儲在小文件中,還有索引;

 順序讀寫:kafka的消息是不斷追加到文件中的,這個特性使kafka能夠充分利用磁盤的順序讀寫性能,順序讀寫不須要硬盤磁頭的尋道時間,只需不多的扇區旋轉時間,因此速度遠快於隨機讀寫。

零拷貝:在Linux kernel2.2 以後出現了一種叫作"零拷貝(zero-copy)"系統調用機制,就是跳過「用戶緩衝區」的拷貝,創建一個磁盤空間和內存的直接映射,數據再也不復制到「用戶態緩衝區」,系統上下文切換減小爲2次,能夠提高一倍的性能。

文件分段:kafka的隊列topic被分爲了多個區partition,每一個partition又分爲多個段segment,因此一個隊列中的消息其實是保存在N多個片斷文件中,經過分段的方式,每次文件操做都是對一個小文件的操做,很是輕便,同時也增長了並行處理能力

Kafka消費過的數據如何再消費?

 再消費:低級API修改offset

 auto_offset_reset_config

 找不到以前存儲的offset就從earliest(從最先的offset而不是從0,有侷限性)中讀取

 key(主題,partition,組id)

 更改消費者組id--高級API;

 修改offset:Kafka消息隊列中消費過的數據是用offset標記的。經過修改offset到之前消費過的位置,能夠實現數據的重複消費。

經過使用不一樣的group來消費:Kafka中,不一樣的消費者組的offset是獨立的,所以能夠經過不一樣的消費者組實現數據的重複消費。

   須要將ConsumerConfig.AUTO_OFFSET_RESET_CONFIG屬性修改成"earliest"

kafka數據有序性? 分區有序放在一個分區,k v.消息message是k v k通常不寫,k可保證分到哪一個區kafka發送消息採用的是Deququ雙端隊列,兩頭均可以存取,若消息發送失敗就放回頭部,保證了數據的有序性;存放數據時放到一個分區中,消息message是以k, v鍵值對形式,k相同放一個分區

相關文章
相關標籤/搜索