分佈式學習(一)——基於ZooKeeper的隊列爬蟲

zookeeper
一直琢磨着分佈式的東西怎麼搞,公司也沒有相關的項目可以參與,因此仍是迴歸本身的專長來吧——基於ZooKeeper的分佈式隊列爬蟲,因爲沒什麼人可以一塊兒溝通分佈式的相關知識,下面的小項目純屬「胡編亂造」。
簡單介紹下ZooKeeper:ZooKeeper是一個分佈式的,開放源碼的分佈式應用程序協調服務,是Google的Chubby一個開源的實現,它是集羣的管理者,監視着集羣中各個節點的狀態根據節點提交的反饋進行下一步合理操做。最終,將簡單易用的接口和性能高效、功能穩定的系統提供給用戶。
基本的知識就不過多介紹了,能夠參考參考下面這些人的:
ZooKeeper官網
http://www.cnblogs.com/wuxl360/p/5817471.htmljavascript

1、總體架構

這張圖來自skyme,我也是看了這張圖的啓發寫了這篇文章的。
html

最基本的分佈式隊列即一個生產者不斷抓取連接,而後將連接存儲進ZooKeeper的隊列節點裏,每一個節點的value都只是連接,而後消費者從中獲取一條url進行抓取。本項目生產這主要是用來生產URL便可,這部分就不要要求太多。而後是消費者,消費者須要解決的問題有:
1.隊列如何保證本身的分發正確;
2.消費這如何進行高效的抓取。java

2、ZooKeeper隊列原理

2.1 介紹

分佈式隊列,目前此類產品大多相似於ActiveMQ、RabbitMQ等,本文主要介紹的是Zookeeper實現的分佈式隊列,它的實現方式也有兩種,一種是FIFO(先進先出)的隊列,另外一種是等待隊列元素彙集以後才統一安排的Barrier模型。一樣,本文主要講的是FIFO的隊列模型。其大致設計思路也很簡單,主要是在/SinaQueue下建立順序節點,如/SinaQueue/qn-000000000,建立完節點以後,根據下面的4個步驟來決定執行的順序。
1.經過調用getChildren()接口來獲取某一節點下的全部節點,即獲取隊列中的全部元素。
2.肯定本身的節點序號在全部子節點中的順序。
3.若是本身不是序號最小的子節點,那麼就須要進入等待,同時向比本身序號小的最後一個節點註冊Watcher監聽。
4.接收到Watcher通知後,重複步驟1。
node

2.2 Watcher介紹

znode以某種方式發生變化時,「觀察」(watch)機制可讓客戶端獲得通知.能夠針對ZooKeeper服務的「操做」來設置觀察,該服務的其餘 操做能夠觸發觀察。
1.Watch是一次性的,每次都須要從新註冊,而且客戶端在會話異常結束時不會收到任何通知,而快速重鏈接時仍不影響接收通知。
2.Watch的回調執行都是順序執行的,而且客戶端在沒有收到關注數據的變化事件通知以前是不會看到最新的數據,另外須要注意不要在Watch回調邏輯中阻塞整個客戶端的Watch回調
3.Watch是輕量級的,WatchEvent是最小的通訊單元,結構上只包含通知狀態、事件類型和節點路徑。ZooKeeper服務端只會通知客戶端發生了什麼,並不會告訴具體內容。git

2.3 源碼

在csdn上找到了某我的寫的這個過程,使用的是ZKClient,有興趣能夠看看傑布斯的博客,可是沒有實現上面過程的第三步(Watcher相關的),這裏,咱們使用的是Zookeeper的另外一個客戶端工具curator,其中,curator實現了各類Zookeeper的特性,如:Election(選舉),Lock(鎖),Barrier(關卡),Atonmic(原子量),Cache(緩存),Queue(隊列)等。咱們來看看Curator實現的簡單的分佈式隊列的源碼。github

public class SimpleDistributedQueue {
    ...
    private final CuratorFramework client;//鏈接Zookeeper的客戶端
    private final String path;//路徑
    private final EnsureContainers ensureContainers;//確保原子特性
    private final String PREFIX = "qn-";//順序節點的贊成前綴,使用qn-
    ...

其中PREFIX是用來生成順序節點的,默認不可更改,將生成的路徑賦予給path,而後向節點賦予數據。下面是賦予數據的代碼web

public boolean offer(byte[] data) throws Exception {
        String thisPath = ZKPaths.makePath(this.path, "qn-");//生成的路徑
        ((ACLBackgroundPathAndBytesable)this.client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL)).forPath(thisPath, data);//若是沒有路徑將生成持久化的路徑而後存儲節點的數據。
        return true;
    }

最關鍵的來了,隊列如何保證本身的分發正確?SimpleDistributedQueue使用take()來取得隊列的頭部,而後將頭部刪掉,這一過程的一致性是經過CountDownLatch和Watcher來實現的。apache

