多線程查詢數據,將結果存入到redis中,最後批量從redis中取數據批量插入數據庫中【我】

 

多線程查詢數據,將結果存入到redis中,最後批量從redis中取數據批量插入數據庫中java

package com.xxx.xx.reve.service;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.xxx.xx.common.service.ParentService;
import com.xxx.xx.redis.service.JedisClient;
import com.xxx.xx.reve.vo.RevenueSt;
import com.xxx.xx.util.PropertieUtil;

import net.sf.json.JSONObject;

@Service
@SuppressWarnings("rawtypes")
public class RevenueStServiceImpl2 extends ParentService implements  RevenueStService{
    private final static Logger logger = LoggerFactory.getLogger(RevenueStServiceImpl.class);
    
    @Autowired
    private JedisClient jedisClient;
    
    //預處理對象轉String方法
    private String o2s(Object o) {
        if(o!=null&&!"".equals(o)&&!"null".equals((o+"").trim().toLowerCase())) {
            return o+"";
        }
        return null;
    }
    
    //生成總收入查詢參數
    private  String genAllIncomeParam(String s) {
        StringBuffer sb = new StringBuffer();
        
        for (int i = 1; i <= getMonth(s); i++) {
            sb.append("SUM(TY_"+i+")+");
        }
        String s2 = sb.toString();
        String res = s2.substring(0, s2.length()-1);
        logger.info("總收入查詢參數:{}",res);
        return res;
    }

    //截取字符串獲取月份信息 "201912";
    private  Integer getMonth(String s) {
        //截取後兩位
        s = s.substring(s.length() -2,s.length());
        char c1 = s.charAt(0);
        char c2 = s.charAt(1);
        if ((c1+"").equals("0")) {
            return Integer.parseInt(c2+"");
        }
        return Integer.parseInt(s);
    }
    
    //線程池
    private ExecutorService exec = Executors.newFixedThreadPool(Integer.parseInt(PropertieUtil.getConfig("revenue.threadNum")));
    //redis中緩存隊列key
    private String redisKeyName = "EVENUE_STATISTICS_LIST";
    
