Java SE5 的類庫中引入了大量的新設計來解決併發問題的新類。學習他們將有助於編寫更加簡單而健壯的併發程序。java
他被用來同步一個或多個任務,強制他們等待由其餘的任務執行的一組操做完成。git
你能夠向 CountDownLatch 對象設置一個初始計數值,任何在這個對象上調用 wait() 的方法都將阻塞,直至這個計數值到達 0。其餘任務在結束其工做時,能夠在該對象上調用 CountDown() 來減少這個計數值。CountDownLatch 被設計爲只觸發一次,計數值不能被重置。若是須要重置計數值的版本,則可使用 CyclicBarrier 版本。調用 countDown() 的任務在產生這個調用時並無被阻塞,只有對 await() 的調用會被阻塞,直至技術值到達 0。編程
CountDownLatch 的典型用法是將一個任務分爲 n 個互相獨立的可解決任務,並建立值爲 0 的 CountDownLatch。當每一個任務完成時,都會在這個鎖存器上調用 countDown()。等待問題被解決的任務在這個鎖存器上調用 await(),將他們本身攔住,直至鎖存器計數結束。數組
import java.util.concurrent.*;
import java.util.*;
import static net.mindview.util.Print.*;
// Performs some portion of a task:
class TaskPortion implements Runnable {
private static int counter = 0;
private final int id = counter++;
private static Random rand = new Random(47);
private final CountDownLatch latch;
TaskPortion(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
try {
doWork();
latch.countDown();
} catch(InterruptedException ex) {
// Acceptable way to exit
}
}
public void doWork() throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000));
print(this + "completed");
}
public String toString() {
return String.format("%1$-3d ", id);
}
}
// Waits on the CountDownLatch:
class WaitingTask implements Runnable {
private static int counter = 0;
private final int id = counter++;
private final CountDownLatch latch;
WaitingTask(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
try {
latch.await();
print("Latch barrier passed for " + this);
} catch(InterruptedException ex) {
print(this + " interrupted");
}
}
public String toString() {
return String.format("WaitingTask %1$-3d ", id);
}
}
public class CountDownLatchDemo {
static final int SIZE = 100;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
// All must share a single CountDownLatch object:
CountDownLatch latch = new CountDownLatch(SIZE);
for(int i = 0; i < 10; i++)
exec.execute(new WaitingTask(latch));
for(int i = 0; i < SIZE; i++)
exec.execute(new TaskPortion(latch));
print("Launched all tasks");
exec.shutdown(); // Quit when all tasks complete
}
} /* (Execute to see output) *///:~
複製代碼
全部的任務都使用在 main() 中定義的同一個單一的 CountDownLatch。安全
類庫的線程安全併發
類中包含了一個靜態的 Random 對象,這意味着多個任務可能會同時調用 Random.nextInt()。在這種狀況下,能夠經過向 TaskPortion 提供本身的 Random 對象來解決。也就是說經過移除 static 限定符的方式解決。對於 Java 標準類庫來講那些是線程安全的,那些是線程不安全的,JDK 文檔並無指出。要理解這一點必須逐個的去查看源碼,剛好 Random.nextInt() 是線程安全的。app
你但願建立一組任務,他們並行的執行工做,而後在進行下一個步驟以前等待,直至全部任務都完成。他使得全部的任務都在柵欄處等待,所以能夠一致的向前移動。dom
下面是模仿賽馬遊戲的一個仿真版本:ide
class Horse implements Runnable {
private static int counter = 0;
private final int id = counter++;
private int strides = 0;
private static Random rand = new Random(47);
private static CyclicBarrier barrier;
public Horse(CyclicBarrier b) { barrier = b; }
public synchronized int getStrides() { return strides; }
public void run() {
try {
while(!Thread.interrupted()) {
synchronized(this) {
strides += rand.nextInt(3); // Produces 0, 1 or 2
}
barrier.await();
}
} catch(InterruptedException e) {
// A legitimate way to exit
} catch(BrokenBarrierException e) {
// This one we want to know about
throw new RuntimeException(e);
}
}
public String toString() { return "Horse " + id + " "; }
public String tracks() {
StringBuilder s = new StringBuilder();
for(int i = 0; i < getStrides(); i++)
s.append("*");
s.append(id);
return s.toString();
}
}
public class HorseRace {
static final int FINISH_LINE = 75;
private List<Horse> horses = new ArrayList<Horse>();
private ExecutorService exec =
Executors.newCachedThreadPool();
private CyclicBarrier barrier;
public HorseRace(int nHorses, final int pause) {
barrier = new CyclicBarrier(nHorses, new Runnable() {
public void run() {
StringBuilder s = new StringBuilder();
for(int i = 0; i < FINISH_LINE; i++)
s.append("="); // The fence on the racetrack
print(s);
for(Horse horse : horses)
print(horse.tracks());
for(Horse horse : horses)
if(horse.getStrides() >= FINISH_LINE) {
print(horse + "won!");
exec.shutdownNow();
return;
}
try {
TimeUnit.MILLISECONDS.sleep(pause);
} catch(InterruptedException e) {
print("barrier-action sleep interrupted");
}
}
});
for(int i = 0; i < nHorses; i++) {
Horse horse = new Horse(barrier);
horses.add(horse);
exec.execute(horse);
}
}
public static void main(String[] args) {
int nHorses = 7;
int pause = 200;
if(args.length > 0) { // Optional argument
int n = new Integer(args[0]);
nHorses = n > 0 ? n : nHorses;
}
if(args.length > 1) { // Optional argument
int p = new Integer(args[1]);
pause = p > -1 ? p : pause;
}
new HorseRace(nHorses, pause);
}
}
複製代碼
能夠向 CyclicBarrier 提供一個柵欄動做,它是一個 Runnable,當計數值到達 0 時自動執行。這裏柵欄動做是做爲匿名內部類來建立的,它被提交給 CyclicBarrier 的構造器。CyclicBarrier 使得每匹馬都執行了向前移動所必須執行的工做,而後等待柵欄出全部的馬都準備完畢。當全部的馬向前移動時,CyclicBarrier 將自動調用 Runnable 柵欄動做任務,按順序顯示馬和終點線的位置。一旦全部的任務越過了柵欄,它就會自動地爲下一回合比賽作好準備。學習
這是一個無界的 BlockingQueue,用於放置實現 DelayQueue 接口的對象,其中的對象只能在其到期時才能從隊列中取走。這種隊列是有序的,即隊列對象的延遲到期的時間最長。若是沒有任何延遲到期時間,那麼就不會有任何頭元素,而且 poll() 將返回 null。
下面示例:其中的 Delayed 對象自身就是任務,而 DelayedTaskConsumer 將最緊急的任務(到期時間最長的任務)從隊列中取出,而後運行它。
class DelayedTask implements Runnable, Delayed {
private static int counter = 0;
private final int id = counter++;
private final int delta;
private final long trigger;
protected static List<DelayedTask> sequence =
new ArrayList<DelayedTask>();
public DelayedTask(int delayInMilliseconds) {
delta = delayInMilliseconds;
trigger = System.nanoTime() +
NANOSECONDS.convert(delta, MILLISECONDS);
sequence.add(this);
}
public long getDelay(TimeUnit unit) {
return unit.convert(
trigger - System.nanoTime(), NANOSECONDS);
}
public int compareTo(Delayed arg) {
DelayedTask that = (DelayedTask)arg;
if(trigger < that.trigger) return -1;
if(trigger > that.trigger) return 1;
return 0;
}
public void run() { printnb(this + " "); }
public String toString() {
return String.format("[%1$-4d]", delta) +
" Task " + id;
}
public String summary() {
return "(" + id + ":" + delta + ")";
}
public static class EndSentinel extends DelayedTask {
private ExecutorService exec;
public EndSentinel(int delay, ExecutorService e) {
super(delay);
exec = e;
}
public void run() {
for(DelayedTask pt : sequence) {
printnb(pt.summary() + " ");
}
print();
print(this + " Calling shutdownNow()");
exec.shutdownNow();
}
}
}
class DelayedTaskConsumer implements Runnable {
private DelayQueue<DelayedTask> q;
public DelayedTaskConsumer(DelayQueue<DelayedTask> q) {
this.q = q;
}
public void run() {
try {
while(!Thread.interrupted())
q.take().run(); // Run task with the current thread
} catch(InterruptedException e) {
// Acceptable way to exit
}
print("Finished DelayedTaskConsumer");
}
}
public class DelayQueueDemo {
public static void main(String[] args) {
Random rand = new Random(47);
ExecutorService exec = Executors.newCachedThreadPool();
DelayQueue<DelayedTask> queue =
new DelayQueue<DelayedTask>();
// Fill with tasks that have random delays:
for(int i = 0; i < 20; i++)
queue.put(new DelayedTask(rand.nextInt(5000)));
// Set the stopping point
queue.add(new DelayedTask.EndSentinel(5000, exec));
exec.execute(new DelayedTaskConsumer(queue));
}
}
複製代碼
DelayedTask 包含一個稱爲 sequence 的 List,它保存了任務被建立的順序,所以咱們看到排序是按照實際發生的順序執行的。Delsyed 接口有一個方法名叫 getDealay(),他能夠用來告知延遲到期多長時間,或者延遲在多長時間之前已經到期。這個方法將強制咱們使用 TimeUnit 類,由於這就是參數類型。
這是一個很基礎的優先級隊列,它具備可阻塞的讀取操做。下面是一個示例,其中在優先級隊列中的對象是按照優先級順序從隊列中出現的任務。PrioritizedTask 被賦予一個優先級數字,以此來提供順序:
class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
private Random rand = new Random(47);
private static int counter = 0;
private final int id = counter++;
private final int priority;
protected static List<PrioritizedTask> sequence =
new ArrayList<PrioritizedTask>();
public PrioritizedTask(int priority) {
this.priority = priority;
sequence.add(this);
}
public int compareTo(PrioritizedTask arg) {
return priority < arg.priority ? 1 :
(priority > arg.priority ? -1 : 0);
}
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(rand.nextInt(250));
} catch(InterruptedException e) {
// Acceptable way to exit
}
print(this);
}
public String toString() {
return String.format("[%1$-3d]", priority) +
" Task " + id;
}
public String summary() {
return "(" + id + ":" + priority + ")";
}
public static class EndSentinel extends PrioritizedTask {
private ExecutorService exec;
public EndSentinel(ExecutorService e) {
super(-1); // Lowest priority in this program
exec = e;
}
public void run() {
int count = 0;
for(PrioritizedTask pt : sequence) {
printnb(pt.summary());
if(++count % 5 == 0)
print();
}
print();
print(this + " Calling shutdownNow()");
exec.shutdownNow();
}
}
}
class PrioritizedTaskProducer implements Runnable {
private Random rand = new Random(47);
private Queue<Runnable> queue;
private ExecutorService exec;
public PrioritizedTaskProducer( Queue<Runnable> q, ExecutorService e) {
queue = q;
exec = e; // Used for EndSentinel
}
public void run() {
// Unbounded queue; never blocks.
// Fill it up fast with random priorities:
for(int i = 0; i < 20; i++) {
queue.add(new PrioritizedTask(rand.nextInt(10)));
Thread.yield();
}
// Trickle in highest-priority jobs:
try {
for(int i = 0; i < 10; i++) {
TimeUnit.MILLISECONDS.sleep(250);
queue.add(new PrioritizedTask(10));
}
// Add jobs, lowest priority first:
for(int i = 0; i < 10; i++)
queue.add(new PrioritizedTask(i));
// A sentinel to stop all the tasks:
queue.add(new PrioritizedTask.EndSentinel(exec));
} catch(InterruptedException e) {
// Acceptable way to exit
}
print("Finished PrioritizedTaskProducer");
}
}
class PrioritizedTaskConsumer implements Runnable {
private PriorityBlockingQueue<Runnable> q;
public PrioritizedTaskConsumer( PriorityBlockingQueue<Runnable> q) {
this.q = q;
}
public void run() {
try {
while(!Thread.interrupted())
// Use current thread to run the task:
q.take().run();
} catch(InterruptedException e) {
// Acceptable way to exit
}
print("Finished PrioritizedTaskConsumer");
}
}
public class PriorityBlockingQueueDemo {
public static void main(String[] args) throws Exception {
Random rand = new Random(47);
ExecutorService exec = Executors.newCachedThreadPool();
PriorityBlockingQueue<Runnable> queue =
new PriorityBlockingQueue<Runnable>();
exec.execute(new PrioritizedTaskProducer(queue, exec));
exec.execute(new PrioritizedTaskConsumer(queue));
}
}
複製代碼
PrioritizedTask 對象的建立序列被記錄在 sequence List 中,用於和實際的執行順序比較。PrioritizedTaskProducer 和 PrioritizedTaskConsumer 經過 PriorityBlockingQueue 彼此鏈接。由於這種隊列的阻塞特性提供了全部必須的同步,因此你應該注意到,這裏不須要任何顯示的同步,沒必要考慮當你從這種隊列讀取時,其中是否還有元素,由於這種隊列在沒有元素時將直接阻塞讀取者。
假定溫室控制系統的示例,它能夠控制各類設施的開關,或者是對他們進行調節。這能夠被看作是一種併發問題,每一個指望的事件都是一個預約事件運行的任務。經過使用 schedule() 運行一次任務或者使用 scheduleAtFixedRate() 每隔規則的時間重複執行任務,你能夠將 Runnable 對象設置爲在未來的某個時刻執行。
public class GreenhouseScheduler {
private volatile boolean light = false;
private volatile boolean water = false;
private String thermostat = "Day";
public synchronized String getThermostat() {
return thermostat;
}
public synchronized void setThermostat(String value) {
thermostat = value;
}
ScheduledThreadPoolExecutor scheduler =
new ScheduledThreadPoolExecutor(10);
public void schedule(Runnable event, long delay) {
scheduler.schedule(event,delay,TimeUnit.MILLISECONDS);
}
public void repeat(Runnable event, long initialDelay, long period) {
scheduler.scheduleAtFixedRate(
event, initialDelay, period, TimeUnit.MILLISECONDS);
}
class LightOn implements Runnable {
public void run() {
// Put hardware control code here to
// physically turn on the light.
System.out.println("Turning on lights");
light = true;
}
}
class LightOff implements Runnable {
public void run() {
// Put hardware control code here to
// physically turn off the light.
System.out.println("Turning off lights");
light = false;
}
}
class WaterOn implements Runnable {
public void run() {
// Put hardware control code here.
System.out.println("Turning greenhouse water on");
water = true;
}
}
class WaterOff implements Runnable {
public void run() {
// Put hardware control code here.
System.out.println("Turning greenhouse water off");
water = false;
}
}
class ThermostatNight implements Runnable {
public void run() {
// Put hardware control code here.
System.out.println("Thermostat to night setting");
setThermostat("Night");
}
}
class ThermostatDay implements Runnable {
public void run() {
// Put hardware control code here.
System.out.println("Thermostat to day setting");
setThermostat("Day");
}
}
class Bell implements Runnable {
public void run() { System.out.println("Bing!"); }
}
class Terminate implements Runnable {
public void run() {
System.out.println("Terminating");
scheduler.shutdownNow();
// Must start a separate task to do this job,
// since the scheduler has been shut down:
new Thread() {
public void run() {
for(DataPoint d : data)
System.out.println(d);
}
}.start();
}
}
// New feature: data collection
static class DataPoint {
final Calendar time;
final float temperature;
final float humidity;
public DataPoint(Calendar d, float temp, float hum) {
time = d;
temperature = temp;
humidity = hum;
}
public String toString() {
return time.getTime() +
String.format(
" temperature: %1$.1f humidity: %2$.2f",
temperature, humidity);
}
}
private Calendar lastTime = Calendar.getInstance();
{ // Adjust date to the half hour
lastTime.set(Calendar.MINUTE, 30);
lastTime.set(Calendar.SECOND, 00);
}
private float lastTemp = 65.0f;
private int tempDirection = +1;
private float lastHumidity = 50.0f;
private int humidityDirection = +1;
private Random rand = new Random(47);
List<DataPoint> data = Collections.synchronizedList(
new ArrayList<DataPoint>());
class CollectData implements Runnable {
public void run() {
System.out.println("Collecting data");
synchronized(GreenhouseScheduler.this) {
// Pretend the interval is longer than it is:
lastTime.set(Calendar.MINUTE,
lastTime.get(Calendar.MINUTE) + 30);
// One in 5 chances of reversing the direction:
if(rand.nextInt(5) == 4)
tempDirection = -tempDirection;
// Store previous value:
lastTemp = lastTemp +
tempDirection * (1.0f + rand.nextFloat());
if(rand.nextInt(5) == 4)
humidityDirection = -humidityDirection;
lastHumidity = lastHumidity +
humidityDirection * rand.nextFloat();
// Calendar must be cloned, otherwise all
// DataPoints hold references to the same lastTime.
// For a basic object like Calendar, clone() is OK.
data.add(new DataPoint((Calendar)lastTime.clone(),
lastTemp, lastHumidity));
}
}
}
public static void main(String[] args) {
GreenhouseScheduler gh = new GreenhouseScheduler();
gh.schedule(gh.new Terminate(), 5000);
// Former "Restart" class not necessary:
gh.repeat(gh.new Bell(), 0, 1000);
gh.repeat(gh.new ThermostatNight(), 0, 2000);
gh.repeat(gh.new LightOn(), 0, 200);
gh.repeat(gh.new LightOff(), 0, 400);
gh.repeat(gh.new WaterOn(), 0, 600);
gh.repeat(gh.new WaterOff(), 0, 800);
gh.repeat(gh.new ThermostatDay(), 0, 1400);
gh.repeat(gh.new CollectData(), 500, 500);
}
}
複製代碼
DataPoint 能夠持有並顯示單個的數據段,而 CollectData 是被調度的任務,它在每次運行時,均可以產生仿真數據,並將其添加到 greenhouse 的 List 中。注意:volatile 和 synchornized 在適當的場合都獲得了應用,以防止任務之間的互相干涉。在持有 DataPoint 的 List 中的全部方法都是 synchronized 的,這是由於 List 被建立時,使用了synchronizedList()。
正常的鎖在任什麼時候刻都只容許一個任務訪問資源,而技術信號量容許 n 個任務同時訪問這個資源。能夠看作信號量是向外分發使用資源的許可證。
做爲一個示例,請考慮對象池的概念,他管理着數量有限的對象,當要使用對象時能夠遷出他們,而在用戶使用完畢時,能夠將他們籤回。這種功能能夠封裝到一個泛型類中:
public class Pool<T> {
private int size;
private List<T> items = new ArrayList<T>();
private volatile boolean[] checkedOut;
private Semaphore available;
public Pool(Class<T> classObject, int size) {
this.size = size;
checkedOut = new boolean[size];
available = new Semaphore(size, true);
// Load pool with objects that can be checked out:
for(int i = 0; i < size; ++i)
try {
// Assumes a default constructor:
items.add(classObject.newInstance());
} catch(Exception e) {
throw new RuntimeException(e);
}
}
public T checkOut() throws InterruptedException {
available.acquire();
return getItem();
}
public void checkIn(T x) {
if(releaseItem(x))
available.release();
}
private synchronized T getItem() {
for(int i = 0; i < size; ++i)
if(!checkedOut[i]) {
checkedOut[i] = true;
return items.get(i);
}
return null; // Semaphore prevents reaching here
}
private synchronized boolean releaseItem(T item) {
int index = items.indexOf(item);
if(index == -1) return false; // Not in the list
if(checkedOut[index]) {
checkedOut[index] = false;
return true;
}
return false; // Wasn't checked out
}
}
複製代碼
在這個簡化的形式中,構造器 newInstance() 來把對象加載到池中。若是你須要一個新對象,那麼能夠調用 checkOut(),而且在使用完以後,將其遞交給 checkIn()。
boolean 類型的數組 checkedOut 能夠跟蹤被簽出的對象,而且能夠經過 getItem() 和 releaseItem() 方法來管理。而這些都將由 Semaphore 類型的 available 來加以確保,所以在 checkOut() 中,若是沒有任何信號量許可證可用,available 將阻塞調用過程。在 checkIn() 中,若是被簽入的對象有效,則會向信號量返回一個許可證。
咱們看下面一個示例:建立一個 Fat:
public class Fat {
private volatile double d; // Prevent optimization
private static int counter = 0;
private final int id = counter++;
public Fat() {
// Expensive, interruptible operation:
for(int i = 1; i < 10000; i++) {
d += (Math.PI + Math.E) / (double)i;
}
}
public void operation() { System.out.println(this); }
public String toString() { return "Fat id: " + id; }
}
複製代碼
咱們在池中管理這些對象,以限制這個構造器所形成的影響。咱們建立一個任務將簽出 Fat 對象,持有一段時間之後在將他們簽入:
class CheckoutTask<T> implements Runnable {
private static int counter = 0;
private final int id = counter++;
private Pool<T> pool;
public CheckoutTask(Pool<T> pool) {
this.pool = pool;
}
public void run() {
try {
T item = pool.checkOut();
print(this + "checked out " + item);
TimeUnit.SECONDS.sleep(1);
print(this +"checking in " + item);
pool.checkIn(item);
} catch(InterruptedException e) {
// Acceptable way to terminate
}
}
public String toString() {
return "CheckoutTask " + id + " ";
}
}
public class SemaphoreDemo {
final static int SIZE = 25;
public static void main(String[] args) throws Exception {
final Pool<Fat> pool =
new Pool<Fat>(Fat.class, SIZE);
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < SIZE; i++)
exec.execute(new CheckoutTask<Fat>(pool));
print("All CheckoutTasks created");
List<Fat> list = new ArrayList<Fat>();
for(int i = 0; i < SIZE; i++) {
Fat f = pool.checkOut();
printnb(i + ": main() thread checked out ");
f.operation();
list.add(f);
}
Future<?> blocked = exec.submit(new Runnable() {
public void run() {
try {
// Semaphore prevents additional checkout,
// so call is blocked:
pool.checkOut();
} catch(InterruptedException e) {
print("checkOut() Interrupted");
}
}
});
TimeUnit.SECONDS.sleep(2);
blocked.cancel(true); // Break out of blocked call
print("Checking in objects in " + list);
for(Fat f : list)
pool.checkIn(f);
for(Fat f : list)
pool.checkIn(f); // Second checkIn ignored
exec.shutdown();
}
}
複製代碼
在 main() 中建立了一個持有 Fat 對象的 Pool,而一組 CheckoutTask 則開始操做這個 Pool,而後,main() 線程簽出池中的 Fat 對象,可是並不簽入他們。一旦池中全部的對象都被簽出,Semaphore 將再也不容許執行任何簽出的操做。blocked 的 run() 方法所以會被阻塞, 2 秒以後 cancel() 方法被調用,依次來掙脫 Future 的束縛。
SE5 中的新類庫包含了許多的新特性,爲咱們解決問題提供了許多的方便。如今 JDK 已經更新到了 8。要學習更多的新的內容還需不斷的學習新版本的 JDK。
掃描關注公衆號免費獲取全套 Java 編程思想筆記: