Building a distributed scheduled-task execution lock with ZooKeeper

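I recently took over a legacy project that contains scheduled tasks and has to be deployed as two instances. To avoid building a separate package for each instance, and at the technical manager's suggestion, we decided to use the presence or absence of a ZooKeeper ephemeral node to control whether each instance runs its scheduled tasks. This also largely avoids the case where the scheduled tasks stop running because a machine has gone down.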

Implementation:

import com.loan_manage.service.MasterService;
import com.netflix.curator.RetryPolicy;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.List;

/**
 * Determines whether this instance is the master node.
 * @author wuhanhong
 */
@Service
public class MasterServiceImpl implements MasterService {

    private static final Logger logger = LoggerFactory.getLogger(MasterServiceImpl.class);
    private CuratorFramework zk;

    @Value("${zookeeper.registry.address}")
    private String hosts;
    @Value("${zookeeper.retry.times}")
    private Integer retry;
    @Value("${zookeeper.connection.timeout}")
    private Integer connectTimeout;
    @Value("${zookeeper.session.timeout}")
    private Integer sessionTimeout;
    @Value("${zookeeper.nodes.path}")
    private String root;

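    @PostConstruct
    public void startZookeeper() throws Exception {
        logger.debug("start zookeeper ...");
        // RetryUntilElapsed(maxElapsedTimeMs, sleepMsBetweenRetries): retry for up to 1 s, sleeping 1 s between attempts.
        // The injected retry and timeout settings are not applied here; the client uses Curator's default timeouts.
        RetryPolicy retryPolicy = new RetryUntilElapsed(1000, 1000);
        zk = CuratorFrameworkFactory.newClient(hosts, retryPolicy);
        zk.start();
    }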

    @PreDestroy
    private void stopZookeeper() throws Exception {
        if(zk != null) {
            logger.debug("stop zookeeper ...");
            zk.close();
        }
    }

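    @Override
    public boolean isMaster() {
        try {
            // Without a live client there is no session, so this instance cannot be master.
            if (zk == null || zk.getZookeeperClient() == null || zk.getZookeeperClient().getZooKeeper() == null) {
                return false;
            }

            // The session id doubles as the name of this instance's ephemeral node.
            String sessionId = String.valueOf(zk.getZookeeperClient().getZooKeeper().getSessionId());
            List<String> children = null;

            if (zk.checkExists().forPath(root) != null) {
                children = zk.getChildren().forPath(root);
            }
            if (children == null || children.isEmpty()) {
                // No master is registered yet: claim the role by creating an ephemeral node.
                // Note: the check above and this create are separate calls, so two instances
                // that check at the same moment can both reach this branch.
                zk.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(root + "/" + sessionId, new byte[0]);
                return true;
            }
            // A node already exists: this instance is master only if the first child is its own.
            return sessionId.equals(children.get(0));
        } catch (Exception e) {
            logger.warn("master check failed", e);
        }
        return false;
    }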


    public String getHosts() {
        return hosts;
    }

    public void setHosts(String hosts) {
        this.hosts = hosts;
    }

    public Integer getRetry() {
        return retry;
    }

    public void setRetry(Integer retry) {
        this.retry = retry;
    }

    public Integer getConnectTimeout() {
        return connectTimeout;
    }

    public void setConnectTimeout(Integer connectTimeout) {
        this.connectTimeout = connectTimeout;
    }

    public Integer getSessionTimeout() {
        return sessionTimeout;
    }

    public void setSessionTimeout(Integer sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    public String getRoot() {
        return root;
    }

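    // Appends "/master" to the given path. Field injection via @Value sets `root`
    // directly and does not go through this setter.
    public void setRoot(String root) {
        this.root = root + "/master";
    }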
}
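
The article does not show how a scheduled task consults the service, so the following is only a minimal sketch of one way to do it. `MasterCheck` stands in for the `MasterService` interface (only its `isMaster()` method is known from the code above), and `ReportJob` with its counter is a hypothetical job invented for illustration. The Spring scheduling annotation is left out so the sketch compiles without Spring.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for the article's MasterService; only isMaster() is known from the source.
interface MasterCheck {
    boolean isMaster();
}

// Hypothetical scheduled job that runs its body only on the master instance.
class ReportJob {
    private final MasterCheck masterCheck;
    private final AtomicInteger runs = new AtomicInteger();

    ReportJob(MasterCheck masterCheck) {
        this.masterCheck = masterCheck;
    }

    // In a Spring application this method would be the scheduled entry point.
    void execute() {
        if (!masterCheck.isMaster()) {
            return; // another instance holds the lock, so skip this run
        }
        runs.incrementAndGet(); // placeholder for the real work
    }

    int runCount() {
        return runs.get();
    }
}
```

Every instance keeps its timer running and only the guard decides whether the body executes, which is what lets the same package be deployed on both machines.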

Configuration file:

zookeeper.registry.address=192.168.1.61:5181,192.168.1.61:5182,192.168.1.61:5181
zookeeper.retry.times = 3
zookeeper.connection.timeout = 30
zookeeper.session.timeout = 300
zookeeper.nodes.path = /tasks/nodes
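
As a quick sanity check on a file like this, the sketch below loads properties text with `java.util.Properties` and lists the distinct `host:port` entries in the connect string. `ZkConfigCheck` and its methods are hypothetical helpers, not part of the project. For the values above it would report two distinct servers, because port 5181 appears twice; whether that repetition is intentional is not clear from the article.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.LinkedHashSet;
import java.util.Properties;
import java.util.Set;

// Hypothetical helper for inspecting the ZooKeeper settings.
class ZkConfigCheck {

    // Parses properties text; both "key=value" and "key = value" are accepted.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns the distinct host:port entries of a comma-separated connect string, in order.
    static Set<String> distinctServers(String connectString) {
        Set<String> servers = new LinkedHashSet<>();
        for (String entry : connectString.split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty()) {
                servers.add(trimmed);
            }
        }
        return servers;
    }
}
```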

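Once a machine goes down, its connection to ZooKeeper is broken and its ephemeral node is removed when the session expires. The next time another machine runs the check, it finds that no ephemeral node exists, creates one, and makes itself the master for the scheduled tasks, which then continue to run.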
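
The failover sequence can be traced with a small in-memory stand-in. This is not a ZooKeeper client: `FakeEphemeralRegistry` is a hypothetical class that only mimics the decision made in `isMaster()` (create a node when none exists, otherwise compare against the first child) and the removal of a node when its session ends. Unlike the real check-then-create against ZooKeeper, the methods here are synchronized, so this sketch does not reproduce what happens when two instances check at the same moment.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory stand-in for the parent node and its ephemeral children (illustration only).
class FakeEphemeralRegistry {
    private final List<String> children = new ArrayList<>();

    // Mirrors the isMaster() decision: claim the role if no node exists,
    // otherwise be master only if the first child belongs to this session.
    synchronized boolean checkMaster(String sessionId) {
        if (children.isEmpty()) {
            children.add(sessionId);
            return true;
        }
        return children.get(0).equals(sessionId);
    }

    // Simulates session expiry, after which the session's ephemeral node is gone.
    synchronized void expireSession(String sessionId) {
        children.remove(sessionId);
    }
}
```

With two sessions `a` and `b`: `a` checks first and becomes master, `b` checks and stays a standby, and after `expireSession("a")` the next check by `b` takes over the role.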
