億級流量該怎麼抗,kafka集羣解耦高吞吐量|Java 開發實戰

這是我參與更文挑戰的第7天,活動詳情查看: 更文挑戰java

本文正在參加「Java主題月 - Java 開發實戰」,詳情查看 活動連接shell

Kafka是一種高吞吐量的分佈式發佈訂閱的消息隊列系統apache

Kafka是一種高吞吐量的分佈式發佈訂閱的消息隊列系統,Kafka對消息進行保存時是經過tipic進行分組的。今天咱們僅實現Kafka集羣的配置。理論的抽空在聊
複製代碼

前言

  • 最近研究kafka,發現網上不少關於kafka的介紹都是基於Linux操做系統的。雖然這些服務最後都是配置Linux上的。可是咱們平時使用的大多都是Windows系統。因此研究非常吃力。通過借鑑不一樣的網絡文章終於在Windows上實現了kafka的配置。在這裏如今的配置是基於Zookeeper 3.4.6 版本的。若是讀者在操做但願能和個人版本保持一致。

zookeeper

  • 瞭解kafka的都知道。kafka是經過Zookeeper實現分佈操做的。不論是broker,consumer,仍是provide信息都是存儲在Zookeeper中的。當broker掛掉都是Zookeeper來進行從新分配選擇的。因此實現kafka集羣前咱們得先實現Zookeeper的集羣配置。windows

  • 首先咱們從官網上下載Zookeeper到本地。我這裏下載的是Zookeeper-3.4.6.tar.gz版本的。讀者能夠根據本身狀況下載。下載好以後進行文件解壓。而後找到conf文件中的zoo_sample.cfg文件。該文件是Zookeeper官網給咱們提供的一套樣板。咱們賦值該文件到同級下並更名爲zoo.cfg.以下圖api

這裏寫圖片描述

修改配置文件

而後咱們來看看這個配置文件裏面都有些啥服務器

tickTime=2000
複製代碼
  • 服務器之間或客戶端與服務端之間維持心跳的時間。就是每隔tickTime就會發送一次心跳。單位毫秒
initLimit=10
複製代碼
  • 這個是配置Zookeeper接收客戶端初始化鏈接最長能忍受initLimit心跳時間間隔。
syncLimit=5
複製代碼
  • Leader與follow之間發送消息和應答的時間 總時間=syncLimit*tickTime
dataDir
複製代碼
  • zookeeper數據保持路徑 默認將log日誌也保存在dataDir
dataLogDir
複製代碼
  • zookeeper log日誌保存地址 不設置默認是dataDir
clientPort=2181
複製代碼
  • 客戶端鏈接的端口
server.1=192.168.1.130:28881:38881
server.2=192.168.1.130:28882:38882
server.3=192.168.1.130:28883:38883
複製代碼
  • 由於個人集羣都是在同一臺電腦上配置的,因此這裏端口不能同樣

知道配置文件裏的意思應該就知道如何修改了吧markdown

  • 對於新手咱們只須要該如下地方呢。
dataDir+dataLogDir+clientPort
複製代碼
  • 可是下面的server是Zookeeper配置裏須要重點講解的部分

上面的格式咱們能夠簡單的總結爲 server.num=B:C:D。 num:是正整數表明的服務的惟一標識。這個要和後面說道的myid文件保持一致。網絡

B: 標識Zookeeper集羣中某一個服務的ip或者域名 192.168.1.130app

C:表示server.num這個服務於集羣中leader進行信息交流的端口。在kafka中咱們leader和follower須要進行數據備份。具體服務就是經過這個地方制定的端口進行通訊的。socket

D:表示萬一leader宕機了,咱們就經過這個端口來進行再follower中選舉新的leader。

大坑預防

  • 網上的不少教程也就介紹到這裏。稍微好點就提了一下建立myid文件的事,我當時就糾結在這裏。由於我根本不知道穿件的myid的類型。我就隨便建立txt文件。結果是錯的。這裏咱們建立myid我有兩種方式。還有myid裏面的內容就是咱們對應的配置文件中server.num中的num。

  • 第一種就是咱們經過cmd窗口到咱們要建立myid的文件夾下

執行以下命令

echo 1 > myid
複製代碼

這裏寫圖片描述

  • 第二種是咱們先建立TXT文件將對應的內容寫入。而後txt後綴刪掉就能夠了。

  • 順便提一下myid應該放在咱們conf/zoo.cfg文件中指定的dataDir 的對應的文件路徑下。

服務開啓

  • 所謂的集羣就是講上面的Zookeeper複製成多個,將上面提到的幾個重要的屬性更改掉就好了。

  • 若是你到這一步說明你離成功已經不遠了。下面咱們只須要開啓服務就好了。開啓服務在咱們解壓的bin目錄下。

這裏寫圖片描述

  • 這裏咱們得有些常識,已sh結尾的是Linux 系統的shell文件。在windows上沒有裝插件是沒法使用的。咱們windows認識的就是bat文件。就是上面的cmd結尾纔是咱們能夠用的功能。可是咱們還須要進行一下修改。其實這裏已經能夠了。咱們到cmd窗口中經過該命令去執行咱們zoo.cfg文件。可是爲了方便咱們這裏講zoo.cfg配置進咱們的zkServer.cmd文件中

