【Thread】- JUC 5種線程同步工具

Semaphore:信號燈

特色:控制每次執行的線程數,達到控制線程併發的效果java

  • 測試代碼
package com.zhiwei.thread;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * 信號燈:當線程空閒時自動去執行阻塞的線程,實現運行最優化
 */
public class SemaphoreTest {

public static void main(String[] args) {

    ExecutorService threadPool = Executors.newCachedThreadPool();

    // 定義信號燈,一次最多能處理3個線程
    Semaphore sp = new Semaphore(3); 

    for (int i = 0; i < 20; i++) {
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    
                    // 獲取信號,信號燈處理阻塞線程,最多容許3個線程訪問
                    sp.acquire(); 
                    
                    System.out.println(Thread.currentThread().getName() + ":進入信號燈,還有" +  sp.availablePermits() + "個信號");
                    
                    Thread.sleep(new Random().nextInt(2000));
                    
                    // 回收信號燈信號,供別的線程使用
                    sp.release();
                    System.out.println(Thread.currentThread().getName() + ":離開信號燈,還有" +  sp.availablePermits() + "個信號");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
    threadPool.shutdown();
}
}

效果:緩存


CyclicBarrier

做用:控制線程運行的任務總量同步,例如等待全部人完成工做才能夠下班安全

測試代碼多線程

package com.zhiwei.thread;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierTest {

	public static void main(String[] args) {
		
    // 思想:只有各個子任務都完成了採起執行下一步,若是有線程提早完成則等待
    ExecutorService threadPool = Executors.newCachedThreadPool();
    
    // 規定總的任務量:只有所有完成纔會進行下一步處理
    CyclicBarrier cb = new CyclicBarrier(3); 

    for (int i = 0; i < 3; i++) {
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep((long) (Math.random() * 10000));
                    System.out.println(Thread.currentThread().getName() + ":完成分任務,剩餘任務:" + (2 - cb.getNumberWaiting()));
                    
                    //若是前面2個線程阻塞 + 正在運行的線程 = 3,代表總任務完成
                    if (cb.getNumberWaiting() == 2) {
                        System.out.println("恭喜,總任務已完成!");
                    }

                    //分任務完成則等待,直到搜索的任務都完成,才執行await後面的代碼
                    cb.await(); 
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
    threadPool.shutdown();
}
}

效果:併發


CountDownLatch 發令槍

CountDownLatch:可理解爲全部線程都就緒以後就一塊兒執行,相似旅遊跟團,只有全部人都到了才能夠觸發dom

測試代碼ide

package com.zhiwei.thread;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchTest {

	public static void main(String[] args) {

    // 緩存線程池:自動建立線程執行任務,若是線程執行完成任務則保存,供下次使用,若是線程不夠則動態建立
    ExecutorService threadPool = Executors.newCachedThreadPool();

    // 表示將完成3個任務量:任務計數器:多線程完成一些列操做
    CountDownLatch ct = new CountDownLatch(3);

    for (int i = 0; i < 3; i++) {

        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {

                    Thread.sleep((long) (Math.random() * 10000));
                    ct.countDown(); // 減1
 					System.out.println(Thread.currentThread().getName() + "準備分任務,剩餘任務:" + ct.getCount());
                   

                    // 若是ct計數器不爲0則阻塞,爲0 則一塊兒執行
                    ct.await(); 
                     System.out.println(Thread.currentThread().getName() + "完成分任務");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
	if (ct.getCount() == 0) {
                        System.out.println("恭喜,總任務已完成!");
                    }
    threadPool.shutdown();
}
}

效果:測試


ArrayBlockingQueue

ArrayBlockingQueue: JDK內部提供的阻塞隊列,可以保證線程安全優化

主要同步方法:ui

put方法

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

take方法

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

測試代碼:

package com.zhiwei.thread;
import java.util.concurrent.ArrayBlockingQueue;

/**
 * 阻塞隊列:可用於處理生產消費的問題
 * 
 * 實現機制:put/take利用重入鎖ReentrantLock實現同步效果
 */
public class ArrayBlockingQueueTest {

	public static void main(String[] args) {
		
        ArrayBlockingQueue<String> abq = new ArrayBlockingQueue<String>(3);

        new Thread(new Runnable() {
	
			@Override
			public void run() {
				while(true){
					try {
						Thread.sleep(1000);
						abq.put("Hello Java World");
						System.out.println(Thread.currentThread().getName()+":放入數據,剩餘數據:"+abq.size());
					} catch (Exception e) {
						e.printStackTrace();
					}	
				}
			}
		}).start();
        
        new Thread(new Runnable() {
			@Override
			public void run() {
				while(true){
					try {
						Thread.sleep(10000);
						abq.take();
						System.out.println(Thread.currentThread().getName()+":取出數據,剩餘數據:"+abq.size());
					} catch (Exception e) {
						e.printStackTrace();
					}	
				}
			}
		}).start();
	}
}

效果:


Exchanger

做用:兩個線程之間交換數據,不過要兩個線程都先拿出數據,而後才能進行數據交換

測試代碼

package com.zhiwei.thread;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 *特色: 必定要等雙方將數據都拿出來後才能交換(只能是兩個線程)
 * @author Yang ZhiWei
 *
 */
public class ExchangerTest {

public static void main(String[] args) {
    
    ExecutorService threadPool = Executors.newCachedThreadPool();
    
    Exchanger<String> exchanger = new Exchanger<String>();
    
    threadPool.execute(new Runnable() {
        @Override
        public void run() {

            System.out.println(Thread.currentThread().getName()+":交換數據:"+Thread.currentThread().getName());
            
            try {
                String getDate = exchanger.exchange(Thread.currentThread().getName());
                
                System.out.println(Thread.currentThread().getName()+":收到數據:"+getDate);
            
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
    
    threadPool.execute(new Runnable() {
        @Override
        public void run() {
            
            System.out.println(Thread.currentThread().getName()+"交換數據:"+Thread.currentThread().getName());
            try {
                
                String getDate = exchanger.exchange(Thread.currentThread().getName());
                
                System.out.println(Thread.currentThread().getName()+"收到數據:"+getDate);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
    threadPool.shutdown();
}
}

效果:

相關文章
相關標籤/搜索