springboot整合kafka

爲了方便起見安裝zk集羣和kafka集羣我就不詳細說明了,如下摘自https://www.cnblogs.com/lentoo/p/7785004.html,文章部份內容我作了修改,由於版本或者命令改變

kafka集羣搭建(windows環境下)

1、簡介

Kafka 是一個實現了分佈式的、具備分區、以及複製的日誌的一個服務。它經過一套獨特的設計提供了消息系統中間件的功能。它是一種發佈訂閱功能的消息系統。html

一、名詞介紹

Messagejava

消息,就是要發送的內容,通常包裝成一個消息對象。spring

 

Topic數據庫

通俗來說的話,就是放置「消息」的地方,也就是說消息投遞的一個容器。假如把消息看做是信封的話,那麼 Topic 就是一個郵箱 apache

 

Partition && Logbootstrap

Partition 分區,能夠理解爲一個邏輯上的分區,像是咱們電腦的磁盤 C:, D:, E: 盤同樣,windows

Kafka 爲每一個分區維護着一份日誌Log文件。服務器

 

Producers(生產者)app

和其餘消息隊列同樣,生產者一般都是消息的產生方。負載均衡

在 Kafka 中它決定消息發送到指定Topic的哪一個分區上。

 

Consumers(消費者)

消費者就是消息的使用者,在消費者端也有幾個名詞須要區分一下。

通常消息隊列有兩種模式的消費方式,分別是 隊列模式 和 訂閱模式

隊列模式:一對一,就是一個消息只能被一個消費者消費,不能重複消費。通常狀況隊列支持存在多個消費者,可是對於一個消息,只會有一個消費者能夠消費它。

訂閱模式:一對多,一個消息可能被屢次消費,消息生產者將消息發佈到Topic中,只要是訂閱改Topic的消費者均可以消費。

 

2、安裝zookeeper

一、簡介

Kafka使用zookeeper做爲其分佈式協調框架,很好的將消息生產、消息存儲、消息消費的過程結合在一塊兒。同時藉助zookeeper,kafka可以生產者、消費者和broker在內的因此組件在無狀態的狀況下,創建起生產者和消費者的訂閱關係,並實現生產者與消費者的負載均衡。

二、下載zookeeper

能夠到zookeeper官網下載

http://zookeeper.apache.org/releases.html

三、配置zookeeper

(1)下載解壓完成後,來到conf文件夾下,有一個 zoo_sample.cfg 官方默認的配置文件。複製一份,重命名爲 zoo.cfg

(2)配置,打開zoo.cfg 修改配置信息

複製代碼

#存儲內存中數據庫快照的位置,若是不設置參數,更新事務日誌將被存儲到默認位置。

dataDir=../zkData

#日誌文件的位置

dataLogDir=../zkLog

#監聽端口

clientPort=2181

複製代碼

 

(3)集羣配置

server.1=127.0.0.1:12888:1388

server.2=127.0.0.1:12889:1389

server.3=127.0.0.1:12887:1387

格式: server.A = B:C:D

A:是一個數字,表示第幾號服務器

B:服務器IP地址

C:是一個端口號,用來集羣成員的信息交換,表示這個服務器與集羣中的leader服務器交換信息的端口

D:是在leader掛掉時專門用來進行選舉leader所用的端口

 完整的配置文件以下 

 

複製兩份zookeeper解壓好配置後的文件夾,命名爲

在對應的文件下下面修改zoo.cfg的監聽端口地址

好比:

 

第一個zookeeper-3.4.6程序 修改zoo.cfg 配置文件

clientPort=2181

第二個zookeeper-3.4.6-2程序 修改zoo.cfg 配置文件

clientPort=2182

第三個zookeeper-3.4.6-2程序 修改zoo.cfg 配置文件

clientPort=2183

建立ServerID

在配置的dataDir目錄下面新建一個 myid 文件,文件內容就是對應的id號,

好比: 

zookeeper-3.4.6程序 myid 文件的內容 爲 1

zookeeper-3.4.6-2程序 myid 文件的內容 爲 2

zookeeper-3.4.6-3程序 myid 文件的內容 爲 3

我這邊配置的目錄是

啓動zookeeper

