學習kafka教程(一)

簡介

kafka是用於構建實時數據管道和流應用程序。具備橫向擴展,容錯,wicked fast(變態快)等優勢,並已在成千上萬家公司運行。spring

kafka

目標

  • 瞭解kafka的基本原理
  • 掌握kafka的基本操做
  • kafka的深度探究在另外一篇文章。

相關概念

producer:生產者,就是它來生產「叉燒包」的飯堂阿姨。
consumer:消費者,生產出來的「叉燒包」它來消費。
topic:你把它理解爲標籤,生產者每生產出來一個叉燒包就貼上一個標籤(topic),消費者可不是誰生產的「叉燒包」都吃的,這樣不一樣的生產者生產出來的「叉燒包」,消費者就能夠選擇性的「吃」了。
broker:就是蒸籠了。apache

因此整個過程能夠以下形象的說明:bootstrap

飯堂阿姨製做一個叉燒包,消費者就消費一個叉燒包。
1.假設消費者消費叉燒包的時候噎住了(系統宕機了),生產者還在生產叉燒包,那新生產的叉燒包就丟失了。
2.再好比生產者很強勁(大交易量的狀況),生產者1秒鐘生產100個叉燒包,消費者1秒鐘只能吃50個叉燒包,那要不了一會,消費者就吃不消了(消息堵塞,最終致使系統超時),消費者拒絕再吃了,」叉燒包「又丟失了。
3.這個時候咱們放個籃子在它們中間,生產出來的叉燒包都放到籃子裏,消費者去籃子裏拿叉燒包,這樣叉燒包就不會丟失了,都在籃子裏,而這個籃子就是」kafka「。
4.叉燒包其實就是「數據流」,系統之間的交互都是經過「數據流」來傳輸的(就是tcp、http什麼的),也稱爲報文,也叫「消息」。
5.消息隊列滿了,其實就是籃子滿了,」叉燒包「 放不下了,那趕忙多放幾個籃子,其實就是kafka的擴容。
因此說 kafka == 籃子

安裝

1.因爲kafka須要zookeeper的。因此您能夠參考【談談zookeeper
2.kafka安裝
2.1下載地址:http://mirror.bit.edu.cn/apac...
2.2 配置:
(注:KAFKA_HOME爲你配置的環境變量。hadoop01爲你配置hosts)
編輯$KAFKA_HOME/config/下的server.properties文件
server.propertiesspringboot

broker.id=0
#listeners=PLAINTEXT://:9092
log.dirs=/root/app/tmp/kafkalog
num.partitions=1
zookeeper.connect=hadoop01:2181

2.3 多broker的kafka安裝配置app

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1
config/server-2.properties:
   broker.id=2
   listeners=PLAINTEXT://:9094
   log.dir=/tmp/kafka-logs-2

經常使用操做命令

啓動kafkatcp

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

建立topicoop

bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic hello_topic

查看topic測試

./kafka-topics.sh --list --zookeeper hadoop01:2181

查看指定topic的詳細信息spa

kafka-topics.sh --describe --zookeeper hadoop01:2181

生產消息code

./kafka-console-producer.sh --broker-list hadoop01:9092 --topic hello_topic

消費消息

./kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic hello_topic --from-beginning

0.9.0版本的用下面的命令

./kafka-console-consumer.sh --zookeeper hadoop01:2181 --topic hello_topic --from-beginning

解析:--from-beginning:是從producer開始的位置開始拿數據的。

Springboot操做kafka

特別注意(巨坑):kafka有不少版本的。各版本對應使用的springboot或者jar是不同。請參考spring官網的說明:http://spring.io/projects/spr...

本文使用的是springboot1.5系列+0.10.0.x的
pom.xml

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.0.5.RELEASE</version>
</dependency>

生產者代碼
主要是向kafka服務發送消息(生產消息)。

/**
 * 測試kafka生產者
 */
@RestController
@RequestMapping("kafka")
public class TestKafkaProducerController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("send")
    public String send(String msg){
        kafkaTemplate.send("hello_topic", msg);
        return "success";
    }

}

消費者代碼
從主題(topic)中獲取消息進行消費。

/**
 * kafka消費者測試
 */
@Component
public class TestConsumer {

    @KafkaListener(topics = "hello_topic")
    public void listen (ConsumerRecord<?, ?> record) throws Exception {
        System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
    }
}

yml配置文件
主要是配置kafka的服務地址。

spring:
  kafka:
    bootstrap-servers: 120.79.xxx.x:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test
      enable-auto-commit: true
      auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

最後

本人水平有限,歡迎各位建議以及指正。順便關注一下公衆號唄,會常常更新文章的哦。
n平方

相關文章
相關標籤/搜索