kafka集羣安裝搭建(springBoo集成kafka)

kafka介紹

根據官網的介紹,ApacheKafka®是一個分佈式流媒體平臺,它主要有3種功能:html

  • 發佈和訂閱消息流,這個功能相似於消息隊列,這也是kafka歸類爲消息隊列框架的緣由
  • 以容錯的方式記錄消息流,kafka以文件的方式來存儲消息流
  • 能夠再消息發佈的時候進行處理

使用場景

  • 在系統或應用程序之間構建可靠的用於傳輸實時數據的管道,消息隊列功能
  • 建實時的流數據處理程序來變換或處理數據流,數據處理功能

Kafka目前主要做爲一個分佈式的發佈訂閱式的消息系統使用 下圖爲消息傳輸流程java

輸入圖片說明

  • Producer即生產者,向Kafka集羣發送消息,在發送消息以前,會對消息進行分類,即Topic,上圖展現了兩個producer發送了分類爲topic1的消息,另一個發送了topic2的消息。
  • Topic即主題,經過對消息指定主題能夠將消息分類,消費者能夠只關注本身須要的Topic中的消息
  • Consumer即消費者,消費者經過與kafka集羣創建長鏈接的方式,不斷地從集羣中拉取消息,而後能夠對這些消息進行處理。

下載

在kafka官網 http://kafka.apache.org/downloads下載到最新的kafka安裝包,選擇下載二進制版本的tgz文件spring

安裝

  • 首先確保你的機器上安裝了jdk,kafka須要java運行環境,之前的kafka還須要zookeeper,新版的kafka已經內置了一個zookeeper環境,因此咱們能夠直接使用。
  • 若是隻須要進行最簡單的嘗試的話咱們只須要解壓到任意目錄便可,這裏咱們將kafka壓縮包解壓到/home目錄 輸入圖片說明

kafka解壓目錄下下有一個config的文件夾,裏面放置的是咱們的配置文件apache

consumer.properites 消費者配置json

producer.properties 生產者配置bootstrap

server.properties kafka服務器的配置,此配置文件用來配置kafka服務器 目前僅介紹幾個最基礎的配置服務器

  • broker.id 申明當前kafka服務器在集羣中的惟一ID,需配置爲integer,而且集羣中的每個kafka服務器的id都應是惟一的,咱們這裏採用默認配置便可
  • listeners 申明此kafka服務器須要監聽的端口號,若是是在本機上跑虛擬機運行能夠不用配置本項,默認會使用localhost的地址,若是是在遠程服務器上運行則必須配置,例如: listeners=PLAINTEXT:// 192.168.180.128:9092。並確保服務器的9092端口可以訪問
  • zookeeper.connect 申明kafka所鏈接的zookeeper的地址 ,需配置爲zookeeper的地址,因爲本次使用的是kafka高版本中自帶zookeeper,使用默認配置便可 zookeeper.connect=localhost:2181

運行

啓動zookeeperapp

#前臺啓動
[root@CentOS124 home]# cd kafka2.11/
[root@CentOS124 kafka2.11]# bin/zookeeper-server-start.sh config/zookeeper.properties

#後臺啓動
[root@CentOS124 kafka2.11]# bin/zookeeper-server-start.sh config/zookeeper.properties 1>/dev/null 2>&1 &
[1] 18466

#查看是否啓動成功
[root@CentOS124 ~]#  ps -ef|grep kafka

啓動kafka框架

[root@CentOS124 kafka2.11]# bin/kafka-server-start.sh config/server.properties

#後臺啓動
[root@CentOS124 kafka2.11]# bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &

#建立 topic
[root@CentOS124 kafka2.11]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test


#查看Kafka 中的 topic 列表
bin/kafka-topics.sh --list --zookeeper localhost:2181

配置kafka集羣

  • 條件有限,在同一個機器上啓動三個broker來模擬kafka集羣,三個broker使用另外安裝的同一個zookeeper服務(實際集羣中,每一個broker一般在不一樣的機器上,也會使用不一樣host的zookeeper)
