Druid Segment Balance 及其代價計算函數分析

一. 引言

Druid 的查詢須要有實時和歷史部分的 Segment,歷史部分的 Segment 由 Historical 節點加載,因此加載的效率直接影響了查詢的 RT(不考慮緩存)。查詢一般須要指定一個時間範圍[StartTime, EndTime],該時間範圍的內全部 Segment 須要由 Historical 加載,最差的狀況是全部 Segment 不幸都儲存在一個節點上,加載無疑會很慢;最好的狀況是 Segment 均勻分佈在全部的節點上,並行加載提升效率。因此 Segment 在 Historical 集羣中分佈就變得極爲重要,Druid 經過 Coordinator 的 Balance 策略協調 Segment 在集羣中的分佈。git

本文將分析 Druid 的 Balance 策略、源碼及其代價計算函數,本文使用 Druid 的版本是 0.12.0。github

二. Balance方法解析

2.1 Balance 相關的配置

Druid 目前有三種 Balance 算法: cachingCost, diskNormalized, Cost, 其中 cachingCost 是基於緩存的,diskNormalized 則是基於磁盤的 Balance 策略,本文不對前兩種展開篇幅分析, Druid Coordinator 中開啓 cost balance 的配置以下:算法

druid.coordinator.startDelay=PT30S
druid.coordinator.period=PT30S 調度的時間
druid.coordinator.balancer.strategy=cost 默認

動態配置:
maxSegmentsToMove = 5  ##每次Balance最多移動多少個Segment
複製代碼

2.2 Cost 算法概述

Cost 是 Druid 在 0.9.1 開始引入的,在 0.9.1 以前使用的 Balance 算法會存在 Segment 不能快速均衡,分佈不均勻的狀況,Cost 算法的核心思想是:當在作均衡的時候,隨機選擇一個 Segment(假設 Segment A ), 依次計算Segment A 和 Historical 節點上的全部 Segment 的 Cost,選取 Cost 值最小的節點,而後到該節點上從新加載 Segment。緩存

2.3 源碼和流程圖分析

如下會省略一些沒必要要的代碼bash

DruidCoordinatorBalancer 類dom

@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
	final CoordinatorStats stats = new CoordinatorStats();
	// 不一樣tier層的分開Balance
	params.getDruidCluster().getHistoricals().forEach((String tier, NavigableSet<ServerHolder> servers) -> {
	  balanceTier(params, tier, servers, stats);
	});
	return params.buildFromExisting().withCoordinatorStats(stats).build();
}
複製代碼

DruidCoordinatorBalancer 類的 balanceTier 方法,主要是均衡入口函數ide

private void balanceTier(DruidCoordinatorRuntimeParams params, String tier, SortedSet<ServerHolder> servers,CoordinatorStats stats){
	final BalancerStrategy strategy = params.getBalancerStrategy();
	final int maxSegmentsToMove = params.getCoordinatorDynamicConfig().getMaxSegmentsToMove();

	currentlyMovingSegments.computeIfAbsent(tier, t -> new ConcurrentHashMap<>());

	final List<ServerHolder> serverHolderList = Lists.newArrayList(servers);

	//集羣中只有一個 Historical 節點時不進行Balance
	if (serverHolderList.size() <= 1) {
	  log.info("[%s]: One or fewer servers found. Cannot balance.", tier);
	  return;
	}

	int numSegments = 0;
	for (ServerHolder server : serverHolderList) {
	  numSegments += server.getServer().getSegments().size();
	}

	if (numSegments == 0) {
	  log.info("No segments found. Cannot balance.");
	  return;
	}
	long unmoved = 0L;
	for (int iter = 0; iter < maxSegmentsToMove; iter++) {
	  //經過隨機算法選擇一個候選Segment,該Segment會參與後面的Cost計算
	  final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(serverHolderList);

	  if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) {
	  	 //找Cost最小的節點,Cost計算入口
	    final ServerHolder holder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), serverHolderList);
	    //找到候選節點,發起一次Move Segment的任務
	    if (holder != null) {
	      moveSegment(segmentToMove, holder.getServer(), params);
	    } else {
	      ++unmoved;
	    }
	  }
	}
	......
}
複製代碼

Reservoir 隨機算法,隨機選擇一個 Segment 進行 Balance。Segment 被選中的機率:函數

public class ReservoirSegmentSampler
{

  public BalancerSegmentHolder getRandomBalancerSegmentHolder(final List<ServerHolder> serverHolders)
  {
    final Random rand = new Random();
    ServerHolder fromServerHolder = null;
    DataSegment proposalSegment = null;
    int numSoFar = 0;

    //遍歷全部List上的Historical節點
    for (ServerHolder server : serverHolders) {
      //遍歷一個Historical節點上全部的Segment
      for (DataSegment segment : server.getServer().getSegments().values()) {
        int randNum = rand.nextInt(numSoFar + 1);
        // w.p. 1 / (numSoFar+1), swap out the server and segment
        // 隨機選出一個Segment,後面的會覆蓋前面選中的,以最後一個被選中爲止。
        if (randNum == numSoFar) {
          fromServerHolder = server;
          proposalSegment = segment;
        }
        numSoFar++;
      }
    }
    if (fromServerHolder != null) {
      return new BalancerSegmentHolder(fromServerHolder.getServer(), proposalSegment);
    } else {
      return null;
    }
  }
}
複製代碼

