【中間件】Kafka消息中間件的簡單介紹及應用

1、介紹

  注:本文所指版本Kafka 1.1html

  Kafka是由Apache開發的一款發佈訂閱消息系統,是分佈式的,分區的重複的日誌服務。java

 一、爲何要用kafka?

  ①、解耦web

     容許兩方修改處理過程,只要遵循共同的接口約束。apache

  ②、靈活性和峯值處理能力json

     面對忽然增長的吞吐量有很好應對,發送信息量50M,消費信息量100M。bootstrap

  ③、消息冗餘後端

     消息隊列把數據持久化直到已經徹底被處理。與以往消息「插入 - 獲取 - 刪除」不一樣,在刪除消息時,必須肯定消息已被處理完畢。api

  ④、擴展性緩存

     擴展性較好,只要增長入隊和消費處理過程便可。服務器

  ⑤、順序保證

     針對消息順序的重要性,kafka保證一個partition內的消息有序性。

    

 二、應用場景

  ①、日誌收集

    ELK日誌採集框架中,利用kafka同Logstash來收集服務端日誌。

  ②、消息系統

    解耦生產者與消費者,緩存消息,實現異步處理。

  ③、實現消息 「發佈-訂閱模式「

    對於不一樣消費者消費同一消息,利用Kafka實現:同一個topic中的消息只能被同一個Consumer Group中的一個消費者消費,但能夠被多個Consumer Group消費這一消息。

  ④、用戶活動跟蹤

    Kafka常常被用來記錄web用戶或者app用戶的各類活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發佈到kafka的topic中,而後訂閱者經過訂閱這些topic來作實時的監控分析,或者裝載到Hadoop、數據倉庫中作離線分析和挖掘。

  ⑤、運營指標

    Kafka也常常用來記錄運營監控數據。包括收集各類分佈式應用的數據,生產各類操做的集中反饋,好比報警和報告。

 

2、Kafka分析

 大概介紹些知識點,做爲一個後端開發,瞭解如何用及基本原理就能夠了。

 

kafka官網給出的交互流程

 

 

消息處理流程

 

 

producer.send(ProducerRecord<K,V> record);  生產者在發送消息時,沒有找到topic,會自動建立???

Broker Configs

  auto.create.topics.enable = true (默認爲true,若沒找到topic則自動建立)

 

 

3、具體應用

注意我這裏有些值寫的是僞代碼,還需封裝到一個公共類中調取。如kafka地址等。

另外,下列代碼全手打,有錯誤的地方請指正。

一、pom依賴

producer和consumer均依賴 kafka-clients.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>

二、Producer

2.一、producer configs

Properties properties = new Properties();
properties.put("bootstrap.servers", "ipAndPort"); //指定kafka服務地址,集羣的狀況用逗號分隔,如:host1:port1,host2:port2 ...
properties.put("acks", "all");//表示完成Requests前須要認可的數量。 0:無需認可直接發送到socket  1:須要leader認可  all/-1:須要所有認可後發送
properties.put("retries", 0);//發生錯誤時,重傳次數。當開啓重傳時,須要將`max.in.flight.requests.per.connection`設置爲1,不然可能致使失序
properties.put("batch.size", 16384);
properties.put("linger.ms", 1); //1毫秒,簡單講,就是延時1ms,把期間收集到的全部Requests聚合到一塊兒發送,以此提升吞吐量
properties.put("buffer.memory", 33554432);//默認值就是 33554443,緩存數據的內存大小;若生產速度大於Producer向Broker發送速度,會阻塞超時拋出異常
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //指定扔到kafka的鍵值對中鍵的類型,實例包下還有Long、Double、Short等等
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //指定鍵值對的值類型,具體同上

 

2.二、KafkaProducerUtil.java

這裏定義一個Producer的工具類,封裝了producer的初始化及發送消息到指定topic。

public class KafkaProducerUtil {
    private static Logger logger = Logger.getLogger(KafkaProducerUtil.class);
    private static Producer<String, String> producer;

