kafka自己就是LinkIn公司開發用於日誌系統的,因此其文件叫作log
php
點對對與發佈訂閱的區別java
1.1 點對點模式,生產者發送一條消息到queue,只有一個消費者能收到。 python
1.2 發佈訂閱模式 nginx
發佈者發送到topic的消息,只有訂閱了topic的訂閱者纔會收到消息kafka背景介紹
kafka是最初由Linkedin公司開發,使用Scala語言編寫,Kafka是一個分佈式、分區的、多副本的、多訂閱者的日誌系統(分佈式MQ系統),能夠用於web/nginx日誌,搜索日誌,監控日誌,訪問日誌等等。
kafka目前支持多種客戶端語言:java,python,c++,php等等。
c++
kafka高吞吐量的設計
數據磁盤持久化:消息不在內存中cache,直接寫入到磁盤,充分利用磁盤的順序讀寫性能。
zero-copy:減小IO操做步驟。
支持數據批量發送和拉取。
支持數據壓縮。
Topic劃分爲多個partition,提升並行處理能力。
web
kafka信息存儲
Kafka中的Message是以topic爲基本單位的,不一樣的topic之間是相互獨立的。每一個topic又能夠分紅幾個不一樣的partition(在建立topic時指定的),每一個partition存儲一部分Message。關係以下圖
apache
partition是以文件的形式存儲在文件系統中,好比,建立了一個名爲t101的topic,其有5個partition,那麼在Kafka的數據目錄中(log.dirs指定的)中就有這樣4個目錄: t101-0,t101-1,t101-2,t101-3 ,其命名規則爲<topic_name>-<partition_id>,裏面存儲的分別就是這4個partition的數據。 建立命令以下:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4 --topic t101
bootstrap
4.1 數據文件分段
好比有100條Message,它們的offset是從0到99。假設將數據文件分紅5段,第一段爲0-19,第二段爲20-39,以此類推,每段放在一個單獨的數據文件裏面,數據文件以該段中最小的offset命名。這樣在查找指定offset的Message的時候,用二分查找就能夠定位到該Message在哪一個段中。bash
4.2 爲數據文件建索引
數據文件分段使得能夠在一個較小的數據文件中查找對應offset的Message了,可是這依然須要順序掃描才能找到對應offset的Message。爲了進一步提升查找的效率,Kafka爲每一個分段後的數據文件創建了索引文件,文件名與數據文件的名字是同樣的,只是文件擴展名爲.index。
索引文件中包含若干個索引條目,每一個條目表示數據文件中一條Message的索引。索引包含兩個部分,分別爲相對offset和position。
服務器
#相對offset:由於數據文件分段之後,每一個數據文件的起始offset不爲0,相對offset表示這條Message相對於其所屬數據文件中最小的offset的大小。舉例,分段後的一個數據文件的offset是從20開始,那麼offset爲25的Message在index文件中的相對offset就是25-20=5。存儲相對offset可減小索引文件佔用的空間 。
#position,表示該條Message在數據文件中的絕對位置。只要打開文件並移動文件指針到這個position就能夠讀取對應的Message了。
index文件中並無爲數據文件中的每條Message創建索引,而是採用了稀疏存儲的方式,每隔必定字節的數據創建一條索引。這樣避免了索引文件佔用過多的空間,從而能夠將索引文件保留在內存中。缺點是沒有創建索引的Message也不能一次定位到其在數據文件的位置,從而須要作一次順序掃描,可是此次順序掃描的範圍就很小了。
4.3 查找message原理圖
1)首先是用二分查找肯定它是在哪一個LogSegment中,天然是在第一個Segment中。 2)打開這個Segment的index文件,也是用二分查找找到offset小於或者等於指定offset的索引條目中最大的那個offset。天然offset爲6的那個索引是咱們要找的,經過索引文件咱們知道offset爲6的Message在數據文件中的位置爲9807。
3)打開數據文件,從位置爲9807的那個地方開始順序掃描直到找到offset爲7的那條Message。
Kafka的Message存儲採用了分區(partition),分段(LogSegment)和稀疏索引這幾個手段來達到了高效性
5.5 zookeeper記錄分區與消費者的關係
對於每一個消費者組 (Consumer Group),Kafka都會爲其分配一個全局惟一的Group ID,Group內部的全部消費者共享該ID。
同時,Kafka爲每一個消費者分配一個Consumer ID,一般採用"Hostname:UUID"形式表示。
在Kafka中,規定了每一個消息分區 只能被同組的一個消費者進行消費,所以,須要在 Zookeeper 上記錄 消息分區 與 Consumer 之間的關係,每一個消費者一旦肯定了對一個消息分區的消費權力,須要將其Consumer ID 寫入到 Zookeeper 對應消息分區的臨時節點上,例如:
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
5.6 消息消費進度Offset記錄
在消費者對指定消息分區進行消息消費的過程當中,須要定時地將分區消息的消費進度Offset記錄到Zookeeper上,以便在該消費者進行重啓或者其餘消費者從新接管該消息分區的消息消費後,可以從以前的進度開始繼續進行消息消費。Offset在Zookeeper中由一個專門節點進行記錄,其節點路徑爲:
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
節點內容就是Offset的值
新版本消費位移存儲
老版本的消費位移信息是存儲的zookeeper 中的, 可是zookeeper 並不適合頻繁的寫入查詢操做, 因此在新版本的中消費位移信息存放在了__consumer_offsets內置topic中。
能夠利用以下命令建立consumers group信息,建立group consumer_offsets_t105
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic t105 --from-beginning --group consumer_offsets_t105
查詢consumer_offsets_t105 在 __consumer_offsets topic 中存放的位移信息__consumer_offsets 默認分區50。經過以下公式便可獲取:Math.abs("consumer_offsets_t105".hashCode()) % 50。
能夠計算得位移偏移量是存在partitionId等於44分區上。kafka java調用
7.1 生產者
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ProducerDemo {
public static void main(String[] args){
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = null;
try {
producer = new KafkaProducer<String, String>(properties);
for (int i = 0; i < 100; i++) {
String msg = "Message " + i;
producer.send(new ProducerRecord<String, String>("t105", msg));
System.out.println("Sent:" + msg);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}
複製代碼
7.2 消費者
複製代碼
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class ConsumerDemo {
public static void main(String[] args){
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "group4");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("auto.offset.reset", "none");
properties.put("session.timeout.ms", "30000");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Arrays.asList("t105"));
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll( Duration.ofMillis(100)
);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("partition = "+ record.partition() +" offset = %d, value = %s", record.offset(), record.value());
System.out.println();
}
}
}
}
參數auto.offset.reset有三個值latest, earliest, none。<br>
earliest: automatically reset the offset to the earliest offset<br>
latest: automatically reset the offset to the latest offset<br>
none: hrow exception to the consumer if no previous offset is found for the consumer's group
複製代碼
kafka的流實例參考
juejin.im/post/5cd50a…
建立一個topic bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic t106
describe topics bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic t106
往集羣中發消息 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic t106
集羣消費消息 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic t106
驗證消息是否生產成功 bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic t105 --time -1
—————————
建立消費組消費消息 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic t105 --from-beginning --group consumer_offsets_t105
查詢偏移量消息 bin/kafka-console-consumer.sh --topic __consumer_offsets --partition 44 --bootstrap-server localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter" -- from-beginning
//經過config文件訪問客戶端 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic t105 --group consumer_offsets_t105 --consumer.config config/consumer.properties