繼續調用到CostBalancerStrategy類的findNewSegmentHomeBalancer方法,其實就是找最合適的Historical節點大數據

@Override
public ServerHolder findNewSegmentHomeBalancer(DataSegment proposalSegment, List<ServerHolder> serverHolders){
	return chooseBestServer(proposalSegment, serverHolders, true).rhs;
}

protected Pair<Double, ServerHolder> chooseBestServer(
  final DataSegment proposalSegment,
  final Iterable<ServerHolder> serverHolders,
  final boolean includeCurrentServer
){
	Pair<Double, ServerHolder> bestServer = Pair.of(Double.POSITIVE_INFINITY, null);

	List<ListenableFuture<Pair<Double, ServerHolder>>> futures = Lists.newArrayList();

	for (final ServerHolder server : serverHolders) {
	  futures.add(
	      exec.submit(
	          new Callable<Pair<Double, ServerHolder>>()
	          {
	            @Override
	            public Pair<Double, ServerHolder> call() throws Exception
	            {
	              //計算Cost:候選Segment和Historical節點上全部Segment的cost和
	              return Pair.of(computeCost(proposalSegment, server, includeCurrentServer), server);
	            }
	          }
	      )
	  );
	}

	final ListenableFuture<List<Pair<Double, ServerHolder>>> resultsFuture = Futures.allAsList(futures);
	final List<Pair<Double, ServerHolder>> bestServers = new ArrayList<>();
	bestServers.add(bestServer);
	try {
	  for (Pair<Double, ServerHolder> server : resultsFuture.get()) {
	    if (server.lhs <= bestServers.get(0).lhs) {
	      if (server.lhs < bestServers.get(0).lhs) {
	        bestServers.clear();
	      }
	      bestServers.add(server);
	    }
	  }

	  //Cost最小的若是有多個,隨機選擇一個
	  bestServer = bestServers.get(ThreadLocalRandom.current().nextInt(bestServers.size()));
	}
	catch (Exception e) {
	  log.makeAlert(e, "Cost Balancer Multithread strategy wasn't able to complete cost computation.").emit();
	}
  return bestServer;
}
複製代碼
protected double computeCost(final DataSegment proposalSegment, final ServerHolder server,final boolean includeCurrentServer){
    final long proposalSegmentSize = proposalSegment.getSize();

    // (optional) Don't include server if it is already serving segment if (!includeCurrentServer && server.isServingSegment(proposalSegment)) { return Double.POSITIVE_INFINITY; } // Don't calculate cost if the server doesn't have enough space or is loading the segment if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) { return Double.POSITIVE_INFINITY; } // 初始cost爲0 double cost = 0d; //計算Cost:候選Segment和Historical節點上全部Segment的totalCost cost += computeJointSegmentsCost( proposalSegment, Iterables.filter( server.getServer().getSegments().values(), Predicates.not(Predicates.equalTo(proposalSegment)) ) ); // 須要加上和即將被加載的Segment之間的cost cost += computeJointSegmentsCost(proposalSegment, server.getPeon().getSegmentsToLoad()); // 須要減掉和即將被加載的 Segment 之間的 cost cost -= computeJointSegmentsCost (proposalSegment, server.getPeon().getSegmentsMarkedToDrop()); return cost; } 複製代碼

開始計算:優化

static double computeJointSegmentsCost(final DataSegment segment, final Iterable<DataSegment> segmentSet){
	double totalCost = 0;
	// 此處須要注意,當新增的Historical節點第一次上線的時候,segmentSet應該是空,因此totalCost=0最小
	// 新增節點總會很快的被均衡
	for (DataSegment s : segmentSet) {
	  totalCost += computeJointSegmentsCost(segment, s);
	}
	return totalCost;
}
複製代碼

進行一些處理:1)Segment的Interval毫秒轉換成hour; 2)先計算了帶lambda的x1, y0, y1的值

public static double computeJointSegmentsCost(final DataSegment segmentA, final DataSegment segmentB){
	final Interval intervalA = segmentA.getInterval();
	final Interval intervalB = segmentB.getInterval();

	final double t0 = intervalA.getStartMillis();
	final double t1 = (intervalA.getEndMillis() - t0) / MILLIS_FACTOR;        //x1
	final double start = (intervalB.getStartMillis() - t0) / MILLIS_FACTOR;   //y0
	final double end = (intervalB.getEndMillis() - t0) / MILLIS_FACTOR;       //y1

	// constant cost-multiplier for segments of the same datsource
	final double multiplier = segmentA.getDataSource().equals(segmentB.getDataSource()) ? 2.0 : 1.0;

	return INV_LAMBDA_SQUARE * intervalCost(t1, start, end) * multiplier;
}
複製代碼