  /** 作一個簡單的單例模式,實例化producer對象 **/
private static Producer<String, String> getProducer() { if (producer == null) { synchronized (KafkaProducerUtil.class) { if (producer == null) { Properties properties = new Properties(); //寫入配置信息供初始化Producer producer = new KafkaProducer<String, String>(properties); } } } return producer; }
  /**
   * send record to topic
   */
public static void sendToKafka(String topic, String message, Long timeOut) throws InterruptedException, ExecutionException, TimeoutException { producer = getProducer(); producer.send(new ProducerRecord<String, String>(topic, message)).get(timeOut, TimeUnit.SECONDS); logger.info("sendToKafka:" + message); } }

 

2.三、KafkaProducerService.java

提供工業務層調用的接口,這裏作了http請求方式的兼容處理。

public interface KafkaProducerService {
/**
* @param httpUrl 經過http發送請求的方式調用地址
* @param code 生成topic
* @param request 請求數據
* @param version 加簽
* @param timeOut 超時時間
*/
void send(String httpUrl, String code, Object request, String version, Long timeout);
}
@Service
public class KafkaProducerServiceImpl implements KafkaProducerService {
  
  private static Logger logger = LoggerFactory.getLogger(KafkaProducerServiceImpl.class);

  public void send(String httpUrl, String code, Object request, String version, Long timeout) {
    String isOpenFlag = "能夠做爲系統參數,不一樣環境有不一樣的啓用程度,開啓走kafka,關閉走http方式";
    if ("open" == isOpenFlag) {
      sendByKafka(httpUrl, code, request, version, timeout);
    } else {
      sendByHttp(...);
    }
  }

  private void sendByKafka(String httpUrl, String code, Object request, String version, Long timeout) {
    // 按必定規則拼接topic
    String topic = "XXX能夠依環境決定,也能夠依系統決定" + "_" + code;
    // 取加簽私鑰
    String privateKey = "自行封裝";
    logger.info("topic:" + topic + "請求報文:" + JSON.toJSONOString(request, SerializerFeature,WriteMapNullValue));
    // 報文加簽
    Object producerObject = SecurityUtil.digest(request, privateKey, version);
    String producerRecord = JSON.toJSONString(producerObject, SerializerFeature.WriteNullStringAsEmpty, SerializerFeature.WriteMapNullValue);
    try {
      KafkaProducerUtil.sendToKafka(topic, producerRecord, timeout);
    } catch (RuntimeException e) {
      //出現異常,改用http調用
      logger.error(e.getMessage());
      sendByHttp(httpUrl, request, version);
    }
  }

  private void sendByHttp(String httpUrl, Strign request, String version) {
    //TODO 自行封裝,一般狀況加驗籤,把請求報文打印日誌,轉成json格式發送至api接口,此處不過多贅述。
  }
}

以上基本知足Producer方使用。

三、Consumer

3.一、consumer configs

Properties props = new Properties();
props.put("bootstrap.servers", "IpAndPort");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");//自動提交時間間隔,前提是 enable.auto.commit設置爲true
props.put("session.timeout.ms", "30000");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");//指定接受到數據的鍵值對類型
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", "unique group id");//指定consumer的惟一group

 

 3.二、KafkaConsumerInit.java

consumer方初始化類,封裝參數配置、線程定義,業務接口調用等。

public class KafkaConsumerInit extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerInit.class);

    private final AtomicBoolean closed = new AtomicBoolean(false);
    KafkaConsumer consumer;
    //此處格式<topic對應code,對應業務bean名>
    private Map<String, String> topicsAndBeans;

    public KafkaConsumerInit(Map<String, String> topicsAndBeans) {
        super();
        // 作轉換,根據部署環境,topic加環境的前綴
        Set<String> codes= topicsAndBeans.keySet();
        Map<String, String> realTopicsAndBeans = new HashMap<String, String>();
        for (String code : codes) {
            String topic = "XXX能夠依環境決定,也能夠依系統決定" + "_" + code;
            realTopicsAndBeans.put(topic, topicsAndBeans.get(code));
        }
        this.topicsAndBeans = realTopicsAndBeans;
    }

    @Override
    public void run() {
        String isOpenFlag = "XXX";
        if (!"open".equals(isOpenFlag)) {
            logger.info("===============配置文件設置 KafkaConsumer 不啓動===============");
            return;
        }
        logger.info("===============啓動KafkaConsumer===============");
        try {
            Properties props = new Properties();
            //TODO 添加consumer configs
            consumer = new KafkaConsumer<>(props);
            //給consumer註冊topics 類型Collection<String>
            consumer.subscribe(topicsAndBeans.keySet());
            logger.info("初始化consumer參數");
            while (!closed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        logger.info("offset = " + record.offset() + ", value = " + record.value());
                        logger.info(record.value());
               //定義一個統一接口,不一樣業務實現同一接口。 CommunicationConsumerService communicationHandleService
= (CommunicationConsumerService) ApplicationContext.getContext() .getBean(topicsAndBeans.get(record.topic())); communicationHandleService.doHandle(record.value()); } catch (Exception e) { logger.error("數據處理異常:" + record.value()); logger.error(e.getMessage(), e); } } sleep(1000); } } catch (WakeupException e) { logger.error(e.getMessage(), e); if (!closed.get()) { throw e; } } catch (InterruptedException e) { logger.error(e.getMessage(), e); } finally { consumer.close(); } } }

 

3.三、KafkaConsumerListener.java

服務啓動後初始化kafka,這裏是利用基於Spring的ApplicationListener接口實現的,若果這方面知識還不清楚,先請點這裏

@Component
public class KafkaConsumerListener implements ApplicationListener<ApplicationEvent> {
    private static final Logger logger = LogManager.getLogger(KafkaConsumerListener.class);

    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        logger.info("==============添加訂閱的topic和對應的處理方法===============");
        Map<String, String> topicAndBeans = new HashMap<String, String>();
        topicAndBeans.put("codeForTopic1", "beanImpl1");
        topicAndBeans.put("codeForTopic2", "beanImpl2");
        topicAndBeans.put("codeForTopic3", "beanImpl3");
        //實例化consumer
        new KafkaConsumerInit(topicAndBeans).start();
    }
}

 

2.3.四、ComsumerService.java

具體業務層實現及封裝調用

public interface ConsumerService {
    /**
     * @param message
     */
    void doHandle(String message);
}
@Serivce
public class BeanImpl1 implements ConsumerService {
    public void doHandle(String message) {
        //TODO  集體業務實現          
    }
}

以上基本知足consumer方使用。

相關文章
相關標籤/搜索