動態線程池組件-設計與實現(二)

1.總體設計

        總體分爲業務服務與管理端兩大部分,包括了三個主要功能點:
(1)簡化線程池配置:corePoolSize、maximumPoolSize,workQueue,它們最大程度地決定了線程池的任務分配和線程分配策略。考慮到在實際應用中咱們獲取併發性的場景主要是兩種:(1)並行執行子任務,提升響應速度。這種狀況下,應該使用同步隊列,沒有什麼任務應該被緩存下來,而是應該當即執行。(2)並行執行大批次任務,提高吞吐量。這種狀況下,應該使用有界隊列,使用隊列去緩衝大批量的任務,隊列容量必須聲明,防止任務無限制堆積。因此線程池只須要提供這三個關鍵參數的配置,而且提供兩種隊列的選擇,就能夠知足絕大多數的業務需求
(2)參數可動態配置: 在java線程池留有的基礎上,封裝線程池、容許線程池根據外部配置變化,進行修改配置,並在管理端開發控制界面方便開發;
(3)增長線程池監控:在線程池執行任務的整個生命週期添加監控能力,如當前活躍任務數、執行任務發生的異常數、任務隊列大小等指標,方便相關開發瞭解線程池狀態;



2.整體架構設計

動態調參:提供管理界面,支持線程池參數動態調整,包括線程池核心線程數大小,最大線程數大小,緩衝隊列長度等;參數修改後及時生效
任務監控:支持應用粒度、線程池粒度、任務粒度的事務監控,能夠查看線程池的任務執行狀況,最大任務執行時間,平均任務執行時間
負載告警:支持告警規則配置,當超過閾值時會通知相關的開發負責人
操做監控與日誌:管理段配置操做接入審計日誌
權限校驗:不一樣應用用戶


3.主要業務流程

      主要可分爲應用(客戶端)、管理端兩大部分,使用mysql、es數據庫,其中監控頁面使用kibana可視化工具配置, 線程池的配置頁面使用簡單前端完成;



4.部分核心代碼

4.1 客戶端實現

4.1.1 動態線程池定義與實現
       因爲ThreadPoolExecutor只開放了maxCorePoolSize/corePoolSize/
只能實現BlockingQueue接口實現阻塞隊列,並開放隊列容量改變的方法,這樣set方法就能夠動態修改,其餘部分直接拷貝ArrayBlockingQueue的實現便可實現動態阻塞隊列;

//自定義鏈表阻塞隊列
public class PhxResizeLinkedBlockingQueue extends AbstractQueue
        implements BlockingQueue, java.io.Serializable {
            
    private volatile int capacity;
    
    //增長設置隊列容量接口
    public void setCapacity(int capacity) {
        this.capacity = capacity;
    }
}複製代碼

        除了動態修改後,還須要支持狀態上報、過載異常信息的上報、任務運行耗時統計等,因此還須要對重寫afterExecute方法對任務運行狀態監控,自定義RejectHandler監控拒絕動做,以下所示。

//重寫方法,監放任務運行狀態
 public class PhxThreadPool extends ThreadPoolExecutor {
    
    @Override
    protected void afterExecute(Runnable r, Throwable throwable) {
        LocalDateTime startTime = LocalDateTime.now();
        super.afterExecute(r, throwable);
        LocalDateTime endTime = LocalDateTime.now();
        
        if (throwable == null && r instanceof Future) {
            try {
                ((Future) r).get();
            } catch (CancellationException ce) {
                throwable = ce;
            } catch (ExecutionException ee) {
                throwable = ee.getCause();
            } catch(InterruptedException ie){
                Thread.currentThread().interrupt();
            }
        }
        
        try {
            if (throwable != null) {
                publishEvent(EventEnums.POOL_RUNNABLE_EXECUTE_ERROR.name(), null);
            } else {
                Duration duration = Duration.between(startTime, endTime);
                Integer costTime = Integer.parseInt(String.valueOf(duration.getSeconds()));
                publishEvent(EventEnums.POOL_RUNTIME_STATISTICS.name(), costTime);
            }
        } catch (Exception ex){
         
        }
}
    
 //自定義拒絕策略,監聽並上報
 public class PhxRejectHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        PhxThreadPool threadPool = (PhxThreadPool)executor;
        List metaDataDTOS = new ArrayList<>();
        ...
        publisher.publishEvent(metaDataDTOS);
    }
}複製代碼

4.1.2 動態線程池註冊
爲了方便管理線程池實例,定義了線程池容器

public class ThreadPoolContainer {
    private ConcurrentMap container = new ConcurrentHashMap<>();
    public void put(String poolName, PhxThreadPool threadPool) {
        container.putIfAbsent(poolName, threadPool);
    }
    public PhxThreadPool get(String poolName) {
        return container.getOrDefault(poolName, null);
    }   
}
複製代碼

掃描與註冊線程時使用了bean的後置處理器處理器,對spring容器啓動時逐個掃描定義好的線程池實例,爲了避免阻塞啓動過程,並另起一條線程專門用來註冊。還須要將自定義的拒絕策略設置到線程池實例中。

public class PhxThreadPoolBeanPostProcessor implements BeanPostProcessor {
    
