基本概念:html
一、什麼是kafka?
Kafka是一個高吞吐量、分佈式的發佈訂閱消息系統。據kafka官方網站介紹,當前的kafka已經定位爲一個分佈式流式處理平臺( a distributed streaming platform),它最初由LinkedIn公司開發,後來成爲Apache項目的一部分。
kafka核心模塊使用scala語言開發,支持多語言(如java、c/c++、python、go、erlang、node.js等)客戶端,它以能夠水平擴展和具備高吞吐量等特性而被普遍使用。java
二、流式處理平臺的3大關鍵特性。
a、可以容許發佈和訂閱流數據;
b、存儲流數據時提供相應的容錯機制;
c、當流數據到達時可以被及時處理。node
3.體系結構python
四、kafka的名詞
a、producer:生產者
b、consumer:消費者
c、topic:消息以topic爲類別記錄,Kafka將消息種子(Feed)分門別類,每一類的消息稱之爲一個主題(Topic)
d、broker:以集羣的方式運行,能夠由一個或多個服務組成,每一個服務叫作一個broker;消費者能夠訂閱一個或多個主題(topic),並從Broker拉數據,從而消費這些已發佈的消息。c++
e、分區和副本:kafka將一組消息概括爲一個主題,而每一個主題又被分爲一個或多個分區(partition)。每一個分區由一系列有序、不可變的消息組成,是一個有序隊列。
每一個分區在物理上對應爲一個文件夾,分區命名規則爲 主題名+「-」+分區編號,分區編號從0開始,到最大值減一。每一個分區又有一到多個副本(replica),分區的副本分佈在集羣的不一樣代理上,一提升可用性。
每一個消息(也叫做record記錄,也被稱爲消息)是由一個key,一個value和時間戳構成。apache
安裝部署:windows
一、JDK
a、最新的kafka要jdk1.7以上,官方推薦jdk1.8
b、環境變量,JAVA_HOME = D:\ProgramFiles\Java\jdk1.8.0_144
c、jdk安裝路徑不能有空格,不然kafka啓動報錯api
二、zookeeper
a、下載安裝包: http://zookeeper.apache.org/releases.html#download
下載後不須要安裝,直接解壓就好,個人jdk安裝在D盤,我把zookeeper也解壓在那裏。
b、環境變量,ZOOKEEPER_HOME = D:\ProgramFiles\zookeeper-3.3.6
記得添加到path系統變量下,%ZOOKEEPER_HOME%\bin;
c、修改配置文件
(1)進入目錄 D:\ProgramFiles\zookeeper-3.3.6\conf;
(2)將「zoo_sample.cfg」重命名爲「zoo.cfg」;
(3)文本編輯器打開zoo.cfg,找到並編輯 dataDir=D:\\ProgramFiles\\zookeeper-3.3.6\\tmp\\zookeeper_logs
d、在zoo.cfg文件中修改默認的Zookeeper端口(默認端口2181)服務器
e、啓動zookeeper,兩種方式
(1)打開新的cmd,輸入zkServer,運行Zookeeper;
(2)進入目錄D:\ProgramFiles\zookeeper-3.3.6\bin,執行zkServer.cmd腳本。session
三、kafka
a、下載安裝包:
http://kafka.apache.org/downloads.html
下載後不須要安裝,直接解壓就好,個人jdk安裝在D盤,我把kafka也解壓在那裏。
b、修改配置文件
(1)進入Kafka配置目錄: D:\ProgramFiles\kafka_2.12-1.1.0\config
(2)文本編輯器打開 server.properties 文件,找到並編輯日誌路徑
log.dirs=D:\\ProgramFiles\\kafka_2.12-1.1.0\\tmp\\kafka-logs
(3)找到並編輯zookeeper.connect=localhost:2181。表示本地運行
(4)Kafka會按照默認,在9092端口上運行,並鏈接zookeeper的默認端口:2181。
c、運行
重要:請確保在啓動Kafka服務器前,Zookeeper實例已經準備好並開始運行。
(1)進入kafka安裝目錄, D:\ProgramFiles\kafka_2.12-1.1.0
(2)按下Shift鍵,同時單擊鼠標右鍵,選擇「打開命令窗口」選項,打開命令行;
(3)輸入
.\bin\windows\kafka-server-start.bat .\config\server.properties
而後回車
上面的Zookeeper和kafka一直打開。
一、建立主題
a、進入kafka安裝目錄, D:\ProgramFiles\kafka_2.12-1.1.0
b、按下Shift鍵,同時單擊鼠標右鍵,選擇「打開命令窗口」選項,打開命令行;
c、輸入
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 –repartition-factor 1 --partitions 1 --topic gl_test_topic
d、已建立的topic,不能夠重複建立。
上面全部的窗口要一直打開。
二、建立生產者
a、進入kafka安裝目錄, D:\ProgramFiles\kafka_2.12-1.1.0
b、按下Shift鍵,同時單擊鼠標右鍵,選擇「打開命令窗口」選項,打開命令行;
c、輸入
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic gl_test_topic_00
三、建立消費者
a、進入kafka安裝目錄, D:\ProgramFiles\kafka_2.12-1.1.0
b、按下Shift鍵,同時單擊鼠標右鍵,選擇「打開命令窗口」選項,打開命令行;
c、輸入
.\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic gl_test_topic_00
1、安裝kafka 1.安裝zookeeper 相關配置 log_Dir 2.安裝kafka 相關配置 log_Dir 3.啓動zookeeper服務 ./bin/zookeeper-server-start.sh config/zookeeper.properties 4.開啓kafka服務 .\bin\windows\kafka-server-start.bat .\config\server.properties ./bin/kafka-server-start.sh config/server.properties 5.建立topic .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_flink ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_flink 查看topic: ./bin/kafka-topics.sh --list --zookeeper localhost:2181 建立生產者 .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test_flink ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_flink 建立消費者 .\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic test_flink ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_flink --from-beginning 2、搭建一個多個broker的集羣 首先爲每一個節點編寫配置文件 1. config/server-1.properties: broker.id=1 port=9093 log.dir=/tmp/kafka-logs-1 2. config/server-2.properties: broker.id=2 port=9094 log.dir=/tmp/kafka-logs-2 3.啓動另外的節點 ./bin/kafka-server-start.sh config/server-1.properties ./bin/kafka-server-start.sh config/server-2.properties 4.建立一個擁有3個副本的topic ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic 3、搭建Kafka開發環境 1.添加依賴 <dependency> <groupId> org.apache.kafka</groupId > <artifactId> kafka_2.10</artifactId > <version> 0.8.0</ version> </dependency> 2. public interface KafkaProperties { final static String ZKCONNECT = "10.22.10.139:2181"; final static String GROUPID = "group1"; final static String TOPIC = "topic1"; final static String KAFKASERVERURL = "10.22.10.139"; final static int KAFKASERVERPORT = 9092; final static int KAFKAPRODUCERBUFFERSIZE = 64 * 1024; final static int CONNECTIONTIMEOUT = 20000; final static int RECONNECTINTERVAL = 10000; final static String TOPIC2 = "topic2"; final static String TOPIC3 = "topic3"; final static String CLIENTID = "SimpleConsumerDemoClient"; } 3.生產者 import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import java.util.Properties; public class KafkaProducer extends Thread { private final Producer<Integer, String> producer; private final String topic; private final Properties props = new Properties(); public KafkaProducer(String topic) { props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("metadata.broker.list", "10.22.10.139:9092"); producer = new Producer<Integer, String>(new ProducerConfig(props)); this.topic = topic; } @Override public void run() { int messageNo = 1; while (true) { String messageStr = new String("Message_" + messageNo); System.out.println("Send:" + messageStr); producer.send(new KeyedMessage<Integer, String>(topic, messageStr)); messageNo++; try { sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } } 4.消費者 import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; public class KafkaConsumer extends Thread { private final ConsumerConnector consumer; private final String topic; public KafkaConsumer(String topic) { consumer = kafka.consumer.Consumer.createJavaConsumerConnector( createConsumerConfig()); this.topic = topic; } private static ConsumerConfig createConsumerConfig() { Properties props = new Properties(); props.put("zookeeper.connect", KafkaProperties.ZKCONNECT); props.put("group.id", KafkaProperties.GROUPID); props.put("zookeeper.session.timeout.ms", "40000"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); return new ConsumerConfig(props); } @Override public void run() { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(1)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =consumer.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream =consumerMap.get(topic).get(0); ConsumerIterator<byte[], byte[]> it = stream.iterator(); while (it.hasNext()) { System.out.println("receive:" + new String(it.next().message())); try { sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } }