Linux環境Kafka安裝配置

Linux環境Kafka安裝配置

1. 認識Kafa

(1) Kafa介紹

  • 開源消息系統
  • 官網:kafka.apache.org/
  • 用途:在流式計算中,Kafka通常用來緩存數據,Storm經過消費Kafka的數據進行計算。
  • Apache Kafka是一個開源消息系統,由Scala寫成。是由Apache軟件基金會開發的一個開源消息系統項目。Kafka最初是由LinkedIn公司開發,並於2011年初開源。2012年10月從Apache Incubator畢業。該項目的目標是爲處理實時數據提供一個統1、高通量、低等待的平臺。Kafka是一個分佈式消息隊列。Kafka對消息保存時根據Topic進行歸類,發送消息者稱爲Producer,消息接受者稱爲Consumer,此外kafka集羣有多個kafka實例組成,每一個實例(server)成爲broker。不管是kafka集羣,仍是producer和consumer都依賴於zookeeper集羣保存一些meta信息,來保證系統可用性。

(2) 消息隊列內部實現原理

  • <1>. 點對點模式(相似接受文件,一對一,消費者主動拉取數據,消息收到後消息清除) -點對點模型一般是一個基於拉取或者輪詢的消息傳送模型,這種模型從隊列中請求信息,而不是將消息推送到客戶端。這個模型的特色是發送到隊列的消息被一個且只有一個接收者接收處理,即便有多個消息監聽者也是如此。
  • <2>. 發佈/訂閱模式(相似公衆號,一對多,數據生產後,推送給全部訂閱者)
    • 發佈訂閱模型則是一個基於推送的消息傳送模型。發佈訂閱模型能夠有多種不一樣的訂閱者,臨時訂閱者只在主動監聽主題時才接收消息,而持久訂閱者則監聽主題的全部消息,即便當前訂閱者不可用,處於離線狀態。

(3) 爲何須要消息隊列

  • <1>. 解耦:
    • 容許獨立的擴展或修改兩邊的處理過程,只要確保它們遵照一樣的接口約束。
  • <2>. 冗餘:
    • 消息隊列把數據進行持久化直到它們已經被徹底處理,經過這一方式規避了數據丟失風險。許多消息隊列所採用的"插入-獲取-刪除"範式中,在把一個消息從隊列中刪除以前,須要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。
  • <3>. 擴展性:
    • 由於消息隊列解耦了你的處理過程,因此增大消息入隊和處理的頻率是很容易的,只要另外增長處理過程便可。
  • <4>. 靈活性 & 峯值處理能力:
    • 在訪問量劇增的狀況下,應用仍然須要繼續發揮做用,可是這樣的突發流量並不常見。若是爲以能處理這類峯值訪問爲標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列可以使關鍵組件頂住突發的訪問壓力,而不會由於突發的超負荷的請求而徹底崩潰。
  • <5>. 可恢復性:
    • 系統的一部分組件失效時,不會影響到整個系統。消息隊列下降了進程間的耦合度,因此即便一個處理消息的進程掛掉,加入隊列中的消息仍然能夠在系統恢復後被處理。
  • <6>. 順序保證:
    • 在大多使用場景下,數據處理的順序都很重要。大部分消息隊列原本就是排序的,而且能保證數據會按照特定的順序來處理。(Kafka保證一個Partition內的消息的有序性)
  • <7>. 緩衝:
    • 有助於控制和優化數據流通過系統的速度,解決生產消息和消費消息的處理速度不一致的狀況。
  • <8>. 異步通訊:
    • 不少時候,用戶不想也不須要當即處理消息。消息隊列提供了異步處理機制,容許用戶把一個消息放入隊列,但並不當即處理它。想向隊列中放入多少消息就放多少,而後在須要的時候再去處理它們。

(4) Kafka架構

  • **<1>. Producer:**消息生產者,就是向kafka broker發消息的客戶端。
  • **<2>. Consumer:**消息消費者,向kafka broker取 消息的客戶端
  • **<3>. Topic:**能夠理解爲一個隊列。
  • **<4>. Consumer Group(CG):**消費者組,kafka提供的可擴展且具備容錯性的消費者機制。既然是一個組,那麼組內必然能夠有多個消費者或消費者實例(consumer instance),它們共享一個公共的ID,即group ID。組內的全部消費者協調在一塊兒來消費訂閱主題(subscribed topics)的全部分區(partition)。固然,每一個分區只能由同一個消費組內的一個consumer來消費。
  • **<5>. Broker:**一臺kafka服務器就是一個broker。一個集羣由多個broker組成。一個broker能夠容納多個topic。
  • **<6>. Partition:**爲了實現擴展性,一個很是大的topic能夠分佈到多個broker(即服務器)上,一個topic能夠分爲多個partition,每一個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序將消息發給consumer,不保證一個topic的總體(多個partition間)的順序。
  • **<7>. Offset:**kafka的存儲文件都是按照offset.kafka來命名,用offset作名字的好處是方便查找。例如你想找位於2049的位置,只要找到2048.kafka的文件便可。固然the first offset就是00000000000.kafka

