Disruptor in Practice: Integrating It into an Existing Crawler Framework


1. Disruptor

Disruptor is a high-performance asynchronous processing framework.

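Disruptor is a key component of LMAX's online trading platform, where it reportedly lets the platform process orders at up to 6 million TPS. It is not limited to finance: ordinary applications can use Disruptor as well and may see significant performance gains. Disruptor is less a framework than a design approach: for programs built around concurrency, buffers, the producer-consumer model and transaction processing, it offers a way to raise throughput (TPS) substantially.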

2. Practice

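NetDiscovery is a crawler framework built on Vert.x, RxJava 2 and other frameworks.

By default, NetDiscovery uses the JDK's ConcurrentLinkedQueue as its message queue. Since every component of the crawler framework can be replaced, the following sections implement the crawler's Queue on top of Disruptor.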
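For comparison, here is a minimal sketch of the default behavior that the Disruptor queue replaces. It assumes a hypothetical, simplified contract: SimpleRequestQueue, its methods and the String payload are illustrative stand-ins, not NetDiscovery's actual AbstractQueue API. The points worth noting are the non-blocking poll() that returns null when the queue is empty, and a separate counter for the remaining requests, since ConcurrentLinkedQueue.size() has to walk the whole queue.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified queue contract; NOT NetDiscovery's real AbstractQueue.
interface SimpleRequestQueue {
    void push(String request);   // a String stands in for the crawler's Request
    String poll();               // non-blocking: returns null when nothing is queued
    int getLeftRequests();       // requests pushed but not yet polled
}

// Default-style implementation backed by the JDK's lock-free ConcurrentLinkedQueue.
class ConcurrentLinkedRequestQueue implements SimpleRequestQueue {
    private final Queue<String> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger left = new AtomicInteger();

    @Override
    public void push(String request) {
        left.incrementAndGet();  // count first, so the counter never goes negative
        queue.offer(request);
    }

    @Override
    public String poll() {
        String request = queue.poll();
        if (request != null) {
            left.decrementAndGet();
        }
        return request;
    }

    @Override
    public int getLeftRequests() {
        return left.get();       // O(1), unlike ConcurrentLinkedQueue.size()
    }
}
```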

2.1 Wrapping the Event

The crawler's request is wrapped in a RequestEvent, the event that travels through Disruptor.

import com.cv4j.netdiscovery.core.domain.Request;
import lombok.Data;

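/**
 * Created by tony on 2018/9/1.
 *
 * Event stored in the RingBuffer slots; it carries one crawler Request.
 */
@Data
public class RequestEvent {

    private Request request;

    @Override
    public String toString() {
        // Pre-allocated events hold no request until first published, so avoid an NPE
        return String.valueOf(request);
    }
}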
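RequestEvent is a mutable holder with a setter rather than an immutable value because Disruptor fills every RingBuffer slot up front through an EventFactory and then reuses those objects: publishing overwrites fields instead of allocating. The single-threaded, JDK-only sketch below models only that reuse; PreallocatedSlots and MutableEvent are hypothetical names, and the real RingBuffer additionally handles sequencing and cross-thread visibility.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for RequestEvent: a mutable holder that gets reused.
class MutableEvent {
    private String payload;

    void setPayload(String payload) { this.payload = payload; }

    String getPayload() { return payload; }
}

// Hypothetical single-threaded model of pre-allocated, reused ring slots.
class PreallocatedSlots<E> {
    private final Object[] slots;

    PreallocatedSlots(int size, Supplier<E> factory) {
        slots = new Object[size];
        for (int i = 0; i < size; i++) {
            slots[i] = factory.get(); // every event is allocated once, up front
        }
    }

    @SuppressWarnings("unchecked")
    E get(long sequence) {
        // Sequences grow forever; slot positions wrap around the fixed array
        return (E) slots[(int) (sequence % slots.length)];
    }
}
```

A publisher at sequence 5 in a 4-slot ring therefore gets the same object that sequence 1 used, and simply overwrites its payload.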

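2.2 Publishing Events

Next comes event publishing: claim the next writable sequence from the RingBuffer, set the crawler's request on the RequestEvent in that slot, and finally publish the sequence to the RingBuffer.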

import com.cv4j.netdiscovery.core.domain.Request;
import com.lmax.disruptor.RingBuffer;

import java.util.concurrent.atomic.AtomicInteger;

/** * Created by tony on 2018/9/2. */
public class Producer {

    private final RingBuffer<RequestEvent> ringBuffer;

    private final AtomicInteger count = new AtomicInteger(0); // number of requests published so far

    public Producer(RingBuffer<RequestEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

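    public void pushData(Request request) {
        // Claim the next writable sequence; waits while the ring is full
        long sequence = ringBuffer.next();

        try {
            // Fill the pre-allocated event in place instead of allocating a new one
            RequestEvent event = ringBuffer.get(sequence);
            event.setRequest(request);
        } finally {
            // Always publish the claimed sequence, even if filling the event failed
            ringBuffer.publish(sequence);
            count.incrementAndGet();
        }
    }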

    /**
     * @return the number of Requests sent to the queue so far
     */
    public int getCount() {

        return count.get();
    }
}
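The claim/publish split matters most with several producers. The simplified model below (hypothetical names, fixed capacity, no wrap-around) shows the idea: each producer atomically claims its own sequence, marks it published when done, and a reader may only advance over a contiguous run of published sequences. A claimed sequence that is never published is a gap that stops readers for good. Disruptor's multi-producer sequencer tracks per-slot availability in a similar spirit, but its actual implementation differs.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of multi-producer claim/publish (fixed capacity, no wrap-around).
class ClaimPublishModel {
    private final AtomicLong claimed = new AtomicLong(-1);  // highest claimed sequence
    private final AtomicIntegerArray published;             // 1 = published, 0 = not yet

    ClaimPublishModel(int capacity) {
        published = new AtomicIntegerArray(capacity);
    }

    // Phase 1: reserve a unique sequence; safe for concurrent producers.
    long next() {
        return claimed.incrementAndGet();
    }

    // Phase 2: make the slot visible to readers.
    void publish(long sequence) {
        published.set((int) sequence, 1);
    }

    // Highest sequence a reader may consume, scanning forward from 'from'.
    // Returns from - 1 when 'from' itself is not yet published.
    long highestPublished(long from) {
        long limit = claimed.get();
        long seq = from;
        while (seq <= limit && published.get((int) seq) == 1) {
            seq++;
        }
        return seq - 1;
    }
}
```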

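2.3 Consuming Events

Once a RequestEvent carries a request, a consumer has to handle the event. The Consumer below only logs the consumer's thread name and the request. The real "consumption" happens elsewhere: the Spider obtains requests through DisruptorQueue's poll() and processes them.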

import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;

/** * Created by tony on 2018/9/2. */
@Slf4j
public class Consumer implements WorkHandler<RequestEvent> {

    @Override
    public void onEvent(RequestEvent requestEvent) throws Exception {

        // Only logs the event; the Spider gets its requests through DisruptorQueue.poll()
        log.info("consumer:{} requestEvent: value={}", Thread.currentThread().getName(), requestEvent);
    }
}
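Because Consumer implements WorkHandler, a WorkerPool delivers each event to exactly one of its consumers, whereas with EventHandler every handler sees every event. Roughly speaking, the pool's WorkProcessors compete for a shared work sequence. The JDK-only sketch below imitates that competition with an AtomicLong; WorkSharingSketch is a hypothetical name, and the sketch skips waiting for events that have not been published yet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of WorkerPool-style work sharing: every event handled exactly once.
class WorkSharingSketch {

    // Returns, for each event index, how many times it was handled.
    static AtomicIntegerArray run(int eventCount, int workerCount) {
        AtomicLong workSequence = new AtomicLong(-1);        // shared by all workers
        AtomicIntegerArray handled = new AtomicIntegerArray(eventCount);
        List<Thread> workers = new ArrayList<>();

        for (int w = 0; w < workerCount; w++) {
            Thread t = new Thread(() -> {
                long seq;
                // Each worker claims the next unclaimed sequence; no two workers get the same one
                while ((seq = workSequence.incrementAndGet()) < eventCount) {
                    handled.incrementAndGet((int) seq);
                }
            }, "worker-" + w);
            workers.add(t);
            t.start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return handled;
    }
}
```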

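2.4 Implementing DisruptorQueue

Disruptor supports several topologies: a single producer with a single consumer, multiple producers, multiple consumers, consumer groups, and so on.

NetDiscovery uses multiple producers and multiple consumers.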

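When the RingBuffer is created, ProducerType.MULTI declares that there are multiple producers, and a YieldingWaitStrategy is passed in. A WaitStrategy decides how consumers wait for new events to become available, and the different strategies trade latency against CPU usage.

YieldingWaitStrategy is one of the strategies meant for low-latency systems (BusySpinWaitStrategy is even more aggressive), and it keeps waiting consumer threads busy, so expect high CPU usage. The Disruptor documentation recommends it when very high performance is required and the number of event-handler threads is smaller than the number of logical CPU cores, for example when hyper-threading is enabled.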
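The spin-then-yield behavior can be pictured with a loose, JDK-only model of a consumer waiting for a sequence: busy-spin for a fixed number of checks, then call Thread.yield() on every further check. SpinThenYieldWait is a hypothetical name, the spin budget of 100 is an assumption, and the real strategy also checks the SequenceBarrier for alerts (for example on shutdown), which this sketch omits.

```java
import java.util.concurrent.atomic.AtomicLong;

// Loose, hypothetical model of a spin-then-yield wait (not Disruptor's actual class).
class SpinThenYieldWait {
    private static final int SPIN_TRIES = 100; // assumed spin budget before yielding

    // Waits until 'cursor' reaches 'sequence'; returns the available sequence.
    static long waitFor(long sequence, AtomicLong cursor) {
        int counter = SPIN_TRIES;
        long available;
        while ((available = cursor.get()) < sequence) {
            if (counter == 0) {
                Thread.yield();  // let other threads run, but stay runnable
            } else {
                --counter;       // pure busy-spin: lowest latency, burns a core
            }
        }
        return available;
    }
}
```

Spinning first keeps latency minimal when events arrive quickly; yielding afterwards lets other runnable threads onto the core without the wake-up cost of blocking.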

ringBuffer = RingBuffer.create(ProducerType.MULTI,
        new EventFactory<RequestEvent>() {
            @Override
            public RequestEvent newInstance() {
                return new RequestEvent();
            }
        },
        ringBufferSize,
        new YieldingWaitStrategy());
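The ringBufferSize passed to RingBuffer.create() must be a power of two; Disruptor rejects other sizes with an IllegalArgumentException. The payoff is cheap indexing: for a power-of-two size, sequence & (size - 1) equals sequence % size for any non-negative sequence, so mapping an ever-growing sequence to a slot takes a single AND. A small JDK-only illustration, using the hypothetical helper RingIndex:

```java
// Hypothetical helper showing why ring sizes are powers of two.
final class RingIndex {
    private RingIndex() { }

    static boolean isPowerOfTwo(int size) {
        return size > 0 && Integer.bitCount(size) == 1;
    }

    // Maps a non-negative, ever-increasing sequence to a slot index with one AND.
    static int indexOf(long sequence, int size) {
        if (!isPowerOfTwo(size)) {
            throw new IllegalArgumentException("size must be a power of 2: " + size);
        }
        return (int) (sequence & (size - 1));
    }
}
```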

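An EventProcessor processes the events in Disruptor.

Its implementations include BatchEventProcessor, which processes events in batches on a single thread, and WorkProcessor, which lets several threads share the processing of events.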
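To picture what processing "in batches" means for BatchEventProcessor: it asks its SequenceBarrier for the highest available sequence and passes every event up to that point to the handler in one pass, flagging the last one through the endOfBatch parameter of EventHandler.onEvent(event, sequence, endOfBatch). The model below, with the hypothetical name BatchPassModel and strings standing in for handler calls, shows one such pass.

```java
import java.util.List;

// Hypothetical model of one batch pass in a BatchEventProcessor-style loop.
class BatchPassModel {

    // "Handles" every sequence in [nextSequence, availableSequence] by recording it;
    // returns the next sequence to read.
    static long processBatch(long nextSequence, long availableSequence, List<String> handled) {
        for (long seq = nextSequence; seq <= availableSequence; seq++) {
            boolean endOfBatch = seq == availableSequence;
            handled.add(seq + (endOfBatch ? ":end" : ""));
        }
        // Nothing available (availableSequence < nextSequence) leaves the position unchanged
        return Math.max(nextSequence, availableSequence + 1);
    }
}
```

The endOfBatch flag lets a handler buffer work, for example writes, and flush once per batch instead of once per event.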

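A WorkerPool manages a group of WorkProcessors. (WorkerPool, WorkProcessor and WorkHandler belong to the Disruptor 3.x API; they were removed in Disruptor 4.0, so this code needs a 3.x dependency.) After creating the ringBuffer, create the workerPool: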

SequenceBarrier barriers = ringBuffer.newBarrier();

for (int i = 0; i < consumers.length; i++) {
    consumers[i] = new Consumer();
}

workerPool = new WorkerPool<RequestEvent>(ringBuffer,
        barriers,
        new EventExceptionHandler(),
        consumers);

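Register the workers' sequences as gating sequences on the ringBuffer, then start the workerPool. WorkerPool.start() hands each WorkProcessor to the executor as a long-running task, so the thread pool needs at least as many threads as there are consumers: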

ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
workerPool.start(Executors.newFixedThreadPool(threadNum));
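Registering gating sequences is what keeps a producer from lapping the consumers. Before handing out sequence n, the sequencer checks that n - bufferSize, the previous sequence that used the same slot, is not beyond the slowest gating sequence. A JDK-only model of that check, with the hypothetical name GatingCheck; the real sequencer also caches the minimum and waits instead of returning false.

```java
// Hypothetical model of the producer-side wrap check against gating sequences.
final class GatingCheck {
    private GatingCheck() { }

    // gatingSequences: the last sequence each consumer has finished (-1 = none yet).
    static boolean canClaim(long nextSequence, int bufferSize, long... gatingSequences) {
        long wrapPoint = nextSequence - bufferSize;  // sequence that last used the same slot
        long slowest = Long.MAX_VALUE;
        for (long s : gatingSequences) {
            slowest = Math.min(slowest, s);
        }
        // The slot is reusable only once the slowest consumer has processed wrapPoint
        return wrapPoint <= slowest;
    }
}
```

With no gating sequences the check always passes, which is why the pool's worker sequences have to be added to the ringBuffer.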

Finally, the complete DisruptorQueue code:

import com.cv4j.netdiscovery.core.domain.Request;
import com.cv4j.netdiscovery.core.queue.AbstractQueue;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** * Created by tony on 2018/9/1. */
@Slf4j
public class DisruptorQueue extends AbstractQueue {

    private RingBuffer<RequestEvent> ringBuffer;

    private Consumer[] consumers = null;
    private Producer producer = null;
    private WorkerPool<RequestEvent> workerPool = null;
    private int ringBufferSize = 1024 * 1024; // RingBuffer size; must be a power of 2

    private AtomicInteger consumerCount = new AtomicInteger(0);

    private static final int CONSUME_NUM = 2;
    private static final int THREAD_NUM = 4;

    public DisruptorQueue() {

        this(CONSUME_NUM,THREAD_NUM);
    }

    public DisruptorQueue(int consumerNum,int threadNum) {

        consumers = new Consumer[consumerNum];

        // create the ringBuffer
        ringBuffer = RingBuffer.create(ProducerType.MULTI,
                new EventFactory<RequestEvent>() {
                    @Override
                    public RequestEvent newInstance() {
                        return new RequestEvent();
                    }
                },
                ringBufferSize ,
                new YieldingWaitStrategy());

        SequenceBarrier barriers = ringBuffer.newBarrier();

        for (int i = 0; i < consumers.length; i++) {
            consumers[i] = new Consumer();
        }

        workerPool = new WorkerPool<RequestEvent>(ringBuffer,
                        barriers,
                        new EventExceptionHandler(),
                        consumers);

        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
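        // Each WorkProcessor occupies a pool thread for its whole lifetime,
        // so never give the pool fewer threads than consumers
        workerPool.start(Executors.newFixedThreadPool(Math.max(threadNum, consumerNum)));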

        producer = new Producer(ringBuffer);
    }

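    @Override
    protected void pushWhenNoDuplicate(Request request) {

        producer.pushData(request);
        try {
            // Fixed 100 ms pause after each push: caps each pushing thread at about 10 requests/s
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still see the interruption
            Thread.currentThread().interrupt();
        }
    }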

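    private long nextReadSequence = 0; // next sequence poll() reads; guarded by synchronized

    @Override
    public synchronized Request poll(String spiderName) {

        if (getLeftRequests(spiderName) <= 0) {
            return null; // nothing unread: do not touch the ring buffer
        }
        // Read the oldest unread slot without calling ringBuffer.next(), which would claim a
        // sequence that is never published and stall the WorkerPool. Caveat: with concurrent
        // producers, a slot can be counted shortly before its publish() completes.
        Request request = ringBuffer.get(nextReadSequence++).getRequest();
        consumerCount.incrementAndGet();
        return request;
    }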

    @Override
    public int getLeftRequests(String spiderName) {

        return producer.getCount()-consumerCount.get();
    }

    public int getTotalRequests(String spiderName) {

        return super.getTotalRequests(spiderName);
    }

    static class EventExceptionHandler implements ExceptionHandler {

        public void handleEventException(Throwable ex, long sequence, Object event) {

            log.debug("handleEventException:" + ex);
        }

        public void handleOnStartException(Throwable ex) {

            log.debug("handleOnStartException:" + ex);
        }

        public void handleOnShutdownException(Throwable ex) {

            log.debug("handleOnShutdownException:" + ex);
        }
    }
}

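Here, pushWhenNoDuplicate() publishes a request into the ringBuffer, and poll() takes the corresponding request back out so the crawler can issue the network request and parse the response. Note that poll() reads the RingBuffer directly instead of going through the WorkerPool consumers, and it is not registered as a gating sequence, so nothing prevents the producer from eventually overwriting requests that poll() has not read yet.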

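Summary:

GitHub address of the crawler framework: github.com/fengzhizi71…

The RingBuffer and WorkerPool setup above is the classic Disruptor multi-producer, multi-consumer arrangement and can also serve as template code.

Finally, the crawler framework is designed around interfaces, so any of its components can be replaced with little effort.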

