Disruptor (3) - 與ArrayBlockingQueue的性能比對

本次代碼測試基於相同的 容量、生產線程數、單個線程生產量; 僅有一個消費線程。java

修改各參數獲得的結果:併發

數據規模、併發線程數、 最主要的是容量小時:Disruptor沒有優點app

2019-08-29T07:42:35.235Z  線程數:64 單線程生產量: 2048 容量:32 數據總量:131072
2019-08-29T07:42:48.742Z  EventProcessor wrapper
2019-08-29T07:42:48.743Z  disruptor start success!
2019-08-29T07:42:51.113Z  處理的sequence:131071 count:131072  Disruptor 總耗時:2369

2019-08-29T07:42:36.200Z  ArrayBlockingQueue 生產耗時:962
2019-08-29T07:42:36.200Z  處理count:131072  ArrayBlockingQueue 消費耗時:962
2019-08-29T07:42:36.201Z  ArrayBlockingQueue 總耗時:963
2019-08-29T08:24:38.641Z  線程數:512 單線程生產量: 2048 容量:32 數據總量:1048576
2019-08-29T08:24:38.670Z  EventProcessor wrapper
2019-08-29T08:24:38.670Z  disruptor start success!
2019-08-29T08:25:08.590Z  處理的sequence:1048575 count:1048576  Disruptor 總耗時:29918

2019-08-29T08:25:54.753Z  處理count:1048576  ArrayBlockingQueue 消費耗時:9231
2019-08-29T08:25:54.753Z  ArrayBlockingQueue 生產耗時:9230
2019-08-29T08:25:54.753Z  ArrayBlockingQueue 總耗時:9231

增大容量:  Disruptor的性能上升ide

2019-08-29T07:40:28.980Z  線程數:64 單線程生產量: 2048 容量:128 數據總量:131072
2019-08-29T07:40:29.008Z  EventProcessor wrapper
2019-08-29T07:40:29.008Z  disruptor start success!
2019-08-29T07:40:29.694Z  處理的sequence:131071 count:131072  Disruptor 總耗時:685

2019-08-29T07:47:42.436Z  處理count:131072  ArrayBlockingQueue 消費耗時:508
2019-08-29T07:47:42.436Z  ArrayBlockingQueue 生產耗時:508
2019-08-29T07:47:42.436Z  ArrayBlockingQueue 總耗時:508
2019-08-29T07:43:39.073Z  線程數:64 單線程生產量: 2048 容量:512 數據總量:131072
2019-08-29T07:43:39.101Z  EventProcessor wrapper
2019-08-29T07:43:39.101Z  disruptor start success!
2019-08-29T07:43:39.269Z  處理的sequence:131071 count:131072  Disruptor 總耗時:167

2019-08-29T07:43:53.722Z  ArrayBlockingQueue 生產耗時:383
2019-08-29T07:43:53.722Z  處理count:131072  ArrayBlockingQueue 消費耗時:383
2019-08-29T07:43:53.722Z  ArrayBlockingQueue 總耗時:383
2019-08-29T07:44:05.995Z  線程數:64 單線程生產量: 2048 容量:1024 數據總量:131072
2019-08-29T08:18:10.426Z  EventProcessor wrapper
2019-08-29T08:18:10.426Z  disruptor start success!
2019-08-29T08:18:10.524Z  處理的sequence:131071 count:131072  Disruptor 總耗時:97

2019-08-29T07:44:06.365Z  ArrayBlockingQueue 生產耗時:367
2019-08-29T07:44:06.365Z  處理count:131072  ArrayBlockingQueue 消費耗時:367
2019-08-29T07:44:06.365Z  ArrayBlockingQueue 總耗時:367

再增大各指標參數: Disruptor優點愈來愈明顯高併發

2019-08-29T07:50:59.911Z  線程數:64 單線程生產量: 65536 容量:1048576 數據總量:4194304
2019-08-29T07:51:28.075Z  EventProcessor wrapper
2019-08-29T07:51:28.075Z  disruptor start success!
2019-08-29T07:51:28.577Z  處理的sequence:4194303 count:4194304  Disruptor 總耗時:501

2019-08-29T07:51:11.549Z  ArrayBlockingQueue 生產耗時:11633
2019-08-29T07:51:11.575Z  處理count:4194304  ArrayBlockingQueue 消費耗時:11659
2019-08-29T07:51:11.575Z  ArrayBlockingQueue 總耗時:11659
2019-08-29T07:57:22.994Z  線程數:128 單線程生產量: 65536 容量:1048576 數據總量:8388608
2019-08-29T07:57:23.074Z  EventProcessor wrapper
2019-08-29T07:57:23.074Z  disruptor start success!
2019-08-29T07:57:24.036Z  處理的sequence:8388607 count:8388608  Disruptor 總耗時:961

