etcd是一個開源的、分佈式的鍵值對數據存儲系統,提供共享配置以及服務的註冊和發現。與ZooKeeper相比,etcd算是輕量級系統;二者的一致性協議也是如此:etcd使用的Raft比ZooKeeper使用的ZAB(類Paxos協議)更簡單。關於Windows版本的etcd服務端和基於Node.js的etcd瀏覽器客戶端的下載和安裝,見《etcd服務端和客戶端安裝》。
我們使用etcd,就需要etcd客戶端,這裏用的是Java客戶端etcd4j。etcd客戶端通過HTTP向服務端發送GET、PUT、POST、DELETE等請求,對目錄信息執行增刪查改。etcd在微服務架構中扮演服務註冊中心的角色,通過對接口調用信息的添加和查詢,提供服務的註冊和發現能力。
下面結合etcd4j的主要功能類EtcdClient,我們看看etcd的一些操作:
/*
 * Copyright (c) 2015, Jurriaan Mous and contributors as indicated by the @author tags.
 */
package mousio.etcd4j;

import io.netty.handler.ssl.SslContext;
import mousio.client.retry.RetryPolicy;
import mousio.client.retry.RetryWithExponentialBackOff;
import mousio.etcd4j.requests.*;
import mousio.etcd4j.responses.EtcdAuthenticationException;
import mousio.etcd4j.responses.EtcdException;
import mousio.etcd4j.responses.EtcdHealthResponse;
import mousio.etcd4j.responses.EtcdMembersResponse;
import mousio.etcd4j.responses.EtcdSelfStatsResponse;
import mousio.etcd4j.responses.EtcdStoreStatsResponse;
import mousio.etcd4j.responses.EtcdVersionResponse;
import mousio.etcd4j.responses.EtcdLeaderStatsResponse;
import mousio.etcd4j.transport.EtcdClientImpl;
import mousio.etcd4j.transport.EtcdNettyClient;

import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.TimeoutException;

/**
 * Etcd client.
 *
 * <p>Entry point of the library: the constructors wire up the transport
 * ({@link EtcdClientImpl}), the status methods ({@link #version()},
 * {@link #getSelfStats()}, ...) send a request and wait for its result, and the
 * key methods ({@link #put}, {@link #get}, {@link #delete}, ...) only build a
 * request object that the caller configures and sends.
 */
public class EtcdClient implements Closeable {
  private final EtcdClientImpl client;
  private RetryPolicy retryHandler;

  /**
   * Constructor without any security settings.
   *
   * @param baseUri URI to create connection on
   */
  public EtcdClient(URI... baseUri) {
    this(EtcdSecurityContext.NONE, baseUri);
  }

  /**
   * Constructor with credentials.
   *
   * @param username username
   * @param password password
   * @param baseUri  URI to create connection on
   */
  public EtcdClient(String username, String password, URI... baseUri) {
    this(EtcdSecurityContext.withCredential(username, password), baseUri);
  }

  /**
   * Constructor with SSL context and credentials.
   *
   * @param sslContext context for Ssl connections
   * @param username   username
   * @param password   password
   * @param baseUri    URI to create connection on
   */
  public EtcdClient(SslContext sslContext, String username, String password, URI... baseUri) {
    this(new EtcdSecurityContext(sslContext, username, password), baseUri);
  }

  /**
   * Constructor with SSL context.
   *
   * @param sslContext context for Ssl connections
   * @param baseUri    URI to create connection on
   */
  public EtcdClient(SslContext sslContext, URI... baseUri) {
    this(EtcdSecurityContext.withSslContext(sslContext), baseUri);
  }

  /**
   * Constructor that creates an {@link EtcdNettyClient} for the given URIs.
   * When no URI is supplied (empty or {@code null} array) the client falls back
   * to {@code https://127.0.0.1:4001}.
   *
   * @param securityContext context for security
   * @param baseUri         URI to create connection on
   */
  public EtcdClient(EtcdSecurityContext securityContext, URI... baseUri) {
    this(new EtcdNettyClient(securityContext, endpointsOrDefault(baseUri)));
  }

