多線程查詢數據,將結果存入到redis中,最後批量從redis中取數據批量插入數據庫中java
package com.xxx.xx.reve.service; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.xxx.xx.common.service.ParentService; import com.xxx.xx.redis.service.JedisClient; import com.xxx.xx.reve.vo.RevenueSt; import com.xxx.xx.util.PropertieUtil; import net.sf.json.JSONObject; @Service @SuppressWarnings("rawtypes") public class RevenueStServiceImpl2 extends ParentService implements RevenueStService{ private final static Logger logger = LoggerFactory.getLogger(RevenueStServiceImpl.class); @Autowired private JedisClient jedisClient; //預處理對象轉String方法 private String o2s(Object o) { if(o!=null&&!"".equals(o)&&!"null".equals((o+"").trim().toLowerCase())) { return o+""; } return null; } //生成總收入查詢參數 private String genAllIncomeParam(String s) { StringBuffer sb = new StringBuffer(); for (int i = 1; i <= getMonth(s); i++) { sb.append("SUM(TY_"+i+")+"); } String s2 = sb.toString(); String res = s2.substring(0, s2.length()-1); logger.info("總收入查詢參數:{}",res); return res; } //截取字符串獲取月份信息 "201912"; private Integer getMonth(String s) { //截取後兩位 s = s.substring(s.length() -2,s.length()); char c1 = s.charAt(0); char c2 = s.charAt(1); if ((c1+"").equals("0")) { return Integer.parseInt(c2+""); } return Integer.parseInt(s); } //線程池 private ExecutorService exec = Executors.newFixedThreadPool(Integer.parseInt(PropertieUtil.getConfig("revenue.threadNum"))); //redis中緩存隊列key private String redisKeyName = "EVENUE_STATISTICS_LIST"; //獲取統計數據方法 @SuppressWarnings("unchecked") @Override public void getStatisticsData() { //清空緩存 jedisClient.del(redisKeyName); //普通查詢參數 Map param = new HashMap(); param.put("tablename","REVENUE_STATISTICS_RES"); //將原表數據備份到歷史表 Integer countRevenueNum = this.selectOne("ds2", "reve.countRevenueNum", param); logger.info("----歷史表revenue_statistics_res數據量爲{}",countRevenueNum); if (countRevenueNum!=null && countRevenueNum >Integer.parseInt(PropertieUtil.getConfig("revenue.copyLimitNum"))) { //刪除歷史表 try { this.update("ds2", "reve.dropHisTable", param); logger.info("----歷史表revenue_statistics_res_his刪除成功"); } catch (Exception e) { logger.info("----歷史表revenue_statistics_res_his不存在"); } //建立歷史表並複製數據 // CREATE TABLE revenue_statistics_res_his AS SELECT * FROM revenue_statistics_res this.update("ds2", "reve.copyToHisTable", param); logger.info("----歷史表revenue_statistics_res_his數據複製成功"); } // 清空原表 // truncate table ${tablename} this.update("ds2", "reve.truncateTable", param); //記錄開始時間 long t1 = System.currentTimeMillis(); //準備插入結果表對象列表 List<RevenueSt> preInsertList = new ArrayList<RevenueSt>(); // 產品分類:10:移動;20:寬帶;30:固化;40:電路出租;50:網元出租;-1:其餘; Map<String, String> productMap = new HashMap<String,String>(); productMap.put("10", "移動"); productMap.put("20", "寬帶"); productMap.put("30", "固話"); productMap.put("40", "電路"); productMap.put("50", "網元"); productMap.put("-1", "其餘"); //查詢eda表的參數 Map ep = new HashMap(); //查詢行業列表,獲得全部行業信息,即 行業id,父id,級別 List<Map> industryList = this.selectList("ds2", "reve.queryAllIndusty",ep); //所有人數 Integer allTotalNum = this.selectOne("ds2", "reve.queryAllTotal",ep); //當前最大帳期(應該有收入數據的年月) String accountDay = this.selectOne("ds2", "reve.getMaxAccountDay",ep); logger.info("查詢到當前最新帳期:{}",accountDay); String genAllIncomeParam = genAllIncomeParam(accountDay); logger.info("拼接完當前最新帳期查詢參數:{}",genAllIncomeParam); String sql2 = "SELECT /*+ PARALLEL(12) */ ("+genAllIncomeParam+") FROM EDA_CUST_INC"; //查詢本年累計所有客戶收入 String allIncome = this.selectOne("ds2", "reve.queryAllIncome",sql2); //獲取省份列表,獲取省份對應的Nub,eda表中只有Nub List<Map> provList = this.selectList("ds2", "reve.queryTotalNum",param); //獲取省份對應的本地網列表 List<Map> cityList = new ArrayList<Map>(); for (Map map : provList) { param.put("COMMON_REGION_ID",o2s(map.get("REGION_ID"))); Map provInfo = this.selectOne("ds2", "reve.queryRegionNbr",param); String REGION_ID = o2s(map.get("REGION_ID")); List<Map> subCity = this.selectList("ds2", "reve.querySubCity",REGION_ID); for (Map city : subCity) { city.put("PAR_NBR", provInfo.get("REGION_NBR")); } cityList.addAll(subCity); } //市級數據(本地網)-------- //遍歷市列表獲取統計數據 for (Map city : cityList) { //產品 for (String produce : productMap.keySet()) { //行業 for (Map industry : industryList) { ep.clear(); //查詢參數:地區行業 ep.put("STD_PRVNCE_CD", o2s(city.get("PAR_NBR"))); ep.put("STD_LATN_CD", o2s(city.get("REGION_NBR"))); ep.put("PROD_TYPE", produce); //加入行業類型相關參數 ep.putAll(industry); //返回數據 ep.put("PROVINCE_REGION_ID", o2s(city.get("PAR_REGION_ID"))); ep.put("CITY_REGION_ID", o2s(city.get("COMMON_REGION_ID"))); ep.put("REGION_NAME", o2s(city.get("REGION_NAME"))); //存入公共數據 ep.put("ALL_CUST_NUM", allTotalNum); ep.put("ALL_INCOME", allIncome); //查詢參數:身份證臨時 ep.put("IDENTITY_TYPE", 1); addVo(ep,preInsertList); //查詢參數:身份證正式 ep.put("IDENTITY_TYPE", 2); addVo(ep,preInsertList); } } } exec.shutdown(); while (true) { if (exec.isTerminated()) { logger.info("--》--收入統計獲取數據,全部子線程都結束了,共計耗時:{}",(System.currentTimeMillis()-t1)); break; } } //批量插入數據庫 insertBatchRevenue(); } //從redis中批量獲取數據,批量插入統計結果表, private void insertBatchRevenue() { //每次取出數量 int onceNum = Integer.parseInt(PropertieUtil.getConfig("revenue.batchNum")); //統計個數 int sum = 0; //開始角標 int startIndex = 0; //步進 int step = onceNum-1; //結束角標 int endIndex = 0; // 按照範圍取,每次取出onceNum條 for (int i = 1; ; i++) { endIndex = startIndex+step; logger.info("----第"+i+"次取數據,角標起始:"+startIndex+"---"+endIndex); List<String> lrange = jedisClient.lrange(redisKeyName, startIndex, endIndex); //若是取完了退出循環 if (lrange==null || lrange.size()==0) { break; } //統計計數 sum += lrange.size(); //插入數據庫 //遍歷lrange,轉成Vo,放入新的list中,插入數據庫 //判斷當前的表是哪一個表,執行對應的一個表的批量插入邏輯 try { List<RevenueSt> paramList = new ArrayList<RevenueSt>(); for (String s : lrange) { RevenueSt vo = (RevenueSt)JSONObject.toBean(JSONObject.fromObject(s), RevenueSt.class); paramList.add(vo); } long t1 = System.currentTimeMillis(); //批量插入結果表 this.insert("ds2", "reve.insertRevenue", paramList); logger.info("----第"+i+"次取數據,插入完成,耗時毫秒:"+(System.currentTimeMillis()-t1)); } catch (Exception e) { logger.error("----批量統計數據插入發生錯誤,當前隊列名:{},批次起始角標:{}~{},異常詳情:{}",redisKeyName,startIndex,endIndex,e); e.printStackTrace(); } //起始位置 startIndex = endIndex+1; } logger.info("----插入完成,共插入:{} 條記錄",sum); } //複製map,爲多線程準備 private Map copyMap(Map<String, Object> oldMap) { HashMap pMap = new HashMap(); for ( Map.Entry<String,Object> entry : oldMap.entrySet()) { pMap.put(entry.getKey(), entry.getValue()); } return pMap; } //由於單次查詢比較慢,因此開啓多線程查詢,每次查詢完將結果存入redis的list中(多線程中只查詢,不插入或更新數據庫會避免數據庫鎖表,大大提高效率) private void addVo(Map ep,List<RevenueSt> list) { //這裏必定要在循環內 new 參數map,以確保每一個線程中使用的都是單獨new的參數Map對象,不然會有併發問題 //複製參數對象 Map epT = copyMap(ep); //開啓多線程 Runnable task = new Runnable() { @Override public void run() { long t1 = System.currentTimeMillis(); logger.info("--》--收入統計線程: {} -- 開始執行",Thread.currentThread().getName()); // 查詢數據,查詢數據庫的動做要封裝到一個方法中,直接在多線程的run方法中寫 this.selectOne("ds2", "reve.queryIncome",ep); 會報錯 RevenueSt vo = genVo(epT); //將查詢結果對象轉爲 json 字符串 存入 redis隊列中 String upJson = JSONObject.fromObject(vo).toString(); jedisClient.rpush(redisKeyName, upJson); //本地同步處理方法,不用redis本地同步方法版本,比用redis慢太多了 // synchronized (RevenueStServiceImpl.class) { // //添加到集合 // list.add(vo); // if (list.size()>=500) { // //批量插入結果表 // insertRevenue(list); // list.clear(); // } // } logger.info("--》--收入統計線程: {} -- 結束,耗時{}",Thread.currentThread().getName(),System.currentTimeMillis()-t1); } }; exec.submit(task); } /** * 查詢數據並生成臨時待插入對象 * @param ep EDA表查詢條件 * @param comp 公共參數,用於拼接結果vo * @return */ private RevenueSt genVo(Map ep) { //返回對象 RevenueSt re = new RevenueSt(); //查序列 String eq = this.selectOne("ds2", "reve.queryRevenueEQ",ep); re.setID(eq); //本年累計所有客戶收入 re.setALL_INCOME(o2s(ep.get("ALL_INCOME"))); //所有政企客戶數 re.setALL_CUST_NUM(o2s(ep.get("ALL_CUST_NUM"))); //備註 re.setREMARK(o2s(ep.get("REMARK"))); //條件 //省Id re.setPROVINCE_REGION_ID(o2s(ep.get("PROVINCE_REGION_ID"))); re.setCITY_REGION_ID(o2s(ep.get("CITY_REGION_ID"))); //地區名稱++ re.setREGION_NAME(o2s(ep.get("REGION_NAME"))); //地區級別 re.setREGION_GRADE(o2s(ep.get("REGION_GRADE"))); //身份證臨時/正式 re.setIDENTITY_TYPE(o2s(ep.get("IDENTITY_TYPE"))); //行業 re.setINDUSTRY_TYPE_ID(o2s(ep.get("INDUSTRY_TYPE_ID"))); //行業代碼++ re.setINDUSTRY_TYPE_CODE(o2s(ep.get("INDUSTRY_TYPE_CODE"))); //行業名稱++ re.setINDUSTRY_TYPE_NAME(o2s(ep.get("INDUSTRY_TYPE_NAME"))); re.setPAR_INDUSTRY_TYPE_ID(o2s(ep.get("PAR_INDUSTRY_TYPE_ID"))); re.setINDUSTRY_TYPE_GRADE(o2s(ep.get("INDUSTRY_TYPE_GRADE"))); //產品類型 re.setPROD_TYPE(o2s(ep.get("PROD_TYPE"))); //查詢EDA表 Map income = this.selectOne("ds2", "reve.queryIncome",ep); logger.info("---查詢結果:"+income); //查詢合規客戶數 re.setAUDIT_CUST_NUM(o2s(income.get("CUSTNUM"))); //查詢合規客戶身份證數 re.setAUDIT_CUST_PARTY_NUM(o2s(income.get("PARTYNUM"))); //查詢收入列表(今年、去年所有12個月) re.setTY_1(o2s(income.get("TY1"))); re.setTY_2(o2s(income.get("TY2"))); re.setTY_3(o2s(income.get("TY3"))); re.setTY_4(o2s(income.get("TY4"))); re.setTY_5(o2s(income.get("TY5"))); re.setTY_6(o2s(income.get("TY6"))); re.setTY_7(o2s(income.get("TY7"))); re.setTY_8(o2s(income.get("TY8"))); re.setTY_9(o2s(income.get("TY9"))); re.setTY_10(o2s(income.get("TY10"))); re.setTY_11(o2s(income.get("TY11"))); re.setTY_12(o2s(income.get("LY12"))); re.setLY_1(o2s(income.get("LY1"))); re.setLY_2(o2s(income.get("LY2"))); re.setLY_3(o2s(income.get("LY3"))); re.setLY_4(o2s(income.get("LY4"))); re.setLY_5(o2s(income.get("LY5"))); re.setLY_6(o2s(income.get("LY6"))); re.setLY_7(o2s(income.get("LY7"))); re.setLY_8(o2s(income.get("LY8"))); re.setLY_9(o2s(income.get("LY9"))); re.setLY_10(o2s(income.get("LY10"))); re.setLY_11(o2s(income.get("LY11"))); re.setLY_12(o2s(income.get("LY12"))); logger.info("---------------拼裝對象:"+re); return re; } //批量插入統計結果表,本地list緩存版本 // private void insertBatchRevenue2(List<RevenueSt> preInsertList) { // System.out.println("入參集合數量:"+preInsertList.size()); // //統計數量 // int num = 0; // //每批個數 // int batchLen = 2000; // // 每批集合臨時存儲 // List<RevenueSt> batchlist = new ArrayList<RevenueSt>(); // for (int i = 0; i < preInsertList.size(); i++) { // RevenueSt item = preInsertList.get(i); // batchlist.add(item); // if ((i + 1) % batchLen == 0) { // insertRevenue(batchlist); // num += batchlist.size(); // System.out.println("----本次插入數量:"+num); // batchlist.clear();// 處理完清空批次集合 // } // } // // 處理最後一批數據 // if (batchlist.size() > 0) { // //批量插入結果表 // insertRevenue(batchlist); // num += batchlist.size(); // } // System.out.println("----一共插入數量:" + num); // } }
注意:redis
若是數據量在100萬如下能夠,一直往redis的一個list中存,最後處理,spring
若是數據量大於100萬,可能撐爆redis,這時,能夠 單獨開啓一守護線程,裏面用while true 循環 加 wait必定時間,定時從 list的頭部取數據,批量插入數據庫(模擬消息隊列),等全部的查詢線程都結束,再最後執行一次從redis中取剩下的全部數據批量插入。sql