Kafka 單機和分佈式環境搭建與案例使用

實驗環境: 一、Ubuntu Server 16.04 二、kafka_2.11-0.11.0.0java

1、單機環境搭建

官方參考文章:數據庫

kafka.apache.org/quickstartapache

一、下載和解壓安裝包bootstrap

這裏下載了zookeeper和kafaka兩個安裝包,下載地址:後端

zookeeper:www.apache.org/dyn/closer.…api

kafka:kafka.apache.org/downloadsbash

這裏寫圖片描述

二、啓動Zookeeper服務服務器

這裏的kafka默認是由內置的zookeeper的,若是使用內置的zookeeper的話,啓動的方式以下:微信

這裏寫圖片描述

zookeeper的配置文件是在:/kafka_2.12-0.11.0.0/config 目錄下網絡

這裏寫圖片描述

啓動Zookeeper:

>bin/zookeeper-server-start.sh config/zookeeper.properties
複製代碼

當看到以下信息的時候,就表示成功了!

這裏寫圖片描述

三、啓動Kafka

kafka的配置文件是在/kafka_2.12-0.11.0.0/config 目錄下,默認狀況下不須要修改。

>bin/kafka-server-start.sh config/server.properties
複製代碼

四、建立一個Topic

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic
複製代碼

--replication-factor 複製因子爲1; --partitions 分區爲1;

查看已建立的Topic:

這裏寫圖片描述

五、發送測試消息

kafka支持從Console發送信息,消費者從Console接受信息。

>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic
複製代碼

--broker-list 表示代理服務器的列表,這裏只有一個;

建立一個消費者:

>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --from-beginning
複製代碼

--from-beginning 表示從消息開始處讀取;

而後在生產者的Console輸入數據,消費者的Console就能夠看到信息:

這裏寫圖片描述

這裏寫圖片描述

2、僞集羣環境搭建

官方提供了一種方式在一臺機器上啓動多個Broker機器構成multi-broker cluster,這是一種僞集羣的方式,下邊就配置一下。

一、修改配置文件

思路是配置多個config/server.properties文件,修改其中的broker.id=1 和端口號,日誌文件位置。

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
複製代碼

編輯配置文件,修改以下對應的位置:

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1
 
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2
複製代碼

二、分別啓動另外兩個Kafka

>bin/kafka-server-start.sh config/server-1.properties &
>>bin/kafka-server-start.sh config/server-2.properties &
複製代碼

&表示在後臺運行

三、查看運行結果:

這裏寫圖片描述

QuorumPeerMain表示Zookeeper進行; 另外有3個Kafka進程;

四、建立Topic

新建一個複製因子爲3的Topic

>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

複製代碼

這裏寫圖片描述

查看Topic的描述信息:

>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
複製代碼

這裏寫圖片描述

五、發送消息

啓動生產者,這裏有3個Kafka實例,可是--broker-list 還是啓動的Zookeeper服務。

>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
複製代碼

啓動消費者:

>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
複製代碼

和單機的狀況是同樣的。

3、分佈式集羣環境搭建

搭建的分佈式集羣和僞集羣的方式大體相同,這裏假設使用3臺服務器模擬實驗,部署3個Zookeeper實例和3個Kafka實例,固然也能夠直接部署一個Zookeeper實例,這裏只是演示分佈式Zookeeper和kafka的搭建。

這裏寫圖片描述

工具使用的是SecureCRT。

一、分佈式Zookeeper的搭建

(1)將Zookeeper安裝包分別上傳到3臺服務器,個人是放在:/home/xuliugen/server 目錄下。

(2)配置第一臺Zookeeper

複製zookeeper-3.4.10/conf/zoo_sample.cfgzookeeper-3.4.10/conf/zoo.cfg,修改zoo.cfg文件以下,只更改data的目錄:

這裏寫圖片描述

由於,修改了dataDir目錄的位置,那麼就須要建立一個/zookeeper-3.4.6/data目錄。

(3)按一樣的方式修改第二臺Zookeeper和第三臺Zookeeper服務器配置。

(4)而後,在每一臺Zookeeper的配置文件中的最下邊添加Zookeeper的集羣配置:

這裏寫圖片描述

(5)最後建立每個Zookeeper的 myid 文件,在/data/myid文件

xuliugen@xuliugen-pc:~/server/zookeeper-3.4.6/data$ echo 1 > myid
複製代碼

則,另外兩臺分別爲:

xuliugen@xuliugen-pc:~/server/zookeeper-3.4.6/data$ echo 2 > myid
複製代碼
xuliugen@xuliugen-pc:~/server/zookeeper-3.4.6/data$ echo 3 > myid
複製代碼

注意:

一、myid和IP地址的對應

server.1=
server.2=
server.3=
複製代碼

這裏的一、二、3是和咱們剛纔配置的myid的數值是相對應的,即1的IP地址爲192.168.1.120,那麼server.1=192.168.1.120:2888:3888

二、防火牆端口的配置

另外,2888:3888端口要設置防火牆權限

二、啓動Zookeeper服務器