(5) 分佈式模型

  • Kafka每一個主題的多個分區日誌分佈式地存儲在Kafka集羣上,同時爲了故障容錯,每一個分區都會以副本的方式複製到多個消息代理節點上。其中一個節點會做爲主副本(Leader),其餘節點做爲備份副本(Follower,也叫做從副本)。主副本會負責全部的客戶端讀寫操做,備份副本僅僅從主副本同步數據。當主副本出現故障時,備份副本中的一個副本會被選擇爲新的主副本。由於每一個分區的副本中只有主副本接受讀寫,因此每一個服務器端都會做爲某些分區的主副本,以及另一些分區的備份副本,這樣Kafka集羣的全部服務端總體上對客戶端是負載均衡的。
  • Kafka的生產者和消費者相對於服務器端而言都是客戶端。Kafka生產者客戶端發佈消息到服務端的指定主題,會指定消息所屬的分區。生產者發佈消息時根據消息是否有鍵,採用不一樣的分區策略。消息沒有鍵時,經過輪詢方式進行客戶端負載均衡;消息有鍵時,根據分區語義(例如hash)確保相同鍵的消息老是發送到同一分區。
  • Kafka的消費者經過訂閱主題來消費消息,而且每一個消費者都會設置一個消費組名稱。由於生產者發佈到主題的每一條消息都只會發送給消費者組的一個消費者。因此,若是要實現傳統消息系統的**「隊列」模型**,可讓每一個消費者都擁有相同的消費組名稱,這樣消息就會負責均衡到全部的消費者;若是要實現**「發佈-訂閱」模型**,則每一個消費者的消費者組名稱都不相同,這樣每條消息就會廣播給全部的消費者。
  • 分區是消費者現場模型的最小並行單位。以下圖(圖1)所示,生產者發佈消息到一臺服務器的3個分區時,只有一個消費者消費全部的3個分區。在下圖(圖2)中,3個分區分佈在3臺服務器上,同時有3個消費者分別消費不一樣的分區。假設每一個服務器的吞吐量時300MB,在下圖(圖1)中分攤到每一個分區只有100MB,而在下圖(圖2)中,集羣總體的吞吐量有900MB。能夠看到,增長服務器節點會提高集羣的性能,增長消費者數量會提高處理性能。
  • 同一個消費組下多個消費者互相協調消費工做,Kafka會將全部的分區平均地分配給全部的消費者實例,這樣每一個消費者均可以分配到數量均等的分區。Kafka的消費組管理協議會動態地維護消費組的成員列表,當一個新消費者加入消費者組,或者有消費者離開消費組,都會觸發再平衡操做。
  • Kafka的消費者消費消息時,只保證在一個分區內的消息的徹底有序性,並不保證同一個主題匯中多個分區的消息順序。並且,消費者讀取一個分區消息的順序和生產者寫入到這個分區的順序是一致的。好比,生產者寫入「hello」和「Kafka」兩條消息到分區P1,則消費者讀取到的順序也必定是「hello」和「Kafka」。若是業務上須要保證全部消息徹底一致,只能經過設置一個分區完成,但這種作法的缺點是最多隻能有一個消費者進行消費。
  • 通常來講,只須要保證每一個分區的有序性,再對消息鍵(message Key能夠是user id等)來保證相同鍵的全部消息落入同一分區,就能夠知足絕大多數的應用。
  • kafka讀寫流程圖:

3. Kafka-2.11安裝流程

(1) 準備工做

  • ZooKeeper集羣環境

(2) 解壓kafka_2.11-2.1.1.tgz安裝包到目標目錄下:

  • tar -zxvf .tgz -C 目標目錄

(3) 爲後續方便,重命名Kafka文件夾:

  • mv kafka_2.11-2.1.1/ kafka_2.11

(4) 在/opt/module/kafka目錄下建立logs文件夾

  • mkdir logs

