Springboot集成Kafaka

package com.aicrs.engine.kafaka;

import com.aicrs.engine.common.LogInfo;
import com.aicrs.engine.common.RedisNameEnum;
import com.aicrs.engine.component.DataSourceSwitchCompont;
import com.aicrs.engine.constant.SwitchConstant;
import com.aicrs.engine.entity.DetailLogEntity;
import com.aicrs.engine.entity.QueryLogEntity;
import com.aicrs.engine.kafaka.Singleton.DetailLogSingletonFactory;
import com.aicrs.engine.kafaka.Singleton.QueryLogSingletonFactory;
import com.aicrs.engine.mapper.DetailLogEntityMapper;
import com.aicrs.engine.mapper.QueryLogEntityMapper;
import com.aicrs.engine.utils.DateUtil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.BlockingQueue;

@Component
public class kafkaConsumerManager {
    private static final Logger logger = LoggerFactory.getLogger(kafkaConsumerManager.class);

    @Autowired
    KafkaConsumer<String, String> consumer;
    @Autowired
    RedisTemplate<String, String> logSwitchRedis;
    @Autowired
    private DataSourceSwitchCompont dataSourceSwitchCompont;
    @Autowired
    private QueryLogEntityMapper queryLogEntityMapper;
    @Autowired
    private DetailLogEntityMapper detailLogEntityMapper;

    @PostConstruct
    public void kafkaConsumer() {
        //日誌接收kafka的開關
        if (!SwitchConstant.LOG_TO_KAFKA_CLOSE.equals(dataSourceSwitchCompont.getDataSourceSign(SwitchConstant.LOG_TO_KAFKA_SWITCH))) {
            try {
                consumer.subscribe(Arrays.asList(LogInfo.KAFAKA_TOPIC));

                new Thread(()->{
                    while (true) {
                        consumeKafakaMessageAndSaveLog();
                    }
                }).start();
            } catch (Exception e) {
                logger.error("kafka 日誌報錯{}", e);
            }
        }

    }

