簡介html
java.util.concurrent包是Java 5的一個重大改進,java.util.concurrent包提供了多種線程間同步和通訊的機制,好比Executors, Queues, Timing, Synchronizers和Concurrent Collections等。與synchronized關鍵字和Object.notify()等方法相比,這些類和方法的抽象層次都較高。Effective Java中提到,其中比較重要的同步和通訊機制有Executor框架、Concurrent Collections和Synchronizers三種。 java
其中Synchronizers包含了五種: Semaphore信號量,CounDownLatch倒計時鎖存器,CyclicBarrier循環柵欄,Phaser和Exchanger。 JCIP中提到,Exchanger能夠看作一種特殊的Barrier。Effective Java 提到用的比較多的主要是Semaphore信號量和CounDownLatch倒計時鎖存器。本文主要講解我認爲比較重要的Semaphore信號量、CounDownLatch計數鎖存器和CyclibBarrier。每一種都按照它們的概念、jdk實現、所提供的方法和使用(traveler或者jdk, or sample code)來進行介紹。sql
1 Semaphore api
semaphore,信號量,是衆多synchronizer中的一個。在操做系統中就存在互斥量和信號量這樣的概念。 semaphore跟鎖機制存在必定的類似性,semaphore也是一種鎖機制,所不一樣的是,reentrantLock是隻容許一個線程得到鎖,而信號量持有多個許可(permits),容許多個線程得到許可並執行。從這個意義上看,重入鎖是許可只有1的信號量。它們所提供的方法也很是接近。併發
1.1 實現app
跟ReentrantLock同樣,Semaphore也是以AQS爲基礎來實現的。框架
1.1.1 構造函數:ide
非公平版本:函數
1 public Semaphore(int permits) { 2 sync = new NonfairSync(permits); 3 }
能夠選擇是否公平的版本:oop
1 public Semaphore(int permits, boolean fair) { 2 sync = fair ? new FairSync(permits) : new NonfairSync(permits); 3 }
1.1.2 其餘方法
跟ReentrantLock不一樣的是,每種acquire方法都分爲有參數的和不帶參數的兩個版本:
acquire() :
1 public void acquire() throws InterruptedException { 2 sync.acquireSharedInterruptibly(1); 3 }
acquire(int permits)
1 public void acquire(int permits) throws InterruptedException { 2 if (permits < 0) throw new IllegalArgumentException(); 3 sync.acquireSharedInterruptibly(permits); 4 }
與此相似的還有:
acquireUninterruptibly()&acquireUninterruptibly(int)
tryAcquire()& tryAcquire(int)
tryAcquire(long,TimeUnit)& tryAcquire(int, long,TimeUnit)
release()& release(int)
1.2 使用示例:
1 import java.util.concurrent.ExecutorService; 2 import java.util.concurrent.Executors; 3 import java.util.concurrent.Semaphore; 4 5 public class TIJ_semaphore { 6 public static void main(String[] args) { 7 ExecutorService exec = Executors.newCachedThreadPool(); 8 final Semaphore semp = new Semaphore(5); // 5 permits 9 10 for (int index = 0; index < 20; index++) { 11 final int NO = index; 12 Runnable run = new Runnable() { 13 public void run() { 14 try {
// if 1 permit avaliable, thread will get a permits and go; if no permit avaliable, thread will block until 1 avaliable 15 semp.acquire();
16 System.out.println("Accessing: " + NO); 17 Thread.sleep((long) (10000); 18 semp.release(); 19 } catch (InterruptedException e) { 20 } 21 } 22 }; 23 exec.execute(run); 24 } 25 exec.shutdown(); 26 }
程序輸出結果爲:
1 Accessing: 0 2 Accessing: 2 3 Accessing: 3 4 Accessing: 4 5 Accessing: 1 6 (等待10s) 7 Accessing: 5 8 Accessing: 6 9 Accessing: 14 10 Accessing: 8 11 Accessing: 7 12 (等待10s) 13 Accessing: 10 14 Accessing: 9 15 Accessing: 11 16 Accessing: 15 17 Accessing: 12 18 (等待10s) 19 Accessing: 13 20 Accessing: 16 21 Accessing: 17 22 Accessing: 19 23 Accessing: 18
2 CountDownLatch
2.1 實現
內部使用AQS實現
2.2 方法
await()
等待,無超時,能夠被中斷
1 public void await() throws InterruptedException { 2 sync.acquireSharedInterruptibly(1); 3 }
boolean await(long,timeUnit):
若是等待超時,則返回false; 若是時間爲0或者爲負,則馬上返回。
1 public boolean await(long timeout, TimeUnit unit) 2 throws InterruptedException { 3 return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); 4 }
countDown():
把Latch的計數減1,若是計數到達0,則釋放全部正在等待的線程。
1 public void countDown() { 2 sync.releaseShared(1); 3 }
2.3 使用:
絕大多數synchronizer在jdk中沒有使用,緣由很簡單:這些synchronizer是抽象層次較高的,因此通常只有應用程序纔會直接使用。
而在nts生產環境中,只有一處admin.rest.api.RestRequestSender使用了CountDownLatch:
1 public Map<String, List<JSONObject>> doDelete(final String reqUri, final HttpHeaders _headers) 2 throws RestAPIException 3 { 4 setMethod(Method.DELETE); 5 setUriTemplate(reqUri); 6 headers = _headers; 7 return processBroadcast(); 8 }
doDelete調用了processBraodcast:
private Map<String, List<JSONObject>> processBroadcast() throws RestAPIException { ... final Map<String, SaaSServerContext> dServers = RestRequestSender.deployedServers; if (!dServers.isEmpty()) { final CountDownLatch doneSignal = new CountDownLatch(dServers.size()); ... for (final String key : dServers.keySet()) { ... executor.submit(new RRSRunnable(doneSignal, context, results, key, totalRecordsCounter)); } try { if (!doneSignal.await(Configuration.NTS_MDM_API_BROADCAST_TIMEOUT.getInt(), TimeUnit.MINUTES)) // if timeout will retrun false { XLog.warning("MDM api broadcast timed out after " + Configuration.NTS_MDM_API_BROADCAST_TIMEOUT.getInt() + " minutes. Timeout is set via notes.ini key " + Configuration.NTS_MDM_API_BROADCAST_TIMEOUT.getNotesIniName()); } } catch (final InterruptedException ie) { throw new RestAPIException("Interrupted", ie); } } return results; // if doneSingnal.await has been intrerrupted, will this line still execute? }
RRSRunnable代碼以下:
class RRSRunnable implements Runnable { private final CountDownLatch doneSignal; ... RRSRunnable(final CountDownLatch doneSignal, final SaaSServerContext context, final Map<String, List<JSONObject>> results, final String serverKey, final MaxRecordCounter recordCounter) { this.doneSignal = doneSignal; ... } @Override public void run() { ... try { processRequest(responses, context.getHostName(), client, context, recordCounter); final long elapsedTime = System.currentTimeMillis() - startTime; XLog.fine("Traveler API request completed. orgid=" + orgId + ";request=" + reqUri + ";pool=" + serverKey + ";time=" + elapsedTime + "ms"); } catch (final RestAPIException rae) { exceptionServerKey = serverKey; exception = rae; } finally { doneSignal.countDown(); } } }
2.4 更加通常的Latch:
CountDownLatch是一種特殊的Latch,jcip第八章用countdownLatch實現了一種valueLatch。
2.4.1 nts Latch簡介:
在nts生產代碼中也實現了一種Latch,Latch容許多個線程間的協做:在這種Latch中,有working thread和latching thread之分:
workingThread在作一些工做,latchingThread但願當這些工做完成的時候,鎖存這些工做,而後獲得workingThread的工做結果。workingThread和latchingThread共享一個Latch對象,workingThread會調用start方法,通知它正在開始針對特定Object的工做已經開始了。同時,latchingThread將調用latch方法,並傳進它但願等待的Object。 當workingThread完成對某一Object(start方法傳入的)的工做後,它將調用finish方法,傳入該對象,以及工做的結果對象。當finish方法被調用後,調用latch方法的線程被喚醒,返回工做結果給latch方法的調用者。多個線程能夠鎖存同一個將要完成某些工做的object。一旦任意一個線程調用了finish方法,他們都將被喚醒並返回結果對象。若是調用latch方法時,針對latch對象的工做尚未開始,線程馬上返回,並不會block. 因此start(Object)應該首先被調用。
workingThread調用start(Object)方法,代表它開始工做。 同時,latchingThread調用latch(Object,long)方法,等待workingThread的執行完成。 workingThread執行finish(Object,Object)方法,表示工做完成,此時,latchingThread醒來。start(Object) finish(Object,Object) --> working thread 第二個參數爲結果。 ?
latch(Object,long) --> latching thread
2.4.2 nts Latch 實現:
start:
1 public boolean start(final Object obj) 2 { 3 final long timeStart = System.currentTimeMillis(); 4 boolean rv = false; 5 Barrier b = null; 6 synchronized (this) 7 { 8 if (!latched.containsKey(obj)) 9 { 10 b = new Barrier("Latch:" + name + "_Obj:" + obj, 1); 11 latched.put(obj, b); // latched is a synchronizedHashMap 12 rv = true; 13 } 14 } 15 XLog.exiting("name=" + name, "obj=" + obj, "barrier=" + b, "rv=" + rv, ("Elapsed time=" 16 + (System.currentTimeMillis() - timeStart) + "ms")); 17 return rv; 18 }
finish:
1 public void finish(final Object obj, final Object result) 2 { 3 final long timeStart = System.currentTimeMillis(); 4 final Barrier b; 5 synchronized (this) 6 { 7 b = latched.remove(obj); 8 if (null != b) 9 { 10 // there are waiters that need the result 11 b.result = result; 12 try 13 { 14 b.enter(0); 15 } 16 catch (final InterruptedException e) 17 { 18 // ignored 19 } 20 } 21 } 22 XLog.exiting("name=" + name, "obj=" + obj, "result=" + result, "barrier=" + b, ("Elapsed time=" 23 + (System.currentTimeMillis() - timeStart) + "ms")); 24 }
3 CyclicBarrier
CountDownLatch通常用於某個線程A等待若干個其餘線程執行完任務以後,它(一個線程)才執行; 而CyclicBarrier通常用於一組線程互相等待至某個狀態,而後這一組線程再同時執行。
3.1 實現:
使用Lock和Condition實現。不一樣於AQS。(Condition是基於AQS實現的)
CyclicBarrier包含下面的域:
1 /** The lock for guarding barrier entry */ 2 private final ReentrantLock lock = new ReentrantLock(); 3 /** Condition to wait on until tripped */ 4 private final Condition trip = lock.newCondition();
3.1.1 構造函數
當在等待柵欄的線程個數到達預約義的個數時,barrier 發生trip, 可是由於沒有預約義的動做,因此不執行任何動做。
1 public CyclicBarrier(int parties) { 2 this(parties, null); 3 }
當barrier發生trip時,會由最後一個進入該barrier的線程執行特定的動做:
1 public CyclicBarrier(int parties, Runnable barrierAction) { 2 if (parties <= 0) throw new IllegalArgumentException(); 3 this.parties = parties; 4 this.count = parties; 5 this.barrierCommand = barrierAction; 6 }
CyclicBarrier方法:
await():
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
await(Long time, TimeUtil unit):
1 public int await(long timeout, TimeUnit unit) 2 throws InterruptedException, 3 BrokenBarrierException, 4 TimeoutException { 5 return dowait(true, unit.toNanos(timeout)); 6 }
上述兩種方法都調用了dowait方法:
1 private int dowait(boolean timed, long nanos) 2 throws InterruptedException, BrokenBarrierException, 3 TimeoutException { 4 final ReentrantLock lock = this.lock; 5 lock.lock(); 6 try {
final Generation g = generation; 7 ... 8 if (Thread.interrupted()) { 9 breakBarrier(); 10 throw new InterruptedException(); 11 } 12 13 int index = --count; 14 if (index == 0) { // tripped 15 boolean ranAction = false; 16 try { 17 final Runnable command = barrierCommand; 18 if (command != null) 19 command.run(); 20 ranAction = true; 21 nextGeneration(); 22 return 0; 23 } finally { 24 if (!ranAction) 25 breakBarrier(); 26 } 27 } 28 29 // loop until tripped, broken, interrupted, or timed out 30 for (;;) { 31 try { 32 if (!timed) 33 /* The lock associated with this Condition is atomically
released and the current thread becomes disabled for thread scheduling
purposes */
trip.await(); 34 else if (nanos > 0L) 35 nanos = trip.awaitNanos(nanos); 36 } catch (InterruptedException ie) { 37 if (g == generation && ! g.broken) { 38 breakBarrier(); 39 throw ie; 40 } else { 41 // We're about to finish waiting even if we had not 42 // been interrupted, so this interrupt is deemed to 43 // "belong" to subsequent execution. 44 Thread.currentThread().interrupt(); 45 } 46 } 47 48 if (g.broken) 49 throw new BrokenBarrierException(); 50 51 if (g != generation) 52 return index; 53 54 if (timed && nanos <= 0L) { 55 breakBarrier(); 56 throw new TimeoutException(); 57 } 58 } 59 } finally { 60 lock.unlock(); 61 } 62 }
breakBarrier:
1 private void breakBarrier() { 2 generation.broken = true; 3 count = parties; 4 trip.signalAll(); 5 }
3.2 使用:
在jdk和traveler code中沒有使用。這符合Effective Java 69中的描述。在實際使用中較少見到。
4 Synchronizere與wait()/wait(long)/wait(long,int)/notify()/notifyAll()的比較:
在effective java 69中提到,wait()/wait(long)/wait(long,int)/notify()/notifyAll()不易使用且容易出錯。通常來說應該優選更高級的concurrent container 或者 synchronizer。 其中synchronizer比較經常使用的是Semaphore和CountDownLatch,而CyclicBarrier和Exchanger 則使用的比較少。說從易用性上來說,wait()/notify()/notifyAll()更象是彙編語言,併發容器和synchronizer更像是高級語言。 但我在nts代碼中看到了不少wait/notify,而Semaphore和CountDownLatch則用的不多。
Lock/Condition()是使用AQS實現的,Lock/Condition() 組合能夠用來替代Object.wait()/notify()。 而高級的synchronizer: CountDownLatch& Semaphore也是基於AQS實現的。因此理論上,能夠替代wait/notify。 Semaphore和CountDownLatch也都包含了跟wait(long timeout)相對應的方法。
考慮如下在下面的例子中,是否能夠用Semaphore& CountDownLatch來代替wait/notify ?
1 public Connection getConnection(final boolean highpriority) throws SQLException 2 { 3 final long sTime = System.currentTimeMillis(); 4 Connection conn = null; 6 boolean createConnection = false; 7 boolean waitConnection = false; 8 9 try 10 { 11 while (true) 12 { 13 createConnection = false; 14 waitConnection = false; 15 17 synchronized (dbConnections) 18 { 19 20 final List<Connection> dbConnectionsToBeFreedTemp = new ArrayList<Connection>(); 21 synchronized (dbConnectionsToBeFreed) 22 { 23 if (!dbConnectionsToBeFreed.isEmpty()) 24 { 25 dbConnectionsToBeFreedTemp.addAll(dbConnectionsToBeFreed); 26 dbConnectionsToBeFreed.clear(); 27 if (1 == dbConnectionsToBeFreedTemp.size()) 28 { 29 // only one, so only notify one 30 dbConnectionsToBeFreed.notify(); // Semaphore.release() 31 } 32 else 33 { 34 dbConnectionsToBeFreed.notifyAll(); // CountDownLatch.await() 35 } 36 } 37 if (isThrottDown && connectionCount <= Tier.ONE.value * .20F) 38 { 39 isThrottDown = false; 40 dbConnectionsToBeFreed.notifyAll(); 41 } 42 } 43 44 if (!dbConnectionsToBeFreedTemp.isEmpty()) 45 { 46 for (final Connection connToFree : dbConnectionsToBeFreedTemp) 47 { 48 if (!closeConnectionIfAgedOut(connToFree)) 49 { 50 dbConnections.add(connToFree); 51 } 52 } 53 dbConnectionsToBeFreedTemp.clear(); 54 } 55 56 57 if (Configuration.NTS_DB_CONNECTION_THROTTLING.getBoolean()) 58 { 59 getCurrentTier(); 60 61 if ((!dbConnections.isEmpty() && !isThrottDown) 62 && (highpriority || ((connectionCount - dbConnections.size()) < currentTier.value))) //Prevent non-high priority requests from grabbing freed high priority connections if connection is capped 63 { 64 conn = dbConnections.remove(0); 65 } 66 67 if (null == conn) 68 { 69 // See if we should create a new connection or have to wait 70 // if none are in the stack, then see if we should make another 71 if ((connectionCount < currentTier.value || (highpriority && (connectionCount < maxConnectionCount))) 72 && !isThrottDown) 73 { 74 75 createConnection = true; 76 connectionCount++; 77 } 78 else if (!isScheduled && conn == null && !highpriority && !createConnection 79 && currentTier != Tier.THREE && !isThrottDown) 80 { 81 WallClock.getInstance().addAlarm(ALARM_NAME, 82 Configuration.NTS_DB_POOL_STEP_INTERVAL_TIMER.getInt(), alarmStepUpTier); 83 isScheduled = true; 84 waitConnection = true; 85 } 86 else if (conn == null && !createConnection && currentTier == Tier.THREE 87 && connectionCount - dbConnections.size() >= Tier.THREE.value && !isScheduled) 88 { 89 WallClock.getInstance().addAlarm(ALARM_THROTTLE_DOWN, 90 Configuration.NTS_DB_POOL_STEP_INTERVAL_TIMER.getInt(), alarmThrottleDown); 91 isScheduled = true; 92 waitConnection = true; 93 } 94 else 95 { 96 // Connections are maxed out, so we have to wait 97 waitConnection = true; 98 } 99 } 100 } 101 else 102 { 103 if (!dbConnections.isEmpty() 104 && (highpriority || ((connectionCount - dbConnections.size()) <= maxConnectionCount))) 105 { 106 conn = dbConnections.remove(0); 107 } 108 109 if (null == conn) 110 { 111 // See if we should create a new connection or have to wait 112 // if none are in the stack, then see if we should make another 113 if ((connectionCount < maxConnectionCount) || highpriority) 114 { 115 116 createConnection = true; 117 connectionCount++; 118 } 119 else 120 { 121 // Connections are maxed out, so we have to wait 122 waitConnection = true; 123 } 124 } 125 } 126 } 127 128 if (null != conn) 129 { 130 // we have a Connection, so we are done 131 break; 132 } 133 else if (createConnection) 134 { 135 if (!isDerby) 136 { 137 // update user and password from Configuration 138 connProps.put("user", Configuration.NTS_DBUSER.getString()); 139 connProps.put("password", Configuration.NTS_DBPASSWORD.getString()); 140 } 141 142 try 143 { 144 conn = DriverManager.getConnection(url, connProps); 145 } 146 catch (final SQLException sqle) 147 { 148 connectionCount--; // count must be decremented since a connection was never created 149 DatabaseStatus.reportException(sqle); 150 throw sqle; 151 } 152 153 if (conn != null) 154 { 155 DatabaseStatus.clearException(); 156 if (peakConnections < connectionCount) 157 { 158 peakConnections = connectionCount; 159 Stats.setStat(Stats.DB_POOL_PEAK_CONNECTIONS_COUNT, peakConnections); 160 Stats.setStat(Stats.DB_POOL_PEAK_CONNECTIONS_TIME, new Date().toString()); 161 } 162 allConnections.put(Integer.valueOf(conn.hashCode()), PersistentStore.createDeathTimeStamp()); 163 164 break; 165 } 166 else if (Configuration.NTS_DB_CONNECTION_THROTTLING.getBoolean()) 167 { 168 // Unexpected Database exceptions will result in connection == null. 169 // Instead of retrying the connection, throw a checked exception. 170 throw new DBConnectionsAllUsedException("There are no DB Connections available"); 171 } 172 else 173 { 174 // don't break which will do the retry 175 } 176 } 177 else if (waitConnection) 178 { 179 try 180 { 181 Stats.inc(Stats.DB_THREADS_WAITING_FOR_CONNECTION); 182 synchronized (dbConnectionsToBeFreed) 183 { 184 dbConnectionsToBeFreed.wait(); // Semaphore.acquire() CountDownLatch.countdown() 185 } 186 } 187 finally 188 { 189 Stats.dec(Stats.DB_THREADS_WAITING_FOR_CONNECTION); 190 } 191 // don't break which will do the retry 192 } 193 else 194 { 195 // should never hit this case 196 break; 197 } 198 } 199 } 200 catch (final InterruptedException ie) 201 { 202 // expected 203 throw new SQLException("Exception waiting for DB connection.", ie); 204 } 205 finally 206 { 207 208 } 209 210 return conn; 211 }
5 進一步問題:
5.1 CyclicBarrier方法,await(): 爲何要提供這兩種方法來包裹wrapper dowait方法?
5.2 Semaphore和CountDownLatch互換
1 final Semaphore sem = new Semaphore(0); 2 for (int i = 0; i < num_threads; ++ i) 3 { 4 Thread t = new Thread() { 5 public void run() 6 { 7 try 8 { 9 doStuff(); 10 } 11 finally 12 { 13 sem.release(); 14 } 15 } 16 }; 17 t.start(); 18 } 19 20 sem.acquire(num_threads);
1 final CountDownLatch latch = new CountDownLatch(num_threads); 2 for (int i = 0; i < num_threads; ++ i) 3 { 4 Thread t = new Thread() { 5 public void run() 6 { 7 try 8 { 9 doStuff(); 10 } 11 finally 12 { 13 latch.countDown(); 14 } 15 } 16 }; 17 t.start(); 18 } 19 20 latch.await();
6 參考文獻:
http://www.cnblogs.com/dolphin0520/p/3920397.html
----- 單例 -----
單例與static方法的區別: 一個是實例,能夠傳遞,另一個不能夠。(好像也沒什麼用,傳遞單例)。
http://stackoverflow.com/questions/519520/difference-between-static-class-and-singleton-pattern#