zookeeper系列(一)zookeeper必知
zookeeper系列(二)實戰master選舉
zookeeper系列(三)實戰數據發佈訂閱
zookeeper系列(四)實戰負載均衡
zookeeper系列(五)實戰分佈式鎖
zookeeper系列(六)實戰分佈式隊列
zookeeper系列(七)實戰分佈式命名服務
zookeeper系列(八)zookeeper運維
負載均衡是一種手段,用來把對某種資源的訪問分攤給不同的設備,從而減輕單點的壓力。
圖中左側爲ZooKeeper集羣,右側上方爲工作服務器,下面爲客戶端。每臺工作服務器在啓動時都會去zookeeper的servers節點下注冊臨時節點,每臺客戶端在啓動時都會去servers節點下取得全部可用的工作服務器列表,並通過一定的負載均衡算法計算得出一臺工作服務器,並與之建立網絡連接。網絡連接我們採用開源框架netty。
負載均衡客戶端流程
服務端主體流程
Server端核心類
每個服務端對應一個Server接口,ServerImpl是服務端的實現類。把服務端啓動時的註冊過程抽出爲一個接口RegistProvider,並給予一個默認實現DefaultRegistProvider,它將用到一個上下文的類ZooKeeperRegistContext。我們的服務端是基於Netty的,它需要ServerHandler來處理與客戶端之間的連接,當有客戶端建立或失去連接時,我們都需要去修改當前服務器的負載信息,我們把修改負載信息的過程也抽出爲一個接口BalanceUpdateProvider,並且給予了一個默認實現DefaultBalanceUpdateProvider。ServerRunner是調度類,負責調度我們的Server。
Client端核心類
每個客戶端都需要實現一個Client接口,ClientImpl是實現,Client需要ClientHandler來處理與服務器之間的通訊,同時它需要BalanceProvider爲它提供負載均衡的算法。BalanceProvider是接口,它有2個實現類,一個是抽象的實現AbstractBalanceProvider,一個是默認的實現DefaultBalanceProvider。ServerData是服務端和客戶端共用的一個類,服務端會把自己的基本信息,包括負載信息,打包成ServerData並寫入到zookeeper中,客戶端在計算負載的時候需要到zookeeper中拿到ServerData,並取得服務端的地址和負載信息。ClientRunner是客戶端的調度類,負責啓動客戶端。
先是Server端的代碼:
public class ServerData implements Serializable,Comparable<ServerData> { private static final long serialVersionUID = -8892569870391530906L; private Integer balance; private String host; private Integer port; public Integer getBalance() { return balance; } public void setBalance(Integer balance) { this.balance = balance; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public Integer getPort() { return port; } public void setPort(Integer port) { this.port = port; } @Override public String toString() { return "ServerData [balance=" + balance + ", host=" + host + ", port=" + port + "]"; } public int compareTo(ServerData o) { return this.getBalance().compareTo(o.getBalance()); } }
/**
 * A work server that can be started.
 */
public interface Server {

    /**
     * Starts the server so that it accepts client connections.
     */
    void bind();
}
public class ServerImpl implements Server { private EventLoopGroup bossGroup = new NioEventLoopGroup(); private EventLoopGroup workGroup = new NioEventLoopGroup(); private ServerBootstrap bootStrap = new ServerBootstrap(); private ChannelFuture cf; private String zkAddress; private String serversPath; private String currentServerPath; private ServerData sd; private volatile boolean binded = false; private final ZkClient zc; private final RegistProvider registProvider; private static final Integer SESSION_TIME_OUT = 10000; private static final Integer CONNECT_TIME_OUT = 10000; public String getCurrentServerPath() { return currentServerPath; } public String getZkAddress() { return zkAddress; } public String getServersPath() { return serversPath; } public ServerData getSd() { return sd; } public void setSd(ServerData sd) { this.sd = sd; } public ServerImpl(String zkAddress, String serversPath, ServerData sd){ this.zkAddress = zkAddress; this.serversPath = serversPath; this.zc = new ZkClient(this.zkAddress,SESSION_TIME_OUT,CONNECT_TIME_OUT,new SerializableSerializer()); this.registProvider = new DefaultRegistProvider(); this.sd = sd; } //初始化服務端 private void initRunning() throws Exception { String mePath = serversPath.concat("/").concat(sd.getPort().toString()); //註冊到zookeeper registProvider.regist(new ZooKeeperRegistContext(mePath,zc,sd)); currentServerPath = mePath; } public void bind() { if (binded){ return; } System.out.println(sd.getPort()+":binding..."); try { initRunning(); } catch (Exception e) { e.printStackTrace(); return; } bootStrap.group(bossGroup,workGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new ServerHandler(new DefaultBalanceUpdateProvider(currentServerPath,zc))); } }); try { cf = bootStrap.bind(sd.getPort()).sync(); binded = true; 
System.out.println(sd.getPort()+":binded..."); cf.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally{ bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } }
/**
 * Strategy for registering a server and for withdrawing that registration.
 */
public interface RegistProvider {

    /**
     * Registers whatever the given context describes.
     *
     * @param context implementation specific registration context
     * @throws Exception if the registration fails
     */
    void regist(Object context) throws Exception;

    /**
     * Withdraws a registration.
     *
     * @param context implementation specific registration context
     * @throws Exception if the withdrawal fails
     */
    void unRegist(Object context) throws Exception;
}
public class DefaultRegistProvider implements RegistProvider { // 在zookeeper中建立臨時節點並寫入信息 public void regist(Object context) throws Exception { // Server在zookeeper中註冊本身,須要在zookeeper的目標節點上建立臨時節點並寫入本身 // 將須要的如下3個信息包裝成上下文傳入 // 1:path // 2:zkClient // 3:serverData ZooKeeperRegistContext registContext = (ZooKeeperRegistContext) context; String path = registContext.getPath(); ZkClient zc = registContext.getZkClient(); try { zc.createEphemeral(path, registContext.getData()); } catch (ZkNoNodeException e) { String parentDir = path.substring(0, path.lastIndexOf('/')); zc.createPersistent(parentDir, true); regist(registContext); } } public void unRegist(Object context) throws Exception { return; } }
/**
 * Bundles what is needed to register a node in ZooKeeper: the node path,
 * the client to talk through and the data to store.
 */
public class ZooKeeperRegistContext {

    /** Path of the node to create. */
    private String path;

    /** Client used to reach ZooKeeper. */
    private ZkClient zkClient;

    /** Payload to store in the node. */
    private Object data;

    /**
     * @param path     path of the node to create
     * @param zkClient client used to reach ZooKeeper
     * @param data     payload to store in the node
     */
    public ZooKeeperRegistContext(String path, ZkClient zkClient, Object data) {
        this.path = path;
        this.zkClient = zkClient;
        this.data = data;
    }

    public String getPath() {
        return this.path;
    }

    public void setPath(String path) {
        this.path = path;
    }

    public ZkClient getZkClient() {
        return this.zkClient;
    }

    public void setZkClient(ZkClient zkClient) {
        this.zkClient = zkClient;
    }

    public Object getData() {
        return this.data;
    }

