package kafka; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.log4j.Logger; import java.util.Properties; public class Producer { Logger logger = Logger.getLogger("Producer"); public KafkaProducer getKafkaProducer() { Properties kafkaProps = new Properties(); /** * kafka生產者必選是三個屬性 * bootstrap.servers 指定broker的地址清單 * key.serializer 必須是一個實現org.apache.kafka.common.serialization.Serializer接口的類,將key序列化成字節數組。注意:key.serializer必須被設置,即便消息中沒有指定key * value.serializer 將value序列化成字節數組 */ kafkaProps.put("bootstrap.servers", "localhost:9092"); kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //其餘設置 /* acks=0:若是設置爲0,生產者不會等待kafka的響應,高吞吐。消息會被馬上加到發送緩衝通道中,而且認爲已經發送成功。這種狀況下,不能保證kafka接收到了這條消息,retries配置不會生效,每條消息的偏移量都是1; acks=1:這個配置意味着kafka會把這條消息寫到本地日誌文件中,可是不會等待集羣中其餘機器的成功響應。這種狀況下,在寫入日誌成功後,集羣主機器掛掉,同時從機器還沒來得及寫的話,消息就會丟失掉。 acks=all:這個配置意味着leader會等待全部的follower同步完成。這個確保消息不會丟失,除非kafka集羣中全部機器掛掉。這是最強的可用性保證,最安全模式,但延遲相對較長。 (1)acks指定必需要有多少個partition副本收到消息,生產者纔會認爲消息的寫入是成功的。 acks=0,生產者不須要等待服務器的響應,以網絡能支持的最大速度發送消息,吞吐量高,可是若是broker沒有收到消息,生產者是不知道的 acks=1,leader partition收到消息,生產者就會收到一個來自服務器的成功響應 acks=all,全部的partition都收到消息,生產者纔會收到一個服務器的成功響應 (2)buffer.memory,設置生產者內緩存區域的大小,生產者用它緩衝要發送到服務器的消息。 (3)compression.type,默認狀況下,消息發送時不會被壓縮,該參數能夠設置成snappy、gzip或lz4對發送給broker的消息進行壓縮 (4)retries,生產者從服務器收到臨時性錯誤時,生產者重發消息的次數 (5)batch.size,發送到同一個partition的消息會被先存儲在batch中,該參數指定一個batch能夠使用的內存大小,單位是byte。不必定須要等到batch被填滿才能發送 (6)linger.ms,生產者在發送消息前等待linger.ms,從而等待更多的消息加入到batch中。若是batch被填滿或者linger.ms達到上限,就把batch中的消息發送出去 (7)max.in.flight.requests.per.connection,生產者在收到服務器響應以前能夠發送的消息個數 */ kafkaProps.put("acks", "all");// return new KafkaProducer(kafkaProps); } /** * 同步發送 * * @param topic * @param key * @param value * @param kafkaProducer */ public void sendMsgSynchr(String topic, String key, String value, KafkaProducer kafkaProducer) { ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, key, value); kafkaProducer.send(producerRecord); } /** * 異步發送 * * @param topic * @param key * @param value * @param kafkaProducer */ public void sendMsgAsynchr(String topic, String key, String value, KafkaProducer kafkaProducer) { ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, key, value); kafkaProducer.send(producerRecord, new ProducerCallback());//發送消息時,傳遞一個回調對象,該回調對象必須實現org.apahce.kafka.clients.producer.Callback接口 } private class ProducerCallback implements Callback { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) {//若是Kafka返回一個錯誤,onCompletion方法拋出一個non null異常。 e.printStackTrace();//對異常進行一些處理,這裏只是簡單打印出來 } } } public static void main(String[] args) { Producer producer = new Producer(); KafkaProducer kafkaProducer = producer.getKafkaProducer(); for (int i = 0; i < 100; i++) { String msg = "msg------" + i; System.out.println(msg); producer.sendMsgAsynchr("test_kafka", null, msg, kafkaProducer); } } }
package kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Collections; import java.util.Properties; public class Consumer { public KafkaConsumer getKafkaConsumer() { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "groupid1"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //其餘參數 /* 1:fetch.min.bytes,指定消費者從broker獲取消息的最小字節數,即等到有足夠的數據時才把它返回給消費者 2:fetch.max.wait.ms,等待broker返回數據的最大時間,默認是500ms。fetch.min.bytes和fetch.max.wait.ms哪一個條件先獲得知足,就按照哪一種方式返回數據 3:max.partition.fetch.bytes,指定broker從每一個partition中返回給消費者的最大字節數,默認1MB 4:session.timeout.ms,指定消費者被認定死亡以前能夠與服務器斷開鏈接的時間,默認是3s 5:auto.offset.reset,消費者在讀取一個沒有偏移量或者偏移量無效的狀況下(由於消費者長時間失效,包含偏移量的記錄已通過時並被刪除)該做何處理。默認是latest(消費者從最新的記錄開始讀取數據)。另外一個值是 earliest(消費者從起始位置讀取partition的記錄) 6:enable.auto.commit,指定消費者是否自動提交偏移量,默認爲true 7:partition.assignment.strategy,指定partition如何分配給消費者,默認是Range。Range:把Topic的若干個連續的partition分配給消費者。RoundRobin:把Topic的全部partition逐個分配給消費者 8:max.poll.records,單次調用poll方法可以返回的消息數量 */ return new KafkaConsumer(props); } public void getMsg(String topic, KafkaConsumer kafkaConsumer) { //2.訂閱Topic //建立一個只包含單個元素的列表,Topic的名字叫做customerCountries kafkaConsumer.subscribe(Collections.singletonList(topic)); //主題列表 //支持正則表達式,訂閱全部與test相關的Topic //consumer.subscribe("test.*"); //3.輪詢 //消息輪詢是消費者的核心API,經過一個簡單的輪詢向服務器請求數據,一旦消費者訂閱了Topic,輪詢就會處理所欲的細節,包括羣組協調、partition再均衡、發送心跳 //以及獲取數據,開發者只要處理從partition返回的數據便可。 // try { while (true) {//消費者是一個長期運行的程序,經過持續輪詢向Kafka請求數據。在其餘線程中調用consumer.wakeup()能夠退出循環 //在100ms內等待Kafka的broker返回數據.超市參數指定poll在多久以後能夠返回,無論有沒有可用的數據都要返回 ConsumerRecords<String, String> records = kafkaConsumer.poll(100l); for (ConsumerRecord<String, String> record : records) { System.out.println(record.value()); } } // } finally { // //退出應用程序前使用close方法關閉消費者,網絡鏈接和socket也會隨之關閉,並當即觸發一次再均衡 // kafkaConsumer.close(); // } } public static void main(String[] args) { Consumer consumer = new Consumer(); KafkaConsumer kafkaConsumer = consumer.getKafkaConsumer(); consumer.getMsg("test_kafka", kafkaConsumer); } }