    //獲取統計數據方法
    @SuppressWarnings("unchecked")
    @Override
    public void getStatisticsData() {
        
        //清空緩存
        jedisClient.del(redisKeyName);
        //普通查詢參數
        Map param = new HashMap();
        param.put("tablename","REVENUE_STATISTICS_RES");
        //將原表數據備份到歷史表
        Integer countRevenueNum = this.selectOne("ds2", "reve.countRevenueNum", param);
        logger.info("----歷史表revenue_statistics_res數據量爲{}",countRevenueNum);
        if (countRevenueNum!=null && countRevenueNum >Integer.parseInt(PropertieUtil.getConfig("revenue.copyLimitNum"))) {
            //刪除歷史表
            try {
                this.update("ds2", "reve.dropHisTable", param);
                logger.info("----歷史表revenue_statistics_res_his刪除成功");
            } catch (Exception e) {
                logger.info("----歷史表revenue_statistics_res_his不存在");
            }
            //建立歷史表並複製數據
//            CREATE TABLE revenue_statistics_res_his AS SELECT * FROM revenue_statistics_res
            this.update("ds2", "reve.copyToHisTable", param);
            logger.info("----歷史表revenue_statistics_res_his數據複製成功");
        }
//        清空原表
//        truncate table ${tablename}
        this.update("ds2", "reve.truncateTable", param);
        //記錄開始時間
        long t1 = System.currentTimeMillis();
        //準備插入結果表對象列表
        List<RevenueSt> preInsertList = new ArrayList<RevenueSt>();
//        產品分類:10:移動;20:寬帶;30:固化;40:電路出租;50:網元出租;-1:其餘;
        Map<String, String> productMap = new HashMap<String,String>();
        productMap.put("10", "移動");
        productMap.put("20", "寬帶");
        productMap.put("30", "固話");
        productMap.put("40", "電路");
        productMap.put("50", "網元");
        productMap.put("-1", "其餘");
        //查詢eda表的參數
        Map ep = new HashMap();
        //查詢行業列表,獲得全部行業信息,即 行業id,父id,級別
        List<Map> industryList = this.selectList("ds2", "reve.queryAllIndusty",ep);
        //所有人數
        Integer allTotalNum = this.selectOne("ds2", "reve.queryAllTotal",ep);
        //當前最大帳期(應該有收入數據的年月)
        String accountDay = this.selectOne("ds2", "reve.getMaxAccountDay",ep);
        logger.info("查詢到當前最新帳期:{}",accountDay);
        String genAllIncomeParam = genAllIncomeParam(accountDay);
        logger.info("拼接完當前最新帳期查詢參數:{}",genAllIncomeParam);
        String sql2 =  "SELECT /*+ PARALLEL(12) */ ("+genAllIncomeParam+") FROM EDA_CUST_INC";
        
        //查詢本年累計所有客戶收入
        String allIncome = this.selectOne("ds2", "reve.queryAllIncome",sql2);
        //獲取省份列表,獲取省份對應的Nub,eda表中只有Nub
        List<Map> provList = this.selectList("ds2", "reve.queryTotalNum",param);
        //獲取省份對應的本地網列表
        List<Map> cityList = new ArrayList<Map>();
        for (Map map : provList) {
            param.put("COMMON_REGION_ID",o2s(map.get("REGION_ID")));
            Map provInfo = this.selectOne("ds2", "reve.queryRegionNbr",param);
            String REGION_ID = o2s(map.get("REGION_ID"));
            List<Map> subCity = this.selectList("ds2", "reve.querySubCity",REGION_ID);
            for (Map city : subCity) {
                city.put("PAR_NBR", provInfo.get("REGION_NBR"));
            }
            cityList.addAll(subCity);
        }

//市級數據(本地網)--------
        //遍歷市列表獲取統計數據
        for (Map city : cityList) {
            //產品
            for (String produce : productMap.keySet()) {
                //行業
                for (Map industry : industryList) {
                    ep.clear();
                    //查詢參數:地區行業
                    ep.put("STD_PRVNCE_CD", o2s(city.get("PAR_NBR")));
                    ep.put("STD_LATN_CD", o2s(city.get("REGION_NBR")));
                    ep.put("PROD_TYPE", produce);
                    //加入行業類型相關參數
                    ep.putAll(industry);
                    //返回數據
                    ep.put("PROVINCE_REGION_ID", o2s(city.get("PAR_REGION_ID")));
                    ep.put("CITY_REGION_ID", o2s(city.get("COMMON_REGION_ID")));
                    ep.put("REGION_NAME", o2s(city.get("REGION_NAME")));
                    //存入公共數據
                    ep.put("ALL_CUST_NUM", allTotalNum);
                    ep.put("ALL_INCOME", allIncome);
                    //查詢參數:身份證臨時
                    ep.put("IDENTITY_TYPE", 1);
                    addVo(ep,preInsertList);
                    //查詢參數:身份證正式
                    ep.put("IDENTITY_TYPE", 2);
                    addVo(ep,preInsertList);
                }
            }
        }

        exec.shutdown();
        while (true) {
            if (exec.isTerminated()) {
                logger.info("--》--收入統計獲取數據,全部子線程都結束了,共計耗時:{}",(System.currentTimeMillis()-t1));
                break;
            }
        }
        //批量插入數據庫
        insertBatchRevenue();
    }

    //從redis中批量獲取數據,批量插入統計結果表,
    private void insertBatchRevenue() {
        //每次取出數量
        int onceNum = Integer.parseInt(PropertieUtil.getConfig("revenue.batchNum"));
        //統計個數
        int sum = 0;
        //開始角標
        int startIndex = 0;
        //步進
        int step = onceNum-1;
        //結束角標
        int endIndex  = 0;
        // 按照範圍取,每次取出onceNum條
        for (int i = 1; ; i++) {
            endIndex = startIndex+step;
            logger.info("----第"+i+"次取數據,角標起始:"+startIndex+"---"+endIndex);
            List<String> lrange = jedisClient.lrange(redisKeyName, startIndex, endIndex);
            //若是取完了退出循環
            if (lrange==null || lrange.size()==0) {
                break;
            }
            //統計計數
            sum += lrange.size();
            //插入數據庫
            //遍歷lrange,轉成Vo,放入新的list中,插入數據庫
            //判斷當前的表是哪一個表,執行對應的一個表的批量插入邏輯
            try {
                List<RevenueSt> paramList = new ArrayList<RevenueSt>();
                for (String s : lrange) {
                    RevenueSt vo = (RevenueSt)JSONObject.toBean(JSONObject.fromObject(s), RevenueSt.class);
                    paramList.add(vo);
                }
                long t1 = System.currentTimeMillis();
                //批量插入結果表
                this.insert("ds2", "reve.insertRevenue", paramList);
                logger.info("----第"+i+"次取數據,插入完成,耗時毫秒:"+(System.currentTimeMillis()-t1));
            } catch (Exception e) {
                logger.error("----批量統計數據插入發生錯誤,當前隊列名:{},批次起始角標:{}~{},異常詳情:{}",redisKeyName,startIndex,endIndex,e);
                e.printStackTrace();
            }
            //起始位置
            startIndex = endIndex+1;
        }
        logger.info("----插入完成,共插入:{} 條記錄",sum);
    }