這裏寫圖片描述

  • 好了。配置完成。咱們只須要每次點擊zkServer.cmd就開啓了Zookeeper中的服務了。

友情提醒

  • 上面咱們配置的Zookeeper在開啓第一個時候回報錯。爲何呢。緣由就是咱們開啓了一個服務,。可是咱們的配置文件配置的是集羣的信息。這個時候就回去尋找其餘服務。可是這個時候其餘的服務尚未開啓呢。因此這個錯誤是正常。等咱們集羣中的全部的服務都開啓了就不會報錯。這裏你們不要被嚇到。

  • 除此以外,還有一點就是Zookeeper的安裝目錄(解壓目錄)是絕對不能包含漢字的。我上面的截圖有漢字那是我計算機上設置的。實際的路徑是沒有漢字的。不要被上面的圖片誘導。

  • 當全部的服務都開啓了,咱們如何查看咱們的服務是否開啓成功呢。這很簡單。咱們從新打開一個新的cmd窗口。直接執行jps就能夠看到咱們的服務了。QuorumPeerMain就是咱們的服務主類

這裏寫圖片描述

Kafka集羣配置

  • 上面咱們就完成了Zookeeper的集羣的配置。實際上Kafka中就自帶有Zookeeper的服務。可是爲了數據的高可用性。咱們最好選擇本身搭建Zookeeper集羣。這也是官網上的建議。

  • 這裏個人Kafka版本選擇的是0.8.1.1。建議單價不要選擇過高的版本。剛出的版本可能有未知的bug。

  • 一樣這裏的集羣就是講Kafka複製多個。這裏我選擇其中一個進行講解。其餘的都是同樣的主要就是講端口改掉就好了。

  • 將官網下載的Kafka解壓更名爲kafka1(其餘的更名數字遞增就行。或者自定義別的名字)。找到config/server.properties文件。

server.properties修改

一樣的先來了解裏面的參數含義吧

broker.id=1
複製代碼
  • 在kafka這個集羣中的惟一標識,且只能是正整數
port=9091
複製代碼
  • 該服務監聽的端口
host.name=192.168.1.130
複製代碼
  • broker 綁定的主機名稱(IP) 若是不設置將綁定全部的接口。
advertised.host.name=192.168.1.130
複製代碼
  • broker服務將通知消費者和生產者 換言之,就是消費者和生產者就是經過這個主機(IP)來進行通訊的。若是沒有設置就默認採用host.name。

num.network.threads=2
複製代碼
  • broker處理消息的最大線程數,通常狀況是CPU的核數
num.io.threads=8
複製代碼
  • broker處理IO的線程數 通常是num.network.threads的兩倍
socket.send.buffer.bytes=1048576
複製代碼
  • socket發送的緩衝區。socket調優參數SO_SNDBUFF
socket.receive.buffer.bytes=1048576
複製代碼
  • socket接收的緩衝區 socket的調優參數SO_RCVBUF
socket.request.max.bytes=104857600
複製代碼
  • socket請求的最大數量,防止serverOOM。
log.dirs=\logs
複製代碼
  • kafka數據的存放地址,多個地址的話用逗號隔開。多個目錄分佈在不一樣的磁盤上能夠提升讀寫性能
num.partitions=2
複製代碼
  • 每一個tipic的默認分區個數,在建立topic時能夠從新制定
log.retention.hours=168
複製代碼
  • 數據文件的保留時間 log.retention.minutes也是一個道理。
log.segment.bytes=536870912
複製代碼
  • topic中的最大文件的大小 -1表示沒有文件大小限制 log.segment.bytes 和log.retention.minutes 任意一個

達到要求 都會刪除該文件 在建立topic時能夠從新制定。若沒有.則選取該默認值

log.retention.check.interval.ms=60000
複製代碼
  • 文件大小檢查的週期時間,是否處罰 log.cleanup.policy中設置的策略
log.cleaner.enable=false
複製代碼
  • 是否開啓日誌清理
zookeeper.connect=192.168.1.130:num1,192.168.1.130:num2,192.168.1.130:num3
複製代碼
  • 上面咱們的Zookeeper集羣
zookeeper.connection.timeout.ms=1000000
複製代碼
  • 進羣連接時間超時

  • 一樣的咱們每次賦值kafka服務咱們只需該配置文件裏的下面兩個屬性就好了。
broker.id  +  port
複製代碼

服務啓動前的命令準備

  • 一樣的咱們觀察bin目錄中咱們會發現Kafka針對Linux和windows提供了不一樣的組件。windows的組件放在了windows的文件夾下了。可是我在實際操做中沒法使用裏面的命令。報一些錯誤。這裏個人解決辦法是將windows裏的bat所有複製到外面。就是複製到bin目錄下。