    public void setData(Object data) {
        this.data = data;
    }
}
/** * 處理服務端與客戶端之間的通訊 */ public class ServerHandler extends ChannelHandlerAdapter{ private final BalanceUpdateProvider balanceUpdater; private static final Integer BALANCE_STEP = 1; public ServerHandler(BalanceUpdateProvider balanceUpdater){ this.balanceUpdater = balanceUpdater; } public BalanceUpdateProvider getBalanceUpdater() { return balanceUpdater; } // 創建鏈接時增長負載 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("one client connect..."); balanceUpdater.addBalance(BALANCE_STEP); } // 斷開鏈接時減小負載 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { balanceUpdater.reduceBalance(BALANCE_STEP); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
/**
 * Strategy for changing a server's load figure.
 */
public interface BalanceUpdateProvider {

    /**
     * Increases the load.
     *
     * @param step amount to add
     * @return whether the update succeeded
     */
    boolean addBalance(Integer step);

    /**
     * Decreases the load.
     *
     * @param step amount to subtract
     * @return whether the update succeeded
     */
    boolean reduceBalance(Integer step);
}
public class DefaultBalanceUpdateProvider implements BalanceUpdateProvider { private String serverPath; private ZkClient zc; public DefaultBalanceUpdateProvider(String serverPath, ZkClient zkClient) { this.serverPath = serverPath; this.zc = zkClient; } public boolean addBalance(Integer step) { Stat stat = new Stat(); ServerData sd; // 增長負載:讀取服務器的信息ServerData,增長負載,並寫回zookeeper while (true) { try { sd = zc.readData(this.serverPath, stat); sd.setBalance(sd.getBalance() + step); // 帶上版本,由於可能有其餘客戶端鏈接到服務器修改了負載 zc.writeData(this.serverPath, sd, stat.getVersion()); return true; } catch (ZkBadVersionException e) { // ignore } catch (Exception e) { return false; } } } public boolean reduceBalance(Integer step) { Stat stat = new Stat(); ServerData sd; while (true) { try { sd = zc.readData(this.serverPath, stat); final Integer currBalance = sd.getBalance(); sd.setBalance(currBalance>step?currBalance-step:0); zc.writeData(this.serverPath, sd, stat.getVersion()); return true; } catch (ZkBadVersionException e) { // ignore } catch (Exception e) { return false; } } } }
/** * 用於測試,負責啓動Work Server */ public class ServerRunner { private static final int SERVER_QTY = 2; private static final String ZOOKEEPER_SERVER = "192.168.1.105:2181"; private static final String SERVERS_PATH = "/servers"; public static void main(String[] args) { List<Thread> threadList = new ArrayList<Thread>(); for(int i=0; i<SERVER_QTY; i++){ final Integer count = i; Thread thread = new Thread(new Runnable() { public void run() { ServerData sd = new ServerData(); sd.setBalance(0); sd.setHost("127.0.0.1"); sd.setPort(6000+count); Server server = new ServerImpl(ZOOKEEPER_SERVER,SERVERS_PATH,sd); server.bind(); } }); threadList.add(thread); thread.start(); } for (int i=0; i<threadList.size(); i++){ try { threadList.get(i).join(); } catch (InterruptedException ignore) { // } } } }
再看Client端的代碼:
/**
 * A client that can connect to a work server and disconnect from it again.
 */
public interface Client {

    /**
     * Connects to a server.
     *
     * @throws Exception if the connection cannot be established
     */
    void connect() throws Exception;

    /**
     * Disconnects from the server.
     *
     * @throws Exception if disconnecting fails
     */
    void disConnect() throws Exception;
}
public class ClientImpl implements Client { private final BalanceProvider<ServerData> provider; private EventLoopGroup group = null; private Channel channel = null; private final Logger log = LoggerFactory.getLogger(getClass()); public ClientImpl(BalanceProvider<ServerData> provider) { this.provider = provider; } public BalanceProvider<ServerData> getProvider() { return provider; } public void connect(){ try{ ServerData serverData = provider.getBalanceItem(); // 獲取負載最小的服務器信息,並與之創建鏈接 System.out.println("connecting to "+serverData.getHost()+":"+serverData.getPort()+", it's balance:"+serverData.getBalance()); group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new ClientHandler()); } }); ChannelFuture f = b.connect(serverData.getHost(),serverData.getPort()).syncUninterruptibly(); channel = f.channel(); System.out.println("started success!"); }catch(Exception e){ System.out.println("鏈接異常:"+e.getMessage()); } } public void disConnect(){ try{ if (channel!=null) channel.close().syncUninterruptibly(); group.shutdownGracefully(); group = null; log.debug("disconnected!"); }catch(Exception e){ log.error(e.getMessage()); } } }
public class ClientHandler extends ChannelHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
/**
 * Selects one item out of a set of candidates according to some load
 * balancing strategy.
 *
 * @param <T> type of the items to choose from
 */
public interface BalanceProvider<T> {

    /**
     * @return the item chosen by the strategy
     */
    T getBalanceItem();
}
/**
 * Skeleton {@link BalanceProvider}: subclasses say where the candidates come
 * from and how one of them is chosen.
 *
 * @param <T> type of the items to choose from
 */
public abstract class AbstractBalanceProvider<T> implements BalanceProvider<T> {

    /**
     * Chooses one item from the candidates.
     *
     * @param items the candidates
     * @return the chosen item
     */
    protected abstract T balanceAlgorithm(List<T> items);

    /**
     * @return the candidates to choose from
     */
    protected abstract List<T> getBalanceItems();

    /**
     * Fetches the candidates and runs the selection on them.
     *
     * @return whatever {@link #balanceAlgorithm(List)} picks
     */
    public T getBalanceItem() {
        List<T> candidates = getBalanceItems();
        return balanceAlgorithm(candidates);
    }
}
public class DefaultBalanceProvider extends AbstractBalanceProvider<ServerData> { private final String zkServer; // zookeeper服務器地址 private final String serversPath; // servers節點路徑 private final ZkClient zc; private static final Integer SESSION_TIME_OUT = 10000; private static final Integer CONNECT_TIME_OUT = 10000; public DefaultBalanceProvider(String zkServer, String serversPath) { this.serversPath = serversPath; this.zkServer = zkServer; this.zc = new ZkClient(this.zkServer, SESSION_TIME_OUT, CONNECT_TIME_OUT, new SerializableSerializer()); } @Override protected ServerData balanceAlgorithm(List<ServerData> items) { if (items.size()>0){ Collections.sort(items); // 根據負載由小到大排序 return items.get(0); // 返回負載最小的那個 }else{ return null; } } /** * 從zookeeper中拿到全部工做服務器的基本信息 */ @Override protected List<ServerData> getBalanceItems() { List<ServerData> sdList = new ArrayList<ServerData>(); List<String> children = zc.getChildren(this.serversPath); for(int i=0; i<children.size();i++){ ServerData sd = zc.readData(serversPath+"/"+children.get(i)); sdList.add(sd); } return sdList; } }
public class ClientRunner { private static final int CLIENT_QTY = 3; private static final String ZOOKEEPER_SERVER = "192.168.1.105:2181"; private static final String SERVERS_PATH = "/servers"; public static void main(String[] args) { List<Thread> threadList = new ArrayList<Thread>(CLIENT_QTY); final List<Client> clientList = new ArrayList<Client>(); final BalanceProvider<ServerData> balanceProvider = new DefaultBalanceProvider(ZOOKEEPER_SERVER, SERVERS_PATH); try{ for(int i=0; i<CLIENT_QTY; i++){ Thread thread = new Thread(new Runnable() { public void run() { Client client = new ClientImpl(balanceProvider); clientList.add(client); try { client.connect(); } catch (Exception e) { e.printStackTrace(); } } }); threadList.add(thread); thread.start(); //延時 Thread.sleep(2000); } System.out.println("敲回車鍵退出!\n"); new BufferedReader(new InputStreamReader(System.in)).readLine(); }catch(Exception e){ e.printStackTrace(); }finally{ //關閉客戶端 for (int i=0; i<clientList.size(); i++){ try { clientList.get(i); clientList.get(i).disConnect(); } catch (Exception ignore) { //ignore } } //關閉線程 for (int i=0; i<threadList.size(); i++){ threadList.get(i).interrupt(); try{ threadList.get(i).join(); }catch (InterruptedException e){ //ignore } } } } }
咱們先啓動服務端ServerRunner
6000:binding... 6000:binded... 6001:binding... 6001:binded...
再來啓動客戶端ClientRunner
connecting to 127.0.0.1:6000, it's balance:0 started success! connecting to 127.0.0.1:6001, it's balance:0 started success! connecting to 127.0.0.1:6000, it's balance:1 started success! 敲回車鍵退出!
zookeeper系列(一)zookeeper必知
zookeeper系列(二)實戰master選舉
zookeeper系列(三)實戰數據發佈訂閱
zookeeper系列(四)實戰負載均衡
zookeeper系列(五)實戰分佈式鎖
zookeeper系列(六)實戰分佈式隊列
zookeeper系列(七)實戰分佈式命名服務
zookeeper系列(八)zookeeper運維