2019-08-29T07:58:25.567Z  ArrayBlockingQueue 生產耗時:47941
2019-08-29T07:58:25.646Z  處理count:8388608  ArrayBlockingQueue 消費耗時:48020
2019-08-29T07:58:25.647Z  ArrayBlockingQueue 總耗時:48021

再大線程數, ArrayBlockingQueue 更耗時了,而Disruptor仍舊很快性能

2019-08-29T08:05:17.927Z  線程數:256 單線程生產量: 65536 容量:1048576 數據總量:16777216
2019-08-29T08:05:18.026Z  EventProcessor wrapper
2019-08-29T08:05:18.027Z  disruptor start success!
2019-08-29T08:05:20.060Z  處理的sequence:16777215 count:16777216  Disruptor 總耗時:2032

經測試發現: 測試

  1. 容量大小 與 消費者的消費速度  與整個耗時 成正比。
  2. Disruptor的性能在高併發、高數據規模(bufferSize 要大些)時表現更突出。
  3.  Disruptor與LinkedBlockingQueue(比ArrayBlockingQueue性能好些)比對而言,當bufferSize大些的時候,也有優點。 

測試入口

package com.lmax.disruptor.noob;

import java.time.Instant;
import java.time.format.DateTimeFormatter;

/**
 * 擔憂影響, 分開執行測試
 * 
 * @author admin
 *
 */
public class CompareTest {
	public static int THREAD = 2 << 8; // 線程數量
	public static int PER = 2 << 10; // 單個線程生產數量
	public static int TOTAL_COUNT = THREAD * PER; // 數據總量
	public static int SIZE =32; // 最大容量

	public static void main(String[] args) {
		println("線程數:" + THREAD + " 單線程生產量: " + PER + " 容量:" + SIZE + " 數據總量:" + TOTAL_COUNT);
		 new Thread(() -> ArrayBlockingQueueTest.execute()).start();
		 //	  new Thread(() -> DisruptorTest.execute()).start();
	}

	public static void println(String msg) {
		System.out.println(DateTimeFormatter.ISO_INSTANT.format(Instant.now()) + "  " + msg);
	}
}

ArrayBlockingQueue 測試用例

