前段時間在忙,這幾天抽時間寫一下阿里比賽總結,以往比賽都是前十名入圍,此次只有複賽前五才能入圍,競爭很是激烈,我和普哥一組,在普哥的幫助下學到很多優化技巧。先貼下成績 java
初賽第五,有些遺憾複賽只搞到第九,重在參與哈哈。 二、初賽自適應負載均衡:實現一套負載均衡算法,使系統吞吐量達到最大
git
要求:github
實現接口LoadBalance,實現一套自適應負載均衡機制。要求可以具有如下能力:算法
一、Gateway(Consumer) 端可以自動根據服務處理能力變化動態最優化分配請求保證較低響應時間,較高吞吐量;數組
二、Provider 端能自動進行服務容量評估,當請求數量超過服務能力時,容許拒絕部分請求,以保證服務不過載;緩存
三、當請求速率高於全部的 Provider 服務能力之和時,容許 Gateway( Consumer ) 拒絕服務新到請求。安全
LoadBalance接口(其餘輔助接口再也不一一列舉):bash
public interface LoadBalance {
/** * select one invoker in list. * * @param invokers invokers. * @param url refer url * @param invocation invocation. * @return selected invoker. */
@Adaptive("loadbalance")
<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
複製代碼
2.二、賽題解析
在解析賽題以前咱們先了解兩個概念:併發
延遲
:處理單次事件所需時間。負載均衡
吞吐
:單位時間內程序能夠處理的事件數。
假如一個程序只有1個線程,這個線程每秒能夠處理10次事件,那麼咱們說這個程序處理單次事件的延遲爲100ms,吞吐爲10次/秒
假如一個程序有4個線程,每一個線程每秒能夠處理5次事件,那麼咱們說這個程序處理單次事件的延遲爲200ms,吞吐爲20次/秒
假如一個程序有1個線程,每一個線程每秒能夠處理20次事件,那麼咱們說這個程序處理單次事件的延遲爲50ms,吞吐爲20次/秒
以上咱們能夠看出,在一個沒有瓶頸的程序中,增長線程能夠增長吞吐,下降延遲也能夠增長吞吐,但低延遲不必定高吞吐,高延遲也不必定低吞吐
。
賽題要求實時根據Provider能力大小動態選擇Provider來處理請求,這裏說一下三個Provider的線程和延遲都是會變
,且單個Provider總的能力呈泊松分佈
,也就是說屢次請求的rtt方差會比較大
,可是rtt均值在時間段內波動較小
,因此只要咱們每次請求都打到正確(延遲小且有剩餘線程
)的Provider上就ok,前面在釘釘羣裏發現有很多選手有這樣兩個疑問這裏解釋一下,
選手A
:我一頓操做下來分數居然比隨機算法低不少,爲何隨機算法會有那麼高的分數?
解析
:其實隨機算法的分數是能夠計算出來的,舉個例子,假如咱們有三個容器,容量分別爲1L,2L,3L,咱們有6L水,每次隨機往某個容器倒入1L水,容器裏最終可以保持多少水呢?答案是5L,由於6L水隨機倒入三個容器每一個容器都會得到2L水,第一個容器由於容量只有1L因此溢出1L,假如滿分120分的話咱們能夠拿到100分,若是三個容器容量分別是1.5L,2L,2.5L,那麼咱們的分數將會高達110分。(評測環境狀況比這個要複雜一些,須要多考慮一些變量,原理差很少這裏再也不展開)
選手B:個人程序評測下來全程沒有error出現但是分數仍是上不去,咋回事呢?
解析
:不知道你們有沒有注意到Provider的總鏈接數是大於請求數的,有些請求打到有剩餘線程可是延遲不必定小的Provider上了,因此看起來沒錯誤可是延遲很高RPS很難上去(由於評測程序中Provider總吞吐是大於請求總數的,因此通常狀況下也很難出現錯誤)。 貼一張Provider請求數與RTT的圖,
基於歷史rtt爲參考的選擇策略
,由於rtt是呈泊松分佈的,
因此單看rtt(延遲低的)數量,能力大的Provider必定比能力小的Provider的多
,因此咱們能夠搞棵樹把每次請求rtt存儲起來,排序一下,這裏須要爲rtt特別的低的Provider提權,即當rtt小於某值(或百分比)時,往樹裏多add幾個該Provider,同時從數尾移除相同數量的Provider,提權的目的探測Provider能力變化及探測Provider的真實能力(好比說在全部Provider都不拒絕服務的狀況下,單憑樹裏的rtt並不能真實反映出Provider的實際處理能力),每次選擇Provider是從樹裏移除rtt最小的那個Provider,把請求發送到該Provider,若是某個低延遲的Provider線程不夠時,該Provider的rtt逐漸變大,直到出現拒絕服務,此時只要不把reject的res加入到樹就ok。
三、複賽
實現一個進程內基於隊列的消息持久化存儲引擎
,要求包含如下功能:
發送消息功能
A. 查詢必定時間窗口內的消息
B. 對必定時間窗口內的消息屬性某個字段求平均
例子:t表示時間,時間窗口[1000, 1002]表示: t>=1000 & t<=1002 (這裏的t和實際時間戳沒有任何關係, 只是一個模擬時間範圍)消息包括兩個字段,一個是業務字段a,一個是時間戳,以及一個byte數組消息體。 程序接口以下:
public abstract class MessageStore {
/** * 寫入一個消息; * 這個接口須要是線程安全的,也即評測程序會併發調用該接口進行put; * @param message message,表明消息的內容,評測時內容會隨機產生 */
abstract void put(Message message);
/** * 根據a和t的條件,返回符合條件的消息的集合. t是輸入時間戳模擬值,和實際時間戳沒有關係, 線程內升序 * 這個接口須要是線程安全的,也即評測程序會併發調用該接口 * 返回的List須要按照t升序排列 (a不要求排序). 若是沒有符合的消息, 返回大小爲0的List. 若是List裏有null元素, 會當結果失敗處理 * 單條線程最大返回消息數量不會超過8萬 * @param aMin 表明a的最小值(包含此值) * @param aMax 表明a的最大值(包含此值) * @param tMin 表明t的最小值(包含此值) * @param tMax 表明t的最大值(包含此值) */
abstract List<Message> getMessage(long aMin, long aMax, long tMin, long tMax);
/** * 根據a和t的條件,返回符合條件消息的a值的求平均結果. t是輸入時間戳模擬值,和實際時間戳沒有關係, 線程內升序 * 這個接口須要是線程安全的,也即評測程序會併發調用該接口 * 結果忽略小數位,返回整數位便可. 若是沒有符合的消息, 返回0 * 單次查詢求和最大值不會超過Long.MAX_VALUE * @param aMin 表明a的最小值(包含此值) * @param aMax 表明a的最大值(包含此值) * @param tMin 表明t的最小值(包含此值) * @param tMax 表明t的最大值(包含此值) */
abstract long getAvgValue(long aMin, long aMax, long tMin, long tMax);
}
複製代碼
發送消息以下(忽略消息體):
消息1,消息屬性{"a":1,"t":1001}
消息2,消息屬性{"a":2,"t":1002}
消息3,消息屬性{"a":3,"t":1003}
複製代碼
查詢以下:
示例1-
輸入:時間窗口[1001,9999],對a求平均
輸出:2, 即:(1+2+3)/3=2
示例2-
輸入:時間窗口[1002,9999],求符合的消息
輸出:{"a":1,"t":1002},{"a":3,"t":1003}
示例3-
輸入:時間窗口[1000,9999]&(a>=2),對a求平均
輸出:2 (去除小數位)
複製代碼
語言限定
: JAVA
評測指標和規模
: 評測程序分爲3個階段: 發送階段、查詢聚合消息階段、查詢聚合結果階段:
發送階段:假設發送消息條數爲N1,全部消息發送完畢的時間爲T1;發送線程多個,消息屬性爲: a(隨機整數), t(輸入時間戳模擬值,和實際時間戳沒有關係, 線程內升序).消息總大小爲50字節,消息條數在20億條左右,總數據在100G左右
查詢聚合消息階段:有屢次查詢,消息總數爲N2,全部查詢時間爲T2; 返回以t和a爲條件的消息, 返回消息按照t升序排列
查詢聚合結果階段: 有屢次查詢,消息總數爲N3,全部查詢時間爲T3; 返回以t和a爲條件對a求平均的值
若查詢結果都正確,則最終成績爲N1/T1 + N2/T2 + N3/T3
3.二、賽題分析不須要考慮程序崩潰及被kill狀況
不須要考慮同時讀寫狀況
消息爲定長消息,簡化索引設計及存儲緩存設計
線程內無需排序,只須要作線程間消息排序
其中t16G,a16G,b68G,
t,a,b,分別存儲,有利於avg查詢
前面提到過,t在線程內有序,藉助這一點咱們能夠少不少排序量,只須要作線程間排序就好,這裏咱們把每一個發送線程看成一個Queue,把每一個Queue看成一個總體,對多個Queue進行排序,獲得一個有序的Queue隊列,貼張圖,
每次從Queue的頭部取出一個消息,而後對Queue隊列進行排序:MQueue sort_head = this.sort_head;
if (queue == sort_head) {
for (; ; ) {
Message message = sort_head.remove();
if (message == null) {
break;
}
put_message(message);
MQueue next = sort_head.next;
if (next == null) {
break;
}
if (sort_head.cur_t() > next.cur_t()) {
sort_head = sort(next, sort_head);
}
}
this.sort_head = sort_head;
}
複製代碼
這樣咱們獲得一個全局有序的隊列,buffer落盤便可。這裏落盤以前咱們須要對t和a進行壓縮(delta),t壓縮後接近3.8G能夠所有放在內存,a壓縮後大約9G出頭。 由於t排序後再次對a排序,致使t在分片內亂序,因此t用zigzag壓縮。 由於a排序是升序,因此對a使用vlong壓縮,這裏說下a壓縮對vlong作下改進,採用定長(4字節)+變長的方式進行壓縮,代碼以下:
public long readVLong() {
long b1 = get() & 0xff;
long b2 = get() & 0xff;
long b3 = get() & 0xff;
long v = 0;
int off = 0;
for (; ; ) {
long b = get();
v |= (b & 0b0111_1111) << off;
if (b >= 0) {
break;
}
off += 7;
}
v <<= 24;
return v | ((b1 << 16) | (b2 << 8) | (b3));
}
public void writeVLong(long v) {
long w = v & (0xffffff);
v >>>= 24;
put((byte) (w >>> 16));
put((byte) (w >>> 8));
put((byte) (w));
for (; ; ) {
w = v & 0b0111_1111;
v >>>= 7;
if (v != 0) {
put((byte) (w | 0b1000_0000));
} else {
put((byte) w);
break;
}
}
}
複製代碼
有對b嘗試使用lz4壓縮,效果不理想,放棄對b壓縮
3.3.二、查詢先上張圖,
(咱們是按t的個數分segment的,按t值劃分的也寫了一版,試了下兩個方案分數差很少,由於感受按t個數劃分的更好處理些,由於時間緣由後面都是按個數劃分來優化的,理論上按t值劃分分數應該更好)從圖中能夠每隔16384作一次segment,segment內每隔128條消息作一個block,每一個block內有128條消息,每隔128條消息作一次索引,索引分別是:
ByteBuffer index_t_sparse = allocateDirect(SPARSE_INDEX_COUNT * 8);
ByteBuffer index_t_data_pos = allocateDirect(COMPACT_INDEX_COUNT * 8);
long[] index_a_data_pos = new long[COMPACT_INDEX_COUNT];
long[] index_t_compact = new long[COMPACT_INDEX_COUNT];
long[] index_a_compact = new long[COMPACT_INDEX_COUNT];
long[] index_a_sum = new long[COMPACT_INDEX_COUNT];
long[] index_a_min_max = new long[COMPACT_INDEX_COUNT * 2];
複製代碼
index_t_sparse
:segment索引,記錄t值,用於快速定位查詢所在segment
index_t_data_pos
:block索引,t壓縮後位置索引,記錄t數據block起始位置,用於計算t真實值
index_a_data_pos
:同index_t_data_pos
index_t_compact
:block索引,記錄t值,用於計算t真實值
index_a_compact
:同index_t_compact
index_a_sum
:block索引,記錄block內a的sum值,用於avg時快速跳過block
index_a_min_max
:block索引,記錄block內a的min,max值,用於avg時快速跳過block 查詢msg時首先定位所在segment:
int t_index_cnt = (index_t_sparse.position() >>> 3) - 1; // size -1
int t_index_min = half_find(index_t_sparse_addr, 0, t_index_cnt, min_t);
int t_index_max = get_t_index_max(index_t_sparse_addr, t_index_min, t_index_cnt, max_t);
複製代碼
獲得segment起始位置:t_index_min,t_index_max 這裏須要處理下首尾segment,由於首尾segment的t值可能不在查詢範圍內須要單獨處理, 遍歷segment定位block範圍:
int a_index_off = get_a_read_index_off(index_a_min_max, a_index_cnt, a_index_min, min_a);
int a_index_len = get_a_read_index_len(index_a_min_max, a_index_cnt, a_index_min, a_index_off, max_a);
複製代碼
獲得某個segment內block的off和len:a_index_off,a_index_len 根據off和len定位a,b的讀取pos和len
long a_read_pos = index_a_data_pos[a_index_off];
long b_read_pos = 1L * (a_index_off * A_BLOCK) * B_LEN;
int a_read_len = (int) (index_a_data_pos[Math.min(a_index_off + a_index_len + 1, a_index_cnt)] - a_read_pos);
a_read_buf.clear().limit(a_read_len);
b_read_buf.clear().limit(msg_size * B_LEN);
複製代碼
接着定位t的讀取範圍:
long t_read_index = get_long(index_t_data_pos_addr, a_index_off * 8);
long t_write_index = get_limit_index(index_t_data_pos_addr, index_t_data_pos_limit, (a_index_off + 1) * 8, t_data_buf.write_index);
VIntBuf t_data_temp = t_data_buf.slice(t_read_index, t_write_index);
複製代碼
接着遍歷獲得的數據:
for (int i = 0, part_i = 0; i < msg_size; i++) {
t += t_data_temp.readZigZag();
a += a_data_buf.readVLong();
if (t >= min_t && t <= max_t && a >= min_a && a <= max_a) {
byte[] body = new byte[B_LEN];
copy_data(b_read_buf_addr, i * B_LEN, body, B_LEN);
msg_sort_buf[msg_sort_buf_size++] = new Message(a, t, body);
}
part_i++;
if (part_i == A_BLOCK) {
part_i = 0;
a_index_off++;
t = index_t_compact[a_index_off];
a = index_a_compact[a_index_off];
t_read_index = get_long(index_t_data_pos_addr, a_index_off * 8);
t_write_index = get_limit_index(index_t_data_pos_addr, index_t_data_pos_limit, (a_index_off + 1) * 8, t_data_buf.write_index);
t_data_temp = t_data_buf.slice(t_read_index, t_write_index);
}
}
複製代碼
排序最終查詢的數據:
QuickSort.sort(msg_sort_buf, QuickSort.SORT_VALUE_T, 0, msg_sort_buf_size - 1);
for (int i = 0; i < msg_sort_buf_size; i++) {
msg_list.add(msg_sort_buf[i]);
}
複製代碼
查詢avg
:查詢和avg和msg相似,這裏說下跳過邏輯
if (part_i == A_BLOCK) {
part_i = 0;
a_index_off++;
for (; i + A_BLOCK < msg_size; ) {
int a_min_max_pos = (a_index_off + 1) << 1;
long a_min = index_a_min_max[a_min_max_pos];
long a_max = index_a_min_max[a_min_max_pos + 1];
if (a_min >= min_a && a_max <= max_a) {
sum += index_a_sum[a_index_off + 1];
count += A_BLOCK;
i += A_BLOCK;
a_index_off++;
continue;
} else {
break;
}
}
a_read_pos = index_a_data_pos[a_index_off];
a_read_len = (int) (index_a_data_pos[Math.min(a_index_off + 1, a_index_cnt)] - a_read_pos);
a_read_buf.clear().limit(a_read_len);
do_read(a_read_channel, a_read_buf, a_read_pos);
a = index_a_compact[a_index_off];
a_data_buf.set_read_index(index_a_data_pos[a_index_off] - a_read_pos);
}
i++;
複製代碼
當part_i等於128時說明接下來是一個完整的block,能夠進行block的min_max判斷, 若是min_max在查詢範圍內則直接累加sum, 若是min_max部分包含查詢範圍則遍歷該block
3.3.三、可探索的改進跳讀
,在avg查詢時,部分a的block是能夠直接跳過的,這部分跳讀應該會有必定提高。
按t值劃分segment
:一直是按照t的個數劃分segment的,按t值劃分理論上能夠劃分出更大的segment,理論上會有必定提高。
複賽源碼地址:github.com/wangkaish/a…
四、總結此次比賽競爭太激烈了,大佬不少,沒能入圍確實很遺憾,可是也確實學到很多東西,由於前面在搞華爲的比賽,後面參與複賽的時間很少,並且相信不少選手有這樣的感受,線下程序是好的,到線上老是跪,其實咱們也是時間大多花在調bug上了(水平太菜),線下和線上評測數據樣本相差太多,若是比賽能搞成線下線上評測程序比較類似就行了,這樣能夠多些時間嘗試方案。
另附上華爲比賽參賽總結,有興趣能夠閱讀一下:華爲雲TaurusDB性能挑戰賽參賽總結