  /**
   * Create a client with a custom implementation. The retry policy starts out as
   * {@link RetryWithExponentialBackOff#DEFAULT}.
   *
   * @param etcdClientImpl to create client with.
   */
  public EtcdClient(EtcdClientImpl etcdClientImpl) {
    this.client = etcdClientImpl;
    this.retryHandler = RetryWithExponentialBackOff.DEFAULT;
  }

  /**
   * Returns the given endpoints, or the single default endpoint when none were given.
   * A {@code null} varargs array is treated like an empty one instead of failing
   * with a NullPointerException.
   */
  private static URI[] endpointsOrDefault(URI[] baseUri) {
    if (baseUri == null || baseUri.length == 0) {
      return new URI[] { URI.create("https://127.0.0.1:4001") };
    }
    return baseUri;
  }

  /**
   * Get the version of the Etcd server
   *
   * @return version as String, or {@code null} when the request fails
   * @deprecated use version() when using etcd 2.1+.
   */
  @Deprecated
  public String getVersion() {
    try {
      return new EtcdOldVersionRequest(this.client, retryHandler).send().get();
    } catch (IOException | EtcdException | EtcdAuthenticationException | TimeoutException e) {
      // Best-effort query: any failure is reported to the caller as null.
      return null;
    }
  }

  /**
   * Get the version of the Etcd server
   *
   * @return version, or {@code null} when the request fails
   */
  public EtcdVersionResponse version() {
    try {
      return new EtcdVersionRequest(this.client, retryHandler).send().get();
    } catch (IOException | EtcdException | EtcdAuthenticationException | TimeoutException e) {
      // Best-effort query: any failure is reported to the caller as null.
      return null;
    }
  }

  /**
   * Get the Self Statistics of Etcd
   *
   * @return EtcdSelfStatsResponse, or {@code null} when the request fails
   */
  public EtcdSelfStatsResponse getSelfStats() {
    try {
      return new EtcdSelfStatsRequest(this.client, retryHandler).send().get();
    } catch (IOException | EtcdException | EtcdAuthenticationException | TimeoutException e) {
      // Best-effort query: any failure is reported to the caller as null.
      return null;
    }
  }

  /**
   * Get the Leader Statistics of Etcd
   *
   * @return EtcdLeaderStatsResponse, or {@code null} when the request fails
   */
  public EtcdLeaderStatsResponse getLeaderStats() {
    try {
      return new EtcdLeaderStatsRequest(this.client, retryHandler).send().get();
    } catch (IOException | EtcdException | EtcdAuthenticationException | TimeoutException e) {
      // Best-effort query: any failure is reported to the caller as null.
      return null;
    }
  }

  /**
   * Get the Store Statistics of Etcd
   *
   * @return EtcdStoreStatsResponse, or {@code null} when the request fails
   */
  public EtcdStoreStatsResponse getStoreStats() {
    try {
      return new EtcdStoreStatsRequest(this.client, retryHandler).send().get();
    } catch (IOException | EtcdException | EtcdAuthenticationException | TimeoutException e) {
      // Best-effort query: any failure is reported to the caller as null.
      return null;
    }
  }

  /**
   * Get the Members of Etcd
   *
   * @return EtcdMembersResponse, or {@code null} when the request fails
   */
  public EtcdMembersResponse getMembers() {
    try {
      return new EtcdMembersRequest(this.client, retryHandler).send().get();
    } catch (IOException | EtcdException | EtcdAuthenticationException | TimeoutException e) {
      // Best-effort query: any failure is reported to the caller as null.
      return null;
    }
  }

  /**
   * Get the Health of Etcd
   *
   * @return EtcdHealthResponse, or {@code null} when the request fails
   */
  public EtcdHealthResponse getHealth() {
    try {
      return new EtcdHealthRequest(this.client, retryHandler).send().get();
    } catch (IOException | EtcdException | EtcdAuthenticationException | TimeoutException e) {
      // Best-effort query: any failure is reported to the caller as null.
      return null;
    }
  }