這裏寫圖片描述

  • 上圖中指出來的bat本來是在windows文件中。拷貝到bin目錄以後咱們須要修改一下kafka-run-class.bat文件。由於裏面寫的相對路徑和引入的jar會致使出錯。因此咱們將裏面的這段代碼
set ivyPath=%USERPROFILE%\.ivy2\cache

set snappy=%ivyPath%/org.xerial.snappy/snappy-java/bundles/snappy-java-1.0.5.jar
     call :concat %snappy%

set library=%ivyPath%/org.scala-lang/scala-library/jars/scala-library-2.8.0.jar
     call :concat %library%

set compiler=%ivyPath%/org.scala-lang/scala-compiler/jars/scala-compiler-2.8.0.jar
     call :concat %compiler%

set log4j=%ivyPath%/log4j/log4j/jars/log4j-1.2.15.jar
     call :concat %log4j%

set slf=%ivyPath%/org.slf4j/slf4j-api/jars/slf4j-api-1.6.4.jar
     call :concat %slf%

set zookeeper=%ivyPath%/org.apache.zookeeper/zookeeper/jars/zookeeper-3.3.4.jar
     call :concat %zookeeper%

set jopt=%ivyPath%/net.sf.jopt-simple/jopt-simple/jars/jopt-simple-3.2.jar
     call :concat %jopt%

for %%i in (%BASE_DIR%\core\target\scala-2.8.0\*.jar) do (
     call :concat %%i
)

for %%i in (%BASE_DIR%\core\lib\*.jar) do (
     call :concat %%i
)

for %%i in (%BASE_DIR%\perf\target\scala-2.8.0/kafka*.jar) do (
     call :concat %%i
) 
複製代碼
  • 替換成
for %%i in (%BASE_DIR%\libs\*.jar) do (
     call :concat %%i
) 
複製代碼
  • 咱們仔細觀察原來的配置大概意思是引入一些jar包啥的。可是會出現有的時候咱們的文件根本沒有那個jar。可是又引入了。會常常報錯。因此咱們改爲引入libs下的全部jar.有啥就引入啥。這樣就不會報錯的。

大坑預防

  • 到這裏我本來天真的認爲就已經完事了。可是誰知我按照網上的教程繼續的時候就出現以下錯誤

這裏寫圖片描述

  • 首先第一行提示 set JMX_PORT to default value 9999 這個錯誤是由於我沒有設置這個值。這卻是小事。可是後面報說找不到或沒法加載主類kafka.Kafka這就讓我費解。在這裏我也是卡了一天了。後來在網上找到了一個方法。我不知道這是否是Kafka的bug。反正用這個方法我是解決了這個錯誤了。

  • 解決辦法就是將kafka-run-class.bat文件中

set COMMAND= %JAVA% %KAFKA_OPTS% %KAFKA_JMX_OPTS% -cp %CLASSPATH% %*
複製代碼
  • 修改成
set COMMAND= %JAVA% %KAFKA_OPTS% %KAFKA_JMX_OPTS% -cp "%CLASSPATH%" %*
複製代碼
  • 對比咱們發現就是將classpath加上雙引號。搞了半天就是系統變量路徑沒有找到的緣由。不過這個問題值得引發咱們的注意。咱們的kafka寄去你的搭建實在Java 的jdk基礎是搭建的。因此前提咱們得將jdk等這些配置到環境變量中去。這裏的配置網上搜去吧不少。

服務開啓

  • 到這一步咱們離kafka的成功又不遠了。咱們新開cmd窗口cd到kafka的bin目錄中。

  • 可是在執行開啓以前咱們須要先執行

Set JMX_PORT=19091(每一個服務數字不能同樣)
複製代碼
  • 而後在執行
kafka-server-start.bat ..\config\server.properties
複製代碼

建立Topic批處理

  • 官網上是沒有提供windows版本的topic處理程序的。咱們須要本身新建一個bat文件。這個bat文件的內容填寫以下
kafka-run-class.bat  kafka.admin.TopicCommand  %* 
複製代碼

這裏寫圖片描述

消息處理

  • 有了這個批處理咱們就能夠經過它實現topic的建立。生產者發送消息和消費者的接收消息

建立Topic

  • replication-factor:表示該topic須要在不一樣的broker中保存
  • partitions : 對該top的分區數量
  • topic : 該top的名稱。建議指定。不然採用默認
kafka-topics.bat --create --zookeeper 192.168.1.130:2181 --replication-factor 2 --partitions 3 --topic my-replicated-topic 
複製代碼

查看Topic

kafka-topics.bat --describe --zookeeper 192.168.1.130:2181 --topic my-replicated-topic
複製代碼

生產topic消息

kafka-console-producer.bat --broker-list 192.168.1.130:9093 --topic my-replicated-topic
複製代碼

消費topic消息

kafka-console-consumer.bat --zookeeper 192.168.1.130:2181 --from-beginning --topic my-replicated-topic
複製代碼
  • 最後在接收發送消息是咱們須要從新建立新的cmd窗口。下面看看效果圖。最終實現實時接收消息

這裏寫圖片描述


資源因爲大小限制暫時沒法上傳!後續再上傳

相關文章
相關標籤/搜索