Kafka簡介及使用

1、Kafka概述css

    離線部分:
    Hadoop->離線計算(hdfs / mapreduce) yarn
    zookeeper->分佈式協調(動物管理員)
    hive->數據倉庫(離線計算 / sql)easy coding
    flume->數據採集
    sqoop->數據遷移mysql->hdfs/hive hdfs/hive->mysql
    Azkaban->任務調度工具
    hbase->數據庫(nosql)列式存儲 讀寫速度
    實時:
    kafka
    storm
    官網:
    http://kafka.apache.org/
    ApacheKafka®是一個分佈式流媒體平臺
    流媒體平臺有三個關鍵功能:
    發佈和訂閱記錄流,相似於消息隊列或企業消息傳遞系統。
    以容錯的持久方式存儲記錄流。
    記錄發生時處理流。
    Kafka一般用於兩大類應用:
    構建可在系統或應用程序之間可靠獲取數據的實時流數據管道
    構建轉換或響應數據流的實時流應用程序

2、kafka是什麼?java

    在流計算中,kafka主要功能是用來緩存數據,storm能夠經過消費kafka中的數據進行流計算。
    是一套開源的消息系統,由scala寫成。支持javaAPI的。
    kafka最初由LinkedIn公司開發,2011年開源。
    2012年從Apache畢業。
    是一個分佈式消息隊列,kafka讀消息保存採用Topic進行歸類。
    角色
    發送消息:Producer(生產者)
    接收消息:Consumer(消費者)

3、爲何要用消息隊列mysql

    1)解耦
    爲了不出現問題
    2)拓展性
    可增長處理過程
    3)靈活
    面對訪問量劇增,不會由於超負荷請求而徹底癱瘓。
    4)可恢復
    一部分組件失效,不會影響整個系統。能夠進行恢復。
    5)緩衝
    控制數據流通過系統的速度。
    6)順序保證
    對消息進行有序處理。
    7)異步通訊
    akka,消息隊列提供了異步處理的機制。容許用戶把消息放到隊列 , 不馬上處理。

4、kafka架構設計sql

    kafka依賴zookeeper,用zk保存元數據信息。
    搭建kafka集羣要先搭建zookeeper集羣。
    zk在kafka中的做用?
    保存kafka集羣節點狀態信息和消費者當前消費信息。

Kafka介紹數據庫

Kafka架構apache

5、kafka集羣安裝部署bootstrap

    1)官網下載安裝包
    2)上傳安裝包
    把安裝包 kafka_2.11-2.0.0.tgz 放置在/root下
    
    3)解壓安裝包
    cd /root
    tar -zxvf kafka_2.11-2.0.0.tgz -C hd
    
    4)重命名
    cd hd
    mv kafka_2.11-2.0.0/ kafka
    
    5)修改配置文件
    cd /root/hd/kafka
    mkdir logs
    cd config
    vi server.properties
    broker.id=0 #每臺機器的id不一樣便可
    delete.topic.enable=true #是否容許刪除主題
    log.dirs=/root/hd/kafka/logs #運行日誌保存位置
    zookeeper.connect=hd09-1:2181,hd09-2:2181,hd09-3:2181
    
    6)配置環境變量
    vi /etc/profile
    最下面添加
    #kafka_home
    export KAFKA_HOME=/root/hd/kafka
    export PATH=$PATH:$KAFKA_HOME/bin

    生效環境變量
    source /etc/profile
    
    7)分發到其餘節點
    cd /root/hd
    scp -r kafka/ hd09-2:$PWD
    scp -r kafka/ hd09-3:$PWD
    
    8)修改其餘節點/root/hd/kafka/config/server.properties
    broker.id=1 #hd09-2
    broker.id=2 #hd09-3
    
    9)啓動集羣
    cd /root/hd/kafka
    bin/kafka-server-start.sh config/server.properties &
    10)關閉
    cd /root/hd/kafka
    bin/kafka-server-stop.sh

6、Kafka命令行操做緩存

    1)查看當前集羣中已存在的主題topic
    bin/kafka-topics.sh --zookeeper hd09-1:2181 --list
    
    2)建立topic
    bin/kafka-topics.sh --zookeeper hd09-1:2181 --create --replication-factor 3 --partitions 1 --topic study
    
    --zookeeper 鏈接zk集羣
    --create 建立
    --replication-factor 副本
    --partitions 分區
    --topic 主題名
    
    3)刪除主題
    bin/kafka-topics.sh --zookeeper hd09-1:2181 --delete --topic study
    
    4)發送消息
    生產者啓動:
    bin/kafka-console-producer.sh --broker-list hd09-1:9092 --topic study
    消費者啓動:
    bin/kafka-console-consumer.sh --bootstrap-server hd09-1:9092 --topic study --from-beginning
    
    5)查看主題詳細信息
    bin/kafka-topics.sh --zookeeper hd09-1:2181 --describe --topic study

7、Kafka簡單API安全

一、Producer1類---kafka生產者API 接口回調服務器

