Kafka 是一個實現了分佈式的、具備分區、以及複製的日誌的一個服務。它經過一套獨特的設計提供了消息系統中間件的功能。它是一種發佈訂閱功能的消息系統。html
Messagejava
消息,就是要發送的內容,通常包裝成一個消息對象。spring
Topic數據庫
通俗來說的話,就是放置「消息」的地方,也就是說消息投遞的一個容器。假如把消息看做是信封的話,那麼 Topic 就是一個郵箱 apache
Partition && Logbootstrap
Partition 分區,能夠理解爲一個邏輯上的分區,像是咱們電腦的磁盤 C:, D:, E: 盤同樣,windows
Kafka 爲每一個分區維護着一份日誌Log文件。服務器
Producers(生產者)app
和其餘消息隊列同樣,生產者一般都是消息的產生方。負載均衡
在 Kafka 中它決定消息發送到指定Topic的哪一個分區上。
Consumers(消費者)
消費者就是消息的使用者,在消費者端也有幾個名詞須要區分一下。
通常消息隊列有兩種模式的消費方式,分別是 隊列模式 和 訂閱模式。
隊列模式:一對一,就是一個消息只能被一個消費者消費,不能重複消費。通常狀況隊列支持存在多個消費者,可是對於一個消息,只會有一個消費者能夠消費它。
訂閱模式:一對多,一個消息可能被屢次消費,消息生產者將消息發佈到Topic中,只要是訂閱改Topic的消費者均可以消費。
Kafka使用zookeeper做爲其分佈式協調框架,很好的將消息生產、消息存儲、消息消費的過程結合在一塊兒。同時藉助zookeeper,kafka可以生產者、消費者和broker在內的因此組件在無狀態的狀況下,創建起生產者和消費者的訂閱關係,並實現生產者與消費者的負載均衡。
能夠到zookeeper官網下載
http://zookeeper.apache.org/releases.html
(1)下載解壓完成後,來到conf文件夾下,有一個 zoo_sample.cfg 官方默認的配置文件。複製一份,重命名爲 zoo.cfg
(2)配置,打開zoo.cfg 修改配置信息
#存儲內存中數據庫快照的位置,若是不設置參數,更新事務日誌將被存儲到默認位置。 dataDir=../zkData #日誌文件的位置 dataLogDir=../zkLog #監聽端口 clientPort=2181
(3)集羣配置
server.1=127.0.0.1:12888:1388 server.2=127.0.0.1:12889:1389 server.3=127.0.0.1:12887:1387
格式: server.A = B:C:D
A:是一個數字,表示第幾號服務器
B:服務器IP地址
C:是一個端口號,用來集羣成員的信息交換,表示這個服務器與集羣中的leader服務器交換信息的端口
D:是在leader掛掉時專門用來進行選舉leader所用的端口
複製兩份zookeeper解壓好配置後的文件夾,命名爲
在對應的文件下下面修改zoo.cfg的監聽端口地址
好比:
第一個zookeeper-3.4.6程序 修改zoo.cfg 配置文件
clientPort=2181
第二個zookeeper-3.4.6-2程序 修改zoo.cfg 配置文件
clientPort=2182
第三個zookeeper-3.4.6-2程序 修改zoo.cfg 配置文件
clientPort=2183
在配置的dataDir目錄下面新建一個 myid 文件,文件內容就是對應的id號,
好比:
zookeeper-3.4.6程序 myid 文件的內容 爲 1
zookeeper-3.4.6-2程序 myid 文件的內容 爲 2
zookeeper-3.4.6-3程序 myid 文件的內容 爲 3
我這邊配置的目錄是
在對應的bin目錄下啓動
zkServer.cmd
去官網 http://kafka.apache.org/下載便可 這邊下載的是
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.2.2/kafka_2.9.2-0.8.2.2.tgz
改成:
http://archive.apache.org/dist/kafka/2.0.0/kafka_2.12-2.0.0.tgz
解壓後到config文件夾下 打開server.properties配置文件進行配置
修改或新增如下配置信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
|
複製兩份解壓後的文件,命名以下
修改部分配置信息
在對應的server.properties中修改
#惟一標識
broker.id=0
broker.id=1
broker.id=2
#監聽端口
port=9092
port=9093
port=9094
進入到bin/windows目錄下 啓動kafka並指定配置文件
kafka-server-start.bat ../../config/server.properties
啓動過程當中若是遇到Kafka中錯誤:
Unrecognized VM option ‘UseCompressedOops’ Error: Clould not create the Java Vritual Machine. Error: A fatal exception has occurres . Program will exit.
解決方案:
找到bin/windows/kafka-run-class.bat 文件,
找到112行左右
IF ["%KAFKA_JVM_PERFORMANCE_OPTS%"] EQU [""] ( set KAFKA_JVM_PERFORMANCE_OPTS=-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true )
刪除掉 -XX:+UseCompressedOops 便可
(1)建立一個 topic
kafka-topics.bat --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
(2)查看是否建立成功
kafka-topics.bat --list --zookeeper localhost:2181
(3)發送消息
kafka-console-producer.bat --broker-list localhost:9092 --topic test This is a message
(4)接收消息
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning
改成:
kafka-console-consumer.bat -- bootstrap-server 127.0.0.1:9092, 127.0.0.1:9093, 127.0.0.1:9094 --topic test --f rom-beginning
不一樣客戶端能接收到消息,說明配置成功
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
增長配置
spring: kafka: bootstrapServers: 127.0.0.1:9092, 127.0.0.1:9093, 127.0.0.1:9094 producer: batchSize: 10 compressionType: snappy acks: all
測試類,ps:上述文章中已經建立TOPIC。。。。test
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; @RunWith(SpringRunner.class) @SpringBootTest public class KafkaApplicationTests { @Autowired private KafkaTemplate kafkaTemplate; @Test public void contextLoads() { try { ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test","luoye", "hello kafka"); future.addCallback(new ListenableFutureCallback<SendResult< String, String >>() { @Override public void onSuccess(SendResult<String, String> result) { System.out.println(result.toString()); System.out.println("推送消息成功"); } @Override public void onFailure(Throwable throwable) { System.out.println("推送消息失敗"); } }); } catch (Exception e) { e.printStackTrace(); } } }
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
spring: kafka: consumer: group-id: defaultGroup bootstrap-servers: 127.0.0.1:9092, 127.0.0.1:9093, 127.0.0.1:9094
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class Listener { @KafkaListener(topics = {"test"}) public void listen(ConsumerRecord<?, ?> record) { System.out.println(record.key()); System.out.println(record.value()); } }