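From a usage point of view, 0.9 is the dividing line: starting with 0.9 the client no longer distinguishes between a high-level consumer API (comparable to GTID in the MySQL binlog: you only deal with the topic, and the server manages offsets and load balancing automatically) and a low-level consumer API (comparable to file + position in the MySQL binlog: you work directly with partitions and offsets).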
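1. Producers do not need to access ZooKeeper. (The 0.8.x Kafka consumer connected directly to ZooKeeper to obtain offset information, while later versions fetch it from the cluster itself, so the APIs of the two generations are not compatible. The figure above shows the 0.8.x structure and is slightly inaccurate for 0.9.x and later.)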
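3. As in RocketMQ, each topic is split into partitions so that performance scales linearly. (The reasoning is the same as sharding databases and tables: it is transparent to the business and is a technical strategy, not a business one.) Within a consumer group, each partition is consumed by exactly one member, so if the messages in a topic do not require ordered consumption, partitioning is the key mechanism for raising throughput under heavy traffic. The number of partitions of a topic (default 1) can be changed with ./kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 5 --topic userService; for guidance on a reasonable value, see https://blog.csdn.net/kwengelie/article/details/51150114.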
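4. Kafka 0.8.x uses ZooKeeper to store the offset of each consumer group for every partition of every topic. Starting with 0.9, offsets are stored in a dedicated topic named "__consumer_offsets", which uses log compaction: only the latest value for each key is kept, not the full history.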
5. Each topic has a local log on the broker, and the broker keeps appending to it sequentially.
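ISRs are described below. Different partitions of one topic can have their leaders on different brokers, which improves performance because the leader handles all reads and writes. The position up to which records are committed is also called the "High Watermark". Although users do not deal with partitions directly, they are where HA and scalability are actually realized.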
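The high watermark can be pictured as the smallest log end offset among the in-sync replicas: only records below it exist on every ISR member. The following is a minimal sketch of that idea; the class and method names are hypothetical and are not Kafka's internal API.

```java
import java.util.Map;
import java.util.Set;

// Illustrative model only; names are hypothetical, not Kafka internals.
class HighWatermarkSketch {
    /**
     * @param logEndOffsets next offset to be written on each replica, keyed by broker id
     * @param isr           broker ids currently in the in-sync replica set
     * @return the smallest log end offset among ISR members (0 if the ISR is empty)
     */
    static long highWatermark(Map<Integer, Long> logEndOffsets, Set<Integer> isr) {
        return isr.stream()
                .mapToLong(logEndOffsets::get)
                .min()
                .orElse(0L);
    }

    public static void main(String[] args) {
        Map<Integer, Long> leo = Map.of(0, 120L, 1, 118L, 2, 90L);
        // Broker 2 is not in the ISR, so it does not hold the watermark back.
        System.out.println(highWatermark(leo, Set.of(0, 1))); // 118
    }
}
```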
The Kafka controller architecture is as follows:
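The low-level API, by contrast, lets you control each step of the interaction, such as where to start reading and how offsets are maintained on the client. RocketMQ's implementation actually uses an API that combines the high-level and low-level styles, which is also what the merged API in Kafka 0.9 and later provides.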
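Each consumer group records its own consumption progress in every partition (this information is kept in a dedicated topic log, see above). A partition can be consumed by only one member of each consumer group. Because microservices are usually deployed as a cluster, a topic with a single partition leaves the topic listeners on N-1 service nodes idle, which is worth keeping in mind. If the service hosting the current consumer goes down, Kafka automatically picks one of the remaining consumers. However, if a message has been consumed but the ack never reached Kafka, the consumer that takes over will consume it again, so processing must be idempotent. For a topic to be consumed in parallel by the members of a consumer group, configure at least as many partitions as there are members. In short, management happens at the granularity of consumer group (called subscriber in other MQs) and topic, while messages are actually delivered at the granularity of partition and consumer.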
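To make the idle-listener effect concrete, here is a small sketch that hands partitions out to group members in round-robin fashion. It is a simplification under stated assumptions: Kafka's real assignors are more involved, and the class name and the node names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified round-robin assignment; not Kafka's actual assignor.
class GroupAssignmentSketch {
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String c : consumers) {
            assignment.put(c, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment;
        }
        for (int p = 0; p < partitionCount; p++) {
            // Each partition goes to exactly one member of the group.
            String owner = consumers.get(p % consumers.size());
            assignment.get(owner).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> nodes = List.of("node-a", "node-b", "node-c");
        // One partition: two of the three service instances receive nothing.
        System.out.println(assign(nodes, 1));
        // Three partitions: every instance has work.
        System.out.println(assign(nodes, 3));
    }
}
```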
An ISR is an in-sync replica. If a leader fails, an ISR is picked to be a new leader.
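A Kafka topic log grows continuously, so it should be reclaimed periodically to keep the system stable. Which strategy applies depends on whether message keys repeat. Log compaction is mainly for the case where keys do repeat, that is, the message key is not a UUID; otherwise it is pointless. The mechanism removes historical values that share a key, based on how long messages have been retained or on file size, as shown below: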
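As can be seen, the historical versions have been removed. Once compaction is enabled, the topic log is divided into a head and a tail; only the tail is compacted, while deletion is still governed by other settings, described below.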
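The effect of compaction on a run of records can be sketched with a map that keeps only the newest record per key while preserving the original offsets. This is a toy model, not the broker's log cleaner; the Entry type and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of compaction: keep only the newest record for each key.
class LogCompactionSketch {
    static final class Entry {
        final long offset;
        final String key;
        final String value;

        Entry(long offset, String key, String value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    static List<Entry> compact(List<Entry> log) {
        Map<String, Entry> latest = new LinkedHashMap<>();
        for (Entry e : log) {
            // Remove first so the surviving record sits at its own (newest) position.
            latest.remove(e.key);
            latest.put(e.key, e);
        }
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        List<Entry> log = List.of(
                new Entry(0, "k1", "a"), new Entry(1, "k2", "b"),
                new Entry(2, "k1", "c"), new Entry(3, "k3", "d"),
                new Entry(4, "k2", "e"));
        for (Entry e : compact(log)) {
            System.out.println(e.offset + " " + e.key + "=" + e.value);
        }
    }
}
```

Offsets are left untouched on purpose: compaction drops older records but does not renumber the ones that remain.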
Each topic can declare how long messages are retained, based on time or on the size of the topic log, through the following parameters:
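Property | Meaning | Default |
log.cleanup.policy | Log cleanup policy; only delete and compact are available | delete |
log.retention.hours | How long logs are kept; can be given in hours, minutes or ms | 168 (7 days) |
log.retention.bytes | Maximum log size allowed before deletion (deletion runs as soon as either the time or the size limit is reached) | -1 |
log.segment.delete.delay.ms | How long a log file is kept before it is actually deleted | 60000 |
log.cleanup.interval.mins | How often the cleanup step is invoked | 10 |
log.retention.check.interval.ms | How often to check whether any log is eligible for deletion (used by newer versions) | 300000 |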
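How the time limit and the size limit interact can be sketched as a single cleanup pass over closed segments, oldest first. This is a simplified model under stated assumptions, not the broker's implementation; the Segment type, the method name and the use of a last-modified timestamp are illustrative choices.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the "delete" cleanup policy; not Kafka's actual code.
class RetentionSketch {
    static final class Segment {
        final String name;
        final long lastModifiedMs;
        final long sizeBytes;

        Segment(String name, long lastModifiedMs, long sizeBytes) {
            this.name = name;
            this.lastModifiedMs = lastModifiedMs;
            this.sizeBytes = sizeBytes;
        }
    }

    /** Returns the segments that one cleanup pass would delete, oldest first. */
    static List<Segment> segmentsToDelete(List<Segment> oldestFirst, long nowMs,
                                          long retentionMs, long retentionBytes) {
        long totalBytes = 0;
        for (Segment s : oldestFirst) {
            totalBytes += s.sizeBytes;
        }
        List<Segment> doomed = new ArrayList<>();
        for (Segment s : oldestFirst) {
            boolean tooOld = nowMs - s.lastModifiedMs > retentionMs;
            // A negative retentionBytes (the default -1) means "no size limit".
            boolean tooBig = retentionBytes >= 0 && totalBytes > retentionBytes;
            if (!tooOld && !tooBig) {
                break; // everything newer is kept
            }
            doomed.add(s);
            totalBytes -= s.sizeBytes;
        }
        return doomed;
    }
}
```

Either condition alone is enough to delete a segment, which matches the note on log.retention.bytes in the table.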
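Producers can set their data consistency requirement through acks, much like the MySQL mechanism. (In interviews these days we always ask how to guarantee that a sent message is not lost.) acks=0: no acknowledgement is required, giving at-most-once delivery; acks=1: success on the leader is enough; acks=all: the leader and all in-sync followers must have written the record (the default since Kafka 3.0; earlier versions default to acks=1).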
It is set in the producer properties through the acks key.
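A minimal sketch of such producer properties follows. The keys are the standard producer configuration names; the helper class, the method name and the localhost:9092 address are placeholders chosen for illustration.

```java
import java.util.Properties;

// Builds producer settings only; creating the producer needs kafka-clients on the classpath.
class ProducerAcksConfig {
    static Properties producerProps(String bootstrapServers, String acks) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // "0" = no acknowledgement, "1" = leader only, "all" = leader plus in-sync followers
        props.put("acks", acks);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // localhost:9092 is a placeholder broker address.
        Properties props = producerProps("localhost:9092", "all");
        System.out.println(props.getProperty("acks"));
        // With the client library available: new KafkaProducer<String, String>(props)
    }
}
```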
Early versions of the producer did not support "exactly once". Kafka 0.11.0 added exactly-once delivery by introducing idempotent producers plus atomic transactions; see https://dzone.com/articles/exactly-once-semantics-with-apache-kafka-1.
To implement "at-most-once", the consumer reads a message, then saves its offset in the partition by sending it to the broker, and finally processes the message. The issue with "at-most-once" is that a consumer could die after saving its position but before processing the message. The consumer that takes over or gets restarted would then resume from the last saved position, and the message in question is never processed.
To implement "at-least-once", the consumer reads a message, processes it, and finally saves the offset to the broker. The issue with "at-least-once" is that a consumer could crash after processing a message but before saving the last offset position. If the consumer is then restarted or another consumer takes over, it could receive a message that was already processed. "At-least-once" is the most common setup for messaging, and it is your responsibility to make message handling idempotent, which means getting the same message twice will not cause a problem (such as two debits).
To implement "exactly once" on the consumer side, the consumer would need a two-phase commit between the storage for the consumer position and the storage for the consumer's message processing output. Alternatively, the consumer could store the processing output in the same location as the last offset.
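The second option, keeping the output and the last offset in the same place, can be sketched with an in-memory store that updates both in one step and ignores redelivered records. This is only a stand-in for a real transactional store; the class name and the balance example are hypothetical.

```java
// Illustrative only: an in-memory stand-in for a store that commits result and offset together.
class OffsetWithOutputStore {
    private long lastOffset = -1; // offset of the last record whose effect is stored
    private long balance = 0;     // the "processing output" in this example

    /** Applies an amount once; returns false if the record was already applied. */
    synchronized boolean apply(long offset, long amount) {
        if (offset <= lastOffset) {
            return false; // redelivery after a crash or rebalance: ignore
        }
        // Both fields change inside one critical section, standing in for one transaction.
        balance += amount;
        lastOffset = offset;
        return true;
    }

    synchronized long balance() {
        return balance;
    }

    synchronized long lastOffset() {
        return lastOffset;
    }

    public static void main(String[] args) {
        OffsetWithOutputStore store = new OffsetWithOutputStore();
        store.apply(0, -50);
        store.apply(1, -20);
        store.apply(1, -20); // the same record delivered a second time
        System.out.println(store.balance()); // -70
    }
}
```

On restart, a consumer built this way would read lastOffset from the store and resume from the next offset, rather than relying on the offset committed to the broker.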
props.put("enable.auto.commit", "false");
try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records)
            System.out.println(record.offset() + ": " + record.value());
        try {
            consumer.commitSync();
        } catch (CommitFailedException e) {
            // application specific failure handling
        }
    }
} finally {
    consumer.close();
}
KafkaOffsetMonitor can be started with java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk --port 8088 --refresh 10.seconds --retain 2.days; the meaning of each option is documented on GitHub.
How can all topics be retrieved through the Java API?
[root@hs-test-10-20-30-11 kafka]# bin/kafka-topics.sh --zookeeper --list
[root@hs-test-10-20-30-11 kafka]# bin/kafka-topics.sh --zookeeper --topic global --describe
Topic:global PartitionCount:1 ReplicationFactor:1 Configs:
Topic: global Partition: 0 Leader: 0 Replicas: 0 Isr: 0
WARN [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[root@hs-test-10-20-30-11 kafka]# bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server --list
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
[root@hs-test-10-20-30-11 kafka]# bin/kafka-consumer-groups.sh --zookeeper --list
Note: This will only show information about consumers that use ZooKeeper (not those using the Java consumer API).
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version></version>
</dependency>
Add the Kafka appender to the logger.
<Root level="INFO" additivity="false">
    <AppenderRef ref="Console"/>
    <AppenderRef ref="KAFKA"/>
    <AppenderRef ref="app_error" />
</Root>
Define the Kafka appender.
<Appenders>
    <!-- Send error logs to Kafka -->
    <Kafka name="KAFKA" topic="bomp">
        <ThresholdFilter level="error" onMatch="DENY" onMismatch="NEUTRAL"/>
        <ThresholdFilter level="trace" onMatch="ACCEPT" onMismatch="DENY"/>
        <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}:%4p %t (%F:%L) - %m%n" />
        <Property name="bootstrap.servers"></Property>
    </Kafka>
</Appenders>
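When started with ./kafka-server-start.sh ../config/server.properties, the process runs in the foreground, so closing the console stops Kafka. Add the -daemon option to start it in the background instead, as follows: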
./kafka-server-start.sh -daemon ../config/server.properties
That method in turn calls the poll method of ConsumerCoordinator:
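After studying Kafka's design, reading the relevant material and running my own tests, I found that sequential disk reads and writes (the same strategy used by Cassandra, LevelDB and RocksDB) are roughly on par with memory in speed.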
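Doing this work in heap memory has two drawbacks: Java objects carry a large memory overhead, and as the amount of data on the heap grows, Java GC pauses become very long. Relying on the OS page cache avoids much of that GC cost. (Could JNI achieve a similar effect? A large part of the ByteBuffer and Unsafe usage in Netty, at least, works that way.)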
On the JVM side, Kafka uses the CMS collector by default; consider switching to the G1 garbage collector with: -server -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true
Kafka uses tombstones instead of deleting records right away
❖ Kafka producers support record batching, by the size of records, with batches auto-flushed based on time
❖ Batching is good for network IO throughput.
❖ Batching speeds up throughput drastically
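The size-or-time rule behind batching can be sketched with a toy accumulator. In the real producer the corresponding settings are batch.size and linger.ms, and batches are kept per partition and sent by a background thread; the class below is a hypothetical single-threaded simplification that takes the current time as a parameter.

```java
import java.util.ArrayList;
import java.util.List;

// Toy accumulator illustrating "flush when full or when the batch has waited long enough".
class BatchingSketch {
    private final int batchSizeBytes;
    private final long lingerMs;
    private final List<String> current = new ArrayList<>();
    private final List<List<String>> sent = new ArrayList<>();
    private int currentBytes = 0;
    private long firstAppendMs = 0;

    BatchingSketch(int batchSizeBytes, long lingerMs) {
        this.batchSizeBytes = batchSizeBytes;
        this.lingerMs = lingerMs;
    }

    /** Adds a record at time nowMs and flushes if the batch is full. */
    void append(String record, long nowMs) {
        if (current.isEmpty()) {
            firstAppendMs = nowMs;
        }
        current.add(record);
        currentBytes += record.length(); // character count stands in for byte size
        if (currentBytes >= batchSizeBytes) {
            flush();
        }
    }

    /** Called periodically; flushes a non-empty batch that has waited lingerMs. */
    void tick(long nowMs) {
        if (!current.isEmpty() && nowMs - firstAppendMs >= lingerMs) {
            flush();
        }
    }

    private void flush() {
        sent.add(new ArrayList<>(current)); // one "network request" per batch
        current.clear();
        currentBytes = 0;
    }

    List<List<String>> sentBatches() {
        return sent;
    }
}
```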
With Kafka, consumers pull data from brokers
When a follower's lag exceeds replica.lag.time.max.ms, the leader removes that follower from the ISR.
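The lag check can be sketched as a filter over the current ISR. This is a simplified model: the class, the method and the "last caught up" timestamps are hypothetical names for illustration, not the broker's internal data structures.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Simplified model of ISR shrinking based on replica.lag.time.max.ms.
class IsrShrinkSketch {
    /**
     * @param lastCaughtUpMs time each follower was last fully caught up with the leader, by broker id
     * @return the ISR members that are still within the allowed lag
     */
    static Set<Integer> shrinkIsr(Set<Integer> isr, int leaderId, Map<Integer, Long> lastCaughtUpMs,
                                  long nowMs, long replicaLagTimeMaxMs) {
        Set<Integer> kept = new TreeSet<>();
        for (int replica : isr) {
            // The leader is never behind itself.
            long lag = replica == leaderId ? 0 : nowMs - lastCaughtUpMs.getOrDefault(replica, 0L);
            if (lag <= replicaLagTimeMaxMs) {
                kept.add(replica);
            }
        }
        return kept;
    }
}
```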
If all replicas are down for a partition, Kafka chooses the first replica (not
necessarily in the ISR set) that comes alive as the leader
❖ Config unclean.leader.election.enable=true was the default before 0.11.0; since then the default is false
❖ If unclean.leader.election.enable=false, if all replicas are down for a
partition, Kafka waits for the ISR member that comes alive as new leader
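The difference between the two settings can be sketched as a leader choice function. It is a simplified model under stated assumptions; the class and parameter names are hypothetical and the controller's real election logic has more cases.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Simplified model of clean versus unclean leader election.
class LeaderElectionSketch {
    /**
     * @param replicas assigned replicas in preference order
     * @param isr      last known in-sync replica set
     * @param alive    brokers that are currently online
     */
    static Optional<Integer> electLeader(List<Integer> replicas, Set<Integer> isr,
                                         Set<Integer> alive, boolean uncleanElectionEnabled) {
        // Preferred: a live in-sync replica, so no committed record is lost.
        for (int r : replicas) {
            if (alive.contains(r) && isr.contains(r)) {
                return Optional.of(r);
            }
        }
        if (uncleanElectionEnabled) {
            // Availability over consistency: any live replica may take over.
            for (int r : replicas) {
                if (alive.contains(r)) {
                    return Optional.of(r);
                }
            }
        }
        return Optional.empty(); // partition stays offline until an ISR member returns
    }
}
```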
Outside of using a single ensemble (the coordination service, i.e. ZooKeeper) for multiple Kafka clusters, it is not recommended
to share the ensemble with other applications, if it can be avoided. Kafka is sensitive
to Zookeeper latency and timeouts, and an interruption in communications with the
ensemble will cause the brokers to behave unpredictably. This can easily cause multiple
brokers to go offline at the same time, should they lose Zookeeper connections,
which will result in offline partitions. It also puts stress on the cluster controller,
which can show up as subtle errors long after the interruption has passed, such as
when trying to perform a controlled shutdown of a broker. Other applications that
can put stress on the Zookeeper ensemble, either through heavy usage or improper
operations, should be segregated to their own ensemble.