    //複製map,爲多線程準備
    private Map copyMap(Map<String, Object> oldMap) {
        HashMap pMap = new HashMap();
        for ( Map.Entry<String,Object> entry : oldMap.entrySet()) { 
            pMap.put(entry.getKey(), entry.getValue());
        }
        return pMap;
    }
    
    //由於單次查詢比較慢,因此開啓多線程查詢,每次查詢完將結果存入redis的list中(多線程中只查詢,不插入或更新數據庫會避免數據庫鎖表,大大提高效率)
    private void addVo(Map ep,List<RevenueSt> list) {
        //這裏必定要在循環內 new 參數map,以確保每一個線程中使用的都是單獨new的參數Map對象,不然會有併發問題
        //複製參數對象
        Map epT =  copyMap(ep);
        //開啓多線程
        Runnable task = new Runnable() {
            @Override
            public void run() {
                long t1 = System.currentTimeMillis();
                logger.info("--》--收入統計線程: {} -- 開始執行",Thread.currentThread().getName());
//                查詢數據,查詢數據庫的動做要封裝到一個方法中,直接在多線程的run方法中寫 this.selectOne("ds2", "reve.queryIncome",ep); 會報錯
                RevenueSt vo = genVo(epT);
                //將查詢結果對象轉爲 json 字符串 存入 redis隊列中
                String upJson = JSONObject.fromObject(vo).toString();
                jedisClient.rpush(redisKeyName, upJson);
                //本地同步處理方法,不用redis本地同步方法版本,比用redis慢太多了
//                synchronized (RevenueStServiceImpl.class) {
//                    //添加到集合
//                    list.add(vo);
//                    if (list.size()>=500) {
//                        //批量插入結果表
//                        insertRevenue(list);
//                        list.clear();
//                    }
//                }
                logger.info("--》--收入統計線程: {} -- 結束,耗時{}",Thread.currentThread().getName(),System.currentTimeMillis()-t1);
            }
        };
        exec.submit(task);
    }