package com.css.kafka.kafka_producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * kafka生產者API
*/ public class Producer1 { public static void main(String[] args) { //1.配置生產者屬性(指定多個參數) Properties prop = new Properties(); //參數配置 //kafka節點的地址 prop.put("bootstrap.servers", "192.168.146.132:9092"); //發送消息是否等待應答 prop.put("acks", "all"); //配置發送消息失敗重試 prop.put("retries", "0"); //配置批量處理消息大小 prop.put("batch.size", "10241"); //配置批量處理數據延遲 prop.put("linger.ms", "5"); //配置內存緩衝大小 prop.put("buffer.memory", "12341235"); //配置在發送前必須序列化 prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //2.實例化producer KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop); //3.發送消息 for (int i = 0; i < 99; i++) { producer.send(new ProducerRecord<String, String>("test", "helloworld" + i)); } //4.釋放資源 producer.close(); } }

二、Producer2類---kafka生產者API 接口回調

package com.css.kafka.kafka_producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * kafka生產者API  接口回調
*/ public class Producer2 { public static void main(String[] args) { //1.配置生產者屬性(指定多個參數) Properties prop = new Properties(); //參數配置 //kafka節點的地址 prop.put("bootstrap.servers", "192.168.146.132:9092"); //發送消息是否等待應答 prop.put("acks", "all"); //配置發送消息失敗重試 prop.put("retries", "0"); //配置批量處理消息大小 prop.put("batch.size", "10241"); //配置批量處理數據延遲 prop.put("linger.ms", "5"); //配置內存緩衝大小 prop.put("buffer.memory", "12341235"); //配置在發送前必須序列化 prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //自定義分區 prop.put("partitioner.class", "com.css.kafka.kafka_producer.Partition1"); //2.實例化producer KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop); //3.發送消息 for (int i = 0; i < 99; i++) { producer.send(new ProducerRecord<String, String>("yuandan", "nice" + i), new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { //若是metadata不爲null 拿到當前的數據偏移量與分區 if(metadata != null) { System.out.println(metadata.topic() + "----" + metadata.offset() + "----" + metadata.partition()); } } }); } //4.關閉資源 producer.close(); } }

三、Partition1類---設置自定義分區

package com.css.kafka.kafka_producer;

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

/**
 * 設置自定義分區
*/ public class Partition1 implements Partitioner{ //設置 public void configure(Map<String, ?> configs) { } //分區邏輯 public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { return 1; } //釋放資源 public void close() { } }

四、Consumer1類---消費者API

package com.css.kafka.kafka_consumer;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * 消費者類
*/ public class Consumer1 { public static void main(String[] args) { //1.配置消費者屬性 Properties prop = new Properties(); //2.配置屬性 //指定服務器地址 prop.put("bootstrap.servers", "192.168.146.133:9092"); //配置消費者組 prop.put("group.id", "g1"); //配置是否自動確認offset prop.put("enable.auto.commit", "true"); //序列化 prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //2.實例消費者 final KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(prop); //4.釋放資源 線程安全 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { if (consumer != null) { consumer.close(); } } })); //訂閱消息主題 consumer.subscribe(Arrays.asList("test")); //3.拉消息 推push 拉poll while (true) { ConsumerRecords<String, String> records = consumer.poll(1000); //遍歷消息 for (ConsumerRecord<String, String> record : records) { System.out.println(record.topic() + "-----" + record.value()); } } } }

五、Producer3類---kafka生產者API-帶攔截器

package com.css.kafka.interceptor;

import java.util.ArrayList;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * kafka生產者API 帶攔截器
*/ public class Producer3 { public static void main(String[] args) { //1.配置生產者屬性(指定多個參數) Properties prop = new Properties(); //參數配置 //kafka節點的地址 prop.put("bootstrap.servers", "192.168.146.132:9092"); //發送消息是否等待應答 prop.put("acks", "all"); //配置發送消息失敗重試 prop.put("retries", "0"); //配置批量處理消息大小 prop.put("batch.size", "10241"); //配置批量處理數據延遲 prop.put("linger.ms", "5"); //配置內存緩衝大小 prop.put("buffer.memory", "12341235"); //配置在發送前必須序列化 prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //攔截器 ArrayList<String> inList = new ArrayList<String>(); inList.add("com.css.kafka.interceptor.TimeInterceptor"); prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, inList); //2.實例化producer KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop); //3.發送消息 for (int i = 0; i < 99; i++) { producer.send(new ProducerRecord<String, String>("test", "helloworld" + i)); } //4.釋放資源 producer.close(); } }

六、TimeInterceptor類---攔截器類

package com.css.kafka.interceptor;

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * 攔截器類
*/ public class TimeInterceptor implements ProducerInterceptor<String, String>{ //配置信息 public void configure(Map<String, ?> configs) { } //業務邏輯 public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { return new ProducerRecord<String, String>( record.topic(), record.partition(), record.key(), System.currentTimeMillis() + "-" + record.value()); } //發送失敗調用 public void onAcknowledgement(RecordMetadata metadata, Exception exception) { } //關閉資源 public void close() { } }

 七、kafka的maven依賴

    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.0.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.0.0</version>
    </dependency>
相關文章
相關標籤/搜索