JavaWeb項目架構之Kafka分佈式日誌隊列

架構、分佈式、日誌隊列,標題本身都看着唬人,其實就是一個日誌收集的功能,只不過中間加了一個Kafka作消息隊列罷了。java

kafka介紹

Kafka是由Apache軟件基金會開發的一個開源流處理平臺,由Scala和Java編寫。Kafka是一種高吞吐量的分佈式發佈訂閱消息系統,它能夠處理消費者規模的網站中的全部動做流數據。 這種動做(網頁瀏覽,搜索和其餘用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 這些數據一般是因爲吞吐量的要求而經過處理日誌和日誌聚合來解決。git

特性

Kafka是一種高吞吐量的分佈式發佈訂閱消息系統,有以下特性:redis

  • 經過O(1)的磁盤數據結構提供消息的持久化,這種結構對於即便數以TB的消息存儲也可以保持長時間的穩定性能。
  • 高吞吐量:即便是很是普通的硬件Kafka也能夠支持每秒數百萬的消息。
  • 支持經過Kafka服務器和消費機集羣來分區消息。
  • 支持Hadoop並行數據加載。

主要功能

  • 發佈和訂閱消息流,這個功能相似於消息隊列,這也是kafka歸類爲消息隊列框架的緣由spring

  • 以容錯的方式記錄消息流,kafka以文件的方式來存儲消息流apache

  • 能夠再消息發佈的時候進行處理json

使用場景

  • 在系統或應用程序之間構建可靠的用於傳輸實時數據的管道,消息隊列功能服務器

  • 構建實時的流數據處理程序來變換或處理數據流,數據處理功能網絡

消息傳輸流程

相關術語介紹

  • Broker
    Kafka集羣包含一個或多個服務器,這種服務器被稱爲broker
  • Topic
    每條發佈到Kafka集羣的消息都有一個類別,這個類別被稱爲Topic。(物理上不一樣Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的Topic便可生產或消費數據而沒必要關心數據存於何處)
  • Partition
    Partition是物理上的概念,每一個Topic包含一個或多個Partition.
  • Producer
    負責發佈消息到Kafka broker
  • Consumer
    消息消費者,向Kafka broker讀取消息的客戶端。
  • Consumer Group
    每一個Consumer屬於一個特定的Consumer Group(可爲每一個Consumer指定group name,若不指定group name則屬於默認的group)

Kafka安裝

環境

Linux、JDK、Zookeepersession

下載二進制程序

wget https://archive.apache.org/dist/kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz

安裝

tar -zxvf kafka_2.11-0.10.0.1.tgz
cd kafka_2.11-0.10.0.1

目錄說明

bin 啓動,中止等命令
config 配置文件
libs 類庫

參數說明

#########################參數解釋##############################

broker.id=0  #當前機器在集羣中的惟一標識,和zookeeper的myid性質同樣

port=9092 #當前kafka對外提供服務的端口默認是9092

host.name=192.168.1.170 #這個參數默認是關閉的

num.network.threads=3 #這個是borker進行網絡處理的線程數

num.io.threads=8 #這個是borker進行I/O處理的線程數

log.dirs=/opt/kafka/kafkalogs/ #消息存放的目錄,這個目錄能夠配置爲「,」逗號分割的表達式,上面的num.io.threads要大於這個目錄的個數這個目錄,若是配置多個目錄,新建立的topic他把消息持久化的地方是,當前以逗號分割的目錄中,那個分區數最少就放那一個

socket.send.buffer.bytes=102400 #發送緩衝區buffer大小,數據不是一會兒就發送的,先回存儲到緩衝區了到達必定的大小後在發送,能提升性能

socket.receive.buffer.bytes=102400 #kafka接收緩衝區大小,當數據到達必定大小後在序列化到磁盤

socket.request.max.bytes=104857600 #這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小

num.partitions=1 #默認的分區數,一個topic默認1個分區數

log.retention.hours=168 #默認消息的最大持久化時間,168小時,7天

message.max.byte=5242880  #消息保存的最大值5M

default.replication.factor=2  #kafka保存消息的副本數,若是一個副本失效了,另外一個還能夠繼續提供服務

replica.fetch.max.bytes=5242880  #取消息的最大直接數

log.segment.bytes=1073741824 #這個參數是:由於kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件

log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 ),到目錄查看是否有過時的消息若是有,刪除