    /**
     * description 消費Kafaka消息並存儲數據到日誌表
     * return void
     * author 
     * createTime 2021-1-12 16:46
     **/
    private void consumeKafakaMessageAndSaveLog() {
        ConsumerRecords<String, String> records = consumer.poll(100);
        if(records.isEmpty()){
            return;
        }
        List<QueryLogEntity> queryLogEntityList = null;
        List<DetailLogEntity> detailLogEntityList = null;
        BlockingQueue<List<QueryLogEntity>> queryLogBlockingQueue = QueryLogSingletonFactory.getInstance();
        BlockingQueue<List<DetailLogEntity>> detailLogBlockingQueue = DetailLogSingletonFactory.getInstance();
        for (ConsumerRecord<String, String> record : records){
            logger.error("Start Consuming Kafaka Message: ---> " + record);
            //AICRS_ENGINE_QUERY_LOG表
            queryLogEntityList = makeupQueryLogEntityList(queryLogEntityList, record);
            //AICRS_ENGINE_DETAIL_LOG表
            detailLogEntityList = makeupDetailLogEntityList(detailLogEntityList, record);

            if (!CollectionUtils.isEmpty(queryLogEntityList)) {
                //將queryLogEntityList加到BlockingQueue裏,若是BlockingQueue能夠容納,則返回true,不然返回false。(本方法不阻塞當前執行方法的線程)
                boolean flag = queryLogBlockingQueue.offer(queryLogEntityList);
                if (!flag) {
                    logger.error("queryLogBlockingQueue is full...");
                }
            }

            if (!CollectionUtils.isEmpty(detailLogEntityList)) {
                boolean flag = detailLogBlockingQueue.offer(detailLogEntityList);
                if (!flag) {
                    logger.error("detailLogEntityList is full...");
                }
            }

            if(CollectionUtils.isEmpty(queryLogBlockingQueue) && CollectionUtils.isEmpty(detailLogBlockingQueue)){
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    logger.error("線程執行異常:{}", e);
                }
            }

            if(!CollectionUtils.isEmpty(queryLogBlockingQueue)){
                List<List<QueryLogEntity>> saveQueryLogEntityList = Collections.synchronizedList(new LinkedList<>());
                queryLogBlockingQueue.drainTo(saveQueryLogEntityList, queryLogBlockingQueue.size());
                queueQueryLogSave(saveQueryLogEntityList);
            }

            if(!CollectionUtils.isEmpty(detailLogBlockingQueue)){
                List<List<DetailLogEntity>> saveDetailLogEntityList = Collections.synchronizedList(new LinkedList<>());
                detailLogBlockingQueue.drainTo(saveDetailLogEntityList, detailLogBlockingQueue.size());
                queueDetailLogSave(saveDetailLogEntityList);
            }
        }
    }

    /**
     * description 異步存儲「AICRS_ENGINE_QUERY_LOG」表數據
     * param [logList]
     * return void
     * author 
     * createTime 2021-1-12 15:17
     **/
    @Async
    @Transactional
    private void queueQueryLogSave(final List<List<QueryLogEntity>> logList) {
        if(CollectionUtils.isEmpty(logList)) {
            return;
        }
        String logSwitch = logSwitchRedis.opsForValue().get("logSwitch");
        if (RedisNameEnum.LOG_TO_DB_KAFKA_SWITCH_CLOSE.getKeyName().equals(logSwitch)) {
            return;
        }
        List<QueryLogEntity> saveLogRecordList = Collections.synchronizedList(new LinkedList<>());
        for (List<QueryLogEntity> list : logList) {
            saveLogRecordList.addAll(list);
        }
        logList.clear();

        if (!CollectionUtils.isEmpty(saveLogRecordList)) {
            if (saveLogRecordList.size() >= 100) {
                List<List<QueryLogEntity>> tempsQueryLogEntityList = split(saveLogRecordList, 100);
                for (List<QueryLogEntity> queryLogEntity : tempsQueryLogEntityList) {
                    queryLogEntityMapper.insertQueryLogEntityByBatch(queryLogEntity);
                }
                return;
            }
            queryLogEntityMapper.insertQueryLogEntityByBatch(saveLogRecordList);
        }

    }

    /**
     * description 異步存儲「AICRS_ENGINE_DETAIL_LOG」表數據
     * param [logList]
     * return void
     * author 
     * createTime 2021-1-12 15:17
     **/
    @Async
    @Transactional
    private void queueDetailLogSave(final List<List<DetailLogEntity>> logList) {
        if(CollectionUtils.isEmpty(logList)) {
            return;
        }
        String logSwitch = logSwitchRedis.opsForValue().get("logSwitch");
        if (RedisNameEnum.LOG_TO_DB_KAFKA_SWITCH_CLOSE.getKeyName().equals(logSwitch)) {
            return;
        }
        List<DetailLogEntity> saveLogRecordList = Collections.synchronizedList(new LinkedList<>());
        for (List<DetailLogEntity> list : logList) {
            saveLogRecordList.addAll(list);
        }
        logList.clear();

        if (!CollectionUtils.isEmpty(saveLogRecordList)) {
            if (saveLogRecordList.size() >= 100) {
                List<List<DetailLogEntity>> tempsQueryLogEntityList = split(saveLogRecordList, 100);
                for (List<DetailLogEntity> detailLogEntity : tempsQueryLogEntityList) {
                    detailLogEntityMapper.insertDetailLogEntityByBatch(detailLogEntity);
                }
                return;
            }
            detailLogEntityMapper.insertDetailLogEntityByBatch(saveLogRecordList);
        }

    }

    /**
     * description 封裝存儲「AICRS_ENGINE_QUERY_LOG」的數據
     * param [queryLogEntityList, record]
     * return java.util.List<com.aicrs.engine.entity.QueryLogEntity>
     * author 
     * createTime 2021-1-12 15:15
     **/
    private List<QueryLogEntity> makeupQueryLogEntityList(List<QueryLogEntity> queryLogEntityList, ConsumerRecord<String, String> record) {
        if(LogInfo.KEY_QUERY_LOG.equals(record.key())){//AICRS_ENGINE_QUERY_LOG表
            String[] kafakaReturnrecordArr = record.value().split(LogInfo.EXCISION,-1);
            queryLogEntityList = Collections.synchronizedList(new LinkedList<>());
            QueryLogEntity queryLogEntity = new QueryLogEntity();
            queryLogEntity.setRequestUuid(kafakaReturnrecordArr[0]);
            queryLogEntity.setQueryTime(DateUtil.stringToDate(kafakaReturnrecordArr[1]));
            queryLogEntity.setCustomer(kafakaReturnrecordArr[2]);
            queryLogEntity.setReqCompany(kafakaReturnrecordArr[3]);
            queryLogEntity.setUserName(kafakaReturnrecordArr[4]);
            queryLogEntity.setErrorCode(kafakaReturnrecordArr[5]);
            queryLogEntity.setErrorMessage(kafakaReturnrecordArr[6]);
            queryLogEntityList.add(queryLogEntity);
        }
        return queryLogEntityList;
    }

    /**
     * description 封裝存儲「AICRS_ENGINE_DETAIL_LOG」的數據
     * param [detailLogEntityList, record]
     * return java.util.List<com.aicrs.engine.entity.DetailLogEntity>
     * author 
     * createTime 2021-1-12 15:16
     **/
    private List<DetailLogEntity> makeupDetailLogEntityList(List<DetailLogEntity> detailLogEntityList, ConsumerRecord<String, String> record) {
        if(LogInfo.KEY_DETAIL_LOG.equals(record.key())){//AICRS_ENGINE_DETAIL_LOG表
            String[] kafakaReturnrecordArr = record.value().split(LogInfo.EXCISION,-1);
            detailLogEntityList = Collections.synchronizedList(new LinkedList<>());
            DetailLogEntity detailLogEntity = new DetailLogEntity();
            detailLogEntity.setRequestUuid(kafakaReturnrecordArr[0]);
            detailLogEntity.setIsIn(kafakaReturnrecordArr[1]);
            detailLogEntity.setCpname(kafakaReturnrecordArr[2]);
            detailLogEntity.setUniteCode(kafakaReturnrecordArr[3]);
            detailLogEntity.setOrgCode(kafakaReturnrecordArr[4]);
            detailLogEntity.setHitCode(kafakaReturnrecordArr[5]);
            detailLogEntity.setHitColumn(kafakaReturnrecordArr[6]);
            detailLogEntity.setIdAicrsAppBlackList(kafakaReturnrecordArr[7]);
            detailLogEntity.setCpblackflag(kafakaReturnrecordArr[8]);
            detailLogEntity.setIsOwner(kafakaReturnrecordArr[9]);
            detailLogEntity.setFirstType(kafakaReturnrecordArr[10]);
            detailLogEntity.setSecondType(kafakaReturnrecordArr[11]);
            detailLogEntity.setIsMajorWaring(kafakaReturnrecordArr[12]);
            detailLogEntity.setMajorWarningDate(kafakaReturnrecordArr[13]);
            detailLogEntity.setTag(kafakaReturnrecordArr[14]);
            detailLogEntity.setReason(kafakaReturnrecordArr[15]);
            detailLogEntityList.add(detailLogEntity);
        }
        return detailLogEntityList;
    }

    /**
     * description List切分爲100的份量,便於後續存儲
     * param [resList, subListLength]
     * return java.util.List<java.util.List<T>>
     * author 
     * createTime 2021-1-11 15:38
     **/
    public static <T> List<List<T>> split(List<T> resList, int subListLength) {
        if (CollectionUtils.isEmpty(resList) || subListLength <= 0) {
            return new ArrayList<>();
        }
        List<List<T>> ret = new ArrayList<>();
        int size = resList.size();
        if (size <= subListLength) {
            // 數據量不足 subListLength 指定的大小
            ret.add(resList);
        } else {
            int pre = size / subListLength;
            int last = size % subListLength;
            // 前面pre個集合,每一個大小都是 subListLength 個元素
            for (int i = 0; i < pre; i++) {
                List<T> itemList = new ArrayList<>();
                for (int j = 0; j < subListLength; j++) {
                    itemList.add(resList.get(i * subListLength + j));
                }
                ret.add(itemList);
            }
            // last的進行處理
            if (last > 0) {
                List<T> itemList = new ArrayList<>();
                for (int i = 0; i < last; i++) {
                    itemList.add(resList.get(pre * subListLength + i));
                }
                ret.add(itemList);
            }
        }
        return ret;
    }
}
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * description 單例模式初始化隊列
 * author
 * createTime 2021-1-12 16:18
 **/
