在 Java 中如何使用 etcd 的 v2 和 v3 API 獲取配置,並對配置的變化進行監控和監聽

etcd 和 ZooKeeper 很像,都可以用來做配置管理,而且 etcd 能夠在目前流行的 Kubernetes 中使用。

但是 etcd 提供了 v2 版本和 v3 版本兩種 API。我們現在分別來介紹一下這兩個版本 API 的使用。

1、Etcd V2 版本 API

一、Java 工程中使用 Maven 引入 etcd v2 的 Java API 操作 jar 包

<!-- netty-all, pinned to 4.1.21.Final -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.21.Final</version>
</dependency>
<!-- etcd4j: provides the mousio.etcd4j.EtcdClient used by the v2 examples -->
<dependency>
    <groupId>org.mousio</groupId>
    <artifactId>etcd4j</artifactId>
    <version>2.15.0</version>
</dependency>

二、etcd的連接
import mousio.etcd4j.EtcdClient;
import java.io.InputStream;
import java.net.URI;
import java.util.Properties;

/**
 * Holds the single etcd v2 client shared by the rest of the example code.
 */
public class EtcdUtil {
    /** Address of the etcd endpoint the shared client connects to. */
    private static final String ETCD_ENDPOINT = "http://127.0.0.1:2379";

    // Shared etcd client; stays null until the first call to getClient().
    private static EtcdClient client = null;

    /**
     * Returns the shared client, creating it on the first call.
     * The method is synchronized, so concurrent first calls create only one client.
     *
     * @return the shared {@link EtcdClient}
     */
    public static synchronized EtcdClient getClient() {
        if (null == client) {
            // The endpoint is passed directly. The earlier form looked the URL up as a
            // property *key* on an undeclared `properties` object, which cannot resolve
            // to an address and would hand null to URI.create.
            client = new EtcdClient(URI.create(ETCD_ENDPOINT));
        }
        return client;
    }
}

 


三、如何獲取etcd的配置而且對配置進行監聽
/**
 * Reads the configuration stored under the "ETCD" directory once, then starts
 * a background thread that listens for changes.
 * Any exception is reported on standard output and not rethrown.
 */
private void initEtcdConfig() {
    try {
        final EtcdClient etcdClient = EtcdUtil.getClient();
        // Fetch the "ETCD" directory and wait for the response.
        EtcdKeysResponse dataTree = etcdClient.getDir("ETCD").consistent().send().get();
        // Print the etcd version reported by the client.
        System.out.println("ETCD's version:" + etcdClient.getVersion());
        // Look up the configuration entry in the fetched directory.
        getConfig("/ETCD/example.config", dataTree);
        // Start the listener on its own thread.
        startListenerThread(etcdClient);
    } catch (Exception e) {
        System.out.println("EtcdClient init cause Exception:" + e.getMessage());
        e.printStackTrace();
    }
}
/**
 * Searches the direct children of a directory response for a node whose key
 * equals {@code configFile}.
 *
 * @param configFile full key of the configuration entry to look for
 * @param dataTree   directory response to search; may be null
 * @return the value of the first matching node, or null when there is no match
 *         (a message is printed in that case)
 */
private String getConfig(String configFile, EtcdKeysResponse dataTree) {
    if (dataTree != null) {
        // An empty child list simply skips the loop.
        for (EtcdKeysResponse.EtcdNode child : dataTree.getNode().getNodes()) {
            if (child.getKey().equals(configFile)) {
                return child.getValue();
            }
        }
    }
    System.out.println("Etcd configFile" + configFile + "is not exist,Please Check");
    return null;
}
/**
 * Runs {@link #startListener(EtcdClient)} on a newly created thread.
 *
 * @param etcdClient client handed to the listener
 */
public void startListenerThread(EtcdClient etcdClient) {
    Thread listenerThread = new Thread(() -> startListener(etcdClient));
    listenerThread.start();
}
/**
 * Registers a wait-for-change request on the {@code SYSTEM_NAME} directory and,
 * when it completes, reloads the configuration and registers again.
 *
 * <p>If registering fails, the attempt is repeated after a one-second pause.
 * The retry is a loop rather than an immediate recursive call: recursing from
 * the catch block with no delay would grow the stack without bound while the
 * failure persists.
 *
 * <p>NOTE(review): the watch is placed on {@code SYSTEM_NAME} while the reload
 * reads the literal "ETCD" directory; confirm that both refer to the same directory.
 *
 * @param etcdClient client used for both the watch and the reload
 */
