實現線程池配置

剛處理了一個由於線程池啓動線程之後資源一直沒有釋放、最終導致內存溢出的問題,把代碼貼出來。這裏貼出來的代碼是修正之後的,沒有問題,也可以用作學習線程池配置以及多線程的參考。

  具體問題的原因是:

  PooledExecutorManager.getPooledExecutor().execute();

  該方法需要傳入 Runnable 類型的任務,但實際應用的時候,我們傳遞過來的 MngRuleMatch 類的寫法是:

  public class MngRuleMatch  extends Thread{。。。}

      因此我們把 MngRuleMatch 類重新修改爲實現 Runnable 之後就正常了,寫法爲:

  public class MngRuleMatch  implements Runnable{。。。}

 

在看該問題之前請先了解一下 Thread 和 Runnable 之間的關係,推薦一篇文章:

http://www.oschina.net/question/565065_86563

然後我們看一下本次出現該問題的代碼,如下:

1:線程池配置文件:

 

 1 KEEP_ALIVE_TIME=300000
 2 BOUNDED_BUFFER_SIZE=50
 3 INIT_THREAD_SIZE=5
 4 MINIMUM_POOL_SIZE=5
 5 MAX_POOL_SIZE=20
 6 
 7 #服務處理線程數
 8 tmpLimitInviteTask.thredNum=5
 9 
10 
11 #線程池初始化參數 
12 LIMIT_INVITE_KEEP_ALIVE_TIME=300000
13 LIMIT_INVITE_BOUNDED_BUFFER_SIZE=50
14 LIMIT_INVITE_INIT_THREAD_SIZE=5
15 LIMIT_INVITE_MINIMUM_POOL_SIZE=5
16 LIMIT_INVITE_MAX_POOL_SIZE=20
17 
18 #獲取待處理的額度枯竭交易數
19 LIMIT_INVITE_NUMBER=500
20 REFUSE_LIMIT_INVITE_DAYS=15

 

2: 實現對配置文件的讀寫獲取的java實現類

 

 1 package XXX;
 2 
 3 
 4 import java.io.IOException;
 5 import java.io.InputStream;
 6 import java.util.Properties;
 7 
 8 import com.paic.pafa.app.lwc.core.util.DevLog;
 9 
10 /**
11  * Configuration reader: reads the contents of the config.properties file. Note:
12  * config.properties must be placed in a directory or jar file that is on the system CLASSPATH
13  * (it is looked up through the current thread's context class loader).
14  */
15 public class Config {
16     private static final String LOG_NAME = "Config"; // first argument of every Logger call below
17     private  Properties properties; // per-instance cache; null until the private getConfig() first runs
18     private Properties getConfig(){ // loads config.properties on first use, then returns the cached Properties
19         if(properties==null){
20             properties = new Properties();
21             InputStream in = null;
22             try {
23                 DevLog.debug("【config.properties】文件被打開了!");
24                 in  = Thread.currentThread().getContextClassLoader().getResourceAsStream("config.properties");
25                 properties.load(in); // NOTE(review): in is null when the resource is not found; presumably load() then throws and the catch below leaves properties empty - confirm
26             } catch (Exception e) {
27                 DevLog.debug("【config.properties】有誤", e);
28             } finally {
29                 try {
30                     if (in != null) {
31                         in.close();
32                     }
33                     DevLog.debug("【config.properties】文件成功關閉了!"); // logged even when in was null and nothing was closed
34                     in = null;
35                 } catch (IOException e ) {
36                     DevLog.debug("【config.properties】文件關閉有誤", e);
37                 }
38             }
39             DevLog.debug("【config.properties】文件的內容:" + properties);
40         }
41         return properties;
42     }
43     /** Returns the int value stored under param, or defaultValue when the key is absent or not a valid integer. */
44     public  int getConfig(String param, int defaultValue) {
45         // NOTE(review): Logger is not among the imports shown in this listing (only DevLog is) - confirm where it comes from
46         String val = getConfig().getProperty(param);
47         if (val == null) {
48             Logger.debug(LOG_NAME, "Value of [" + param + "] not exist, use default value [" + defaultValue + "]");
49             return defaultValue;
50         }
51 
52         try {
53             int i = Integer.parseInt(val); // no trimming is done, so surrounding whitespace makes the value invalid
54             Logger.debug(LOG_NAME, "Value of [" + param + "] is [" + i + "]");
55             return i;
56         } catch (NumberFormatException e) {
57             Logger.error(LOG_NAME, "Value of [" + param + "] is not a valid number, use default value [" + defaultValue + "]", e);
58             return defaultValue;
59         }
60     }
61     /** Returns the string value stored under param, or defaultValue when the key is absent. */
62     public  String getConfig(String param, String defaultValue) {
63         String val = getConfig().getProperty(param);
64         if (val == null) {
65             Logger.debug(LOG_NAME, "Value of [" + param + "] not exist, use default value [" + defaultValue + "]");
66             return defaultValue;
67         }
68 
69         Logger.debug(LOG_NAME, "Value of [" + param + "] is [" + val + "]");
70         return val;        
71     }
72 }

 

 3:數據準備業務邏輯處理類

    此類負責查詢數據庫數據(也可以是接口返回的數據等等),然後用一個 while 循環不斷獲取數據,並從線程池拿到線程對數據進行處理。

 