(5) 修改配置文件

  • cd config/
  • vi server.properties
    • #broker的全局惟一編號,不能重複
      broker.id=0
      #是否容許刪除topic
      delete.topic.enable=true
      #處理網絡請求的線程數量
      num.network.threads=3
      #用來處理磁盤IO的線程數量
      num.io.threads=8
      #發送套接字的緩衝區大小
      socket.send.buffer.bytes=102400
      #接收套接字的緩衝區大小
      socket.receive.buffer.bytes=102400
      #請求套接字的最大緩衝區大小
      socket.request.max.bytes=104857600
      #kafka運行日誌存放的路徑
      log.dirs=/opt/module/kafka/logs
      #topic在當前broker上的分區個數
      num.partitions=1
      #用來恢復和清理data下數據的線程數量
      num.recovery.threads.per.data.dir=1
      #segment文件保留的最長時間,超時將被刪除
      log.retention.hours=168
      #配置鏈接Zookeeper集羣地址 
      zookeeper.connect=XXXX:2181,XXXX:2181,XXXX:2181
      複製代碼
  • <1>. Broker配置信息
  • <2>. Producer配置信息
  • <3>.Consumer配置信息

(6) 配置環境變量:

  • 修改配置文件:
    • vi /etc/profile
  • 增長如下內容:
    • export KAFKA_HOME=kafka安裝路徑
    • export PATH=$PATH:$KAFKA_HOME/bin
  • 聲明環境變量:
    • source /etc/profile

(7) 集羣配置:

  • 拷貝配置好的kafka到其餘機器上
    • scp -r kafka_2.11/ bigdata02:$PWD
      • scp -r kafka_2.11/ bigdata03:$PWD
  • 修改配置信息broker.id (注:broker.id不得重複)
  • 配置相應環境變量

(8) 啓動集羣

  • 依次在bigdata十一、bigdata十二、bigdata13節點上啓動kafka(加上&是在後臺啓動)
    • kafka-server-start.sh config/server.properties &

(9) 關閉集羣

  • kafka-server-stop.sh stop

4. Kafka命令行操做

(1) 查看當前服務器中的全部topic

  • kafka-topics.sh --zookeeper XXXX:2181 --list

(2) 建立topic

  • kafka-topics.sh --zookeeper XXXX:2181 --create --replication-factor 3 --partitions 1 --topic 名稱
  • 選項說明:
    • --topic 定義topic名
    • --replication-factor 定義副本數
    • --partitions 定義分區數

(3) 刪除topic

  • kafka-topics.sh --zookeeper XXXX:2181 --delete --topic first
  • 須要server.properties中設置delete.topic.enable=true不然只是標記刪除或者直接重啓。

(4) 發送消息

  • kafka-console-producer.sh --broker-list XXXX:9092 --topic first

(5) 消費消息

  • kafka-console-consumer.sh --bootstrap-server node3:9092 --from-beginning --topic first
  • --from-beginning:會把first主題中以往全部的數據都讀取出來。根據業務場景選擇是否增長該配置。

(6) 查看某個Topic的詳情

  • kafka-topics.sh --zookeeper XXXX:2181 --describe --topic first

5. Kafka工做流程分析

