confluent-kafka集羣搭建

1.   什麼是confluent-kafka?

Confluent Platform 是一個流數據平臺,可以組織管理來自不一樣數據源的數據,擁有穩定高效的系統。
Confluent Platform 不只提供數據傳輸的系統, 還提供全部的工具:鏈接數據源的工具,應用, 以及數據接收。 
Confluent是基於Kafka構造的軟件,他有一個企業版 試用30天,一個是開源版複製代碼

這裏咱們使用開源版的 confluent-kafka, 開源版包括如下組件:html


完整工具以避免費試用組件請見官網:  www.confluent.io/downloadjava

組件功能介紹:  www.cnblogs.com/dadadecheng…
linux

2.  開始搭建 confluent-kafka 

2.1 安裝最新版本 confluent-kafka 

yum install curl which -y
rpm --import https://packages.confluent.io/rpm/5.3/archive.key複製代碼

配置yum源git

cat > /etc/yum.repos.d/confluent.repo <<EOF
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/5.3/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.3/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/5.3
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.3/archive.key
enabled=1
EOF複製代碼

Confluent Platform using only Confluent Community components:
github

yum clean all &&  yum install confluent-community-2.12 -y
複製代碼

2.2  安裝jdk1.8

去oracle官網下載jdk, 上傳到服務器,執行下面命令安裝:golang

rpm -ivh jdk-8u231-linux-x64.rpm複製代碼

oracle jdk1.8下載頁面:  www.oracle.com/technetwork…bootstrap

2.3  配置zookeeper

vim /etc/kafka/zookeeper.propertiesvim

tickTime=2000
dataDir=/var/lib/zookeeper/
clientPort=2181
initLimit=5
syncLimit=2
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888
autopurge.snapRetainCount=3
autopurge.purgeInterval=24複製代碼

注*  zoo1 zoo2 zoo3 爲主機名,請提早配置好hostsbash

echo "1" > /var/lib/zookeeper/myid   #請在zoo1 機器上執行
echo "2" > /var/lib/zookeeper/myid   #請在zoo2 機器上執行
echo "3" > /var/lib/zookeeper/myid   #請在zoo3 機器上執行啓複製代碼

設置開機啓動並啓動zookeeper服務器

systemctl enable confluent-zookeeper && systemctl start confluent-zookeeper複製代碼

查看狀態

systemctl status confluent-zookeeper複製代碼

查看日誌

tail -f /var/log/messages複製代碼

2.3  配置kafka

vim /etc/kafka/server.properties  #修改如下2個選項 zoo

zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181broker.id.generation.enable=true複製代碼

設置開機啓動並啓動kafka

systemctl enable confluent-kafka && systemctl start confluent-kafka複製代碼

查看狀態

systemctl status confluent-kafka複製代碼

查看日誌

tail -f /var/log/messages複製代碼


3. 測試kafka生產消息

3.1 建立topic 

kafka-topics --create --bootstrap-server zoo1:9092,zoo2:9092,zoo3:9092 --replication-factor 1 --partitions 5 --topic kafka-test複製代碼

3.2 生產消息

kafka-console-producer --broker-list zoo1:9092,zoo2:9092,zoo3:9092 --topic kafkatest
複製代碼

3.3 消費消息

kafka-console-consumer --bootstrap-server zoo1:9092,zoo2:9092,zoo3:9092 --topic kafkatest --from-beginning複製代碼


4. 使用golang demo測試生產消息消耗的時間

package main

import (
	"fmt"	
	"log"
	"time"
        "github.com/Shopify/sarama"
)

func main()  {
    var address = []string{"ip1:9092","ip2:9092","ip3:9092"}
    producer(address)
}

func producer(address []string) {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Timeout = 5 * time.Second
	p, err := sarama.NewSyncProducer(address, config)
	if err != nil {
		log.Println(err)
	}
	defer p.Close()
	strKey := "key: "
	srcValue := "testKafka: test message, index=%d"
	log.Println("start")
	for i := 0; i < 10000; i++ {
		value := fmt.Sprintf(srcValue, i)
		msg := &sarama.ProducerMessage{
			Key:   sarama.StringEncoder(strKey),
			Topic: gotest,
			Value: sarama.ByteEncoder(value),
		}
		part, offset, err := p.SendMessage(msg)
		if err != nil {
			log.Println(err, value, part, offset)
		}
	}
	log.Println("end")
}
複製代碼

4.1 10000和100000條所消耗的時間

1萬條:


10萬條:


速度仍是很快的!

相關文章
相關標籤/搜索