package com.pingan.wifi.dahua.base;java
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;spring
import com.pingan.wifi.dahua.biz.UpgradeQueueService;apache
/**
* 隊列消費器
*
* @author renpeng162
*
*/
@Component("upgradeQueue")
public class UpgradeQueue implements InitializingBean, DisposableBean {jvm
private static final Log logger = LogFactory.getLog(UpgradeQueue.class);ide
// 要消費的隊列名
private String queueName = RedisKey.FUND_QUEUE_FIRST_BUG;this
// 消費線程池大小
private int threadPoolSize = 1;.net
// 消費線程池
private List<ConsumerThread> consumerThreads;
@Autowired
private UpgradeQueueService queueService;線程
// 消費線程啓動延時
private int startDelay = 10000;隊列
/**
* 在隊列消費器初始化時要先啓動線程池中的線程
*/
@Override
public void afterPropertiesSet() throws Exception {
consumerThreads = new ArrayList<ConsumerThread>();
for (int i = 0; i < threadPoolSize; i++) {
ConsumerThread t = new ConsumerThread();
t.setName("pools-consumer-" + this.queueName + "-" + i);
t.setDaemon(true); // 把這些消費線程設置成守護線程,方便jvm正常退出
t.start();
consumerThreads.add(t);
}
}get
/**
* 消費器銷燬時,把全部的消費線程也關閉掉
*/
@Override
public void destroy() throws Exception {
try {
for (ConsumerThread ct : consumerThreads) {
ct.shutdown();
}
} catch (Exception t) {
logger.warn(t);
}
}
private class UpgradeConsumerImpl implements UpgradeQueueConsumer {
private volatile boolean status = true;
@Override
public boolean getStatus() {
return this.status;
}
@Override
public void setStatus(boolean status) {
this.status = status;
}
}
/**
* 消費線程類定義
*
* @author EX-ZHAOXIANGTAO001
*
*/
private class ConsumerThread extends Thread {
private UpgradeQueueConsumer msgConsumer = new UpgradeConsumerImpl();
public ConsumerThread() {
}
@Override
public void run() {
logger.info("首先保證線程可以啓動===========================狀態:"+msgConsumer.getStatus());
if (startDelay > 0) {
try {
Thread.sleep(startDelay); // 先使線程掛起給定時間,已等待系統其它部分先正常啓動
} catch (InterruptedException e) {
logger.warn(e);
}
}
queueService.pullMsg(queueName,msgConsumer);
}
public void shutdown() {
this.msgConsumer.setStatus(false);
}
}
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public int getThreadPoolSize() {
return threadPoolSize;
}
public void setThreadPoolSize(int threadPoolSize) {
this.threadPoolSize = threadPoolSize;
}
public int getStartDelay() {
return startDelay;
}
public void setStartDelay(int startDelay) {
this.startDelay = startDelay;
}
}