在對應的bin目錄下啓動

zkServer.cmd

以上zk的集羣搭建沒有問題,下面會有部份內容作修改

3、安裝kafka

(1)下載

去官網 http://kafka.apache.org/下載便可 這邊下載的是

https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.2.2/kafka_2.9.2-0.8.2.2.tgz

改成:

http://archive.apache.org/dist/kafka/2.0.0/kafka_2.12-2.0.0.tgz

(2)配置

解壓後到config文件夾下 打開server.properties配置文件進行配置

(3)配置內容

修改或新增如下配置信息 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

#惟一標識

broker.id=0

#監聽端口

port=9092

host.name=127.0.0.1

#消息最大大小

message.max.bytes=50485760

#配置副本數量

default.replication.factor=2

#獲取的最大大小

replica.fetch.max.bytes=50485760

#隊列中消息持久化存放的位置,能夠多個目錄,用逗號分開

log.dirs=/tmp/kafka-logs

#默認的分區數

num.partitions=2

#對應着剛剛配置的zookeeper的三個ip與端口地址

zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183

 

(4)集羣配置

複製兩份解壓後的文件,命名以下

 

修改部分配置信息

對應的server.properties中修改

#惟一標識

broker.id=0

broker.id=1

broker.id=2

#監聽端口 

port=9092

port=9093

port=9094

啓動對應的kafka

進入到bin/windows目錄下 啓動kafka並指定配置文件

kafka-server-start.bat ../../config/server.properties

 

啓動過程當中若是遇到Kafka中錯誤:

Unrecognized VM option ‘UseCompressedOops’ Error: Clould not create the Java Vritual Machine. Error: A fatal exception has occurres . Program will exit.

解決方案:

找到bin/windows/kafka-run-class.bat 文件,

找到112行左右

IF ["%KAFKA_JVM_PERFORMANCE_OPTS%"] EQU [""] (

  set KAFKA_JVM_PERFORMANCE_OPTS=-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true

)

 

刪除掉 -XX:+UseCompressedOops 便可

測試集羣

(1)建立一個 topic

kafka-topics.bat --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test

 

(2)查看是否建立成功

kafka-topics.bat --list --zookeeper localhost:2181

 

(3)發送消息

kafka-console-producer.bat --broker-list localhost:9092 --topic test

This is a message

 

(4)接收消息

kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning

 

改成:

kafka-console-consumer.bat --
bootstrap-server 127.0.0.1:9092, 127.0.0.1:9093, 127.0.0.1:9094 --topic test --f
rom-beginning

 

不一樣客戶端能接收到消息,說明配置成功

接下來爲正式整合:

 生產者:

  1. 引入jar包
  2. <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    增長配置

    spring:
      kafka:
          bootstrapServers: 127.0.0.1:9092, 127.0.0.1:9093, 127.0.0.1:9094
          producer:
            batchSize: 10
            compressionType: snappy
            acks: all
  3. 測試類,ps:上述文章中已經建立TOPIC。。。。test

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.test.context.junit4.SpringRunner;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.util.concurrent.ListenableFutureCallback;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class KafkaApplicationTests {
    	@Autowired
    	private KafkaTemplate kafkaTemplate;
    	@Test
    	public void contextLoads() {
    		try {
    			ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test","luoye", "hello kafka");
    			future.addCallback(new ListenableFutureCallback<SendResult< String, String >>() {
    				@Override
    				public void onSuccess(SendResult<String, String> result) {
    					System.out.println(result.toString());
    					System.out.println("推送消息成功");
    				}
    
    				@Override
    				public void onFailure(Throwable throwable) {
    					System.out.println("推送消息失敗");
    				}
    			});
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    
    }

 

消費者:

  1.     引入jar包
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
  2. 增長配置類
    spring:
        kafka:
          consumer:
            group-id: defaultGroup
          bootstrap-servers: 127.0.0.1:9092, 127.0.0.1:9093, 127.0.0.1:9094
  3. 建立監聽器
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class Listener {
      @KafkaListener(topics = {"test"})
      public void listen(ConsumerRecord<?, ?> record) {
         System.out.println(record.key());
         System.out.println(record.value());
      }
    }
相關文章
相關標籤/搜索