Kafka是一個相似於RabbitMQ的消息系統,它的主要功能是消息的發佈和訂閱、處理和存儲。java
1.它相似於一個消息系統,讀寫流式的數據。apache
2.編寫可擴展的流應用處理程序,用於實時事件響應的場景。bootstrap
3.安全的將流式的數據存儲在一個分佈式,有副本備份,容錯的集羣。安全
本篇博文主要介紹如何使用Java編寫程序將數據寫入到Kafka中,即Kafka生產者,並不涉及Kafka消費者。另外,像Spark,Storm等都有相應的程序從Kafka消費者中獲取數據的方法,直接調用便可。服務器
Kafka的運行須要Zookeeper的幫助,因此,須要先安裝Zookeeper。分佈式
1.先啓動Zookeeperthis
bin/zookeeper-server-start.sh config/zookeeper.properties
再啓動Kafka服務器:spa
bin/kafka-server-start.sh config/server.properties
2.建立一個Topic:code
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
顯示topicorm
bin/kafka-topics.sh --list --zookeeper localhost:2181
也能夠在程序中進行topic的建立。
3.發送消息:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
4.接收消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
下面,是本次的程序:
1 import org.apache.kafka.clients.producer.*; 2 3 import java.util.Properties; 4 import java.util.concurrent.ExecutionException; 5 6 public class MyProducer extends Thread { 7 private final KafkaProducer<Integer, String> producer; 8 private final String topic; 9 private final Boolean isAsync; 10 11 public MyProducer(String topic, Boolean isAsync) { 12 Properties prop = new Properties(); 13 prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 14 prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer"); 15 prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 16 producer = new KafkaProducer<Integer, String>(prop); 17 this.topic = topic; 18 this.isAsync = isAsync; 19 } 20 21 public void run() { 22 int messageNo = 1; 23 while (true) { 24 String messageStr = "Message_" + messageNo; 25 long startTime = System.currentTimeMillis(); 26 if (isAsync) { 27 producer.send(new ProducerRecord<Integer, String>(topic, messageNo, messageStr), new DemoCallback(startTime, messageNo, messageStr)); 28 } else { 29 try { 30 producer.send(new ProducerRecord<Integer, String>(topic, messageNo, messageStr)).get(); 31 System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")"); 32 } catch (InterruptedException | ExecutionException e) { 33 e.printStackTrace(); 34 } 35 } 36 ++messageNo; 37 } 38 } 39 public static void main(String[] args) { 40 boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync"); 41 42 MyProducer producerThread = new MyProducer("test", isAsync); 43 producerThread.start(); 44 45 } 46 } 47 48 class DemoCallback implements Callback { 49 private final long startTime; 50 private final int key; 51 private final String message; 52 53 public DemoCallback(long startTime, int key, String message) { 54 this.startTime = startTime; 55 this.key = key; 56 this.message = message; 57 } 58 59 public void onCompletion(RecordMetadata metadata, Exception exception) { 60 long elapsedTime = System.currentTimeMillis() - startTime; 61 if (metadata != null) { 62 System.out.println( 63 "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() + 64 "), " + 65 "offset(" + metadata.offset() + ") in " + elapsedTime + " ms"); 66 } else { 67 exception.printStackTrace(); 68 } 69 } 70 }
好了,完成!