其餘更多java基礎文章:
java基礎學習(目錄)java
學習資料:kafka數據可靠性深度解讀apache
消息發送時都被髮送到一個topic,其本質就是一個目錄,而topic是由一些Partition Logs(分區日誌)組成,其組織結構以下圖所示:bootstrap
咱們能夠看到,每一個Partition中的消息都是有序的,生產的消息被不斷追加到Partition log上,其中的每個消息都被賦予了一個惟一的offset值。緩存
同一個partition可能會有多個replication(對應 server.properties 配置中的 default.replication.factor=N)。沒有replication的狀況下,一旦broker 宕機,其上全部 patition 的數據都不可被消費,同時producer也不能再將數據存於其上的patition。引入replication以後,同一個partition可能會有多個replication,而這時須要在這些replication之間選出一個leader,producer和consumer只與這個leader交互,其它replication做爲follower從leader 中複製數據。bash
ACK,HW,ISR等能夠閱讀kafka數據可靠性深度解讀學習
簡單來講:服務器
- HW是HighWatermark的縮寫,是指consumer可以看到的此partition的位置
- ISR (In-Sync Replicas),這個是指副本同步隊列。全部的副本(replicas)統稱爲Assigned Replicas,即AR。ISR是AR中的一個子集,由leader維護ISR列表,follower從leader同步數據有一些延遲(包括延遲時間replica.lag.time.max.ms和延遲條數replica.lag.max.messages兩個維度),任意一個超過閾值都會把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也會先存放在OSR中。AR=ISR+OSR。
- ack機制:在kafka發送數據的時候,每次發送消息都會有一個確認反饋機制,確保消息正常的可以被收到。
kafka提供了兩套consumer API:高級Consumer API和低級API。架構
消費者是以consumer group消費者組的方式工做,由一個或者多個消費者組成一個組,共同消費一個topic。每一個分區在同一時間只能由group中的一個消費者讀取,可是多個group能夠同時消費這個partition。併發
import org.apache.kafka.clients.producer.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class KafkaProducerDemo {
public static void main(String[] args){
//test();
test2();
}
public static void test(){
Properties props= new Properties();
props.put("bootstrap.servers", "172.26.40.181:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer producer = new KafkaProducer(props);
for(int i = 0; i < 10; i++){
producer.send(new ProducerRecord("first",Integer.toString(i), Integer.toString(i)));
}
producer.close();
}
/**
* 帶回調函數
*/
public static void test2(){
Properties props = new Properties();
// Kafka服務端的主機名和端口號
props.put("bootstrap.servers", "172.26.40.181:9092");
// 等待全部副本節點的應答
props.put("acks", "all");
// 消息發送最大嘗試次數
props.put("retries", 0);
// 一批消息處理大小
props.put("batch.size", 16384);
// 增長服務端請求延時
props.put("linger.ms", 1);
// 發送緩存區內存大小
props.put("buffer.memory", 33554432);
// key序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value序列化
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//攔截器
List<String> interceptor = new ArrayList<>();
interceptor.add("com.hiway.practice.kafka.TimeInterceptor");
interceptor.add("com.hiway.practice.kafka.CounterInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptor);
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
for (int i = 0; i < 50; i++) {
kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (metadata != null) {
System.err.println(metadata.partition() + "---" + metadata.offset());
}
}
});
}
kafkaProducer.close();
}
}
複製代碼
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class CustomNewConsumer {
public static void main(String[] args) {
Properties props = new Properties();
// 定義kakfa 服務的地址,不須要將全部broker指定上
props.put("bootstrap.servers", "172.26.40.181:9092");
// 制定consumer group
props.put("group.id", "test");
// 是否自動確認offset
props.put("enable.auto.commit", "true");
// 自動確認offset的時間間隔
props.put("auto.commit.interval.ms", "1000");
// key的序列化類
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// value的序列化類
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 定義consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 消費者訂閱的topic, 可同時訂閱多個
consumer.subscribe(Arrays.asList("first"));
while (true) {
// 讀取數據,讀取超時時間爲100ms
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
複製代碼
public class TimeInterceptor implements ProducerInterceptor<String,String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return new ProducerRecord<>(record.topic(),record.partition(),record.timestamp(),record.key(),System.currentTimeMillis() + "," + record.value().toString());
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
public class CounterInterceptor implements ProducerInterceptor<String, String> {
private int errorCounter = 0;
private int successCounter = 0;
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
// 統計成功和失敗的次數
if (exception == null) {
successCounter++;
} else {
errorCounter++;
}
}
@Override
public void close() {
// 保存結果
System.out.println("Successful sent: " + successCounter);
System.out.println("Failed sent: " + errorCounter);
}
}
複製代碼
1. Uncaught error in kafka producer I/O thread錯誤
這個問題主要是服務器上的kafka版本和IDEA中的kafka版本不一致致使的。負載均衡
2.producer發送數據到集羣上無反應分佈式
將kafka/config/server.properties文件中advertised.listeners改成以下屬性。172.26.40.181是我虛擬機的IP。改完後重啓,OK了。Java端的代碼終於能通訊了 advertised.listeners=PLAINTEXT://172.26.40.181:9092 advertised.listeners上的註釋是這樣的:
#Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
複製代碼
意思就是說:hostname、port都會廣播給producer、consumer。若是你沒有配置了這個屬性的話,則使用listeners的值,若是listeners的值也沒有配置的話,則使用 java.net.InetAddress.getCanonicalHostName()返回值(這裏也就是返回localhost了)。