kafka是用於構建實時數據管道和流應用程序。具備橫向擴展,容錯,wicked fast(變態快)等優勢,並已在成千上萬家公司運行。spring
producer:生產者,就是它來生產「叉燒包」的飯堂阿姨。
consumer:消費者,生產出來的「叉燒包」它來消費。
topic:你把它理解爲標籤,生產者每生產出來一個叉燒包就貼上一個標籤(topic),消費者可不是誰生產的「叉燒包」都吃的,這樣不一樣的生產者生產出來的「叉燒包」,消費者就能夠選擇性的「吃」了。
broker:就是蒸籠了。apache
因此整個過程能夠以下形象的說明:bootstrap
飯堂阿姨製做一個叉燒包,消費者就消費一個叉燒包。
1.假設消費者消費叉燒包的時候噎住了(系統宕機了),生產者還在生產叉燒包,那新生產的叉燒包就丟失了。
2.再好比生產者很強勁(大交易量的狀況),生產者1秒鐘生產100個叉燒包,消費者1秒鐘只能吃50個叉燒包,那要不了一會,消費者就吃不消了(消息堵塞,最終致使系統超時),消費者拒絕再吃了,」叉燒包「又丟失了。
3.這個時候咱們放個籃子在它們中間,生產出來的叉燒包都放到籃子裏,消費者去籃子裏拿叉燒包,這樣叉燒包就不會丟失了,都在籃子裏,而這個籃子就是」kafka「。
4.叉燒包其實就是「數據流」,系統之間的交互都是經過「數據流」來傳輸的(就是tcp、http什麼的),也稱爲報文,也叫「消息」。
5.消息隊列滿了,其實就是籃子滿了,」叉燒包「 放不下了,那趕忙多放幾個籃子,其實就是kafka的擴容。
因此說 kafka == 籃子。
1.因爲kafka須要zookeeper的。因此您能夠參考【談談zookeeper】
2.kafka安裝
2.1下載地址:http://mirror.bit.edu.cn/apac...
2.2 配置:
(注:KAFKA_HOME爲你配置的環境變量。hadoop01爲你配置hosts)
編輯$KAFKA_HOME/config/下的server.properties文件
server.propertiesspringboot
broker.id=0 #listeners=PLAINTEXT://:9092 log.dirs=/root/app/tmp/kafkalog num.partitions=1 zookeeper.connect=hadoop01:2181
2.3 多broker的kafka安裝配置app
config/server-1.properties: broker.id=1 listeners=PLAINTEXT://:9093 log.dir=/tmp/kafka-logs-1
config/server-2.properties: broker.id=2 listeners=PLAINTEXT://:9094 log.dir=/tmp/kafka-logs-2
啓動kafkatcp
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
建立topicoop
bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic hello_topic
查看topic測試
./kafka-topics.sh --list --zookeeper hadoop01:2181
查看指定topic的詳細信息spa
kafka-topics.sh --describe --zookeeper hadoop01:2181
生產消息code
./kafka-console-producer.sh --broker-list hadoop01:9092 --topic hello_topic
消費消息
./kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic hello_topic --from-beginning
0.9.0版本的用下面的命令
./kafka-console-consumer.sh --zookeeper hadoop01:2181 --topic hello_topic --from-beginning
解析:--from-beginning:是從producer開始的位置開始拿數據的。
特別注意(巨坑):kafka有不少版本的。各版本對應使用的springboot或者jar是不同。請參考spring官網的說明:http://spring.io/projects/spr...
本文使用的是springboot1.5系列+0.10.0.x的
pom.xml
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.0.5.RELEASE</version> </dependency>
生產者代碼
主要是向kafka服務發送消息(生產消息)。
/** * 測試kafka生產者 */ @RestController @RequestMapping("kafka") public class TestKafkaProducerController { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @RequestMapping("send") public String send(String msg){ kafkaTemplate.send("hello_topic", msg); return "success"; } }
消費者代碼
從主題(topic)中獲取消息進行消費。
/** * kafka消費者測試 */ @Component public class TestConsumer { @KafkaListener(topics = "hello_topic") public void listen (ConsumerRecord<?, ?> record) throws Exception { System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value()); } }
yml配置文件
主要是配置kafka的服務地址。
spring: kafka: bootstrap-servers: 120.79.xxx.x:9092 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: test enable-auto-commit: true auto-commit-interval: 1000 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
本人水平有限,歡迎各位建議以及指正。順便關注一下公衆號唄,會常常更新文章的哦。