log.cleaner.enable=false #是否啓用log壓縮,通常不用啓用,啓用的話能夠提升性能

zookeeper.connect=192.168.1.180:12181,192.168.1.181:12181,192.168.1.182:1218 #設置zookeeper的鏈接端口、若是非集羣配置一個地址便可

#########################參數解釋##############################

啓動kafka

啓動kafka以前要啓動相應的zookeeper集羣、自行安裝,這裏不作說明。數據結構

#進入到kafka的bin目錄
./kafka-server-start.sh -daemon ../config/server.properties

Kafka集成

環境

spring-boot、elasticsearch、kafka

pom.xml引入:

<!-- kafka 消息隊列 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.1.RELEASE</version>
</dependency>

生產者

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
/**
 * 生產者
 * 建立者 科幫網
 * 建立時間 2018年2月4日
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${kafka.producer.servers}")
    private String servers;
    @Value("${kafka.producer.retries}")
    private int retries;
    @Value("${kafka.producer.batch.size}")
    private int batchSize;
    @Value("${kafka.producer.linger}")
    private int linger;
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;


    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }
}

消費者

mport java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
/**
 * 消費者
 * 建立者 科幫網
 * 建立時間 2018年2月4日
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Value("${kafka.consumer.servers}")
    private String servers;
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group.id}")
    private String groupId;
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }


    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }
}

日誌監聽

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import com.itstyle.es.common.utils.JsonMapper;
import com.itstyle.es.log.entity.SysLogs;
import com.itstyle.es.log.repository.ElasticLogRepository;
/**
 * 掃描監聽
 * 建立者 科幫網
 * 建立時間 2018年2月4日
 */
@Component
public class Listener {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    
    @Autowired
    private  ElasticLogRepository elasticLogRepository;
    
    @KafkaListener(topics = {"itstyle"})
    public void listen(ConsumerRecord<?, ?> record) {
        logger.info("kafka的key: " + record.key());
        logger.info("kafka的value: " + record.value());
        if(record.key().equals("itstyle_log")){
            try {
                SysLogs log = JsonMapper.fromJsonString(record.value().toString(), SysLogs.class);
                logger.info("kafka保存日誌: " + log.getUsername());
                elasticLogRepository.save(log);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

測試日誌傳輸

/**
    * kafka 日誌隊列測試接口
    */
   @GetMapping(value="kafkaLog")
   public @ResponseBody String kafkaLog() {
        SysLogs log = new SysLogs();
        log.setUsername("紅薯");
        log.setOperation("開源中國社區");
        log.setMethod("com.itstyle.es.log.controller.kafkaLog()");
        log.setIp("192.168.1.80");
        log.setGmtCreate(new Timestamp(new Date().getTime()));
        log.setExceptionDetail("開源中國社區");
        log.setParams("{'name':'碼雲','type':'開源'}");
        log.setDeviceType((short)1);
        log.setPlatFrom((short)1);
        log.setLogType((short)1);
        log.setDeviceType((short)1);
        log.setId((long)200000);
        log.setUserId((long)1);
        log.setTime((long)1);
        //模擬日誌隊列實現
        String json = JsonMapper.toJsonString(log);
        kafkaTemplate.send("itstyle", "itstyle_log",json);
        return "success";
   }

Kafka與Redis

以前簡單的介紹過,JavaWeb項目架構之Redis分佈式日誌隊列,有小夥伴們聊到, Redis PUB/SUB沒有任何可靠性保障,也不會持久化。固然了,原項目中僅僅是記錄日誌,並非十分重要的信息,能夠有必定程度上的丟失

Kafka與Redis PUB/SUB之間最大的區別在於Kafka是一個完整的分佈式發佈訂閱消息系統,而Redis PUB/SUB只是一個組件而已。

使用場景

  • Redis PUB/SUB
    消息持久性需求不高、吞吐量要求不高、能夠忍受數據丟失
  • Kafka
    高可用、高吞吐、持久性、多樣化的消費處理模型
開源項目源碼(參考):https://gitee.com/52itstyle/spring-boot-elasticsearch
相關文章
相關標籤/搜索