因爲,微信端的業務需求量愈來愈大.將業務與微信第三方事件處理耦合在一塊兒的單一項目的結構已經逐漸暴露出,承載能力不足的缺點.因此,須要將與微信的交互從業務邏輯中分離出,單獨進行管理和處理.
這樣作有如下幾點好處:java
微信中的經過http調用客戶配置的回調地址的方式來進行事件通知. 事件通知分爲兩種類型:git
因爲,咱們已經將事件通知從主業務系統中抽離出.因此,微信事件管理系統會在收到微信事件通知之後,經過mq的方式進行發佈事件.可是,事件通知的第二種類型須要在一個http請求中將業務處理數據帶回微信方.此時,就須要微信事件管理系統阻塞微信方發送來的http請求,直到業務系統處理完業務數據並返回給系統.
這樣,咱們就須要有一個靈活,可靠的容器對被阻塞的微信方請求進行管理.github
future多線程模型,是一種比較經常使用的多線程阻塞回調的模式.具體邏輯結構以下圖所示:web
具體處理邏輯是這樣的.當一個請求發送到一個future模式的入口之後,此時這個線程是阻塞的.這時future的線程會進行後面的回調業務,或者是直接開始等待.直到,其餘線程喚醒future線程或者future等待超時.此時,future線程喚醒,而後將具體的結果返回給調用線程.spring
線程間通訊有不少種方式,此次用到的是比較簡單的wait notify notifyall組合.這3個方法是object類中的3個方法.他們控制的目標是對於這個實例的控制.因此,須要線程在獲取到這個對象操做的monitor之後才能控制.通常使用的方法是經過synchronized關鍵字獲取對象鎖再調用這3個方法.若是,沒有在同步代碼塊中執行,這時候java會報IllegalMonitorStateException異常.這樣主要是爲了控制當同一個對象實例被多個線程佔用之後的操做問題.能夠避免不一樣步的狀況產生.服務器
wait方法主要是用來將這個對象實例上的當前線程進行掛起.能夠輸入timeout時間,超過timeout時間之後線程會自動喚醒微信
notify方法用來喚醒在對應的對象實例上休眠的線程,可是須要注意的是,這個是非公平的.具體喚醒哪個線程由jvm自行決定多線程
notifyall方法顧名思義,是將在這個實例對象上全部掛起的線程喚醒.app
ThreadHolder(消息載體):jvm
import lombok.Data;
import java.util.concurrent.Callable;
/** * <p> * Description: com.javanewb.service * </p> * date:2017/10/31 * * @author Dean.Hwang */
@Data
public abstract class ThreadHolder<T> implements Callable<T> {
protected abstract T proData();//TODO 正常邏輯處理,以及默認數據返回
private T defaultData;//返回的默認數據
private Object needProData;//接受到須要處理的數據
private Long createTime = System.currentTimeMillis();
private Long maxWaitTime;
private String mdc;
private RequestHolder<T> holder;
@Override
public T call() throws Exception {
waitThread();
System.out.println("Thread mdc:" + mdc + " notify");
if (needProData == null) {
holder.removeThread(mdc, false);
return defaultData;
}
return proData();
}
public synchronized void waitThread() throws InterruptedException {
this.wait(maxWaitTime);
}
public synchronized void notifyThread(Object needProData) {
this.needProData = needProData;
this.notify();
}
public synchronized void notifyDefault() {
this.notify();
}
}
複製代碼
RequestHolder(請求管理容器):
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
/** * <p> * Description: com.javanewb.entity * </p> * date:2017/10/26 * * @author Dean.Hwang */
public class RequestHolder<T> {
private Integer maxSize;
private Long waitTime;
public RequestHolder(Integer maxSize, Long maxWait, ExecutorService executorService) {
if (maxSize > 1000) {
throw new BusinessException(1022, "Bigger than max size num");
}
this.maxSize = maxSize;
this.waitTime = maxWait;
if (executorService != null) {
this.executorService = executorService;
} else {
this.executorService = new ThreadPoolExecutor(Math.max(1, maxSize / 5), maxSize, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(maxSize));
}
}
public RequestHolder(Integer maxSize, Long maxWait) {
if (maxSize > 1000) {
throw new BusinessException(1022, "Bigger than max size num");
}
this.waitTime = maxWait;
this.maxSize = maxSize;
this.executorService = new ThreadPoolExecutor(Math.max(1, maxSize / 5), maxSize, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(maxSize));
}
private ExecutorService executorService;
private final Map<String, ThreadHolder<T>> holderMap = new ConcurrentHashMap<>();
private List<String> mdcOrderList = new CopyOnWriteArrayList<>();
private AtomicBoolean isCleaning = new AtomicBoolean(false);
public ThreadHolder<T> removeThread(String mdc, boolean needNotifyDefault) {
mdcOrderList.remove(mdc);
ThreadHolder<T> holder;
synchronized (holderMap) {
holder = holderMap.get(mdc);
holderMap.remove(mdc);
}
if (holder != null && needNotifyDefault) {
holder.notifyDefault();
}
return holder;
}
public void notifyThread(String mdc, Object data) {
ThreadHolder<T> holder = removeThread(mdc, false);
if (holder != null) {
holder.notifyThread(data);
}
}
public Future<T> getFuture(String mdcStr, Class<? extends ThreadHolder<T>> holder) {
if (StringUtil.isEmpty(mdcStr) || holder == null) {
throw new BusinessException(1020, "Mdc target missing!!!");
}
Future<T> future;
try {
ThreadHolder<T> thread = holder.newInstance();
holderMap.put(mdcStr, thread);
mdcOrderList.add(mdcStr);
thread.setMaxWaitTime(waitTime);
thread.setMdc(mdcStr);
thread.setHolder(this);
future = executorService.submit(thread);
cleanThreadPool();
} catch (InstantiationException | IllegalAccessException e) {
holderMap.remove(mdcStr);
mdcOrderList.remove(mdcStr);
throw new BusinessException(1021, "Thread Holder initialized failed");
}
return future;
}
private void cleanThreadPool() {
if (mdcOrderList.size() >= maxSize && isCleaning.compareAndSet(false, true)) {
try {
mdcOrderList.subList(0, mdcOrderList.size() - maxSize).forEach(//看測試效率,看是否用並行stream處理
mdc -> removeThread(mdc, true)
);
} finally {
isCleaning.set(false);
}
}
}
}
複製代碼
TestController(測試入口):
import com.javanewb.entity.TestThreadHolder;
import com.javanewb.thread.tools.RequestHolder;
import com.keruyun.portal.common.filter.LoggerMDCFilter;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/** * <p> * Description: com.javanewb.controller * </p> * <p> * Copyright: Copyright (c) 2015 * </p> * <p> * </p> * date:2017/10/25 * * @author Dean.Hwang */
@Api
@RestController
@Slf4j
public class TestController {
private RequestHolder<String> holder = new RequestHolder<>(100, 500000L);
private List<String> mdcList = new ArrayList<>();
@ApiOperation(value = "請求同步測試", notes = "請求同步測試")
@RequestMapping(value = "/async", method = RequestMethod.GET)
public void async(HttpServletRequest request, HttpServletResponse response, String id) {
Long startTime = System.currentTimeMillis();
String mdc = MDC.get(LoggerMDCFilter.IDENTIFIER);
mdcList.add(mdc);
Future<String> future = holder.getFuture(id, TestThreadHolder.class);
log.info(Thread.currentThread().getName());
try {
System.out.println(mdc + " Thread Wait");
String result = future.get();
response.getOutputStream().print(result);
System.out.println(" time: " + (System.currentTimeMillis() - startTime));
} catch (IOException | ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
@ApiOperation(value = "釋放list第一個", notes = "請求同步測試")
@RequestMapping(value = "/notify", method = RequestMethod.GET)
public String notifyFirst() {
String mdc = mdcList.get(0);
mdcList.remove(0);
holder.notifyThread(mdc, "");
return mdc;
}
@ApiOperation(value = "釋放list第一個", notes = "請求同步測試")
@RequestMapping(value = "/notifyThis", method = RequestMethod.GET)
public String notifyThis(String mdc) {
int idx = 0;
for (int i = 0; i < mdcList.size(); i++) {
if (mdcList.get(i).equals(mdc)) {
idx = i;
break;
}
}
mdcList.remove(idx);
holder.notifyThread(mdc, "");
return mdc;
}
}
複製代碼
本項目會放在github上,若是有興趣,或者發現有bug須要處理的能夠直接從博客聯繫我,也能夠直接去github 地址:github.com/crowhyc/Thr… 郵箱:crowhyc@163.com