package com.aicrs.engine.kafaka;

import com.aicrs.engine.common.LogInfo;
import com.aicrs.engine.common.RedisNameEnum;
import com.aicrs.engine.component.DataSourceSwitchCompont;
import com.aicrs.engine.constant.SwitchConstant;
import com.aicrs.engine.entity.DetailLogEntity;
import com.aicrs.engine.entity.QueryLogEntity;
import com.aicrs.engine.kafaka.Singleton.DetailLogSingletonFactory;
import com.aicrs.engine.kafaka.Singleton.QueryLogSingletonFactory;
import com.aicrs.engine.mapper.DetailLogEntityMapper;
import com.aicrs.engine.mapper.QueryLogEntityMapper;
import com.aicrs.engine.utils.DateUtil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.BlockingQueue;

/**
 * Subscribes to the log topic, turns each Kafka record into a query-log or
 * detail-log entity, and batch-inserts the entities through the MyBatis mappers.
 *
 * <p>Consumption runs on one dedicated thread started from {@link #kafkaConsumer()}.
 * The shared {@link KafkaConsumer} is only polled from that thread.
 */
@Component
public class kafkaConsumerManager {

    private static final Logger logger = LoggerFactory.getLogger(kafkaConsumerManager.class);

    /** Maximum number of rows handed to a single batch-insert call. */
    private static final int INSERT_BATCH_SIZE = 100;

    /** Number of delimited fields read from a query-log record (indices 0..6). */
    private static final int QUERY_LOG_FIELD_COUNT = 7;

    /** Number of delimited fields read from a detail-log record (indices 0..15). */
    private static final int DETAIL_LOG_FIELD_COUNT = 16;

    /** Pause applied when a record left both hand-off queues empty. */
    private static final long IDLE_PAUSE_MILLIS = 3000L;

    /** Pause applied after a failed poll so a persistent failure does not spin. */
    private static final long ERROR_BACKOFF_MILLIS = 1000L;

    @Autowired
    KafkaConsumer<String, String> consumer;

    @Autowired
    RedisTemplate<String, String> logSwitchRedis;

    @Autowired
    private DataSourceSwitchCompont dataSourceSwitchCompont;

    @Autowired
    private QueryLogEntityMapper queryLogEntityMapper;

    @Autowired
    private DetailLogEntityMapper detailLogEntityMapper;

