Kafka是一個分佈式消息系統,由LinkedIn使用Scala語言編寫的,具備高水平擴展和高吞吐量 (每秒處理任務數)spring
目前流行的消息隊列主要有三種:ActiveMQ、RabbitMQ、Kafkaapache
*其中ActiveMQ、RabbitMQ均支持AMQP協議,Kafka有其本身的協議 (仿AMQP,並不通用),但目前愈來愈多的開源分佈式處理系統如Flume(日誌收集系統)、Storm(實時數據處理系統)、Spark(大數據通用處理平臺)、Elasticsearch(全文檢索系統)都支持與Kafka的集成。bootstrap
*動態擴容:在不需中止服務的前提下動態的增長或減小消息隊列服務器,Kafka的動態擴容是經過zookeeper實現的,zookeeper上保存着kafka的相關狀態信息 (topic、partition等)緩存
1.網站活動追蹤:用戶的活動追蹤、搜索、瀏覽、點擊率等,將操做信息發佈到不一樣的主題中,可對數據實時監控,統計分析用戶行爲。安全
2.日誌聚合:做爲一種日誌聚合的解決方案( 日誌聚合的做用在於能夠把來自不一樣服務器上不一樣應用程序產生的日誌聚合起來,存放在單一的服務器上,方便進行搜索和分析)服務器
3.數據集中管理:分佈式應用產生的數據統一存放在kafka集羣中,集中式管理,供其餘程序使用或後續對數據統計分析。session
AMQP(Advanced Message Queuing Protocol),高級消息隊列協議是一個統一消息服務的應用層協議,爲面向消息的中間件所設計,基於此協議的客戶端與消息中間件可相互傳遞消息,並不受客戶端和中間件的產品以及開發語言不一樣所限制。app
生產者(Producer):往消息隊列中發送消息的應用程序負載均衡
消費者(Consumer): 從消息隊列中獲取消息的應用程序異步
AMQP服務器(Broker):用來接收生產者發送的消息並將這些消息路由給服務器中的隊列
*消息隊列以broker爲最小的運行單元,一個broker的運行就表明着一個Kafka應用程序實例。
*Kafka客戶端支持的語言:C、C++、Erlang、Java、.net、perl、PHP、Python、Ruby、Go、Javascript,可使用任何一種語言和Kafka服務器進行通訊,編寫本身的生產者與消費者客戶端程序。
*broker中能夠包含多個主題,每一個主題中又能夠包含多個分區。
一個主題相似新聞中的體育、娛樂、教育等分類概念,在實際工程中一般一個業務一個主題。
一個topic中由一到多個分區組成,分區是Kafka結構的最小單元,一個分區就是一個FIFO(First In First Out)的隊列,用於存放topic中的消息。
*Kafka的分區是提升Kafka性能的關鍵手段,當Kafka集羣的性能不高時,能夠試着往topic中添加分區。
、
*可見每一個分區都是一個先進先出的隊列,producer往broker中的指定topic發送消息,消息將經過負載均衡策略進入到相應的partition中。
*每一個消息都有一個連續的序列號叫作offset(偏移量),是消息在分區中的惟一標識。
*每一個consumer都須要維護一個當前已消費消息的偏移量,相似於指針,隨着consumer不斷的讀取消息,消費者的offset值也會不斷的增長,consumer也能夠以任意的順序讀取消息,只須要設置偏移量便可。
*每一個consumer讀取的偏移量都會同步給Kafka,在Kafka集羣中同時會維護各個consumer消費的偏移量(消息在分區中的偏移量是固定的,Consumer的偏移量是動態可變的,其至關於讀取的指針)
*在一個可配置的時間段內,Kafka集羣將保留全部發布的消息,無論這些消息是否被被消費。
*每一個分區在Kafka集羣的若干服務中都有副本( 數量可配置 ),使Kafka具有了容錯能力。
*在邏輯上相關的一組分區中,都由一個服務器做爲leader,其他服務器做爲follower,leader和follwer的選舉是隨機的,當follower接收到請求首先會發送給leader,由leader負責消息的讀和寫並將消息同步給各個follower,若是Leader所在節點宕機,followers中的一臺則會自動成爲leader。
Producer將消息發佈到Broker的指定Topic中,消息將根據負載均衡策略進入到相應的Partition中。
Kafka中提供了Consumer組的概念,一個Consumer組中包含若干個Consumer,總體對外可當作是一個消費者。
若全部的消費者都在同一個consumer組中則成爲隊列模式,topic中各個分區的消息僅能被組中分區個的惟一consumer消費,組下的consumer共同競爭topic中的分區。
若全部的消費者都不在同一個consumer組中則成爲發佈-訂閱模式,topic中各個分區的消息都會廣播給全部的consumer組。
因爲Kafka使用scala語言編寫,scale語言運行在JVM中,所以須要先安裝JDK而且配置好環境變量。
因爲Kafka中的狀態信息(topic、partition)都保存在zk上,雖然Kafka中自帶zk,但通常是使用外置的zk集羣,所以須要先安裝zk服務而且配置好zk集羣關係。
從Kafka官網中下載安裝包並進行解壓。
config是Kafka配置文件的存放目錄
*因爲多個Kafka服務(broker)都使用同一個zk集羣,所以在同一個zk集羣中的Kafka也就自動成爲集羣的關係,所以borker.id在同一個集羣中不能重複。
*Kafka中的消息是緩存到本地磁盤的(log.dirs目錄下),每一個topic的分區在broker的日誌路徑下都對應一個目錄,目錄下的.log文件用於存放分區中的消息,當有新消息進入分區時直接追加到文件中。
*若建立的topic其備份數大於1 (狀態保存在zk) ,則Kafka集羣中備份數個broker也會建立此topic,所以在其日誌路徑下也會存在此topic各個分區的目錄。
*在使用Kafka提供的消費者腳本文件時能夠指定其使用的配置文件。
*在程序中使用時須要手動設置配置項。
*在使用Kafka提供的生產者腳本文件時能夠指定其使用的配置文件。
*在程序中使用時須要手動設置配置項。
逐一啓動zk服務
逐一啓動kafka服務並指定使用的配置文件( service.properties文件中配置使用外置的ZK集羣)
建立名爲chat的topic,其備份數爲3而且每一個topic下存在3個分區 (因爲啓動了3個broker,所以topic的備份數最多隻能是3)
建立topic時需指定zk服務地址,zk中保存了topic的相關屬性(備份數和分區數),Kafka集羣再從zk服務中獲取topic屬性信息並在Kafka集羣中備份數個節點建立該topic。
查看各個broker中的日誌目錄,可見目錄下都生成了chat-0、chat-一、chat2分別表示chat主題中的第一個、第二個、第三個分區,每一個分區中都有.log文件存放分區中的消息。
查看Kafka集羣中chat主題下各個分區的狀態
Topic:主題名稱
PartitionCount:主題包含的分區數
ReplicationFactor:topic的備份數
Partition:分區號
Leader:充當leader的broker節點
Replicas:存在備份的broker節點(無論存活)
Isr:存在備份而且存活的broker節點
往Kafka集羣中的chat主題發送消息
*消息將會根據負載均衡機制隨機進入分區
*因爲使用腳本文件啓動消費者時沒有指定使用的配置文件,因此三個消費者都不是同一個消費者組中,所以三個消費者都可以消費到chat主題中各個分區的消息。
*啓動了三個消費者並指定使用的配置文件,默認的group.id是test-consumer-group,所以三個消費者都屬於同一個消費者組中,topic中各個分區僅能被組下的惟一一個consumer消費。
*因爲啓動第一個消費者時,消費者組下只有一個消費者,所以消息都會被此消費者消費,當往消費者組中添加新的消費者而且生產者往主題添加消息時,此時消費者會從新競爭消息。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>0.11.0.1</version>
</dependency>
ZkUtils zkUtils = ZkUtils.apply("192.168.1.80:2181,192.168.1.81:2181,192.168.1.82:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled()); // 建立一個名爲chat的主題其包含2個分區,備份數是3
AdminUtils.createTopic(zkUtils, "chat", 2, 3, new Properties(), RackAwareMode.Enforced$.MODULE$); zkUtils.close();
//建立Properties對象用於封裝配置項
Properties props = new Properties(); props.put("bootstrap.servers", "192.168.1.80:9092,192.168.1.81:9092,192.168.1.82:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //建立KafkaProducer生產者實例
Producer<String, String> producer = new KafkaProducer<>(props); //建立生產端消息實體ProducerRecord並指定消息上傳的topic名稱、消息的Key、消息的Value(消息由Key Value兩部分組成)
ProducerRecord<String,String> record = new ProducerRecord<String,String>("topic","key","value"); //發送消息
producer.send(record); //關閉鏈接
producer.close();
*KafkaProducer是線程安全的,在線程之間能夠共享單個生產者實例。
*send()方法是異步的,一旦消息被保存在待發送緩衝區中此方法就當即返回,其返回Future<RecordMetadata>實例,當調用該實例的get()方法時將會阻塞直到服務器對請求進行應答(阻塞時長跟acks配置項有關),當服務器處理異常時將拋出異常。
*消息由Key、Value兩部分組成,Key值能夠重複,使用Kafka API往topic發送消息時,默認狀況下將會根據消息的Key值進行散列來決定消息到達的分區。
*生產者的緩衝區保留還沒有發送到服務器的消息,後臺I/O線程負責將這些消息轉換成請求發送到集羣,若使用後不關閉生產者則會泄露這些資源。
//建立Properties對象用於封裝配置項
Properties props = new Properties(); props.put("bootstrap.servers", "192.168.1.80:9092,192.168.1.81:9092,192.168.1.82:9092"); props.put("group.id", "consumerA"); //自動提交Consumer的偏移量給Kafka服務
props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //建立KafkaConsumer消費者實例
Consumer<String, String> consumer = new KafkaConsumer<>(props);
//訂閱主題,一個消費者實例能夠訂閱多個主題
consumer.subscribe(Arrays.asList("chat", "hello")); //接收數據,消息存放在ConsumerRecords消息集合中
ConsumerRecords<String, String> records = consumer.poll(1000*5);
//遍歷消費端消息集合獲取ConsumerRecord消費端消息實體,一個消費端消息實體包含偏移量、消息Key值、消息Value值
for (ConsumerRecord<String, String> record : records){ System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.0</version> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.2.0.RELEASE</version> </dependency>
@Configuration @EnableKafka public class KafkaConsumerConfiguration { //Consumer配置 private Map<String, Object> consumerProps() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "bootKafka"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } //Consumer工廠Bean @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerProps()); } //Kafka監聽器工廠Bean @Bean public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }
@Configuration @EnableKafka public class KafkaProducerConfiguration { //Producer配置 private Map<String, Object> senderProps (){ Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.RETRIES_CONFIG, 1); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024000); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } //Producer工廠Bean @Bean public ProducerFactory<Integer, String> producerFactory() { return new DefaultKafkaProducerFactory<>(senderProps()); } //kafkaTemplate,用於向Topic發送消息 @Bean public KafkaTemplate<Integer, String> kafkaTemplate() { KafkaTemplate template = new KafkaTemplate<Integer, String>(producerFactory()); return template; } }
@Component public class Producer { @Autowired private KafkaTemplate kafkaTemplate; public void send() throws Exception { kafkaTemplate.send("fruit", "apple"); } }
@Component public class Consumer { @KafkaListener(topics = "fruit") public void listen(String msgData) { } }