  /**
   * Put a key with a value
   *
   * @param key   to put
   * @param value to put on key
   * @return the put request, not yet sent
   */
  public EtcdKeyPutRequest put(String key, String value) {
    return new EtcdKeyPutRequest(client, key, retryHandler).value(value);
  }

  /**
   * Refresh a key with new ttl
   * (without notifying watchers when using etcd 2.3+)
   *
   * @param key to refresh
   * @param ttl to update key with
   * @return the put request, not yet sent
   */
  public EtcdKeyPutRequest refresh(String key, Integer ttl) {
    return new EtcdKeyPutRequest(client, key, retryHandler).refresh(ttl);
  }

  /**
   * Create a dir
   *
   * @param dir to create
   * @return the put request, not yet sent
   */
  public EtcdKeyPutRequest putDir(String dir) {
    return new EtcdKeyPutRequest(client, dir, retryHandler).isDir();
  }

  /**
   * Post a value to a key for in-order keys.
   *
   * @param key   to post to
   * @param value to post
   * @return the post request, not yet sent
   */
  public EtcdKeyPostRequest post(String key, String value) {
    return new EtcdKeyPostRequest(client, key, retryHandler).value(value);
  }

  /**
   * Deletes a key
   *
   * @param key to delete
   * @return the delete request, not yet sent
   */
  public EtcdKeyDeleteRequest delete(String key) {
    return new EtcdKeyDeleteRequest(client, key, retryHandler);
  }

  /**
   * Deletes a directory
   *
   * @param dir to delete
   * @return the delete request, not yet sent
   */
  public EtcdKeyDeleteRequest deleteDir(String dir) {
    return new EtcdKeyDeleteRequest(client, dir, retryHandler).dir();
  }

  /**
   * Get by key
   *
   * @param key to get
   * @return the get request, not yet sent
   */
  public EtcdKeyGetRequest get(String key) {
    return new EtcdKeyGetRequest(client, key, retryHandler);
  }

  /**
   * Get directory
   *
   * @param dir to get
   * @return the get request, not yet sent
   */
  public EtcdKeyGetRequest getDir(String dir) {
    return new EtcdKeyGetRequest(client, dir, retryHandler).dir();
  }

  /**
   * Get all keys
   *
   * @return the get request, not yet sent
   */
  public EtcdKeyGetRequest getAll() {
    return new EtcdKeyGetRequest(client, retryHandler);
  }

  /**
   * Closes the underlying client implementation, if there is one.
   *
   * @throws IOException if closing the implementation fails
   */
  @Override
  public void close() throws IOException {
    if (client != null) {
      client.close();
    }
  }

  /**
   * Set the retry handler used for requests created after this call.
   * Until it is set, {@link RetryWithExponentialBackOff#DEFAULT} is used.
   *
   * @param retryHandler to set
   * @return this instance
   */
  public EtcdClient setRetryHandler(RetryPolicy retryHandler) {
    this.retryHandler = retryHandler;
    return this;
  }
}
這個類提供了連接etcd的方法(也就是各個構造器),提供了對etcd的操作方法(get、put、post、putDir、delete、deleteDir等),還支持設置重試策略(參見RetryPolicy)。我們跟蹤一下EtcdClient(URI... baseUri):它會進入EtcdClient(EtcdSecurityContext securityContext, URI... baseUri),並在其中實例化EtcdNettyClient:
/*
 * Copyright (c) 2015, Jurriaan Mous and contributors as indicated by the @author tags.
 */
package mousio.etcd4j.transport;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.codec.base64.Base64;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.multipart.HttpPostRequestEncoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.resolver.dns.DnsAddressResolverGroup;
import io.netty.resolver.dns.DnsServerAddresses;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import mousio.client.ConnectionState;
import mousio.client.retry.RetryHandler;
import mousio.etcd4j.EtcdSecurityContext;
import mousio.etcd4j.promises.EtcdResponsePromise;
import mousio.etcd4j.requests.EtcdRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
import java.util.concurrent.CancellationException;

