ZooKeeper 是一個開源的分佈式協調服務,ZooKeeper框架最初是在「Yahoo!"上構建的,用於以簡單而穩健的方式訪問他們的應用程序。 後來,Apache ZooKeeper成爲Hadoop,HBase和其餘分佈式框架使用的有組織服務的標準。 例如,Apache HBase使用ZooKeeper跟蹤分佈式數據的狀態。ZooKeeper 的設計目標是將那些複雜且容易出錯的分佈式一致性服務封裝起來,構成一個高效可靠的原語集,並以一系列簡單易用的接口提供給用戶使用。java
ZooKeeper 一般用於:命名服務、配置管理、集羣管理、分佈式協調/通知、分佈式鎖和分佈式隊列等等。node
各個爬蟲的節點經過註冊到 ZooKeeper 從而實現爬蟲集羣的管理。NetDiscovery 正是藉助了 ZooKeeper 的特性來監控爬蟲集羣。git
NetDiscovery 是一款基於 Vert.x、RxJava 2 等框架實現的通用爬蟲框架。它包含了豐富的特性。github
NetDiscovery 包含了 Spider 和 SpiderEngine。 Spider 用於實現爬蟲的業務邏輯,Spider 能夠添加到 SpiderEngine,由 SpiderEngine 來管理各個 Spider 的生命週期。服務器
可是 SpiderEngine 部署到每個節點以後,SpiderEngine 如何進行監控和管理呢?框架
能夠將 SpiderEngine 在運行時,先註冊到 ZooKeeper。(須要事先在 ZooKeeper 集羣建立 /netdiscovery 節點)分佈式
/** * 啓動SpiderEngine中全部的spider,讓每一個爬蟲並行運行起來。 * */
public void run() {
if (Preconditions.isNotBlank(spiders)) {
registerZK();
......  
}
}
/** * 將當前 SpiderEngine 註冊到 zookeeper 指定的目錄 /netdiscovery 下 */
private void registerZK() {
if (Preconditions.isNotBlank(zkStr) && useZk) {
log.info("zkStr: {}", zkStr);
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
CuratorFramework client = CuratorFrameworkFactory.newClient(zkStr, retryPolicy);
client.start();
try {
String ipAddr = InetAddress.getLocalHost().getHostAddress() + "-" + defaultHttpdPort + "-" + System.currentTimeMillis();
String nowSpiderEngineZNode = "/netdiscovery/" + ipAddr;
client.create().withMode(CreateMode.EPHEMERAL).forPath(nowSpiderEngineZNode,nowSpiderEngineZNode.getBytes());
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}
複製代碼
另外,須要使用 NetDiscovery Monitor 的 CuratorManager 類。 它藉助 Zookeeper 的 Watcher 機制,監聽已經註冊到 /netdiscovery 這個父 zNode 下的各個子 zNode ,也就是各個 SpiderEngine。ide
Watcher機制是指 ZooKeeper 客戶端向 ZooKeeper 服務器註冊 Watcher 的同時,會將 Watcher 對象存儲在客戶端的 WatchManager 中。ZooKeeper 服務器觸發 Watcher 事件後,會向客戶端發送通知,客戶端線程從 WatchManager 中回調 Watcher 執行相應的功能。oop
/** * 當前所監控的父的 zNode 下如果子 zNode 發生了變化:新增,刪除,修改 * <p> * 下述方法都會觸發執行 * * @param event */
@Override
public void process(WatchedEvent event) {
List<String> newZodeInfos = null;
try {
newZodeInfos = client.getChildren().usingWatcher(this).forPath("/netdiscovery");
//根據初始化容器的長度與最新的容器的長度進行比對,就能夠推導出當前 SpiderEngine 集羣的狀態:新增,宕機/下線,變動...
//哪一個容器中元素多,就循環遍歷哪一個容器。
if (Preconditions.isNotBlank(newZodeInfos)) {
if (newZodeInfos.size()>allZnodes.size()){
//明確顯示新增了哪一個 SpiderEngine 節點
for (String nowZNode:newZodeInfos) {
if (!allZnodes.contains(nowZNode)){
log.info("新增 SpiderEngine 節點{}", nowZNode);
}
}
}else if (newZodeInfos.size()<allZnodes.size()){
// 宕機/下線
// 明確顯示哪一個 SpiderEngine 節點宕機/下線了
for (String initZNode : allZnodes) {
if (!newZodeInfos.contains(initZNode)) {
log.info("SpiderEngine 節點【{}】下線了!", initZNode);
// 若是有下線的處理,則處理(例如發郵件、短信等)
if (serverOfflineProcess!=null) {
serverOfflineProcess.process();
}
}
}
}else {
// SpiderEngine 集羣正常運行;
// 宕機/下線了,當時立刻重啓了,總的爬蟲未發生變化
}
}
} catch (Exception e) {
e.printStackTrace();
}
allZnodes = newZodeInfos;
}
複製代碼
因此須要單獨運行一個進程,例如:this
public class TestCuratorManager {
public static void main(String[] args) {
CuratorManager curatorManager = new CuratorManager();
curatorManager.start();
}
}
複製代碼
下圖反映了 ZooKeeper 如何監控 SpiderEngine 集羣。
爬蟲框架 github 地址:github.com/fengzhizi71…
本文介紹瞭如何使用 ZooKeeper 來監控爬蟲的集羣。將來,NetDiscovery 還會增長更爲通用的功能。
Java與Android技術棧:每週更新推送原創技術文章,歡迎掃描下方的公衆號二維碼並關注,期待與您的共同成長和進步。