1、Kafka概述css
離線部分:
Hadoop->離線計算(hdfs / mapreduce) yarn
zookeeper->分佈式協調(動物管理員)
hive->數據倉庫(離線計算 / sql)easy coding
flume->數據採集
sqoop->數據遷移mysql->hdfs/hive hdfs/hive->mysql
Azkaban->任務調度工具
hbase->數據庫(nosql)列式存儲 讀寫速度
實時:
kafka
storm
官網:
http://kafka.apache.org/
ApacheKafka®是一個分佈式流媒體平臺
流媒體平臺有三個關鍵功能:
發佈和訂閱記錄流,相似於消息隊列或企業消息傳遞系統。
以容錯的持久方式存儲記錄流。
記錄發生時處理流。
Kafka一般用於兩大類應用:
構建可在系統或應用程序之間可靠獲取數據的實時流數據管道
構建轉換或響應數據流的實時流應用程序
2、kafka是什麼?java
在流計算中,kafka主要功能是用來緩存數據,storm能夠經過消費kafka中的數據進行流計算。
是一套開源的消息系統,由scala寫成。支持javaAPI的。
kafka最初由LinkedIn公司開發,2011年開源。
2012年從Apache畢業。
是一個分佈式消息隊列,kafka讀消息保存採用Topic進行歸類。
角色
發送消息:Producer(生產者)
接收消息:Consumer(消費者)
3、爲何要用消息隊列mysql
1)解耦
爲了不出現問題
2)拓展性
可增長處理過程
3)靈活
面對訪問量劇增,不會由於超負荷請求而徹底癱瘓。
4)可恢復
一部分組件失效,不會影響整個系統。能夠進行恢復。
5)緩衝
控制數據流通過系統的速度。
6)順序保證
對消息進行有序處理。
7)異步通訊
akka,消息隊列提供了異步處理的機制。容許用戶把消息放到隊列 , 不馬上處理。
4、kafka架構設計sql
kafka依賴zookeeper,用zk保存元數據信息。
搭建kafka集羣要先搭建zookeeper集羣。
zk在kafka中的做用?
保存kafka集羣節點狀態信息和消費者當前消費信息。
Kafka介紹數據庫
Kafka架構apache
5、kafka集羣安裝部署bootstrap
1)官網下載安裝包
2)上傳安裝包
把安裝包 kafka_2.11-2.0.0.tgz 放置在/root下
3)解壓安裝包
cd /root
tar -zxvf kafka_2.11-2.0.0.tgz -C hd
4)重命名
cd hd
mv kafka_2.11-2.0.0/ kafka
5)修改配置文件
cd /root/hd/kafka
mkdir logs
cd config
vi server.properties
broker.id=0 #每臺機器的id不一樣便可
delete.topic.enable=true #是否容許刪除主題
log.dirs=/root/hd/kafka/logs #運行日誌保存位置
zookeeper.connect=hd09-1:2181,hd09-2:2181,hd09-3:2181
6)配置環境變量
vi /etc/profile
最下面添加
#kafka_home
export KAFKA_HOME=/root/hd/kafka
export PATH=$PATH:$KAFKA_HOME/bin
生效環境變量
source /etc/profile
7)分發到其餘節點
cd /root/hd
scp -r kafka/ hd09-2:$PWD
scp -r kafka/ hd09-3:$PWD
8)修改其餘節點/root/hd/kafka/config/server.properties
broker.id=1 #hd09-2
broker.id=2 #hd09-3
9)啓動集羣
cd /root/hd/kafka
bin/kafka-server-start.sh config/server.properties &
10)關閉
cd /root/hd/kafka
bin/kafka-server-stop.sh
6、Kafka命令行操做緩存
1)查看當前集羣中已存在的主題topic
bin/kafka-topics.sh --zookeeper hd09-1:2181 --list
2)建立topic
bin/kafka-topics.sh --zookeeper hd09-1:2181 --create --replication-factor 3 --partitions 1 --topic study
--zookeeper 鏈接zk集羣
--create 建立
--replication-factor 副本
--partitions 分區
--topic 主題名
3)刪除主題
bin/kafka-topics.sh --zookeeper hd09-1:2181 --delete --topic study
4)發送消息
生產者啓動:
bin/kafka-console-producer.sh --broker-list hd09-1:9092 --topic study
消費者啓動:
bin/kafka-console-consumer.sh --bootstrap-server hd09-1:9092 --topic study --from-beginning
5)查看主題詳細信息
bin/kafka-topics.sh --zookeeper hd09-1:2181 --describe --topic study
7、Kafka簡單API安全
一、Producer1類---kafka生產者API 接口回調服務器
package com.css.kafka.kafka_producer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* kafka生產者API
*/
public class Producer1 {
public static void main(String[] args) {
//1.配置生產者屬性(指定多個參數)
Properties prop = new Properties();
//參數配置
//kafka節點的地址
prop.put("bootstrap.servers", "192.168.146.132:9092");
//發送消息是否等待應答
prop.put("acks", "all");
//配置發送消息失敗重試
prop.put("retries", "0");
//配置批量處理消息大小
prop.put("batch.size", "10241");
//配置批量處理數據延遲
prop.put("linger.ms", "5");
//配置內存緩衝大小
prop.put("buffer.memory", "12341235");
//配置在發送前必須序列化
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//2.實例化producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
//3.發送消息
for (int i = 0; i < 99; i++) {
producer.send(new ProducerRecord<String, String>("test", "helloworld" + i));
}
//4.釋放資源
producer.close();
}
}
二、Producer2類---kafka生產者API 接口回調
package com.css.kafka.kafka_producer;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
* kafka生產者API 接口回調
*/
public class Producer2 {
public static void main(String[] args) {
//1.配置生產者屬性(指定多個參數)
Properties prop = new Properties();
//參數配置
//kafka節點的地址
prop.put("bootstrap.servers", "192.168.146.132:9092");
//發送消息是否等待應答
prop.put("acks", "all");
//配置發送消息失敗重試
prop.put("retries", "0");
//配置批量處理消息大小
prop.put("batch.size", "10241");
//配置批量處理數據延遲
prop.put("linger.ms", "5");
//配置內存緩衝大小
prop.put("buffer.memory", "12341235");
//配置在發送前必須序列化
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//自定義分區
prop.put("partitioner.class", "com.css.kafka.kafka_producer.Partition1");
//2.實例化producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
//3.發送消息
for (int i = 0; i < 99; i++) {
producer.send(new ProducerRecord<String, String>("yuandan", "nice" + i), new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
//若是metadata不爲null 拿到當前的數據偏移量與分區
if(metadata != null) {
System.out.println(metadata.topic() + "----" + metadata.offset() + "----" + metadata.partition());
}
}
});
}
//4.關閉資源
producer.close();
}
}
三、Partition1類---設置自定義分區
package com.css.kafka.kafka_producer;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
/**
* 設置自定義分區
*/
public class Partition1 implements Partitioner{
//設置
public void configure(Map<String, ?> configs) {
}
//分區邏輯
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
return 1;
}
//釋放資源
public void close() {
}
}
四、Consumer1類---消費者API
package com.css.kafka.kafka_consumer;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
/**
* 消費者類
*/
public class Consumer1 {
public static void main(String[] args) {
//1.配置消費者屬性
Properties prop = new Properties();
//2.配置屬性
//指定服務器地址
prop.put("bootstrap.servers", "192.168.146.133:9092");
//配置消費者組
prop.put("group.id", "g1");
//配置是否自動確認offset
prop.put("enable.auto.commit", "true");
//序列化
prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//2.實例消費者
final KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(prop);
//4.釋放資源 線程安全
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
if (consumer != null) {
consumer.close();
}
}
}));
//訂閱消息主題
consumer.subscribe(Arrays.asList("test"));
//3.拉消息 推push 拉poll
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
//遍歷消息
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.topic() + "-----" + record.value());
}
}
}
}
五、Producer3類---kafka生產者API-帶攔截器
package com.css.kafka.interceptor;
import java.util.ArrayList;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* kafka生產者API 帶攔截器
*/
public class Producer3 {
public static void main(String[] args) {
//1.配置生產者屬性(指定多個參數)
Properties prop = new Properties();
//參數配置
//kafka節點的地址
prop.put("bootstrap.servers", "192.168.146.132:9092");
//發送消息是否等待應答
prop.put("acks", "all");
//配置發送消息失敗重試
prop.put("retries", "0");
//配置批量處理消息大小
prop.put("batch.size", "10241");
//配置批量處理數據延遲
prop.put("linger.ms", "5");
//配置內存緩衝大小
prop.put("buffer.memory", "12341235");
//配置在發送前必須序列化
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//攔截器
ArrayList<String> inList = new ArrayList<String>();
inList.add("com.css.kafka.interceptor.TimeInterceptor");
prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, inList);
//2.實例化producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
//3.發送消息
for (int i = 0; i < 99; i++) {
producer.send(new ProducerRecord<String, String>("test", "helloworld" + i));
}
//4.釋放資源
producer.close();
}
}
六、TimeInterceptor類---攔截器類
package com.css.kafka.interceptor;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
* 攔截器類
*/
public class TimeInterceptor implements ProducerInterceptor<String, String>{
//配置信息
public void configure(Map<String, ?> configs) {
}
//業務邏輯
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return new ProducerRecord<String, String>(
record.topic(),
record.partition(),
record.key(),
System.currentTimeMillis() + "-" + record.value());
}
//發送失敗調用
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
//關閉資源
public void close() {
}
}
七、kafka的maven依賴
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.0.0</version>
</dependency>