特色:控制每次執行的線程數,達到控制線程併發的效果java
package com.zhiwei.thread; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * 信號燈:當線程空閒時自動去執行阻塞的線程,實現運行最優化 */ public class SemaphoreTest { public static void main(String[] args) { ExecutorService threadPool = Executors.newCachedThreadPool(); // 定義信號燈,一次最多能處理3個線程 Semaphore sp = new Semaphore(3); for (int i = 0; i < 20; i++) { threadPool.execute(new Runnable() { @Override public void run() { try { // 獲取信號,信號燈處理阻塞線程,最多容許3個線程訪問 sp.acquire(); System.out.println(Thread.currentThread().getName() + ":進入信號燈,還有" + sp.availablePermits() + "個信號"); Thread.sleep(new Random().nextInt(2000)); // 回收信號燈信號,供別的線程使用 sp.release(); System.out.println(Thread.currentThread().getName() + ":離開信號燈,還有" + sp.availablePermits() + "個信號"); } catch (InterruptedException e) { e.printStackTrace(); } } }); } threadPool.shutdown(); } }
效果:緩存
做用:控制線程運行的任務總量同步,例如等待全部人完成工做才能夠下班安全
測試代碼多線程
package com.zhiwei.thread; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CyclicBarrierTest { public static void main(String[] args) { // 思想:只有各個子任務都完成了採起執行下一步,若是有線程提早完成則等待 ExecutorService threadPool = Executors.newCachedThreadPool(); // 規定總的任務量:只有所有完成纔會進行下一步處理 CyclicBarrier cb = new CyclicBarrier(3); for (int i = 0; i < 3; i++) { threadPool.execute(new Runnable() { @Override public void run() { try { Thread.sleep((long) (Math.random() * 10000)); System.out.println(Thread.currentThread().getName() + ":完成分任務,剩餘任務:" + (2 - cb.getNumberWaiting())); //若是前面2個線程阻塞 + 正在運行的線程 = 3,代表總任務完成 if (cb.getNumberWaiting() == 2) { System.out.println("恭喜,總任務已完成!"); } //分任務完成則等待,直到搜索的任務都完成,才執行await後面的代碼 cb.await(); } catch (Exception e) { e.printStackTrace(); } } }); } threadPool.shutdown(); } }
效果:併發
CountDownLatch:可理解爲全部線程都就緒以後就一塊兒執行,相似旅遊跟團,只有全部人都到了才能夠觸發dom
測試代碼:ide
package com.zhiwei.thread; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CountDownLatchTest { public static void main(String[] args) { // 緩存線程池:自動建立線程執行任務,若是線程執行完成任務則保存,供下次使用,若是線程不夠則動態建立 ExecutorService threadPool = Executors.newCachedThreadPool(); // 表示將完成3個任務量:任務計數器:多線程完成一些列操做 CountDownLatch ct = new CountDownLatch(3); for (int i = 0; i < 3; i++) { threadPool.execute(new Runnable() { @Override public void run() { try { Thread.sleep((long) (Math.random() * 10000)); ct.countDown(); // 減1 System.out.println(Thread.currentThread().getName() + "準備分任務,剩餘任務:" + ct.getCount()); // 若是ct計數器不爲0則阻塞,爲0 則一塊兒執行 ct.await(); System.out.println(Thread.currentThread().getName() + "完成分任務"); } catch (Exception e) { e.printStackTrace(); } } }); } if (ct.getCount() == 0) { System.out.println("恭喜,總任務已完成!"); } threadPool.shutdown(); } }
效果:測試
ArrayBlockingQueue: JDK內部提供的阻塞隊列,可以保證線程安全優化
主要同步方法:ui
put方法
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
take方法
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
測試代碼:
package com.zhiwei.thread; import java.util.concurrent.ArrayBlockingQueue; /** * 阻塞隊列:可用於處理生產消費的問題 * * 實現機制:put/take利用重入鎖ReentrantLock實現同步效果 */ public class ArrayBlockingQueueTest { public static void main(String[] args) { ArrayBlockingQueue<String> abq = new ArrayBlockingQueue<String>(3); new Thread(new Runnable() { @Override public void run() { while(true){ try { Thread.sleep(1000); abq.put("Hello Java World"); System.out.println(Thread.currentThread().getName()+":放入數據,剩餘數據:"+abq.size()); } catch (Exception e) { e.printStackTrace(); } } } }).start(); new Thread(new Runnable() { @Override public void run() { while(true){ try { Thread.sleep(10000); abq.take(); System.out.println(Thread.currentThread().getName()+":取出數據,剩餘數據:"+abq.size()); } catch (Exception e) { e.printStackTrace(); } } } }).start(); } }
效果:
做用:兩個線程之間交換數據,不過要兩個線程都先拿出數據,而後才能進行數據交換
測試代碼
package com.zhiwei.thread; import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** *特色: 必定要等雙方將數據都拿出來後才能交換(只能是兩個線程) * @author Yang ZhiWei * */ public class ExchangerTest { public static void main(String[] args) { ExecutorService threadPool = Executors.newCachedThreadPool(); Exchanger<String> exchanger = new Exchanger<String>(); threadPool.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+":交換數據:"+Thread.currentThread().getName()); try { String getDate = exchanger.exchange(Thread.currentThread().getName()); System.out.println(Thread.currentThread().getName()+":收到數據:"+getDate); } catch (InterruptedException e) { e.printStackTrace(); } } }); threadPool.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"交換數據:"+Thread.currentThread().getName()); try { String getDate = exchanger.exchange(Thread.currentThread().getName()); System.out.println(Thread.currentThread().getName()+"收到數據:"+getDate); } catch (InterruptedException e) { e.printStackTrace(); } } }); threadPool.shutdown(); } }
效果: