通常地,像kafka之類的消息中間件,做爲一個能夠保持歷史消息的組件,其消費模型通常是主動拉取方式。這是爲了給消費者足夠的自由,回滾或者前進。java
然而,也正是因爲將消費消息的權力交給了消費者,因此,消費者每每須要承擔更多的責任。好比:須要自行保存消費偏移量,以便後續能夠知道從哪裏繼續。而當這一點處理很差時,則可能帶來一些麻煩。redis
無論怎麼樣,解決方案也都是現成的,我們也不用擔憂。算法
今天咱們要談論的是一個場景: 如何讓n個機器消費m個分片數據?(帶狀態的,即不能任意機器消費任意shard)apache
這在消息中間件的解決方案裏,明白地寫着,使用消費者羣組就能夠實現了。具體來講就是,每一個分片至多會被一機器消費,每一個機器則能夠消費多個分片數據。即機器數據小於分片數時,分片會被均衡地分配到消費者中。當機器數大於分片數時,多餘的機器將不作任何事情。緩存
好吧,既然官方已經說明白了,那我們應該就再也不須要本身搞一個輪子了吧。服務器
可是,我還有個場景:若是我要求在機器作負載重平衡時,須要保證被抽取出去的機器分片,至少保留一段時間,不容許任何機器消費該分片,由於可能還有數據須要備份。數據結構
針對這種場景,我想官方也許是有提供回調函數之類的解決方案的吧。無論了,反正我沒找到,只能本身先造個輪子了。負載均衡
本文場景前提:框架
1. 使用loghub做爲消息中間件(原理同kafka);
2. 整個數據有m個分片shard;
3. 整個消費者集羣有n臺機器;
4. 每一個分片的數據須要集中到一機器上作有狀態處理;
5. 能夠藉助redis保存有狀態數據,以便消費者機器作優雅停機;dom
最簡單的方案是,使 n=m, 每臺機器消費一個shard, 這樣狀態永遠不會錯亂。
可是這樣明顯可擴展能力太差了!
好比有時數據量小了,雖然分片還在,可是徹底不用那麼多機器的時候,如何縮減機器?
好比因爲數據壓力大了,我想增長下分片數,以提升發送者性能,可是消費者我還不想理他,消費慢點無所謂?
其實,咱們可使用官方的消費者羣組方法,能夠動態縮減機器。
可是這個有狀態就比較難作到了。
以上痛點,總結下來就是,可擴展性問題。
想象中的輪子是怎麼樣的?
1. 須要有個註冊中心,管理機器的上下線監控;
2. 須要有負載均衡器,負載將shard的負載均衡的分佈到在線機器中;
3. 須要有每一個機器本身消費的分片記錄,以使機器自身有據可查;
4. 須要有每一個分片的消費狀況,以斷定出哪些分片已分配給哪些機器;
咱們來細看下實現:
【1】平衡協調器主框架:
import com.aliyun.openservices.log.Client; import com.aliyun.openservices.log.common.Shard; import com.aliyun.openservices.log.exception.LogException; import com.aliyun.openservices.log.response.ListShardResponse; import com.test.common.config.LogHubProperties; import com.test.utils.RedisPoolUtil; import com.google.common.collect.Lists; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import static com.test.dispatcher.work.RedisKeyConstants.MAX_CONSUMER_SHARD_LOAD; /** * loghub動態消費者 shard分配shard 協調器 * */ public class LoghubConsumerShardCoWorker implements Runnable { private static final Logger logger = LoggerFactory.getLogger(LoghubConsumerShardCoWorker.class); private LogHubProperties logHubProperties; private RedisPoolUtil redisPoolUtil; private Client mClient; private ShardAssignMaster shardAssignMaster; private String HOST_NAME; public LoghubConsumerShardCoWorker(RedisPoolUtil redisPoolUtil, LogHubProperties logHubProperties) { this(redisPoolUtil, logHubProperties, null); } public LoghubConsumerShardCoWorker(RedisPoolUtil redisPoolUtil, LogHubProperties logHubProperties, String hostName) { this.redisPoolUtil = redisPoolUtil; this.logHubProperties = logHubProperties; this.HOST_NAME = hostName; initSharedVars(); initConsumerClient(); initShardAssigner(); getAllShardList(); registerSelfConsumer(); startHeartBeatThread(); } /** * 開啓心跳線程,保活 */ private void startHeartBeatThread() { ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); executorService.scheduleAtFixedRate(() -> { String serverConsumeCacheKey = RedisKeyConstants.SERVER_CONSUME_CACHE_KEY_PREFIX + HOST_NAME; redisPoolUtil.expire(serverConsumeCacheKey, 30); shardAssignMaster.sendHeartbeat(HOST_NAME); }, 30, 25, TimeUnit.SECONDS); } /** * 初始化客戶端實例 */ private void initConsumerClient() { this.mClient = new Client(logHubProperties.getEndpoint(), logHubProperties.getAccessKeyId(), logHubProperties.getAccessKey()); } /** * 初始化分片分配控制器 */ private void initShardAssigner() { shardAssignMaster = new ShardAssignMaster(redisPoolUtil); } /** * 初始化公共變量 */ private void initSharedVars() { try { if(HOST_NAME != null) { return; } HOST_NAME = InetAddress.getLocalHost().getHostName(); } catch (UnknownHostException e) { logger.error("init error : 獲取服務器主機名失敗", e); throw new RuntimeException("init error : 獲取服務器主機名失敗"); } } /** * 將本身做爲消費者註冊到消費者列表中,以斷定後續能夠進行消費 */ private void registerSelfConsumer() { shardAssignMaster.registerConsumer(HOST_NAME); shardAssignMaster.sendHeartbeat(HOST_NAME); } @Override public void run() { try { checkConsumerSharding(); } catch (Exception e) { logger.error("動態分配shard 發生異常", e); } } /** * job 只作一件事,即檢查 shard 的消費狀況,不平衡則處理 */ private void checkConsumerSharding() { try { if (tryCoWorkerLock()) { // step1. 檢查是否須要進行shard分配 // 集羣消費loghub數據動態伸縮策略 // 1. 啓動時先去獲取部分片數,備用; // 2. 應用啓動後,把本身註冊到註冊中心或redis中; // 3. 根據註冊上來的機器列表,按平均分配策略分配shard(只能由一個機器來分配,其餘機器處理分佈式鎖競爭失敗,等待狀態); // 4. 分配好後,釋放鎖,各機器開始消費,如機器A消費shard 0/3,則機器1以輪詢的方式依次從shard 0/3 摘取數據消費; // 5. 分配好的數據結構爲:prefix+ip保存具體數據,另外將本身的key添加到另外一個zset中,標識本身存活;本身的key有效期爲30秒;使用另外一維度 shard,保存每一個shard被佔用狀況,使用hash保存,key爲shard,value爲當有佔用時爲機器ip或主機名,當無佔用時爲null或空串; // 6. 以上數據刷入,將在機器搶佔到shard更新數據;shard總數信息暫時不容許在運行期間進行變動;(即若是變理shard必須重啓服務器) // 7. 機器下線時,佔用的key將自動過時;(考慮是否主動刪除) // 8. 各機器上啓動一個後臺掃描線程,每隔30秒掃描一次。掃描zset,取出全部值後查看是否存在相應的key,若是不存在說明機器已下線,須要從新分配其佔用的shard; // 9. 從新分配策略,使用一致性hash算法實現; // 10. 機器上線時,使用一致性hash算法從新平衡shard; // 11. 使用分佈式鎖保證分配進程只有一個; CheckConsumerShardingResultContainer resultContainer = checkShardConsumerReBalanceStatus(); if(resultContainer.getStatusResultType() != ReBalanceStatusResultEnum.OK) { reBalanceConsumerShard(resultContainer); } } } finally { releaseCoWorkerLock(); } } /** * 確認機器和shard是否須要再平衡 * * @return 結果狀態集 */ private CheckConsumerShardingResultContainer checkShardConsumerReBalanceStatus() { // step1. 檢查自身是否存在shard, 不存在則當即進行一次重分配(消費者機器數大於分片數時,重平衡動做將是無效動做) // step2. 檢查全部shard列表,是否有未被分配的shard,若有,當即觸發一次重分配 // step3. 檢查是否有負荷比較高的機器,若有觸發平衡(功能預留,此功能須要基於統計信息) CheckConsumerShardingResultContainer resultContainer = new CheckConsumerShardingResultContainer(); final List<String> activeServersList = shardAssignMaster.getAllOnlineServerList(); final List<String> allShardList = getAllShardList(); // 計算空閒機器 Map<String, Integer> hostConsumeLoadCountMap = new HashMap<>(); List<String> idleServerList = filterIdleServerList(activeServersList, hostConsumeLoadCountMap); // 計算未被分配的shard List<String> unAssignedShardList = filterUnAssignedShardList(allShardList); // 根據資源信息,得出目前的負載狀態 ReBalanceStatusResultEnum statusResult = computeReBalanceStatusOnResources( unAssignedShardList, idleServerList, hostConsumeLoadCountMap); resultContainer.setAllServerList(activeServersList); resultContainer.setAllShardList(allShardList); resultContainer.setIdleServerList(idleServerList); resultContainer.setUnAssignedShardList(unAssignedShardList); resultContainer.setServerConsumeShardLoad(hostConsumeLoadCountMap); resultContainer.setStatusResultType(statusResult); return resultContainer; } /** * 根據給定資源信息,計算出目前的負載狀態 * * @param unAssignedShardList 未分配的shard列表 * @param idleServerList 空閒機器列表 * @param hostConsumeLoadMap 機器消費計數容器(負載狀況) * @return 狀態值 */ private ReBalanceStatusResultEnum computeReBalanceStatusOnResources( List<String> unAssignedShardList, List<String> idleServerList, Map<String, Integer> hostConsumeLoadMap) { // 沒有未分配的shard,檢測是否平衡便可 // 0. 有空閒機器,則直接分配給空閒機器便可 // 1. 最大消費shard-最小消費shard數 >= 2, 則說明有機器消費過多shard,需重分配 // 2. 機器負載平衡,無須調整 if(unAssignedShardList.isEmpty()) { int minConsume = MAX_CONSUMER_SHARD_LOAD; int maxConsume = 0; for (Map.Entry<String, Integer> entry : hostConsumeLoadMap.entrySet()) { int gotCount = entry.getValue(); if(gotCount > maxConsume) { maxConsume = gotCount; } if(gotCount < minConsume) { minConsume = gotCount; } } // 因有未分配的機器,假如現有的機器消費都是2,則須要重分配的大壓力的機器 shard 給空閒機器 if(!idleServerList.isEmpty()) { if (maxConsume > 1) { return ReBalanceStatusResultEnum.HEAVY_LOAD_WITH_CONSUMER_IDLE_BALANCE_NEEDED; } } // 有消費相差2的機器,從新分配,從大數上借調到小數上 if(maxConsume > minConsume + 1) { return ReBalanceStatusResultEnum.HEAVY_LOAD_BALANCE_NEEDED; } return ReBalanceStatusResultEnum.OK; } // 有可用shard // 3. 有空閒機器,直接讓空閒shard分配給這些空閒機器就ok了 // 4. 沒有空閒機器,須將空閒shard 分配給負載小的機器 if(idleServerList.isEmpty()) { return ReBalanceStatusResultEnum.UNASSIGNED_SHARD_WITHOUT_CONSUMER_IDLE_EXISTS; } return ReBalanceStatusResultEnum.UNASSIGNED_SHARD_WITH_CONSUMER_IDLE_EXISTS; } /** * 過濾出空閒的機器列表 * * @param activeServersList 全部機器列表 * @return 空閒機器集, 且將各自消費數放入計數容器 */ private List<String> filterIdleServerList(List<String> activeServersList, Map<String, Integer> hostConsumeCountMap) { List<String> idleServerList = new ArrayList<>(); for (String hostname1 : activeServersList) { if(!shardAssignMaster.isConsumerServerAlive(hostname1)) { shardAssignMaster.invalidateOfflineServer(hostname1); continue; } int consumeCount; Set<String> consumeShardSet = shardAssignMaster.getServerDutyConsumeShardSet(hostname1); if(consumeShardSet == null || consumeShardSet.isEmpty()) { idleServerList.add(hostname1); consumeCount = 0; } else { consumeCount = consumeShardSet.size(); } hostConsumeCountMap.put(hostname1, consumeCount); } return idleServerList; } /** * 過濾出未分配的shard列表 * * @param allShardList 全部shard * @return 未分配的shard */ private List<String> filterUnAssignedShardList(List<String> allShardList) { List<String> unAssignedShardList = new ArrayList<>(); for (String shardId1 : allShardList) { String consumeHostname = shardAssignMaster.getShardAssignedServer(shardId1); // 若是不爲空,則以前分配過,檢查機器是否下線 // 若是爲空,則是第一次分配 if(!StringUtils.isBlank(consumeHostname)) { if(!shardAssignMaster.isConsumerServerAlive(consumeHostname)) { // 清除下線機器信息,將當前shard置爲空閒 shardAssignMaster.invalidateOfflineServer(consumeHostname); shardAssignMaster.invalidateShardAssignInfo(shardId1); unAssignedShardList.add(shardId1); } } else { unAssignedShardList.add(shardId1); } } return unAssignedShardList; } /** * 嘗試獲取協調者協調鎖 * * 在集羣環境中,只容許有一個協調器在運行 * * @return true:成功, false:失敗,不得進行協調分配工做 */ private boolean tryCoWorkerLock() { return redisPoolUtil.getDistributedLock("distributedLock", HOST_NAME, 30); } /** * 釋放協調鎖,以便下次再競爭 */ private void releaseCoWorkerLock() { redisPoolUtil.releaseDistributedLock("distributedLock", HOST_NAME); } /** * 從新平衡消費者和shard的關係 * * @param resultContainer 待重平衡狀態 */ private void reBalanceConsumerShard(CheckConsumerShardingResultContainer resultContainer) { // 集羣消費loghub數據動態伸縮策略,根據負載狀態,調用相應策略進行重平衡 StatusReBalanceStrategy strategy = StatusReBalanceStrategyFactory.createStatusReBalanceAlgorithm(resultContainer, shardAssignMaster); strategy.loadBalance(); } /** * 獲取分片列表 * * @return 分片列表,如: 0,1,2,3 */ private List<String> getAllShardList() { // 實時讀取列表 List<String> shardList = Lists.newArrayList(); try { ListShardResponse listShardResponse = mClient.ListShard(logHubProperties.getProjectName(), logHubProperties.getEventlogStore()); ArrayList<Shard> getShards = listShardResponse.GetShards(); for (Shard shard : getShards) { shardList.add(shard.GetShardId() + ""); } } catch (LogException e) { logger.error("loghub 獲取shard列表 error :", e); } return shardList; } }
如上,就是協調均衡主框架。主要邏輯以下:
1. 啓動時初始化各類端,分配器,註冊本身到控制中心等等;
2. 以線程的形式,被外部以定時任務執行的方式調用;
3. 檢查任務前,須得到檢查鎖,不然直接返回;
4. 先得到目前機器的全部消費狀況和shard的分配狀況,得出資源負載數據;
5. 根據獲得的數據信息,推算出目前的平衡狀態;
6. 根據平衡狀態,調用相應的平衡策略進行重平衡;
7. 等待下一次調度;
檢查結果將做爲後續選擇均衡策略的依據,因此須要相應的狀態容器保存。以下:
/** * 集羣狀態預檢查 結果容器 */ class CheckConsumerShardingResultContainer { /** * 全部shard列表 */ private List<String> allShardList; /** * 未被分配的shard列表 */ private List<String> unAssignedShardList; /** * 全部機器列表 */ private List<String> allServerList; /** * 空閒的機器列表(未被分配shard) */ private List<String> idleServerList; /** * 機器消費shard的負載計數容器 */ private Map<String, Integer> serverConsumeShardLoad; /** * 狀態檢查結果類型 */ private ReBalanceStatusResultEnum statusResultType; public Map<String, Integer> getServerConsumeShardLoad() { return serverConsumeShardLoad; } public void setServerConsumeShardLoad(Map<String, Integer> serverConsumeShardLoad) { this.serverConsumeShardLoad = serverConsumeShardLoad; } public List<String> getAllShardList() { return allShardList; } public void setAllShardList(List<String> allShardList) { this.allShardList = allShardList; } public List<String> getUnAssignedShardList() { return unAssignedShardList; } public void setUnAssignedShardList(List<String> unAssignedShardList) { this.unAssignedShardList = unAssignedShardList; } public List<String> getAllServerList() { return allServerList; } public void setAllServerList(List<String> allServerList) { this.allServerList = allServerList; } public List<String> getIdleServerList() { return idleServerList; } public void setIdleServerList(List<String> idleServerList) { this.idleServerList = idleServerList; } public ReBalanceStatusResultEnum getStatusResultType() { return statusResultType; } public void setStatusResultType(ReBalanceStatusResultEnum statusResultType) { this.statusResultType = statusResultType; } }
針對多個平衡策略算法,使用一個工廠類來生產各類策略實例。以下:
/** * 再平衡算法工廠類 */ class StatusReBalanceStrategyFactory { /** * 無需作平衡的控制器 */ private static final StatusReBalanceStrategy EMPTY_BALANCER = new EmptyReBalancer(); /** * 根據當前的負載狀態,建立對應的負載均衡算法 * * @param resultContainer 負載狀態集 * @param shardAssignMaster 分片分配管理者實例 * @return 算法實例 */ public static StatusReBalanceStrategy createStatusReBalanceAlgorithm(CheckConsumerShardingResultContainer resultContainer, ShardAssignMaster shardAssignMaster) { ReBalanceStatusResultEnum balanceStatus = resultContainer.getStatusResultType(); switch (balanceStatus) { case OK: return EMPTY_BALANCER; case UNASSIGNED_SHARD_WITH_CONSUMER_IDLE_EXISTS: return new UnAssignedShardWithConsumerIdleReBalancer(shardAssignMaster, resultContainer.getUnAssignedShardList(), resultContainer.getIdleServerList()); case UNASSIGNED_SHARD_WITHOUT_CONSUMER_IDLE_EXISTS: return new UnassignedShardWithoutConsumerIdleReBalancer(shardAssignMaster, resultContainer.getUnAssignedShardList(), resultContainer.getServerConsumeShardLoad()); case HEAVY_LOAD_BALANCE_NEEDED: return new HeavyLoadReBalancer(shardAssignMaster, resultContainer.getServerConsumeShardLoad()); case HEAVY_LOAD_WITH_CONSUMER_IDLE_BALANCE_NEEDED: return new HeavyLoadWithConsumerIdleReBalancer(shardAssignMaster, resultContainer.getServerConsumeShardLoad(), resultContainer.getIdleServerList()); default: break; } return EMPTY_BALANCER; } } /** * 負載均衡策略統一接口 */ interface StatusReBalanceStrategy { /** * 執行負載均衡方法 */ public void loadBalance(); }
針對各類場景的負載均衡,各自實現以下。其中,無需操做時,將返回一個空操做實例!
1. 空操做實例
/** * 無需作平衡的控制器 * * @see ReBalanceStatusResultEnum#OK 狀態枚舉 */ class EmptyReBalancer implements StatusReBalanceStrategy { @Override public void loadBalance() { // ignore ... } }
2. 分配剩餘shard給空閒的機器控制器
/** * 爲全部空閒的其餘空閒機器分配可用 shard 的控制器 * * @see ReBalanceStatusResultEnum#UNASSIGNED_SHARD_WITHOUT_CONSUMER_IDLE_EXISTS 狀態枚舉 */ class UnAssignedShardWithConsumerIdleReBalancer implements StatusReBalanceStrategy { /** * 未被分配的分片列表 */ private List<String> unAssignedShardList; /** * 分片分配管理者實例 */ private ShardAssignMaster shardAssignMaster; /** * 空閒的機器列表 */ private List<String> idleServerList; public UnAssignedShardWithConsumerIdleReBalancer( ShardAssignMaster shardAssignMaster, List<String> unAssignedShardList, List<String> idleServerList) { this.shardAssignMaster = shardAssignMaster; this.unAssignedShardList = unAssignedShardList; this.idleServerList = idleServerList; } @Override public void loadBalance() { // 1. 找出還未被消費的shard // 2. 依次分配給各空閒機器,每一個空閒機器只至多分配一個shard int serverIndex = 0; for (String shard1 : unAssignedShardList) { // 輪詢分配shard, 先只給一個機器分配一個shard if(serverIndex >= idleServerList.size()) { break; } String serverHostname = idleServerList.get(serverIndex++); shardAssignMaster.assignShardToServer(shard1, serverHostname); } } }
3. 分配剩餘shard給負載低的機器的控制器
/** * 有空閒shard場景 的控制器 , 須找出負載最低的機器塞入shard到現有的機器中(多是有機器下線致使) * * @see ReBalanceStatusResultEnum#UNASSIGNED_SHARD_WITHOUT_CONSUMER_IDLE_EXISTS 狀態枚舉 */ class UnassignedShardWithoutConsumerIdleReBalancer implements StatusReBalanceStrategy { /** * 未被分配分片列表 */ private List<String> unAssignedShardList; /** * 分片管理者實例 */ private ShardAssignMaster shardAssignMaster; /** * 消費者負載狀況 */ private Map<String, Integer> consumerLoadCount; public UnassignedShardWithoutConsumerIdleReBalancer( ShardAssignMaster shardAssignMaster, List<String> unAssignedShardList, Map<String, Integer> consumerLoadCount) { this.shardAssignMaster = shardAssignMaster; this.unAssignedShardList = unAssignedShardList; this.consumerLoadCount = consumerLoadCount; } @Override public void loadBalance() { // 1. 找出負載最低的機器 // 2. 依次分配shard給該機器 // 3. 分配的後負載數+1, 循環分配 // 先根據空閒數,計算出一個能夠接受新shard的機器的shard負載最低值,而後依次分配給這些機器 for (String shard1 : unAssignedShardList) { // 按負載最小分配原則 分配shard Map.Entry<String, Integer> minLoadServer = getMinLoadServer(consumerLoadCount); String serverHostname = minLoadServer.getKey(); // 分配shard給機器 shardAssignMaster.assignShardToServer(shard1, serverHostname); // 負載數 +1 minLoadServer.setValue(minLoadServer.getValue() + 1); } } /** * 獲取負載最小的機器名備用 * * @param loadCount 負載數據 * @return 最小負載機器 */ private Map.Entry<String, Integer> getMinLoadServer(Map<String, Integer> loadCount) { int minCount = MAX_CONSUMER_SHARD_LOAD; Map.Entry<String, Integer> minLoadServer = null; for(Map.Entry<String, Integer> server1 : loadCount.entrySet()) { if(server1.getValue() < minCount) { minCount = server1.getValue(); minLoadServer = server1; } } return minLoadServer; } }
4. 將現有機器消費狀況作重分配,從而使各自負載相近控制器
/** * 負載不均衡致使的 從新均衡控制器,將消費shard多的機器的 shard 拆解部分到 消費少的機器上 (須上鎖) * * @see ReBalanceStatusResultEnum#HEAVY_LOAD_BALANCE_NEEDED 狀態枚舉 */ class HeavyLoadReBalancer implements StatusReBalanceStrategy { /** * 分片分配管理者實例 */ private ShardAssignMaster shardAssignMaster; /** * 機器消費負載狀況 */ private Map<String, Integer> consumerLoadCount; public HeavyLoadReBalancer(ShardAssignMaster shardAssignMaster, Map<String, Integer> consumerLoadCount) { this.shardAssignMaster = shardAssignMaster; this.consumerLoadCount = consumerLoadCount; } @Override public void loadBalance() { // 1. 找出全部機器的消費數的平均線值 // 2. 負載數大於均線1的,直接抽出多餘的shard, 放到待分配容器中 // 3. 從大到小排序負載機器 // 4. 從大的負載上減小shard到最後的機器上,直到小的機器達到平均負載線最貼近的地方,或者小的機器到達平均負載線最貼近的地方 // 5. ++大負載機器 或者 --小負載機器,下一次循環 double avgLoadCount = computeAliveServersAvgLoadCount(consumerLoadCount); List<Map.Entry<String, Integer>> sortedLoadCountList = sortLoadCountByLoadWithSmallEndian(consumerLoadCount); int bigLoadIndex = 0; int smallLoadIndex = sortedLoadCountList.size() - 1; for (;;) { // 首先檢測是否已遍歷完成,完成後再也不進行分配 if(isRoundRobinComplete(bigLoadIndex, smallLoadIndex)) { break; } Map.Entry<String, Integer> bigLoadServerEntry = sortedLoadCountList.get(bigLoadIndex); double canTakeCountFromBigLoad = bigLoadServerEntry.getValue() - avgLoadCount; if(canTakeCountFromBigLoad < 1) { bigLoadIndex += 1; continue; } for (int reAssignShardIndex = 0; reAssignShardIndex < canTakeCountFromBigLoad; reAssignShardIndex++) { if(isRoundRobinComplete(bigLoadIndex, smallLoadIndex)) { break; } Map.Entry<String, Integer> smallLoadServerEntry = sortedLoadCountList.get(smallLoadIndex); double canPutIntoSmallLoad = avgLoadCount - smallLoadServerEntry.getValue(); if(canPutIntoSmallLoad < 1) { smallLoadIndex -= 1; continue; } // 此處可使用管道操做,更流暢, 或者更準確的說,使用事務操做 // 從 bigLoad 中移除shard 0 // 將移除的 shard 上鎖,以防後續新機器當即消費,致使數據異常 // 添加新shard到 smallLoad 中 String firstLoadSHardId = shardAssignMaster.popServerFirstConsumeShardId(bigLoadServerEntry.getKey()); bigLoadServerEntry.setValue(bigLoadServerEntry.getValue() - 1); // 上鎖分片,禁用消費 shardAssignMaster.lockShardId(firstLoadSHardId); // 添加shard到 smallLoad 中 shardAssignMaster.assignShardToServer(firstLoadSHardId, smallLoadServerEntry.getKey()); smallLoadServerEntry.setValue(smallLoadServerEntry.getValue() + 1); } } } /** * 斷定輪詢是否完成 * * @param startIndex 開始下標 * @param endIndex 結束下標 * @return true: 輪詢完成, false: 未完成 */ private boolean isRoundRobinComplete(int startIndex, int endIndex) { return startIndex == endIndex; } /** * 從大到小排序 負載機器 * * @param consumerLoadCount 總負載狀況 * @return 排序後的機器列表 */ private List<Map.Entry<String, Integer>> sortLoadCountByLoadWithSmallEndian(Map<String, Integer> consumerLoadCount) { List<Map.Entry<String, Integer>> sortedList = new ArrayList<>(consumerLoadCount.entrySet()); sortedList.sort(new Comparator<Map.Entry<String, Integer>>() { @Override public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) { return o2.getValue() - o1.getValue(); } }); return sortedList; } /** * 計算平均每臺機器的消費shard負載 * * @param loadCount 總負載指標容器 * @return 負載均線 */ private double computeAliveServersAvgLoadCount(Map<String, Integer> loadCount) { int totalServerCount = loadCount.size(); int totalShardCount = 0; for(Integer consumeShardCount : loadCount.values()) { totalShardCount += consumeShardCount; } return (double) totalShardCount / totalServerCount; } }
5. 從負載重的機器上剝奪shard,分配給空閒的機器 控制器
/** * 負載不均衡,且存在空閒的機器, 此時應是 均值與最大值之間相差較小值,可是至少有一個 消費2 的機器,能夠剝奪其1個shard給空閒機器 的控制器 * * @see ReBalanceStatusResultEnum#HEAVY_LOAD_WITH_CONSUMER_IDLE_BALANCE_NEEDED 狀態枚舉 */ class HeavyLoadWithConsumerIdleReBalancer implements StatusReBalanceStrategy { /** * 分片分配管理者實例 */ private ShardAssignMaster shardAssignMaster; /** * 空閒的機器列表 */ private List<String> idleServerList; /** * 機器消費負載狀況 */ private Map<String, Integer> consumerLoadCount; public HeavyLoadWithConsumerIdleReBalancer( ShardAssignMaster shardAssignMaster, Map<String, Integer> consumerLoadCount, List<String> idleServerList) { this.shardAssignMaster = shardAssignMaster; this.consumerLoadCount = consumerLoadCount; this.idleServerList = idleServerList; } @Override public void loadBalance() { // 1. 找出還未被消費的shard // 2. 分配一個給本身 // 3. 若是還有其餘機器也未分配,則一樣進行分配 for (String idleHostname1 : idleServerList) { Map.Entry<String, Integer> maxLoadEntry = getMaxLoadConsumerEntry(consumerLoadCount); // 自己只有一個則再也不分配負擔了 if(maxLoadEntry.getValue() <= 1) { break; } String maxLoadServerHostname = maxLoadEntry.getKey(); // 此處可使用管道操做,更流暢, 或者更準確的說,使用事務操做 // 從 bigLoad 中移除shard 0 // 將移除的 shard 上鎖,以防後續新機器當即消費,致使數據異常 // 添加新shard到 smallLoad 中 String firstLoadSHardId = shardAssignMaster.popServerFirstConsumeShardId(maxLoadServerHostname); maxLoadEntry.setValue(maxLoadEntry.getValue() - 1); // 上鎖卸載下來的shard,鎖定50s shardAssignMaster.lockShardId(firstLoadSHardId); // 添加shard到 smallLoad 中 shardAssignMaster.assignShardToServer(firstLoadSHardId, idleHostname1); consumerLoadCount.put(idleHostname1, 1); } } /** * 獲取負載最大的機器實例做 * * @param consumerLoadCount 全部機器的負載狀況 * @return 最大負載機器實例 */ private Map.Entry<String, Integer> getMaxLoadConsumerEntry(Map<String, Integer> consumerLoadCount) { Integer maxConsumeCount = 0; Map.Entry<String, Integer> maxEntry = null; for (Map.Entry<String, Integer> server1 : consumerLoadCount.entrySet()) { if(server1.getValue() > maxConsumeCount) { maxConsumeCount = server1.getValue(); maxEntry = server1; } } return maxEntry; } }
如上,各個平衡策略,實現各自的功能,就能掌控整個集羣的消費控制了!
除了上面的主料,還有一些附帶的東西!
【2】均衡狀態枚舉值以下:
/** * 再平衡檢測結果類型枚舉 * */ public enum ReBalanceStatusResultEnum { /** * 一切正常,無須操做 */ OK("一切正常,無須操做"), /** * 有新下線機器,能夠將其分片分配給其餘機器 */ UNASSIGNED_SHARD_WITHOUT_CONSUMER_IDLE_EXISTS("有未分配的分片,能夠分配給其餘機器"), /** * 有未分配的分片,且有空閒機器,直接將空閒shard分配給空閒機器便可(最好只分配1個,以便其餘機器啓動後可用) */ UNASSIGNED_SHARD_WITH_CONSUMER_IDLE_EXISTS("有未分配的分片,且有空閒機器"), /** * 負載不均衡,須平生衡 */ HEAVY_LOAD_BALANCE_NEEDED("負載不均衡,須平生衡"), /** * 負載不均衡,且存在空閒的機器, 此時應是 均值與最大值之間相差較小值,可是至少有一個 消費2 的機器,能夠剝奪其1個shard給空閒機器 */ HEAVY_LOAD_WITH_CONSUMER_IDLE_BALANCE_NEEDED("負載不均衡,且存在空閒的機器"), ; private ReBalanceStatusResultEnum(String remark) { // ignore } }
【3】RedisKeyConstants 常量定義
/** * redis 相關常量定義 */ public class RedisKeyConstants { /** * 在線機器緩存key.與心跳同時做用 * * @see #SERVER_ALIVE_HEARTBEAT_CACHE_PREFIX */ public static final String ALL_ONLINE_SERVER_CACHE_KEY = "prefix:active.servers"; /** * 機器消費shard狀況 緩存key前綴 */ public static final String SERVER_CONSUME_CACHE_KEY_PREFIX = "prefix:log.consumer:server:"; /** * 分片被分配狀況 緩存key前綴 */ public static final String SHARD_ASSIGNED_CACHE_KEY_PREFIX = "prefix:shard.assigned:id:"; /** * 分片鎖 緩存key前綴, 當上鎖時,任何機器不得再消費 */ public static final String SHARD_LOCK_CONSUME_CACHE_PREFIX = "prefix:consume.lock.shard:id:"; /** * 存活機器心跳,與上面的機器造成呼應 * * @see #ALL_ONLINE_SERVER_CACHE_KEY */ public static final String SERVER_ALIVE_HEARTBEAT_CACHE_PREFIX = "prefix:log.consumer:server.heartbeat:"; /** * 單個消費者最大消費負載數 (一個不可能達到的值) */ public static final Integer MAX_CONSUMER_SHARD_LOAD = 9999; }
【4】shard分配控制器負責全部shard分配
import com.test.utils.RedisPoolUtil; import java.util.ArrayList; import java.util.List; import java.util.Set; /** * shard分配管理者 (儘可能使用接口表達) * */ public class ShardAssignMaster { private RedisPoolUtil redisPoolUtil; public ShardAssignMaster(RedisPoolUtil redisPoolUtil) { this.redisPoolUtil = redisPoolUtil; } /** * 註冊消費者到 控制中心(註冊中心) */ public void registerConsumer(String serverHostname) { // 註冊server到 redis zset 中,若有條件,可使用 zk 進行操做,也許更好 redisPoolUtil.zadd(RedisKeyConstants.ALL_ONLINE_SERVER_CACHE_KEY, (double)System.currentTimeMillis(), serverHostname); } /** * 心跳發送數據 */ public void sendHeartbeat(String serverHostname) { String heartbeatCacheKey = RedisKeyConstants.SERVER_ALIVE_HEARTBEAT_CACHE_PREFIX + serverHostname; redisPoolUtil.set(heartbeatCacheKey, "1", 30); } /** * 檢測指定消費者服務器還存活與否 * * @param consumeHostname 機器名 * @return true: 存活, false: 宕機 */ public boolean isConsumerServerAlive(String consumeHostname) { String aliveValue = redisPoolUtil.get(RedisKeyConstants.SERVER_ALIVE_HEARTBEAT_CACHE_PREFIX + consumeHostname); return aliveValue != null && "1".equals(aliveValue); } /** * 獲取並刪除指定server的所屬消費的第一個 shardId * * @param serverHostname 機器名 * @return 第一個shardId */ public String popServerFirstConsumeShardId(String serverHostname) { String bigLoadConsumerServerCacheKey = RedisKeyConstants.SERVER_CONSUME_CACHE_KEY_PREFIX + serverHostname; Set<String> firstLoadShardSet = redisPoolUtil.zrange(bigLoadConsumerServerCacheKey, 0, 0); String firstLoadSHardId = firstLoadShardSet.iterator().next(); redisPoolUtil.zrem(bigLoadConsumerServerCacheKey, firstLoadSHardId); redisPoolUtil.expire(bigLoadConsumerServerCacheKey, 60); return firstLoadSHardId; } /** * 對shard進行上鎖,禁止全部消費行爲 * * @param shardId 分片id */ public void lockShardId(String shardId) { String shardLockCacheKey = RedisKeyConstants.SHARD_LOCK_CONSUME_CACHE_PREFIX + shardId; redisPoolUtil.set(shardLockCacheKey, "1", 50); } /** * 分配shard分片數據給 指定server * * @param shardId 分片id * @param serverHostname 分配給的消費者機器名 */ public void assignShardToServer(String shardId, String serverHostname) { String smallLoadConsumerServerCacheKey = RedisKeyConstants.SERVER_CONSUME_CACHE_KEY_PREFIX + serverHostname; redisPoolUtil.zadd(smallLoadConsumerServerCacheKey, (double)System.currentTimeMillis(), shardId); redisPoolUtil.expire(smallLoadConsumerServerCacheKey, 60); // 更新新的shard消費者標識 String shardIdAssignCacheKey = RedisKeyConstants.SHARD_ASSIGNED_CACHE_KEY_PREFIX + shardId; redisPoolUtil.set(shardIdAssignCacheKey, serverHostname); } /** * 獲取被分配了shardId的server信息 * * @param shardId 要檢查的分片id * @return 被分配了shardId 的機器名 */ public String getShardAssignedServer(String shardId) { String shardAssignCacheKey = RedisKeyConstants.SHARD_ASSIGNED_CACHE_KEY_PREFIX + shardId; return redisPoolUtil.get(shardAssignCacheKey); } /** * 刪除shard的分配信息,使無效化 * * @param shardId 要刪除的分片id */ public void invalidateShardAssignInfo(String shardId) { String shardAssignCacheKey = RedisKeyConstants.SHARD_ASSIGNED_CACHE_KEY_PREFIX + shardId; redisPoolUtil.del(shardAssignCacheKey); } /** * 清理下線機器 * * @param hostname 下線機器名 */ public void invalidateOfflineServer(String hostname) { redisPoolUtil.zrem(RedisKeyConstants.ALL_ONLINE_SERVER_CACHE_KEY, hostname); } /** * 獲取機器消費的shard列表 * * @param serverHostname 機器主機名 * @return shard列表 或者 null */ public Set<String> getServerDutyConsumeShardSet(String serverHostname) { String serverDutyConsumeShardCacheKey = RedisKeyConstants.SERVER_CONSUME_CACHE_KEY_PREFIX + serverHostname; return redisPoolUtil.zrange(serverDutyConsumeShardCacheKey, 0, -1); } /** * 獲取全部在線機器列表 * * @return 在線機器列表 */ public List<String> getAllOnlineServerList() { Set<String> hostnameSet = redisPoolUtil.zrange(RedisKeyConstants.ALL_ONLINE_SERVER_CACHE_KEY, 0, -1); return new ArrayList<>(hostnameSet); } }
以上是協同負載均衡器代碼實現。
【5】固然你還須要一個消費者
接下來咱們還要看下消費者如何實現消費。
import com.test.utils.RedisPoolUtil; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetAddress; import java.net.UnknownHostException; import java.time.LocalDateTime; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * 消費者業務線程 * */ public class LoghubConsumeWorker implements Runnable { private static final Logger logger = LoggerFactory.getLogger(LoghubConsumeWorker.class); private RedisPoolUtil redisPoolUtil; private String HOST_NAME; /** * 因消費者數目不必定,因此使用 CachedThreadPool */ private ExecutorService consumeExecutorService = Executors.newCachedThreadPool(); public LoghubConsumeWorker(RedisPoolUtil redisPoolUtil) { this(redisPoolUtil, null); } public LoghubConsumeWorker(RedisPoolUtil redisPoolUtil, String hostName) { this.redisPoolUtil = redisPoolUtil; // 爲測試須要添加的 hostName HOST_NAME = hostName; initSharedVars(); } /** * 初始化公共變量 */ private void initSharedVars() { try { if(HOST_NAME != null) { return; } HOST_NAME = InetAddress.getLocalHost().getHostName(); } catch (UnknownHostException e) { throw new RuntimeException("init error : 獲取服務器主機名失敗"); } } @Override public void run() { while (!Thread.interrupted()) { // 先獲取全部分配給的shard列表,爲空則進入下一次循環(注意此時阻塞鎖不能起做用) Set<String> shardsSet = blockingTakeAvailableConsumeShardList(); try { // 消費全部給定shard數據 consumeLogHubShards(shardsSet); } catch (Exception e) { logger.error("消費loghub, error", e); } } } /** * 獲取可用的分片列表(沒有則阻塞等待) * * @return 分片列表 */ private Set<String> blockingTakeAvailableConsumeShardList() { while (!Thread.interrupted()) { String serverConsumeCacheKey = RedisKeyConstants.SERVER_CONSUME_CACHE_KEY_PREFIX + HOST_NAME; Set<String> shardsSet = redisPoolUtil.zrange(serverConsumeCacheKey, 0, -1); if (shardsSet != null && !shardsSet.isEmpty()) { return shardsSet; } logger.warn(" =========== 當前主機[hostname:{}]未查詢到任何shard =========", HOST_NAME); try { Thread.sleep(1000); } catch (InterruptedException e) { logger.error("LogHubClientWork run 未獲取到該主機的shard時,每隔1秒鐘獲取 ,error : {}", e); } } return null; } /** * 消費loghub 分片數據 * * @param shardsSet 被分配的分片列表 */ public void consumeLogHubShards(Set<String> shardsSet) throws InterruptedException { if(shardsSet == null || shardsSet.isEmpty()) { return; } // 此處使用 CountdownLatch, 保證至少有一個任務完成時,纔開始下一次任務的調入 // Semaphore semaphoreLock = new Semaphore(shardsSet.size()); CountDownLatch openDoorLatch = new CountDownLatch(1); boolean startNewJobAtLeastOnce = false; for (String shard : shardsSet) { // 檢測當前shard是否處於鎖定狀態,若是鎖定則不能消費, 注意鎖狀況 if(isShardLocked(shard)) { logger.info("=============== shard:{} is locked, continue... ======", shard); continue; } int shardId = Integer.parseInt(shard); LoghubConsumerTaskExecutor consumer = getConsumerExecutor(shardId); // consumer 應保證有所消費,若是沒有消費,則自行等待一個長週期,外部應只管調入請求 // consumer 應保證全部消費,在上一個任務未完成時,不得再開啓下一輪提交消費 boolean startNewJob = consumer.startNewConsumeJob(openDoorLatch); if(startNewJob) { // start failed, prev job is running maybe // ignore job, no blocking startNewJobAtLeastOnce = true; } } // 任意一個任務完成,都將打開新的分配週期,且後續 countDown 將無效,此處可能致使死鎖 if(startNewJobAtLeastOnce) { openDoorLatch.await(); } else { // 當本次分配調度一個任務都未提交時,則睡眠等待 // (通常此狀況爲 消費者被分配了上了鎖的shard時,即搶佔另的機器的shard, 須要給別的機器備份數據時間鎖) Thread.sleep(200); } } /** * 檢測分片是否被鎖定消費了 * * @param shardId 分片id * @return true:鎖定, false:未鎖定可用 */ private boolean isShardLocked(String shardId) { String shardCacheKey = RedisKeyConstants.SHARD_LOCK_CONSUME_CACHE_PREFIX + shardId; String lockValue = redisPoolUtil.get(shardCacheKey); return !StringUtils.isBlank(lockValue) && "1".equals(lockValue); } /** * 獲取消費者實例,針對一個shard, 只建立一個實例 */ private Map<Integer, LoghubConsumerTaskExecutor> mShardConsumerMap = new ConcurrentHashMap<>(); private LoghubConsumerTaskExecutor getConsumerExecutor(final int shardId) { LoghubConsumerTaskExecutor consumer = mShardConsumerMap.get(shardId); if (consumer != null) { return consumer; } consumer = new LoghubConsumerTaskExecutor(new SingleShardConsumerJob(shardId)); mShardConsumerMap.put(shardId, consumer); logger.info(" ======================= create new consumer executor shard:{}", shardId); return consumer; } /** * 消費者調度器 * * 統一控制消費者的運行狀態管控 */ class LoghubConsumerTaskExecutor { private Future<?> future; private ConsumerJob consumerJob; public LoghubConsumerTaskExecutor(ConsumerJob consumerJob) { this.consumerJob = consumerJob; } /** * 啓動一個新消費任務 * * @return true: 啓動成功, false: 啓動失敗有未完成任務在前 */ public boolean startNewConsumeJob(CountDownLatch latch) { if(future == null || future.isCancelled() || future.isDone()) { //沒有任務或者任務已取消或已完成 提交任務 future = consumeExecutorService.submit(new Runnable() { @Override public void run() { try { consumerJob.consumeShardData(); } finally { latch.countDown(); } } }); return true; } return false; } } } /** * 消費者任務接口定義 */ interface ConsumerJob { /** * 消費數據具體邏輯實現 */ public void consumeShardData(); } /** * 單個shard消費的任務實現 */ class SingleShardConsumerJob implements ConsumerJob { /** * 當前任務的消費 shardId */ private int shardId; public SingleShardConsumerJob(int shardId) { this.shardId = shardId; } @Override public void consumeShardData() { System.out.println(LocalDateTime.now() + " - host -> consume shard: " + shardId); try { // do complex biz // 此處若是發現shard 不存在異常,則應回調協調器,進行shard的移除 Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } } }
【6】固然你還須要一個demo
看不到效果,我就是不信!
因此來看個 demo 吧!
咱們使用單機開多個 單元測試用例,直接測試就好!
測試代碼:.
import com.test.common.config.LogHubProperties; import com.test.utils.RedisPoolUtil; import org.junit.Test; import java.io.IOException; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * 臨時測試 負載均衡 * */ public class ShardConsumerLoadBalanceTest { public static void main(String[] args) throws IOException { startAConsumer(); System.in.read(); } // 啓動一個單元測試,就至關於啓動一個消費者應用 @Test public void mainMock() throws IOException { startAConsumer(); System.in.read(); } // 啓動一個單元測試,就至關於啓動一個消費者應用 @Test public void startNewConsumer() throws IOException { startAConsumer(); System.in.read(); } // 啓動一個單元測試,就至關於啓動一個消費者應用 @Test public void startNewConsumer2() throws IOException { startAConsumer(); System.in.read(); } private static void startAConsumer() { RedisPoolUtil redisPoolUtil = new RedisPoolUtil(); redisPoolUtil.setIp("127.0.0.1"); redisPoolUtil.setMaxActive(111); redisPoolUtil.setMaxIdle(1000); redisPoolUtil.setPort(6379); redisPoolUtil.setMaxWait(100000); redisPoolUtil.setTimeout(100000); redisPoolUtil.setPassWord("123"); redisPoolUtil.setDatabase(0); redisPoolUtil.initPool(); LogHubProperties logHubProperties = new LogHubProperties(); logHubProperties.setProjectName("test"); logHubProperties.setEndpoint("cn-shanghai-finance-1.log.aliyuncs.com"); logHubProperties.setAccessKey("xxxx"); logHubProperties.setAccessKeyId("11111"); // 使用隨機 hostname 模擬多臺機器調用 Random random = new Random(); String myHostname = "my-host-" + random.nextInt(10); // 啓動管理線程 LoghubConsumerShardCoWorker shardCoWorker = new LoghubConsumerShardCoWorker(redisPoolUtil, logHubProperties, myHostname); ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2); scheduledExecutorService.scheduleAtFixedRate(shardCoWorker, 5, 30, TimeUnit.SECONDS); // 啓動業務線程 ExecutorService executorService = Executors.newFixedThreadPool(2); LoghubConsumeWorker worker = new LoghubConsumeWorker(redisPoolUtil, myHostname); executorService.submit(worker); } }
如上,就能夠實現本身的負載均衡消費了。
好比: 總分片數爲4。
1. 最開始啓動1個機器時,將會被分配 0,1,2,3。
2. 啓動兩個後,將分爲 0,1; 2,3;
3. 啓動3個後,將分爲 0; 1; 2,3;
4. 反之,關閉一個機器後,將把壓力分擔到原機器上。
當作負載重分配時,將有50秒的鎖定時間備份。
【7】待完善的點
本文是基於loghub實現的分片拉取,其實在這方面loghub與kafka是一模一樣的,只是loghub更商業產品化。
當shard縮減時,應可以自動發現,從而去除原有的機器消費分配。而不是讓消費者報錯。
老話: 能夠適當造輪子!