3、Kafka工做流程分析

三 Kafka 工做流程分析
3.1 Kafka 生產過程(Producer)分析
                            
3.1.1 寫入方式
  producer 採用推(push)模式將消息發佈到 broker,每條消息都被追加(append)到分區(patition)中,屬於順序寫磁盤(順序寫磁盤效率比隨機寫內存要高,保障 kafka 吞吐率)。
3.1.2 分區(Partition)
  消息發送時都被髮送到一個 topic,其本質就是一個目錄,而 topic 是由一些 Partition
  Logs(分區日誌)組成,其組織結構以下圖所示:
    
      咱們能夠看到,每一個 Partition 中的消息都是 有序的,生產的消息被不斷追加到 Partition
        log 上,其中的每個消息都被賦予了一個惟一的 offset 值
   1)分區的緣由
      (1)方便在集羣中擴展,每一個 Partition 能夠經過調整以適應它所在的機器,而一個 topic又能夠有多個 Partition 組成,所以整個集羣就能夠適應任意大小的數據了;
      (2)能夠提升併發,由於能夠以 Partition 爲單位讀寫了。
   2)分區的原則(數據寫入到哪一個分區)
      (1)指定了 patition,則直接使用;
      (2)未指定 patition 但指定 key,經過對 key 的 value 進行 hash 出一個 patition
      (3)patition 和 key 都未指定,使用輪詢選出一個 patition。———————————————————————————
DefaultPartitioner 類 public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {       List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);       int numPartitions = partitions.size();       if (keyBytes == null) {           int nextValue = nextValue(topic);           List<PartitionInfo> availablePartitions =cluster.availablePartitionsForTopic(topic);           if (availablePartitions.size() > 0) {                 int part = Utils.toPositive(nextValue) % availablePartitions.size();                 return availablePartitions.get(part).partition();           } else {           // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;           }       } else {           // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;       } }
3.1.3 副本(Replication)
    同 一 個 partition 可 能 會 有 多 個 replication ( 對 應 server.properties 配 置 中 的default.replication.factor=N)。
    沒有 replication 的狀況下,一旦 broker 宕機,其上全部 patition的數據都不可被消費,
    同時 producer 也不能再將數據存於其上的 patition。引入 replication以後,同一個 partition 可能會有多個 replication,
    而這時須要在這些 replication 之間選出一個 leader,producer 和 consumer 只與這個 leader 交互,其它 replication 做爲 follower 從 leader中複製數據。
3.1.4 寫入流程
    producer 寫入消息流程以下:
    
      1)producer 先從 zookeeper 的 "/brokers/.../state"節點找到該 partition 的 leader
      2)producer 將消息發送給該 leader
      3)leader 將消息寫入本地 log
      4)followers 從 leader pull 消息,寫入本地 log 後向 leader 發送 ACK
      5)leader 收到全部 ISR 中的 replication 的 ACK 後,增長 HW(high watermark,最後 commit的 offset)並向 producer 發送 ACK
3.2 broker 保存消息
   3.2.1 存儲方式
      物理上把 topic 分紅一個或多個 patition(對應 server.properties 中的 num.partitions=3 配置),每一個 patition 物理上對應一個文件夾(該文件夾存儲該 patition 的全部消息和索引文
      件),以下:
             [hadoop@node01 logs]$ ll
              drwxrwxr-x. 2 hadoop hadoop 4096 8 月 6 14:37 first-0
              drwxrwxr-x. 2 hadoop hadoop 4096 8 月 6 14:35 first-1
              drwxrwxr-x. 2 hadoop hadoop 4096 8 月 6 14:37 first-2
             [hadoop@node01 logs]$ cd first-0
             [hadoop@node01 first-0]$ ll
              -rw-rw-r--. 1 hadoop hadoop 10485760 8 月 6 14:33 00000000000000000000.index
              -rw-rw-r--. 1 hadoop hadoop 219 8 月 6 15:07 00000000000000000000.log
              -rw-rw-r--. 1 hadoop hadoop 10485756 8 月 6 14:33 00000000000000000000.timeindex
              -rw-rw-r--. 1 hadoop hadoop 8 8 月 6 14:37 leader-epoch-checkpoint
3.2.2 存儲策略
      不管消息是否被消費,kafka 都會保留全部消息。有兩種策略能夠刪除舊數據:
          1)基於時間:log.retention.hours=168
          2)基於大小:log.retention.bytes=1073741824
        須要注意的是,由於 Kafka 讀取特定消息的時間複雜度爲 O(1),即與文件大小無關,因此這裏刪除過時文件與提升 Kafka 性能無關。
