java springboot整合zookeeper入門教程(增刪改查)

java springboot整合zookeeper增刪改查入門教程

zookeeper的安裝與集羣搭建參考:http://www.javashuo.com/article/p-cbooxgmm-dt.htmlhtml

描述:本章主要講java代碼如何實現zookeeper節點的增刪改查,用法與解釋所有在註釋裏。java

   本教程的工程,使用maven、jdk八、springboot、zookeeper 3.4.12node

   重點:你們學會增刪改查後,不妨動腦想下,zookeeper如何實現分佈式鎖,小小的提示下,競爭建立臨時節點,建立成功者,則得到鎖。web

   注:請注意log4j2的配置,由於是java測試,並無經過web、servlet啓動程序,因此請把log4j2放在資源目錄的根目錄,我是新建了一個資源目錄包log4j,將log4j2.xml放在該目錄下。spring

一、pom.xmlsql

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.1.RELEASE</version>
    </parent>
    
    <groupId>com.qy.code</groupId>
    <artifactId>qy-zk</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>pom</packaging>
    
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
        <maven.test.skip>true</maven.test.skip>
        <java.version>1.8</java.version>
        <spring.boot.version>2.0.1.RELEASE</spring.boot.version>
        <qy.code.version>0.0.1-SNAPSHOT</qy.code.version>
    </properties>
    
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <!-- 不使用springboot默認log -->
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>
        
        <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.12</version>
            <!-- 排除衝突jar -->
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        
    </dependencies>
    
    <repositories>
        <repository>
            <id>nexus-aliyun</id>
            <name>Nexus aliyun</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>nexus-aliyun</id>
            <name>Nexus aliyun</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>
    
    
    <build>
        <plugins>
            <!-- 要將源碼放上去,須要加入這個插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-source-plugin</artifactId>
                <configuration>
                    <attach>true</attach>
                </configuration>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2.log4j2.xmlapache

注:請注意log4j2的配置,由於是java測試,並無經過web、servlet啓動程序,因此請把log4j2放在資源目錄的根目錄,我是新建了一個資源目錄包log4j,將log4j2.xml放在該目錄下。安全

<?xml version="1.0" encoding="UTF-8"?>
<!-- log4j2 自己日誌打印級別,以及從新刷新配置文件的時間-->
<Configuration status="WARN" monitorInterval="5">
    <Properties>
        <Property name="log-path">logs</Property>
        <Property name="log-file-temp">temp.log</Property>
        <Property name="log-file-info">info.log</Property>
        <Property name="log-file-warn">warn.log</Property>
        <Property name="log-file-error">error.log</Property>
        <!-- 輸出格式 -->
        <!-- <Property name="pattern">%p [%date{yyyy-MM-dd HH:mm:ss,SSS}] [%thread] %l %m %n </Property> -->
        <Property name="pattern">%m %n </Property>
        <!-- 日誌切割的最小單位 -->
        <property name="every_file_size">1M</property>
    </Properties>
    <Appenders>
        <!-- 重置太打印 打印debug及以上級別 -->
        <Console name="Console-Appender" target="SYSTEM_OUT">
            <ThresholdFilter level="DEBUG" onMatch="ACCEPT" onMismatch="DENY" />
            <PatternLayout pattern="${pattern}"/>
        </Console>
        
           <!--文件會打印出全部信息,這個log每次運行程序會自動清空,由append屬性決定,這個也挺有用的,適合臨時測試用-->
        <File name="RollingFile-Appender-Temp" fileName="${log-path}/${log-file-temp}" append="false">
            <PatternLayout pattern="${pattern}"/>
        </File> 
        <!-- 這個會打印出全部的信息,每次大小超過size,則這size大小的日誌會自動存入按年份-月份創建的文件夾下面並進行壓縮,做爲存檔-->
        <RollingFile name="RollingFile-Appender-INFO" fileName="${log-path}/${log-file-info}" append="true" filePattern="${log-path}/$${date:yyyy-MM}/info-%d{yyyy-MM-dd}-%i.log">
            <!-- 只輸出INFO級別 -->
            <Filters>
                <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY" />
                <ThresholdFilter level="WARN" onMatch="DENY" onMismatch="NEUTRAL" />
                <ThresholdFilter level="ERROR" onMatch="DENY" onMismatch="NEUTRAL" />
            </Filters>
            <PatternLayout pattern="${pattern}" />
            <SizeBasedTriggeringPolicy size="${every_file_size}"/>
            <!-- DefaultRolloverStrategy屬性如不設置,則默認爲最多同一文件夾下7個文件,這裏設置了10 -->
            <DefaultRolloverStrategy max="10"/>
        </RollingFile>
        <RollingFile name="RollingFile-Appender-WARN" fileName="${log-path}/${log-file-warn}" append="true" filePattern="${log-path}/$${date:yyyy-MM}/warn-%d{yyyy-MM-dd}-%i.log">
            <!-- 只輸出Warn級別 -->
            <Filters>
                <ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" />
                <ThresholdFilter level="ERROR" onMatch="DENY" onMismatch="NEUTRAL" />
            </Filters>
            <PatternLayout pattern="${pattern}" />
            <SizeBasedTriggeringPolicy size="${every_file_size}"/>
            <!-- DefaultRolloverStrategy屬性如不設置,則默認爲最多同一文件夾下7個文件,這裏設置了10 -->
            <DefaultRolloverStrategy max="10"/>
        </RollingFile>
        <RollingFile name="RollingFile-Appender-ERROR" fileName="${log-path}/${log-file-error}" append="true" filePattern="${log-path}/$${date:yyyy-MM}/error-%d{yyyy-MM-dd}-%i.log">
            <!-- 只輸出ERROR級別 -->
            <Filters>
                <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY" />
            </Filters>
            <PatternLayout pattern="${pattern}" />
            <SizeBasedTriggeringPolicy size="${every_file_size}"/>
            <!-- DefaultRolloverStrategy屬性如不設置,則默認爲最多同一文件夾下7個文件,這裏設置了10 -->
            <DefaultRolloverStrategy max="10"/>
        </RollingFile>
    </Appenders>
    <Loggers>
        <logger name="java.sql" level="debug" additivity="false">
            <appender-ref ref="Console-Appender"/>
        </logger>
        <logger name="com.ibatis" level="debug" additivity="false">
            <appender-ref ref="Console-Appender"/>
        </logger>
        <!-- 第三方的軟件日誌級別 -->
        <logger name="org.springframework" level="info" additivity="true">
            <AppenderRef ref="RollingFile-Appender-WARN" />
            <AppenderRef ref="RollingFile-Appender-ERROR" />
        </logger>
        <!-- 第三方的軟件日誌級別 -->
        <logger name="org.apache" level="warn" additivity="true">
            <AppenderRef ref="RollingFile-Appender-WARN" />
            <AppenderRef ref="RollingFile-Appender-ERROR" />
        </logger>
        <!-- 異步輸出 -->
        <Root level="INFO">
            <AppenderRef ref="Console-Appender"/>
            <AppenderRef ref="RollingFile-Appender-Temp"/>
            <AppenderRef ref="RollingFile-Appender-INFO" />
            <AppenderRef ref="RollingFile-Appender-WARN" />
            <AppenderRef ref="RollingFile-Appender-ERROR" />
        </Root>
    </Loggers>
