每個分區都是一個順序的、不可變的消息隊列, 而且能夠持續的添加。分區中的消息都被分了一個序列號,稱之爲偏移量(offset),在每一個分區中此偏移量都是惟一的。html
- 建立topic bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic1 - 生產者 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic 1.建立了3個p的topic bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test-topic3 2.Kafka中能夠將Topic從物理上劃分紅一個或多個分區(Partition),每一個分區在物理上對應一個文件夾,以」topicName_partitionIndex」的命名方式命名,該文件夾下存儲這個分區的全部消息(.log)和索引文件(.index),這使得Kafka的吞吐率能夠水平擴展。 n1: p0 test-topic3-0 n2: p1 test-topic3-1 n3: p2 test-topic3-2
[root@n1 logstash]# ls -ld /data/kafka-logs/test-topic3-0 [root@n2 logstash]# ls -ld /data/kafka-logs/test-topic3-1 [root@n3 logstash]# ls -ld /data/kafka-logs/test-topic3-2
消費者能夠從
1.zk拉數據json
## src_base_zk - 查看基於zk的消費組 bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test-topic3 - 查看group詳情(判斷cusumer是否正常) bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --group logstash-group --describe
2,kafka拉數據bootstrap
## src_base_kafka - 查看zk bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list - 查看group詳情(判斷cusumer是否正常) bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group logstash-group --describe - 查看實時消費日誌 bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-topic3
- 建立topic bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic1 - 生產者 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic 監控消費日誌 [root@n1 kafka]# bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-topic hi mao
logstash消費ruby
input { kafka { bootstrap_servers => "localhost:9092" topics => "test-topic" group_id => "logstash-group" codec => "json" consumer_threads => 1 decorate_events => true } } output { stdout { codec => rubydebug } }
/usr/local/logstash/bin/logstash -f logstash.yaml --config.reload.automatic併發
先建立3個p的test-topic3
1.當有3個p, 1個消費者時
app
2.當有3個p,2個消費者時debug
3.當有3個p,3個消費者時
都是動態調配的(新增一個消費者, p的分配會自動變)3d
4.當有1個p,2個消費者
日誌
小結: 同一個消費組, 消費者個數<=p個數code
- 建立topic bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test-topic2 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test-topic3 - 查看topic list bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list - 查看topic 詳細 bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic test-topic3 --describe - 生產者 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic3 基於zk的消費者 - 查看基於zk的消費組 bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test-topic3 - 開啓一個消費者(隨機生成group) bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test-topic3 - 查看group詳情(判斷cusumer是否正常) bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --group logstash-group --describe 基於kafka的消費者 - 查看基於kafka的消費組 bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list - 查看group詳情(判斷cusumer是否正常) bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group logstash-group --describe - 開啓一個消費者(隨機生成group) bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-topic3 --group logstash-group - 開啓一個消費者(指定group,可能偷走已有的消費者的數據) bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-topic3 --group logstash-group
參考: http://lxw1234.com/archives/2015/10/538.htm https://www.cnblogs.com/AcAc-t/p/kafka_topic_consumer_group_command.html https://www.cnblogs.com/happyday56/p/4208663.html