服務獲取接口
// Returns the name of the service.
String getService();
/**
* Returns the addresses of all servers.
* @return the list of all server addresses
*/
List<InetSocketAddress> findServerAddressList();
/**
* Selects one suitable address, for example at random.
* Implementations are free to use whatever selection algorithm is appropriate.
* @return the selected server address
*/
InetSocketAddress selector();
// Closes this provider. The ZooKeeper-based implementation in this file closes
// its path cache and its ZooKeeper client.
void close();
基於zookeeper服務獲取
/**
 * {@link ThriftServerAddressProvider} that discovers Thrift server addresses from ZooKeeper.
 *
 * <p>Servers are expected to register themselves as child nodes of
 * {@code /<service>/<version>}, where each child node name has the form
 * {@code ip:port} or {@code ip:port:weight}. A {@link PathChildrenCache} watches that
 * path and the address lists are rebuilt whenever the cache reports a change.
 *
 * <p>Thread-safety: the cache listener runs on a Curator thread while
 * {@link #selector()} and {@link #findServerAddressList()} are called by client threads,
 * so every access to {@code container}, {@code inner} and {@code trace} is guarded by
 * the single monitor {@code lock}.
 */
public class ThriftServerAddressProviderZookeeper implements ThriftServerAddressProvider, InitializingBean {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    /** Weight used when a node name does not carry an explicit weight. */
    private static final Integer DEFAULT_WEIGHT = 1;

    /** Name of the registered service; first segment of the watched path. */
    private String service;

    /** Service version; second segment of the watched path. */
    private String version = "1.0.0";

    private PathChildrenCache cachedPath;

    private CuratorFramework zkClient;

    /**
     * Every address string this provider has ever seen. Used as a "backup" source of
     * addresses when the ZooKeeper cluster is unavailable. Guarded by {@code lock}.
     */
    private final Set<String> trace = new HashSet<String>();

    /** Current weighted address list (an address appears once per unit of weight). Guarded by {@code lock}. */
    private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>();

    /** Work queue drained by {@link #selector()} and refilled from {@code container}. Guarded by {@code lock}. */
    private final Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>();

    private final Object lock = new Object();

    public ThriftServerAddressProviderZookeeper() {
    }

