一個名爲 fetch- 線程池負責從Redis中讀取文本數據,將讀取到的文本數據提交給另外一個線程池 tw-,將 tw- 線程池將任務經過HTTP請求的形式上報給過濾服務。以下圖所示:html
一開始採用默認線程池配置方式:java
private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(1000 * 20); private final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("fetch-%d").build(); private final ThreadPoolExecutor executorService = new ThreadPoolExecutor(3, nThreads, 1, TimeUnit.HOURS, taskQueue, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
而後只提交三個任務startService()
,startService() 是個while(true)
以 pipeline 形式不停地從redis上讀取文本數據。程序運行一段時間以後,就卡死了。web
//nThreads 是 3 for(int i = 0; i < nThreads; i++) { executorService.execute(()->{ startService(); }); } }
看CPU、內存以及程序的GC日誌,都是正常的。sudo jstack -l pid
發現:redis
"fetch-26" #109 prio=5 os_prio=0 tid=0x00007fbfe00db000 nid=0xea76 waiting on condition [0x00007fc127bfc000]
"fetch-25" #108 prio=5 os_prio=0 tid=0x00007fbfec03c000 nid=0xea75 waiting on condition [0x00007fc1257dc000]
"fetch-24" #107 prio=5 os_prio=0 tid=0x00007fbf6c001000 nid=0xea74 waiting on condition [0x00007fc127cfd000]spring
執行從redis中讀取文本任務的fetch- 線程池中的全部線程都阻塞了。因爲提交的是Runnable任務,引用《Java併發編程實戰》第七章中一段話:apache
致使線程提早死亡的最主要的緣由是RuntimeException。因爲這些異常表示出現了某種錯誤或者其餘不可修復的錯誤,所以它們一般不會被捕獲。它們不會在調用棧中逐層傳遞,而是默認地在控制檯中輸出棧追蹤信息,並終止線程編程
When a thread exits due to an uncaught exception, the JVM reports this event to our UncaughtExceptionHandler, otherwise the default handler just prints the stack trace to standard error.服務器
所以,我就自定義一個UncaughtExceptionHandler
看看到底出現了什麼錯誤:併發
public class FetchTextExceptionHandler implements Thread.UncaughtExceptionHandler { private static final Logger logger = LoggerFactory.getLogger(FetchTextExceptionHandler.class); @Override public void uncaughtException(Thread t, Throwable e) { logger.error("fetch redis text exception,thread name:{},msg:{}", t.getName(), e.getMessage()); } }
UncaughtExceptionHandler
只是簡單地記錄日誌,先找到出錯緣由再說。從新發版,上線一段時間後發現出現程序卡死了,此次有了異常日誌:ide
2018-11-23 23:10:25.681 ERROR 29818 --- [fetch-0] c.y.t.a.s.FetchTextExceptionHandler : fetch redis text exception,thread name:fetch-0,msg:I/O error on POST request for "xxx": Read timed out; nested exception is java.net.SocketTimeoutException: Read timed out
2018-11-23 23:10:25.686 ERROR 29818 --- [fetch-2] c.y.t.a.s.FetchTextExceptionHandler : fetch redis text exception,thread name:fetch-2,msg:I/O error on POST request for "xxx": Read timed out; nested exception is java.net.SocketTimeoutException: Read timed out
2018-11-23 23:10:27.429 ERROR 29818 --- [fetch-1] c.y.t.a.s.FetchTextExceptionHandler : fetch redis text exception,thread name:fetch-1,msg:I/O error on POST request for "xxx": Read timed out; nested exception is java.net.SocketTimeoutException: Read timed out
一看這個日誌有點奇怪,fetch線程只是讀取redis上的文本數據,並將文本數據封裝到一個Runnable任務裏面提交給 Tw- 線程池,Tw-線程 纔是發送HTTP POST 請求將數據提交給過濾服務。
因而去檢查建立Tw-線程池建立代碼:發現了Tw-線程池採用的是CallerRunsPolicy
延遲策略。
private final ThreadFactory threadFactory = new ThreadFactoryBuilder() .setUncaughtExceptionHandler(exceptionHandler).setNameFormat("tw-%d").build(); private final ThreadPoolExecutor executorService = new ThreadPoolExecutor(20, maximumPoolSize, 1, TimeUnit.HOURS, taskQueue, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
也就是說:當Fecth-線程提交任務過快時,Tw-線程池的taskQueue
滿了,CallerRunsPolicy
讓任務回退到調用者,任務由fetch-線程來執行了。所以,上面的日誌打印出來的是線程名字是fetch:thread name:fetch-0,msg.....
再引用一段話:
調用者運行策略(Caller-Runs)實現了一種調節機制,該策略不會拋棄任務,也不會拋出異常,而是將某些任務回退到調用者,從而下降新任務的流量。它不會在線程池(這裏的線程池是 tw-線程池)的某個線程中執行新提交的任務,而是在一個調用了execute的線程(fetch 線程)中執行該任務【fetch 線程執行execute 向 tw-線程池提交任務】
知道了異常出現的緣由,因而我就把 tw-線程的 飽和策略從原來的CallerRunsPolicy
修改爲AbortPolicy
,再從新運行程序,一段時間後,發現 tw-線程池中的30個線程所有阻塞,fetch-線程池中的三個線程也所有阻塞。以下圖:
在程序中每一個tw-線程隔20ms發送一次HTTP POST請求,將文本上報給過濾服務,30個tw-線程,併發量大約是1500次每秒,每次提交的數據不超過30KB吧。
看程序輸出的log日誌:30個tw- 線程 都是同樣的異常SocketTimeoutException,Read timed out
。
2018-11-24 09:16:47.885 ERROR 9765 --- [tw-310] c.y.t.a.s.ReportTwExceptionHandler : http request report tw exception,thread name:tw-310,cause:java.net.SocketTimeoutException: Read timed out,msg:I/O error on POST request for "xxx": Read timed out; nested exception is java.net.SocketTimeoutException: Read timed out
而3個 fetch-線程的異常日誌是:rejected from java.util.concurrent.ThreadPoolExecutor
2018-11-24 09:04:36.758 ERROR 9765 --- [fetch-2] c.y.t.a.s.FetchTextExceptionHandler : fetch redis text exception,thread name:fetch-2,msg:Task ReportTwAuditService$$Lambda$75/259476123@7376559c rejected from java.util.concurrent.ThreadPoolExecutor@75fa1939[Running, pool size = 30, active threads = 30, queu
ed tasks = 50000, completed tasks = 20170]
這是由於 fetch-線程向 tw-線程池提交任務,而tw-線程池上面的飽和策略已經改爲了AbortPolicy
,當tw-線程池任務隊列滿了時,tw-線程就把 fetch-線程 提交過來的任務給拒絕了,並向fetch-線程拋出RejectedExecutionException 異常。
總結一下就是:30個tw-線程由於發送HTTP POST請求給過濾服務出現 SocketTimeoutException,Read timed out
所有阻塞,而tw-線程池的飽和策略是AbortPolicy,
即:丟棄任務並拋出RejectedExecutionException 異常,致使 fetch 線程阻塞,且提交給tw-線程池的任務被 abort,這就是上面那張圖中全部線程都所有阻塞的緣由。
再引用一段話:
工做線程在執行一個任務時被阻塞,若是等待用戶的輸入數據,可是用戶一直不輸入數據,致使這個線程一直被阻塞。這樣的工做線程名不副實,它實際上不執行任何任務了。若是線程池中的全部線程都處於這樣的狀態,那麼線程池就沒法加入新的任務了。各類類型的線程池中一個嚴重的風險是線程泄漏,當從線程池中除去一個線程以執行一項任務,而在任務完成後該線程卻沒有返回池時,會發生這種狀況。發生線程泄漏的一種情形出如今任務拋出一個 RuntimeException 或一個 Error 時。若是池類沒有捕捉到它們,那麼線程只會退出而線程池的大小將會永久減小一個。當這種狀況發生的次數足夠多時,線程池最終就爲空,並且系統將中止,由於沒有可用的線程來處理任務。
既然tw-線程發送HTTP請求出現了 SocketTImeoutException,那麼來看看HTTP鏈接池的配置:
import org.apache.http.client.HttpClient; import org.apache.http.client.config.RequestConfig; import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.client.ClientHttpRequestFactory; import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; import org.springframework.http.converter.HttpMessageConverter; import org.springframework.http.converter.StringHttpMessageConverter; import org.springframework.web.client.RestTemplate; import java.util.concurrent.TimeUnit; /** * Created by Administrator on 2018/7/4. * 配置 RestTemplate 鏈接池 */ @Configuration public class RestTemplateConfig { /** * https://stackoverflow.com/questions/44762794/java-spring-resttemplate-sets-unwanted-headers * set http header explicitly: "Accept-Charset": "utf-8" */ @Bean public RestTemplate restTemplate() { RestTemplate restTemplate = new RestTemplate(httpRequestFactory()); for (HttpMessageConverter converter : restTemplate.getMessageConverters()) { if (converter instanceof StringHttpMessageConverter) { ((StringHttpMessageConverter)converter).setWriteAcceptCharset(false); } } return restTemplate; } @Bean public ClientHttpRequestFactory httpRequestFactory() { return new HttpComponentsClientHttpRequestFactory(httpClient()); } @Bean public HttpClient httpClient() { //配置http長鏈接 PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(30, TimeUnit.SECONDS); connectionManager.setMaxTotal(1000); connectionManager.setDefaultMaxPerRoute(20); RequestConfig requestConfig = RequestConfig.custom() //服務器返回數據(response)的時間,超過該時間拋出read timeout .setSocketTimeout(5000) //鏈接上服務器(握手成功)的時間,超出該時間拋出connect timeout .setConnectTimeout(5000) //從鏈接池中獲取鏈接的超時時間,超過該時間未拿到可用鏈接拋出異常 .setConnectionRequestTimeout(1000).build(); return HttpClientBuilder.create().setDefaultRequestConfig(requestConfig). setConnectionManager(connectionManager). setConnectionManagerShared(true) //keep alive .setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE) .build(); } }
到這裏就大概知道解決方案了:
讓弱雞的過濾服務牛B一點,這有點不太可能的。你懂的……
控制HTTP 請求速度,並添加線程拋出異常時處理方法(自定義ThreadPoolExecutor,重寫afterExecute方法)而不只僅是實現UncaughtExceptionHandler,簡單地打印出異常日誌。
fetch-線程 阻塞的緣由是由於:向tw-線程池提交任務,而tw-線程池採用的飽和策略是AbortPolicy
,若是把它改爲:DiscardPolicy
直接丟棄任務而不拋出異常。這樣fetch-線程就不會收到RejectedExecutionException 異常而阻塞了。固然了,採用DiscardPolicy
飽和策略的話,fetch-線程提交任務出現異常就沒法感知了,這時咱們還能夠自定義飽和策略。以下:能夠簡單地打印出一個日誌:
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; /** * @author xxx * @date 2018/11/24 */ public class ReportTwRejectExceptionHandler implements RejectedExecutionHandler { private static final Logger logger = LoggerFactory.getLogger(ReportTwRejectExceptionHandler.class); @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { logger.error("fetch thread submit task rejected. {}",executor.toString()); } }
再修改一下 tw-線程池的配置參數:
//使用咱們自定義的飽和策略 RejectedExecutionHandler,當有線程提交任務給tw-線程池時,若出現錯誤會打印日誌 private RejectedExecutionHandler rejectedExecutionHandler = new ReportTwRejectExceptionHandler(); private ReportTwExceptionHandler exceptionHandler = new ReportTwExceptionHandler(); private final ThreadFactory threadFactory = new ThreadFactoryBuilder() .setUncaughtExceptionHandler(exceptionHandler).setNameFormat("tw-%d").build(); //隊列長度爲50000 private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(1000 * 10 * 5); //maximumPoolSize=30 private final ThreadPoolExecutor executorService = new ThreadPoolExecutor(20, maximumPoolSize, 1, TimeUnit.HOURS, taskQueue, threadFactory, rejectedExecutionHandler);
這樣,若是fetch線程向 tw-線程池提交任務出錯,就會收到以下日誌了:
2018-11-24 17:05:45.511 ERROR 38161 --- [fetch-0] c.y.t.a.e.ReportTwRejectExceptionHandler : fetch thread submit task rejected.java.util.concurrent.ThreadPoolExecutor@41a28fcd[Running, pool size = 30, active threads = 30, queued tasks = 50000, completed tasks = 110438]
private static final int nThreads = Runtime.getRuntime().availableProcessors();//24
返回的是邏輯cpu的個數。
~$ cat /proc/cpuinfo| grep "processor"| wc -l
24
在一臺機器上開多少個線程合適?有個公式\[N_{threads}=N_{cpu}*U_{cpu}*(1+\frac{W}{C})\]
W是等待時間、C是使用CPU的計算時間。所以,須要估計任務的類型,是計算密集型,仍是IO密集型?另外:一臺物理機上不只僅是你寫的程序在上面跑,還有其餘人寫的程序也在上面跑,所以,在使用這個公式計算線程數目時也要注意到這一點。
若是任務之間是異構的且獨立的,兩種不一樣類型的任務,那麼可使用2個線程池來執行這些任務。好比一個線程池執行CPU密集型任務,另外一個線程池執行IO密集型任務。
我的認爲咱們在寫代碼的時候對要處理的任務是有必定的瞭解的,好比並髮量多大?數據量多大?根據這些信息就大概能知道任務隊列定義多長合適,而不是採用默認的無界阻塞隊列。
同時,對任務的特徵也有所瞭解,好比是否要調用遠程HTTP服務?是否寫磁盤有IO阻塞?仍是隻是轉換數據、處理數據,另外所部署的服務器的硬件性能咋樣?這些都能做爲定義線程個數的一些參考。
最後,採用自定義線程池,在任務執行出錯了,可能更靈活地控制處理錯誤,好比記錄錯誤日誌、執行任務前以及執行任務後的清理操做……
原文:https://www.cnblogs.com/hapjin/p/10012435.html