Druid 的查詢須要有實時和歷史部分的 Segment,歷史部分的 Segment 由 Historical 節點加載,因此加載的效率直接影響了查詢的 RT(不考慮緩存)。查詢一般須要指定一個時間範圍[StartTime, EndTime],該時間範圍的內全部 Segment 須要由 Historical 加載,最差的狀況是全部 Segment 不幸都儲存在一個節點上,加載無疑會很慢;最好的狀況是 Segment 均勻分佈在全部的節點上,並行加載提升效率。因此 Segment 在 Historical 集羣中分佈就變得極爲重要,Druid 經過 Coordinator 的 Balance 策略協調 Segment 在集羣中的分佈。git
本文將分析 Druid 的 Balance 策略、源碼及其代價計算函數,本文使用 Druid 的版本是 0.12.0。github
Druid 目前有三種 Balance 算法: cachingCost, diskNormalized, Cost, 其中 cachingCost 是基於緩存的,diskNormalized 則是基於磁盤的 Balance 策略,本文不對前兩種展開篇幅分析, Druid Coordinator 中開啓 cost balance 的配置以下:算法
druid.coordinator.startDelay=PT30S
druid.coordinator.period=PT30S 調度的時間
druid.coordinator.balancer.strategy=cost 默認
動態配置:
maxSegmentsToMove = 5 ##每次Balance最多移動多少個Segment
複製代碼
Cost 是 Druid 在 0.9.1 開始引入的,在 0.9.1 以前使用的 Balance 算法會存在 Segment 不能快速均衡,分佈不均勻的狀況,Cost 算法的核心思想是:當在作均衡的時候,隨機選擇一個 Segment(假設 Segment A ), 依次計算Segment A 和 Historical 節點上的全部 Segment 的 Cost,選取 Cost 值最小的節點,而後到該節點上從新加載 Segment。緩存
如下會省略一些沒必要要的代碼bash
DruidCoordinatorBalancer 類dom
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
final CoordinatorStats stats = new CoordinatorStats();
// 不一樣tier層的分開Balance
params.getDruidCluster().getHistoricals().forEach((String tier, NavigableSet<ServerHolder> servers) -> {
balanceTier(params, tier, servers, stats);
});
return params.buildFromExisting().withCoordinatorStats(stats).build();
}
複製代碼
DruidCoordinatorBalancer 類的 balanceTier 方法,主要是均衡入口函數ide
private void balanceTier(DruidCoordinatorRuntimeParams params, String tier, SortedSet<ServerHolder> servers,CoordinatorStats stats){
final BalancerStrategy strategy = params.getBalancerStrategy();
final int maxSegmentsToMove = params.getCoordinatorDynamicConfig().getMaxSegmentsToMove();
currentlyMovingSegments.computeIfAbsent(tier, t -> new ConcurrentHashMap<>());
final List<ServerHolder> serverHolderList = Lists.newArrayList(servers);
//集羣中只有一個 Historical 節點時不進行Balance
if (serverHolderList.size() <= 1) {
log.info("[%s]: One or fewer servers found. Cannot balance.", tier);
return;
}
int numSegments = 0;
for (ServerHolder server : serverHolderList) {
numSegments += server.getServer().getSegments().size();
}
if (numSegments == 0) {
log.info("No segments found. Cannot balance.");
return;
}
long unmoved = 0L;
for (int iter = 0; iter < maxSegmentsToMove; iter++) {
//經過隨機算法選擇一個候選Segment,該Segment會參與後面的Cost計算
final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(serverHolderList);
if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) {
//找Cost最小的節點,Cost計算入口
final ServerHolder holder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), serverHolderList);
//找到候選節點,發起一次Move Segment的任務
if (holder != null) {
moveSegment(segmentToMove, holder.getServer(), params);
} else {
++unmoved;
}
}
}
......
}
複製代碼
Reservoir 隨機算法,隨機選擇一個 Segment 進行 Balance。Segment 被選中的機率:函數
public class ReservoirSegmentSampler
{
public BalancerSegmentHolder getRandomBalancerSegmentHolder(final List<ServerHolder> serverHolders)
{
final Random rand = new Random();
ServerHolder fromServerHolder = null;
DataSegment proposalSegment = null;
int numSoFar = 0;
//遍歷全部List上的Historical節點
for (ServerHolder server : serverHolders) {
//遍歷一個Historical節點上全部的Segment
for (DataSegment segment : server.getServer().getSegments().values()) {
int randNum = rand.nextInt(numSoFar + 1);
// w.p. 1 / (numSoFar+1), swap out the server and segment
// 隨機選出一個Segment,後面的會覆蓋前面選中的,以最後一個被選中爲止。
if (randNum == numSoFar) {
fromServerHolder = server;
proposalSegment = segment;
}
numSoFar++;
}
}
if (fromServerHolder != null) {
return new BalancerSegmentHolder(fromServerHolder.getServer(), proposalSegment);
} else {
return null;
}
}
}
複製代碼
繼續調用到CostBalancerStrategy類的findNewSegmentHomeBalancer方法,其實就是找最合適的Historical節點大數據
@Override
public ServerHolder findNewSegmentHomeBalancer(DataSegment proposalSegment, List<ServerHolder> serverHolders){
return chooseBestServer(proposalSegment, serverHolders, true).rhs;
}
protected Pair<Double, ServerHolder> chooseBestServer(
final DataSegment proposalSegment,
final Iterable<ServerHolder> serverHolders,
final boolean includeCurrentServer
){
Pair<Double, ServerHolder> bestServer = Pair.of(Double.POSITIVE_INFINITY, null);
List<ListenableFuture<Pair<Double, ServerHolder>>> futures = Lists.newArrayList();
for (final ServerHolder server : serverHolders) {
futures.add(
exec.submit(
new Callable<Pair<Double, ServerHolder>>()
{
@Override
public Pair<Double, ServerHolder> call() throws Exception
{
//計算Cost:候選Segment和Historical節點上全部Segment的cost和
return Pair.of(computeCost(proposalSegment, server, includeCurrentServer), server);
}
}
)
);
}
final ListenableFuture<List<Pair<Double, ServerHolder>>> resultsFuture = Futures.allAsList(futures);
final List<Pair<Double, ServerHolder>> bestServers = new ArrayList<>();
bestServers.add(bestServer);
try {
for (Pair<Double, ServerHolder> server : resultsFuture.get()) {
if (server.lhs <= bestServers.get(0).lhs) {
if (server.lhs < bestServers.get(0).lhs) {
bestServers.clear();
}
bestServers.add(server);
}
}
//Cost最小的若是有多個,隨機選擇一個
bestServer = bestServers.get(ThreadLocalRandom.current().nextInt(bestServers.size()));
}
catch (Exception e) {
log.makeAlert(e, "Cost Balancer Multithread strategy wasn't able to complete cost computation.").emit();
}
return bestServer;
}
複製代碼
protected double computeCost(final DataSegment proposalSegment, final ServerHolder server,final boolean includeCurrentServer){
final long proposalSegmentSize = proposalSegment.getSize();
// (optional) Don't include server if it is already serving segment if (!includeCurrentServer && server.isServingSegment(proposalSegment)) { return Double.POSITIVE_INFINITY; } // Don't calculate cost if the server doesn't have enough space or is loading the segment if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) { return Double.POSITIVE_INFINITY; } // 初始cost爲0 double cost = 0d; //計算Cost:候選Segment和Historical節點上全部Segment的totalCost cost += computeJointSegmentsCost( proposalSegment, Iterables.filter( server.getServer().getSegments().values(), Predicates.not(Predicates.equalTo(proposalSegment)) ) ); // 須要加上和即將被加載的Segment之間的cost cost += computeJointSegmentsCost(proposalSegment, server.getPeon().getSegmentsToLoad()); // 須要減掉和即將被加載的 Segment 之間的 cost cost -= computeJointSegmentsCost (proposalSegment, server.getPeon().getSegmentsMarkedToDrop()); return cost; } 複製代碼
開始計算:優化
static double computeJointSegmentsCost(final DataSegment segment, final Iterable<DataSegment> segmentSet){
double totalCost = 0;
// 此處須要注意,當新增的Historical節點第一次上線的時候,segmentSet應該是空,因此totalCost=0最小
// 新增節點總會很快的被均衡
for (DataSegment s : segmentSet) {
totalCost += computeJointSegmentsCost(segment, s);
}
return totalCost;
}
複製代碼
進行一些處理:1)Segment的Interval毫秒轉換成hour; 2)先計算了帶lambda的x1, y0, y1的值
public static double computeJointSegmentsCost(final DataSegment segmentA, final DataSegment segmentB){
final Interval intervalA = segmentA.getInterval();
final Interval intervalB = segmentB.getInterval();
final double t0 = intervalA.getStartMillis();
final double t1 = (intervalA.getEndMillis() - t0) / MILLIS_FACTOR; //x1
final double start = (intervalB.getStartMillis() - t0) / MILLIS_FACTOR; //y0
final double end = (intervalB.getEndMillis() - t0) / MILLIS_FACTOR; //y1
// constant cost-multiplier for segments of the same datsource
final double multiplier = segmentA.getDataSource().equals(segmentB.getDataSource()) ? 2.0 : 1.0;
return INV_LAMBDA_SQUARE * intervalCost(t1, start, end) * multiplier;
}
複製代碼
真正計算 cost 函數的值
public static double intervalCost(double x1, double y0, double y1){
if (x1 == 0 || y1 == y0) {
return 0;
}
// 保證Segment A開始時間小於B的開始時間
if (y0 < 0) {
// swap X and Y
double tmp = x1;
x1 = y1 - y0;
y1 = tmp - y0;
y0 = -y0;
}
if (y0 < x1) {
// Segment A和B 時間有重疊的狀況,這個分支暫時不分析
.......
} else {
// 此處就是計算A和B兩個Segment之間的cost,代價計算函數:See https://github.com/druid-io/druid/pull/2972
final double exy0 = FastMath.exp(x1 - y0);
final double exy1 = FastMath.exp(x1 - y1);
final double ey0 = FastMath.exp(0f - y0);
final double ey1 = FastMath.exp(0f - y1);
return (ey1 - ey0) - (exy1 - exy0);
}
}
複製代碼
如今咱們有 2 個 Segment, A 和 B,須要計算他們之間的代價,假設 A 的 start 和 end 時間都是小於 B 的。
Cost 函數的提出請參考 Druid PR2972:
其中 \( \lambda = \frac{log_2e}{24.0} \) 是Cost函數的半衰期
爲了弄清楚這個 Cost 函數以及影響 Cost 值的因素?咱們先使用一些經常使用的參數配置:
假設1:Segment A 的Interval是1小時,即 \( A_{end}-A_{start}=1*Hour \), 獲得:
假設2:Segment B 的 Interval 也是 1 小時, 獲得:
假設3:Segment B 和 A start 時間相差了 t 個小時,獲得:
在實際的代碼中,\( \lambda \)的計算已經放到了\( {x_0}{x_1}{y_0}{y_1} \)中
根據假設 2,獲得:
繼續簡化,獲得:
根據假設 1,獲得:
根據假設 3,獲得:
繼續簡化,獲得:
根據上訴 cost 函數化簡的結果,當 Segment A 和 B 的 Interval 都是 1 小時的狀況下:Segment A 和 B 時間相距越大 Cost 越小,它們就越可能共存在同一個 Historical 節點。這也和本文開始時候提出的時間相鄰的 Segment 存儲在不一樣的節點上讓查詢更快相呼應。
Druid 的 balance 機制,主要解決 segments 數據在 history 節點的分佈問題,這裏的優化主要針對於查詢作優化,通常狀況下,用戶的某一次查詢針對的是一個時間範圍內的多個 Segment 數據, cost 算法的核心思想是,儘量打散 Segment 數據分佈,這樣在一次查詢設計多個連續時間 Segment 數據的時候可以利用多臺 history server 的並行處理能力,分散系統開銷,縮短查詢 RT.
最後打個小廣告,有贊大數據團隊基礎設施團隊,主要負責有讚的數據平臺(DP), 實時計算(Storm, Spark Streaming, Flink),離線計算(HDFS,YARN,HIVE, SPARK SQL),在線存儲(HBase),實時 OLAP(Druid) 等數個技術產品,歡迎感興趣的小夥伴聯繫.