#複製server.properties配置文件爲三份,分別起名爲server.properties,server-2.properties,server-3.properties

三份配置中都要修改如下
#三個配置中分別修改成0,2,3
broker.id=0

#三個配置中分別修改成9092,9093,9094
port=9092

#kafka-logs,kafka-logs-2,kafka-logs-3
log.dirs=/tmp/kafka-logs

#都設置爲3,即每一個topic默認三個partition
num.partitions=3

#zookeeper集羣地址,外部能夠配置,這裏環境有限  使用默認既可
zookeeper.connect=localhost:2181

#分別進入kafka目錄下 執行以下命令啓動服務控制檯輸出日子完成了
bin/kafka-server-start.sh config/server.properties
bin/kafka-server-start.sh config/server-2.properties
bin/kafka-server-start.sh config/server-3.properties

springBoot中如何使用kafka

首先建立一個springBoot項目 引入spring-kafka 輸入圖片說明分佈式

application.properties 配置

server.port=8080

#kafka地址 brokers集羣地址用,隔開
spring.kafka.bootstrap-servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

#生產者的配置,大部分咱們可使用默認的,這裏列出幾個比較重要的屬性
#每批次發送消息的數量
spring.kafka.producer.batch-size=16
#發送失敗重試次數
spring.kafka.producer.retries=0
#即32MB的批處理緩衝區
spring.kafka.producer.buffer-memory=33554432
#key序列化方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#消費者的配置
##Kafka中沒有初始偏移或若是當前偏移在服務器上再也不存在時,默認區最新 ,有三個選項 【latest, earliest, none】
spring.kafka.consumer.auto-offset-reset=latest
#是否開啓自動提交
spring.kafka.consumer.enable-auto-commit=true
#自動提交的時間間隔
spring.kafka.consumer.auto-commit-interval=100
#key的解碼方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#value的解碼方式
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#在kafka/config文件的consumer.properties中有配置
spring.kafka.consumer.group-id=test-consumer-group

建立Producer生產者

package com.example.modules;
import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
 * 〈生產者〉
 * @author qinxuewu
 * @create 18/8/4下午11:56
 * @since 1.0.0
 */
@Component
public class Producer {
    @Autowired
    private KafkaTemplate kafkaTemplate;
    //發送消息方法
    public void send() {
        JSONObject obj=new JSONObject();
        obj.put("id",System.currentTimeMillis());
        obj.put("name","生產者發送消息");
        obj.put("date",new Date());
        //這個 topic 在 Java 程序中是不須要提早在 Kafka 中設置的,由於它會在發送的時候自動建立你設置的 topic
        kafkaTemplate.send("qxw",obj.toString());
    }
}

建立消費者

@Component
public class Consumer {
    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
    /**
     *  同時監聽兩個 topic 的消息了,可同時監聽多個topic
     * @param record
     * @throws Exception
     */
    @KafkaListener(topics = {"test","qxw"})
    public void listen (ConsumerRecord<?, ?> record) throws Exception {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            logger.info("消費者開始消費message:" + message);
        }
    }
}

運行後就能夠看到控制檯輸出了

@RunWith(SpringRunner.class)                  
@SpringBootTest                               
public class KafkaDemoApplicationTests {      
    @Autowired                                
    private Producer producer;                
    @Test                                     
    public void contextLoads() {              
        for (int i = 0; i <3 ; i++) {         
            producer.send();                  
            try {                             
                Thread.sleep(1000);           
            } catch (InterruptedException e) {
                e.printStackTrace();          
            }                                 
        }                                     
    }                                         
}

kafka 配置文件參數詳解

https://www.cnblogs.com/alan319/p/8651434.html kafka的配置分爲 broker、producter、consumer三個不一樣的配置

相關文章
相關標籤/搜索