消費隊列

package com.pingan.wifi.dahua.base;java


import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;spring

import com.pingan.wifi.dahua.biz.UpgradeQueueService;apache


/**
 * 隊列消費器
 * 
 * @author renpeng162
 * 
 */
@Component("upgradeQueue")
public class UpgradeQueue implements InitializingBean, DisposableBean {jvm

    private static final Log logger = LogFactory.getLog(UpgradeQueue.class);ide

    // 要消費的隊列名
    private String queueName = RedisKey.FUND_QUEUE_FIRST_BUG;this

    // 消費線程池大小
    private int threadPoolSize = 1;.net

    // 消費線程池
    private List<ConsumerThread> consumerThreads;
    
    @Autowired
    private UpgradeQueueService queueService;線程

    // 消費線程啓動延時
    private int startDelay = 10000;隊列


    /**
     * 在隊列消費器初始化時要先啓動線程池中的線程
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        consumerThreads = new ArrayList<ConsumerThread>();
        
        for (int i = 0; i < threadPoolSize; i++) {
            ConsumerThread t = new ConsumerThread();
            t.setName("pools-consumer-" + this.queueName + "-" + i);
            t.setDaemon(true); // 把這些消費線程設置成守護線程,方便jvm正常退出
            t.start();
            consumerThreads.add(t);
        }
    }get

    /**
     * 消費器銷燬時,把全部的消費線程也關閉掉
     */
    @Override
    public void destroy() throws Exception {
        try {
            for (ConsumerThread ct : consumerThreads) {
                ct.shutdown();
            }
        } catch (Exception t) {
            logger.warn(t);
        }
    }

    private class UpgradeConsumerImpl implements UpgradeQueueConsumer {
        private volatile boolean status = true;

        @Override
        public boolean getStatus() {
            return this.status;
        }

        @Override
        public void setStatus(boolean status) {
            this.status = status;
        }

    }

    /**
     * 消費線程類定義
     * 
     * @author EX-ZHAOXIANGTAO001
     * 
     */
    private class ConsumerThread extends Thread {

        private UpgradeQueueConsumer msgConsumer = new UpgradeConsumerImpl();

        public ConsumerThread() {

        }

        @Override
        public void run() {
            logger.info("首先保證線程可以啓動===========================狀態:"+msgConsumer.getStatus());
            if (startDelay > 0) {
                try {
                    Thread.sleep(startDelay); // 先使線程掛起給定時間,已等待系統其它部分先正常啓動
                } catch (InterruptedException e) {
                    logger.warn(e);
                }
            }
            queueService.pullMsg(queueName,msgConsumer);
        }

        public void shutdown() {
            this.msgConsumer.setStatus(false);
        }
    }

    public String getQueueName() {
        return queueName;
    }

    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    public int getThreadPoolSize() {
        return threadPoolSize;
    }

    public void setThreadPoolSize(int threadPoolSize) {
        this.threadPoolSize = threadPoolSize;
    }

    public int getStartDelay() {
        return startDelay;
    }

    public void setStartDelay(int startDelay) {
        this.startDelay = startDelay;
    }

}  

相關文章
相關標籤/搜索