public byte[] take() throws Exception {//直接調用interPoll,並將超時的設置爲0;
        return this.internalPoll(0L, (TimeUnit)null);
    }
    private byte[] internalPoll(long timeout, TimeUnit unit) throws Exception {
        ...//忽略超時的設置代碼
        while(true) {
            final CountDownLatch latch = new CountDownLatch(1);//定義一個latch,設置爲1,先加鎖,而後執行完任務後再釋放鎖
            Watcher watcher = new Watcher() {
                public void process(WatchedEvent event) {
                    latch.countDown();
                }
            };
            byte[] bytes;
            try {
                bytes = this.internalElement(true, watcher);//調用internalElement函數來獲取字節流
            } catch (NoSuchElementException var17) {
            }
            ...
            if (hasTimeout) {
                long elapsedMs = System.currentTimeMillis() - startMs;
                long thisWaitMs = maxWaitMs - elapsedMs;
                if (thisWaitMs <= 0L) {    //若是等待超時了則返回爲空
                    return null;
                }
                latch.await(thisWaitMs, TimeUnit.MILLISECONDS);
            } else {
                latch.await();
            }
        }
    }
    private byte[] internalElement(boolean removeIt, Watcher watcher) throws Exception {
            this.ensurePath();
            List nodes;
            try {
                nodes = watcher != null ? (List)((BackgroundPathable)this.client.getChildren().usingWatcher(watcher)).forPath(this.path) : (List)this.client.getChildren().forPath(this.path);//獲取節點下的全部子節點註冊監聽(watcher默認都不是爲空的,每個都註冊)
            } catch (NoNodeException var8) {
                throw new NoSuchElementException();
            }
    
            Collections.sort(nodes);//對節點進行排序
            Iterator var4 = nodes.iterator();
            while(true) {//遍歷
                while(var4.hasNext()) {
                    String node = (String)var4.next();//取得當前頭結點
                    if (node.startsWith("qn-")) {
                        String thisPath = ZKPaths.makePath(this.path, node);
                        try {
                            byte[] bytes = (byte[])this.client.getData().forPath(thisPath);
                            if (removeIt) {
                                this.client.delete().forPath(thisPath);//刪除該節點
                            }
                            return bytes;//返回節點的字節流
            ...
        }

3、多線程併發

對於分佈式爬蟲來講,讓每個消費者高效的進行抓取是具備重要意義的,爲了加快爬蟲的速度,採用多線程爬蟲的方法。Java多線程實現方式主要有三種:繼承Thread類、實現Runnable接口、使用ExecutorService、Callable、Future實現有返回結果的多線程。其中前兩種方式線程執行完後都沒有返回值,只有最後一種是帶返回值的。其中使用Executors提供了四種聲明線程池的方法,分別是newCachedThreadPool、newFixedThreadPool、newSingleThreadExecutor和newScheduledThreadPool,爲了監控實時監控隊列的長度,咱們使用數組型的阻塞隊列ArrayBlockingQueue。聲明方式以下:數組

private static final BlockingQueue<Runnable> queuelength = new ArrayBlockingQueue<>(1000);
    ExecutorService es = new ThreadPoolExecutor(CORE, CORE,
            0L, TimeUnit.MILLISECONDS,
            queuelength);

4、使用

本次實驗主要環境以下:緩存

zookeeper.version=3.5
java.version=1.8.0_65
os.arch=amd64
i5 四核心CPU
網速爲中國電信100M

這裏主要是對博客園中的前兩千條博客進行爬取,本文主要是對分佈式隊列的理解,就再也不進行什麼難度的處理(好比元素的選取、數據的存儲等),只輸出每篇博客的title便可。
生產者代碼:

public class Producer {
    //logger
    private static final Logger logger = LoggerFactory.getLogger(Producer.class);
    public static final CuratorFramework client = CuratorFrameworkFactory.builder().connectString("119.23.46.71:2181")
            .sessionTimeoutMs(1000)
            .connectionTimeoutMs(1000)
            .canBeReadOnly(false)
            .retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
            .defaultData(null)
            .build();
    private static SimpleDistributedQueue queue = new SimpleDistributedQueue(client, "/Queue");
    private static Integer j = 0;

    public static void begin(String url) {//對博客園的每一頁進行爬取
        try {
            String content = HttpHelper.getInstance().get(url);
            resolveweb(content);
        } catch (Exception e) {
            logger.error("", e);
        }
    }
    public static void resolveweb(String content) throws Exception {
        Elements elements = Jsoup.parse(content).select("a.titlelink");//對每篇博客的標題進行獲取
        for (Element element : elements) {
            String url = element.attr("href");//
            if (StringUtils.isNotEmpty(url) && !url.contains("javascript") && !url.contains("jump")) {//去除a中調用href過程
                logger.info(url + " " + String.valueOf(j++));
                queue.offer(url.getBytes());
            }
        }
    }

    public static void main(String[] args) {
        client.start();
        for (int i = 0; i < 100; i++) {
            begin("https://www.cnblogs.com/#p" + String.valueOf(i));
        }
    }
}

消費者

public class Consumer {
    //logger
    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
    private static final CuratorFramework client = CuratorFrameworkFactory.builder().connectString("119.23.46.71:2181")
            .sessionTimeoutMs(1000)
            .connectionTimeoutMs(1000)
            .canBeReadOnly(false)
            .retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
            .defaultData(null)
            .build();
    private static SimpleDistributedQueue queue = new SimpleDistributedQueue(client, "/SinaQueue");
    private static Integer i = 0;
    private static final Integer CORE = Runtime.getRuntime().availableProcessors();
    //聲明爲一個數組型的阻塞隊列,這裏限制大小爲
    private static final BlockingQueue<Runnable> queuelength = new ArrayBlockingQueue<>(1000);

    static class CBCrawler implements Runnable {
        private String url;

        public CBCrawler(String url) {
            this.url = url;
        }

        @Override
        public void run() {
            String content = HttpHelper.getInstance().get(url);
            logger.info(url + " " + Jsoup.parse(content).title());//打印網頁的標題
        }
    }

    public static void begin() {
        try {
            ExecutorService es = new ThreadPoolExecutor(CORE, CORE,
                    0L, TimeUnit.MILLISECONDS,
                    queuelength);
            while (client.getChildren().forPath("/SinaQueue").size() > 0) {
                CBCrawler crawler = new CBCrawler(new String(queue.take()));
                es.submit(crawler);//執行爬蟲
                i = i + 1;
                logger.info(String.valueOf(i) + " is finished\n" + " queue size is" + queuelength.size());//監控當前隊列的長度
            }
            if (!es.isShutdown()) {//若是線程池沒有關閉則關閉
                es.shutdown();
            }
        } catch (Exception e) {
            logger.error("", e);
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        client.start();
        begin();
        client.close();
        logger.info("start time: " + start);
        long end = System.currentTimeMillis();
        logger.info("end time: " + end);
        logger.info("take time: " + String.valueOf(end - start));//記錄開始時間和結束時間
    }
}

因爲在隊列的take中使用了CountDownLatch和Collections.sort(nodes)進行排序,耗時過程變長了很多,2000個節點,單臺服務器和多臺服務器的耗時是同樣的,都是9分鐘,具體實驗見下面。

實驗結果

生產者生產URL:

單機模式下的消費者,耗時:560825/(1000*60)=9分鐘

分佈式模式下的抓取:

耗時:564374/(1000*60)=9分鐘:

由圖可見,當每一個消費者處理能力大於隊列分配的能力時,耗時的過程反而是在隊列,畢竟分佈式隊列在進行take動做的時候對節點進行了加鎖,還要對隊列進行排序,特別是在節點多達2000+的狀況下,耗時是十分嚴重的。

實驗二

實驗二的主要解決的問題是將消費者處理的耗時延長,咱們使用Thread.sleep(n)來模擬時長。因爲博客園忽然連不上,爲了減小這種不可控的故障,抓取的網頁改成新浪,並將抓取後的URL以文本形式保存下來。

public static void sleepUtil(Integer time) {
    try {
        Thread.sleep(time * 1000);
    } catch (Exception e) {
        logger.error("線程sleep異常", e);
    }
}

此時再看程序的輸出,能夠看出,隊列的分發能力已經大於消費者的處理能力,總算是正常了。

分佈式隊列分發的時間是:341998/(1000*60)=5.6分鐘

2017-10-30  08:55:48.458 [main] INFO  com.crawler.Consumer - start time: 1509324606460
2017-10-30  08:55:48.458 [main] INFO  com.crawler.Consumer - end time: 1509324948458
2017-10-30  08:55:48.458 [main] INFO  com.crawler.Consumer - take time: 341998

兩臺機子抓取完畢的耗時分別是:

A服務器:08:49:54.509——09:02:07  
B服務器:08:49:54.509——09:05:05

單機的時候分發時間是:353198/(1000*60)=5.8分鐘

2017-10-30  09:30:25.812 [main] INFO  com.crawler.Consumer - start time: 1509326672614
2017-10-30  09:30:25.812 [main] INFO  com.crawler.Consumer - end time: 1509327025812
2017-10-30  09:30:25.812 [main] INFO  com.crawler.Consumer - take time: 353198

耗時

09:24:33.391——09:51:44.733

分佈式下平均耗時約爲13分鐘,單機模式下耗時約爲27分鐘,仍是蠻符合估算的。

總結

源代碼都放在這裏了,有興趣的能夠star一下或者下載看一下,也歡迎你們提提意見,沒企業級的實戰環境,見笑了O(∩_∩)O~

歡迎訪問個人我的網站
我的網站網址:http://www.wenzhihuai.com
我的網站代碼地址:https://github.com/Zephery/newblog

相關文章
相關標籤/搜索