/**
 * @author Jurriaan Mous
 * @author Luca Burgazzoli
 *
 * Netty client for the requests and responses.
 *
 * <p>Each request opens its own connection through a shared {@link Bootstrap};
 * the channel is closed again once the request's promise completes.
 */
public class EtcdNettyClient implements EtcdClientImpl {
  private static final Logger logger = LoggerFactory.getLogger(EtcdNettyClient.class);

  // default etcd port, used whenever a URI carries no explicit port
  private static final int DEFAULT_PORT = 2379;
  // environment variable consulted when the client was created without any URI
  private static final String ENV_ETCD4J_ENDPOINT = "ETCD4J_ENDPOINT";

  private final EventLoopGroup eventLoopGroup;
  private final URI[] uris;

  private final Bootstrap bootstrap;
  private final EtcdNettyConfig config;
  private final EtcdSecurityContext securityContext;

  // index into uris of the endpoint that last accepted a connection
  protected volatile int lastWorkingUriIndex;

  /**
   * Constructor
   *
   * @param sslContext SSL context if connecting with SSL. Null if not connecting with SSL.
   * @param uri        to connect to
   */
  public EtcdNettyClient(final SslContext sslContext, final URI... uri) {
    this(new EtcdNettyConfig(), sslContext, uri);
  }

  /**
   * Constructor
   *
   * @param securityContext security context.
   * @param uri             to connect to
   */
  public EtcdNettyClient(final EtcdSecurityContext securityContext, final URI... uri) {
    this(new EtcdNettyConfig(), securityContext, uri);
  }

  /**
   * Constructor with custom eventloop group and timeout
   *
   * @param config     for netty
   * @param sslContext SSL context if connecting with SSL. Null if not connecting with SSL.
   * @param uris       to connect to
   */
  public EtcdNettyClient(final EtcdNettyConfig config, final SslContext sslContext, final URI... uris) {
    this(config, new EtcdSecurityContext(sslContext), uris);
  }

  /**
   * Constructor with custom eventloop group and timeout
   *
   * @param config for netty
   * @param uris   to connect to
   */
  public EtcdNettyClient(final EtcdNettyConfig config, final URI... uris) {
    this(config, EtcdSecurityContext.NONE, uris);
  }

