官網:http://kafka.apache.org/java
Producer:消息的發送者node
Consumer:消息的接收者git
kafka cluster:kafka的集羣。github
Topic:就是消息類別名,一個topic中一般放置一類消息。每一個topic都有一個或者多個訂閱者(消費者)。shell
消息的生產者將消息推送到kafka集羣,消息的消費者從kafka集羣中拉取消息。apache
說明:bootstrap
準備三臺虛擬機,分別是node01,node02,node03,而且修改hosts文件以下:vim
vim /etc/hosts #注意: 前面的ip地址改爲本身的ip地址 192.168.40.133 node01 192.168.40.134 node02 192.168.40.135 node03 #3臺服務器的時間要一致 #時間更新: yum install -y rdate rdate -s time-b.nist.gov
因爲Kafka 是用Scala 語言開發的,運行在JVM上,所以在安裝Kafka 以前須要先安裝JDK 。api
安裝過程略過,我這裏使用的是jdk1.8。瀏覽器
Kafka 依賴ZooKeeper ,經過ZooKeeper 來對服務節點、消費者上下線管理、集羣、分區元數據管理等,所以ZooKeeper 也是Kafka 得以運行的基礎環境之一。
#上傳zookeeper-3.4.9.tar.gz到/export/software cd /export/software mkdir -p /export/servers/ tar -xvf zookeeper-3.4.9.tar.gz -C /export/servers/ #建立ZooKeeper的data目錄 mkdir /export/data/zookeeper -p cd /export/servers/zookeeper-3.4.9/conf/ #修改配置文件 mv zoo_sample.cfg zoo.cfg vim zoo.cfg #設置data目錄 dataDir=/export/data/zookeeper #啓動ZooKeeper ./zkServer.sh start #檢查是否啓動成功 jps
#在/export/data/zookeeper目錄中建立myid文件 vim /export/data/zookeeper/myid #寫入對應的節點的id,如:1,2等,保存退出 #在conf下,修改zoo.cfg文件 vim zoo.cfg #添加以下內容 server.1=node01:2888:3888 server.2=node02:2888:3888 server.3=node03:2888:3888
vim /etc/profile export ZK_HOME=/export/servers/zookeeper-3.4.9 export PATH=${ZK_HOME}/bin:$PATH #當即生效 source /etc/profile
scp /etc/profile node02:/etc/ scp /etc/profile node03:/etc/ cd /export/servers scp -r zookeeper-3.4.9 node02:/export/servers/ scp -r zookeeper-3.4.9 node03:/export/servers/
mkdir -p /export/servers/onekey/zk vim slave #輸入以下內容 node01 node02 node03 #保存退出 vim startzk.sh #輸入以下內容 cat /export/servers/onekey/zk/slave | while read line do { echo "開始啓動 --> "$line ssh $line "source /etc/profile;nohup sh ${ZK_HOME}/bin/zkServer.sh start >/dev/null 2>&1 &" }& wait done echo "★★★啓動完成★★★" #保存退出 vim stopzk.sh #輸入以下內容 cat /export/servers/onekey/zk/slave | while read line do { echo "開始中止 --> "$line ssh $line "source /etc/profile;nohup sh ${ZK_HOME}/bin/zkServer.sh stop >/dev/null 2>&1 &" }& wait done echo "★★★中止完成★★★" #保存退出 #設置可執行權限 chmod +x startzk.sh stopzk.sh #添加到環境變量中 export ZK_ONEKEY=/export/servers/onekey export PATH=${ZK_ONEKEY}/zk:$PATH
發現三臺機器都有「QuorumPeerMain」進程,說明機器已經啓動成功了。
檢查集羣是否正常:
zkServer.sh status
發現,集羣運行一切正常。
第一步:上傳Kafka安裝包而且解壓
rz 上傳kafka_2.11-1.1.0.tgz到 /export/software/ cd /export/software/ tar -xvf kafka_2.11-1.1.0.tgz -C /export/servers/ cd /export/servers mv kafka_2.11-1.1.0/ kafka
第二步:配置環境變量
vim /etc/profile #輸入以下內容 export KAFKA_HOME=/export/servers/kafka export PATH=${KAFKA_HOME}/bin:$PATH #保存退出 source /etc/profile
第三步:修改配置文件
cd /export/servers/kafka cd config vim server.properties # The id of the broker. This must be set to a unique integer for each broker. # 必需要只要一個brokerid,而且它必須是惟一的。 broker.id=0 # A comma separated list of directories under which to store log files # 日誌數據文件存儲的路徑 (如不存在,須要手動建立該目錄, mkdir -p /export/data/kafka/) log.dirs=/export/data/kafka # ZooKeeper的配置,本地模式下指向到本地的ZooKeeper服務便可 zookeeper.connect=node01:2181 # 保存退出
第四步:啓動kafka服務
# 以守護進程的方式啓動kafka kafka-server-start.sh -daemon /export/servers/kafka/config/server.properties
第五步:檢測kafka是否啓動
若是進程中有名爲kafka的進程,就說明kafka已經啓動了。
因爲kafka是將元數據保存在ZooKeeper中的,因此,能夠經過查看ZooKeeper中的信息進行驗證kafka是否安裝成功。
Kafka Manager 由 yahoo 公司開發,該工具能夠方便查看集羣 主題分佈狀況,同時支持對 多個集羣的管理、分區平衡以及建立主題等操做。
源碼託管於github:https://github.com/yahoo/kafka-manager
第一步:上傳Kafka-manager安裝包而且解壓
rz上傳kafka-manager-1.3.3.17.tar.gz到 /export/software/ cd /export/software tar -xvf kafka-manager-1.3.3.17.tar.gz -C /export/servers/ cd /export/servers/kafka-manager-1.3.3.17/conf
第二步:修改配置文件
#修改配置文件 vim application.conf #新增項,http訪問服務的端口 http.port=19000 #修改爲本身的zk機器地址和端口 kafka-manager.zkhosts="node01:2181" #保存退出
第三步:啓動服務
cd /export/servers/kafka-manager-1.3.3.17/bin #啓動服務 ./kafka-manager -Dconfig.file=../conf/application.conf #製做啓動腳本 vim /etc/profile export KAFKA_MANAGE_HOME=/export/servers/kafka-manager-1.3.3.17 export PATH=${KAFKA_MANAGE_HOME}/bin:$PATH source /etc/profile cd /export/servers/onekey/ mkdir kafka-manager cd kafka-manager vim start-kafka-manager.sh nohup kafka-manager -Dconfig.file=${KAFKA_MANAGE_HOME}/conf/application.conf >/dev/null 2>&1 & chmod +x start-kafka-manager.sh vim /etc/profile export PATH=${ZK_ONEKEY}/kafka-manager:$PATH source /etc/profile
第四步:檢查是否啓動成功
打開瀏覽器,輸入地址:http://node01:19000/,便可看到kafka-manage管理界面。
進入管理界面,是沒有顯示Cluster信息的,須要添加後才能操做。
輸入Cluster Name、ZooKeeper信息、以及Kafka的版本信息(這裏最高只能選擇1.0.0)。
點擊Save按鈕保存。
添加成功。
kafka集羣的搭建是很是簡單的,只須要將上面的單機版的kafka分發的其餘機器,而且將ZooKeeper信息修改爲集羣的配置以及設置不一樣的broker值便可。
第一步:將kafka分發到node0二、node03
cd /export/servers/ scp -r kafka node02:/export/servers/ scp -r kafka node03:/export/servers/ scp /etc/profile node02:/etc/ scp /etc/profile node03:/etc/ # 分別到node0二、node03機器上執行 source /etc/profile
第二步:修改node0一、node0二、node03上的kafka配置文件
node01:
cd /export/servers/kafka/config vim server.properties zookeeper.connect=node01:2181,node02:2181,node03:2181
node02:
cd /export/servers/kafka/config vim server.properties broker.id=1 zookeeper.connect=node01:2181,node02:2181,node03:2181
node03:
cd /export/servers/kafka/config vim server.properties broker.id=2 zookeeper.connect=node01:2181,node02:2181,node03:2181
第三步:編寫一鍵啓動、中止腳本。注意:該腳本依賴於環境變量中的KAFKA_HOME。
mkdir -p /export/servers/onekey/kafka vim slave #輸入以下內容 node01 node02 node03 #保存退出 vim start-kafka.sh #輸入以下內容 cat /export/servers/onekey/kafka/slave | while read line do { echo "開始啓動 --> "$line ssh $line "source /etc/profile;nohup sh ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties >/dev/null 2>&1 &" }& wait done echo "★★★啓動完成★★★" #保存退出 chmod +x start-kafka.sh vim stop-kafka.sh #輸入以下內容 cat /export/servers/onekey/kafka/slave | while read line do { echo "開始中止 --> "$line ssh $line "source /etc/profile;nohup sh ${KAFKA_HOME}/bin/kafka-server-stop.sh >/dev/null 2>&1 &" }& wait done echo "★★★中止完成★★★" #保存退出 chmod +x stop-kafka.sh #加入到環境變量中 export PATH=${ZK_ONEKEY}/kafka:$PATH source /etc/profile
第四步:經過kafka-manager管理工具查看集羣信息。
因而可知,kafka集羣已經啓動完成。
對kafka的操做有2種方式,一種是經過命令行方式,一種是經過API方式。
Kafka在bin目錄下提供了shell腳本文件,能夠對Kafka進行操做,分別是: 經過命令行的方式,咱們將體驗下kafka,以便咱們對kafka有進一步的認知。
kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 1 --topic my-kafka-topic #執行結果: Created topic "my-kafka-topic".
參數說明:
kafka-topics.sh --list --zookeeper node01:2181 __consumer_offsets my-kafka-topic
能夠查看列表。
若是須要查看topic的詳細信息,須要使用describe命令。
kafka-topics.sh --describe --zookeeper node01:2181 --topic test-topic #若不指定topic,則查看全部topic的信息 kafka-topics.sh --describe --zookeeper node01:2181
經過kafka-topics.sh執行刪除動做,須要在server.properties文件中配置 delete.topic.enable=true,該配置默認爲 false。
不然執行該腳本並未真正刪除主題 ,將該topic標記爲刪除狀態 。
kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic # 執行以下 [root@node01 config]# kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic Topic my-kafka-topic is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true. # 若是將delete.topic.enable=true [root@node01 config]# kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic2 Topic my-kafka-topic2 is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true. # 說明:雖然設置後,刪除時依然提示沒有設置爲true,實際上已經刪除了。
kafka-console-producer.sh --broker-list node01:9092 --topic my-kafka-topic
能夠看到,已經向topic發送了消息。
kafka-console-consumer.sh --bootstrap-server node01:9092 --topic my-kafka-topic # 經過以上命令,能夠看到消費者能夠接收生產者發送的消息 # 若是須要從頭開始接收數據,須要添加--from-beginning參數 kafka-console-consumer.sh --bootstrap-server node01:9092 --from-beginning --topic my-kafka-topic
除了經過命令行的方式操做kafka外,還能夠經過Java api的方式操做,這種方式將更加的經常使用。
導入依賴:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>itcast-bigdata</artifactId> <groupId>cn.itcast.bigdata</groupId> <version>1.0.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>itcast-bigdata-kafka</artifactId> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>1.1.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.1.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> </dependencies> <build> <plugins> <!-- java編譯插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.2</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build> </project>
因爲主題的元數據信息是註冊在 ZooKeeper 相 應節點之中,因此對主題的操做實質是對 ZooKeeper 中記錄主題元數據信息相關路徑的操做。 Kafka將對 ZooKeeper 的相關操做封裝成一 個 ZkUtils 類 , 井封裝了一個AdrninUtils 類調用 ZkClient 類的相關方法以實現對 Kafka 元數據 的操做,包括對主題、代理、消費者等相關元數據的操做。對主題操做的相關 API調用較簡單, 相應操做都是經過調用 AdminUtils類的相應方法來完成的。
package cn.itcast.kafka; import kafka.admin.AdminUtils; import kafka.utils.ZkUtils; import org.apache.kafka.common.security.JaasUtils; import org.junit.Test; import java.util.Properties; public class TestKafkaTopic { [@Test](https://my.oschina.net/azibug) public void testCreateTopic() { ZkUtils zkUtils = null; try { //參數:zookeeper的地址,session超時時間,鏈接超時時間,是否啓用zookeeper安全機制 zkUtils = ZkUtils.apply("node01:2181", 30000, 3000, JaasUtils.isZkSecurityEnabled()); String topicName = "my-kafka-topic-test1"; if (!AdminUtils.topicExists(zkUtils, topicName)) { //參數:zkUtils,topic名稱,partition數量,副本數量,參數,機架感知模式 AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), AdminUtils.createTopic$default$6()); System.out.println(topicName + " 建立成功!"); } else { System.out.println(topicName + " 已存在!"); } } finally { if (null != zkUtils) { zkUtils.close(); } } } }
測試結果:
[@Test](https://my.oschina.net/azibug) public void testDeleteTopic() { ZkUtils zkUtils = null; try { //參數:zookeeper的地址,session超時時間,鏈接超時時間,是否啓用zookeeper安全機制 zkUtils = ZkUtils.apply("node01:2181", 30000, 3000, JaasUtils.isZkSecurityEnabled()); String topicName = "my-kafka-topic-test1"; if (AdminUtils.topicExists(zkUtils, topicName)) { //參數:zkUtils,topic名稱 AdminUtils.deleteTopic(zkUtils, topicName); System.out.println(topicName + " 刪除成功!"); } else { System.out.println(topicName + " 不已存在!"); } } finally { if (null != zkUtils) { zkUtils.close(); } } }
測試結果:
package cn.itcast.kafka; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.Test; import java.util.Properties; public class TestProducer { [@Test](https://my.oschina.net/azibug) public void testProducer() throws InterruptedException { Properties config = new Properties(); // 設置kafka服務列表,多個用逗號分隔 config.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092"); // 設置序列化消息 Key 的類 config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 設置序列化消息 value 的類 config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 初始化 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(config); for (int i = 0; i < 100 ; i++) { ProducerRecord record = new ProducerRecord("my-kafka-topic","data-" + i); // 發送消息 kafkaProducer.send(record); System.out.println("發送消息 --> " + i); Thread.sleep(100); } kafkaProducer.close(); } }
package cn.itcast.kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.Test; import javax.sound.midi.Soundbank; import java.util.Arrays; import java.util.Properties; public class TestConsumer { @Test public void testConsumer() { Properties config = new Properties(); // 設置kafka服務列表,多個用逗號分隔 config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092"); // 設置消費者分組id config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); // 設置序反列化消息 Key 的類 config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 設置序反列化消息 value 的類 config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(config); // 訂閱topic kafkaConsumer.subscribe(Arrays.asList("my-kafka-topic")); while (true) { // 使用死循環不斷的拉取數據 ConsumerRecords<String, String> records = kafkaConsumer.poll(1000); for (ConsumerRecord<String, String> record : records) { String value = record.value(); long offset = record.offset(); System.out.println("value = " + value + ", offset = " + offset); } } } }
什麼是Kafka? Kafka監控工具彙總 Kafka快速入門 Kafka核心之Consumer Kafka核心之Producer
替代Flume——Kafka Connect簡介 最簡單流處理引擎——Kafka Streams簡介
更多實時計算,Flink,Kafka等相關技術博文,歡迎關注實時流式計算