真正計算 cost 函數的值

public static double intervalCost(double x1, double y0, double y1){
	if (x1 == 0 || y1 == y0) {
	  return 0;
	}

	// 保證Segment A開始時間小於B的開始時間
	if (y0 < 0) {
	  // swap X and Y
	  double tmp = x1;
	  x1 = y1 - y0;
	  y1 = tmp - y0;
	  y0 = -y0;
	}

	if (y0 < x1) {
	  // Segment A和B 時間有重疊的狀況,這個分支暫時不分析
	  .......
	} else {
	  // 此處就是計算A和B兩個Segment之間的cost,代價計算函數:See https://github.com/druid-io/druid/pull/2972
	  final double exy0 = FastMath.exp(x1 - y0);
	  final double exy1 = FastMath.exp(x1 - y1);
	  final double ey0 = FastMath.exp(0f - y0);
	  final double ey1 = FastMath.exp(0f - y1);

	  return (ey1 - ey0) - (exy1 - exy0);
	}
}
複製代碼

2.4 代價計算函數分析

如今咱們有 2 個 Segment, A 和 B,須要計算他們之間的代價,假設 A 的 start 和 end 時間都是小於 B 的。

2.4.1 Cost函數介紹

Cost 函數的提出請參考 Druid PR2972

Cost(X, Y)=\\int\_{x\_0}^{x\_1}\\int\_{y\_0}^{y\_1} {e^{\\lambda|x-y|}}\\,{\\rm d}x{\\rm d}y

其中 \( \lambda = \frac{log_2e}{24.0} \) 是Cost函數的半衰期

爲了弄清楚這個 Cost 函數以及影響 Cost 值的因素?咱們先使用一些經常使用的參數配置:
假設1:Segment A 的Interval是1小時,即 \( A_{end}-A_{start}=1*Hour \), 獲得:

x\_1 = \\frac{(A\_{end}-A\_{start})\*log\_e2}{24*Hour} = \\frac{log\_e2}{24}

假設2:Segment B 的 Interval 也是 1 小時, 獲得:

y\_1 = y\_0 + x\_1

假設3:Segment B 和 A start 時間相差了 t 個小時,獲得:

y\_0 = \\frac{t\*Hour\*log\_e2}{24*Hour} = \\frac{t}{24}\*log\_e2

在實際的代碼中,\( \lambda \)的計算已經放到了\( {x_0}{x_1}{y_0}{y_1} \)中

2.4.2 計算 Cost 函數

Cost(A, B)=(e^{x\_1-y\_0}-e^{x\_1-y\_1})-(e^{-y\_0}-e^{-y\_1})

根據假設 2,獲得:

Cost(A, B)=(e^{x\_1 - y\_0} - e^{-y\_0}) - (e^{-y\_0} - e^{-x\_1 - y\_0})=e^{x\_1 - y\_0} + e^{-x\_1 - y\_0} - 2e^{-y\_0}

繼續簡化,獲得:

Cost(A, B)=(e^{x\_1} + e^{-x\_1} - 2 )e^{-y\_0}

根據假設 1,獲得:

Cost(A, B)=(2^{\\frac{1}{24}} + 2^{-\\frac{1}{24}} - 2)e^{-y\_0}

根據假設 3,獲得:

Cost(A, B)=(2^{\\frac{1}{24}} + 2^{-\\frac{1}{24}} - 2)*e^{\\frac{-t}{24} * log\_e2}

繼續簡化,獲得:

Cost(A, B)={(2^{\frac{1}{24}} + 2^{-\frac{1}{24}} - 2)*2^{\frac{-t}{24}}}

2.4.5 小結

根據上訴 cost 函數化簡的結果,當 Segment A 和 B 的 Interval 都是 1 小時的狀況下:Segment A 和 B 時間相距越大 Cost 越小,它們就越可能共存在同一個 Historical 節點。這也和本文開始時候提出的時間相鄰的 Segment 存儲在不一樣的節點上讓查詢更快相呼應。

三. 總結

Druid 的 balance 機制,主要解決 segments 數據在 history 節點的分佈問題,這裏的優化主要針對於查詢作優化,通常狀況下,用戶的某一次查詢針對的是一個時間範圍內的多個 Segment 數據, cost 算法的核心思想是,儘量打散 Segment 數據分佈,這樣在一次查詢設計多個連續時間 Segment 數據的時候可以利用多臺 history server 的並行處理能力,分散系統開銷,縮短查詢 RT.

最後打個小廣告,有贊大數據團隊基礎設施團隊,主要負責有讚的數據平臺(DP), 實時計算(Storm, Spark Streaming, Flink),離線計算(HDFS,YARN,HIVE, SPARK SQL),在線存儲(HBase),實時 OLAP(Druid) 等數個技術產品,歡迎感興趣的小夥伴聯繫.

相關文章
相關標籤/搜索