public void startListener(EtcdClient etcdClient) {
    while (!Thread.currentThread().isInterrupted()) {
        try {
            ResponsePromise promise =
                    etcdClient.getDir(SYSTEM_NAME).recursive().waitForChange().consistent().send();
            promise.addListener(changed -> {
                System.out.println("found ETCD's config cause change");
                try {
                    // Reload the configuration entry after the change notification.
                    getConfig("/ETCD/example.config", etcdClient.getDir("ETCD").consistent().send().get());
                } catch (Exception e) {
                    e.printStackTrace();
                    System.out.println("listen etcd 's config change cause exception:" + e.getMessage());
                }
                // Register again so that later changes are also observed.
                startListener(etcdClient);
            });
            return;
        } catch (Exception e) {
            System.out.println("listen etcd 's config change cause exception:" + e.getMessage());
            e.printStackTrace();
            try {
                // Pause before the next attempt instead of retrying in a tight loop.
                Thread.sleep(1000L);
            } catch (InterruptedException interrupted) {
                // Keep the interrupt visible to the caller and stop retrying.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

 

四、使用dcmp管理 etcd的配置項
dcmp 是一個使用 Go 語言開發的 etcd 配置管理界面,不足的是,它只支持 v2 的 API。項目地址:

https://github.com/silenceper/dcmp

 

2、Etcd V3 版本 API

一、在工程中引入以下依賴

<!-- netty-all, pinned to 4.1.15.Final -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.15.Final</version>
</dependency>
<!-- jetcd-core: provides the com.coreos.jetcd client used by the v3 examples -->
<dependency>
    <groupId>com.coreos</groupId>
    <artifactId>jetcd-core</artifactId>
    <version>0.0.2</version>
</dependency>
二、v3 API 操作工具類
/**
 * Helper methods for reading, writing and deleting keys through the etcd v3 API.
 */
public class EtcdUtil {
    /** Address of the etcd endpoint the shared client connects to. */
    private static final String ETCD_ENDPOINT = "http://127.0.0.1:2379";

    // Shared etcd client; stays null until the first call to getEtclClient().
    private static Client client = null;

    /**
     * Returns the shared client, creating it on the first call.
     * The method is synchronized, so concurrent first calls build only one client.
     *
     * @return the shared {@link Client}
     */
    public static synchronized Client getEtclClient() {
        if (null == client) {
            // The endpoint is passed directly. The earlier form looked the URL up as a
            // property *key* on an undeclared `props` object, which cannot resolve to
            // an address.
            client = Client.builder().endpoints(ETCD_ENDPOINT).build();
        }
        return client;
    }

    /**
     * Reads the value stored under the given key, waiting for the response.
     *
     * @param key configuration key to read
     * @return the value of the first returned entry as UTF-8 text, or null when
     *         the response contains no entries
     * @throws Exception if waiting for the response fails
     */
    public static String getEtcdValueByKey(String key) throws Exception {
        List<KeyValue> kvs = EtcdUtil.getEtclClient().getKVClient().get(ByteSequence.fromString(key)).get().getKvs();
        if (kvs.isEmpty()) {
            return null;
        }
        return kvs.get(0).getValue().toStringUtf8();
    }

    /**
     * Creates or overwrites the given key.
     * The result of the put call is not awaited or inspected here.
     *
     * @param key   configuration key to write
     * @param value new value, encoded as UTF-8
     * @throws Exception if the value cannot be encoded
     */
    public static void putEtcdValueByKey(String key, String value) throws Exception {
        EtcdUtil.getEtclClient().getKVClient().put(ByteSequence.fromString(key), ByteSequence.fromBytes(value.getBytes("utf-8")));
    }

    /**
     * Deletes the given key.
     * The result of the delete call is not awaited or inspected here.
     *
     * @param key configuration key to delete
     */
    public static void deleteEtcdValueByKey(String key) {
        EtcdUtil.getEtclClient().getKVClient().delete(ByteSequence.fromString(key));
    }
}
/**
 * Loads the configuration stored under {@code ETCD_CONFIG_FILE_NAME} once and then
 * starts a thread that watches that same key for changes.
 *
 * <p>The watch is placed on {@code ETCD_CONFIG_FILE_NAME} rather than on an
 * unrelated literal key, so that the key being watched is the key that was loaded.
 * Exceptions are printed and not rethrown.
 */
public void init() {
    try {
        // Load the current configuration, waiting for the response.
        getConfig(EtcdUtil.getEtclClient().getKVClient().get(ByteSequence.fromString(ETCD_CONFIG_FILE_NAME)).get().getKvs());
        // Start the watcher thread.
        new Thread(() -> {
            Watch.Watcher watcher = EtcdUtil.getEtclClient().getWatchClient().watch(ByteSequence.fromString(ETCD_CONFIG_FILE_NAME));
            try {
                while (true) {
                    watcher.listen().getEvents().forEach(watchEvent -> {
                        KeyValue kv = watchEvent.getKeyValue();
                        // Type of the change event.
                        System.out.println(watchEvent.getEventType());
                        // Key that changed.
                        System.out.println(kv.getKey().toStringUtf8());
                        // Value after the change; this is where an application would apply it.
                        String afterChangeValue = kv.getValue().toStringUtf8();
                    });
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Restore the interrupt flag so the thread's owner can still observe it.
                Thread.currentThread().interrupt();
            }
        }).start();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
/**
 * Prints and returns the value of the first entry in the list.
 *
 * @param kvs entries returned by a get request
 * @return the first entry's value as UTF-8 text, or null when the list is empty
 */
private String getConfig(List<KeyValue> kvs) {
    if (kvs.size() <= 0) {
        return null;
    }
    String firstValue = kvs.get(0).getValue().toStringUtf8();
    System.out.println("etcd 's config 's configValue is :" + firstValue);
    return firstValue;
}

v3 版本的正式代碼(可以直接使用的工具類,並且加上了日誌):

 1 import com.coreos.jetcd.Client;
 2 import com.coreos.jetcd.Watch;
 3 import com.coreos.jetcd.data.ByteSequence;
 4 import com.coreos.jetcd.data.KeyValue;
 5 import org.slf4j.Logger;
 6 import org.slf4j.LoggerFactory;
 7 import java.util.List;
 8 /**
 9  *  etcd 連接和操做工具,包括啓動監聽 操做etcd v3 版本協議,此操做不支持v2 版本協議。
10  *  v2版本的協議能夠參考 https://www.cnblogs.com/laoqing/p/8967549.html
11  */
12 public class EtcdUtil {
13     public static Logger log = LoggerFactory.getLogger(EtcdUtil.class);
14     //etcl客戶端連接
15     private static Client client = null;
16 
17     //連接初始化 單例模式
18     public static synchronized Client getEtclClient(){
19         if(null == client){
20             client = Client.builder().endpoints("http://xxx.xxx.xxx.xxx:2379").build();
21         }
22         return client;
23     }
24     /**
25      * 根據指定的配置名稱獲取對應的value
26      * @param key 配置項
27      * @return
28      * @throws Exception
29      */
30     public static String getEtcdValueByKey(String key) throws Exception {
31         List<KeyValue> kvs = EtcdUtil.getEtclClient().getKVClient().get(ByteSequence.fromString(key)).get().getKvs();
32         if(kvs.size()>0){
33             String value = kvs.get(0).getValue().toStringUtf8();
34             return value;
35         }
36         else {
37             return null;
38         }
39     }
40     /**
41      * 新增或者修改指定的配置
42      * @param key
43      * @param value
44      * @return
45      */
46     public static void putEtcdValueByKey(String key,String value) throws Exception{
47         EtcdUtil.getEtclClient().getKVClient().put(ByteSequence.fromString(key),ByteSequence.fromBytes(value.getBytes("utf-8")));
48     }
49     /**
50      * 刪除指定的配置
51      * @param key
52      * @return
53      */
54     public static void deleteEtcdValueByKey(String key){
55         EtcdUtil.getEtclClient().getKVClient().delete(ByteSequence.fromString(key));
56     }
57     /**
58      * etcd的監聽,監聽指定的key,當key 發生變化後,監聽自動感知到變化。
59      * @param key 指定須要監聽的key
60      */
61     public static void initListen(String key){
62         try {
63             //加載配置
64             getConfig(EtcdUtil.getEtclClient().getKVClient().get(ByteSequence.fromString(key)).get().getKvs());
65             new Thread(() -> {
66                 Watch.Watcher watcher = EtcdUtil.getEtclClient().getWatchClient().watch(ByteSequence.fromString(key));
67                 try {
68                     while(true) {
69                         watcher.listen().getEvents().stream().forEach(watchEvent -> {
70                             KeyValue kv = watchEvent.getKeyValue();
71                             log.info("etcd event:{} ,change key is:{},afterChangeValue:{}",watchEvent.getEventType(),kv.getKey().toStringUtf8(),kv.getValue().toStringUtf8());
72                         });
73                     }
74                 } catch (InterruptedException e) {
75                     log.info("etcd listen start cause Exception:{}",e);
76                 }
77             }).start();
78         } catch (Exception e) {
79             log.info("etcd listen start cause Exception:{}",e);
80         }
81     }
82     private static String getConfig(List<KeyValue> kvs){
83         if(kvs.size()>0){
84             String config = kvs.get(0).getKey().toStringUtf8();
85             String value = kvs.get(0).getValue().toStringUtf8();
86             log.info("etcd 's config 's config key is :{},value is:{}",config,value);
87             return value;
88         }
89         else {
90             return null;
91         }
92     }
93 }

 

運行後的效果:

 

 

另外,v3 版本 API 的管理界面可以參考以下開源項目:

https://github.com/shiguanghuxian/etcd-manage 

 

 

 

【原文歸作者所有,歡迎轉載,但請保留版權,並且轉載時需要註明出處】

相關文章
相關標籤/搜索