還記得Curator提供哪幾個組件嗎? 咱們不妨回顧一下: html
前面的例子其實前五個組件都涉及到了, 好比Utilities例子的TestServer, Client裏的CuratorZookeeperClient, Errors裏的ConnectionStateListener等。 還有最後一個組件咱們尚未介紹,那就是Curator擴展組件。 java
Recipes組件包含了豐富的Curator應用的組件。 可是這些並非ZooKeeper Recipe的所有。 大量的分佈式應用已經抽象出了許許多多的的Recipe,其中有些仍是能夠經過Curator來實現。 若是不斷都將這些Recipe都增長到Recipes中, Recipes會變得愈來愈大。 爲了不這種情況, Curator把一些其它的Recipe放在單獨的包中, 命名方式就是curator-x-<name>,好比curator-x-discovery, curator-x-rpc。 本文就是介紹curator-x-discovery。 apache
這是一個服務發現的Recipe。
咱們在介紹臨時節點Ephemeral Node的時候就講到, 能夠經過臨時節點建立一個服務註冊機制。 服務啓動後建立臨時節點, 服務斷掉後臨時節點就不存在了。 這個擴展抽象了這種功能,聽過了一套API,能夠實現服務發現機制。 app
咱們先介紹一下例子中的服務類。InstanceDetails定義了服務實例的基本信息,實際中可能會定義更詳細的信息。 dom
package com.colobu.zkrecipe.discovery; import org.codehaus.jackson.map.annotate.JsonRootName; /** * In a real application, the Service payload will most likely be more detailed * than this. But, this gives a good example. */ @JsonRootName("details") public class InstanceDetails { private String description; public InstanceDetails() { this(""); } public InstanceDetails(String description) { this.description = description; } public void setDescription(String description) { this.description = description; } public String getDescription() { return description; } }
ExampleServer至關與你在分佈式環境中的服務應用。 每一個服務應用實例都相似這個類, 應用啓動時調用start, 關閉時調用close。 分佈式
package com.colobu.zkrecipe.discovery; import java.io.Closeable; import java.io.IOException; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.UriSpec; import org.apache.curator.x.discovery.details.JsonInstanceSerializer; /** * This shows a very simplified method of registering an instance with the * service discovery. Each individual instance in your distributed set of * applications would create an instance of something similar to ExampleServer, * start it when the application comes up and close it when the application * shuts down. */ public class ExampleServer implements Closeable { private final ServiceDiscovery<InstanceDetails> serviceDiscovery; private final ServiceInstance<InstanceDetails> thisInstance; public ExampleServer(CuratorFramework client, String path, String serviceName, String description) throws Exception { // in a real application, you'd have a convention of some kind for the // URI layout UriSpec uriSpec = new UriSpec("{scheme}://foo.com:{port}"); thisInstance = ServiceInstance.<InstanceDetails> builder().name(serviceName).payload(new InstanceDetails(description)) .port((int) (65535 * Math.random())) // in a real application, // you'd use a common // port .uriSpec(uriSpec).build(); // if you mark your payload class with @JsonRootName the provided // JsonInstanceSerializer will work JsonInstanceSerializer<InstanceDetails> serializer = new JsonInstanceSerializer<InstanceDetails>(InstanceDetails.class); serviceDiscovery = ServiceDiscoveryBuilder.builder(InstanceDetails.class).client(client).basePath(path).serializer(serializer) .thisInstance(thisInstance).build(); } public ServiceInstance<InstanceDetails> getThisInstance() { return thisInstance; } public void start() throws Exception { serviceDiscovery.start(); } @Override public void close() throws IOException { CloseableUtils.closeQuietly(serviceDiscovery); } }
DiscoveryExample提供了增長,刪除,顯示,註冊已有的服務的功能。 注意此處服務註冊是由ExampleServer本身完成的, 這比較符合實際的狀況。 實際狀況是服務本身起來後主動註冊服務。 可是此處啓動又是由DiscoveryExample來調用, 純粹爲了演示使用。 你能夠根據你本身的狀況合理安排服務的註冊和啓動。 ide
random命令提供了一個徹底由DiscoveryExample控制的服務。 它負責註冊一個服務並啓動。 ui
調用close就關閉了服務。 this
package com.colobu.zkrecipe.discovery; import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingServer; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.ServiceProvider; import org.apache.curator.x.discovery.details.JsonInstanceSerializer; import org.apache.curator.x.discovery.strategies.RandomStrategy; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; public class DiscoveryExample { private static final String PATH = "/discovery/example"; public static void main(String[] args) throws Exception { // This method is scaffolding to get the example up and running TestingServer server = new TestingServer(); CuratorFramework client = null; ServiceDiscovery<InstanceDetails> serviceDiscovery = null; Map<String, ServiceProvider<InstanceDetails>> providers = Maps.newHashMap(); try { client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3)); client.start(); JsonInstanceSerializer<InstanceDetails> serializer = new JsonInstanceSerializer<InstanceDetails>(InstanceDetails.class); serviceDiscovery = ServiceDiscoveryBuilder.builder(InstanceDetails.class).client(client).basePath(PATH).serializer(serializer).build(); serviceDiscovery.start(); processCommands(serviceDiscovery, providers, client); } finally { for (ServiceProvider<InstanceDetails> cache : providers.values()) { CloseableUtils.closeQuietly(cache); } CloseableUtils.closeQuietly(serviceDiscovery); CloseableUtils.closeQuietly(client); CloseableUtils.closeQuietly(server); } } private static void processCommands(ServiceDiscovery<InstanceDetails> serviceDiscovery, Map<String, ServiceProvider<InstanceDetails>> providers, CuratorFramework client) throws Exception { // More scaffolding that does a simple command line processor printHelp(); List<ExampleServer> servers = Lists.newArrayList(); try { BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); boolean done = false; while (!done) { System.out.print("> "); String line = in.readLine(); if (line == null) { break; } String command = line.trim(); String[] parts = command.split("\\s"); if (parts.length == 0) { continue; } String operation = parts[0]; String args[] = Arrays.copyOfRange(parts, 1, parts.length); if (operation.equalsIgnoreCase("help") || operation.equalsIgnoreCase("?")) { printHelp(); } else if (operation.equalsIgnoreCase("q") || operation.equalsIgnoreCase("quit")) { done = true; } else if (operation.equals("add")) { addInstance(args, client, command, servers); } else if (operation.equals("delete")) { deleteInstance(args, command, servers); } else if (operation.equals("random")) { listRandomInstance(args, serviceDiscovery, providers, command); } else if (operation.equals("list")) { listInstances(serviceDiscovery); } } } finally { for (ExampleServer server : servers) { CloseableUtils.closeQuietly(server); } } } private static void listRandomInstance(String[] args, ServiceDiscovery<InstanceDetails> serviceDiscovery, Map<String, ServiceProvider<InstanceDetails>> providers, String command) throws Exception { // this shows how to use a ServiceProvider // in a real application you'd create the ServiceProvider early for the // service(s) you're interested in if (args.length != 1) { System.err.println("syntax error (expected random <name>): " + command); return; } String serviceName = args[0]; ServiceProvider<InstanceDetails> provider = providers.get(serviceName); if (provider == null) { provider = serviceDiscovery.serviceProviderBuilder().serviceName(serviceName).providerStrategy(new RandomStrategy<InstanceDetails>()).build(); providers.put(serviceName, provider); provider.start(); Thread.sleep(2500); // give the provider time to warm up - in a real // application you wouldn't need to do this } ServiceInstance<InstanceDetails> instance = provider.getInstance(); if (instance == null) { System.err.println("No instances named: " + serviceName); } else { outputInstance(instance); } } private static void listInstances(ServiceDiscovery<InstanceDetails> serviceDiscovery) throws Exception { // This shows how to query all the instances in service discovery try { Collection<String> serviceNames = serviceDiscovery.queryForNames(); System.out.println(serviceNames.size() + " type(s)"); for (String serviceName : serviceNames) { Collection<ServiceInstance<InstanceDetails>> instances = serviceDiscovery.queryForInstances(serviceName); System.out.println(serviceName); for (ServiceInstance<InstanceDetails> instance : instances) { outputInstance(instance); } } } finally { CloseableUtils.closeQuietly(serviceDiscovery); } } private static void outputInstance(ServiceInstance<InstanceDetails> instance) { System.out.println("\t" + instance.getPayload().getDescription() + ": " + instance.buildUriSpec()); } private static void deleteInstance(String[] args, String command, List<ExampleServer> servers) { // simulate a random instance going down // in a real application, this would occur due to normal operation, a // crash, maintenance, etc. if (args.length != 1) { System.err.println("syntax error (expected delete <name>): " + command); return; } final String serviceName = args[0]; ExampleServer server = Iterables.find(servers, new Predicate<ExampleServer>() { @Override public boolean apply(ExampleServer server) { return server.getThisInstance().getName().endsWith(serviceName); } }, null); if (server == null) { System.err.println("No servers found named: " + serviceName); return; } servers.remove(server); CloseableUtils.closeQuietly(server); System.out.println("Removed a random instance of: " + serviceName); } private static void addInstance(String[] args, CuratorFramework client, String command, List<ExampleServer> servers) throws Exception { // simulate a new instance coming up // in a real application, this would be a separate process if (args.length < 2) { System.err.println("syntax error (expected add <name> <description>): " + command); return; } StringBuilder description = new StringBuilder(); for (int i = 1; i < args.length; ++i) { if (i > 1) { description.append(' '); } description.append(args[i]); } String serviceName = args[0]; ExampleServer server = new ExampleServer(client, PATH, serviceName, description.toString()); servers.add(server); server.start(); System.out.println(serviceName + " added"); } private static void printHelp() { System.out.println("An example of using the ServiceDiscovery APIs. This example is driven by entering commands at the prompt:\n"); System.out.println("add <name> <description>: Adds a mock service with the given name and description"); System.out.println("delete <name>: Deletes one of the mock services with the given name"); System.out.println("list: Lists all the currently registered services"); System.out.println("random <name>: Lists a random instance of the service with the given name"); System.out.println("quit: Quit the example"); System.out.println(); } }
其它兩個擴展Curator RPC Proxy(curator-x-rpc)擴展和Service Discovery Server(curator-x-discovery-server)是爲了橋接非Java應用的擴展,本系列將再也不介紹了。感興趣的朋友能夠看下面的 文檔。 Curator Service Discovery Curator RPC Proxy google