     private final String url;
     private PhxThreadPoolConfig config;
     private ThreadPoolContainer container;
     private RejectedExecutionHandler rejectHandler;
     private ExecutorService executorService = Executors.newSingleThreadExecutor();
     public PhxThreadPoolBeanPostProcessor(PhxThreadPoolConfig config, ThreadPoolContainer container) {
        this.config = config;
        url = config.getAdminUrl() + "/meta/register";
        this.container = container;
    }

    @Override
    public Object postProcessBeforeInitialization(final Object bean, final String beanName) throws BeansException {
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(final Object poolBean, final String beanName) throws BeansException {
        if (poolBean instanceof PhxThreadPool) {
            executorService.execute(() -> handler((PhxThreadPool) poolBean));
        }
        return poolBean;
    }
    
    private void handler(final PhxThreadPool poolBean) {
        String poolName = poolBean.getName();
        poolBean.setRejectedExecutionHandler(rejectHandler);
        container.put(poolName, poolBean);
        post(buildJsonParams(poolBean));
    }
    
}複製代碼


4.1.3 線程池運行狀態信息上報

        爲了加大消息吞吐量,最大程度節省應用的資源,在客戶端組件中使 用了消息隊列進行緩衝,其中包括了事件/事件發佈器/消息隊列/事件處理器幾大部分,


主要實現代碼以下:

//監控事件包裝類
public class MonitorEvent implements Serializable {
    private List eventDTOs;  
    public void clear () {
        eventDTOs = null; //使用後讓jvm主動gc回收
    }
}

//事件
public class MonitorEventDTO {
    private String appName;
    private String ip;
    private String poolName;
    private Integer corePoolSize;
    private Integer maxPoolSize;
    private Integer queueSize;
    private Integer queueCapacity;
}

//事件生成工廠類
public class MonitorEventFactory implements EventFactory {
    @Override
    public MonitorEvent newInstance() {
        return new MonitorEvent();
    }
}

//事件處理器
public class MonitorEventHandler implements EventHandler {
    private PhxThreadPoolConfig config;
    public MonitorEventHandler(PhxThreadPoolConfig config) {
        this.config = config;
    }
    @Override
    public void onEvent(MonitorEvent monitorEvent, long l, boolean b) throws Exception {
        if (monitorEvent == null || monitorEvent.getEventDTOs() == null
                || monitorEvent.getEventDTOs().size() == 0) {
            return;
        }
        String path = config.getAdminUrl() + "/meta/upload";
        List eventDTOs = monitorEvent.getEventDTOs();
        PhxSender.upload(path, eventDTOs);        
        monitorEvent.clear();
    }
}

//消息發佈器
public class MonitorEventPublisher {    
    private Disruptor disruptor;
    private MonitorEventHandler eventHandler;
    private PhxThreadPoolConfig config;
    public void publishEvent(final List events) {
        final RingBuffer ringBuffer = disruptor.getRingBuffer();
        ringBuffer.publishEvent(new EventTranslator(), events);
    }
    public void destroy() {
        disruptor.shutdown();
    }
}

//消息轉換
public class EventTranslator implements EventTranslatorOneArg> {
    @Override
    public void translateTo(MonitorEvent event, long l, List monitorEventDTOS) {
        event.setEventDTOs(monitorEventDTOS);
    }
}

//發送
public class PhxSender {
    public static void upload(String path, List metaDataDTOS) throws IOException {
        if (metaDataDTOS != null && metaDataDTOS.size() > 0) {
            String jsonBody = OkHttpTools.getInstance().getGosn().toJson(metaDataDTOS);
            OkHttpTools.getInstance().post(path, jsonBody);
        }
    }
}
複製代碼

4.1.4 線程池動態配置
       實現線程池的配置動態修改能夠有兩種方式,第一種方式是使用apollo客戶端監聽控制端發出的配置修改事件,而後根據事件響應,相似於「推模式「;第二種方式是在客戶端起一條線程,不停地請求控制端服務,而後更新本地的線程池配置,相似於「拉模式「。這裏採用第二種方式。

public class PhxClient {
    public void startPullPoolConfig() {
        pullConfigThread = new Thread(() -> {
            while(!toStop) {
                try {
    
                    String path = config.getAdminUrl() + "/meta/" + config.getAppName();
                    String response = OkHttpTools.getInstance().get(path);
                    //解析控制端返回的線程池配置信息
                    List pools = JSON.parseArray(response, ThreadPoolConfigDto.class);
                    if (pools != null && pools.size() > 0) {
                        pools.forEach(pool -> {
                            PhxThreadPool threadPool = container.get(pool.getPoolName());
                            threadPool.setCorePoolSize(pool.getCorePoolSize());
                            threadPool.setMaximumPoolSize(pool.getMaxPoolSize());
                            PhxResizeLinkedBlockingQueue queue = (PhxResizeLinkedBlockingQueue)threadPool.getQueue();
                            queue.setCapacity(pool.getQueueCapacity());
                            container.put(pool.getPoolName(), threadPool);
                        });
                    }
    
                } catch (Exception ex) {
                    log.error("【線程池控制客戶端-拉取配置線程發生異常】exception:", ex);
                    if (!toStop) {
                    }
                }
    
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }複製代碼

5.應用效果 

5.1 負載監控與告警

相關文章
相關標籤/搜索