Service Discovery with Apache Curator

 

Curator的介紹

  Curator就是Zookeeper的一個客戶端工具(不知道Zookeeper的同窗能夠到http://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper/學習下),封裝ZooKeeper client與ZooKeeper server之間的鏈接處理以及zookeeper的經常使用操做,提供ZooKeeper各類應用場景(recipe, 好比共享鎖服務, 集羣領導選舉機制)的抽象封裝。固然還有他看起來很是舒服的Fluent風格的API。 Curator主要從如下幾個方面下降了zk使用的複雜性: html

  • 重試機制:提供可插拔的重試機制, 它將給捕獲全部可恢復的異常配置一個重試策略, 而且內部也提供了幾種標準的重試策略(好比指數補償).
  • 鏈接狀態監控: Curator初始化以後會一直的對zk鏈接進行監聽, 一旦發現鏈接狀態發生變化, 將做出相應的處理.
  • zk客戶端實例管理:Curator對zk客戶端到server集羣鏈接進行管理. 並在須要的狀況, 重建zk實例, 保證與zk集羣的可靠鏈接 
  • 各類使用場景支持:Curator實現zk支持的大部分使用場景支持(甚至包括zk自身不支持的場景), 這些實現都遵循了zk的最佳實踐, 並考慮了各類極端狀況.

  Curator經過以上的處理, 讓用戶專一於自身的業務自己, 而無需花費更多的精力在zk自己.這裏咱們介紹的是Curator的Service Discovery模塊java

Service Discovery 

  咱們一般在調用服務的時候,須要知道服務的地址,端口,或者其餘一些信息,一般狀況下,咱們是把他們寫到程序裏面,可是隨着服務愈來愈多,維護起來也愈來愈費勁,更重要的是,因爲地址都是在程序中配置的,咱們根本不知道遠程的服務是否可用,當咱們增長或者刪除服務,咱們又須要到配置文件中配置麼? 這時候,Zookeeper幫大忙了,咱們能夠把咱們的服務註冊到Zookeeper中,建立一個臨時節點(當鏈接斷開以後,節點將被刪除),存放咱們的服務信息(url,ip,port等信息),把這些臨時節點都存放在以serviceName命名的節點下面,這樣咱們要獲取某個服務的地址,只須要到Zookeeper中找到這個path,而後就能夠讀取到裏面存放的服務信息,這時候咱們就能夠根據這些信息調用咱們的服務。這樣,經過Zookeeper咱們就作到了動態的添加和刪除服務,作到了一旦一個服務時效,就會自動從Zookeeper中移除,基本上Curator中的Service Discovery就是作的這點事。
  下面咱們用兩張圖片來比較一下,通常狀況下的服務調用,和使用 Dynamic Service Registry 的區別web

    

 

  使用zookeeper作服務註冊以後:apache

 

關於Apache curator的service discovery的一些介紹能夠參考官方文檔:http://curator.apache.org/curator-x-discovery/index.htmldom

 

Service Discovery 的使用

  通常而言,分爲 Service Registry 和 Service Discovery,對應服務端和客戶端。也就是由服務提供者,講自身的信息註冊到Zookeeper,而後,客戶端經過到Zookeeper中查找服務信息,而後根據信息就行調用(見上圖)。說了這麼多,上代碼了。ide

  首先咱們定義個payload,咱們這一在裏面存儲一些服務信息。這個信息會被保存在Zookeeper,這裏只是舉個例子,你還能夠寫入更多你想要的信息。工具

  

package discovery;

import org.codehaus.jackson.map.annotate.JsonRootName;



/**
 * Created by hupeng on 2014/9/16.
 */
@JsonRootName("details")
public class InstanceDetails {

    private String id;

    private String listenAddress;

    private int listenPort;

    private String interfaceName;

    public InstanceDetails(String id, String listenAddress, int listenPort,String interfaceName) {
        this.id = id;
        this.listenAddress = listenAddress;
        this.listenPort = listenPort;
        this.interfaceName = interfaceName;
    }

    public InstanceDetails() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getListenAddress() {
        return listenAddress;
    }

    public void setListenAddress(String listenAddress) {
        this.listenAddress = listenAddress;
    }

    public int getListenPort() {
        return listenPort;
    }

    public void setListenPort(int listenPort) {
        this.listenPort = listenPort;
    }

    public String getInterfaceName() {
        return interfaceName;
    }

    public void setInterfaceName(String interfaceName) {
        this.interfaceName = interfaceName;
    }

    @Override
    public String toString() {
        return "InstanceDetails{" +
                "id='" + id + '\'' +
                ", listenAddress='" + listenAddress + '\'' +
                ", listenPort=" + listenPort +
                ", interfaceName='" + interfaceName + '\'' +
                '}';
    }
}

 

咱們先寫服務註冊,也就是服務端那邊作的事情。學習

package discovery;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;

import java.io.IOException;
/**
 * Created by hupeng on 2014/9/16.
 */
public class ServiceRegistrar{

    private ServiceDiscovery<InstanceDetails> serviceDiscovery;
    private final CuratorFramework client;


    public ServiceRegistrar(CuratorFramework client,String basePath) throws Exception {
        this.client = client;
        JsonInstanceSerializer<InstanceDetails> serializer = new JsonInstanceSerializer<InstanceDetails>(InstanceDetails.class);
        serviceDiscovery = ServiceDiscoveryBuilder.builder(InstanceDetails.class)
                .client(client)
                .serializer(serializer)
                .basePath(basePath)
                .build();
        serviceDiscovery.start();
    }

    public void registerService(ServiceInstance<InstanceDetails> serviceInstance) throws Exception {
        serviceDiscovery.registerService(serviceInstance);
    }

    public void unregisterService(ServiceInstance<InstanceDetails> serviceInstance) throws Exception {
        serviceDiscovery.unregisterService(serviceInstance);

    }

    public void updateService(ServiceInstance<InstanceDetails> serviceInstance) throws Exception {
        serviceDiscovery.updateService(serviceInstance);

    }

    public void close() throws IOException {
        serviceDiscovery.close();
    }
}

通常狀況下,會在咱們服務啓動的時候就將服務信息註冊,好比咱們是web項目的話能夠寫一個Servlet Listener進行註冊,這裏爲了方便,寫一個Main方法進行測試,若是咱們把咱們的信息存儲在payload中的話,UriSpec是能夠不定義的。測試

package discovery;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.UriSpec;


import java.util.UUID;

/**
 * User: hupeng
 * Date: 14-9-16
 * Time: 下午8:05
 */
public class ServerApp {

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();
        ServiceRegistrar serviceRegistrar = new ServiceRegistrar(client,"services");
        ServiceInstance<InstanceDetails> instance1 = ServiceInstance.<InstanceDetails>builder()
                .name("service1")
                .port(12345)
                .address("192.168.1.100")   //address不寫的話,會取本地ip
                .payload(new InstanceDetails(UUID.randomUUID().toString(),"192.168.1.100",12345,"Test.Service1"))
                .uriSpec(new UriSpec("{scheme}://{address}:{port}"))
                .build();
        ServiceInstance<InstanceDetails> instance2 = ServiceInstance.<InstanceDetails>builder()
                .name("service2")
                .port(12345)
                .address("192.168.1.100")
                .payload(new InstanceDetails(UUID.randomUUID().toString(),"192.168.1.100",12345,"Test.Service2"))
                .uriSpec(new UriSpec("{scheme}://{address}:{port}"))
                .build();
        serviceRegistrar.registerService(instance1);
        serviceRegistrar.registerService(instance2);


        Thread.sleep(Integer.MAX_VALUE);
    }
}