</Configuration>

3.增刪改查.javaspringboot

package com.qy.zk; import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; /** * 描述:zookeeper增刪改查 * 做者:七脈 * 重點:刪除、修改、查詢所用的version版本號,分佈式事務鎖的實現方式,樂觀鎖。ACL權限:全部人、限定人、限定IP等 */
public class MyZkConnect{ private static  Logger log=LogManager.getLogger(MyZkConnect.class); //集羣節點
    public static final String zkServerClusterConnect = "192.168.159.129:2181,192.168.159.129:2182,192.168.159.129:2183"; //單一節點
    public static final String zkServerSingleConnect = "192.168.159.129:2181"; //超時毫秒數
    public static final int timeout = 3000; public static void main(String[] args) throws InterruptedException, IOException, KeeperException { //創建鏈接
        ZooKeeper zk = connect(); //zk.close();//關閉後不支持重連
        log.info("zk 狀態:"+zk.getState()); /**恢復會話鏈接**/
        //long sessionId = zk.getSessionId(); //byte[] sessionPasswd = zk.getSessionPasswd(); //zk2會話重連後,zk會話將失效,再也不支持作增刪改查操做。 //ZooKeeper zk2 = reconnect(sessionId, sessionPasswd);
        
        /**建立節點**/
        //create(zk, "/myzk", "myzk");
        
        /**查詢節點Data**/ queryData(zk, "/myzk"); /**修改節點data**/
        //update(zk, "/myzk", "myzk-update");
        
        /**刪除節點**/
        //delete(zk, "/myzk");
 } /** * 描述:創建鏈接 * 做者:七脈 * @return * @throws IOException * @throws InterruptedException */
    public static ZooKeeper connect() throws IOException, InterruptedException{ CountDownLatch cdl = new CountDownLatch(1); log.info("準備創建zk服務"); ZooKeeper zk = new ZooKeeper(zkServerClusterConnect, timeout, new MyZkWatcher(cdl,"創建鏈接")); log.info("完成創建zk服務"); cdl.await();//這裏爲了等待wather監聽事件結束
        return zk; } /** * 描述:從新鏈接服務 * 做者:七脈 * @param sessionId 現有會話ID * @param sessionPasswd 現有會話密碼 * @return * @throws IOException * @throws InterruptedException * 重點:關閉後的會話鏈接,不支持重連。重連後,前會話鏈接將會失效。 */
    public static ZooKeeper reconnect(long sessionId, byte[] sessionPasswd) throws IOException, InterruptedException{ CountDownLatch cdl = new CountDownLatch(1); log.info("準備從新鏈接zk服務"); ZooKeeper zk = new ZooKeeper(zkServerClusterConnect, timeout, new MyZkWatcher(cdl,"從新鏈接"), sessionId, sessionPasswd); log.info("完成從新鏈接zk服務"); cdl.await();//這裏爲了等待wather監聽事件結束
        return zk; } /** * 描述:建立節點 * 做者:七脈 * @param zk * @param nodePath * @param nodeData * @throws KeeperException * @throws InterruptedException */
    public static void create(ZooKeeper zk,String nodePath,String nodeData) throws KeeperException, InterruptedException{ log.info("開始建立節點:{}, 數據:{}",nodePath,nodeData); List<ACL> acl = Ids.OPEN_ACL_UNSAFE; CreateMode createMode = CreateMode.PERSISTENT; String result = zk.create(nodePath, nodeData.getBytes(), acl, createMode); //建立節點有兩種,上面是第一種,還有一種可使用回調函數及參數傳遞,與上面方法名稱相同。
        log.info("建立節點返回結果:{}",result); log.info("完成建立節點:{}, 數據:{}",nodePath,nodeData); } /** * 描述:查詢節點結構信息 * 做者:七脈 * @param zk * @param nodePath * @return * @throws KeeperException * @throws InterruptedException */
    public static Stat queryStat(ZooKeeper zk,String nodePath) throws KeeperException, InterruptedException{ log.info("準備查詢節點Stat,path:{}", nodePath); Stat stat = zk.exists(nodePath, false); log.info("結束查詢節點Stat,path:{},version:{}", nodePath, stat.getVersion()); return stat; } /** * 描述:查詢節點Data值信息 * 做者:七脈 * @param zk * @param nodePath * @return * @throws KeeperException * @throws InterruptedException */
    public static String queryData(ZooKeeper zk,String nodePath) throws KeeperException, InterruptedException{ log.info("準備查詢節點Data,path:{}", nodePath); String data = new String(zk.getData(nodePath, false, queryStat(zk, nodePath))); log.info("結束查詢節點Data,path:{},Data:{}", nodePath, data); return data; } /** * 描述:修改節點 * 做者:七脈 * @param zk * @param nodePath * @param nodeData * @throws KeeperException * @throws InterruptedException * 重點:每次修改節點的version版本號都會變動,因此每次修改都須要傳遞節點原版本號,以確保數據的安全性。 */
    public static Stat update(ZooKeeper zk,String nodePath,String nodeData) throws KeeperException, InterruptedException{ //修改節點前先查詢該節點信息
        Stat stat = queryStat(zk, nodePath); log.info("準備修改節點,path:{},data:{},原version:{}", nodePath, nodeData, stat.getVersion()); Stat newStat = zk.setData(nodePath, nodeData.getBytes(), stat.getVersion()); //修改節點值有兩種方法,上面是第一種,還有一種可使用回調函數及參數傳遞,與上面方法名稱相同。 //zk.setData(path, data, version, cb, ctx);
        log.info("完成修改節點,path:{},data:{},現version:{}", nodePath, nodeData, newStat.getVersion()); return stat; } /** * 描述:刪除節點 * 做者:七脈 * @param zk * @param nodePath * @throws InterruptedException * @throws KeeperException */
    public static void delete(ZooKeeper zk,String nodePath) throws InterruptedException, KeeperException{ //刪除節點前先查詢該節點信息
        Stat stat = queryStat(zk, nodePath); log.info("準備刪除節點,path:{},原version:{}", nodePath, stat.getVersion()); zk.delete(nodePath, stat.getVersion()); //修改節點值有兩種方法,上面是第一種,還有一種可使用回調函數及參數傳遞,與上面方法名稱相同。 //zk.delete(path, version, cb, ctx);
        log.info("完成刪除節點,path:{}", nodePath); } }

4.watcher觀察者.javasession

package com.qy.zk; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 描述:ZK服務觀察者事件 * 做者:七脈 */
public class MyZkWatcher implements Watcher{ private static final Logger log = LoggerFactory.getLogger(MyZkWatcher.class); //異步鎖
    private CountDownLatch cdl; //標記
    private String mark; public MyZkWatcher(CountDownLatch cdl,String mark) { this.cdl = cdl; this.mark = mark; } //監聽事件處理方法
    public void process(WatchedEvent event) { log.info(mark+" watcher監聽事件:{}",event); cdl.countDown(); } }

本章節用到了CountDownLatch,不會的能夠學下http://www.javashuo.com/article/p-vjuqugyq-eb.html

 

源碼:https://pan.baidu.com/s/1nWQvN9l2SsGupY6LPP0gTg

相關文章
相關標籤/搜索