tar -zxvf kafka_2.11-0.10.1.1.tgz cd kafka_2.11-0.10.1.1/
首先啓動zookeeper:這裏使用zookeeper默認配置。html
nohup bin/zookeeper-server-start.sh config/zookeeper.properties >/dev/null 2>&1 &
啓動kafka服務:這裏使用單節點多代理配置。java
修改server.propertiesapache
broker.id=0 #節點ID listeners=PLAINTEXT://192.168.1.112:9092 #節點配置 log.dirs=/opt/kafka_2.11-0.10.1.1/log #數據日誌保存目錄 num.partitions=4 #默認分區數 zookeeper.connect=192.168.1.112:2181 #zookeeper服務
其餘配置項保持默認。bootstrap
複製配置文件api
cp config/server.properties config/server-1.properties cp config/server.properties config/server-2.properties
修改配置節點id 、端口、日誌路徑緩存
server-1.properties安全
broker.id=1 #節點ID listeners=PLAINTEXT://192.168.1.112:9093 #節點配置 log.dirs=/opt/kafka_2.11-0.10.1.1/log-1 #數據日誌保存目錄 num.partitions=4 #默認分區數 zookeeper.connect=192.168.1.112:2181 #zookeeper服務
server-2.properties服務器
broker.id=2 #節點ID listeners=PLAINTEXT://192.168.1.112:9094 #節點配置 log.dirs=/opt/kafka_2.11-0.10.1.1/log-2 #數據日誌保存目錄 num.partitions=4 #默認分區數 zookeeper.connect=192.168.1.112:2181 #zookeeper服務
啓動kafka節點:broker0,broker1,broker2session
nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 & nohup bin/kafka-server-start.sh config/server-1.properties >/dev/null 2>&1 & nohup bin/kafka-server-start.sh config/server-2.properties >/dev/null 2>&1 &
建立topic:設置備份2,分區3oracle
./bin/kafka-topics.sh --create --zookeeper 192.168.1.112:2181 --replication-factor 2 --partitions 3 --topic my-topic
運行命令 「describe topics」查看topic信息
bin/kafka-topics.sh --describe --zookeeper 192.168.1.112:2181 --topic my-topic
輸出:
Topic:my-topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: my-topic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: my-topic Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0 Topic: my-topic Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
第一行是全部分區的摘要,其次,每一行提供一個分區信息。這裏三行顯示每一個分區的信息。
「Partition」:分區信息
「Leader」:該節點負責該分區的全部的讀和寫,每一個節點的leader
都是隨機選擇的。
「Replicas」:備份節點列表,不管該節點是不是leader或者目前是否還活着,只是顯示。
「Isr」:「同步備份」的節點列表,也就是活着的節點而且正在同步leader。
在topic中發佈消息:
bin/kafka-console-producer.sh --broker-list 192.168.1.112:9092 --topic my-topic this is message 1 this is message 2
在topic中獲取消息
bin/kafka-console-consumer.sh --zookeeper 192.168.1.112:2181 --topic my-topic --from-beginning this is message 1 this is message 2
測試kill掉其中一個節點,分區是否正常工做,這裏咱們停掉broker0
ps | grep server.properties kill -9 3789
而後查看分區信息
bin/kafka-topics.sh --describe --zookeeper 192.168.1.112:2181 --topic my-topic Topic:my-topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: my-topic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: my-topic Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2 Topic: my-topic Partition: 2 Leader: 1 Replicas: 0,1 Isr: 1
跟前面執行"describe topic"命令相比,分區2的leader變爲broker1,「同步備份」的 列表中也沒有broker0。執行從最初獲取消息的命令也會發現,消息並無丟失。
Apache Kafka引入一個新的java客戶端(在org.apache.kafka.clients 包中),替代老的Scala客戶端,可是爲了兼容,將會共存一段時間。爲了減小依賴,這些客戶端都有一個獨立的jar,而舊的Scala客戶端繼續與服務端保留在同個包下。
在項目中引入kafka服務端jar包,包含scala客戶端。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.10.1.1</version> </dependency>
咱們鼓勵全部新開發的程序使用新的JAVA客戶端,新的java客戶端比之前的Scala的客戶端更快、功能更全面。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.1.1</version> </dependency>
生產者是線程安全的,在多個線程之間共享單個生產者實例。
public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.1.112:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 10); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer producer = new KafkaProducer(props); for (int i = 0; i < 10; i++) { Future<RecordMetadata> future = producer .send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); } producer.close(); }
生產者緩衝空間池保留還沒有發送到服務器的消息,後臺I/O線程負責將這些消息轉換成請求發送到集羣。
默認緩衝可當即發送,即使緩衝空間尚未滿,可是,若是你想減小請求的數量,能夠設置linger.ms
大於0。這將指示生產者發送請求以前等待一段時間,但願更多的消息填補到未滿的批中。
send():方法是異步的,添加消息到緩衝區等待發送,並當即返回。生產者將單個的消息批量在一塊兒發送來提升效率。
producer提供兩種send方法實現,第一種不帶回調方法
public Future<RecordMetadata> send(ProducerRecord<K, V> record)
異步發送一條消息到topic,而且消息一旦被保存到「等待發送的消息緩存」中,此方法就當即返回。這樣並行發送多條消息而不阻塞去等待每一條消息的響應。發送消息返回的結果是RecordMetadata,它指定了消息發送的分區,分配的offset和消息的時間戳。
因爲send調用是異步的,它將爲發送此消息的響應RecordMetadata返回一個Future。調用future的get()方法則將阻塞,直到相關請求完成並返回該消息的metadata,或拋出發送異常。
阻塞方式調用:
Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); RecordMetadata recordMetadata = future.get(); System.out.println("partition="+recordMetadata.partition()+",offset="+recordMetadata.offset()+"value="+i);
另外一種send方法實現是帶回調方法:
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
異步發送消息,當發送成功或異常時會調用回調方法,返回發送結果RecordMetadata信息或異常。這樣能夠不阻塞的方式確保每條消息發送成功。
非阻塞方式調用:
Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)),new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if(exception == null){ //發送成功 System.out.println("partition=" + metadata.partition() + ",offset="+ metadata.offset()); }else{ //發送異常 exception.printStackTrace(); } } });
發送到同一個分區的消息回調保證按必定的順序執行。
注意:callback通常在生產者的I/O線程中執行,因此是至關的快的,不然將延遲其餘的線程的消息發送。若是你須要執行阻塞或計算昂貴(消耗)的回調,建議在callback主體中使用本身的Executor來並行處理。
隨着0.9.0版本,咱們已經增長了一個新的Java消費者替換咱們現有的基於zookeeper的高級和低級消費者。這個客戶端仍是測試版的質量。爲了確保用戶平滑升級,咱們仍然維護舊的0.8版本的消費者客戶端繼續在0.9集羣上工做,兩個老的0.8 API的消費者( 高級消費者 和 低級消費者)。
這個新的消費API,清除了0.8版本的高版本和低版本消費者之間的區別。
kafka客戶端消費者類
public class KafkaConsumer<K, V> implements Consumer<K, V>
偏移量和消費者位置
kafka爲分區中的每條消息保存一個偏移量(offset),這個偏移量是該分區中一條消息的惟一標示符。也表示消費者在分區的位置。
消費者的位置指向下一條記錄的偏移量。它比消費者在該分區中讀取過的最大偏移量要大一個。 它在每次消費者在調用poll(long)中拉取消息時自動增加。
「已提交」的位置是已保存的最後偏移量,若是進程失敗或從新啓動時,消費者將恢復到這個偏移量。消費者能夠選擇按期自動提交偏移量,也能夠選擇經過調用commit API來手動的控制(如:commitSync 和 commitAsync)。
消費者組和訂閱主題
消費者組:具備相同group.id的消費者進程,能夠在一臺機器上運行也能夠分佈在多臺機器上,拉取kafka消息並處理消息。
訂閱主題:分組中的每一個消費者都經過subscribe API
動態的訂閱一個topic列表。kafka將已訂閱topic的消息發送到每一個消費者組中。並經過平衡分區在消費者分組中全部成員之間來達到平均。
消費者組的成員是動態維護的:若是一個消費者故障。分配給它的分區將從新分配給同一個分組中其餘的消費者。一樣的,若是一個新的消費者加入到分組,將從現有消費者中移一個分區給它。這被稱爲從新平衡分組。
此外,當分組從新分配自動發生時,能夠經過ConsumerRebalanceListener通知消費者,這容許他們完成必要的應用程序級邏輯,例如狀態清除,手動偏移提交等。容許消費者經過使用assign(Collection)
手動分配指定分區,若是使用手動指定分配分區,那麼動態分區分配和協調消費者組將失效。
消費者故障
訂閱一組topic後,當調用poll(long)時,消費者將自動加入到組中。只要持續的調用poll,消費者將一直保持可用,並繼續從分配的分區中接收消息。此外,消費者向服務器定時發送心跳。 若是消費者崩潰或沒法在session.timeout.ms配置的時間內發送心跳,則消費者將被視爲死亡,而且其分區將被從新分配。
還有一種可能,消費可能遇到「活鎖」的狀況,它持續的發送心跳,可是沒有處理。爲了預防消費者在這種狀況下一直持有分區,咱們使用max.poll.interval.ms活躍檢測機制。 在此基礎上,若是你調用的poll的頻率大於最大間隔,則客戶端將主動地離開組,以便其餘消費者接管該分區。 發生這種狀況時,你會看到offset提交失敗(調用commitSync()引起的CommitFailedException)。這是一種安全機制,保障只有活動成員可以提交offset。因此要留在組中,你必須持續調用poll。
兩個配置設置來控制poll循環:
max.poll.interval.ms
:增大poll的間隔,能夠爲消費者提供更多的時間去處理返回的消息(調用poll(long)返回的消息,一般返回的消息都是一批)。缺點是此值越大將會延遲組從新平衡。
max.poll.records
:此設置限制每次調用poll返回的消息數,這樣能夠更容易的預測每次poll間隔要處理的最大值。經過調整此值,能夠減小poll間隔,減小從新平衡分組的。
對於消息處理時間不可預測地的狀況,這些選項是不夠的。 處理這種狀況的推薦方法是將消息處理移到另外一個線程中,讓消費者繼續調用poll。
另外,你必須禁用自動提交,並只有在線程完成處理後才爲記錄手動提交偏移量。 還要注意,你須要pause暫停
分區,不會從poll接收到新消息,讓線程處理完以前返回的消息(若是你的處理能力比拉取消息的慢,那建立新線程將致使你機器內存溢出)。
自動提交偏移量實例:
public static void main(String[] args) throws Exception { Properties props = new Properties(); props.setProperty("bootstrap.servers", "192.168.1.112:9092"); props.setProperty("group.id", "test"); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("my-topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100);//拉取消息超時時間100ms for (ConsumerRecord<String, String> consumerRecord : records) { System.out.printf("offset=%d,key=%s,value=%s%n", consumerRecord.offset(), consumerRecord.key(), consumerRecord.value()); } } }
enable.auto.commit:true 自動提交偏移量
auto.commit.interval.ms:1000 自動提交偏移量間隔時間
session.timeout.ms:30000 broker自動檢測客戶端進程心跳的最大超時時間。若是中止心跳的時間超過該值,則認爲該客戶端出現故障,它的分區將被從新分配。
集羣是經過配置bootstrap.servers指定一個或多個broker。不用指定所有的broker,它將自動發現集羣中的其他的borker(最好指定多個,萬一有服務器故障)。
手動控制偏移量
不須要定時的提交offset,能夠本身控制offset,當消息被認爲已消費過了,這個時候再去提交它們的偏移量,這樣能夠保證消息被完整的處理。
public static void main(String[] args) { Properties props = new Properties(); props.setProperty("bootstrap.servers", "192.168.1.112:9092"); props.setProperty("group.id", "test"); props.setProperty("enable.auto.commit", "false"); props.setProperty("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("my-topic")); final int minBatchSize = 20; List<ConsumerRecord<String,String>> buffer = new ArrayList<>(); while(true){ ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> consumerRecord : records) { buffer.add(consumerRecord);//拉取kafka數據到內存中 } if(buffer.size() >= minBatchSize){ //處理buffer中消息 for (ConsumerRecord<String,String> cr : buffer) { System.out.println("處理消息:"+cr.value()); } consumer.commitAsync();//處理完內存中緩存的消息以後手動提交 System.out.println("提交offset"); buffer.clear(); } } }
在這個例子中,咱們將消費一批消息並將它們存儲在內存中。當咱們積累足夠多的消息後,咱們再將它們批量處理。等確保消息被成功徹底處理則手動提交offset。
這種狀況咱們能夠保證「至少一次」,上面例子當處理完全部消息以後,在提交offset時出現異常(雖然機率很小)。這種狀況下進程重啓以後就會重複消費這部分消息。「至少一次」保證,在故障狀況下,能夠重複。
使用自動提交也能夠「至少一次」。可是要求你必須下次調用poll(long)以前或關閉消費者以前,處理完全部返回的數據。若是操做失敗,這將會致使已提交的offset超過消費的位置,從而致使丟失消息。使用手動控制offset的優勢是,你能夠直接控制消息什麼時候提交。
處理完每一個分區中的消息後,提交偏移量實例:
public static void main(String[] args) { Properties props = new Properties(); props.setProperty("bootstrap.servers", "192.168.1.112:9092"); props.setProperty("group.id", "test"); props.setProperty("enable.auto.commit", "false"); props.setProperty("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("my-topic")); while (true) { ConsumerRecords<String,String> records = consumer.poll(100); for (TopicPartition partition : records.partitions()) { //獲取分區中的消息 List<ConsumerRecord<String,String>> partitionRecords = records.records(partition); for (ConsumerRecord<String, String> consumerRecord : partitionRecords) { System.out.println("partion:"+consumerRecord.partition()+",value:"+consumerRecord.value()+",offset:"+consumerRecord.offset()); } long lastOffset = partitionRecords.get(partitionRecords.size()-1).offset(); System.out.println(partition.partition()+"分區最後offset:"+lastOffset); //提交最後一個消息的下一個offset consumer.commitSync(Collections.singletonMap(partition,new OffsetAndMetadata(lastOffset+1))); } } }
注意:已提交的offset應始終是你的程序將讀取的下一條消息的offset。所以,調用commitSync(offsets)時,你應該加1個到最後處理的消息的offset。
訂閱指定分區
分區的設置只能經過調用assign
修改,由於手動分配不會進行分組協調,所以消費者故障不會引起分區從新平衡。
public static void main(String[] args) { Properties props = new Properties(); props.setProperty("bootstrap.servers", "192.168.1.112:9092"); props.setProperty("group.id", "test"); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); TopicPartition p = new TopicPartition("my-topic",0);//訂閱主題的指定分區 consumer.assign(Arrays.asList(p)); while (true) { ConsumerRecords<String, String> records = consumer.poll(100);//拉取消息超時時間100ms for (ConsumerRecord<String, String> consumerRecord : records) { System.out.printf("offset=%d,key=%s,value=%s%n", consumerRecord.offset(), consumerRecord.key(), consumerRecord.value()); } } }
assign方法只指定消費者拉取指定分區的消息,消費者分組仍須要提交offset。
自定義存儲offset
消費者能夠不使用kafka內置的offset倉庫,能夠選擇本身來存儲offset。將消費的offset和消息結果存儲在同一個的系統中,用原子的方式存儲結果和offset。提供的「正好一次」的消費保證比kafka默認的「至少一次」的語義要更高。
本身管理偏移量要注意一下幾點:
1.關閉自動提交offset,enable.auto.commit=false。
2.根據ConsumerRecord 保存分區offset的位置。
3.啓動時恢復分區消費到的位置,經過方法seek(TopicPartition, long)。
手動控制分區,啓動時指定分區偏移量消費實例:
public class ConsumerTest5 { //用於跟蹤偏移量的map,手動提交偏移量狀況下,在分區再均衡時提交每一個分區消費的位置,以便均衡以後新的消費者開始從該位置進行消費 static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new ConcurrentHashMap(); //用於保存偏移量位置的map,以便下次啓動時從該位置開始消費。 static Map<String, Long> map = new ConcurrentHashMap<>(); private static KafkaConsumer<String, String> consumer; public static void main(String[] args) { map.put("my-topic0", 1000l); map.put("my-topic1", 1000l); map.put("my-topic2", 1000l); Properties props = new Properties(); props.setProperty("bootstrap.servers", "192.168.1.112:9092"); props.setProperty("group.id", "test"); props.setProperty("enable.auto.commit", "false"); props.setProperty("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumer = new KafkaConsumer<>(props); // 手動指定位置消費,須要手動註冊 TopicPartition p = new TopicPartition("my-topic", 0); TopicPartition p1 = new TopicPartition("my-topic", 1); TopicPartition p2 = new TopicPartition("my-topic", 2); //手動註冊分區 consumer.subscribe(Arrays.asList("my-topic"),new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { //再均衡開始以前和消費者中止讀取消息以後被調用 //在這裏提交偏移量,下一個接管該分區的消費者就知道從什麼位置開始消費。 //提交全部分區的偏移量 System.out.println("提交全部分區偏移量"); consumer.commitSync(currentOffsets); } @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { //從新分配partition以後和消費者開始讀取消息以前被調用 } }); //這裏要先調用一次poll方法,由於本次拉取數據不是咱們須要的位置因此不作處理 consumer.poll(100); //指定分區消費位置 consumer.seek(p, map.get("my-topic0")); consumer.seek(p1, map.get("my-topic1")); consumer.seek(p2, map.get("my-topic2")); long offset = 0; String partitionKey; while (true) { ConsumerRecords<String, String> records = consumer.poll(100); Set<TopicPartition> partitions = records.partitions(); for (TopicPartition topicPartition : partitions) { // 標識分區 partitionKey = topicPartition.topic() + topicPartition.partition(); System.out.println(partitionKey + "從" + map.get(partitionKey) + "開始消費"); List<ConsumerRecord<String, String>> list = records.records(topicPartition); offset = 0; for (ConsumerRecord<String, String> consumerRecord : list) { System.out.println("partition=" + topicPartition.partition() + "offset=" + consumerRecord.offset() + ",value=" + consumerRecord.value()); offset = consumerRecord.offset(); } // offset保存到map中 map.put(partitionKey, offset + 1); //手動提交偏移量 consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset + 1))); //緩存跟蹤分區的偏移量 currentOffsets.put(topicPartition, new OffsetAndMetadata(offset+1)); } } } }
再均衡監聽器(ConsumerRebalanceListener)
在爲消費者分配新的partition或者移除舊的partition時,能夠經過消費者API執行一些應用程序代碼,在使用subscribe()方法時傳入一個ConsumerRebalanceListener實例。
ConsumerRebalanceListener須要實現的兩個方法
1:public void onPartitionRevoked(Collection<TopicPartition> partitions)方法會在再均衡開始以前和消費者中止讀取消息以後被調用。若是在這裏提交偏移量,下一個接管partition的消費者就知道該從哪裏開始讀取了。
2:public void onPartitionAssigned(Collection<TopicPartition> partitions)方法會在從新分配partition以後和消費者開始讀取消息以前被調用。
消費者流量控制
若是消費者分配了多個分區,並同時消費全部的分區,這些分區具備相同的優先級。在一些狀況下,消費者須要首先消費一些指定的分區,當指定的分區有少許或者已經沒有可消費的數據時,則開始消費其餘分區。
kafka支持動態控制消費流量,分別在consumer的poll(long)中使用pause
和 resume
來暫停消費指定分配的分區,從新開始消費指定暫停的分區。
public void pause(Collection<TopicPartition> partitions) //暫停消費指定分區。
public void resume(Collection<TopicPartition> partitions)//從新開始消費,已經暫停消費的分區。