有這麼一個場景,咱們有兩個Hive集羣,Hive集羣1(後面成爲1號集羣)是一直專享於數據計算平臺的,而Hive集羣2(後面成爲2號集羣)是用於其餘團隊使用的,好比特徵,廣告等。而由此存在兩個主要問題:a) 兩個Hive集羣共享了同一份MetaData,致使常常會出如今HUE(創建與2號集羣上)上建表成功後,可是在計算平臺上卻沒法查詢到新建表信息;b) 讓運維同窗們同時維護兩套集羣,管理和資源分配調整起來的確是麻煩不少,畢竟也不利於資源的彈性分配。那麼鑑於此,通過討論,須要作這麼同樣工做:兩個集羣合二爲一,由1號集羣合併到2號集羣上來。java
可是,集羣合併是不可能一會兒所有合併,須要逐步遷移合併(好比每次20個結點)到2號集羣。可是這樣存在一個問題,計算平臺天天使用的計算資源是差很少固定的,而在遷移過程當中,1號集羣的資源在逐漸減小,顯然是不知足計算需求的,因此咱們也須要由獲得遷移資源的2號集羣分擔一些壓力。那麼重點來了,這就須要咱們任務調度器合理的分配任務到1號集羣以及2號集羣的某個隊列。其實,所謂的任務分配也就是一種負載均衡算法,即任務來了,經過負載均衡算法調度到哪一個集羣去執行,可是使用哪一種負載均衡算法就須要好好探究一下。git
Q:經常使用的負載均衡算法有哪些呢?
A:隨機算法,輪詢,hash算法,加權隨機算法,加權輪詢算法,一致性hash算法。github
該算法經過產生隨機數的方式進行負載,可能會致使任務傾斜,好比大量任務調度到了1好集羣,顯然不可取,pass。算法
該算法是經過一個接一個循環往復的方式進行調度,會保證任務分配很均衡,可是咱們的1號集羣資源是在不斷減小的,2號集羣資源是在不斷增長的,若是均衡分配計算任務,顯然也是不合理的,pass。apache
該算法是基於當前結點的ip的hashCode值來進行調度,那麼只要結點ip不變,那麼hashCode值就不會變,全部的任務豈不是都提交到一個結點了嗎?不合理,pass。安全
同隨機算法,只不過是對每一個結點增長了權重,可是由於是隨機調度,不可控的,直接pass。併發
上面說到,輪詢算法能夠保證任務分配很均衡,可是沒法保證隨集羣資源的調整進行任務分配的動態調整。此時,若是咱們能夠依次根據集羣遷移狀況,設置1號集羣與2號集羣的任務比重爲:7:5 -> 3:2 -> 2:3 -> 完整切換。可行。負載均衡
該算法較爲複雜,鑑於咱們是爲了進行集羣合併以及保證任務儘可能根據集羣資源的調整進行合理調度,無需設計太複雜的算法進行處理,故也pass。運維
雖然咱們最終方法選定爲加權輪詢算法,可是它起源於輪詢算法,那麼咱們就從輪詢算法提及。ide
首選,咱們會有Hive集羣對應的HS2的ip地址列表,而後咱們經過某種算法(這裏指的就是負載均衡算法),獲取其中一個HS2的ip地址進行任務提交(這就是任務調度)。
咱們先定義一個算法抽象接口,它只有一個select方法。
import java.util.List; /** * @author buildupchao * @date: 2019/5/12 21:51 * @since JDK 1.8 */ public interface ClusterStrategy { /** * 負載均衡算法接口 * @param ipList ip地址列表 * @return 經過負載均衡算法選中的ip地址 */ String select(List<String> ipList); }
輪詢算法實現:
import org.apache.commons.lang3.StringUtils; import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; /** * @author buildupchao * @date: 2019/5/12 21:57 * @since JDK 1.8 */ public class PollingClusterStrategyImpl implements ClusterStrategy { private AtomicInteger counter = new AtomicInteger(0); @Override public String select(List<String> ipList) { String selectedIp = null; try { int size = ipList.size(); if (counter.get() >= size) { counter.set(0); } selectedIp = ipList.get(counter.get()); counter.incrementAndGet(); } catch (Exception ex) { ex.printStackTrace(); } if (StringUtils.isBlank(selectedIp)) { selectedIp = ipList.get(0); } return selectedIp; } public static void main(String[] args) { List<String> ipList = Arrays.asList("172.31.0.191", "172.31.0.192"); PollingClusterStrategyImpl strategy = new PollingClusterStrategyImpl(); ExecutorService executorService = Executors.newFixedThreadPool(100); for (int i = 0; i < 100; i++) { executorService.execute(() -> { System.out.println(Thread.currentThread().getName() + ":" + strategy.select(ipList)); }); } } }
運行上述代碼,你會發現,線程號爲奇數的輪詢到的是'172.31.0.191'這個ip,偶數是‘172.31.0.192’這個ip。至於打印出來的日誌亂序,那是併發打印返回的ip的問題,並非獲取ip進行任務調度的問題。
既然咱們已經實現了輪詢算法,那加權輪詢怎麼實現呢?無非是增長結點被輪詢到的比例罷了,咱們只須要根據指定的權重,進行輪詢便可。由於須要有權重等信息,咱們須要從新設計接口。
提供一個Bean進行封裝ip以及權重等信息:
import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import java.io.Serializable; /** * @author buildupchao * Date: 2019/2/1 02:52 * @since JDK 1.8 */ @Data @Builder @NoArgsConstructor @AllArgsConstructor public class ProviderService implements Serializable { private String ip; // the weight of service provider private int weight; }
新的負載均衡算法接口:
import com.buildupchao.zns.client.bean.ProviderService; import java.util.List; /** * @author buildupchao * Date: 2019/2/1 02:44 * @since JDK 1.8 */ public interface ClusterStrategy { ProviderService select(List<ProviderService> serviceRoutes); }
加權輪詢算法的實現:
import com.buildupchao.zns.client.bean.ProviderService; import com.buildupchao.zns.client.cluster.ClusterStrategy; import com.google.common.collect.Lists; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @author buildupchao * Date: 2019/2/4 22:39 * @since JDK 1.8 */ public class WeightPollingClusterStrategyImpl implements ClusterStrategy { private int counter = 0; private Lock lock = new ReentrantLock(); @Override public ProviderService select(List<ProviderService> serviceRoutes) { ProviderService providerService = null; try { lock.tryLock(10, TimeUnit.SECONDS); List<ProviderService> providerServices = Lists.newArrayList(); for (ProviderService serviceRoute : serviceRoutes) { int weight = serviceRoute.getWeight(); for (int i = 0; i < weight; i++) { providerServices.add(serviceRoute); } } if (counter >= providerServices.size()) { counter = 0; } providerService = providerServices.get(counter); counter++; } catch (InterruptedException ex) { ex.printStackTrace(); } finally { lock.unlock(); } if (providerService == null) { providerService = serviceRoutes.get(0); } return providerService; } }
你會發現這裏的算法實現中再也不是經過AtomicInteger來作計數器了,而是藉助於private int counter = 0;
同時藉助於Lock
鎖的技術保證計數器的安全訪問。只是寫法的不一樣,不用糾結啦!
這樣,咱們就能夠應用這個加權輪詢算法
到咱們的任務調度器中了,快速配合運維完成集羣遷移合併工做吧!