Curator 是 Netflix 公司開源的一套 ZooKeeper 客戶端框架。Curator 無疑是 ZooKeeper 客戶端中的瑞士軍刀,解決了不少 ZooKeeper 客戶端非常底層的細節開發工作,包括連接重連、反覆註冊 Watcher 和 NodeExistsException 異常等等。
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.12.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.12.0</version> </dependency>
@Override public void register(String service, String version, String address) { if(zkClient.getState() == CuratorFrameworkState.LATENT){ zkClient.start(); } if(StringUtils.isEmpty(version)){ version="1.0.0"; } //臨時節點 try { zkClient.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) .forPath("/"+service+"/"+version+"/"+address); } catch (UnsupportedEncodingException e) { logger.error("register service address to zookeeper exception:{}",e); throw new ThriftException("register service address to zookeeper exception: address UnsupportedEncodingException", e); } catch (Exception e) { logger.error("register service address to zookeeper exception:{}",e); throw new ThriftException("register service address to zookeeper exception:{}", e); } }
public class ThriftServiceServerFactory implements InitializingBean { // 服務註冊本機端口 private Integer port = 8299; // 優先級 private Integer weight = 1;// default // 服務實現類 private Object service;// serice實現類 //服務版本號 private String version; // 解析本機IP private ThriftServerIpResolve thriftServerIpResolve; //服務註冊 private ThriftServerAddressRegister thriftServerAddressRegister; private ServerThread serverThread; public void setPort(Integer port) { this.port = port; } public void setWeight(Integer weight) { this.weight = weight; } public void setService(Object service) { this.service = service; } public void setVersion(String version) { this.version = version; } public void setThriftServerIpResolve(ThriftServerIpResolve thriftServerIpResolve) { this.thriftServerIpResolve = thriftServerIpResolve; } public void setThriftServerAddressRegister(ThriftServerAddressRegister thriftServerAddressRegister) { this.thriftServerAddressRegister = thriftServerAddressRegister; } @Override public void afterPropertiesSet() throws Exception { if (thriftServerIpResolve == null) { thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve(); } String serverIP = thriftServerIpResolve.getServerIp(); if (StringUtils.isEmpty(serverIP)) { throw new ThriftException("cant find server ip..."); } String hostname = serverIP + ":" + port + ":" + weight; Class<?> serviceClass = service.getClass(); // 獲取實現類接口 Class<?>[] interfaces = serviceClass.getInterfaces(); if (interfaces.length == 0) { throw new IllegalClassFormatException("service-class should implements Iface"); } // reflect,load "Processor"; TProcessor processor = null; String serviceName = null; for (Class<?> clazz : interfaces) { String cname = clazz.getSimpleName(); if (!cname.equals("Iface")) { continue; } serviceName = clazz.getEnclosingClass().getName(); String pname = serviceName + "$Processor"; try { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); Class<?> pclass = classLoader.loadClass(pname); if 
(!TProcessor.class.isAssignableFrom(pclass)) { continue; } Constructor<?> constructor = pclass.getConstructor(clazz); processor = (TProcessor) constructor.newInstance(service); break; } catch (Exception e) { // } } if (processor == null) { throw new IllegalClassFormatException("service-class should implements Iface"); } //須要單獨的線程,由於serve方法是阻塞的. serverThread = new ServerThread(processor, port); serverThread.start(); // 註冊服務 if (thriftServerAddressRegister != null) { thriftServerAddressRegister.register(serviceName, version, hostname); } } class ServerThread extends Thread { private TServer server; ServerThread(TProcessor processor, int port) throws Exception { TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port); TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport); TProcessorFactory processorFactory = new TProcessorFactory(processor); tArgs.processorFactory(processorFactory); tArgs.transportFactory(new TFramedTransport.Factory()); tArgs.protocolFactory( new TBinaryProtocol.Factory(true, true)); server = new TThreadedSelectorServer(tArgs); } @Override public void run(){ try{ //啓動服務 server.serve(); }catch(Exception e){ // } } public void stopServer(){ server.stop(); } } public void close() { serverThread.stopServer(); } }