zookeeper的安裝與集羣搭建參考:http://www.javashuo.com/article/p-cbooxgmm-dt.htmlhtml
描述:本章主要講java代碼如何實現zookeeper節點的增刪改查,用法與解釋所有在註釋裏。java
本教程的工程,使用maven、jdk八、springboot、zookeeper 3.4.12node
重點:你們學會增刪改查後,不妨動腦想下,zookeeper如何實現分佈式鎖,小小的提示下,競爭建立臨時節點,建立成功者,則得到鎖。web
注:請注意log4j2的配置,由於是java測試,並無經過web、servlet啓動程序,因此請把log4j2放在資源目錄的根目錄,我是新建了一個資源目錄包log4j,將log4j2.xml放在該目錄下。spring
一、pom.xmlsql
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.1.RELEASE</version>
</parent>
<groupId>com.qy.code</groupId>
<artifactId>qy-zk</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>pom</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
<maven.test.skip>true</maven.test.skip>
<java.version>1.8</java.version>
<spring.boot.version>2.0.1.RELEASE</spring.boot.version>
<qy.code.version>0.0.1-SNAPSHOT</qy.code.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<!-- 不使用springboot默認log -->
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.12</version>
<!-- 排除衝突jar -->
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<repositories>
<repository>
<id>nexus-aliyun</id>
<name>Nexus aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>nexus-aliyun</id>
<name>Nexus aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
<build>
<plugins>
<!-- 要將源碼放上去,須要加入這個插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<configuration>
<attach>true</attach>
</configuration>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
2.log4j2.xmlapache
注:請注意log4j2的配置,由於是java測試,並無經過web、servlet啓動程序,因此請把log4j2放在資源目錄的根目錄,我是新建了一個資源目錄包log4j,將log4j2.xml放在該目錄下。安全
<?xml version="1.0" encoding="UTF-8"?>
<!-- log4j2 自己日誌打印級別,以及從新刷新配置文件的時間-->
<Configuration status="WARN" monitorInterval="5">
<Properties>
<Property name="log-path">logs</Property>
<Property name="log-file-temp">temp.log</Property>
<Property name="log-file-info">info.log</Property>
<Property name="log-file-warn">warn.log</Property>
<Property name="log-file-error">error.log</Property>
<!-- 輸出格式 -->
<!-- <Property name="pattern">%p [%date{yyyy-MM-dd HH:mm:ss,SSS}] [%thread] %l %m %n </Property> -->
<Property name="pattern">%m %n </Property>
<!-- 日誌切割的最小單位 -->
<property name="every_file_size">1M</property>
</Properties>
<Appenders>
<!-- 重置太打印 打印debug及以上級別 -->
<Console name="Console-Appender" target="SYSTEM_OUT">
<ThresholdFilter level="DEBUG" onMatch="ACCEPT" onMismatch="DENY" />
<PatternLayout pattern="${pattern}"/>
</Console>
<!--文件會打印出全部信息,這個log每次運行程序會自動清空,由append屬性決定,這個也挺有用的,適合臨時測試用-->
<File name="RollingFile-Appender-Temp" fileName="${log-path}/${log-file-temp}" append="false">
<PatternLayout pattern="${pattern}"/>
</File>
<!-- 這個會打印出全部的信息,每次大小超過size,則這size大小的日誌會自動存入按年份-月份創建的文件夾下面並進行壓縮,做爲存檔-->
<RollingFile name="RollingFile-Appender-INFO" fileName="${log-path}/${log-file-info}" append="true" filePattern="${log-path}/$${date:yyyy-MM}/info-%d{yyyy-MM-dd}-%i.log">
<!-- 只輸出INFO級別 -->
<Filters>
<ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY" />
<ThresholdFilter level="WARN" onMatch="DENY" onMismatch="NEUTRAL" />
<ThresholdFilter level="ERROR" onMatch="DENY" onMismatch="NEUTRAL" />
</Filters>
<PatternLayout pattern="${pattern}" />
<SizeBasedTriggeringPolicy size="${every_file_size}"/>
<!-- DefaultRolloverStrategy屬性如不設置,則默認爲最多同一文件夾下7個文件,這裏設置了10 -->
<DefaultRolloverStrategy max="10"/>
</RollingFile>
<RollingFile name="RollingFile-Appender-WARN" fileName="${log-path}/${log-file-warn}" append="true" filePattern="${log-path}/$${date:yyyy-MM}/warn-%d{yyyy-MM-dd}-%i.log">
<!-- 只輸出Warn級別 -->
<Filters>
<ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" />
<ThresholdFilter level="ERROR" onMatch="DENY" onMismatch="NEUTRAL" />
</Filters>
<PatternLayout pattern="${pattern}" />
<SizeBasedTriggeringPolicy size="${every_file_size}"/>
<!-- DefaultRolloverStrategy屬性如不設置,則默認爲最多同一文件夾下7個文件,這裏設置了10 -->
<DefaultRolloverStrategy max="10"/>
</RollingFile>
<RollingFile name="RollingFile-Appender-ERROR" fileName="${log-path}/${log-file-error}" append="true" filePattern="${log-path}/$${date:yyyy-MM}/error-%d{yyyy-MM-dd}-%i.log">
<!-- 只輸出ERROR級別 -->
<Filters>
<ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY" />
</Filters>
<PatternLayout pattern="${pattern}" />
<SizeBasedTriggeringPolicy size="${every_file_size}"/>
<!-- DefaultRolloverStrategy屬性如不設置,則默認爲最多同一文件夾下7個文件,這裏設置了10 -->
<DefaultRolloverStrategy max="10"/>
</RollingFile>
</Appenders>
<Loggers>
<logger name="java.sql" level="debug" additivity="false">
<appender-ref ref="Console-Appender"/>
</logger>
<logger name="com.ibatis" level="debug" additivity="false">
<appender-ref ref="Console-Appender"/>
</logger>
<!-- 第三方的軟件日誌級別 -->
<logger name="org.springframework" level="info" additivity="true">
<AppenderRef ref="RollingFile-Appender-WARN" />
<AppenderRef ref="RollingFile-Appender-ERROR" />
</logger>
<!-- 第三方的軟件日誌級別 -->
<logger name="org.apache" level="warn" additivity="true">
<AppenderRef ref="RollingFile-Appender-WARN" />
<AppenderRef ref="RollingFile-Appender-ERROR" />
</logger>
<!-- 異步輸出 -->
<Root level="INFO">
<AppenderRef ref="Console-Appender"/>
<AppenderRef ref="RollingFile-Appender-Temp"/>
<AppenderRef ref="RollingFile-Appender-INFO" />
<AppenderRef ref="RollingFile-Appender-WARN" />
<AppenderRef ref="RollingFile-Appender-ERROR" />
</Root>
</Loggers>
</Configuration>
3.增刪改查.javaspringboot
package com.qy.zk; import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; /** * 描述:zookeeper增刪改查 * 做者:七脈 * 重點:刪除、修改、查詢所用的version版本號,分佈式事務鎖的實現方式,樂觀鎖。ACL權限:全部人、限定人、限定IP等 */
public class MyZkConnect{ private static Logger log=LogManager.getLogger(MyZkConnect.class); //集羣節點
public static final String zkServerClusterConnect = "192.168.159.129:2181,192.168.159.129:2182,192.168.159.129:2183"; //單一節點
public static final String zkServerSingleConnect = "192.168.159.129:2181"; //超時毫秒數
public static final int timeout = 3000; public static void main(String[] args) throws InterruptedException, IOException, KeeperException { //創建鏈接
ZooKeeper zk = connect(); //zk.close();//關閉後不支持重連
log.info("zk 狀態:"+zk.getState()); /**恢復會話鏈接**/
//long sessionId = zk.getSessionId(); //byte[] sessionPasswd = zk.getSessionPasswd(); //zk2會話重連後,zk會話將失效,再也不支持作增刪改查操做。 //ZooKeeper zk2 = reconnect(sessionId, sessionPasswd);
/**建立節點**/
//create(zk, "/myzk", "myzk");
/**查詢節點Data**/ queryData(zk, "/myzk"); /**修改節點data**/
//update(zk, "/myzk", "myzk-update");
/**刪除節點**/
//delete(zk, "/myzk");
} /** * 描述:創建鏈接 * 做者:七脈 * @return * @throws IOException * @throws InterruptedException */
public static ZooKeeper connect() throws IOException, InterruptedException{ CountDownLatch cdl = new CountDownLatch(1); log.info("準備創建zk服務"); ZooKeeper zk = new ZooKeeper(zkServerClusterConnect, timeout, new MyZkWatcher(cdl,"創建鏈接")); log.info("完成創建zk服務"); cdl.await();//這裏爲了等待wather監聽事件結束
return zk; } /** * 描述:從新鏈接服務 * 做者:七脈 * @param sessionId 現有會話ID * @param sessionPasswd 現有會話密碼 * @return * @throws IOException * @throws InterruptedException * 重點:關閉後的會話鏈接,不支持重連。重連後,前會話鏈接將會失效。 */
public static ZooKeeper reconnect(long sessionId, byte[] sessionPasswd) throws IOException, InterruptedException{ CountDownLatch cdl = new CountDownLatch(1); log.info("準備從新鏈接zk服務"); ZooKeeper zk = new ZooKeeper(zkServerClusterConnect, timeout, new MyZkWatcher(cdl,"從新鏈接"), sessionId, sessionPasswd); log.info("完成從新鏈接zk服務"); cdl.await();//這裏爲了等待wather監聽事件結束
return zk; } /** * 描述:建立節點 * 做者:七脈 * @param zk * @param nodePath * @param nodeData * @throws KeeperException * @throws InterruptedException */
public static void create(ZooKeeper zk,String nodePath,String nodeData) throws KeeperException, InterruptedException{ log.info("開始建立節點:{}, 數據:{}",nodePath,nodeData); List<ACL> acl = Ids.OPEN_ACL_UNSAFE; CreateMode createMode = CreateMode.PERSISTENT; String result = zk.create(nodePath, nodeData.getBytes(), acl, createMode); //建立節點有兩種,上面是第一種,還有一種可使用回調函數及參數傳遞,與上面方法名稱相同。
log.info("建立節點返回結果:{}",result); log.info("完成建立節點:{}, 數據:{}",nodePath,nodeData); } /** * 描述:查詢節點結構信息 * 做者:七脈 * @param zk * @param nodePath * @return * @throws KeeperException * @throws InterruptedException */
public static Stat queryStat(ZooKeeper zk,String nodePath) throws KeeperException, InterruptedException{ log.info("準備查詢節點Stat,path:{}", nodePath); Stat stat = zk.exists(nodePath, false); log.info("結束查詢節點Stat,path:{},version:{}", nodePath, stat.getVersion()); return stat; } /** * 描述:查詢節點Data值信息 * 做者:七脈 * @param zk * @param nodePath * @return * @throws KeeperException * @throws InterruptedException */
public static String queryData(ZooKeeper zk,String nodePath) throws KeeperException, InterruptedException{ log.info("準備查詢節點Data,path:{}", nodePath); String data = new String(zk.getData(nodePath, false, queryStat(zk, nodePath))); log.info("結束查詢節點Data,path:{},Data:{}", nodePath, data); return data; } /** * 描述:修改節點 * 做者:七脈 * @param zk * @param nodePath * @param nodeData * @throws KeeperException * @throws InterruptedException * 重點:每次修改節點的version版本號都會變動,因此每次修改都須要傳遞節點原版本號,以確保數據的安全性。 */
public static Stat update(ZooKeeper zk,String nodePath,String nodeData) throws KeeperException, InterruptedException{ //修改節點前先查詢該節點信息
Stat stat = queryStat(zk, nodePath); log.info("準備修改節點,path:{},data:{},原version:{}", nodePath, nodeData, stat.getVersion()); Stat newStat = zk.setData(nodePath, nodeData.getBytes(), stat.getVersion()); //修改節點值有兩種方法,上面是第一種,還有一種可使用回調函數及參數傳遞,與上面方法名稱相同。 //zk.setData(path, data, version, cb, ctx);
log.info("完成修改節點,path:{},data:{},現version:{}", nodePath, nodeData, newStat.getVersion()); return stat; } /** * 描述:刪除節點 * 做者:七脈 * @param zk * @param nodePath * @throws InterruptedException * @throws KeeperException */
public static void delete(ZooKeeper zk,String nodePath) throws InterruptedException, KeeperException{ //刪除節點前先查詢該節點信息
Stat stat = queryStat(zk, nodePath); log.info("準備刪除節點,path:{},原version:{}", nodePath, stat.getVersion()); zk.delete(nodePath, stat.getVersion()); //修改節點值有兩種方法,上面是第一種,還有一種可使用回調函數及參數傳遞,與上面方法名稱相同。 //zk.delete(path, version, cb, ctx);
log.info("完成刪除節點,path:{}", nodePath); } }
4.watcher觀察者.javasession
package com.qy.zk; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 描述:ZK服務觀察者事件 * 做者:七脈 */
public class MyZkWatcher implements Watcher{ private static final Logger log = LoggerFactory.getLogger(MyZkWatcher.class); //異步鎖
private CountDownLatch cdl; //標記
private String mark; public MyZkWatcher(CountDownLatch cdl,String mark) { this.cdl = cdl; this.mark = mark; } //監聽事件處理方法
public void process(WatchedEvent event) { log.info(mark+" watcher監聽事件:{}",event); cdl.countDown(); } }
本章節用到了CountDownLatch,不會的能夠學下http://www.javashuo.com/article/p-vjuqugyq-eb.html