package xxx;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.paic.btds.common.util.BizAppContextObjectNames;
import com.paic.btds.sysServers.biz.bo.BtdsTxnMsgBO;
import com.paic.btds.sysServers.biz.bo.BtrsMngLogHisBO;
import com.paic.btds.sysServers.common.dto.BtdsTxnMsgDTO;
import com.paic.btds.sysServers.common.dto.BtrsMngLogHisDTO;
import com.paic.pafa.app.biz.service.BusinessServiceException;
import com.paic.pafa.app.lwc.core.context.ApplicationContext;
import com.paic.pafa.app.lwc.core.context.support.PafaCoreContexton;
import com.paic.pafa.app.lwc.core.util.DevLog;
import com.paic.pafa.logging2.Level;
import com.paic.pafa.logging2.Logger;
/**
 * 流水錶的規則匹配服務的線程
 *
 */
public class MngRuleMatchThread extends Thread {

    private Logger error = PafaCoreContexton.getInstance().getErrorLogger();

    private boolean flag = true;

    private ApplicationContext context;

    private Config config = new Config();
    
    private long sleep_time_1 ;
    private long sleep_time_2 ;
    private long sleep_time_3 ;

    public MngRuleMatchThread(ApplicationContext context) {
        this.context = context;
    }

    public boolean isFlag() {
        return flag;
    }

    public void setFlag(boolean flag) {
        this.flag = flag;
    }

