1.總體設計
總體分爲業務服務與管理端兩大部分,包括了三個主要功能點:
(1)簡化線程池配置:corePoolSize、maximumPoolSize,workQueue,它們最大程度地決定了線程池的任務分配和線程分配策略。考慮到在實際應用中咱們獲取併發性的場景主要是兩種:(1)並行執行子任務,提升響應速度。這種狀況下,應該使用同步隊列,沒有什麼任務應該被緩存下來,而是應該當即執行。(2)並行執行大批次任務,提高吞吐量。這種狀況下,應該使用有界隊列,使用隊列去緩衝大批量的任務,隊列容量必須聲明,防止任務無限制堆積。因此線程池只須要提供這三個關鍵參數的配置,而且提供兩種隊列的選擇,就能夠知足絕大多數的業務需求
(2)參數可動態配置: 在java線程池留有的基礎上,封裝線程池、容許線程池根據外部配置變化,進行修改配置,並在管理端開發控制界面方便開發;
(3)增長線程池監控:在線程池執行任務的整個生命週期添加監控能力,如當前活躍任務數、執行任務發生的異常數、任務隊列大小等指標,方便相關開發瞭解線程池狀態;
![](http://static.javashuo.com/static/loading.gif)
2.整體架構設計
動態調參:提供管理界面,支持線程池參數動態調整,包括線程池核心線程數大小,最大線程數大小,緩衝隊列長度等;參數修改後及時生效
任務監控:支持應用粒度、線程池粒度、任務粒度的事務監控,能夠查看線程池的任務執行狀況,最大任務執行時間,平均任務執行時間
負載告警:支持告警規則配置,當超過閾值時會通知相關的開發負責人
操做監控與日誌:管理段配置操做接入審計日誌
權限校驗:不一樣應用用戶
![](http://static.javashuo.com/static/loading.gif)
3.主要業務流程
主要可分爲應用(客戶端)、管理端兩大部分,使用mysql、es數據庫,其中監控頁面使用kibana可視化工具配置,
線程池的配置頁面使用簡單前端完成;
![](http://static.javashuo.com/static/loading.gif)
4.部分核心代碼
4.1 客戶端實現
4.1.1 動態線程池定義與實現
因爲ThreadPoolExecutor只開放了maxCorePoolSize/corePoolSize/
只能實現BlockingQueue接口實現阻塞隊列,並開放隊列容量改變的方法,這樣set方法就能夠動態修改,其餘部分直接拷貝ArrayBlockingQueue的實現便可實現動態阻塞隊列;
//自定義鏈表阻塞隊列
public class PhxResizeLinkedBlockingQueue extends AbstractQueue
implements BlockingQueue, java.io.Serializable {
private volatile int capacity;
//增長設置隊列容量接口
public void setCapacity(int capacity) {
this.capacity = capacity;
}
}複製代碼
除了動態修改後,還須要支持狀態上報、過載異常信息的上報、任務運行耗時統計等,因此還須要對重寫afterExecute方法對任務運行狀態監控,自定義RejectHandler監控拒絕動做,以下所示。
//重寫方法,監放任務運行狀態
public class PhxThreadPool extends ThreadPoolExecutor {
@Override
protected void afterExecute(Runnable r, Throwable throwable) {
LocalDateTime startTime = LocalDateTime.now();
super.afterExecute(r, throwable);
LocalDateTime endTime = LocalDateTime.now();
if (throwable == null && r instanceof Future) {
try {
((Future) r).get();
} catch (CancellationException ce) {
throwable = ce;
} catch (ExecutionException ee) {
throwable = ee.getCause();
} catch(InterruptedException ie){
Thread.currentThread().interrupt();
}
}
try {
if (throwable != null) {
publishEvent(EventEnums.POOL_RUNNABLE_EXECUTE_ERROR.name(), null);
} else {
Duration duration = Duration.between(startTime, endTime);
Integer costTime = Integer.parseInt(String.valueOf(duration.getSeconds()));
publishEvent(EventEnums.POOL_RUNTIME_STATISTICS.name(), costTime);
}
} catch (Exception ex){
}
}
//自定義拒絕策略,監聽並上報
public class PhxRejectHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
PhxThreadPool threadPool = (PhxThreadPool)executor;
List metaDataDTOS = new ArrayList<>();
...
publisher.publishEvent(metaDataDTOS);
}
}複製代碼
4.1.2 動態線程池註冊
爲了方便管理線程池實例,定義了線程池容器
public class ThreadPoolContainer {
private ConcurrentMap container = new ConcurrentHashMap<>();
public void put(String poolName, PhxThreadPool threadPool) {
container.putIfAbsent(poolName, threadPool);
}
public PhxThreadPool get(String poolName) {
return container.getOrDefault(poolName, null);
}
}
複製代碼
掃描與註冊線程時使用了bean的後置處理器處理器,對spring容器啓動時逐個掃描定義好的線程池實例,爲了避免阻塞啓動過程,並另起一條線程專門用來註冊。還須要將自定義的拒絕策略設置到線程池實例中。
public class PhxThreadPoolBeanPostProcessor implements BeanPostProcessor {
private final String url;
private PhxThreadPoolConfig config;
private ThreadPoolContainer container;
private RejectedExecutionHandler rejectHandler;
private ExecutorService executorService = Executors.newSingleThreadExecutor();
public PhxThreadPoolBeanPostProcessor(PhxThreadPoolConfig config, ThreadPoolContainer container) {
this.config = config;
url = config.getAdminUrl() + "/meta/register";
this.container = container;
}
@Override
public Object postProcessBeforeInitialization(final Object bean, final String beanName) throws BeansException {
return bean;
}
@Override
public Object postProcessAfterInitialization(final Object poolBean, final String beanName) throws BeansException {
if (poolBean instanceof PhxThreadPool) {
executorService.execute(() -> handler((PhxThreadPool) poolBean));
}
return poolBean;
}
private void handler(final PhxThreadPool poolBean) {
String poolName = poolBean.getName();
poolBean.setRejectedExecutionHandler(rejectHandler);
container.put(poolName, poolBean);
post(buildJsonParams(poolBean));
}
}複製代碼
4.1.3 線程池運行狀態信息上報
爲了加大消息吞吐量,最大程度節省應用的資源,在客戶端組件中使
用了消息隊列進行緩衝,其中包括了事件/事件發佈器/消息隊列/事件處理器幾大部分,
![](http://static.javashuo.com/static/loading.gif)
主要實現代碼以下:
//監控事件包裝類
public class MonitorEvent implements Serializable {
private List eventDTOs;
public void clear () {
eventDTOs = null; //使用後讓jvm主動gc回收
}
}
//事件
public class MonitorEventDTO {
private String appName;
private String ip;
private String poolName;
private Integer corePoolSize;
private Integer maxPoolSize;
private Integer queueSize;
private Integer queueCapacity;
}
//事件生成工廠類
public class MonitorEventFactory implements EventFactory {
@Override
public MonitorEvent newInstance() {
return new MonitorEvent();
}
}
//事件處理器
public class MonitorEventHandler implements EventHandler {
private PhxThreadPoolConfig config;
public MonitorEventHandler(PhxThreadPoolConfig config) {
this.config = config;
}
@Override
public void onEvent(MonitorEvent monitorEvent, long l, boolean b) throws Exception {
if (monitorEvent == null || monitorEvent.getEventDTOs() == null
|| monitorEvent.getEventDTOs().size() == 0) {
return;
}
String path = config.getAdminUrl() + "/meta/upload";
List eventDTOs = monitorEvent.getEventDTOs();
PhxSender.upload(path, eventDTOs);
monitorEvent.clear();
}
}
//消息發佈器
public class MonitorEventPublisher {
private Disruptor disruptor;
private MonitorEventHandler eventHandler;
private PhxThreadPoolConfig config;
public void publishEvent(final List events) {
final RingBuffer ringBuffer = disruptor.getRingBuffer();
ringBuffer.publishEvent(new EventTranslator(), events);
}
public void destroy() {
disruptor.shutdown();
}
}
//消息轉換
public class EventTranslator implements EventTranslatorOneArg> {
@Override
public void translateTo(MonitorEvent event, long l, List monitorEventDTOS) {
event.setEventDTOs(monitorEventDTOS);
}
}
//發送
public class PhxSender {
public static void upload(String path, List metaDataDTOS) throws IOException {
if (metaDataDTOS != null && metaDataDTOS.size() > 0) {
String jsonBody = OkHttpTools.getInstance().getGosn().toJson(metaDataDTOS);
OkHttpTools.getInstance().post(path, jsonBody);
}
}
}
複製代碼
4.1.4 線程池動態配置
實現線程池的配置動態修改能夠有兩種方式,第一種方式是使用apollo客戶端監聽控制端發出的配置修改事件,而後根據事件響應,相似於「推模式「;第二種方式是在客戶端起一條線程,不停地請求控制端服務,而後更新本地的線程池配置,相似於「拉模式「。這裏採用第二種方式。
public class PhxClient {
public void startPullPoolConfig() {
pullConfigThread = new Thread(() -> {
while(!toStop) {
try {
String path = config.getAdminUrl() + "/meta/" + config.getAppName();
String response = OkHttpTools.getInstance().get(path);
//解析控制端返回的線程池配置信息
List pools = JSON.parseArray(response, ThreadPoolConfigDto.class);
if (pools != null && pools.size() > 0) {
pools.forEach(pool -> {
PhxThreadPool threadPool = container.get(pool.getPoolName());
threadPool.setCorePoolSize(pool.getCorePoolSize());
threadPool.setMaximumPoolSize(pool.getMaxPoolSize());
PhxResizeLinkedBlockingQueue queue = (PhxResizeLinkedBlockingQueue)threadPool.getQueue();
queue.setCapacity(pool.getQueueCapacity());
container.put(pool.getPoolName(), threadPool);
});
}
} catch (Exception ex) {
log.error("【線程池控制客戶端-拉取配置線程發生異常】exception:", ex);
if (!toStop) {
}
}
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}複製代碼
5.應用效果
5.1 負載監控與告警
![](http://static.javashuo.com/static/loading.gif)