Confluent Platform 是一個流數據平臺,可以組織管理來自不一樣數據源的數據,擁有穩定高效的系統。
Confluent Platform 不只提供數據傳輸的系統, 還提供全部的工具:鏈接數據源的工具,應用, 以及數據接收。
Confluent 是基於 Kafka 構造的軟件,它有一個可試用 30 天的企業版,以及一個開源版。
這裏咱們使用開源版的 confluent-kafka,開源版包括如下組件:
完整工具及免費試用組件請見官網: www.confluent.io/download
組件功能介紹: www.cnblogs.com/dadadecheng…
下面在 Linux 系統上安裝:
yum install curl which -y
rpm --import https://packages.confluent.io/rpm/5.3/archive.key複製代碼
配置 yum 源
cat > /etc/yum.repos.d/confluent.repo <<EOF
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/5.3/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.3/archive.key
enabled=1
[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/5.3
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.3/archive.key
enabled=1
EOF複製代碼
Confluent Platform using only Confluent Community components:
yum clean all && yum install confluent-community-2.12 -y
複製代碼
去 Oracle 官網下載 JDK,上傳到服務器,執行下面命令安裝:
rpm -ivh jdk-8u231-linux-x64.rpm複製代碼
Oracle JDK 1.8 下載頁面: www.oracle.com/technetwork…
vim /etc/kafka/zookeeper.propertiesvim
tickTime=2000
dataDir=/var/lib/zookeeper/
clientPort=2181
initLimit=5
syncLimit=2
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888
autopurge.snapRetainCount=3
autopurge.purgeInterval=24複製代碼
注* zoo1 zoo2 zoo3 爲主機名,請提早配置好 hosts。
echo "1" > /var/lib/zookeeper/myid #請在zoo1 機器上執行
echo "2" > /var/lib/zookeeper/myid #請在zoo2 機器上執行
echo "3" > /var/lib/zookeeper/myid #請在zoo3 機器上執行啓複製代碼
設置開機啓動並啓動zookeeper服務器
systemctl enable confluent-zookeeper && systemctl start confluent-zookeeper複製代碼
查看狀態
systemctl status confluent-zookeeper複製代碼
查看日誌
tail -f /var/log/messages複製代碼
vim /etc/kafka/server.properties #修改如下2個選項 zoo
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181broker.id.generation.enable=true複製代碼
設置開機啓動並啓動kafka
systemctl enable confluent-kafka && systemctl start confluent-kafka複製代碼
查看狀態
systemctl status confluent-kafka複製代碼
查看日誌
tail -f /var/log/messages複製代碼
kafka-topics --create --bootstrap-server zoo1:9092,zoo2:9092,zoo3:9092 --replication-factor 1 --partitions 5 --topic kafka-test複製代碼
kafka-console-producer --broker-list zoo1:9092,zoo2:9092,zoo3:9092 --topic kafkatest
複製代碼
kafka-console-consumer --bootstrap-server zoo1:9092,zoo2:9092,zoo3:9092 --topic kafkatest --from-beginning複製代碼
package main
import (
"fmt"
"log"
"time"
"github.com/Shopify/sarama"
)
func main() {
var address = []string{"ip1:9092","ip2:9092","ip3:9092"}
producer(address)
}
func producer(address []string) {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.Timeout = 5 * time.Second
p, err := sarama.NewSyncProducer(address, config)
if err != nil {
log.Println(err)
}
defer p.Close()
strKey := "key: "
srcValue := "testKafka: test message, index=%d"
log.Println("start")
for i := 0; i < 10000; i++ {
value := fmt.Sprintf(srcValue, i)
msg := &sarama.ProducerMessage{
Key: sarama.StringEncoder(strKey),
Topic: gotest,
Value: sarama.ByteEncoder(value),
}
part, offset, err := p.SendMessage(msg)
if err != nil {
log.Println(err, value, part, offset)
}
}
log.Println("end")
}
1萬條:
10萬條:
速度仍是很快的!