    // 獲取交易信息,調用規則引擎,標識交易信息
    public void run() {
        DevLog.debug("管理交易歷史流水錶規則匹配服務啓動。。。。");

        /**初始化線程池*/
        PooledExecutorManager.initPoolExecutor();
        
        BtdsTxnMsgBO btdsTxnMsgBO = (BtdsTxnMsgBO)context.getBean(BizAppContextObjectNames.BTDSTXNMSG_BO);
        List btdsTxnMsgList = new ArrayList();
        Map channelMap = new HashMap();
        try {
            btdsTxnMsgList = btdsTxnMsgBO.selectBtdsTxnMsg();
        } catch (BusinessServiceException e2) {
            e2.printStackTrace();
        }
        if(btdsTxnMsgList != null && btdsTxnMsgList.size() > 0){
            for(int i = 0; i < btdsTxnMsgList.size(); i++){
                BtdsTxnMsgDTO btdsTxnMsgDTO = (BtdsTxnMsgDTO)btdsTxnMsgList.get(i);
                if(btdsTxnMsgDTO != null){
                    channelMap.put(btdsTxnMsgDTO.getSubTxnCode(), btdsTxnMsgDTO.getChannelFlg());
                }
            }
        }
        
        setSleepTime();
        
        while (isFlag()) { String transActionId = PafaCoreContexton.getInstance() .getThreadContext().getTxnID(); InetAddress localHost = null; try { localHost = InetAddress.getLocalHost(); } catch (UnknownHostException e1) { } String hostAddress = localHost.getHostAddress(); BtrsMngLogHisDTO dto = new BtrsMngLogHisDTO(); dto.setIp(hostAddress); //更新管理交易歷史流水錶
            try { updateBtrsMngLogHis(dto); } catch (BusinessServiceException e1) { error.log(transActionId, Level.INFO, this.getClass().getName(), "updateBtrsMngLogHis()", "entering run", e1, ""); } // 從管理交易歷史流水錶獲取交易信息
            List btrsMngLogHisList = selectBtrsMngLogHis(dto); if(btrsMngLogHisList != null && btrsMngLogHisList.size() > 0){ MngRuleMatchProducer mngRuleMatchProducer = new MngRuleMatchProducer(btrsMngLogHisList,context,channelMap); mngRuleMatchProducer.ruleMatch(); if(btrsMngLogHisList.size() < 200 && btrsMngLogHisList.size() >100){ try { DevLog.info("掃描管理交易歷史流水錶種交易不足200條,休眠0.5秒"); Thread.sleep(getSleepTime1()); } catch (InterruptedException e) { } }else if(btrsMngLogHisList.size() < 100) { try { DevLog.info("掃描完管理交易歷史流水錶不足100條交易,休眠1秒"); Thread.sleep(getSleepTime2()); } catch (InterruptedException e) { } } }else{ try { DevLog.info("管理交易歷史流水錶中沒有數據,休眠5秒"); Thread.sleep(getSleepTime3()); } catch (InterruptedException e) { } } } 
    }
    
    public void setSleepTime(){
        this.sleep_time_1 = config.getConfig("MNG_SLEEP_TIME_1", 500);
        this.sleep_time_2 = config.getConfig("MNG_SLEEP_TIME_2", 1000);
        this.sleep_time_3 = config.getConfig("MNG_SLEEP_TIME_3", 5000);
    }
    
    public long getSleepTime1() {
        return this.sleep_time_1;
    }
    public long getSleepTime2() {
        return this.sleep_time_2;
    }
    public long getSleepTime3() {
        return this.sleep_time_3;
    }

    private void updateBtrsMngLogHis(BtrsMngLogHisDTO btrsMngLogHisDTO) throws BusinessServiceException {
        BtrsMngLogHisBO btrsMngLogHisBO = (BtrsMngLogHisBO)context.getBean(BizAppContextObjectNames.BTRSMNGLOGHIS_BO);
        btrsMngLogHisBO.updateBtrsMngLogHis(btrsMngLogHisDTO);
    }

    private List selectBtrsMngLogHis(BtrsMngLogHisDTO dto) {
        List btrsMngLogHisList = new ArrayList();
        String transActionId = PafaCoreContexton.getInstance()
                .getThreadContext().getTxnID();
        BtrsMngLogHisBO btrsMngLogHisBO = (BtrsMngLogHisBO) context
                .getBean(BizAppContextObjectNames.BTRSMNGLOGHIS_BO);
        try {
            btrsMngLogHisList = btrsMngLogHisBO.selectBtrsMngLogHis(dto);
        } catch (BusinessServiceException e) {
            error.log(transActionId, Level.INFO, this.getClass().getName(),
                    "selectBtrsMngLogHis()", "entering run", e, "");
        }

        return btrsMngLogHisList;
    }

}

 

