Disruptor (5) - Usage Scenarios

handleEventsWith & handleEventsWithWorkerPool

Before calling start() on the Disruptor, you must register the message consumers with it.

  1. disruptor.handleEventsWith(EventHandler... handlers): takes one or more EventHandler implementations and wraps them in an EventHandlerGroup.
  2. disruptor.handleEventsWithWorkerPool(WorkHandler... handlers): takes one or more WorkHandler implementations and wraps them in an EventHandlerGroup.

Differences

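  1. With handleEventsWith, every consumer in the EventHandlerGroup consumes the same message m; the consumers do not compete with each other.
  2. With handleEventsWithWorkerPool, the consumers in the returned EventHandlerGroup never consume the same message m twice: if c0 consumes m, c1 will not.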
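
The difference between the two modes can be sketched without the Disruptor library at all. The class below is a plain-JDK model, not Disruptor code: DeliveryModes, broadcast and workerPool are hypothetical names made up for this illustration. It assumes that broadcast consumers each track their own position, while pool workers claim sequences from one shared counter with compare-and-set, which is similar in spirit to how a worker pool avoids duplicate consumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Plain-JDK model of the two delivery modes; it does not use the Disruptor library. */
public class DeliveryModes {

    /** Broadcast: every consumer keeps its own cursor, so each one sees every message. */
    public static List<List<Long>> broadcast(int consumers, long messages) {
        List<List<Long>> seen = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            List<Long> mine = new ArrayList<>();
            for (long seq = 0; seq < messages; seq++) {
                mine.add(seq); // private cursor: no competition with other consumers
            }
            seen.add(mine);
        }
        return seen;
    }

    /** Worker pool: consumers claim sequences from one shared counter via compare-and-set. */
    public static List<List<Long>> workerPool(int consumers, long messages) {
        AtomicLong next = new AtomicLong(0); // shared claim counter (assumption of this model)
        List<List<Long>> seen = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            List<Long> mine = new ArrayList<>(); // touched only by its own worker thread
            seen.add(mine);
            Thread t = new Thread(() -> {
                while (true) {
                    long seq = next.get();
                    if (seq >= messages) {
                        return; // nothing left to claim
                    }
                    if (next.compareAndSet(seq, seq + 1)) {
                        mine.add(seq); // only the worker that wins the CAS consumes seq
                    }
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join(); // join makes each worker's list safely visible to the caller
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("broadcast:  " + broadcast(2, 4));
        System.out.println("workerPool: " + workerPool(2, 4));
    }
}
```

In the broadcast result every inner list holds all sequences. In the worker-pool result the sequences are split across the lists with no duplicates; how they are split depends on thread scheduling and varies from run to run.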

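Consumers that each consume every message independently should implement the EventHandler interface. Consumers that must not consume a message more than once should implement the WorkHandler interface.
At the code level, the two modes are backed by different concrete implementations of: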

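  1. ConsumerInfo (in Disruptor 3.x: EventProcessorInfo for EventHandler consumers, WorkerPoolInfo for worker pools)
  2. EventProcessor (in Disruptor 3.x: BatchEventProcessor for EventHandler consumers, WorkProcessor for worker pools)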

Consumption scenarios

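The test code here uses a modified WorkHandler interface: as DataWorkHandler shows, its onEvent method also receives the sequence number.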

package com.lmax.disruptor.noob;

import java.time.Instant;
import java.time.format.DateTimeFormatter;

public class CompareTest {
	public static int THREAD = 2; // number of producer threads
	public static int PER = 1; // events produced per thread
	public static int TOTAL_COUNT = THREAD * PER; // total number of events
	public static int SIZE = 4; // ring buffer capacity (must be a power of 2)

	public static void main(String[] args) {
		println("threads: " + THREAD + ", events per thread: " + PER + ", capacity: " + SIZE + ", total events: " + TOTAL_COUNT);
		DisruptorTest.execute();
	}

	public static void println(String msg) {
		System.out.println(DateTimeFormatter.ISO_INSTANT.format(Instant.now()) + "[" + Thread.currentThread().getName() + "] " + msg);
	}
}

---------
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

public class DisruptorTest {

	public static void execute() {

		Disruptor<DataEvent> disruptor = new Disruptor<DataEvent>(new DataEventFactory(), CompareTest.SIZE,
				new ThreadFactory() {
			       AtomicInteger count = new AtomicInteger(0);

					@Override
					public Thread newThread(Runnable eventProcessor) {
						CompareTest.println("EventProcessor wrapper"); // wraps each event processor in a named thread
						Thread thread = new Thread(eventProcessor);
						thread.setName("EventProcessor" + count.incrementAndGet());
						return thread;
					}
				});
		/**
		 * Creates the EventProcessors (each one is a Runnable).
		 * Handlers must be bound before start(); Disruptor.checkNotStarted() enforces this.
		 */
		disruptor.handleEventsWith(new DataEventHandler("dataEventHandler1"),new DataEventHandler("dataEventHandler2"));
		 // disruptor.handleEventsWithWorkerPool(new DataWorkHandler("dataWorkHandler1"),new DataWorkHandler("dataWorkHandler2"));

		disruptor.start();
		CompareTest.println("disruptor start success!");

		RingBuffer<DataEvent> ringBuffer = disruptor.getRingBuffer();
		DataProducer producer = new DataProducer(ringBuffer);
		DataEventProducerWithTranslator translator = new DataEventProducerWithTranslator(ringBuffer);
		long start = System.currentTimeMillis();

		for (int l = 0; l < CompareTest.THREAD; l++) {
			new Thread(() -> {
				for (int m = 0; m < CompareTest.PER; m++) {
					producer.onData(start);
					// translator.onData(start); // this is the recommended way to publish
				}
			}).start();
		}
	}
}

----------
import java.util.concurrent.atomic.AtomicLong;
import com.lmax.disruptor.EventHandler;

public class DataEventHandler implements EventHandler<DataEvent> {
	public AtomicLong count = new AtomicLong(0);
	public String name = null;

	public DataEventHandler(String name) {
		this.name = name;
	}

	@Override
	public void onEvent(DataEvent event, long sequence, boolean endOfBatch) throws Exception {
		Thread.sleep(name.contentEquals("dataEventHandler1") ? 1000 : 100);
		CompareTest.println("handlerName: " + name + " sequence: " + sequence
				+ " count: " + count.incrementAndGet() + "  Disruptor total elapsed ms: "
				+ (System.currentTimeMillis() - event.getStartTime()));
	}

}

----------
import java.util.concurrent.atomic.AtomicLong;
import com.lmax.disruptor.WorkHandler;

public class DataWorkHandler implements WorkHandler<DataEvent> {
	public AtomicLong count = new AtomicLong(0);
	public String name = null;

	public DataWorkHandler(String name) {
		this.name = name;
	}

	@Override
	public void onEvent(DataEvent event, long sequence) throws Exception {
		Thread.sleep(name.contentEquals("dataWorkHandler2") ? 100 :1000);
		CompareTest.println("handlerName: " + name + " sequence: " + sequence + " count: " + count.incrementAndGet()
				+ "  Disruptor total elapsed ms: " + (System.currentTimeMillis() - event.getStartTime()));
	}
}

  1. handleEventsWith: each message is consumed independently by every handler. No ordering is enforced between the handlers.
    disruptor.handleEventsWith(new DataEventHandler("dataEventHandler1"), new DataEventHandler("dataEventHandler2"));

  2. Serial dependency: for a given message, a handler runs only after the preceding handler has finished with it.
    disruptor.handleEventsWith(new DataEventHandler("dataEventHandler1")).then(new DataEventHandler("dataEventHandler2")).then(new DataEventHandler("dataEventHandler3"));

  3. handleEventsWithWorkerPool: each message is consumed by exactly one handler in the pool.
    disruptor.handleEventsWithWorkerPool(new DataWorkHandler("dataWorkHandler1"), new DataWorkHandler("dataWorkHandler2"));

  4. Combinations: a worker pool followed by independent handlers, or independent handlers followed by a worker pool.

    disruptor.handleEventsWithWorkerPool(new DataWorkHandler("dataWorkHandler3"), new DataWorkHandler("dataWorkHandler4"))
    		.then(new DataEventHandler("dataEventHandler1"), new DataEventHandler("dataEventHandler2"));

    disruptor.handleEventsWith(new DataEventHandler("dataEventHandler1"), new DataEventHandler("dataEventHandler2"))
    		.thenHandleEventsWithWorkerPool(new DataWorkHandler("dataWorkHandler3"), new DataWorkHandler("dataWorkHandler4"));
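
The serial-dependency scenario comes down to gating: a downstream handler may only advance to a sequence that its upstream handler has already finished. The sketch below models that rule in plain Java, without the Disruptor library. GatedChain and run are hypothetical names for this illustration, and the single-threaded stepping loop is an assumption that stands in for real handler threads.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-JDK model of a then(...) chain; it does not use the Disruptor library. */
public class GatedChain {

    /**
     * Runs `stages` chained handlers over sequences 0..published-1 and
     * returns the processing order as "stage:sequence" strings.
     */
    public static List<String> run(int stages, long published) {
        long[] cursor = new long[stages]; // last sequence finished by each stage
        Arrays.fill(cursor, -1L);
        List<String> order = new ArrayList<>();
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            // Visit later stages first, so any ordering observed comes from gating
            // and not from the order in which stages get a chance to run.
            for (int s = stages - 1; s >= 0; s--) {
                // Stage 0 is gated by the publisher; every other stage by its upstream stage.
                long limit = (s == 0) ? published - 1 : cursor[s - 1];
                if (cursor[s] < limit) {
                    cursor[s]++;
                    order.add(s + ":" + cursor[s]);
                    progressed = true;
                }
            }
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(run(3, 2));
    }
}
```

With three stages and two published sequences, main prints [0:0, 1:0, 0:1, 2:0, 1:1, 2:1]. Stage 0 can run ahead to sequence 1 before stage 2 has seen sequence 0, but for any single sequence the stages always run in chain order.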