    public ThriftServerAddressProviderZookeeper(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    public void setService(String service) {
        this.service = service;
    }

    public void setVersion(String version) {
        this.version = version;
    }

    public void setZkClient(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    /**
     * Starts the ZooKeeper client if it has not been started yet, then creates and
     * starts the path cache for the service path.
     *
     * @throws IllegalStateException if no ZooKeeper client or no service name was configured
     * @throws Exception if the path cache cannot be started
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        // Fail with a clear message instead of a NullPointerException or a watch on "/null/<version>".
        if (zkClient == null) {
            throw new IllegalStateException("zkClient must be set before initialization");
        }
        if (service == null || service.length() == 0) {
            throw new IllegalStateException("service must be set before initialization");
        }
        // Start the client only if nobody else has started it.
        if (zkClient.getState() == CuratorFrameworkState.LATENT) {
            zkClient.start();
        }
        buildPathChildrenCache(zkClient, getServicePath(), true);
        cachedPath.start(StartMode.POST_INITIALIZED_EVENT);
    }

    /** Returns the ZooKeeper path under which servers register: {@code /<service>/<version>}. */
    private String getServicePath() {
        return "/" + service + "/" + version;
    }

    /**
     * Creates the path cache for {@code path} and registers the listener that rebuilds
     * the address lists on cache events.
     */
    private void buildPathChildrenCache(final CuratorFramework client, String path, Boolean cacheData) throws Exception {
        cachedPath = new PathChildrenCache(client, path, cacheData);
        cachedPath.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                switch (event.getType()) {
                case CONNECTION_RECONNECTED:
                    logger.info("Connection is reconection.");
                    break;
                case CONNECTION_SUSPENDED:
                    // NOTE(review): the cache rebuild below is still attempted while suspended,
                    // as in the original flow; confirm whether that is intended.
                    logger.info("Connection is suspended.");
                    break;
                case CONNECTION_LOST:
                    // Keep the current addresses untouched while the connection is lost.
                    logger.warn("Connection error,waiting...");
                    return;
                default:
                    break;
                }
                // Any change to any node triggers a full rebuild; deliberately simple.
                cachedPath.rebuild();
                rebuildAddresses();
            }
        });
    }

    /**
     * Replaces the address lists with the servers currently present in the path cache.
     *
     * <p>When the cache is empty the current list is cleared and an error is logged; the
     * remembered addresses in {@code trace} remain available as a fallback for
     * {@link #selector()}. A child node whose name cannot be parsed is skipped with a
     * warning so that one bad registration does not block the whole update.
     */
    private void rebuildAddresses() {
        List<ChildData> children = cachedPath.getCurrentData();
        if (children == null || children.isEmpty()) {
            // All servers may have lost their ZooKeeper session while still being reachable
            // from this client, so whether clearing is right here is debatable.
            synchronized (lock) {
                container.clear();
            }
            logger.error("thrift server-cluster error....");
            return;
        }

        // Child paths look like "<servicePath>/<address>"; skip the prefix and the slash.
        int addressOffset = getServicePath().length() + 1;
        List<InetSocketAddress> current = new ArrayList<InetSocketAddress>();
        List<String> parsed = new ArrayList<String>();
        for (ChildData data : children) {
            String path = data.getPath();
            logger.debug("get path:{}", path);
            String address = path.substring(addressOffset);
            logger.debug("get serviceAddress:{}", address);
            try {
                current.addAll(transfer(address));
                parsed.add(address);
            } catch (RuntimeException e) {
                logger.warn("Ignoring malformed server node: " + address, e);
            }
        }
        Collections.shuffle(current);

        synchronized (lock) {
            trace.addAll(parsed);
            container.clear();
            container.addAll(current);
            inner.clear();
            inner.addAll(current);
        }
    }

    /**
     * Expands an address string into socket addresses.
     *
     * @param address node name of the form {@code ip:port} or {@code ip:port:weight}
     * @return a list containing the address repeated {@code weight} times, so that picking
     *         from the list distributes load according to weight
     * @throws IllegalArgumentException if the string has no port, or port/weight is not a
     *         number, or the port is out of range
     */
    private List<InetSocketAddress> transfer(String address) {
        String[] parts = address.split(":");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Expected ip:port[:weight] but got: " + address);
        }
        int weight = DEFAULT_WEIGHT;
        if (parts.length == 3) {
            weight = Integer.parseInt(parts[2]);
        }
        String ip = parts[0];
        int port = Integer.parseInt(parts[1]);
        List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
        for (int i = 0; i < weight; i++) {
            result.add(new InetSocketAddress(ip, port));
        }
        return result;
    }

    /**
     * Returns the current weighted address list.
     *
     * @return an unmodifiable snapshot taken under the lock; later rebuilds do not affect it
     */
    @Override
    public List<InetSocketAddress> findServerAddressList() {
        synchronized (lock) {
            return Collections.unmodifiableList(new ArrayList<InetSocketAddress>(container));
        }
    }

    /**
     * Takes the next address from the work queue, refilling the queue from the current
     * address list when it runs empty. If the current list is empty as well, the
     * remembered addresses in {@code trace} are used to repopulate it.
     *
     * @return the next address, or {@code null} when no address is known at all
     */
    @Override
    public InetSocketAddress selector() {
        synchronized (lock) {
            if (inner.isEmpty()) {
                if (container.isEmpty() && !trace.isEmpty()) {
                    for (String hostname : trace) {
                        container.addAll(transfer(hostname));
                    }
                    Collections.shuffle(container);
                }
                inner.addAll(container);
            }
            return inner.poll();
        }
    }

    /**
     * Closes the path cache and the ZooKeeper client. Each resource is closed
     * independently, and failures are logged rather than propagated, so a failure on one
     * does not leak the other.
     */
    @Override
    public void close() {
        if (cachedPath != null) {
            try {
                cachedPath.close();
            } catch (Exception e) {
                logger.warn("Failed to close path cache", e);
            }
        }
        if (zkClient != null) {
            try {
                zkClient.close();
            } catch (Exception e) {
                logger.warn("Failed to close zookeeper client", e);
            }
        }
    }

    @Override
    public String getService() {
        return service;
    }
}