    /**
     * 查詢數據並生成臨時待插入對象
     * @param ep EDA表查詢條件
     * @param comp 公共參數,用於拼接結果vo
     * @return
     */
    private RevenueSt genVo(Map ep) {
        //返回對象
        RevenueSt re = new RevenueSt();
        //查序列
        String eq = this.selectOne("ds2", "reve.queryRevenueEQ",ep);
        re.setID(eq);
        //本年累計所有客戶收入
        re.setALL_INCOME(o2s(ep.get("ALL_INCOME")));
        //所有政企客戶數
        re.setALL_CUST_NUM(o2s(ep.get("ALL_CUST_NUM")));
        //備註
        re.setREMARK(o2s(ep.get("REMARK")));
        //條件
        //省Id
        re.setPROVINCE_REGION_ID(o2s(ep.get("PROVINCE_REGION_ID")));
        re.setCITY_REGION_ID(o2s(ep.get("CITY_REGION_ID")));
        //地區名稱++
        re.setREGION_NAME(o2s(ep.get("REGION_NAME")));
        //地區級別
        re.setREGION_GRADE(o2s(ep.get("REGION_GRADE")));
        //身份證臨時/正式
        re.setIDENTITY_TYPE(o2s(ep.get("IDENTITY_TYPE")));
        //行業
        re.setINDUSTRY_TYPE_ID(o2s(ep.get("INDUSTRY_TYPE_ID")));
        //行業代碼++
        re.setINDUSTRY_TYPE_CODE(o2s(ep.get("INDUSTRY_TYPE_CODE")));
        //行業名稱++
        re.setINDUSTRY_TYPE_NAME(o2s(ep.get("INDUSTRY_TYPE_NAME")));
        re.setPAR_INDUSTRY_TYPE_ID(o2s(ep.get("PAR_INDUSTRY_TYPE_ID")));
        re.setINDUSTRY_TYPE_GRADE(o2s(ep.get("INDUSTRY_TYPE_GRADE")));
        //產品類型
        re.setPROD_TYPE(o2s(ep.get("PROD_TYPE")));
        //查詢EDA表
        Map income = this.selectOne("ds2", "reve.queryIncome",ep);
        logger.info("---查詢結果:"+income);
        //查詢合規客戶數
        re.setAUDIT_CUST_NUM(o2s(income.get("CUSTNUM")));
        //查詢合規客戶身份證數
        re.setAUDIT_CUST_PARTY_NUM(o2s(income.get("PARTYNUM")));
        //查詢收入列表(今年、去年所有12個月)
        re.setTY_1(o2s(income.get("TY1")));
        re.setTY_2(o2s(income.get("TY2")));
        re.setTY_3(o2s(income.get("TY3")));
        re.setTY_4(o2s(income.get("TY4")));
        re.setTY_5(o2s(income.get("TY5")));
        re.setTY_6(o2s(income.get("TY6")));
        re.setTY_7(o2s(income.get("TY7")));
        re.setTY_8(o2s(income.get("TY8")));
        re.setTY_9(o2s(income.get("TY9")));
        re.setTY_10(o2s(income.get("TY10")));
        re.setTY_11(o2s(income.get("TY11")));
        re.setTY_12(o2s(income.get("LY12")));
        re.setLY_1(o2s(income.get("LY1")));
        re.setLY_2(o2s(income.get("LY2")));
        re.setLY_3(o2s(income.get("LY3")));
        re.setLY_4(o2s(income.get("LY4")));
        re.setLY_5(o2s(income.get("LY5")));
        re.setLY_6(o2s(income.get("LY6")));
        re.setLY_7(o2s(income.get("LY7")));
        re.setLY_8(o2s(income.get("LY8")));
        re.setLY_9(o2s(income.get("LY9")));
        re.setLY_10(o2s(income.get("LY10")));
        re.setLY_11(o2s(income.get("LY11")));
        re.setLY_12(o2s(income.get("LY12")));
        logger.info("---------------拼裝對象:"+re);
        return re;
    }
    
    
    //批量插入統計結果表,本地list緩存版本
//    private void insertBatchRevenue2(List<RevenueSt> preInsertList) {
//        System.out.println("入參集合數量:"+preInsertList.size());
//        //統計數量
//        int num = 0;
//        //每批個數
//        int batchLen = 2000;
//        // 每批集合臨時存儲
//        List<RevenueSt> batchlist = new ArrayList<RevenueSt>();
//        for (int i = 0; i < preInsertList.size(); i++) {
//            RevenueSt item = preInsertList.get(i);
//            batchlist.add(item);
//            if ((i + 1) % batchLen == 0) {
//                insertRevenue(batchlist);
//                num += batchlist.size();
//                System.out.println("----本次插入數量:"+num);
//                batchlist.clear();// 處理完清空批次集合
//            }
//        }
//        // 處理最後一批數據
//        if (batchlist.size() > 0) {
//            //批量插入結果表
//            insertRevenue(batchlist);
//            num += batchlist.size();
//        }
//        System.out.println("----一共插入數量:" + num);
//    }
    
}

 

注意:redis

若是數據量在100萬如下能夠,一直往redis的一個list中存,最後處理,spring

若是數據量大於100萬,可能撐爆redis,這時,能夠 單獨開啓一守護線程,裏面用while true 循環 加 wait必定時間,定時從 list的頭部取數據,批量插入數據庫(模擬消息隊列),等全部的查詢線程都結束,再最後執行一次從redis中取剩下的全部數據批量插入。sql

相關文章
相關標籤/搜索