4:線程池獲取線程,並啓動線程對數據處理

 1 package cxx;
 2 
 3 import java.util.Map;
 4 
 5 import EDU.oswego.cs.dl.util.concurrent.CountDown;
 6 
 7 import com.paic.btds.sysServers.common.dto.BtrsMngLogHisDTO;
 8 import com.paic.pafa.app.lwc.core.context.ApplicationContext;
 9 /**
10  * Transaction dispatch task of the rule-matching service for the management transaction history log table.
11  * @author EX_WEIPENG413
12  * Implements Runnable (not Thread) so that instances can be handed to a pool's execute() as plain tasks.
13  */
14 public class MngRuleMatch  implements Runnable{
15 
16     private CountDown clock; // released once in run(), whether or not the match succeeds
17     private BtrsMngLogHisDTO btrsMngLogHisDTO ; // the single record this task processes
18     private ApplicationContext context ;
19     private Map channelMap; // passed through unchanged to BtrsMngLogRuleMatch
20     // Stores the arguments only; no work is done until run() is called.
21     public MngRuleMatch(BtrsMngLogHisDTO btrsMngLogHisDTO ,CountDown clock ,ApplicationContext context, Map channelMap){
22         this.btrsMngLogHisDTO = btrsMngLogHisDTO;
23         this.clock =clock;
24         this.context =context;
25         this.channelMap = channelMap;
26     }
27     public void run(){ // runs the rule match for the record given to the constructor
28         try{
29             BtrsMngLogHisDTO dto = btrsMngLogHisDTO;
30 
31             BtrsMngLogRuleMatch btrsMngLogRuleMatch=new BtrsMngLogRuleMatch(dto,context,channelMap);
32             btrsMngLogRuleMatch.match();  
33 
34         }catch(Exception e){
35             e.printStackTrace(); // failures are only printed; nothing is rethrown to the executing thread
36         }finally{
37             clock.release(); // always executed; presumably lets the submitter wait for the whole batch - confirm against the producer code
38         } 
39     }
40     
41 }

