最近一年多寫的最虐心的代碼。必須好好複習 Java 併發了。搞了一夜,終於把測試都跑通了,特此紀念,以資鼓勵!
import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 實現可調整大小的阻塞隊列,支持數據遷移平衡reader,writer讀取吸入速度,達到最大吞吐 * @author hanjie * */ public class RecordBuffer { public static final Record CLOSE_RECORD = new Record() { @Override public Object getColumnValue(String columnName) { // TODO Auto-generated method stub return null; } }; public static final Record SWITCH_QUEUE_RECORD = new Record() { @Override public Object getColumnValue(String columnName) { // TODO Auto-generated method stub return null; } }; public Lock switchingQueueLock = new ReentrantLock(); public Condition readerSwitched = switchingQueueLock.newCondition(); public Condition writerSwitched = switchingQueueLock.newCondition(); public Condition switchFinished = switchingQueueLock.newCondition(); public volatile boolean readerSwitchSuccess = true; public volatile boolean writerSwitchSuccess = true; public volatile boolean switchingQueue = false; public volatile boolean closed = false; private volatile ArrayBlockingQueue<Record> queue; private TaskCounter taskCounter; public RecordBuffer(TaskCounter taskCounter, int size) { this.queue = new ArrayBlockingQueue<Record>(size); this.taskCounter = taskCounter; } public void resize(int newSize) { try { if(closed){ return; } switchingQueueLock.lock(); try { //double check下,要不可能writer收到CLOSED_record已經 退出了。writerSwitched.await() 會hang住 if(closed){ return; } this.switchingQueue = true; ArrayBlockingQueue<Record> oldQueue = queue; queue = new ArrayBlockingQueue<Record>(newSize); this.readerSwitchSuccess = false; this.writerSwitchSuccess = false; //先拯救下writer,可能writer恰好阻塞到take上,失敗也不要緊,說明老隊列不空,writer不會阻塞到take oldQueue.offer(SWITCH_QUEUE_RECORD); while (!writerSwitchSuccess) { writerSwitched.await(); } //writer先切換隊列,而後reader可能阻塞在最後一個put上,清空下老隊列拯救reader,讓它順利醒來 
transferOldQueueRecordsToNewQueue(oldQueue); while (!readerSwitchSuccess) { readerSwitched.await(); } //前面的清空,恰好碰到reader要put最後一個,非阻塞式清空動做就有殘留最後一個put transferOldQueueRecordsToNewQueue(oldQueue); this.switchingQueue = false; this.switchFinished.signalAll(); } finally { switchingQueueLock.unlock(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } } private void transferOldQueueRecordsToNewQueue(ArrayBlockingQueue<Record> oldQueue) throws InterruptedException { List<Record> oldRecords = new ArrayList<Record>(oldQueue.size()); Record record = null; while ((record = oldQueue.poll()) != null) { oldRecords.add(record); } // 轉移老隊列剩下的記錄到新隊列 for (int i = 0; i < oldRecords.size(); i++) { queue.put(oldRecords.get(i)); } } public void close() { this.closed = true; switchingQueueLock.lock(); try { //若是正在切換隊列, 等切換作完才能,發送最後一個CLOSE while (switchingQueue) { switchFinished.await(); } this.queue.put(CLOSE_RECORD); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } finally{ switchingQueueLock.unlock(); } } public void put(Record record) { try { if (!queue.offer(record)) { taskCounter.incrBufferFullCount(); if (!readerSwitchSuccess) { notifyReaderSwitchSuccess(); } queue.put(record); } taskCounter.incrReadCount(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } } private void notifyReaderSwitchSuccess() { System.out.println("reader switch"); switchingQueueLock.lock(); try { readerSwitchSuccess = true; readerSwitched.signalAll(); } finally { switchingQueueLock.unlock(); } } public Record take() { try { Record record = queue.poll(); //若是拿到了切換記錄,則切換隊列重試 if(record == SWITCH_QUEUE_RECORD){ if (!writerSwitchSuccess) { notifyWriterSwitchSuccess(); } record = queue.poll(); } if (record == null) { taskCounter.incrBufferEmptyCount(); //調用take先檢查是否正在切換,保證拿到新的隊列 if (!writerSwitchSuccess) { notifyWriterSwitchSuccess(); } record = 
queue.take(); //若是很不幸恰好在take阻塞時候,切換,只能發送一個切換記錄將其喚醒 if(record == SWITCH_QUEUE_RECORD){ if (!writerSwitchSuccess) { notifyWriterSwitchSuccess(); } record = queue.take(); } } if (record == CLOSE_RECORD) { if (!writerSwitchSuccess) { notifyWriterSwitchSuccess(); } return null; } taskCounter.incrWriteCount(); return record; } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } } private void notifyWriterSwitchSuccess() { System.out.println("writer switch"); switchingQueueLock.lock(); try { writerSwitchSuccess = true; writerSwitched.signalAll(); } finally { switchingQueueLock.unlock(); } } }