在線上環境,因爲業務場景須要,要求程序可以在普通的4G機器中依然正常運行。 而原來的環境配置爲8核16G,微服務部署,一共有6個功能模塊。而如今要求在一臺4核4G的設備上正常運行。html
@Configuration @Bean @Controller @Service @Repository
等註解中沒有指定Bean實例的名稱。現有的處理流程以下:java
項目採用SpringBoot構建,引入 spring-boot-stater-redis
1. 經過HTTP接收到異步事件,存儲到Redis;
2. 存儲的同時,將事件經過Redis的發佈訂閱發送到不一樣的處理單元進行處理;
3. 每一個事件處理單元經過Redis訂閱,而後處理事件;
4. 起一個定時器,每秒鐘從Redis中查詢一個時間窗口的事件,構建索引,而後bulkIndex到ES
複製代碼
1. Redis的訂閱發佈,內部會維護一個container線程,此線程會一直存在;
2. 每次訂閱,都會產生一個新的前綴爲RedisListeningContainer-的線程處理;
3. 經過jvisualvm.exe 查看線程數,該類線程數一直在飆升
複製代碼
程序中的實現以下:node
@Bean
RedisMessageListenerContainer manageContainer(
RedisConnectionFactory factory, MessageListener listener) {
RedisMessageListenerContainer manageContainer =
new RedisMessageListenerContainer ();
manageContainer.setConnectionFactory(factory);
// manageContainer.setTaskExecutor();
}
複製代碼
代碼中被註釋掉的那一行,實際代碼中是沒有該行的,也就是沒有設置taskExecutor
linux
listener-container
的說明,默認的task-executor
和subscription-task-executor
使用的是SimpleAsyncTaskExecutor
。RedisMessageListenerContainer.classredis
...
protected TaskExecutor createDefaultTaskExecutor() {
String threadNamePrefix = (beanName != null ? beanName + "-" :
DEFAULT_THREAD_NAME_PREFIX) ;
return new SimpleAsyncTaskExecutor(threadNamePrefix);
}
...
複製代碼
SimpleAsyncTaskExecutor.classspring
...
protected void doExecute(Runnable task) {
Thread thread =
(this.threadFactory != null
? this.threadFactory,newThread(task)
: createThread(task));
thread.start();
}
...
複製代碼
SimpleAsyncTaskExecutor
的execute()方法,是很無恥的new Thread()
,調用thread.start()
來執行任務找到問題的產生緣由,主要的解決思路有三種:緩存
配置manageContainer.setTaskExecutor();
而後選擇本身建立的線程池;bash
去掉一部分發布訂閱,改用Spring
提供的觀察者模式,將絕大部分事件處理的場景,經過此方式完成發佈。 SpringUtils.getApplicationContext() .publihEvent(newEventOperation(eventList));
服務器
採用Rector
模式實現事件的異步高效處理;網絡
建立了2個線程組(參考netty的底層實現):
1. 一個用於處理事件接收 「event-recv-executor-」
coreSize = N * 2,CPU密集型
2. 一個用於事件的異步處理 「event-task-executor-」
coreSize = N / 0.1,IO密集型
複製代碼
事件處理邏輯
@Override
public void onApplicationEvent (EventOperation event) {
eventTaskExecutor.execute(() -> {
doDealEventOperation(event);
});
}
複製代碼
現有的處理流程以下:
項目採用SpringBoot構建,引入 spring-boot-stater-redis
1. 後臺維護了一個定時器,每秒鐘從Redis中查詢一個時間窗口的事件
複製代碼
在後臺定位日誌輸出,正常狀況下,應該是每秒鐘執行一次定時,
但實際是,系統並不保證必定能每隔1S執行一次,
因爲系統中線程比較多,CPU的切換頻繁,
致使定時有可能1S執行幾回或者每隔幾秒執行一次
複製代碼
因爲定時並沒有法保證執行,而定時任務獲取事件時,是按照時間窗口截取,
經過redisTemplate.opsForZSet().rangeByScore(key, minScore, maxScore)實現,
勢必會形成有數據沒法被加載到程序中,而一直保存在Redis中,沒法獲取,也沒法刪除
複製代碼
找到問題的產生緣由,主要的解決思路有兩種:
加大容錯率,將時間窗口拉大,原來是相隔1S的時間窗口,修改成相隔1MIN 【治標不治本,極端狀況下,仍有可能形成該問題】;
採用MQ消費,此方法須要額外部署MQ服務器,在集羣配置高的狀況下,能夠採用,在配置低的機器下不合適;
採用阻塞隊列,利用Lock.newCondition()
或者最普通的網絡監聽模式while()
均可以;
本次問題中採用的是第三種形式。起一個單獨的線程,阻塞監聽。
1. 事件接收後,直接塞到一個BlockingQueue中;
2. 當BlockingQueue有數據時,While循環不阻塞,逐條讀取隊列中的信息;
3. 每隔1000條數據,或者每隔1S,將數據寫入ES,並分發其餘處理流程
複製代碼
在4G的機器下,發現通過一段時間的發包處理後,系統cache增加的很是快,最後幾近於所有佔滿:
大概每秒鐘10M的漲幅
複製代碼
1. 由於對於ES的瞭解,插入數據時,先寫緩存,後fsync到磁盤上,所以懷疑ES可能存在問題;
2. 項目中日誌使用log4j2不當:
* 日誌輸出過多,
* 日誌沒有加判斷:if (log.isInfoEnabled())
* 日誌文件append過大,沒有按照大小切分等(本項目此問題以前已解決)
複製代碼
通過隔段分析,將有可能出現問題的地方,分別屏蔽後,進行測試。
最終定位到,在ES批量寫入數據時,纔會出現cache大量增加的現象
複製代碼
用命令查看內存free -m
,
buffer
: 做爲buffer cache
的內存,是塊設備的讀寫緩衝區cached
表示page cache的內存
和文件系統的cache
ES操做數據的底層機制:
數據寫入時,ES內存緩慢上升,是由於小文件過多(ES自己會在index時候創建大量的小文件),linux dentry
和 inode cache
會增長。 能夠參考:ES內存持續上升問題定位
本問題其實並無徹底解決,只是在必定程度上用性能換取緩存。
echo 10000 > /proc/sys/vm/vfs_cache_pressure;
複製代碼
## 這些參數是以前優化的
threadpool.bulk.type: fixed
threadpool.bulk.min: 10
threadpool.bulk.max: 10
threadpool.bulk.queue_size: 2000
threadpool.index.type: fixed
threadpool.index.size: 100
threadpool.index.queue_size: 1000
index.max_result_window: 1000000
index.query.bool.max_clause_count: 1024000
# 如下的參數爲本次優化中添加的:
# 設置ES最大緩存數據條數和緩存失效時間
index.cache.field.max_size: 20000
index.cache.field.expire: 1m
# 當內存不足時,對查詢結果數據緩存進行回收
index.cache.field.type: soft
# 當內存達到必定比例時,觸發GC。默認爲JVM的70%[內存使用最大值]
#indices.breaker.total.limit: 70%
# 用於fielddata緩存的內存數量,
# 主要用於當使用排序操做時,ES會將一些熱點數據加載到內存中來提供客戶端訪問
indices.fielddata.cache.expire: 20m
indices.fielddata.cache.size: 10%
# 一個節點索引緩衝區的大小[max 默認無限制]
#indices.memory.index_buffer_size: 10%
#indices.memory.min_index_buffer_size: 48M
#indices.memory.max_index_buffer_size: 100M
# 執行數據過濾時的數據緩存,默認爲10%
#indices.cache.filter.size: 10%
#indices.cache.filter.expire: 20m
# 當tranlog的大小達到此值時,會進行一次flush操做,默認是512M
index.translog.flush_threshold_size: 100m
# 在指定時間間隔內若是沒有進行進行flush操做,會進行一次強制的flush操做,默認是30分鐘
index.translog.flush_threshold_period: 1m
# 多長時間進行一次的磁盤操做,默認是5S
index.gateway.local.sync: 1s
複製代碼
若是文中有描述失誤內容,或者沒有描述清楚的,能夠將問題發我郵箱,harveytuan@163.com
, 若是有其餘問題,也能夠聯繫我,你們一塊兒共同討論。