自研服務治理框架----服務地址發現

服務獲取接口

//獲取服務名稱  
    String getService();  
  
    /** 
     * 獲取全部服務端地址 
     * @return 
     */  
    List<InetSocketAddress> findServerAddressList();  
  
    /** 
     * 選取一個合適的address,能夠隨機獲取等' 
     * 內部能夠使用合適的算法. 
     * @return 
     */  
    InetSocketAddress selector();  
  
    void close();

基於zookeeper服務獲取

public class ThriftServerAddressProviderZookeeper implements ThriftServerAddressProvider, InitializingBean {  
  
    private Logger logger = LoggerFactory.getLogger(getClass());  
  
    // 註冊服務  
    private String service;  
    // 服務版本號  
    private String version = "1.0.0";  
  
    private PathChildrenCache cachedPath;  
  
    private CuratorFramework zkClient;  
  
    // 用來保存當前provider所接觸過的地址記錄  
    // 當zookeeper集羣故障時,能夠使用trace中地址,做爲"備份"  
    private Set<String> trace = new HashSet<String>();  
  
    private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>();  
  
    private Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>();  
  
    private Object lock = new Object();  
  
    // 默認權重  
    private static final Integer DEFAULT_WEIGHT = 1;  
  
    public void setService(String service) {  
        this.service = service;  
    }  
  
    public void setVersion(String version) {  
        this.version = version;  
    }  
  
    public ThriftServerAddressProviderZookeeper() {  
    }  
  
    public ThriftServerAddressProviderZookeeper(CuratorFramework zkClient) {  
        this.zkClient = zkClient;  
    }  
  
    public void setZkClient(CuratorFramework zkClient) {  
        this.zkClient = zkClient;  
    }  
  
    @Override  
    public void afterPropertiesSet() throws Exception {  
        // 若是zk還沒有啓動,則啓動  
        if (zkClient.getState() == CuratorFrameworkState.LATENT) {  
            zkClient.start();  
        }  
        buildPathChildrenCache(zkClient, getServicePath(), true);  
        cachedPath.start(StartMode.POST_INITIALIZED_EVENT);  
    }  
  
    private String getServicePath(){  
        return "/" + service + "/" + version;  
    }  
    private void buildPathChildrenCache(final CuratorFramework client, String path, Boolean cacheData) throws Exception {  
        cachedPath = new PathChildrenCache(client, path, cacheData);  
        cachedPath.getListenable().addListener(new PathChildrenCacheListener() {  
            @Override  
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {  
                PathChildrenCacheEvent.Type eventType = event.getType();  
                switch (eventType) {  
                case CONNECTION_RECONNECTED:  
                    logger.info("Connection is reconection.");  
                    break;  
                case CONNECTION_SUSPENDED:  
                    logger.info("Connection is suspended.");  
                    break;  
                case CONNECTION_LOST:  
                    logger.warn("Connection error,waiting...");  
                    return;  
                default:  
                    //  
                }  
                // 任何節點的時機數據變更,都會rebuild,此處爲一個"簡單的"作法.  
                cachedPath.rebuild();  
                rebuild();  
            }  
  
            protected void rebuild() throws Exception {  
                List<ChildData> children = cachedPath.getCurrentData();  
                if (children == null || children.isEmpty()) {  
                    // 有可能全部的thrift server都與zookeeper斷開了連接  
                    // 可是,有可能,thrift client與thrift server之間的網絡是良好的  
                    // 所以此處是否須要清空container,是須要多方面考慮的.  
                    container.clear();  
                    logger.error("thrift server-cluster error....");  
                    return;  
                }  
                List<InetSocketAddress> current = new ArrayList<InetSocketAddress>();  
                String path = null;  
                for (ChildData data : children) {  
                    path = data.getPath();  
                    logger.debug("get path:"+path);  
                    path = path.substring(getServicePath().length()+1);  
                    logger.debug("get serviceAddress:"+path);  
                    String address = new String(path.getBytes(), "utf-8");  
                    current.addAll(transfer(address));  
                    trace.add(address);  
                }  
                Collections.shuffle(current);  
                synchronized (lock) {  
                    container.clear();  
                    container.addAll(current);  
                    inner.clear();  
                    inner.addAll(current);  
  
                }  
            }  
        });  
    }  
  
    private List<InetSocketAddress> transfer(String address) {  
        String[] hostname = address.split(":");  
        Integer weight = DEFAULT_WEIGHT;  
        if (hostname.length == 3) {  
            weight = Integer.valueOf(hostname[2]);  
        }  
        String ip = hostname[0];  
        Integer port = Integer.valueOf(hostname[1]);  
        List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();  
        // 根據優先級,將ip:port添加屢次到地址集中,而後隨機取地址實現負載  
        for (int i = 0; i < weight; i++) {  
            result.add(new InetSocketAddress(ip, port));  
        }  
        return result;  
    }  
  
    @Override  
    public List<InetSocketAddress> findServerAddressList() {  
        return Collections.unmodifiableList(container);  
    }  
  
    @Override  
    public synchronized InetSocketAddress selector() {  
        if (inner.isEmpty()) {  
            if (!container.isEmpty()) {  
                inner.addAll(container);  
            } else if (!trace.isEmpty()) {  
                synchronized (lock) {  
                    for (String hostname : trace) {  
                        container.addAll(transfer(hostname));  
                    }  
                    Collections.shuffle(container);  
                    inner.addAll(container);  
                }  
            }  
        }  
        return inner.poll();  
    }  
  
    @Override  
    public void close() {  
        try {  
            cachedPath.close();  
            zkClient.close();  
        } catch (Exception e) {  
        }  
    }  
  
    @Override  
    public String getService() {  
        return service;  
    }  
  
}
相關文章
相關標籤/搜索