public class DetailLogSingletonFactory {
    private static volatile BlockingQueue<List<DetailLogEntity>> detailLogSingleton;

    public DetailLogSingletonFactory() {
    }

    public static BlockingQueue<List<DetailLogEntity>> getInstance() {
        if (detailLogSingleton == null) {
            synchronized (QueryLogSingletonFactory.class) {
                if (detailLogSingleton == null) {
                    detailLogSingleton = new LinkedBlockingQueue<>(5000);
                }
            }
        }
        return detailLogSingleton;
    }
}
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.UUID;


/**
 * description 調用方法時發送日誌到kafaka中記錄
 * author
 * createTime 2021-1-5 15:51
 **/
@Aspect
@Component
public class KafakaLogAspect {

    private static final Logger logger = LoggerFactory.getLogger(KafakaLogAspect.class);
    @Autowired
    private CommonSaveLog commonSaveLog;

    @Pointcut("execution(* com.pingan.aicrs.engine..**.BlackInfoServiceImpl.queryBlackInfos(..))")
    public void log() {
    }

    @AfterReturning(returning = "ret", pointcut = "log()")
    public void doAfterReturning(JoinPoint joinPoint, Object ret) throws Throwable {
        Object[] obj = joinPoint.getArgs();
        if (obj != null && obj.length > 0) {
            if (obj[0] instanceof JSONObject) {
                Map<String,Object> responseMap = (Map<String,Object>)ret;
                DataResponse dataResponse = (DataResponse)responseMap.get("dataResponse");
                List<Map<String,Object>> logParamMapList = (List<Map<String,Object>>)responseMap.get("logPamMap");
                JSONObject dataRequestEntity = (JSONObject) obj[0];
                String uuid = UUID.randomUUID().toString().replaceAll("-", "").toUpperCase();
                commonSaveLog.saveLog(dataRequestEntity, dataResponse, logParamMapList, uuid);
            }
        }
    }

}
相關文章
相關標籤/搜索