package com.lmax.disruptor.noob;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class ArrayBlockingQueueTest {

	public static void execute() {
		ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(CompareTest.SIZE);

		AtomicBoolean endP = new AtomicBoolean(false);
		AtomicBoolean endC = new AtomicBoolean(false);
		long startTime = System.currentTimeMillis();
		AtomicLong count = new AtomicLong(0);
		for (int i = 0; i < CompareTest.THREAD; i++) {
			final int m = i;
			new Thread(() -> {
				for (int j = 0; j < CompareTest.PER; j++) {
					try {
						queue.put("i" + m + "j" + j); // 隊列不夠,等待生產
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					if (count.incrementAndGet() == CompareTest.TOTAL_COUNT) {
						CompareTest.println("ArrayBlockingQueue 生產耗時:" + (System.currentTimeMillis() - startTime));
						endP.set(true);
					}
				}
			}).start();
		}

		new Thread(() -> {
			AtomicLong consumerCount = new AtomicLong(0);
			while (true) {
				try {
					queue.take(); // 直到消費完全部信息
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				if (consumerCount.incrementAndGet() == CompareTest.TOTAL_COUNT) {
					break;
				}
			}
			CompareTest.println("處理count:" + consumerCount.get() + "  ArrayBlockingQueue 消費耗時:"
					+ (System.currentTimeMillis() - startTime));
			endC.set(true);
		}).start();

		while (!(endC.get() && endP.get())) {}
		
       CompareTest.println("ArrayBlockingQueue 總耗時:" + (System.currentTimeMillis() - startTime));

	}
}

Disruptor 測試用例

package com.lmax.disruptor.noob;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

public class DisruptorTest {
	public static void execute() {

		Disruptor<DataEvent> disruptor = new Disruptor<DataEvent>(new DataEventFactory(), CompareTest.SIZE,
				new ThreadFactory() {
					@Override
					public Thread newThread(Runnable eventProcessor) {
						CompareTest.println("EventProcessor wrapper");// 對事件處理總線的封裝
						Thread thread = new Thread(eventProcessor);
						thread.setName("EventProcessorWrapper");
						return thread;
					}
				});
		/**
		 * 建立EventProcessors<Runnable>.
		 * 子過程Disruptor.checkNotStarted()事件處理handler必須在啓動以前綁定.
		 */
		disruptor.handleEventsWith(new DataEventHandler());

		disruptor.start();
		CompareTest.println("disruptor start success!");

		RingBuffer<DataEvent> ringBuffer = disruptor.getRingBuffer();
		DataProducer producer = new DataProducer(ringBuffer);
		DataEventProducerWithTranslator translator = new DataEventProducerWithTranslator(ringBuffer);
		long start = System.currentTimeMillis();

		for (int l = 0; l < CompareTest.THREAD; l++) {
			new Thread(() -> {
				for (int m = 0; m < CompareTest.PER; m++) {
					producer.onData(start);
					// translator.onData(start);
				}
			}).start();
		}
		/**
		 * 關閉 disruptor,方法會堵塞,直至全部的事件都獲得處理;並不會自動關閉外部指定的executor,須要主動關閉
		 */
		// disruptor.shutdown();
        // 	CompareTest.println("disruptor shutdown success!");
		// executor.shutdown();
	}
}

事件this

package com.lmax.disruptor.noob;

/**
 * 事件實例封裝 業務數據傳遞對象
 * 
 * @author admin
 *
 */
public class DataEvent {
	private long startTime;

	public long getStartTime() {
		return startTime;
	}

	public void setStartTime(long startTime) {
		this.startTime = startTime;
	}

}
---

package com.lmax.disruptor.noob;
import com.lmax.disruptor.EventFactory;

/*
 * 構建傳遞的數據封裝對象, 在初始化ringBuffer時,直接給entries[]每一個地址上初始化DataEvent
 */
public class DataEventFactory implements EventFactory {

	@Override
	public Object newInstance() {
		return new DataEvent();
	}

}

生產事件發佈atom

package com.lmax.disruptor.noob;

import com.lmax.disruptor.RingBuffer;

public class DataProducer {
	private final RingBuffer<DataEvent> ringBuffer;

	public DataProducer(RingBuffer<DataEvent> ringBuffer) {
		this.ringBuffer = ringBuffer;
	}

	/**
	 * 當前仍是生產線程
	 * <p>
	 * onData用來發布事件,每調用一次就發佈一次事件事件 它的參數會經過事件傳遞給消費者
	 * 
	 * @param data
	 */
	public void onData(long data) {//
		// 能夠把ringBuffer看作一個事件隊列,那麼next就是獲得下面一個事件槽, 若沒有空閒的時間槽則阻塞
		long sequence = ringBuffer.next();
		// CompareTest.println("生產置入sequence:" + sequence);
		try {
			// 用上面的索引取出一個空的事件用於填充
			DataEvent event = ringBuffer.get(sequence);// for the sequence
			event.setStartTime(data);
		} finally {
			// 發佈事件
			ringBuffer.publish(sequence);
		}
	}
}

獲取下一個事件槽併發布事件要使用try/finally保證事件必定會被髮布, 因此最好直接使用 ringBuffer.publishEvent方式將數據交由Translator來處理填充DataEvent,最後finally發佈spa

package com.lmax.disruptor.noob;

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

/**
 * 獲取下一個事件槽併發布事件(發佈事件的時候要使用try/finally保證事件必定會被髮布)。
 * 若是咱們使用RingBuffer.next()獲取一個事件槽,那麼必定要發佈對應的事件。若是不能發佈事件,那麼就會引發Disruptor狀態的混亂
 * 。尤爲是在多個事件生產者的狀況下會致使事件消費者失速,從而不得不重啓應用才能會恢復。
 * 
 * @author admin
 *
 */
public class DataEventProducerWithTranslator {
	private final RingBuffer<DataEvent> ringBuffer;

	// 一個translator能夠看作一個事件初始化器,publicEvent方法會調用它
	// 填充Event
	private static final EventTranslatorOneArg<DataEvent, Long> TRANSLATOR = new EventTranslatorOneArg<DataEvent, Long>() {
		public void translateTo(DataEvent event, long sequence, Long startTime) {
			event.setStartTime(startTime);
		}
	};

	public DataEventProducerWithTranslator(RingBuffer<DataEvent> ringBuffer) {
		this.ringBuffer = ringBuffer;
	}

	public void onData(Long bb) {
		ringBuffer.publishEvent(TRANSLATOR, bb);
		// 當前仍是生產者線程
	//	CompareTest.println(Thread.currentThread().getName() + " pulishEvent end!");
		
	}
}

事件消費處理

package com.lmax.disruptor.noob;

import java.util.concurrent.atomic.AtomicLong;

import com.lmax.disruptor.EventHandler;

/**
 * 對指定事件的處理過程
 *
 */
public class DataEventHandler implements EventHandler<DataEvent> {
	public AtomicLong count = new AtomicLong(0);

	@Override
	public void onEvent(DataEvent event, long sequence, boolean endOfBatch) throws Exception {
		/**
		 * 消費者線程由初始化Disruptor時指定的threadFactory建立的
		 */
		if (count.incrementAndGet() == CompareTest.TOTAL_COUNT) {
			CompareTest.println("處理的sequence:" + sequence + " count:" + count.get() + "  Disruptor 總耗時:"
					+ (System.currentTimeMillis() - event.getStartTime()));
		}
	}

}
相關文章
相關標籤/搜索