// A small concurrency-test example implemented with Executors + CyclicBarrier

package org.phoenix.cases.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import com.meterware.httpunit.GetMethodWebRequest;
import com.meterware.httpunit.WebConversation;
import com.meterware.httpunit.WebResponse;

/**
 * Executors+CyclicBarrier實現的併發測試小例子<br>
 * 例子實現了併發測試中使用的集合點,集合點超時時間及思考時間等技術
 * @author mengfeiyang
 *
 */
public class FlushGenerator {
	private static volatile boolean RUNFLAG = true;
	private CyclicBarrier rendzvous;
	private static int threads;
	private static AtomicInteger totalCount = new AtomicInteger();
	private static AtomicInteger startedCount = new AtomicInteger();
	private static AtomicInteger finishCount = new AtomicInteger();
	private static AtomicInteger runCount = new AtomicInteger();
	private static AtomicInteger successCount = new AtomicInteger();
	private static AtomicInteger failCount = new AtomicInteger();
	private String url;
	private long rendzvousWaitTime = 0;
	private long thinkTime = 0;
	private int iteration = 0;
	
	/**
	 * 初始值設置
	 * @param url 被測url
	 * @param threads 總線程數
	 * @param iteration 每一個線程迭代次數
	 * @param rendzvousWaitTime 集合點超時時間,若是不啓用超時時間,請將此值設置爲0.<br>
	 * 							若是不啓用集合點,請將此值設置爲-1<br>
	 * 							若是不啓用超時時間,則等待全部的線程所有到達後,纔會繼續往下執行<br>
	 * @param thinkTime 思考時間,若是啓用思考時間,請將此值設置爲0
	 */
	public FlushGenerator(String url,int threads,int iteration,long rendzvousWaitTime,long thinkTime){
		totalCount.getAndSet(threads);
		FlushGenerator.threads = threads;
		this.url = url;
		this.iteration = iteration;
		this.rendzvousWaitTime = rendzvousWaitTime;
		this.thinkTime = thinkTime;
	}

	public static ThreadCount getThreadCount(){
		return new ThreadCount(threads,runCount.get(),startedCount.get(),finishCount.get(),successCount.get(),failCount.get());
	}
	
	public static boolean isRun(){
		return finishCount.get() != threads;
	}
	
	public synchronized static void stop(){
		RUNFLAG = false;
	}
	
	public void runTask(){
		List<Future<String>> resultList = new ArrayList<Future<String>>();
		ExecutorService exeService = Executors.newFixedThreadPool(threads);
		rendzvous = new CyclicBarrier(threads);//默認加載所有線程
		for(int i=0;i<threads;i++){
			resultList.add(exeService.submit(new TaskThread(i,url,iteration,rendzvousWaitTime,thinkTime)));
		}
		exeService.shutdown();
		for(int j=0;j<resultList.size();j++){
			try{
				System.out.println(resultList.get(j).get());
			}catch(Exception e){
				e.printStackTrace();
			}
		}
		stop();
	}
	
	static class ThreadCount {
        public final int runThreads;
        public final int startedThreads;
        public final int finishedThreads;
        public final int totalThreads;
        public final int successCount;
        public final int failCount;
        
        
        public ThreadCount(int totalThreads,int runThreads, int startedThreads, int finishedThreads,int successCount,int failCount) {
        	this.totalThreads = totalThreads;
        	this.runThreads = runThreads;
        	this.startedThreads = startedThreads;
        	this.finishedThreads = finishedThreads;
            this.successCount = successCount;
            this.failCount = failCount;
        }
	}
	
	private class TaskThread implements Callable<String> {
		private String url;
		private long rendzvousWaitTime = 0;
		private long thinkTime = 0;
		private int iteration = 0;
		private int iterCount = 0;
		private int taskId;
		
		/**
		 * 任務執行者屬性設置
		 * @param taskId 任務id號
		 * @param url 被測url
		 * @param iteration 迭代次數,若是一直執行則需將此值設置爲0
		 * @param rendzvousWaitTime 集合點超時時間,若是不須要設置時間,則將此值設置爲0。若是不須要設置集合點,則將此值設置爲-1
		 * @param thinkTime 思考時間,若是不須要設置思考時間,則將此值設置爲0
		 */
		public TaskThread(int taskId,String url,int iteration, long rendzvousWaitTime,long thinkTime){
			this.taskId = taskId;
			this.url = url;
			this.rendzvousWaitTime = rendzvousWaitTime;
			this.thinkTime = thinkTime;
			this.iteration = iteration;
		}
		@Override
		public String call() throws Exception{
			startedCount.getAndIncrement();
			runCount.getAndIncrement();
			while(RUNFLAG && iterCount<iteration){
				if(iteration != 0)iterCount++;
				try{
						if(rendzvousWaitTime > 0){
							try{
								System.out.println("任務:task-"+taskId+" 已到達集合點...等待其餘線程,集合點等待超時時間爲:"+rendzvousWaitTime);
								rendzvous.await(rendzvousWaitTime,TimeUnit.MICROSECONDS);
							} catch (InterruptedException e) {
							} catch (BrokenBarrierException e) {
								System.out.println("task-"+taskId+" 等待時間已超過集合點超時時間:"+rendzvousWaitTime+" ms,將開始執行任務....");
							} catch (TimeoutException e) {
							}
						} else if(rendzvousWaitTime == 0){
							try{
								System.out.println("任務:task-"+taskId+" 已到達集合點...等待其餘線程");
								rendzvous.await();
							} catch (InterruptedException e) {
							} catch (BrokenBarrierException e) {
							}
						}
					WebResponse wr = new WebConversation().getResponse(new GetMethodWebRequest(url));
					System.out.println("線程:task-"+taskId+" 獲取到的資源大小:"+wr.getText().length()+",狀態碼:"+wr.getResponseCode());
					successCount.getAndIncrement();
					if(thinkTime !=0){
						System.out.println("task-"+taskId+" 距下次啓動時間:"+thinkTime);
						Thread.sleep(thinkTime);
					}
				}catch(Exception e){
					failCount.getAndIncrement();
				}
			}
			finishCount.getAndIncrement();
			runCount.decrementAndGet();
			return Thread.currentThread().getName()+" 執行完成!";
		}
	}
	
	public static void main(String[] args) {
		new Thread(){
			public void run(){
				new FlushGenerator("http://10.108.79.8:8080/hh.php",5,3,0,200).runTask();
			}
		}.start();
		
		new Thread(){
			public void run(){
				while(true){
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					System.out.println("isRun:"+FlushGenerator.isRun());
					System.out.println("totalThreads:"+FlushGenerator.getThreadCount().totalThreads);
					System.out.println("startedThreads:"+FlushGenerator.getThreadCount().startedThreads);
					System.out.println("runThreads:"+FlushGenerator.getThreadCount().runThreads);
					System.out.println("finishedThread:"+FlushGenerator.getThreadCount().finishedThreads);
					System.out.println("successCount:"+FlushGenerator.getThreadCount().successCount);
					System.out.println("failCount:"+FlushGenerator.getThreadCount().failCount);
					System.out.println();
				}
			}
		}.start();
	}
}
// Related articles
// Related tags / search