多線程應用--Http請求阻塞回調處理

1.需求描述

1.1場景說明:

因爲,微信端的業務需求量愈來愈大.將業務與微信第三方事件處理耦合在一塊兒的單一項目的結構已經逐漸暴露出,承載能力不足的缺點.因此,須要將與微信的交互從業務邏輯中分離出,單獨進行管理和處理.
這樣作有如下幾點好處:java

  1. 能夠達到業務解耦分離.
  2. 能夠爲業務系統微服務化作準備.
  3. 能夠在解耦後針對性的對不一樣業務系統進行優化.
  4. 減小業務系統錯誤的影響面.

1.2技術難點說明:

微信中的經過http調用客戶配置的回調地址的方式來進行事件通知. 事件通知分爲兩種類型:git

  1. 發送http請求和數據之後,客戶服務器默認回覆success字符串.而後,業務系統業務處理完成後經過指定的http地址通知微信方
  2. 發送http請求和數據的同一個請求中須要客戶服務器在response中返回業務處理的結果.

因爲,咱們已經將事件通知從主業務系統中抽離出.因此,微信事件管理系統會在收到微信事件通知之後,經過mq的方式進行發佈事件.可是,事件通知的第二種類型須要在一個http請求中將業務處理數據帶回微信方.此時,就須要微信事件管理系統阻塞微信方發送來的http請求,直到業務系統處理完業務數據並返回給系統.
這樣,咱們就須要有一個靈活,可靠的容器對被阻塞的微信方請求進行管理.github

2.理論基礎

2.1Future多線程模型:

2.1.1模型介紹:

future多線程模型,是一種比較經常使用的多線程阻塞回調的模式.具體邏輯結構以下圖所示:web

future時序圖.png

具體處理邏輯是這樣的.當一個請求發送到一個future模式的入口之後,此時這個線程是阻塞的.這時future的線程會進行後面的回調業務,或者是直接開始等待.直到,其餘線程喚醒future線程或者future等待超時.此時,future線程喚醒,而後將具體的結果返回給調用線程.spring

2.2線程間通訊:

2.2.1wait,notify,notifyAll

線程間通訊有不少種方式,此次用到的是比較簡單的wait notify notifyall組合.這3個方法是object類中的3個方法.他們控制的目標是對於這個實例的控制.因此,須要線程在獲取到這個對象操做的monitor之後才能控制.通常使用的方法是經過synchronized關鍵字獲取對象鎖再調用這3個方法.若是,沒有在同步代碼塊中執行,這時候java會報IllegalMonitorStateException異常.這樣主要是爲了控制當同一個對象實例被多個線程佔用之後的操做問題.能夠避免不一樣步的狀況產生.服務器

2.2.1.1wait

wait方法主要是用來將這個對象實例上的當前線程進行掛起.能夠輸入timeout時間,超過timeout時間之後線程會自動喚醒微信

2.2.1.2notify

notify方法用來喚醒在對應的對象實例上休眠的線程,可是須要注意的是,這個是非公平的.具體喚醒哪個線程由jvm自行決定多線程

2.2.1.3notifyall

notifyall方法顧名思義,是將在這個實例對象上全部掛起的線程喚醒.app

3.實現思路:

  1. 容錯能力:因爲須要給多個業務服務提供消息分發,消息回覆.須要有業務系統超時的處理能力.因此,提供的阻塞服務都會有timeout設定.
  2. 持續服務能力:咱們須要提供持續穩定的服務.在項目中.對阻塞的請求會有一個溢出的管理.若是超出某個最大值,先入的請求就會被直接返回默認值.因此,在業務服務中須要自行處理冪等的問題,避免業務處理完成後,可是因爲溢出致使業務處理失敗.這樣就會致使業務服務數據或者業務出現問題

4.具體實現:

ThreadHolder(消息載體):jvm

import lombok.Data;

import java.util.concurrent.Callable;

/** * <p> * Description: com.javanewb.service * </p> * date:2017/10/31 * * @author Dean.Hwang */
@Data
public abstract class ThreadHolder<T> implements Callable<T> {
    protected abstract T proData();//TODO 正常邏輯處理,以及默認數據返回

    private T defaultData;//返回的默認數據
    private Object needProData;//接受到須要處理的數據
    private Long createTime = System.currentTimeMillis();
    private Long maxWaitTime;
    private String mdc;
    private RequestHolder<T> holder;

    @Override
    public T call() throws Exception {
        waitThread();
        System.out.println("Thread mdc:" + mdc + " notify");
        if (needProData == null) {
            holder.removeThread(mdc, false);
            return defaultData;
        }
        return proData();
    }

    public synchronized void waitThread() throws InterruptedException {
        this.wait(maxWaitTime);
    }

    public synchronized void notifyThread(Object needProData) {
        this.needProData = needProData;
        this.notify();
    }

    public synchronized void notifyDefault() {
        this.notify();
    }
}
複製代碼

RequestHolder(請求管理容器):

import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

/** * <p> * Description: com.javanewb.entity * </p> * date:2017/10/26 * * @author Dean.Hwang */
public class RequestHolder<T> {
    private Integer maxSize;
    private Long waitTime;

