自研服務治理框架----服務註冊

Zookeeper客戶端Curator

Curator是Netflix公司開源的一套Zookeeper客戶端框架。Curator無疑是Zookeeper客戶端中的瑞士軍刀,解決了許多Zookeeper客戶端非常底層的細節開發工作,包括連接重連、反覆註冊Watcher和NodeExistsException異常等等。

Curator包含了幾個包:

  • curator-framework:對zookeeper的底層api的一些封裝
  • curator-client:提供一些客戶端的操作,例如重試策略等
  • curator-recipes:封裝了一些高級特性,如:Cache事件監聽、選舉、分佈式鎖、分佈式計數器、分佈式Barrier等

Maven依賴

<dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

註冊接口

/**
 * Registers a service address in ZooKeeper as an ephemeral node at
 * {@code /<service>/<version>/<address>}, creating missing parent nodes.
 *
 * <p>The Curator client is started first when it is still in the
 * {@code LATENT} state. When {@code version} is empty (as judged by
 * {@code StringUtils.isEmpty}), {@code "1.0.0"} is used instead.
 *
 * @param service service name, used as the first path segment
 * @param version service version; defaults to {@code "1.0.0"} when empty
 * @param address service address, used as the last path segment
 * @throws ThriftException if the node cannot be created; the original
 *         failure is kept as the cause
 */
@Override
public void register(String service, String version, String address) {
    if (zkClient.getState() == CuratorFrameworkState.LATENT) {
        zkClient.start();
    }
    if (StringUtils.isEmpty(version)) {
        version = "1.0.0";
    }
    // Build the path once so the log and the exception can both report it.
    String path = "/" + service + "/" + version + "/" + address;
    try {
        // Ephemeral node
        zkClient.create()
            .creatingParentsIfNeeded()
            .withMode(CreateMode.EPHEMERAL)
            .forPath(path);
    } catch (UnsupportedEncodingException e) {
        // No "{}" placeholder here: the only argument is the throwable, so a
        // placeholder would never be substituted.
        logger.error("register service address to zookeeper exception, path=" + path, e);
        throw new ThriftException("register service address to zookeeper exception: address UnsupportedEncodingException", e);
    } catch (Exception e) {
        logger.error("register service address to zookeeper exception, path=" + path, e);
        // The original message ended in a literal "{}"; report the path instead.
        throw new ThriftException("register service address to zookeeper exception: " + path, e);
    }
}

服務註冊

/**
 * Spring {@code InitializingBean} that exposes a Thrift service implementation.
 *
 * <p>Once its properties are set, it resolves the local IP, locates the
 * Thrift-generated {@code Processor} class that matches the {@code Iface}
 * interface implemented by the service object, starts a
 * {@code TThreadedSelectorServer} on a dedicated thread, and finally registers
 * the address ({@code ip:port:weight}) through the configured
 * {@code ThriftServerAddressRegister}, if any.
 */
public class ThriftServiceServerFactory implements InitializingBean {
    // Local port the Thrift server listens on
    private Integer port = 8299;
    // Weight appended to the registered address (default 1)
    private Integer weight = 1;
    // Service implementation object
    private Object service;
    // Service version
    private String version;
    // Resolves the local IP address
    private ThriftServerIpResolve thriftServerIpResolve;
    // Service registry; registration is skipped when null
    private ThriftServerAddressRegister thriftServerAddressRegister;

    private ServerThread serverThread;

    public void setPort(Integer port) {
        this.port = port;
    }

    public void setWeight(Integer weight) {
        this.weight = weight;
    }

    public void setService(Object service) {
        this.service = service;
    }

    public void setVersion(String version) {
        this.version = version;
    }

    public void setThriftServerIpResolve(ThriftServerIpResolve thriftServerIpResolve) {
        this.thriftServerIpResolve = thriftServerIpResolve;
    }

    public void setThriftServerAddressRegister(ThriftServerAddressRegister thriftServerAddressRegister) {
        this.thriftServerAddressRegister = thriftServerAddressRegister;
    }