(1) Kafka生產過程分析

  • <1>. 寫入方式node

    • producer採用推(push)模式將消息發佈到broker,每條消息都被追加(append)到分區(patition)中,屬於順序寫磁盤(順序寫磁盤效率比隨機寫內存要高,保障kafka吞吐率)。
  • <2>. 分區(Partition)apache

    • Kafka集羣有多個消息代理服務器(broker-server)組成,發佈到Kafka集羣的每條消息都有一個類別,用主題(topic)來表示。一般,不一樣應用產生不一樣類型的數據,能夠設置不一樣的主題。一個主題通常會有多個消息的訂閱者,當生產者發佈消息到某個主題時,訂閱了這個主題的消費者均可以接收到生成者寫入的新消息。bootstrap

    • Kafka集羣爲每一個主題維護了分佈式的分區(partition)日誌文件,物理意義上能夠把主題(topic)看做進行了分區的日誌文件(partition log)。主題的每一個分區都是一個有序的、不可變的記錄序列,新的消息會不斷追加到日誌中。分區中的每條消息都會按照時間順序分配到一個單調遞增的順序編號,叫作偏移量(offset),這個偏移量可以惟一地定位當前分區中的每一條消息。緩存

    • 消息發送時都被髮送到一個topic,其本質就是一個目錄,而topic是由一些Partition Logs(分區日誌)組成,其組織結構以下圖所示:安全

    • 下圖中的topic有3個分區,每一個分區的偏移量都從0開始,不一樣分區之間的偏移量都是獨立的,不會相互影響。 bash

    • 能夠看到,每一個Partition中的消息都是有序的,生產的消息被不斷追加到Partition log上,其中的每個消息都被賦予了一個惟一的offset值服務器

    • 發佈到Kafka主題的每條消息包括鍵值和時間戳。消息到達服務器端的指定分區後,都會分配到一個自增的偏移量。**原始的消息內容和分配的偏移量以及其餘一些元數據信息最後都會存儲到分區日誌文件中。**消息的鍵也能夠不用設置,這種狀況下消息會均衡地分佈到不一樣的分區。網絡

    • 分區的緣由:架構

      • a. 方便在集羣中擴展,每一個Partition能夠經過調整以適應它所在的機器,而一個topic又能夠有多個Partition組成,所以整個集羣就能夠適應任意大小的數據;
      • b. 能夠提升併發,由於能夠以Partition爲單位讀寫。
        • 傳統消息系統在服務端保持消息的順序,若是有多個消費者消費同一個消息隊列,服務端會以消費存儲的順序依次發送給消費者。**但因爲消息是異步發送給消費者的,消息到達消費者的順序多是無序的,這就意味着在並行消費時,傳統消息系統沒法很好地保證消息被順序處理。**雖然咱們能夠設置一個專用的消費者只消費一個隊列,以此來解決消息順序的問題,可是這就使得消費處理沒法真正執行。
        • Kafka比傳統消息系統有更強的順序性保證,它使用主題的分區做爲消息處理的並行單元。**Kafka以分區做爲最小的粒度,將每一個分區分配給消費者組中不一樣的並且是惟一的消費者,並確保一個分區只屬於一個消費者,即這個消費者就是這個分區的惟一讀取線程。那麼,只要分區的消息是有序的,消費者處理的消息順序就有保證。**每一個主題有多個分區,不一樣的消費者處理不一樣的分區,因此Kafka不只保證了消息的有序性,也作到了消費者的負載均衡。
    • 分區的原則:併發

      • a. 指定了patition,則直接使用;
      • b. 未指定patition但指定key,經過對key的value進行hash出一個patition
      • c. patition和key都未指定,使用輪詢選出一個patition。
  • <3>. 副本(Replication) 同一個partition可能會有多個replication(對應 server.properties 配置中的 default.replication.factor=N)。沒有replication的狀況下,一旦broker 宕機,其上全部 patition 的數據都不可被消費,同時producer也不能再將數據存於其上的patition。引入replication以後,同一個partition可能會有多個replication,而這時須要在這些replication之間選出一個leader,producer和consumer只與這個leader交互,其它replication做爲follower從leader 中複製數據。

  • <4>. 寫入流程

    • producer寫入消息流程以下:
      • a. producer先從zookeeper的 "/brokers/.../state"節點找到該partition的leader
      • b. producer將消息發送給該leader
      • c. leader將消息寫入本地log
      • d. followers從leader pull消息,寫入本地log後向leader發送ACK
      • a. leader收到全部ISR中的replication的ACK後,增長HW(high watermark,最後commit 的offset)並向producer發送ACK

(2) Broker 保存消息

  • a. 存儲方式
    • 物理上把topic分紅一個或多個patition(對應 server.properties 中的num.partitions=3配置),每一個patition物理上對應一個文件夾(該文件夾存儲該patition的全部消息和索引文件)。
  • b. 存儲策略
    • 不管消息是否被消費,kafka都會保留全部消息。有兩種策略能夠刪除舊數據:
      • 基於時間:log.retention.hours=168
      • 基於大小:log.retention.bytes=1073741824
    • 須要注意的是,由於Kafka讀取特定消息的時間複雜度爲O(1),即與文件大小無關,因此這裏刪除過時文件與提升 Kafka 性能無關。

(3) Zookeeper存儲結構

  • 注意:producer不在zk中註冊,消費者在zk中註冊。

6. Kafka消費過程分析

  • kafka提供了兩套consumer API:高級Consumer API和低級API。

