Kafka is an open-source stream-processing platform developed by the Apache Software Foundation and written in Scala and Java. The goal of the project is to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. Its persistence layer is essentially "a massively scalable publish/subscribe message queue architected as a distributed transaction log", which makes it very valuable as enterprise-grade infrastructure for processing streaming data. In addition, Kafka can connect to external systems (for data import/export) via Kafka Connect, and it ships Kafka Streams, a Java stream-processing library. The overall design is strongly influenced by transaction logs.
Kafka is a distributed data-streaming platform that can run on a single server or be deployed across multiple servers as a cluster. It provides publish and subscribe functionality: clients can send data to Kafka and read data back out of it (for further processing). Kafka is characterized by high throughput, low latency, and strong fault tolerance. The commonly used basic concepts in Kafka are introduced below:
Broker: a common message-queue term; in Kafka it refers to a server node on which a Kafka instance is deployed.
Topic: used to separate different categories of messages. For example, if application A subscribes to topic t1 and application B subscribes to topic t2 but not t1, then data sent to topic t1 can only be read by application A and will never be seen by application B.
Partition: each topic can have one or more partitions. Partitions exist at the physical level, and different partitions correspond to different data files. Kafka uses partitions to support physically concurrent writes and reads, which greatly increases throughput.
Record: a message that is actually written to Kafka and can be read back. Each record contains a key, a value, and a timestamp.
Producer: sends data (records) to Kafka.
Consumer: reads data (records) from Kafka.
Consumer group: a consumer group can contain one or more consumers. Combining multiple partitions with multiple consumers can greatly increase the processing speed of downstream applications.
A partition is identified as: topic name + partition id. Each log file is a sequence of log entries; each log entry contains a 4-byte integer (whose value is M + 5), a 1-byte "magic value", a 4-byte CRC checksum, followed by the M-byte message body. The log is not a single file but is split into multiple segments; each segment is named after the offset of its first message and uses the ".kafka" suffix. There is also an index file that records the range of log-entry offsets contained in each segment. Every message in a partition has a 64-bit offset that is unique within that partition and indicates where the message starts. Kafka only guarantees that the data of a single partition is delivered to consumers in order; it does not guarantee ordering across the multiple partitions of a topic.
If there were no leader and all replicas could read and write data at the same time, the replicas would have to synchronize data with one another (N×N communication paths); data consistency and ordering would be very hard to guarantee, the complexity of implementing replication would increase greatly, and so would the probability of failures. With a leader, only the leader handles reads and writes, and the followers simply fetch data from the leader in order (N paths), which makes the system simpler and more efficient.
Replica: each partition has N replicas according to the replication factor N. For example, if broker1 hosts a topic with partition topic-1 and the replication factor is 2, then the data directories of two brokers will each contain a topic-1 directory; one of them is the leader and the other is a replica. Since a partition can have multiple replicas, a leader must be elected among them: producers and consumers interact only with the leader, while the other replicas act as followers that copy data from the leader. A short sketch of inspecting leaders and replicas programmatically follows below.
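To make the leader/follower relationship concrete, here is a minimal sketch (not part of the original article) that uses the Kafka AdminClient to print the leader, replica list, and in-sync replicas for each partition of a topic. The broker addresses and topic name are simply reused from the examples later in this post; adjust them to your own cluster.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

import java.util.Collections;
import java.util.Properties;

public class DescribeTopicDemo {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumed broker list, taken from the examples below
        props.put("bootstrap.servers", "node-2:9092,node-3:9092,node-4:9092");

        try (AdminClient adminClient = AdminClient.create(props)) {
            TopicDescription description = adminClient
                    .describeTopics(Collections.singletonList("kafka-test-topic"))
                    .all().get()
                    .get("kafka-test-topic");

            // For every partition, print which broker is the leader and which brokers hold replicas
            for (TopicPartitionInfo partition : description.partitions()) {
                System.out.printf("partition=%d leader=%s replicas=%s isr=%s%n",
                        partition.partition(),
                        partition.leader(),
                        partition.replicas(),
                        partition.isr());
            }
        }
    }
}
```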
The main use cases today are roughly the following:
Message queues (MQ) are widely used in system architecture design. An MQ is a cross-process communication mechanism for passing messages between upstream and downstream services. Using an MQ decouples them: the upstream sender only depends on the MQ and has no logical or physical dependency on the downstream services. Common MQ scenarios include traffic peak shaving and data-driven task dependencies. In the MQ space, besides Kafka there are also traditional message queues such as ActiveMQ and RabbitMQ.
Kafka was originally designed for tracking website activity (such as page views, unique visitors, and search records). Different activities can be written to different topics for downstream real-time computation and monitoring, or the data can be loaded into a data warehouse for offline processing and report generation.
Kafka is often used to transport monitoring data, mainly to aggregate statistics from distributed applications so that they can be analysed and visualized centrally.
Many teams use Kafka as a log-aggregation solution. Log aggregation usually means collecting logs from different servers into a central place, such as a file server or a directory in HDFS, for later analysis and processing. Compared with log-aggregation tools like Flume and Scribe, Kafka offers better performance.
Since Kafka depends on a ZooKeeper environment, install ZooKeeper first (see the ZooKeeper installation guide).
Installation environment:
linux: CentOS-7.5_x64
java: jdk1.8.0_191
zookeeper: zookeeper-3.4.10
kafka: kafka_2.11-2.1.0
```bash
# Download
$ wget http://mirrors.hust.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz

# Extract
$ tar -zxvf kafka_2.11-2.1.0.tgz

# Edit the configuration file and change the following settings
$ vim $KAFKA_HOME/config/server.properties

# broker.id must be a number and must be different on every server
broker.id=1

# Change this to the IP or hostname of your server
advertised.listeners=PLAINTEXT://node-1:9092

# Set the ZooKeeper connection addresses; replace the hosts below with your own IPs or hostnames
zookeeper.connect=node-1:2181,node-2:2181,node-3:2181
```
```bash
$ cd $KAFKA_HOME

# Start the Kafka service on every node (-daemon means run in the background)
$ bin/kafka-server-start.sh -daemon config/server.properties

# Create a topic named test-topic; --partitions sets the partition count to 3, --replication-factor sets the replica count to 2
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic test-topic

# List topics
$ bin/kafka-topics.sh --list --zookeeper localhost:2181

# Show topic details
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test-topic

# Modify a topic (here, increase the partition count to 5)
$ bin/kafka-topics.sh --alter --topic test-topic --zookeeper localhost:2181 --partitions 5

# Delete a topic (a soft delete: the topic is only marked for deletion)
$ bin/kafka-topics.sh --delete --topic test-topic --zookeeper localhost:2181

# Start a console producer on one server
$ bin/kafka-console-producer.sh --broker-list node-1:9092,node-2:9092,node-3:9092 --topic test-topic

# Start a console consumer on another server
$ bin/kafka-console-consumer.sh --bootstrap-server node-1:9092,node-2:9092,node-3:9092 --topic test-topic --from-beginning

# Anything typed into the producer console will now show up on the consumer side.
```
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
    </plugins>
</build>
```
```java
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

/**
 * @author leone
 * @since 2018-12-26
 **/
public class JavaKafkaConsumer {

    private static Logger logger = LoggerFactory.getLogger(JavaKafkaConsumer.class);

    private final static String TOPIC = "kafka-test-topic";

    private static final String KAFKA_BROKER = "node-2:9092,node-3:9092,node-4:9092";

    private static Properties properties;

    static {
        properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_BROKER);
        properties.put("group.id", "test");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
    }

    public static void main(String[] args) {
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(TOPIC), new ConsumerRebalanceListener() {

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                // Reset the offsets of the newly assigned partitions to the beginning
                consumer.seekToBeginning(collection);
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                logger.info("offset: {}, key: {}, value: {}", record.offset(), record.key(), record.value());
            }
        }
    }
}
```
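The consumer above relies on auto-commit (enable.auto.commit=true), so offsets are committed in the background regardless of whether processing succeeded. As a variation, here is a minimal sketch, not part of the original example, that disables auto-commit and calls commitSync() only after a batch has been processed; the topic and broker addresses are reused from the example above.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class ManualCommitConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node-2:9092,node-3:9092,node-4:9092");
        props.put("group.id", "test-manual");
        // Disable auto-commit so the application decides when offsets are stored
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("kafka-test-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // Process each record before its offset is committed
                    System.out.printf("offset=%d key=%s value=%s%n",
                            record.offset(), record.key(), record.value());
                }
                // Commit the offsets of everything returned by the last poll
                consumer.commitSync();
            }
        }
    }
}
```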
```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.UUID;

/**
 * @author leone
 * @since 2018-12-26
 **/
public class JavaKafkaProducer {

    private static Logger logger = LoggerFactory.getLogger(JavaKafkaProducer.class);

    private final static String TOPIC = "kafka-test-topic";

    private static final String KAFKA_BROKER = "node-2:9092,node-3:9092,node-4:9092";

    private static Properties properties;

    static {
        properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_BROKER);
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());
    }

    public static void main(String[] args) {
        Producer<String, String> producer = new KafkaProducer<>(properties);
        for (int i = 0; i < 200; i++) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String uuid = UUID.randomUUID().toString();
            producer.send(new ProducerRecord<>(TOPIC, Integer.toString(i), uuid));
            logger.info("send message success key: {}, value: {}", i, uuid);
        }
        producer.close();
    }
}
```
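The send() call above is fire-and-forget: the returned Future is ignored, so a delivery failure would go unnoticed by the application code. Below is a minimal sketch, not part of the original code, of passing a Callback to send() so that the RecordMetadata (partition and offset assigned by the broker) or the exception can be inspected; the broker addresses and topic are reused from the example above.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CallbackProducerDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node-2:9092,node-3:9092,node-4:9092");
        props.put("acks", "all");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("kafka-test-topic", "key-1", "value-1");
            // The callback runs once the broker has acknowledged (or rejected) the record
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.printf("sent to partition %d at offset %d%n",
                            metadata.partition(), metadata.offset());
                }
            });
        }
    }
}
```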
```java
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.server.ConfigType;
import kafka.utils.ZkUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.security.JaasUtils;
import org.junit.Test;

import java.util.*;

/**
 * @author leone
 * @since 2018-12-26
 **/
public class KafkaClient {

    private final static String TOPIC = "kafka-test-topic";

    private static final String ZOOKEEPER_HOST = "node-2:2181,node-3:2181,node-4:2181";

    private static final String KAFKA_BROKER = "node-2:9092,node-3:9092,node-4:9092";

    private static Properties properties = new Properties();

    static {
        properties.put("bootstrap.servers", KAFKA_BROKER);
    }

    /**
     * Create a topic with the AdminClient API
     */
    @Test
    public void createTopic() {
        AdminClient adminClient = AdminClient.create(properties);
        List<NewTopic> newTopics = Arrays.asList(new NewTopic(TOPIC, 1, (short) 1));
        CreateTopicsResult result = adminClient.createTopics(newTopics);
        try {
            result.all().get();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Create a topic with the ZooKeeper-based AdminUtils API
     */
    @Test
    public void create() {
        ZkUtils zkUtils = ZkUtils.apply(ZOOKEEPER_HOST, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        // Create a topic named t1 with 3 partitions and 2 replicas
        AdminUtils.createTopic(zkUtils, "t1", 3, 2, new Properties(), RackAwareMode.Enforced$.MODULE$);
        zkUtils.close();
    }

    /**
     * Query topic configuration
     */
    @Test
    public void listTopic() {
        ZkUtils zkUtils = ZkUtils.apply(ZOOKEEPER_HOST, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        // Fetch all topic-level properties
        Properties props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), "streaming-topic");
        Iterator it = props.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry entry = (Map.Entry) it.next();
            System.err.println(entry.getKey() + " = " + entry.getValue());
        }
        zkUtils.close();
    }

    /**
     * Modify topic configuration
     */
    @Test
    public void updateTopic() {
        ZkUtils zkUtils = ZkUtils.apply(ZOOKEEPER_HOST, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        Properties props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), "log-test");
        // Add a topic-level property
        props.put("min.cleanable.dirty.ratio", "0.4");
        // Remove a topic-level property
        props.remove("max.message.bytes");
        // Apply the changed configuration to topic 'log-test'
        AdminUtils.changeTopicConfig(zkUtils, "log-test", props);
        zkUtils.close();
    }

    /**
     * Delete topic 't1'
     */
    @Test
    public void deleteTopic() {
        ZkUtils zkUtils = ZkUtils.apply(ZOOKEEPER_HOST, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        AdminUtils.deleteTopic(zkUtils, "t1");
        zkUtils.close();
    }
}
```
```properties
log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n
```
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.andy</groupId>
    <artifactId>spring-boot-kafka</artifactId>
    <version>1.0.7.RELEASE</version>
    <packaging>jar</packaging>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.spring.platform</groupId>
                <artifactId>platform-bom</artifactId>
                <version>Cairo-SR5</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.0.3.RELEASE</version>
                <configuration>
                    <!--<mainClass>${start-class}</mainClass>-->
                    <layout>ZIP</layout>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
```yaml
spring:
  application:
    name: spring-jms
  kafka:
    bootstrap-servers: node-2:9092,node-3:9092,node-4:9092
    producer:
      retries:
      batch-size: 16384
      buffer-memory: 33554432
      compression-type: snappy
      acks: all
    consumer:
      group-id: 0
      auto-offset-reset: earliest
      enable-auto-commit: true
```
```java
import lombok.ToString;

import java.util.Date;

/**
 * @author leone
 * @since 2018-12-26
 **/
@ToString
public class Message<T> {

    private Long id;

    private T message;

    private Date time;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public T getMessage() {
        return message;
    }

    public void setMessage(T message) {
        this.message = message;
    }

    public Date getTime() {
        return time;
    }

    public void setTime(Date time) {
        this.time = time;
    }
}
```
```java
import com.andy.jms.kafka.service.KafkaSender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author leone
 * @since 2018-12-26
 **/
@Slf4j
@RestController
public class KafkaController {

    @Autowired
    private KafkaSender kafkaSender;

    @GetMapping("/kafka/{topic}")
    public String send(@PathVariable("topic") String topic, @RequestParam String message) {
        kafkaSender.send(topic, message);
        return "success";
    }
}
```
```java
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * @author leone
 * @since 2018-12-26
 **/
@Slf4j
@Component
public class KafkaReceiver {

    @KafkaListener(topics = {"order"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            log.info("record: {}", record);
            log.info("message: {}", message);
        }
    }
}
```
```java
import com.andy.jms.kafka.commen.Message;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * @author leone
 * @since 2018-12-26
 **/
@Slf4j
@Component
public class KafkaSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Wrap the payload in a Message envelope, serialize it to JSON, and send it to the given topic.
     *
     * @param topic the topic to send to
     * @param body  the message payload
     */
    public void send(String topic, Object body) {
        Message<String> message = new Message<>();
        message.setId(System.currentTimeMillis());
        message.setMessage(body.toString());
        message.setTime(new Date());
        String content = null;
        try {
            content = objectMapper.writeValueAsString(message);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        kafkaTemplate.send(topic, content);
        log.info("send {} to {} success!", message, topic);
    }
}
```
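kafkaTemplate.send() above returns a ListenableFuture that is ignored, so a failed send would only surface in the Kafka client's own logs despite the "success" log line. As a minimal sketch, not part of the original code, a callback can be attached to the returned future to log the actual outcome; it assumes the same KafkaTemplate<String, String> bean as in the class above.

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

public class SendWithCallback {

    // Hypothetical helper: call this with an injected KafkaTemplate to log per-record results
    public static void sendWithCallback(KafkaTemplate<String, String> kafkaTemplate,
                                        String topic, String content) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, content);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                // RecordMetadata carries the partition and offset assigned by the broker
                System.out.printf("sent to %s-%d at offset %d%n",
                        result.getRecordMetadata().topic(),
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                ex.printStackTrace();
            }
        });
    }
}
```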
```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author Leone
 * @since 2018-04-10
 **/
@SpringBootApplication
public class JmsApplication {

    public static void main(String[] args) {
        SpringApplication.run(JmsApplication.class, args);
    }
}
```