javaWeb 使用線程池+隊列解決"訂單併發"問題

遇到問題:java

最近作微信支付,項目上線一陣,發現一個問題。有一條訂單流水竟然在數據庫的出現兩次。這個問題很是嚴重。spring

查看微信回調系統的接口代碼發現代碼是沒錯的(正常狀況下),而此次遇到非正常狀況了數據庫

緣由:微信支付成功後回調咱們系統接口在極短期回調了2次,微信官方文檔說明了,是最短15s回調一次。瀏覽器

前幾天微信支付抽風了,可能業務出現了波動。緩存

簡單來講就是在併發狀況下沒有作數據惟一性處理,無論怎麼樣這類併發狀況都是有必要的處理。tomcat

 

 

解決方式:使用線程池+隊列服務器

項目基於Spring,若是不用spring須要本身把微信

ThreadPoolManager.java

改爲單例模式併發

 

1.寫一個Controller(Spring mvc)mvc

/**
 * @author HeyS1
 * @date 2016/12/1
 * @description
 */
@Controller
public class ThreadPoolController {
    @Autowired
    ThreadPoolManager tpm;

    @RequestMapping("/pool")
    public
    @ResponseBody
    Object test() {
        for (int i = 0; i < 500; i++) {
            //模擬併發500條記錄
            tpm.processOrders(Integer.toString(i));
        }

        return "ok";
    }
}

 

2.線程池管理

/**
 * @author HeyS1
 * @date 2016/12/1
 * @description threadPool訂單線程池, 處理訂單
 * scheduler 調度線程池 用於處理訂單線程池因爲超出線程範圍和隊列容量而不能處理的訂單
 */
@Component
public class ThreadPoolManager implements BeanFactoryAware {
    private static Logger log = LoggerFactory.getLogger(ThreadPoolManager.class);
    private BeanFactory factory;//用於從IOC裏取對象
    // 線程池維護線程的最少數量
    private final static int CORE_POOL_SIZE = 2;
    // 線程池維護線程的最大數量
    private final static int MAX_POOL_SIZE = 10;
    // 線程池維護線程所容許的空閒時間
    private final static int KEEP_ALIVE_TIME = 0;
    // 線程池所使用的緩衝隊列大小
    private final static int WORK_QUEUE_SIZE = 50;
    // 消息緩衝隊列
    Queue<Object> msgQueue = new LinkedList<Object>();

    //用於儲存在隊列中的訂單,防止重複提交
    Map<String, Object> cacheMap = new ConcurrentHashMap<>();

    //因爲超出線程範圍和隊列容量而使執行被阻塞時所使用的處理程序
    final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            //System.out.println("太忙了,把該訂單交給調度線程池逐一處理" + ((DBThread) r).getMsg());
            msgQueue.offer(((DBThread) r).getMsg());
        }
    };

    // 訂單線程池
    final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME,
            TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler);

    // 調度線程池。此線程池支持定時以及週期性執行任務的需求。
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);

    // 訪問消息緩存的調度線程,每秒執行一次
    // 查看是否有待定請求,若是有,則建立一個新的AccessDBThread,並添加到線程池中
    final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            if (!msgQueue.isEmpty()) {
                if (threadPool.getQueue().size() < WORK_QUEUE_SIZE) {
                    System.out.print("調度:");
                    String orderId = (String) msgQueue.poll();
                    DBThread accessDBThread = (DBThread) factory.getBean("dBThread");
                    accessDBThread.setMsg(orderId);
                    threadPool.execute(accessDBThread);
                }
                // while (msgQueue.peek() != null) {
                // }
            }
        }
    }, 0, 1, TimeUnit.SECONDS);

    //終止訂單線程池+調度線程池
    public void shutdown() {
        //true表示若是定時任務在執行,當即停止,false則等待任務結束後再中止
        System.out.println(taskHandler.cancel(false));
        scheduler.shutdown();
        threadPool.shutdown();
    }

    public Queue<Object> getMsgQueue() {
        return msgQueue;
    }


    //將任務加入訂單線程池
    public void processOrders(String orderId) {
        if (cacheMap.get(orderId) == null) {
            cacheMap.put(orderId,new Object());
            DBThread accessDBThread = (DBThread) factory.getBean("dBThread");
            accessDBThread.setMsg(orderId);
            threadPool.execute(accessDBThread);
        }
    }


    //BeanFactoryAware
    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        factory = beanFactory;
    }
}

3.線程池中工做的線程

//線程池中工做的線程
@Component
@Scope("prototype")//spring 多例
public class DBThread implements Runnable {
    private String msg;
    private Logger log = LoggerFactory.getLogger(DBThread.class);

    @Autowired
    SystemLogService systemLogService;



    @Override
    public void run() {
        //模擬在數據庫插入數據
        Systemlog systemlog = new Systemlog();
        systemlog.setTime(new Date());
        systemlog.setLogdescribe(msg);
        //systemLogService.insert(systemlog);
        log.info("insert->" + msg);
    }


    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }
}

 

瀏覽器輸入地址127.0.0.1/pool

幾秒後關閉tomcat。

模擬500條數據,訂單線程池處理了117條。調度線程池處理5條

關閉tomcat,後還有378條未處理(這裏的實現須要用到spring監聽器)。加起來一共500

OK。完畢

spring監聽器,監聽tomcat關閉事件:

public class MyApplicationListener implements ApplicationListener<ApplicationEvent> {

    @Autowired
    ThreadPoolManager threadPoolManager;

    @Override
    public void onApplicationEvent(ApplicationEvent event) {

        if (event instanceof ContextClosedEvent) {
            XmlWebApplicationContext x = (XmlWebApplicationContext) event.getSource();
            //防止執行兩次。root application context 沒有parent,他就是老大
            if (x.getDisplayName().equals("Root WebApplicationContext")) {
                threadPoolManager.shutdown();
                Queue q = threadPoolManager.getMsgQueue();
                System.out.println("關閉了服務器,還有未處理的信息條數:" + q.size());
            }


        } else if (event instanceof ContextRefreshedEvent) {
//            System.out.println(event.getClass().getSimpleName()+" 事件已發生!");
        } else if (event instanceof ContextStartedEvent) {
//            System.out.println(event.getClass().getSimpleName()+" 事件已發生!");
        } else if (event instanceof ContextStoppedEvent) {
//            System.out.println(event.getClass().getSimpleName()+" 事件已發生!");
        } else {
//            System.out.println("有其它事件發生:"+event.getClass().getName());
        }
    }
}

spring配置一下

<bean id="springStartListener" class="com.temp.MyApplicationListener"></bean>
相關文章
相關標籤/搜索