    public RequestHolder(Integer maxSize, Long maxWait, ExecutorService executorService) {
        if (maxSize > 1000) {
            throw new BusinessException(1022, "Bigger than max size num");
        }
        this.maxSize = maxSize;
        this.waitTime = maxWait;
        if (executorService != null) {
            this.executorService = executorService;
        } else {
            this.executorService = new ThreadPoolExecutor(Math.max(1, maxSize / 5), maxSize, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(maxSize));
        }
    }

    public RequestHolder(Integer maxSize, Long maxWait) {
        if (maxSize > 1000) {
            throw new BusinessException(1022, "Bigger than max size num");
        }
        this.waitTime = maxWait;
        this.maxSize = maxSize;
        this.executorService = new ThreadPoolExecutor(Math.max(1, maxSize / 5), maxSize, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(maxSize));
    }

    private ExecutorService executorService;
    private final Map<String, ThreadHolder<T>> holderMap = new ConcurrentHashMap<>();
    private List<String> mdcOrderList = new CopyOnWriteArrayList<>();
    private AtomicBoolean isCleaning = new AtomicBoolean(false);

    public ThreadHolder<T> removeThread(String mdc, boolean needNotifyDefault) {
        mdcOrderList.remove(mdc);
        ThreadHolder<T> holder;
        synchronized (holderMap) {
            holder = holderMap.get(mdc);
            holderMap.remove(mdc);
        }
        if (holder != null && needNotifyDefault) {
            holder.notifyDefault();
        }
        return holder;
    }

    public void notifyThread(String mdc, Object data) {
        ThreadHolder<T> holder = removeThread(mdc, false);
        if (holder != null) {
            holder.notifyThread(data);
        }
    }


    public Future<T> getFuture(String mdcStr, Class<? extends ThreadHolder<T>> holder) {
        if (StringUtil.isEmpty(mdcStr) || holder == null) {
            throw new BusinessException(1020, "Mdc target missing!!!");
        }
        Future<T> future;
        try {
            ThreadHolder<T> thread = holder.newInstance();
            holderMap.put(mdcStr, thread);
            mdcOrderList.add(mdcStr);
            thread.setMaxWaitTime(waitTime);
            thread.setMdc(mdcStr);
            thread.setHolder(this);
            future = executorService.submit(thread);
            cleanThreadPool();
        } catch (InstantiationException | IllegalAccessException e) {
            holderMap.remove(mdcStr);
            mdcOrderList.remove(mdcStr);
            throw new BusinessException(1021, "Thread Holder initialized failed");
        }
        return future;
    }

    private void cleanThreadPool() {
        if (mdcOrderList.size() >= maxSize && isCleaning.compareAndSet(false, true)) {

            try {
                mdcOrderList.subList(0, mdcOrderList.size() - maxSize).forEach(//看測試效率,看是否用並行stream處理
                        mdc -> removeThread(mdc, true)
                );
            } finally {
                isCleaning.set(false);
            }
        }
    }
}
複製代碼

TestController(測試入口):

import com.javanewb.entity.TestThreadHolder;
import com.javanewb.thread.tools.RequestHolder;
import com.keruyun.portal.common.filter.LoggerMDCFilter;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/** * <p> * Description: com.javanewb.controller * </p> * <p> * Copyright: Copyright (c) 2015 * </p> * <p> * </p> * date:2017/10/25 * * @author Dean.Hwang */
@Api
@RestController
@Slf4j
public class TestController {
    private RequestHolder<String> holder = new RequestHolder<>(100, 500000L);
    private List<String> mdcList = new ArrayList<>();

    @ApiOperation(value = "請求同步測試", notes = "請求同步測試")
    @RequestMapping(value = "/async", method = RequestMethod.GET)
    public void async(HttpServletRequest request, HttpServletResponse response, String id) {
        Long startTime = System.currentTimeMillis();
        String mdc = MDC.get(LoggerMDCFilter.IDENTIFIER);
        mdcList.add(mdc);
        Future<String> future = holder.getFuture(id, TestThreadHolder.class);
        log.info(Thread.currentThread().getName());
        try {
            System.out.println(mdc + " Thread Wait");
            String result = future.get();
            response.getOutputStream().print(result);
            System.out.println(" time: " + (System.currentTimeMillis() - startTime));
        } catch (IOException | ExecutionException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    @ApiOperation(value = "釋放list第一個", notes = "請求同步測試")
    @RequestMapping(value = "/notify", method = RequestMethod.GET)
    public String notifyFirst() {
        String mdc = mdcList.get(0);
        mdcList.remove(0);
        holder.notifyThread(mdc, "");
        return mdc;
    }

    @ApiOperation(value = "釋放list第一個", notes = "請求同步測試")
    @RequestMapping(value = "/notifyThis", method = RequestMethod.GET)
    public String notifyThis(String mdc) {
        int idx = 0;
        for (int i = 0; i < mdcList.size(); i++) {
            if (mdcList.get(i).equals(mdc)) {
                idx = i;
                break;
            }
        }
        mdcList.remove(idx);
        holder.notifyThread(mdc, "");
        return mdc;
    }
}
複製代碼

5.後話:

本項目會放在github上,若是有興趣,或者發現有bug須要處理的能夠直接從博客聯繫我,也能夠直接去github 地址:github.com/crowhyc/Thr… 郵箱:crowhyc@163.com

相關文章
相關標籤/搜索