官網:http://kafka.apache.org/java
消息的生產者將消息推送到kafka集羣,消息的消費者從kafka集羣中拉取消息。node
說明:git
準備三臺虛擬機,分別是node01,node02,node03,而且修改hosts文件以下:github
~~~shellvim /etc/hostsshell
192.168.40.133 node01192.168.40.134 node02192.168.40.135 node03apache
yum install -y rdaterdate -s time-b.nist.gov~~~bootstrap
因爲Kafka 是用Scala 語言開發的,運行在JVM上,所以在安裝Kafka 以前須要先安裝JDK 。vim
安裝過程略過,我這裏使用的是jdk1.8。api
Kafka 依賴ZooKeeper ,經過ZooKeeper 來對服務節點、消費者上下線管理、集羣、分區元數據管理等,所以ZooKeeper 也是Kafka 得以運行的基礎環境之一。瀏覽器
#上傳zookeeper-3.4.9.tar.gz到/export/software
cd /export/software
mkdir -p /export/servers/
tar -xvf zookeeper-3.4.9.tar.gz -C /export/servers/
#建立ZooKeeper的data目錄
mkdir /export/data/zookeeper -p
cd /export/servers/zookeeper-3.4.9/conf/
#修改配置文件
mv zoo_sample.cfg zoo.cfg
vim zoo.cfg
#設置data目錄
dataDir=/export/data/zookeeper
#啓動ZooKeeper
./zkServer.sh start
#檢查是否啓動成功
jps複製代碼
~~~shell
vim /export/data/zookeeper/myid
vim zoo.cfg
server.1=node01:2888:3888server.2=node02:2888:3888server.3=node03:2888:3888~~~
~~~shellvim /etc/profileexport ZK_HOME=/export/servers/zookeeper-3.4.9export PATH=${ZK_HOME}/bin:$PATH
source /etc/profile~~~
~~~shellscp /etc/profile node02:/etc/scp /etc/profile node03:/etc/
cd /export/serversscp -r zookeeper-3.4.9 node02:/export/servers/scp -r zookeeper-3.4.9 node03:/export/servers/~~~
~~~shellmkdir -p /export/servers/onekey/zkvim slave
node01node02node03
vim startzk.sh
cat /export/servers/onekey/zk/slave | while read linedo{echo "開始啓動 --> "$linessh $line "source /etc/profile;nohup sh ${ZK_HOME}/bin/zkServer.sh start >/dev/null 2>&1 &"}&waitdoneecho "★★★啓動完成★★★"
vim stopzk.sh
cat /export/servers/onekey/zk/slave | while read linedo{echo "開始中止 --> "$linessh $line "source /etc/profile;nohup sh ${ZK_HOME}/bin/zkServer.sh stop >/dev/null 2>&1 &"}&waitdoneecho "★★★中止完成★★★"
chmod +x startzk.sh stopzk.sh
export ZK_ONEKEY=/export/servers/onekeyexport PATH=${ZK_ONEKEY}/zk:$PATH~~~
發現三臺機器都有「QuorumPeerMain」進程,說明機器已經啓動成功了。
檢查集羣是否正常:
zkServer.sh status
發現,集羣運行一切正常。
第一步:上傳Kafka安裝包而且解壓
~~~shellrz 上傳kafka_2.11-1.1.0.tgz到 /export/software/cd /export/software/tar -xvf kafka_2.11-1.1.0.tgz -C /export/servers/cd /export/serversmv kafka_2.11-1.1.0/ kafka~~~
第二步:配置環境變量
~~~shellvim /etc/profile
export KAFKA_HOME=/export/servers/kafkaexport PATH=${KAFKA_HOME}/bin:$PATH
source /etc/profile~~~
第三步:修改配置文件
~~~shellcd /export/servers/kafkacd configvim server.properties
broker.id=0
log.dirs=/export/data/kafka
zookeeper.connect=node01:2181
~~~
第四步:啓動kafka服務
~~~shell
kafka-server-start.sh -daemon /export/servers/kafka/config/server.properties~~~
第五步:檢測kafka是否啓動
若是進程中有名爲kafka的進程,就說明kafka已經啓動了。
因爲kafka是將元數據保存在ZooKeeper中的,因此,能夠經過查看ZooKeeper中的信息進行驗證kafka是否安裝成功。
Kafka Manager 由 yahoo 公司開發,該工具能夠方便查看集羣 主題分佈狀況,同時支持對 多個集羣的管理、分區平衡以及建立主題等操做。
源碼託管於github:https://github.com/yahoo/kafka-manager
第一步:上傳Kafka-manager安裝包而且解壓
~~~shellrz上傳kafka-manager-1.3.3.17.tar.gz到 /export/software/cd /export/softwaretar -xvf kafka-manager-1.3.3.17.tar.gz -C /export/servers/cd /export/servers/kafka-manager-1.3.3.17/conf~~~
第二步:修改配置文件
~~~shell
vim application.conf
http.port=19000
kafka-manager.zkhosts="node01:2181"
~~~
第三步:啓動服務
~~~shellcd /export/servers/kafka-manager-1.3.3.17/bin
./kafka-manager -Dconfig.file=../conf/application.conf
vim /etc/profileexport KAFKAMANAGEHOME=/export/servers/kafka-manager-1.3.3.17export PATH=${KAFKAMANAGEHOME}/bin:$PATH
source /etc/profile
cd /export/servers/onekey/mkdir kafka-managercd kafka-managervim start-kafka-manager.shnohup kafka-manager -Dconfig.file=${KAFKAMANAGEHOME}/conf/application.conf >/dev/null 2>&1 &chmod +x start-kafka-manager.shvim /etc/profileexport PATH=${ZK_ONEKEY}/kafka-manager:$PATHsource /etc/profile
~~~
第四步:檢查是否啓動成功
打開瀏覽器,輸入地址:http://node01:19000/,便可看到kafka-manage管理界面。
進入管理界面,是沒有顯示Cluster信息的,須要添加後才能操做。
輸入Cluster Name、ZooKeeper信息、以及Kafka的版本信息(這裏最高只能選擇1.0.0)。
點擊Save按鈕保存。
添加成功。
kafka集羣的搭建是很是簡單的,只須要將上面的單機版的kafka分發的其餘機器,而且將ZooKeeper信息修改爲集羣的配置以及設置不一樣的broker值便可。
第一步:將kafka分發到node0二、node03
~~~cd /export/servers/scp -r kafka node02:/export/servers/scp -r kafka node03:/export/servers/scp /etc/profile node02:/etc/scp /etc/profile node03:/etc/
source /etc/profile~~~
第二步:修改node0一、node0二、node03上的kafka配置文件
~~~shellcd /export/servers/kafka/configvim server.propertieszookeeper.connect=node01:2181,node02:2181,node03:2181~~~
~~~shellcd /export/servers/kafka/configvim server.propertiesbroker.id=1zookeeper.connect=node01:2181,node02:2181,node03:2181~~~
~~~shellcd /export/servers/kafka/configvim server.propertiesbroker.id=2zookeeper.connect=node01:2181,node02:2181,node03:2181~~~
第三步:編寫一鍵啓動、中止腳本。注意:該腳本依賴於環境變量中的KAFKA_HOME。
~~~shellmkdir -p /export/servers/onekey/kafkavim slave
node01node02node03
vim start-kafka.sh
cat /export/servers/onekey/kafka/slave | while read linedo{echo "開始啓動 --> "$linessh $line "source /etc/profile;nohup sh ${KAFKAHOME}/bin/kafka-server-start.sh -daemon ${KAFKAHOME}/config/server.properties >/dev/null 2>&1 &"}&waitdoneecho "★★★啓動完成★★★"
chmod +x start-kafka.sh
vim stop-kafka.sh
cat /export/servers/onekey/kafka/slave | while read linedo{echo "開始中止 --> "$linessh $line "source /etc/profile;nohup sh ${KAFKA_HOME}/bin/kafka-server-stop.sh >/dev/null 2>&1 &"}&waitdoneecho "★★★中止完成★★★"
chmod +x stop-kafka.sh
export PATH=${ZK_ONEKEY}/kafka:$PATHsource /etc/profile~~~
第四步:經過kafka-manager管理工具查看集羣信息。
因而可知,kafka集羣已經啓動完成。
對kafka的操做有2種方式,一種是經過命令行方式,一種是經過API方式。
Kafka在bin目錄下提供了shell腳本文件,能夠對Kafka進行操做,分別是:經過命令行的方式,咱們將體驗下kafka,以便咱們對kafka有進一步的認知。
~~~shellkafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 1 --topic my-kafka-topic
Created topic "my-kafka-topic".~~~
參數說明:
~~~shellkafka-topics.sh --list --zookeeper node01:2181
__consumer_offsetsmy-kafka-topic~~~
能夠查看列表。
若是須要查看topic的詳細信息,須要使用describe命令。
~~~shellkafka-topics.sh --describe --zookeeper node01:2181 --topic test-topic
kafka-topics.sh --describe --zookeeper node01:2181~~~
經過kafka-topics.sh執行刪除動做,須要在server.properties文件中配置 delete.topic.enable=true,該配置默認爲 false。
不然執行該腳本並未真正刪除主題 ,將該topic標記爲刪除狀態 。
~~~shellkafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic
[root@node01 config]# kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topicTopic my-kafka-topic is marked for deletion.Note: This will have no impact if delete.topic.enable is not set to true.
[root@node01 config]# kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic2Topic my-kafka-topic2 is marked for deletion.Note: This will have no impact if delete.topic.enable is not set to true.
~~~
~~~shellkafka-console-producer.sh --broker-list node01:9092 --topic my-kafka-topic~~~
能夠看到,已經向topic發送了消息。
~~~shellkafka-console-consumer.sh --bootstrap-server node01:9092 --topic my-kafka-topic
kafka-console-consumer.sh --bootstrap-server node01:9092 --from-beginning --topic my-kafka-topic~~~
除了經過命令行的方式操做kafka外,還能夠經過Java api的方式操做,這種方式將更加的經常使用。
導入依賴:
~~~xml
<artifactId>itcast-bigdata-kafka</artifactId>複製代碼
<dependencies>複製代碼
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>複製代碼
</dependencies>複製代碼
<build>
<plugins>
<!-- java編譯插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
複製代碼
~~~
因爲主題的元數據信息是註冊在 ZooKeeper 相 應節點之中,因此對主題的操做實質是對 ZooKeeper 中記錄主題元數據信息相關路徑的操做。 Kafka將對 ZooKeeper 的相關操做封裝成一 個 ZkUtils 類 , 井封裝了一個AdrninUtils 類調用 ZkClient 類的相關方法以實現對 Kafka 元數據 的操做,包括對主題、代理、消費者等相關元數據的操做。對主題操做的相關 API調用較簡單, 相應操做都是經過調用 AdminUtils類的相應方法來完成的。
~~~javapackage cn.itcast.kafka;
import kafka.admin.AdminUtils;import kafka.utils.ZkUtils;import org.apache.kafka.common.security.JaasUtils;import org.junit.Test;
import java.util.Properties;
public class TestKafkaTopic {
@Test
public void testCreateTopic() {
ZkUtils zkUtils = null;
try {
//參數:zookeeper的地址,session超時時間,鏈接超時時間,是否啓用zookeeper安全機制
zkUtils = ZkUtils.apply("node01:2181", 30000, 3000, JaasUtils.isZkSecurityEnabled());複製代碼
String topicName = "my-kafka-topic-test1";
if (!AdminUtils.topicExists(zkUtils, topicName)) {
//參數:zkUtils,topic名稱,partition數量,副本數量,參數,機架感知模式
AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), AdminUtils.createTopic$default$6());
System.out.println(topicName + " 建立成功!");
} else {
System.out.println(topicName + " 已存在!");
}
} finally {
if (null != zkUtils) {
zkUtils.close();
}
}複製代碼
}
}複製代碼
~~~
測試結果:
~~~java@Testpublic void testDeleteTopic() {ZkUtils zkUtils = null;try {//參數:zookeeper的地址,session超時時間,鏈接超時時間,是否啓用zookeeper安全機制zkUtils = ZkUtils.apply("node01:2181", 30000, 3000, JaasUtils.isZkSecurityEnabled());String topicName = "my-kafka-topic-test1";if (AdminUtils.topicExists(zkUtils, topicName)) {//參數:zkUtils,topic名稱AdminUtils.deleteTopic(zkUtils, topicName);System.out.println(topicName + " 刪除成功!");} else {System.out.println(topicName + " 不已存在!");}} finally {if (null != zkUtils) {zkUtils.close();}}
}
~~~複製代碼
測試結果:
~~~javapackage cn.itcast.kafka;
import org.apache.kafka.clients.producer.*;import org.apache.kafka.common.serialization.StringSerializer;import org.junit.Test;
import java.util.Properties;
public class TestProducer {
@Test
public void testProducer() throws InterruptedException {
Properties config = new Properties();複製代碼
// 設置kafka服務列表,多個用逗號分隔
config.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092");
// 設置序列化消息 Key 的類
config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 設置序列化消息 value 的類
config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());複製代碼
// 初始化
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(config);
for (int i = 0; i < 100 ; i++) {
ProducerRecord record = new ProducerRecord("my-kafka-topic","data-" + i);
// 發送消息
kafkaProducer.send(record);
System.out.println("發送消息 --> " + i);複製代碼
Thread.sleep(100);
}複製代碼
kafkaProducer.close();複製代碼
}複製代碼
}
~~~
~~~javapackage cn.itcast.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.common.serialization.StringDeserializer;import org.apache.kafka.common.serialization.StringSerializer;import org.junit.Test;
import javax.sound.midi.Soundbank;import java.util.Arrays;import java.util.Properties;
public class TestConsumer {
@Test
public void testConsumer() {
Properties config = new Properties();
// 設置kafka服務列表,多個用逗號分隔
config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092");
// 設置消費者分組id
config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
// 設置序反列化消息 Key 的類
config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 設置序反列化消息 value 的類
config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
複製代碼
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(config);
// 訂閱topic
kafkaConsumer.subscribe(Arrays.asList("my-kafka-topic"));複製代碼
while (true) { // 使用死循環不斷的拉取數據
ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
String value = record.value();
long offset = record.offset();
System.out.println("value = " + value + ", offset = " + offset);
}
}複製代碼
}
}
~~~
複製代碼
什麼是Kafka?Kafka監控工具彙總Kafka快速入門Kafka核心之ConsumerKafka核心之Producer
替代Flume——Kafka Connect簡介最簡單流處理引擎——Kafka Streams簡介
更多實時計算,Flink,Kafka等相關技術博文,歡迎關注實時流式計算