Kafka 教程(一) 第一個實例

kafka介紹:Kafka是最初由Linkedin公司工程師用Java和Scala語言開發,是一個分佈式、支持分區的(partition)、多副本的(replica),基於zookeeper協調的分佈式消息系統,能夠理解爲一個消息隊列或者實時消息流系統。java

本篇將介紹在Linux服務器運行一個簡單的單機實例,讓讀者對kafka有個基本的認識。apache

ssh到Linux開始操做bootstrap

1.下載kafka到服務器並解壓(默認服務器已安裝jdk)

官網下載地址vim

若是下載很慢,能夠點擊此處查看Apache鏡像服務器地址,找到國內鏡像地址進行下載。bash

#從清華鏡像下載kafka包
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.0.0/kafka_2.12-2.0.0.tgz
#解壓
tar -zxvf kafka_2.12-2.0.0.tgz
#進入kafka目錄
cd kafka_2.12-2.0.0

2.啓動服務

kafka依賴zookeeper註冊中心作協調,須要安裝zookeeper,不過kafka已經自帶了一個zookeeper服務,爲了演示方便直接使用kafka自帶的zookeeper服務。服務器

啓動zookeeper服務ssh

bin/zookeeper-server-start.sh config/zookeeper.properties

啓動新的終端,在新的終端啓動kafka服務分佈式

bin/kafka-server-start.sh config/server.properties

能夠使用jps / jps -m 命令查看啓動的kafka進程spa

3.建立一個topic

因爲是單機實例,因此咱們使用一個分區一個副本建立一個名稱爲「test」的topic。.net

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

使用列出topic命令查看當前的topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

test

4.在終端啓動一個生產者向指定topic發送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

hello world
This is a message

5.在新的終端啓動一個消費者消費消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

hello world
This is a message

注意:上述命令若是在0.9一下的kafka版本應該改成:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

在生成者終端發送消息,消費者終端將實時打印。

至此第一個kafka實例已經運行起來了。

 

服務方式啓動:

vim /usr/lib/systemd/system/kafka.service
[Unit]
Description=kafka
After=network.target

[Service]
Environment=JAVA_HOME=/opt/soft/jdk1.8.0_141
Type=simple
ExecStart=/data/server/kafka/bin/kafka-server-start.sh /data/server/kafka/config/server.properties
Restart=on-failure

[Install]
WantedBy=multi-user.target

 

kafka2.0.0 配置文件:

#服務監聽 本地外網
listeners=PLAINTEXT://0.0.0.0:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().

#對外訪問
advertised.listeners=PLAINTEXT://00.00.00.00:9092

 

本地日誌發往kafka flume服務

vim /usr/lib/systemd/system/flume-kafka-system-log.service
[Unit]
Description=flume-kafka-system-log
After=network.target

[Service]
Environment=JAVA_HOME=/opt/soft/jdk1.8.0_141
Type=simple
ExecStart=/data/server/flume/bin/flume-ng agent --conf /data/server/flume/conf --conf-file /data/server/flume/conf/exec-memory-kafka.conf --name a1 -Dflume.root.logger=INFO,console
Restart=on-failure

[Install]
WantedBy=multi-user.target
相關文章
相關標籤/搜索