5:線程處理類

  1 package xxx;
  2 
  3 import java.lang.reflect.InvocationTargetException;
  4 import java.math.BigDecimal;
  5 import java.util.HashMap;
  6 import java.util.Map;
  7 
  8 import org.apache.commons.beanutils.BeanUtils;
  9 import org.apache.log4j.Logger;
 10 
 11 import com.paic.btds.common.util.BizAppContextObjectNames;
 12 import com.paic.btds.para.biz.bo.SysParamBO;
 13 import com.paic.btds.para.common.dto.SysParamDTO;
 14 import com.paic.btds.sysServers.common.dto.BtrsMngLogHisDTO;
 15 import com.paic.btds.sysServers.common.dto.BtrsTrsAllDTO;
 16 import com.paic.btds.sysServers.common.dto.BtrsTrsRuleDTO;
 17 import com.paic.btds.sysServers.web.util.SqlMappingName;
 18 import com.paic.paces.common.dao.ExecuteSqlDAO;
 19 import com.paic.pafa.app.biz.service.BusinessServiceException;
 20 import com.paic.pafa.app.integration.dao.PafaDAOException;
 21 import com.paic.pafa.app.lwc.core.beans.BeansException;
 22 import com.paic.pafa.app.lwc.core.context.ApplicationContext;
 23 import com.paic.pafa.app.lwc.core.util.DevLog;
 24 /**
 25  * Transaction dispatch for the rule-matching service of the management transaction history log table.
 26  *
 27  * One instance handles one BtrsMngLogHisDTO; all of the work is done in match().
 28  */
 29 public class BtrsMngLogRuleMatch {
 30 
 31     private ApplicationContext context;
 32     
 33     static Logger logger = Logger.getLogger(BtrsMngLogRuleMatch.class.getName());
 34     
 35     private ExecuteSqlDAO executeSqlDao;
 36     
 37     private Map channelMap;
 38     /**
 39      * Transaction data
 40      */
 41     private BtrsMngLogHisDTO btrsMngLogHisDTO = null;   
 42     // Stores the arguments and looks up the ExecuteSqlDAO bean from the context.
 43     public BtrsMngLogRuleMatch(BtrsMngLogHisDTO btrsMngLogHisDTO,ApplicationContext context, Map channelMap) {
 44         this.btrsMngLogHisDTO=btrsMngLogHisDTO;
 45         this.context=context;
 46         this.executeSqlDao=(ExecuteSqlDAO) context.getBean(BizAppContextObjectNames.EXECUTE_SQL_DAO);
 47         this.channelMap = channelMap;
 48     }
 49     // Copies the record into the "all" and "rule" DTOs, inserts them, optionally calls the rule engine, then updates the record's status.
 50     public void match() {
 51         try {
 52             BtrsTrsAllDTO btrsTrsAllDTO = new BtrsTrsAllDTO();
 53             BtrsTrsRuleDTO btrsTrsRuleDTO = new BtrsTrsRuleDTO();
 54             BeanUtils.copyProperties(btrsTrsAllDTO,
 55                     btrsMngLogHisDTO);
 56             BeanUtils.copyProperties(btrsTrsRuleDTO,
 57                     btrsMngLogHisDTO);
 58             String amtTrans = btrsMngLogHisDTO.getTransAmt();
 59             if(amtTrans != null && !"".equals(amtTrans)){
 60                 btrsTrsAllDTO.setAmtTrans(new BigDecimal(amtTrans).multiply(new BigDecimal("100")) // amount x 100; presumably converts to the minor currency unit - confirm
 61                         .toString());
 62                 btrsTrsRuleDTO.setAmtTrans(new BigDecimal(amtTrans).multiply(new BigDecimal("100"))
 63                         .toString());
 64             }
 65             
 66             btrsTrsAllDTO.setPrimaryAcctNum(btrsMngLogHisDTO
 67                     .getCardNum());
 68             btrsTrsAllDTO.setSystemDate(btrsMngLogHisDTO
 69                     .getTrsSysDate());
 70             btrsTrsAllDTO.setSystemTime(btrsMngLogHisDTO
 71                     .getTrsSysTime());
 72             btrsTrsAllDTO.setSystemSsn(btrsMngLogHisDTO.getSystemNo());
 73             
 74             btrsTrsAllDTO.setTranDate(btrsTrsAllDTO.getSystemDate()+btrsTrsAllDTO.getSystemTime());
 75             
 76             btrsTrsRuleDTO.setPrimaryAcctNum(btrsMngLogHisDTO
 77                     .getCardNum());
 78             btrsTrsRuleDTO.setSystemDate(btrsMngLogHisDTO
 79                     .getTrsSysDate());
 80             btrsTrsRuleDTO.setSystemTime(btrsMngLogHisDTO
 81                     .getTrsSysTime());
 82             btrsTrsRuleDTO.setSystemSsn(btrsMngLogHisDTO.getSystemNo());
 83             
 84             btrsTrsRuleDTO.setTranDate(btrsTrsRuleDTO.getSystemDate()+btrsTrsRuleDTO.getSystemTime());
 85             
 86             
 87             // Fetch the sequence number (shared by both DTOs and used as the rule-engine key below)
 88             String seqNumber = String.valueOf(executeSqlDao.queryForObject(SqlMappingName.GET_SEQ, null));
 89             btrsTrsAllDTO.setSqNumber(seqNumber);
 90             btrsTrsRuleDTO.setSqNumber(seqNumber);
 91             
 92             String subTxnCode = btrsMngLogHisDTO.getSubTxnCode();
 93             if(channelMap.containsKey(subTxnCode)){
 94                 String channelFlg = String.valueOf(channelMap.get(subTxnCode));
 95                 btrsTrsAllDTO.setChannelFlg(channelFlg);
 96                 btrsTrsRuleDTO.setChannelFlg(channelFlg);
 97             }else{
 98                 btrsTrsAllDTO.setChannelFlg("");
 99                 btrsTrsRuleDTO.setChannelFlg("");
100             }
101             if("2002010110".equals(subTxnCode) || "2002010140".equals(subTxnCode)){ // NOTE(review): meaning of these two codes is not visible here; the currency code is blanked for them
102                 btrsTrsAllDTO.setCurrcyCodeTrans("");
103                 btrsTrsRuleDTO.setCurrcyCodeTrans("");
104             }
105             
106 
107             // Get the online/offline information of the transaction
108             String messagetypeid = btrsMngLogHisDTO.getMessagetypeid();
109             if(messagetypeid != null && !"".equals(messagetypeid)){
110                 btrsTrsAllDTO.setMessagetypeid(messagetypeid);
111                 btrsTrsRuleDTO.setMessagetypeid(messagetypeid);
112             }
113             // Insert into the transaction summary table
114             executeSqlDao.insert(SqlMappingName.INSERT_BTRSTRSALLFROMMNG, btrsTrsAllDTO);
115             
116             if(channelMap.containsKey(subTxnCode)){
117                 // Extract into the transaction rule table (only for sub-transaction codes present in channelMap)
118                 executeSqlDao.insert(SqlMappingName.INSERT_BTRSTRSRULEFROMMNG, btrsTrsRuleDTO);
119                 
120                 Map map = new HashMap(); // parameter map for callProc; the null entries are read back after the call, so presumably they are OUT parameters - confirm
121                 map.put("KEY", seqNumber);
122                 map.put("MATCH_RS", null);
123                 map.put("ERR_MSG", null);
124                 map.put("V_MATCH_PRIORITY", null);
125                 map.put("NRETCDE", null);
126                 map.put("V_MATCH_RULE_ID", null);
127                 SysParamBO sysParamBO = (SysParamBO)context.getBean(BizAppContextObjectNames.SYSPARAM_BO);
128                 SysParamDTO sysParamDTO = new SysParamDTO();
129                 sysParamDTO.setParamName("engine_switch");
130                 sysParamDTO.setSysFlag("S");
131                 SysParamDTO dto = sysParamBO.selectSysParamByName(sysParamDTO);
132                 if(dto != null){
133                     String engineSwitch = dto.getParamValue();
134                     if("true".equals(engineSwitch)){ // the engine is called only when the engine_switch parameter is exactly "true"
135                         // Invoke the rule engine
136                         DevLog.debug("管理交易歷史流水調用規則引擎。。。");
137                         executeSqlDao.callProc(SqlMappingName.ANALYS_NEW_SQL_RULE, map);
138                         String matchRS = (String)map.get("MATCH_RS");
139                         String errMsg =(String)map.get("ERR_MSG");
140                         BigDecimal nretCde =(BigDecimal)map.get("NRETCDE");
141                         String vMatchRuleId = (String)map.get("V_MATCH_RULE_ID");
142                         if (null!=matchRS && "1".equals(matchRS) && vMatchRuleId != null && !"".equals(vMatchRuleId))
143                             DevLog.info("交易"+seqNumber+"匹配成功!");
144                         else if(nretCde != null) // nothing is logged when there is no match and NRETCDE is null
145                             DevLog.info("交易"+seqNumber+"匹配失敗!緣由以下:"+nretCde.doubleValue() + errMsg);
146                     }
147                 }
148             }
149         } catch (PafaDAOException e) { // every handler below logs only e.getMessage(); the stack trace is not recorded
150             logger.error(e.getMessage());
151         } catch (BeansException e) {
152             logger.error(e.getMessage());
153         } catch (BusinessServiceException e) {
154             logger.error(e.getMessage());
155         } catch (IllegalAccessException e) {
156             logger.error(e.getMessage());
157         } catch (InvocationTargetException e) {
158             logger.error(e.getMessage());
159         } catch (Exception e){
160             logger.error(e.getMessage());
161         }
162         
163         // Update the transaction status; this sits outside the try above, so it runs even when the steps above failed
164         try {
165             executeSqlDao.update(SqlMappingName.UPDATE_BTRSMNGLOGHISSTATUS, btrsMngLogHisDTO);
166         } catch (PafaDAOException e) {
167             logger.error(e.getMessage());
168         }
169             
170     }
171     
172     
173 }
相關文章
相關標籤/搜索