(1) 消費模型

  • 消息由生產者發佈到Kafka集羣后,會被消費者消費。消息的消費模型有兩種:推送模型(push)和拉取模型(pull)。

  • 基於推送模型(push)的消息系統,由消息代理記錄消費者的消費狀態。消息代理在將消息推送到消費者後,標記這條消息爲已消費,但這種方式沒法很好地保證消息被處理。好比,消息代理把消息發送出去後,當消費進程掛掉或者因爲網絡緣由沒有收到這條消息時,就有可能形成消息丟失(由於消息代理已經把這條消息標記爲已消費了,但實際上這條消息並無被實際處理)。若是要保證消息被處理,消息代理髮送完消息後,要設置狀態爲「已發送」,只有收到消費者的確認請求後才更新爲「已消費」,這就須要消息代理中記錄全部的消費狀態,這種作法顯然是不可取的。

  • Kafka採用拉取模型,**由消費者本身記錄消費狀態,每一個消費者互相獨立地順序讀取每一個分區的消息。**以下圖所示,有兩個消費者(不一樣消費者組)拉取同一個主題的消息,消費者A的消費進度是3,消費者B的消費進度是6。消費者拉取的最大上限經過最高水位(watermark)控制,生產者最新寫入的消息若是尚未達到備份數量,對消費者是不可見的。這種由消費者控制偏移量的優勢是:**消費者能夠按照任意的順序消費消息。**好比,消費者能夠重置到舊的偏移量,從新處理以前已經消費過的消息;或者直接跳到最近的位置,從當前的時刻開始消費。

  • 在一些消息系統中,消息代理會在消息被消費以後當即刪除消息。若是有不一樣類型的消費者訂閱同一個主題,消息代理可能須要冗餘地存儲同一消息;或者等全部消費者都消費完才刪除,這就須要消息代理跟蹤每一個消費者的消費狀態,這種設計很大程度上限制了消息系統的總體吞吐量和處理延遲。Kafka的作法是生產者發佈的全部消息會一致保存在Kafka集羣中,無論消息有沒有被消費。用戶能夠經過設置保留時間來清理過時的數據,好比,設置保留策略爲兩天。那麼,在消息發佈以後,它能夠被不一樣的消費者消費,在兩天以後,過時的消息就會自動清理掉。

(2) 高級API

  • a. 高級API優勢
    • 高級API寫起來簡單;
    • 不須要自行去管理offset,系統經過zookeeper自行管理;
    • 不須要管理分區,副本等狀況,系統自動管理;
    • 消費者斷線會自動根據上一次記錄在zookeeper中的offset去接着獲取數據(默認設置1分鐘更新一下zookeeper中存的offset)
    • 可使用group來區分對同一個topic 的不一樣程序訪問分離開來(不一樣的group記錄不一樣的offset,這樣不一樣程序讀取同一個topic纔不會由於offset互相影響)
  • b. 高級API缺點
    • 不能自行控制offset(對於某些特殊需求來講)
    • 不能細化控制如分區、副本、zk等

(3) 低級API

  • a. 低級 API 優勢
    • 可以讓開發者本身控制offset,想從哪裏讀取就從哪裏讀取。
    • 自行控制鏈接分區,對分區自定義進行負載均衡
    • 對zookeeper的依賴性下降(如:offset不必定非要靠zk存儲,自行存儲offset便可,好比存在文件或者內存中)
  • b. 低級API缺點
    • 太過複雜,須要自行控制offset,鏈接哪一個分區,找到分區leader 等。

(4) 消費者組

  • 消費者是以consumer group消費者組的方式工做,由一個或者多個消費者組成一個組,共同消費一個topic。每一個分區在同一時間只能由group中的一個消費者讀取,可是多個group能夠同時消費這個partition。在圖中,有一個由三個消費者組成的group,有一個消費者讀取主題中的兩個分區,另外兩個分別讀取一個分區。某個消費者讀取某個分區,也能夠叫作某個消費者是某個分區的擁有者。
  • 在這種狀況下,消費者能夠經過水平擴展的方式同時讀取大量的消息。另外,若是一個消費者失敗了,那麼其餘的group成員會自動負載均衡讀取以前失敗的消費者讀取的分區。

(5) 消費方式

  • consumer採用pull(拉)模式從broker中讀取數據。
  • push(推)模式很難適應消費速率不一樣的消費者,由於消息發送速率是由broker決定的。它的目標是儘量以最快速度傳遞消息,可是這樣很容易形成consumer來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而pull模式則能夠根據consumer的消費能力以適當的速率消費消息。
  • 對於Kafka而言,pull模式更合適,它可簡化broker的設計,consumer可自主控制消費消息的速率,同時consumer能夠本身控制消費方式——便可批量消費也可逐條消費,同時還能選擇不一樣的提交方式從而實現不一樣的傳輸語義。
  • pull模式不足之處是,若是kafka沒有數據,消費者可能會陷入循環中,一直等待數據到達。爲了不這種狀況,咱們在咱們的拉請求中有參數,容許消費者請求在等待數據到達的「長輪詢」中進行阻塞(而且可選地等待到給定的字節數,以確保大的傳輸大小)。
相關文章
相關標籤/搜索