1.什麼是Kafkajava
Apache Kafka是一個 開源的分佈式消息隊列 (生產者消費者模式) Apache Kafka 目標:構建企業中統一的. 高通量的,低延時的消息平臺 ; 大多的消息隊列是基於JMS標準實現的Apache Kafka l相似於JMS的實現.node
2.Kafka的特色算法
做爲緩衝(流量消減),來異構,解耦系統apache
3.基本架構bootstrap
Kafka Cluster:由多個服務器組成。每一個服務器單獨的名字broker(掮客)。緩存
kafka broker:kafka集羣中包含的服務器服務器
Kafka Producer:消息生產者、發佈消息到 kafka 集羣的終端或服務。架構
Kafka consumer:消息消費者、負責消費數據。負載均衡
Kafka Topic: 主題,一類消息的名稱。存儲數據時將一類數據存放在某個topic下,消費數據也是消費一類數據。異步
訂單系統:建立一個topic,叫作order。
用戶系統:建立一個topic,叫作user。
商品系統:建立一個topic,叫作product。
注意:Kafka的元數據都是存放在zookeeper中。
4.Kafka的基本使用
4.1使用腳本操做Kafka
1) 建立一個topic
./kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 1 --topic order
2) 使用Kafka 自帶一個命令客戶端啓動一個生產者,生產數據
./kafka-console-producer.sh --broker-list node01:9092 --topic order
3) 使用Kafka自帶一個命令客戶端啓動一個消費者,消費數據
./kafka-console-consumer.sh --bootstrap-server node01:9092 --topic order
該消費語句,只能獲取最新的數據,要想歷史數據,須要添加選項-
4)查看有哪些topic
./kafka-topics.sh --list --zookeeper node01:2181
5) 查看某一個具體的Topic的詳細信息
./kafka-topics.sh --describe --topic order --zookeeper node01:2181
6) 刪除topic
./kafka-topics.sh --delete --topic order --zookeeper node01:2181
第一步: 添加kafka相關的依賴
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.1</version>
</dependency>
// kakfa的生產者
public class KafkaProducerTest {
public static void main(String[] args) {
//1. 建立 kafka的生產者的對象
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.72.141:9092,192.168.72.142:9092,192.168.72.143:9092");
props.put("acks", "all"); // 消息確認機制 : all 最高級別, 保證數據不會丟失
props.put("retries", 0); // 重試 : 0 表示發送失敗, 不會重試
props.put("batch.size", 16384); // 發送數據時候 一批數據的大小 默認值: 16384字節(16)
props.put("linger.ms", 1); // 每次發送數據間隔時間
props.put("buffer.memory", 33554432); // 緩存池的大小: 默認值: 33554432 32M
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 效率, java的序列化慢
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String,String> producer = new KafkaProducer<String, String>(props);
//2. 發送消息
ProducerRecord<String, String> record = new ProducerRecord<String, String>("order", "我是JavaAPI發過來...");
producer.send(record);
//3. 關閉資源
producer.close();
}
}
package com.itheima.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
// kafka的消費者
public class KafkaConsumerTest {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.72.141:9092,192.168.72.142:9092,192.168.72.143:9092");
props.put("group.id", "test"); // 組id號
props.put("enable.auto.commit", "true"); // 開啓自動提交
props.put("auto.commit.interval.ms", "1000"); // 每隔多長時間自動提交一次
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//1. 建立 kafka的消費者的對象
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//2. 讓消費者訂閱一個topic
consumer.subscribe(Arrays.asList("order"));
//3. 獲取消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100); // 取出元素,
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
5 Apache Kafka原理
分片機制:主要解決了單臺服務器存儲容量有限的問題
當數據量很是大的時候,一個服務器存放不了,就將數據分紅兩個或者多個部分,存放在多臺服務器上。每一個服務器上的數據,叫作一個分片
副本:副本備份機制解決了數據存儲的高可用問題
當數據只保存一份的時候,有丟失的風險。爲了更好的容錯和容災,將數據拷貝幾份,保存到不一樣的機器上。在放置副本的時候:
假設有三個分片, 三個副本, 共計9個節點
在三臺服務器上 各放置一個分片的副本
第二個副本放置在和這臺服務器同機架上
第三個副本放置在不一樣的機架的服務器上
5.2 Kafka 保證數據不丟失機制
5.2.1保證生產端數據不丟失機制
1)消息生產分爲同步方式和異步方式
2)消息確認分爲三個狀態
a)0:生產者只負責發送數據
b)1:某個分片的leader收到數據給出響應
C)-1:某個分片的副本都收到數據後給出響應
3在同步方式下
生產者等待10秒,若是broke沒有給出ack響應,就認爲失敗
生產者主動將進行重試3次若是尚未響應就報錯
在異步模式下
a)先將數據保存在生產端的buffer中 Buffer大小是2萬條
b)知足數據閥值或者數量(時間)閥值其中的一個條件就能夠發送數據
c 發送一批數據的大小是500條
5,2,2 broker端消息不丟失
broker端的信息不丟失其實就是用partition和副本機制(高可用)來保證
producer ack -1(all)可以保證全部的副本都同步好了數據,其中一臺機器了並不影響數據的完整性
5.2.3 消費端消息不丟失
offSet:偏移量
經過offset commit 來保證數據的不丟失,kafka本身記錄了每次消費的offset數值,下次繼續消費的時候,會接着上次的offset進行消費。
5.3 消息存儲及查詢機制
5.3.1 文件存儲機制
數據都是順序存儲在磁盤中
segment段中有兩個核心的文件一個是log,一個是index。 當log文件等於1G時,新的消息會寫入到下一個segment中。
5.3.2 文件查詢機制
需求: 讀取 offset=368776 的message消息數據
5.4 生產者數據分發策略
1) 若是是用戶制定了parttion ,數據分發策略的時候能夠指定數據發往哪一個parttion
當producerRecord 的構造參數中有partition的時候就能夠發送到對應的parttion上
2) 當用戶指定key, 默認使用hash算法 能夠定義分發策略的算法
3) 當用戶既沒有指定partition 也沒有Key 使用輪詢的方式發送數據
5.5 消費者負載均衡機制
一個parttion能夠被一個組中的某一個成員消費
因此若是消費組中有多於partition數量的消費者,那麼必定會有消費者沒法消費數據。 Kafka 消費者在消費數據的時候 若是數據量過大 如何解決消費者慢的問題
1)在儘量硬件知足的狀況下 多加分片 並將同組中消費者數量和分片數量保持一致
2) 提升消費者處理數據的能力優化代碼