    /**
     * Starts the consumer thread unless the "log to Kafka" switch is closed.
     *
     * <p>The loop survives failures: an exception thrown while polling is logged
     * and followed by a short back-off instead of terminating the thread (which
     * previously stopped all log consumption until restart). The loop ends when
     * the thread is interrupted.
     */
    @PostConstruct
    public void kafkaConsumer() {
        // Switch that controls whether logs are received from Kafka at all.
        if (SwitchConstant.LOG_TO_KAFKA_CLOSE.equals(
                dataSourceSwitchCompont.getDataSourceSign(SwitchConstant.LOG_TO_KAFKA_SWITCH))) {
            return;
        }
        try {
            consumer.subscribe(Arrays.asList(LogInfo.KAFAKA_TOPIC));
            new Thread(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        consumeKafakaMessageAndSaveLog();
                    } catch (RuntimeException e) {
                        logger.error("kafka 日誌報錯{}", e);
                        pause(ERROR_BACKOFF_MILLIS);
                    }
                }
            }, "kafka-log-consumer").start();
        } catch (Exception e) {
            logger.error("kafka 日誌報錯{}", e);
        }
    }

    /**
     * Polls Kafka once and stores every returned record in the log tables.
     *
     * <p>Each record is handled independently, so a malformed record or a failed
     * insert is logged and skipped without discarding the rest of the polled batch.
     */
    private void consumeKafakaMessageAndSaveLog() {
        ConsumerRecords<String, String> records = consumer.poll(100);
        if (records.isEmpty()) {
            return;
        }
        BlockingQueue<List<QueryLogEntity>> queryLogBlockingQueue = QueryLogSingletonFactory.getInstance();
        BlockingQueue<List<DetailLogEntity>> detailLogBlockingQueue = DetailLogSingletonFactory.getInstance();
        for (ConsumerRecord<String, String> record : records) {
            try {
                handleRecord(record, queryLogBlockingQueue, detailLogBlockingQueue);
            } catch (RuntimeException e) {
                logger.error("Failed to process Kafka record, skipping it: topic={} partition={} offset={}",
                        record.topic(), record.partition(), record.offset(), e);
            }
            if (Thread.currentThread().isInterrupted()) {
                // Stop promptly on shutdown instead of working through the rest of the batch.
                return;
            }
        }
    }

    /**
     * Converts one record, offers the result to the matching hand-off queue, then
     * drains both queues and persists whatever they contain.
     */
    private void handleRecord(ConsumerRecord<String, String> record,
                              BlockingQueue<List<QueryLogEntity>> queryLogBlockingQueue,
                              BlockingQueue<List<DetailLogEntity>> detailLogBlockingQueue) {
        // Informational trace of normal traffic; this is not an error condition.
        logger.info("Start Consuming Kafaka Message: ---> {}", record);

        // Build the lists fresh for every record. Carrying them across iterations
        // re-offered (and re-inserted) the previous record's entities whenever the
        // next record had a different key.
        // Rows for the AICRS_ENGINE_QUERY_LOG table.
        List<QueryLogEntity> queryLogEntityList = makeupQueryLogEntityList(null, record);
        // Rows for the AICRS_ENGINE_DETAIL_LOG table.
        List<DetailLogEntity> detailLogEntityList = makeupDetailLogEntityList(null, record);

        // offer() never blocks: it returns false when the bounded queue is full.
        if (!CollectionUtils.isEmpty(queryLogEntityList) && !queryLogBlockingQueue.offer(queryLogEntityList)) {
            logger.error("queryLogBlockingQueue is full...");
        }
        if (!CollectionUtils.isEmpty(detailLogEntityList) && !detailLogBlockingQueue.offer(detailLogEntityList)) {
            logger.error("detailLogBlockingQueue is full...");
        }

        if (queryLogBlockingQueue.isEmpty() && detailLogBlockingQueue.isEmpty()) {
            // NOTE(review): this pause runs once per record that produced nothing to
            // save; many such records in one batch delay the next poll() by seconds
            // each. Kept for compatibility; confirm whether it is still wanted.
            pause(IDLE_PAUSE_MILLIS);
        }

        if (!queryLogBlockingQueue.isEmpty()) {
            List<List<QueryLogEntity>> pendingQueryLogs = new ArrayList<>();
            queryLogBlockingQueue.drainTo(pendingQueryLogs, queryLogBlockingQueue.size());
            queueQueryLogSave(pendingQueryLogs);
        }
        if (!detailLogBlockingQueue.isEmpty()) {
            List<List<DetailLogEntity>> pendingDetailLogs = new ArrayList<>();
            detailLogBlockingQueue.drainTo(pendingDetailLogs, detailLogBlockingQueue.size());
            queueDetailLogSave(pendingDetailLogs);
        }
    }

    /**
     * Sleeps for the given time; on interruption restores the interrupt flag so
     * the consumer loop can observe it and stop.
     *
     * @param millis how long to sleep, in milliseconds
     */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("線程執行異常:{}", e);
        }
    }

    /**
     * Persists query-log rows into AICRS_ENGINE_QUERY_LOG in batches of at most
     * {@value #INSERT_BATCH_SIZE}. Does nothing when the Redis {@code logSwitch}
     * value equals the "closed" key name. The passed-in list is cleared.
     *
     * <p>NOTE(review): Spring applies {@code @Async} and {@code @Transactional}
     * through proxies; a private method invoked on {@code this} is normally not
     * intercepted, so this most likely runs synchronously on the consumer thread
     * and without a transaction. Confirm, and move the method to a separate bean
     * if asynchronous or transactional behaviour is required.
     *
     * @param logList groups of entities drained from the query-log queue
     */
    @Async
    @Transactional
    private void queueQueryLogSave(final List<List<QueryLogEntity>> logList) {
        if (CollectionUtils.isEmpty(logList)) {
            return;
        }
        String logSwitch = logSwitchRedis.opsForValue().get("logSwitch");
        if (RedisNameEnum.LOG_TO_DB_KAFKA_SWITCH_CLOSE.getKeyName().equals(logSwitch)) {
            return;
        }
        List<QueryLogEntity> saveLogRecordList = new ArrayList<>();
        for (List<QueryLogEntity> list : logList) {
            saveLogRecordList.addAll(list);
        }
        logList.clear();
        // split() yields nothing for an empty list and a single chunk for small
        // lists, so no separate size check is needed.
        for (List<QueryLogEntity> batch : split(saveLogRecordList, INSERT_BATCH_SIZE)) {
            queryLogEntityMapper.insertQueryLogEntityByBatch(batch);
        }
    }

    /**
     * Persists detail-log rows into AICRS_ENGINE_DETAIL_LOG in batches of at most
     * {@value #INSERT_BATCH_SIZE}. Does nothing when the Redis {@code logSwitch}
     * value equals the "closed" key name. The passed-in list is cleared.
     *
     * <p>NOTE(review): as a private, self-invoked method, the {@code @Async} and
     * {@code @Transactional} annotations are most likely not applied by Spring's
     * proxies. Confirm before relying on them.
     *
     * @param logList groups of entities drained from the detail-log queue
     */
    @Async
    @Transactional
    private void queueDetailLogSave(final List<List<DetailLogEntity>> logList) {
        if (CollectionUtils.isEmpty(logList)) {
            return;
        }
        String logSwitch = logSwitchRedis.opsForValue().get("logSwitch");
        if (RedisNameEnum.LOG_TO_DB_KAFKA_SWITCH_CLOSE.getKeyName().equals(logSwitch)) {
            return;
        }
        List<DetailLogEntity> saveLogRecordList = new ArrayList<>();
        for (List<DetailLogEntity> list : logList) {
            saveLogRecordList.addAll(list);
        }
        logList.clear();
        for (List<DetailLogEntity> batch : split(saveLogRecordList, INSERT_BATCH_SIZE)) {
            detailLogEntityMapper.insertDetailLogEntityByBatch(batch);
        }
    }

    /**
     * Splits a record's value on {@code LogInfo.EXCISION} and verifies that it has
     * enough fields.
     *
     * @param record        the consumed record
     * @param expectedCount minimum number of fields the caller will read
     * @return the fields, or {@code null} when the value is missing or too short
     *         (a warning is logged in that case)
     */
    private static String[] splitFields(ConsumerRecord<String, String> record, int expectedCount) {
        String value = record.value();
        if (value == null) {
            logger.warn("Skipping Kafka record with null value: topic={} partition={} offset={}",
                    record.topic(), record.partition(), record.offset());
            return null;
        }
        // Limit -1 keeps trailing empty fields so positions stay aligned.
        String[] fields = value.split(LogInfo.EXCISION, -1);
        if (fields.length < expectedCount) {
            logger.warn("Skipping malformed Kafka record: expected at least {} fields but got {}, "
                            + "topic={} partition={} offset={}",
                    expectedCount, fields.length, record.topic(), record.partition(), record.offset());
            return null;
        }
        return fields;
    }

    /**
     * Builds the AICRS_ENGINE_QUERY_LOG entity for a record whose key is
     * {@code LogInfo.KEY_QUERY_LOG}.
     *
     * @param queryLogEntityList value returned unchanged when the record is not a
     *                           well-formed query-log record
     * @param record             the consumed record
     * @return a new single-element list for a query-log record, otherwise
     *         {@code queryLogEntityList}
     */
    private List<QueryLogEntity> makeupQueryLogEntityList(List<QueryLogEntity> queryLogEntityList,
                                                          ConsumerRecord<String, String> record) {
        if (!LogInfo.KEY_QUERY_LOG.equals(record.key())) {
            return queryLogEntityList;
        }
        String[] fields = splitFields(record, QUERY_LOG_FIELD_COUNT);
        if (fields == null) {
            return queryLogEntityList;
        }
        QueryLogEntity queryLogEntity = new QueryLogEntity();
        queryLogEntity.setRequestUuid(fields[0]);
        queryLogEntity.setQueryTime(DateUtil.stringToDate(fields[1]));
        queryLogEntity.setCustomer(fields[2]);
        queryLogEntity.setReqCompany(fields[3]);
        queryLogEntity.setUserName(fields[4]);
        queryLogEntity.setErrorCode(fields[5]);
        queryLogEntity.setErrorMessage(fields[6]);

        List<QueryLogEntity> result = new ArrayList<>(1);
        result.add(queryLogEntity);
        return result;
    }

    /**
     * Builds the AICRS_ENGINE_DETAIL_LOG entity for a record whose key is
     * {@code LogInfo.KEY_DETAIL_LOG}.
     *
     * @param detailLogEntityList value returned unchanged when the record is not a
     *                            well-formed detail-log record
     * @param record              the consumed record
     * @return a new single-element list for a detail-log record, otherwise
     *         {@code detailLogEntityList}
     */
    private List<DetailLogEntity> makeupDetailLogEntityList(List<DetailLogEntity> detailLogEntityList,
                                                            ConsumerRecord<String, String> record) {
        if (!LogInfo.KEY_DETAIL_LOG.equals(record.key())) {
            return detailLogEntityList;
        }
        String[] fields = splitFields(record, DETAIL_LOG_FIELD_COUNT);
        if (fields == null) {
            return detailLogEntityList;
        }
        DetailLogEntity detailLogEntity = new DetailLogEntity();
        detailLogEntity.setRequestUuid(fields[0]);
        detailLogEntity.setIsIn(fields[1]);
        detailLogEntity.setCpname(fields[2]);
        detailLogEntity.setUniteCode(fields[3]);
        detailLogEntity.setOrgCode(fields[4]);
        detailLogEntity.setHitCode(fields[5]);
        detailLogEntity.setHitColumn(fields[6]);
        detailLogEntity.setIdAicrsAppBlackList(fields[7]);
        detailLogEntity.setCpblackflag(fields[8]);
        detailLogEntity.setIsOwner(fields[9]);
        detailLogEntity.setFirstType(fields[10]);
        detailLogEntity.setSecondType(fields[11]);
        detailLogEntity.setIsMajorWaring(fields[12]);
        detailLogEntity.setMajorWarningDate(fields[13]);
        detailLogEntity.setTag(fields[14]);
        detailLogEntity.setReason(fields[15]);

        List<DetailLogEntity> result = new ArrayList<>(1);
        result.add(detailLogEntity);
        return result;
    }

    /**
     * Splits a list into consecutive chunks of at most {@code subListLength}
     * elements, preserving order.
     *
     * <p>The list is walked once with its iterator, so the cost stays linear even
     * for a {@code LinkedList}, where repeated indexed access would be quadratic.
     *
     * @param resList       the list to split; may be {@code null}
     * @param subListLength maximum chunk size
     * @param <T>           element type
     * @return an empty list when {@code resList} is null/empty or
     *         {@code subListLength <= 0}; a list containing {@code resList} itself
     *         when it already fits in one chunk; otherwise newly allocated chunks
     */
    public static <T> List<List<T>> split(List<T> resList, int subListLength) {
        if (CollectionUtils.isEmpty(resList) || subListLength <= 0) {
            return new ArrayList<>();
        }
        List<List<T>> ret = new ArrayList<>();
        if (resList.size() <= subListLength) {
            // Fewer elements than one chunk: hand back the original list as the only chunk.
            ret.add(resList);
            return ret;
        }
        List<T> chunk = new ArrayList<>(subListLength);
        for (T item : resList) {
            chunk.add(item);
            if (chunk.size() == subListLength) {
                ret.add(chunk);
                chunk = new ArrayList<>(subListLength);
            }
        }
        // Remainder that did not fill a whole chunk.
        if (!chunk.isEmpty()) {
            ret.add(chunk);
        }
        return ret;
    }
}
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Lazily creates and exposes the single shared queue used to hand off groups of
 * {@code DetailLogEntity} rows.
 *
 * <p>The queue is bounded, so a non-blocking {@code offer} returns {@code false}
 * once {@value #QUEUE_CAPACITY} groups are waiting.
 */
public class DetailLogSingletonFactory {

    /** Maximum number of entity groups the queue holds. */
    private static final int QUEUE_CAPACITY = 5000;

    /** Shared instance; volatile so a fully constructed queue is visible to all threads. */
    private static volatile BlockingQueue<List<DetailLogEntity>> detailLogSingleton;

    public DetailLogSingletonFactory() {
    }

    /**
     * Returns the shared queue, creating it on first use.
     *
     * <p>Creation is guarded by this class's own monitor. The original locked on
     * {@code QueryLogSingletonFactory.class}, which coupled two unrelated
     * singletons and made their initialisation contend on the same lock.
     *
     * @return the process-wide detail-log queue, never {@code null}
     */
    public static BlockingQueue<List<DetailLogEntity>> getInstance() {
        // Read the volatile field once into a local for the common, already-initialised path.
        BlockingQueue<List<DetailLogEntity>> queue = detailLogSingleton;
        if (queue == null) {
            synchronized (DetailLogSingletonFactory.class) {
                queue = detailLogSingleton;
                if (queue == null) {
                    queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
                    detailLogSingleton = queue;
                }
            }
        }
        return queue;
    }
}
import org.aspectj.lang.JoinPoint; import org.aspectj.lang.annotation.AfterReturning; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.List; import java.util.Map; import java.util.UUID; /** * description 調用方法時發送日誌到kafaka中記錄 * author * createTime 2021-1-5 15:51 **/ @Aspect @Component public class KafakaLogAspect { private static final Logger logger = LoggerFactory.getLogger(KafakaLogAspect.class); @Autowired private CommonSaveLog commonSaveLog; @Pointcut("execution(* com.pingan.aicrs.engine..**.BlackInfoServiceImpl.queryBlackInfos(..))") public void log() { } @AfterReturning(returning = "ret", pointcut = "log()") public void doAfterReturning(JoinPoint joinPoint, Object ret) throws Throwable { Object[] obj = joinPoint.getArgs(); if (obj != null && obj.length > 0) { if (obj[0] instanceof JSONObject) { Map<String,Object> responseMap = (Map<String,Object>)ret; DataResponse dataResponse = (DataResponse)responseMap.get("dataResponse"); List<Map<String,Object>> logParamMapList = (List<Map<String,Object>>)responseMap.get("logPamMap"); JSONObject dataRequestEntity = (JSONObject) obj[0]; String uuid = UUID.randomUUID().toString().replaceAll("-", "").toUpperCase(); commonSaveLog.saveLog(dataRequestEntity, dataResponse, logParamMapList, uuid); } } } }