剛處理一個由於線程池啓動線程以後,資源一直沒有釋放最終內存溢出的問題,把代碼貼出來,這個貼出來的代碼是沒有問題的。也能夠用做學習多線程的線程池配置以及多線程的學習。java
具體問題緣由是由於:web
PooledExecutorManager.getPooledExecutor().execute();數據庫
方法傳入的參數是須要Runnable 類型的線程,但實際應用的時候 咱們傳遞過來的 MngRuleMatch 類的寫法是:apache
public class MngRuleMatch extends Thread{。。。}多線程
因此咱們把MngRuleMatch 類從新修改成實現 Runnable 後就正常了 寫法爲:app
public class MngRuleMatch implements Runnable{。。。} 學習
在看該問題以前請先了解下Therad和Runnable之間的關係,推薦一個文章:this
http://www.oschina.net/question/565065_86563spa
而後咱們看下本次出現該問題的代碼,以下:.net
1:線程池配置文件:
1 KEEP_ALIVE_TIME=300000 2 BOUNDED_BUFFER_SIZE=50 3 INIT_THREAD_SIZE=5 4 MINIMUM_POOL_SIZE=5 5 MAX_POOL_SIZE=20 6 7 #服務處理線程數 8 tmpLimitInviteTask.thredNum=5 9 10 11 #線程池初始化參數 12 LIMIT_INVITE_KEEP_ALIVE_TIME=300000 13 LIMIT_INVITE_BOUNDED_BUFFER_SIZE=50 14 LIMIT_INVITE_INIT_THREAD_SIZE=5 15 LIMIT_INVITE_MINIMUM_POOL_SIZE=5 16 LIMIT_INVITE_MAX_POOL_SIZE=20 17 18 #獲取待處理的額度枯竭交易數 19 LIMIT_INVITE_NUMBER=500 20 REFUSE_LIMIT_INVITE_DAYS=15
2: 實現對配置文件的讀寫獲取的java實現類
1 package XXX; 2 3 4 import java.io.IOException; 5 import java.io.InputStream; 6 import java.util.Properties; 7 8 import com.paic.pafa.app.lwc.core.util.DevLog; 9 10 /** 11 * 配製文件讀取類 讀取配製文件config.properties中的內容 注意: 12 * config.properties必定要放在系統環境變量CLASSPATH包含的目錄或jar文件中 13 14 */ 15 public class Config { 16 private static final String LOG_NAME = "Config"; 17 private Properties properties; 18 private Properties getConfig(){ 19 if(properties==null){ 20 properties = new Properties(); 21 InputStream in = null; 22 try { 23 DevLog.debug("【config.properties】文件被打開了!"); 24 in = Thread.currentThread().getContextClassLoader().getResourceAsStream("config.properties"); 25 properties.load(in); 26 } catch (Exception e) { 27 DevLog.debug("【config.properties】有誤", e); 28 } finally { 29 try { 30 if (in != null) { 31 in.close(); 32 } 33 DevLog.debug("【config.properties】文件成功關閉了!"); 34 in = null; 35 } catch (IOException e ) { 36 DevLog.debug("【config.properties】文件關閉有誤", e); 37 } 38 } 39 DevLog.debug("【config.properties】文件的內容:" + properties); 40 } 41 return properties; 42 } 43 44 public int getConfig(String param, int defaultValue) { 45 46 String val = getConfig().getProperty(param); 47 if (val == null) { 48 Logger.debug(LOG_NAME, "Value of [" + param + "] not exist, use default value [" + defaultValue + "]"); 49 return defaultValue; 50 } 51 52 try { 53 int i = Integer.parseInt(val); 54 Logger.debug(LOG_NAME, "Value of [" + param + "] is [" + i + "]"); 55 return i; 56 } catch (NumberFormatException e) { 57 Logger.error(LOG_NAME, "Value of [" + param + "] is not a valid number, use default value [" + defaultValue + "]", e); 58 return defaultValue; 59 } 60 } 61 62 public String getConfig(String param, String defaultValue) { 63 String val = getConfig().getProperty(param); 64 if (val == null) { 65 Logger.debug(LOG_NAME, "Value of [" + param + "] not exist, use default value [" + defaultValue + "]"); 66 return defaultValue; 67 } 68 69 Logger.debug(LOG_NAME, "Value of [" + param + "] is [" + val + "]"); 70 return val; 71 } 72 }
3:數據準備業務邏輯處理類
此類是查詢數據庫數據,也能夠是接口返回等等。而後用一個while循環一直獲取數據並從線程池拿到線程對數據處理
package xxx; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import com.paic.btds.common.util.BizAppContextObjectNames; import com.paic.btds.sysServers.biz.bo.BtdsTxnMsgBO; import com.paic.btds.sysServers.biz.bo.BtrsMngLogHisBO; import com.paic.btds.sysServers.common.dto.BtdsTxnMsgDTO; import com.paic.btds.sysServers.common.dto.BtrsMngLogHisDTO; import com.paic.pafa.app.biz.service.BusinessServiceException; import com.paic.pafa.app.lwc.core.context.ApplicationContext; import com.paic.pafa.app.lwc.core.context.support.PafaCoreContexton; import com.paic.pafa.app.lwc.core.util.DevLog; import com.paic.pafa.logging2.Level; import com.paic.pafa.logging2.Logger; /** * 流水錶的規則匹配服務的線程 * */ public class MngRuleMatchThread extends Thread { private Logger error = PafaCoreContexton.getInstance().getErrorLogger(); private boolean flag = true; private ApplicationContext context; private Config config = new Config(); private long sleep_time_1 ; private long sleep_time_2 ; private long sleep_time_3 ; public MngRuleMatchThread(ApplicationContext context) { this.context = context; } public boolean isFlag() { return flag; } public void setFlag(boolean flag) { this.flag = flag; } // 獲取交易信息,調用規則引擎,標識交易信息 public void run() { DevLog.debug("管理交易歷史流水錶規則匹配服務啓動。。。。"); /**初始化線程池*/ PooledExecutorManager.initPoolExecutor(); BtdsTxnMsgBO btdsTxnMsgBO = (BtdsTxnMsgBO)context.getBean(BizAppContextObjectNames.BTDSTXNMSG_BO); List btdsTxnMsgList = new ArrayList(); Map channelMap = new HashMap(); try { btdsTxnMsgList = btdsTxnMsgBO.selectBtdsTxnMsg(); } catch (BusinessServiceException e2) { e2.printStackTrace(); } if(btdsTxnMsgList != null && btdsTxnMsgList.size() > 0){ for(int i = 0; i < btdsTxnMsgList.size(); i++){ BtdsTxnMsgDTO btdsTxnMsgDTO = (BtdsTxnMsgDTO)btdsTxnMsgList.get(i); if(btdsTxnMsgDTO != null){ channelMap.put(btdsTxnMsgDTO.getSubTxnCode(), btdsTxnMsgDTO.getChannelFlg()); } } } setSleepTime(); while (isFlag()) { String transActionId = PafaCoreContexton.getInstance() .getThreadContext().getTxnID(); InetAddress localHost = null; try { localHost = InetAddress.getLocalHost(); } catch (UnknownHostException e1) { } String hostAddress = localHost.getHostAddress(); BtrsMngLogHisDTO dto = new BtrsMngLogHisDTO(); dto.setIp(hostAddress); //更新管理交易歷史流水錶 try { updateBtrsMngLogHis(dto); } catch (BusinessServiceException e1) { error.log(transActionId, Level.INFO, this.getClass().getName(), "updateBtrsMngLogHis()", "entering run", e1, ""); } // 從管理交易歷史流水錶獲取交易信息 List btrsMngLogHisList = selectBtrsMngLogHis(dto); if(btrsMngLogHisList != null && btrsMngLogHisList.size() > 0){ MngRuleMatchProducer mngRuleMatchProducer = new MngRuleMatchProducer(btrsMngLogHisList,context,channelMap); mngRuleMatchProducer.ruleMatch(); if(btrsMngLogHisList.size() < 200 && btrsMngLogHisList.size() >100){ try { DevLog.info("掃描管理交易歷史流水錶種交易不足200條,休眠0.5秒"); Thread.sleep(getSleepTime1()); } catch (InterruptedException e) { } }else if(btrsMngLogHisList.size() < 100) { try { DevLog.info("掃描完管理交易歷史流水錶不足100條交易,休眠1秒"); Thread.sleep(getSleepTime2()); } catch (InterruptedException e) { } } }else{ try { DevLog.info("管理交易歷史流水錶中沒有數據,休眠5秒"); Thread.sleep(getSleepTime3()); } catch (InterruptedException e) { } } } } public void setSleepTime(){ this.sleep_time_1 = config.getConfig("MNG_SLEEP_TIME_1", 500); this.sleep_time_2 = config.getConfig("MNG_SLEEP_TIME_2", 1000); this.sleep_time_3 = config.getConfig("MNG_SLEEP_TIME_3", 5000); } public long getSleepTime1() { return this.sleep_time_1; } public long getSleepTime2() { return this.sleep_time_2; } public long getSleepTime3() { return this.sleep_time_3; } private void updateBtrsMngLogHis(BtrsMngLogHisDTO btrsMngLogHisDTO) throws BusinessServiceException { BtrsMngLogHisBO btrsMngLogHisBO = (BtrsMngLogHisBO)context.getBean(BizAppContextObjectNames.BTRSMNGLOGHIS_BO); btrsMngLogHisBO.updateBtrsMngLogHis(btrsMngLogHisDTO); } private List selectBtrsMngLogHis(BtrsMngLogHisDTO dto) { List btrsMngLogHisList = new ArrayList(); String transActionId = PafaCoreContexton.getInstance() .getThreadContext().getTxnID(); BtrsMngLogHisBO btrsMngLogHisBO = (BtrsMngLogHisBO) context .getBean(BizAppContextObjectNames.BTRSMNGLOGHIS_BO); try { btrsMngLogHisList = btrsMngLogHisBO.selectBtrsMngLogHis(dto); } catch (BusinessServiceException e) { error.log(transActionId, Level.INFO, this.getClass().getName(), "selectBtrsMngLogHis()", "entering run", e, ""); } return btrsMngLogHisList; } }
4:線程池獲取線程,並啓動線程對數據處理
1 package cxx; 2 3 import java.util.Map; 4 5 import EDU.oswego.cs.dl.util.concurrent.CountDown; 6 7 import com.paic.btds.sysServers.common.dto.BtrsMngLogHisDTO; 8 import com.paic.pafa.app.lwc.core.context.ApplicationContext; 9 /** 10 * 管理交易歷史流水錶的規則匹配服務的交易分發服務 11 * @author EX_WEIPENG413 12 * 13 */ 14 public class MngRuleMatch implements Runnable{ 15 16 private CountDown clock; 17 private BtrsMngLogHisDTO btrsMngLogHisDTO ; 18 private ApplicationContext context ; 19 private Map channelMap; 20 21 public MngRuleMatch(BtrsMngLogHisDTO btrsMngLogHisDTO ,CountDown clock ,ApplicationContext context, Map channelMap){ 22 this.btrsMngLogHisDTO = btrsMngLogHisDTO; 23 this.clock =clock; 24 this.context =context; 25 this.channelMap = channelMap; 26 } 27 public void run(){ 28 try{ 29 BtrsMngLogHisDTO dto = btrsMngLogHisDTO; 30 31 BtrsMngLogRuleMatch btrsMngLogRuleMatch=new BtrsMngLogRuleMatch(dto,context,channelMap); 32 btrsMngLogRuleMatch.match(); 33 34 }catch(Exception e){ 35 e.printStackTrace(); 36 }finally{ 37 clock.release(); 38 } 39 } 40 41 }
5:線程處理類
1 package xxx; 2 3 import java.lang.reflect.InvocationTargetException; 4 import java.math.BigDecimal; 5 import java.util.HashMap; 6 import java.util.Map; 7 8 import org.apache.commons.beanutils.BeanUtils; 9 import org.apache.log4j.Logger; 10 11 import com.paic.btds.common.util.BizAppContextObjectNames; 12 import com.paic.btds.para.biz.bo.SysParamBO; 13 import com.paic.btds.para.common.dto.SysParamDTO; 14 import com.paic.btds.sysServers.common.dto.BtrsMngLogHisDTO; 15 import com.paic.btds.sysServers.common.dto.BtrsTrsAllDTO; 16 import com.paic.btds.sysServers.common.dto.BtrsTrsRuleDTO; 17 import com.paic.btds.sysServers.web.util.SqlMappingName; 18 import com.paic.paces.common.dao.ExecuteSqlDAO; 19 import com.paic.pafa.app.biz.service.BusinessServiceException; 20 import com.paic.pafa.app.integration.dao.PafaDAOException; 21 import com.paic.pafa.app.lwc.core.beans.BeansException; 22 import com.paic.pafa.app.lwc.core.context.ApplicationContext; 23 import com.paic.pafa.app.lwc.core.util.DevLog; 24 /** 25 * 管理交易歷史流水錶的規則匹配服務的交易分發 26 * 27 * 28 */ 29 public class BtrsMngLogRuleMatch { 30 31 private ApplicationContext context; 32 33 static Logger logger = Logger.getLogger(BtrsMngLogRuleMatch.class.getName()); 34 35 private ExecuteSqlDAO executeSqlDao; 36 37 private Map channelMap; 38 /** 39 * 交易數據 40 */ 41 private BtrsMngLogHisDTO btrsMngLogHisDTO = null; 42 43 public BtrsMngLogRuleMatch(BtrsMngLogHisDTO btrsMngLogHisDTO,ApplicationContext context, Map channelMap) { 44 this.btrsMngLogHisDTO=btrsMngLogHisDTO; 45 this.context=context; 46 this.executeSqlDao=(ExecuteSqlDAO) context.getBean(BizAppContextObjectNames.EXECUTE_SQL_DAO); 47 this.channelMap = channelMap; 48 } 49 50 public void match() { 51 try { 52 BtrsTrsAllDTO btrsTrsAllDTO = new BtrsTrsAllDTO(); 53 BtrsTrsRuleDTO btrsTrsRuleDTO = new BtrsTrsRuleDTO(); 54 BeanUtils.copyProperties(btrsTrsAllDTO, 55 btrsMngLogHisDTO); 56 BeanUtils.copyProperties(btrsTrsRuleDTO, 57 btrsMngLogHisDTO); 58 String amtTrans = btrsMngLogHisDTO.getTransAmt(); 59 if(amtTrans != null && !"".equals(amtTrans)){ 60 btrsTrsAllDTO.setAmtTrans(new BigDecimal(amtTrans).multiply(new BigDecimal("100")) 61 .toString()); 62 btrsTrsRuleDTO.setAmtTrans(new BigDecimal(amtTrans).multiply(new BigDecimal("100")) 63 .toString()); 64 } 65 66 btrsTrsAllDTO.setPrimaryAcctNum(btrsMngLogHisDTO 67 .getCardNum()); 68 btrsTrsAllDTO.setSystemDate(btrsMngLogHisDTO 69 .getTrsSysDate()); 70 btrsTrsAllDTO.setSystemTime(btrsMngLogHisDTO 71 .getTrsSysTime()); 72 btrsTrsAllDTO.setSystemSsn(btrsMngLogHisDTO.getSystemNo()); 73 74 btrsTrsAllDTO.setTranDate(btrsTrsAllDTO.getSystemDate()+btrsTrsAllDTO.getSystemTime()); 75 76 btrsTrsRuleDTO.setPrimaryAcctNum(btrsMngLogHisDTO 77 .getCardNum()); 78 btrsTrsRuleDTO.setSystemDate(btrsMngLogHisDTO 79 .getTrsSysDate()); 80 btrsTrsRuleDTO.setSystemTime(btrsMngLogHisDTO 81 .getTrsSysTime()); 82 btrsTrsRuleDTO.setSystemSsn(btrsMngLogHisDTO.getSystemNo()); 83 84 btrsTrsRuleDTO.setTranDate(btrsTrsRuleDTO.getSystemDate()+btrsTrsRuleDTO.getSystemTime()); 85 86 87 //獲取流水號 88 String seqNumber = String.valueOf(executeSqlDao.queryForObject(SqlMappingName.GET_SEQ, null)); 89 btrsTrsAllDTO.setSqNumber(seqNumber); 90 btrsTrsRuleDTO.setSqNumber(seqNumber); 91 92 String subTxnCode = btrsMngLogHisDTO.getSubTxnCode(); 93 if(channelMap.containsKey(subTxnCode)){ 94 String channelFlg = String.valueOf(channelMap.get(subTxnCode)); 95 btrsTrsAllDTO.setChannelFlg(channelFlg); 96 btrsTrsRuleDTO.setChannelFlg(channelFlg); 97 }else{ 98 btrsTrsAllDTO.setChannelFlg(""); 99 btrsTrsRuleDTO.setChannelFlg(""); 100 } 101 if("2002010110".equals(subTxnCode) || "2002010140".equals(subTxnCode)){ 102 btrsTrsAllDTO.setCurrcyCodeTrans(""); 103 btrsTrsRuleDTO.setCurrcyCodeTrans(""); 104 } 105 106 107 //獲取交易的聯機或離線的信息 108 String messagetypeid = btrsMngLogHisDTO.getMessagetypeid(); 109 if(messagetypeid != null && !"".equals(messagetypeid)){ 110 btrsTrsAllDTO.setMessagetypeid(messagetypeid); 111 btrsTrsRuleDTO.setMessagetypeid(messagetypeid); 112 } 113 //彙總到交易彙總表中 114 executeSqlDao.insert(SqlMappingName.INSERT_BTRSTRSALLFROMMNG, btrsTrsAllDTO); 115 116 if(channelMap.containsKey(subTxnCode)){ 117 //抽取到交易規則表中 118 executeSqlDao.insert(SqlMappingName.INSERT_BTRSTRSRULEFROMMNG, btrsTrsRuleDTO); 119 120 Map map = new HashMap(); 121 map.put("KEY", seqNumber); 122 map.put("MATCH_RS", null); 123 map.put("ERR_MSG", null); 124 map.put("V_MATCH_PRIORITY", null); 125 map.put("NRETCDE", null); 126 map.put("V_MATCH_RULE_ID", null); 127 SysParamBO sysParamBO = (SysParamBO)context.getBean(BizAppContextObjectNames.SYSPARAM_BO); 128 SysParamDTO sysParamDTO = new SysParamDTO(); 129 sysParamDTO.setParamName("engine_switch"); 130 sysParamDTO.setSysFlag("S"); 131 SysParamDTO dto = sysParamBO.selectSysParamByName(sysParamDTO); 132 if(dto != null){ 133 String engineSwitch = dto.getParamValue(); 134 if("true".equals(engineSwitch)){ 135 //調用規則引擎 136 DevLog.debug("管理交易歷史流水調用規則引擎。。。"); 137 executeSqlDao.callProc(SqlMappingName.ANALYS_NEW_SQL_RULE, map); 138 String matchRS = (String)map.get("MATCH_RS"); 139 String errMsg =(String)map.get("ERR_MSG"); 140 BigDecimal nretCde =(BigDecimal)map.get("NRETCDE"); 141 String vMatchRuleId = (String)map.get("V_MATCH_RULE_ID"); 142 if (null!=matchRS && "1".equals(matchRS) && vMatchRuleId != null && !"".equals(vMatchRuleId)) 143 DevLog.info("交易"+seqNumber+"匹配成功!"); 144 else if(nretCde != null) 145 DevLog.info("交易"+seqNumber+"匹配失敗!緣由以下:"+nretCde.doubleValue() + errMsg); 146 } 147 } 148 } 149 } catch (PafaDAOException e) { 150 logger.error(e.getMessage()); 151 } catch (BeansException e) { 152 logger.error(e.getMessage()); 153 } catch (BusinessServiceException e) { 154 logger.error(e.getMessage()); 155 } catch (IllegalAccessException e) { 156 logger.error(e.getMessage()); 157 } catch (InvocationTargetException e) { 158 logger.error(e.getMessage()); 159 } catch (Exception e){ 160 logger.error(e.getMessage()); 161 } 162 163 //更新交易狀態 164 try { 165 executeSqlDao.update(SqlMappingName.UPDATE_BTRSMNGLOGHISSTATUS, btrsMngLogHisDTO); 166 } catch (PafaDAOException e) { 167 logger.error(e.getMessage()); 168 } 169 170 } 171 172 173 }