  /**
   * Constructor with custom eventloop group and timeout.
   * Builds the shared bootstrap: pooled allocator, TCP_NODELAY, the configured
   * connect timeout, a DNS resolver group and the HTTP channel pipeline.
   *
   * @param config          for netty
   * @param securityContext security context (ssl, authentication)
   * @param uris            to connect to
   */
  public EtcdNettyClient(final EtcdNettyConfig config, final EtcdSecurityContext securityContext, final URI... uris) {
    logger.info("Setting up Etcd4j Netty client");

    this.lastWorkingUriIndex = 0;
    this.config = config.clone();
    this.securityContext = securityContext.clone();
    this.uris = uris;
    this.eventLoopGroup = config.getEventLoopGroup();
    this.bootstrap = new Bootstrap()
      .group(eventLoopGroup)
      .channel(config.getSocketChannelClass())
      .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
      .option(ChannelOption.TCP_NODELAY, true)
      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout())
      .resolver(new DnsAddressResolverGroup(
        NioDatagramChannel.class,
        DnsServerAddresses.defaultAddresses()))
      .handler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
          // Note: inside this initializer "securityContext" and "config" are the
          // constructor arguments, not the cloned fields assigned above.
          ChannelPipeline p = ch.pipeline();
          if (securityContext.hasNettySsl()) {
            p.addLast(securityContext.nettySslContext().newHandler(ch.alloc()));
          } else if (securityContext.hasSsl()) {
            p.addLast(new SslHandler(securityContext.sslContext().createSSLEngine()));
          }
          p.addLast("codec", new HttpClientCodec());
          p.addLast("auth", new HttpBasicAuthHandler());
          p.addLast("chunkedWriter", new ChunkedWriteHandler());
          p.addLast("aggregate", new HttpObjectAggregator(config.getMaxFrameSize()));
        }
      });
  }

  /**
   * For tests
   *
   * @return the current bootstrap
   */
  protected Bootstrap getBootstrap() {
    return bootstrap;
  }

  /**
   * Send a request and get a future.
   * If the request has no promise yet, one is created whose retry handler calls
   * {@link #connect(EtcdRequest, ConnectionState)} again.
   *
   * @param etcdRequest Etcd Request to send
   * @param <R>         Type of response
   * @return Promise for the request.
   * @throws IOException if request could not be sent.
   */
  public <R> EtcdResponsePromise<R> send(final EtcdRequest<R> etcdRequest) throws IOException {
    ConnectionState connectionState = new ConnectionState(uris, lastWorkingUriIndex);

    if (etcdRequest.getPromise() == null) {
      etcdRequest.setPromise(new EtcdResponsePromise<R>(
        etcdRequest.getRetryPolicy(),
        connectionState,
        new RetryHandler() {
          @Override
          public void doRetry(ConnectionState connectionState) throws IOException {
            connect(etcdRequest, connectionState);
          }
        }));
    }

    connect(etcdRequest, connectionState);

    return etcdRequest.getPromise();
  }

  /**
   * Connect to server, reusing the connection state stored in the request's promise.
   *
   * @param etcdRequest to request with
   * @param <R>         Type of response
   * @throws IOException if request could not be sent.
   */
  @SuppressWarnings("unchecked")
  protected <R> void connect(final EtcdRequest<R> etcdRequest) throws IOException {
    this.connect(etcdRequest, etcdRequest.getPromise().getConnectionState());
  }

  /**
   * Connect to server and, once connected, send the HTTP request.
   *
   * <p>The target is chosen in this order: the request URL itself when it carries
   * host and port (redirects), the {@code ETCD4J_ENDPOINT} environment variable
   * when no URI was configured, otherwise the URI at the connection state's index.
   * If the event loop group is shutting down, the request's Netty promise is
   * cancelled and nothing is sent.
   *
   * @param etcdRequest     to request with
   * @param connectionState for retries
   * @param <R>             Type of response
   * @throws IOException if request could not be sent.
   */
  @SuppressWarnings("unchecked")
  protected <R> void connect(final EtcdRequest<R> etcdRequest, final ConnectionState connectionState) throws IOException {
    if (eventLoopGroup.isShuttingDown() || eventLoopGroup.isShutdown() || eventLoopGroup.isTerminated()) {
      etcdRequest.getPromise().getNettyPromise().cancel(true);
      logger.debug("Retry canceled because of closed etcd client");
      return;
    }

    final URI uri;

    // when we are called from a redirect, the url in the request may also
    // contain host and port!
    URI requestUri = URI.create(etcdRequest.getUrl());
    if (requestUri.getHost() != null && requestUri.getPort() > -1) {
      uri = requestUri;
    } else if (connectionState.uris.length == 0 && System.getenv(ENV_ETCD4J_ENDPOINT) != null) {
      // read uri from environment variable
      String endpoint_uri = System.getenv(ENV_ETCD4J_ENDPOINT);
      if (logger.isDebugEnabled()) {
        logger.debug("Will use environment variable {} as uri with value {}", ENV_ETCD4J_ENDPOINT, endpoint_uri);
      }
      uri = URI.create(endpoint_uri);
    } else {
      uri = connectionState.uris[connectionState.uriIndex];
    }

    // Start the connection attempt.
    final ChannelFuture connectFuture = bootstrap.connect(connectAddress(uri));
    etcdRequest.getPromise().getConnectionState().loop = connectFuture.channel().eventLoop();
    etcdRequest.getPromise().attachNettyPromise(connectFuture.channel().eventLoop().<R>newPromise());

    connectFuture.addListener(new GenericFutureListener<ChannelFuture>() {
      @Override
      public void operationComplete(final ChannelFuture f) throws Exception {
        if (!f.isSuccess()) {
          final Throwable cause = f.cause();
          if (logger.isDebugEnabled()) {
            // Log the URI actually dialled. Indexing connectionState.uris here would
            // throw when the endpoint came from the environment variable (empty uris).
            logger.debug("Connection failed to {}, cause {}", uri, cause);
          }

          if (cause instanceof ClosedChannelException || cause instanceof IllegalStateException) {
            etcdRequest.getPromise().cancel(new CancellationException("Channel closed"));
          } else {
            etcdRequest.getPromise().handleRetry(f.cause());
          }

          return;
        }

        // Handle already cancelled promises
        if (etcdRequest.getPromise().getNettyPromise().isCancelled()) {
          f.channel().close();
          etcdRequest.getPromise().getNettyPromise().setFailure(new CancellationException());
          return;
        }

        final Promise listenedToPromise = etcdRequest.getPromise().getNettyPromise();

        // Close channel when promise is satisfied or cancelled later
        listenedToPromise.addListener(new GenericFutureListener<Future<?>>() {
          @Override
          public void operationComplete(Future<?> future) throws Exception {
            // Only close if it was not redirected to new promise
            if (etcdRequest.getPromise().getNettyPromise() == listenedToPromise) {
              f.channel().close();
            }
          }
        });

        if (logger.isDebugEnabled()) {
          logger.debug("Connected to {} ({})", f.channel().remoteAddress().toString(), connectionState.uriIndex);
        }

        lastWorkingUriIndex = connectionState.uriIndex;

        modifyPipeLine(etcdRequest, f.channel().pipeline());

        createAndSendHttpRequest(uri, etcdRequest.getUrl(), etcdRequest, f.channel())
          .addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
              if (!future.isSuccess()) {
                etcdRequest.getPromise().setException(future.cause());
                // NOTE(review): this shuts down the channel's event loop after a failed
                // write when called from outside that loop - confirm this is intended.
                if (!f.channel().eventLoop().inEventLoop()) {
                  f.channel().eventLoop().shutdownGracefully();
                }

                f.channel().close();
              }
            }
          });

        f.channel().closeFuture().addListener(new ChannelFutureListener() {
          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
            if (logger.isDebugEnabled()) {
              logger.debug("Connection closed for request {} on uri {} ",
                etcdRequest.getMethod().name(),
                etcdRequest.getUri());
            }
          }
        });
      }
    });
  }

  /**
   * Modify the pipeline for the request: optionally prepend a read timeout, then
   * append the response handler and a trailing handler that turns any uncaught
   * pipeline exception into a retry.
   *
   * @param req      to process
   * @param pipeline to modify
   * @param <R>      Type of Response
   */
  private <R> void modifyPipeLine(final EtcdRequest<R> req, final ChannelPipeline pipeline) {
    final EtcdResponseHandler<R> handler = new EtcdResponseHandler<>(this, req);

    if (req.hasTimeout()) {
      pipeline.addFirst(new ReadTimeoutHandler(req.getTimeout(), req.getTimeoutUnit()));
    }

    pipeline.addLast(handler);
    pipeline.addLast(new ChannelHandlerAdapter() {
      @Override
      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        handler.retried(true);
        req.getPromise().handleRetry(cause);
      }
    });
  }

  /**
   * Build the HttpRequest belonging to etcdRequest and write it to the channel.
   * Request parameters go into a form body for POST and PUT and into the query
   * string for every other method.
   *
   * @param server      server for http request
   * @param uri         to send request to
   * @param etcdRequest to send
   * @param channel     to send request on
   * @param <R>         Response type
   * @return future of the last write issued for the request
   * @throws Exception when creating or sending HTTP request fails
   */
  private <R> ChannelFuture createAndSendHttpRequest(URI server, String uri, EtcdRequest<R> etcdRequest, Channel channel) throws Exception {
    HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, etcdRequest.getMethod(), uri);
    httpRequest.headers().add(HttpHeaderNames.CONNECTION, "keep-alive");
    if (!this.config.hasHostName()) {
      // Use the same port fallback as connectAddress(); a URI without an explicit
      // port would otherwise produce the invalid header value "host:-1".
      httpRequest.headers().add(HttpHeaderNames.HOST, server.getHost() + ":" + effectivePort(server));
    } else {
      httpRequest.headers().add(HttpHeaderNames.HOST, this.config.getHostName());
    }

    HttpPostRequestEncoder bodyRequestEncoder = null;
    Map<String, String> keyValuePairs = etcdRequest.getRequestParams();
    if (keyValuePairs != null && !keyValuePairs.isEmpty()) {
      HttpMethod etcdRequestMethod = etcdRequest.getMethod();
      if (etcdRequestMethod == HttpMethod.POST || etcdRequestMethod == HttpMethod.PUT) {
        bodyRequestEncoder = new HttpPostRequestEncoder(httpRequest, false);
        for (Map.Entry<String, String> entry : keyValuePairs.entrySet()) {
          bodyRequestEncoder.addBodyAttribute(entry.getKey(), entry.getValue());
        }

        httpRequest = bodyRequestEncoder.finalizeRequest();
      } else {
        QueryStringEncoder encoder = new QueryStringEncoder(uri);
        for (Map.Entry<String, String> entry : keyValuePairs.entrySet()) {
          encoder.addParam(entry.getKey(), entry.getValue());
        }

        httpRequest.setUri(encoder.toString());
      }
    }

    etcdRequest.setHttpRequest(httpRequest);
    ChannelFuture future = channel.write(httpRequest);
    if (bodyRequestEncoder != null && bodyRequestEncoder.isChunked()) {
      future = channel.write(bodyRequestEncoder);
    }
    channel.flush();
    return future;
  }

  /**
   * Close netty. The event loop group is only shut down when the configuration
   * reports it as managed.
   */
  @Override
  public void close() {
    logger.info("Shutting down Etcd4j Netty client");

    if (config.isManagedEventLoopGroup()) {
      logger.debug("Shutting down Netty Loop");
      eventLoopGroup.shutdownGracefully();
    }
  }

  /**
   * Port to use for the given URI: its explicit port, or DEFAULT_PORT when none is set.
   */
  private static int effectivePort(URI uri) {
    return uri.getPort() == -1 ? DEFAULT_PORT : uri.getPort();
  }

  /**
   * Unresolved socket address for the given URI; resolution is left to the
   * bootstrap's resolver.
   */
  private InetSocketAddress connectAddress(URI uri) {
    return InetSocketAddress.createUnresolved(uri.getHost(), effectivePort(uri));
  }

  /**
   * Outbound handler that adds an HTTP Basic {@code Authorization} header to every
   * outgoing {@link HttpRequest} when the security context has credentials.
   */
  private class HttpBasicAuthHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
      if (securityContext.hasCredentials() && msg instanceof HttpRequest) {
        addBasicAuthHeader((HttpRequest) msg);
      }

      ctx.write(msg, promise);
    }

    private void addBasicAuthHeader(HttpRequest request) {
      // breakLines=false: the default STANDARD dialect inserts line breaks into long
      // output, which would corrupt the header value for long credentials.
      final String auth = Base64.encode(
        Unpooled.copiedBuffer(
          securityContext.username() + ":" + securityContext.password(),
          CharsetUtil.UTF_8),
        false
      ).toString(CharsetUtil.UTF_8);

      request.headers().add(HttpHeaderNames.AUTHORIZATION, "Basic " + auth);
    }
  }
}
進入EtcdNettyClient(final EtcdSecurityContext securityContext, final URI... uri),再進入public EtcdNettyClient(final EtcdNettyConfig config, final EtcdSecurityContext securityContext, final URI... uris),我們發現etcd4j集成了Netty框架。