Apache Kafka 編程實戰您可能感性的文章:java
Apache Kafka安裝和使用apache
....服務器
本章經過實際例子,講解了如何使用java進行kafka開發。網絡
添加依賴:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
複製代碼
下面是建立主題的代碼:
public class TopicProcessor {
private static final String ZK_CONNECT="localhost:2181";
private static final int SESSION_TIME_OUT=30000;
private static final int CONNECT_OUT=30000;
public static void createTopic(String topicName,int partitionNumber,int replicaNumber,Properties properties){
ZkUtils zkUtils = null;
try{
zkUtils=ZkUtils.apply(ZK_CONNECT,SESSION_TIME_OUT,CONNECT_OUT, JaasUtils.isZkSecurityEnabled());
if(!AdminUtils.topicExists(zkUtils,topicName)){
AdminUtils.createTopic(zkUtils,topicName,partitionNumber,replicaNumber,properties,AdminUtils.createTopic$default$6());
}
}catch (Exception e){
e.printStackTrace();
}finally {
zkUtils.close();
}
}
public static void main(String[] args){
createTopic("javatopic",1,1,new Properties());
}
}
複製代碼
首先定義了zookeeper相關鏈接信息。而後在createTopic中,先初始化ZkUtils,和zookeeper交互依賴於它。而後經過AdminUtils先判斷是否存在你要建立的主題,若是不存在,則經過createTopic方法進行建立。傳入參數包括主題名稱,分區數量,副本數量等。
生產者生產消息代碼以下:
public class MessageProducer {
private static final String TOPIC="education-info";
private static final String BROKER_LIST="localhost:9092";
private static KafkaProducer<String,String> producer = null;
static{
Properties configs = initConfig();
producer = new KafkaProducer<String, String>(configs);
}
private static Properties initConfig(){
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST);
properties.put(ProducerConfig.ACKS_CONFIG,"all");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
return properties;
}
public static void main(String[] args){
try{
String message = "hello world";
ProducerRecord<String,String> record = new ProducerRecord<String,String>(TOPIC,message);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(null==exception){
System.out.println("perfect!");
}
if(null!=metadata){
System.out.print("offset:"+metadata.offset()+";partition:"+metadata.partition());
}
}
}).get();
}catch (Exception e){
e.printStackTrace();
}finally {
producer.close();
}
}
}
複製代碼
一、首先初始化KafkaProducer對象。
producer = new KafkaProducer<String, String>(configs);
複製代碼
二、建立要發送的消息對象。
ProducerRecord<String,String> record = new ProducerRecord<String,String>(TOPIC,message);
複製代碼
三、經過producer的send方法,發送消息
四、發送消息時,能夠經過回調函數,取得消息發送的結果。異常發生時,對異常進行處理。
初始化producer時候,須要注意下面屬性設置:
properties.put(ProducerConfig.ACKS_CONFIG,"all");
複製代碼
這裏有三種值可供選擇:
咱們直接看代碼
public class MessageConsumer {
private static final String TOPIC="education-info";
private static final String BROKER_LIST="localhost:9092";
private static KafkaConsumer<String,String> kafkaConsumer = null;
static {
Properties properties = initConfig();
kafkaConsumer = new KafkaConsumer<String, String>(properties);
kafkaConsumer.subscribe(Arrays.asList(TOPIC));
}
private static Properties initConfig(){
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST);
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"test");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
return properties;
}
public static void main(String[] args){
try{
while(true){
ConsumerRecords<String,String> records = kafkaConsumer.poll(100);
for(ConsumerRecord record:records){
try{
System.out.println(record.value());
}catch(Exception e){
e.printStackTrace();
}
}
}
}catch(Exception e){
e.printStackTrace();
}finally {
kafkaConsumer.close();
}
}
}
複製代碼
代碼邏輯以下:
一、初始化消費者KafkaConsumer,並訂閱主題。
kafkaConsumer = new KafkaConsumer<String, String>(properties);
kafkaConsumer.subscribe(Arrays.asList(TOPIC));
複製代碼
二、循環拉取消息
ConsumerRecords<String,String> records = kafkaConsumer.poll(100);
複製代碼
poll方法傳入的參數100,是等待broker返回數據的時間,若是超過100ms沒有響應,則再也不等待。
三、拉取回消息後,循環處理。
for(ConsumerRecord record:records){
try{
System.out.println(record.value());
}catch(Exception e){
e.printStackTrace();
}
}
複製代碼
消費相關代碼比較簡單,不過這個版本沒有處理偏移量提交。學習過第四章-協調器相關的同窗應該還記得偏移量提交的問題。我曾說過最佳實踐是同步和異步提交相結合,同時在特定的時間點,好比再均衡前進行手動提交。
加入偏移量提交,須要作以下修改:
一、enable.auto.commit設置爲false
二、消費代碼以下:
public static void main(String[] args){
try{
while(true){
ConsumerRecords<String,String> records =
kafkaConsumer.poll(100);
for(ConsumerRecord record:records){
try{
System.out.println(record.value());
}catch(Exception e){
e.printStackTrace();
}
}
kafkaConsumer.commitAsync();
}
}catch(Exception e){
e.printStackTrace();
}finally {
try{
kafkaConsumer.commitSync();
}finally {
kafkaConsumer.close();
}
}
}
複製代碼
三、訂閱消息時,實現再均衡的回調方法,在此方法中手動提交偏移量
kafkaConsumer.subscribe(Arrays.asList(TOPIC), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
//再均衡以前和消費者中止讀取消息以後調用
kafkaConsumer.commitSync(currentOffsets);
}
});
複製代碼
經過以上三步,咱們把自動提交偏移量改成了手動提交。正常消費時,異步提交kafkaConsumer.commitAsync()。即便偶爾失敗,也會被後續成功的提交覆蓋掉。而在發生異常的時候,手動提交 kafkaConsumer.commitSync()。此外在步驟3中,咱們經過實現再均衡時的回調方法,手動同步提交偏移量,確保了再均衡前偏移量提交成功。
以上面的最佳實踐提交偏移量,既能保證消費時較高的效率,又可以儘可能避免重複消費。不過因爲重複消費沒法100%避免,消費邏輯須要本身處理重複消費的判斷。
更多你可能感興趣的文章:
1-Flink入門
2-本地環境搭建&構建第一個Flink應用
3-DataSet API
4-DataSteam API
5-集羣部署
6-分佈式緩存
7-重啓策略
8-Flink中的窗口
9-Flink中的Time
Flink時間戳和水印
Broadcast廣播變量
FlinkTable&SQL
Flink實戰項目實時熱銷排行
Flink寫入RedisSink
Flink消費Kafka寫入Mysql
Flink組件和邏輯計劃
Flink執行計劃生成
JobManager中的基本組件(1)
JobManager中的基本組件(2)
JobManager中的基本組件(3)
TaskManager
算子
網絡
水印WaterMark
CheckPoint
任務調度與負載均衡
異常處理
Alibaba Blink新特性
Java高級特性加強-集合
Java高級特性加強-多線程
Java高級特性加強-Synchronized
Java高級特性加強-volatile
Java高級特性加強-併發集合框架
Java高級特性加強-分佈式
Java高級特性加強-Zookeeper
Java高級特性加強-JVM
Java高級特性加強-NIO
Java高級特性加強-Netty
你真的不關注一下嘛~