3.2.3 Zookeeper 存儲結構
      
 
       注意:producer 不在 zk 中註冊,消費者(0.8 以前)在 zk 中註冊。
3.3 Kafka 消費過程分析
      kafka 提供了兩套 consumer API:高級 Consumer API 和低級 API。
    3.3.1 高級 API
      1)高級 API 優勢
          高級 API 寫起來簡單
           不須要去自行去管理 offset,系統經過 zookeeper 自行管理
          不須要管理分區,副本等狀況,系統自動管理
          消費者斷線會自動根據上一次記錄在 zookeeper 中的 offset 去接着獲取數據(默認設置1 分鐘更新一下 zookeeper 中存的的 offset)
          可使用 group 來區分對同一個 topic 的不一樣程序訪問分離開來(不一樣的 group 記錄不一樣的 offset,這樣不一樣程序讀取同一個 topic 纔不會由於 offset 互相影響)
      2)高級 API 缺點
            不能自行控制 offset(對於某些特殊需求來講)
           不能細化控制如分區、副本、zk 等
3.3.2 低級 API
      1)低級 API 優勢
           可以開發者本身控制 offset,想從哪裏讀取就從哪裏讀取。
          自行控制鏈接分區,對分區自定義進行負載均衡
          對 zookeeper 的依賴性下降(如:offset 不必定非要靠 zk 存儲,自行存儲 offset 便可,好比存在文件或者內存中)
      2)低級 API 缺點
           太過複雜,須要自行控制 offset,鏈接哪一個分區,找到分區 leader 等。
3.3.3 消費者組
      消費者是以 consumer group 消費者組的方式工做,由一個或者多個消費者組成一個組,共同消費一個 topic。每一個分區在同一時間只能由 group 中的一個消費者讀取,可是多個 group
    能夠同時消費這個 partition。在圖中,有一個由三個消費者組成的 group,有一個消費者讀取主題中的兩個分區,
    另外兩個分別讀取一個分區。某個消費者讀取某個分區,也能夠叫作某個消費者是某個分區的擁有者。
      在這種狀況下,消費者能夠經過水平擴展的方式同時讀取大量的消息。另外,若是一個消費者失敗了,那麼其餘的 group 成員會自動負載均衡讀取以前失敗的消費者讀取的分區。
3.3.4 消費方式
       consumer 採用 pull(拉)模式從 broker 中讀取數據。
      push(推)模式很難適應消費速率不一樣的消費者,由於消息發送速率是由 broker 決定的。它的目標是儘量以最快速度傳遞消息,
   可是這樣很容易形成 consumer 來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而 pull 模式則能夠根據 consumer 的消費能力以適當的速率消費消息。
      對於 Kafka 而言,pull 模式更合適,它可簡化 broker 的設計,consumer 可自主控制消費消息的速率,同時 consumer 能夠本身控制消費方式——便可批量消費也可逐條消費,同時
   還能選擇不一樣的提交方式從而實現不一樣的傳輸語義。
3.3.5 消費者組案例
    1)需求:一個分區只能被一個消費者消費。
    2)方式一:指定配置文件
        (1)在 node0二、node03 上修改/bd/kafka/config/consumer.properties 配置文件中的 group.id 屬性爲任意組名。 
              [hadoop@node02 config]$ vi consumer.properties
             group.id=hadoop
        (2)在 node0二、node03 上分別啓動消費者
             [hadoop@node02 kafka]$ bin/kafka-console-consumer.sh --zookeeper node02:2181
            --topic first --consumer.config config/consumer.properties
            [hadoop@node02 kafka]$ bin/kafka-console-consumer.sh --zookeeper node02:2181
            --topic first --consumer.config config/consumer.properties
        (3)在 node01 上啓動生產者
             [hadoop@node01 kafka]$ bin/kafka-console-producer.sh --broker-list node01:9092
            --topic first
            >hello world
        (4)查看 node02 和 node03 的接收者。
            同一時刻只有一個消費者接收到消息。
    3) 方式二: 參數指定
         (1) 建立 topic
            ./kafka-topics.sh --create --topic mytopic --partitions 2 --replication-factor 1 --zookeeper   node01:2181
         (2) 建立生產者
            node01 上執行
            ./kafka-console-producer.sh --topic mytopic --broker-list node01:9092
         (3) 建立兩個消費者而且劃分到同一組
            node02 上執行:
            ./kafka-console-consumer.sh --topic mytopic --bootstrap-server node01:9092 --group aaa
            node03 上執行:
            ./kafka-console-consumer.sh --topic mytopic --bootstrap-server node01:9092 --group aaa
相關文章
相關標籤/搜索