再來寫Service discoveryui

package discovery;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceProvider;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import org.apache.curator.x.discovery.strategies.RandomStrategy;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
 * Created by hupeng on 2014/9/16.
 */
public class ServiceDiscoverer {
    private ServiceDiscovery<InstanceDetails> serviceDiscovery;
    private Map<String, ServiceProvider<InstanceDetails>> providers = Maps.newHashMap();
    private List<Closeable> closeableList = Lists.newArrayList();
    private Object lock = new Object();


    public ServiceDiscoverer(CuratorFramework client ,String basePath) throws Exception {
        JsonInstanceSerializer<InstanceDetails> serializer = new JsonInstanceSerializer<InstanceDetails>(InstanceDetails.class);
        serviceDiscovery = ServiceDiscoveryBuilder.builder(InstanceDetails.class)
                .client(client)
                .basePath(basePath)
                .serializer(serializer)
                .build();

        serviceDiscovery.start();
    }


    public ServiceInstance<InstanceDetails> getInstanceByName(String serviceName) throws Exception {
        ServiceProvider<InstanceDetails> provider = providers.get(serviceName);
        if (provider == null) {
            synchronized (lock) {
                provider = providers.get(serviceName);
                if (provider == null) {
                    provider = serviceDiscovery.serviceProviderBuilder().
                            serviceName(serviceName).
                            providerStrategy(new RandomStrategy<InstanceDetails>())
                            .build();
                    provider.start();
                    closeableList.add(provider);
                    providers.put(serviceName, provider);
                }
            }
        }


        return provider.getInstance();
    }


    public synchronized void close(){
       for (Closeable closeable : closeableList) {
           CloseableUtils.closeQuietly(closeable);
       }
    }


}

客戶端測試程序:

package discovery;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.discovery.ServiceInstance;

/**
 * User: hupeng
 * Date: 14-9-16
 * Time: 下午8:16
 */
public class ClientApp {

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();
        ServiceDiscoverer serviceDiscoverer = new ServiceDiscoverer(client,"services");

        ServiceInstance<InstanceDetails> instance1 = serviceDiscoverer.getInstanceByName("service1");

        System.out.println(instance1.buildUriSpec());
        System.out.println(instance1.getPayload());

        ServiceInstance<InstanceDetails> instance2 = serviceDiscoverer.getInstanceByName("service1");

        System.out.println(instance2.buildUriSpec());
        System.out.println(instance2.getPayload());

        serviceDiscoverer.close();
        CloseableUtils.closeQuietly(client);
    }
}

好了,代碼就到這裏,若是有什麼問題的話,請指正。

相關文章
相關標籤/搜索