    /**
     * Starts the Thrift server for the configured service and registers its address.
     *
     * @throws ThriftException if no service is set or the server IP cannot be resolved
     * @throws IllegalClassFormatException if the service implements no usable
     *         {@code Iface} interface; when loading or instantiating the
     *         {@code Processor} failed, that failure is attached as the cause
     * @throws Exception if the server cannot be created or registration fails
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        if (service == null) {
            // Fail with a descriptive error instead of a bare NullPointerException.
            throw new ThriftException("service implementation is not set");
        }
        if (thriftServerIpResolve == null) {
            thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve();
        }
        String serverIP = thriftServerIpResolve.getServerIp();
        if (StringUtils.isEmpty(serverIP)) {
            throw new ThriftException("cant find server ip...");
        }

        String hostname = serverIP + ":" + port + ":" + weight;
        Class<?> serviceClass = service.getClass();
        // Interfaces implemented directly by the service class
        Class<?>[] interfaces = serviceClass.getInterfaces();
        if (interfaces.length == 0) {
            throw new IllegalClassFormatException("service-class should implements Iface");
        }
        // Load the generated "<Service>$Processor" class via reflection
        TProcessor processor = null;
        String serviceName = null;
        // Last reflection failure, kept so it can be reported if nothing matches.
        Exception lastFailure = null;
        for (Class<?> clazz : interfaces) {
            if (!"Iface".equals(clazz.getSimpleName())) {
                continue;
            }
            Class<?> enclosing = clazz.getEnclosingClass();
            if (enclosing == null) {
                // A top-level interface that merely happens to be named "Iface".
                continue;
            }
            serviceName = enclosing.getName();
            String pname = serviceName + "$Processor";
            try {
                ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
                Class<?> pclass = classLoader.loadClass(pname);
                if (!TProcessor.class.isAssignableFrom(pclass)) {
                    continue;
                }
                Constructor<?> constructor = pclass.getConstructor(clazz);
                processor = (TProcessor) constructor.newInstance(service);
                break;
            } catch (Exception e) {
                // Keep trying the remaining interfaces, but remember why this one failed.
                lastFailure = e;
            }
        }
        if (processor == null) {
            IllegalClassFormatException failure =
                    new IllegalClassFormatException("service-class should implements Iface");
            if (lastFailure != null) {
                // IllegalClassFormatException has no cause-taking constructor.
                failure.initCause(lastFailure);
            }
            throw failure;
        }
        // A dedicated thread is required because serve() blocks.
        serverThread = new ServerThread(processor, port);
        serverThread.start();
        // Register the service
        if (thriftServerAddressRegister != null) {
            try {
                thriftServerAddressRegister.register(serviceName, version, hostname);
            } catch (Exception e) {
                // Do not leave an unregistered server running after a failed registration.
                serverThread.stopServer();
                throw e;
            }
        }

    }

    /** Thread that owns the Thrift server and runs its blocking {@code serve()} call. */
    class ServerThread extends Thread {
        private TServer server;

        /**
         * Builds a {@code TThreadedSelectorServer} for the given processor on the
         * given port, using framed transport and the binary protocol.
         */
        ServerThread(TProcessor processor, int port) throws Exception {
            TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
            TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);
            TProcessorFactory processorFactory = new TProcessorFactory(processor);
            tArgs.processorFactory(processorFactory);
            tArgs.transportFactory(new TFramedTransport.Factory());
            tArgs.protocolFactory(new TBinaryProtocol.Factory(true, true));
            server = new TThreadedSelectorServer(tArgs);
        }

        @Override
        public void run() {
            // Start serving. Failures are deliberately not caught here, so they
            // reach the thread's uncaught-exception handler instead of vanishing.
            server.serve();
        }

        public void stopServer() {
            server.stop();
        }
    }

    /** Stops the Thrift server if it was started; does nothing otherwise. */
    public void close() {
        if (serverThread != null) {
            serverThread.stopServer();
        }
    }
}
相關文章
相關標籤/搜索