依此使用命令./bin/zkServer.sh start 啓動Zookeeper服務。

使用jps 查看是否已經啓動

這裏寫圖片描述

查看zookeeper日誌的話,是在/zookeeper-3.4.6/bin 目錄下的zookeeper.out 文件:

這裏寫圖片描述

使用tailf zookeeper.out 能夠進行查看。

三、分佈式Kafka的搭建

(1)將Kafka安裝包分別上傳到3臺服務器,個人是放在:/home/xuliugen/server 目錄下。

(2)配置第一臺Kafka

Kafka的配置文件是在/conf/server.properties ,修改日誌的目錄:

這裏寫圖片描述

配置主機IP或者hostname:

這裏寫圖片描述

而後修改kafka中使用的Zookeeper集羣地址:

這裏寫圖片描述

多個Zookeeper之間以英文逗號分開。

注意:

這裏須要注意的是,若是按照上述的方式配置:

listeners=PLAINTEXT://192.168.1.120:9092
複製代碼

這樣配置的話,是在內網環境下容許的,若是使用外網進行訪問的話,能夠配置爲以下:

這裏寫圖片描述

具體請參考: blog.csdn.net/fengcai19/a…

(3)按一樣的方式配置第二臺kafka和第三臺kafka服務器。

要注意的是不一樣的kafka的broker.id 必定要不同,我這裏分別配置的是0、一、2。

這裏寫圖片描述

四、分別啓動Kafka服務

>bin/kafka-server-start.sh config/server.properties
複製代碼

4、代碼測試

一、項目結構

這裏寫圖片描述

二、pom文件內容

<dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.12</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.12</version>
        </dependency>
    </dependencies>
複製代碼

三、日誌配置log4j.properties

log4j.rootLogger=DEBUG,rolling,errlog,stdout
#stdout log
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} [%-5p] %c{1}.%M:%L-[%X{traceId}]-%m%n
#common log
log4j.appender.rolling=org.apache.log4j.DailyRollingFileAppender
log4j.appender.rolling.File=${catalina.base}/logs/kafka-demo.log
log4j.appender.rolling.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
log4j.appender.rolling.layout.ConversionPattern=%d{HH:mm:ss.SSS} [%-5p] %-20c{1} [%t]%x [%X{traceId}]-%m%n
#error log
log4j.appender.errlog=org.apache.log4j.DailyRollingFileAppender
log4j.appender.errlog.Threshold=ERROR
log4j.appender.errlog.File=${catalina.base}/logs/error.log
log4j.appender.errlog.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.errlog.layout=org.apache.log4j.PatternLayout
log4j.appender.errlog.layout.ConversionPattern=%d{MM-dd HH:mm:ss.SSS} [%-5p] %-20c{1} [%.11t] [%X{traceId}]%x-%m%n

複製代碼

三、生產者

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerDemo {

    // Topic
    private static final String topic = "kafkaTopic";

    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.120:9092,192.168.1.135:9093,192.168.1.227:9094");
        props.put("acks", "0");
        props.put("group.id", "1111");
        props.put("retries", "0");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //生產者實例
        KafkaProducer producer = new KafkaProducer(props);

        int i = 1;

        // 發送業務消息
        // 讀取文件 讀取內存數據庫 讀socket端口
        while (true) {
            Thread.sleep(1000);
            producer.send(new ProducerRecord<String, String>(topic, "key:" + i, "value:" + i));
            System.out.println("key:" + i + " " + "value:" + i);
            i++;
        }
    }
}
複製代碼

四、消費者

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerDemo {

    private static final Logger logger = LoggerFactory.getLogger(ConsumerDemo.class);
    private static final String topic = "kafkaTopic";

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.120:9092,192.168.1.135:9093,192.168.1.227:9094");
        props.put("group.id", "1111");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        consumer.subscribe(Arrays.asList(topic));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}
複製代碼

五、測試結果

生產者:

這裏寫圖片描述

消費者:

這裏寫圖片描述

代碼下載地址:download.csdn.net/download/u0…

也能夠到官網下載Kafka的源代碼包,包裏邊有example代碼能夠參考

www.apache.org/dyn/closer.…

這裏寫圖片描述

搜索或掃描下述二維碼關注微信公衆號:Java後端技術(ID: JavaITWork),和20萬人一塊兒學Java!

Java後端技術專一Java相關技術:SSM、Spring全家桶、微服務、MySQL、MyCat、集羣、分佈式、中間件、Linux、網絡、多線程,偶爾講點運維Jenkins、Nexus、Docker、ELK,偶爾分享些技術乾貨,致力於Java全棧開發!

在這裏插入圖片描述

【視頻福利】2T免費學習視頻,搜索或掃描上述二維碼關注微信公衆號:Java後端技術(ID: JavaITWork)回覆:1024,便可免費獲取!內含SSM、Spring全家桶、微服務、MySQL、MyCat、集羣、分佈式、中間件、Linux、網絡、多線程,Jenkins、Nexus、Docker、ELK等等免費學習視頻,持續更新!

相關文章
相關標籤/搜索