Apache Kafka® 是 一個分佈式流處理平臺apache
上面是官網的介紹,和通常的消息處理系統相比,不一樣之處在於:編程
和其餘的消息系統之間的對比:安全
對比指標 | kafka | activemq | rabbitmq | rocketmq |
---|---|---|---|---|
背景 | Kafka 是LinkedIn 開發的一個高性能、分佈式的消息系統,普遍用於日誌收集、流式數據處理、在線和離線消息分發等場景 | ActiveMQ是一種開源的,實現了JMS1.1規範的,面向消息(MOM)的中間件, 爲應用程序提供高效的、可擴展的、穩定的和安全的企業級消息通訊。 | RabbitMQ是一個由erlang開發的AMQP協議(Advanced Message Queue )的開源實現。 | RocketMQ是阿里巴巴在2012年開源的分佈式消息中間件,目前已經捐贈給Apache基金會,已經於2016年11月成爲 Apache 孵化項目 |
開發語言 | Java、Scala | Java | Erlang | Java |
協議支持 | 本身實現的一套 | JMS協議 | AMQP | JMS、MQTT |
持久化 | 支持 | 支持 | 支持 | 支持 |
producer容錯 | 在kafka中提供了acks配置選項, acks=0 生產者在成功寫入悄息以前不會等待任何來自服務器的響應 acks=1 只要集羣的首領節點收到消息,生產者就會收到一個來自服務器的成功響應 acks=all 只有當全部參與複製的節點所有收到消息時,生產者纔會收到一個來自服務器的成功響應,這種模式最安全 | 發送失敗後便可重試 | 有ack模型。 ack模型可能重複消息 ,事務模型保證徹底一致 | 和kafka相似 |
吞吐量 | kafka具備高的吞吐量,內部採用消息的批量處理,zero-copy機制,數據的存儲和獲取是本地磁盤順序批量操做,具備O(1)的複雜度,消息處理的效率很高 | rabbitMQ在吞吐量方面稍遜於kafka,他們的出發點不同,rabbitMQ支持對消息的可靠的傳遞,支持事務,不支持批量的操做;基於存儲的可靠性的要求存儲能夠採用內存或者硬盤。 | kafka在topic數量很少的狀況下吞吐量比rocketMq高,在topic數量多的狀況下rocketMq比kafka高 | |
負載均衡 | kafka採用zookeeper對集羣中的broker、consumer進行管理,能夠註冊topic到zookeeper上;經過zookeeper的協調機制,producer保存對應topic的broker信息,能夠隨機或者輪詢發送到broker上;而且producer能夠基於語義指定分片,消息發送到broker的某分片上 | rabbitMQ的負載均衡須要單獨的loadbalancer進行支持 | NamerServer進行負載均衡 |
架構圖:bash
public class Producer extends Thread {
private final KafkaProducer<Integer, String> producer;
private final String topic;
private final Boolean isAsync;
public Producer(String topic, Boolean isAsync) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producer = new KafkaProducer<>(props);
this.topic = topic;
this.isAsync = isAsync;
}
@Override
public void run() {
int messageNo = 1;
while (true) {
String messageStr = "Message_" + messageNo;
long startTime = System.currentTimeMillis();
if (isAsync) { // Send asynchronously
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr), new DemoCallBack(startTime, messageNo, messageStr));
} else { // Send synchronously
try {
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr)).get();
System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
++messageNo;
}
}
class DemoCallBack implements Callback {
private final long startTime;
private final int key;
private final String message;
public DemoCallBack(long startTime, int key, String message) {
this.startTime = startTime;
this.key = key;
this.message = message;
}
/**
* A callback method the user can implement to provide asynchronous handling of request completion. This method will
* be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be
* non-null.
*
* @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error
* occurred.
* @param exception The exception thrown during processing of this record. Null if no error occurred.
*/
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
long elapsedTime = System.currentTimeMillis() - startTime;
if (metadata != null) {
System.out.println(
"message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
"), " +
"offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
} else {
exception.printStackTrace();
}
}
}
}
複製代碼
public class Consumer extends Thread {
private final KafkaConsumer<Integer, String> consumer;
private final String topic;
public Consumer(String topic) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);
this.topic = topic;
}
@Override
public void run() {
while (true) {
consumer.subscribe(Collections.singletonList(this.topic));
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1).getSeconds());
for (ConsumerRecord<Integer, String> record : records) {
System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
}
}
}
}
複製代碼
public class KafkaProperties {
public static final String TOPIC = "topic1";
public static final String KAFKA_SERVER_URL = "localhost";
public static final int KAFKA_SERVER_PORT = 9092;
public static final int KAFKA_PRODUCER_BUFFER_SIZE = 64 * 1024;
public static final int CONNECTION_TIMEOUT = 100000;
public static final String TOPIC2 = "topic2";
public static final String TOPIC3 = "topic3";
public static final String CLIENT_ID = "SimpleConsumerDemoClient";
private KafkaProperties() {}
}
複製代碼
小尾巴走一波,歡迎關注個人公衆號,不按期分享編程、投資、生活方面的感悟:)服務器