java併發學習

Java Concurrent Learn


對java併發的一些學習,以前總結在幕布上,爲了加深印象,這裏從新學習下。java

併發級別

  • 阻塞 - 對臨界區的代碼串行化執行數組

    1. synchronized
    2. ReentrantLock
  • 無飢餓 - 在優先級併發中,可能低優先級的永遠不能執行的狀況,須要避免就叫無飢餓併發

    1. 隊列排隊請求資源
    2. 公平鎖
  • 無障礙 - 讀寫鎖控制力度不一樣,這裏主要是樂觀讀app

    1. 樂觀鎖-一致性標記,狀態版本控制
  • 無鎖 - 比較交換,不是真正的不加鎖,而是用java本地Unsaft 的cas方法ide

    1. CAS - compare and swap
  • 無等待 - 讀寫不等待,在寫的時候,從新複製一個對象對其操做,操做完以後在合適的時機賦值回去,這種只適合極少許寫,大量讀的狀況函數

    1. RCU - read copy update

併發原則 - 三大原則

  • 原子性 - atomic*原子操做類
  • 可見性 - volatile關鍵字保證
  • 有序性 - happen-before規則

線程建立

  • 實現Runnable接口- 其實全部建立方式都是須要實現Runnable接口
public class RunnableTest implements Runnable {
        @Override
        public void run() {}
    }
  • 繼續Thread
public class ThreadTest extends Thread {
        @Override
        public void run() {}
    }
  • 用Callable接口實現類構造FutureTask
public class CallableTest implements Callable {
        @Override
        public Object call() throws Exception {
            return null;
        }
    }
    
    FutureTask futureTask = new FutureTask(new CallableTest());

同步控制

  • synchronized Object.wait() Object.notify

synchronized同步關鍵字,對類,方法,對象進行修飾,修飾後獲取對應監視器才能進入修飾的臨界區
Object.wait(),Object.notify(),Object.notifyAll()方法必須在獲取到監視器對象後才能調用,即要寫在synchronized內部
Object.wait()以後會將當前線程放入對象的wait set中,釋放獲取的監視器對象
Object.notify()會從wait set隨機對一線程喚醒,Object.notifyAll()會從wait set全部線程喚醒,不會當即釋放獲取的監視器,待結束臨界區才釋放,喚醒不表明被喚醒線程繼續執行,其還須要爭搶獲取監視器才能繼續執行工具

/**
 * @author tomsun28
 * @date 19:49 2020-02-19
 */
public class Demo {

    private static final Logger LOGGER = LoggerFactory.getLogger(Demo.class);
    private final static Demo demo = new Demo();

    public static void waitDemo() {
        synchronized (demo) {
            try {
                // 調用wait方法必須獲取對應的監視器,在wait以後,釋放監視器
                LOGGER.info("獲取demo對象監視器1");
                demo.wait();
                LOGGER.info("釋放demo對象監視器1");
                Thread.sleep(2000);
                LOGGER.info("釋放demo對象監視器2");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void notifyDemo() {
        // 從等待隊列中喚醒一個
        synchronized (demo) {
            LOGGER.info("獲取demo對象監視器2");
            demo.notify();
            LOGGER.info("釋放demo對象監視器3");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            LOGGER.info("釋放demo對象監視器4");
        }
    }

    public static void main(String[] args) {
        new Thread(() -> waitDemo()).start();
        new Thread(() -> notifyDemo()).start();
    }
}
  • ReentrantLock Condition

重入鎖(可重複獲取的鎖),重入鎖內部的Condition
ReentrantLock lock幾回,就須要unlock幾回,lock以後須要緊跟tay finally代碼,finally第一行就要unlock
Condition功能用法和Object.wait notify差很少,只不過其搭配重入鎖使用,不用在使用前獲取對象監視器
Condition.await()會釋放當前擁有的鎖,並掛起當前線程
Conditon.singal()會隨機喚醒當前await的線程,Condition.singalAll()會喚醒全部,固然同Object.notify()喚醒不表明被喚醒線程能夠繼續執行,其還須要爭搶獲取到鎖後繼續執行性能

/**
 * @author tomsun28
 * @date 22:15 2019-10-27
 */
public class ReenterLockCondition {

    public static void main(String[] args) throws InterruptedException{
        ReentrantLock reentrantLock = new ReentrantLock();
        Condition condition = reentrantLock.newCondition();
        new Thread(() -> {
            // 先獲取重入鎖
            System.out.println("0 - 獲取鎖");
            reentrantLock.lock();
            try {
                System.out.println("0 - 釋放鎖,掛起線程");
                // 同 Object.wait類似 其和ReentrantLock綁定,這裏就是釋放鎖,掛起線程
                condition.await();
                System.out.println("0 - 等待鎖釋放,thread is fin");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                reentrantLock.unlock();
            }
        }).start();
        System.out.println("1 - 睡4秒");
        Thread.sleep(4000);
        System.out.println("1 - 等待獲取鎖");
        reentrantLock.lock();
        try {
            System.out.println("1 - condition 喚醒線程");
            condition.signal();
            Thread.sleep(4000);
            System.out.println("1 - 睡4秒釋放鎖");
        } finally {
            reentrantLock.unlock();
        }
    }
}
  • Semaphore

信號量-不一樣於鎖,其能夠控制進入臨界區的線程數量,鎖只能是一個線程進入臨界區
構造函數能夠初始化信號數量,semaphore.acquire()消耗一個信號,semaphore.release()釋放一個信號,當所剩的信號爲0時則不容許進入臨界區學習

/**
 * @author tomsun28
 * @date 22:53 2019-10-27
 */
public class SemaphoreDemo {

    public static void main(String[] args) {
        final Semaphore semaphore = new Semaphore(5);
        final AtomicInteger atomicInteger = new AtomicInteger(1);
        Runnable thread1 = () -> {
            try {
                semaphore.acquire();
                Thread.sleep(2000);
                System.out.println(atomicInteger.getAndAdd(1));
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                semaphore.release();
            }
        };
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        IntStream.range(0,20).forEach(i -> executorService.execute(thread1));
        executorService.shutdown();
    }
}
  • ReetrantReadWriteLock

讀寫鎖,以前的重入鎖仍是synchronized,對於讀寫都是一樣的加鎖
ReadWriteLock把讀和寫的加鎖分離,讀讀不互斥,讀寫互斥,寫寫互斥提升了鎖性能,
須要注意,當有讀鎖存在時,寫鎖不能被獲取,在大量一直在進行讀鎖的時候,會形成寫鎖的飢餓,即寫鎖一直獲取不到鎖ui

/**
 * @author tomsun28
 * @date 21:35 2020-02-19
 */
public class ReadWriteLockDemo {

    public static void main(String[] args) {
        ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        new Thread(() -> {
            readWriteLock.readLock().lock();
            readWriteLock.readLock().lock();
            try {
                System.out.println("獲取2讀鎖");
                Thread.sleep(3000);
            } catch (InterruptedException e) {
              e.printStackTrace();
            } finally {
                readWriteLock.readLock().unlock();
                readWriteLock.readLock().unlock();
                System.out.println("釋放2讀鎖");
            }
        }).start();

        new Thread(() -> {
            readWriteLock.readLock().lock();
            try {
                System.out.println("獲取3讀鎖");
                Thread.sleep(6000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                readWriteLock.readLock().unlock();
                System.out.println("釋放3讀鎖");
            }
        }).start();

        new Thread(() -> {
            readWriteLock.writeLock().lock();
            try {
                System.out.println("獲取1寫鎖");
            } finally {
                readWriteLock.writeLock().unlock();
                System.out.println("釋放1寫鎖");
            }
        }).start();
    }

}
  • StampedLock

上面的讀寫鎖有一個缺點,便可能寫飢餓,StampedLock解決這個問題
使用樂觀讀 - 使用標記戳來判斷此次讀的過程是否有寫的過程,如有則從新讀取或轉化爲悲觀讀

/**
 * @author tomsun28
 * @date 21:57 2020-02-19
 */
public class StampedLockDemo {

    public static void main(String[] args) {
        StampedLock stampedLock = new StampedLock();
        AtomicInteger flag = new AtomicInteger(0);
        new Thread(() -> {
            long writeStamped = stampedLock.writeLock();
            try {
                System.out.println("獲取寫鎖,休息1.006秒");
                Thread.sleep(1006);
                System.out.println("當前值爲: " + flag.getAndAdd(1));
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally{
                stampedLock.unlockWrite(writeStamped);
                System.out.println("釋放寫鎖");
            }

        }).start();

        new Thread(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            long stamp = stampedLock.tryOptimisticRead();
            int readValue = flag.intValue();
            while (!stampedLock.validate(stamp)) {
                System.out.println("重試樂觀讀");
                stamp = stampedLock.tryOptimisticRead();
                readValue = flag.intValue();
            }
            System.out.println("樂觀讀爲:" + readValue);
        }).start();
    }
}
  • CountDownLatch

併發減數攔截器,await()攔截阻塞當前線程,當減數器爲0時通行

/**
 * @author tomsun28
 * @date 23:35 2019-10-27
 */
public class CountDownLatchDemo {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(10);
        AtomicInteger atomicInteger = new AtomicInteger(1);
        Thread threadd = new Thread(() -> {
            try {
                Thread.sleep(1000);
                System.out.println(" thred check complete" + atomicInteger.getAndIncrement());
                countDownLatch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        IntStream.range(0, 10).forEach(i -> executorService.submit(threadd));
        countDownLatch.await();
        System.out.println("fin");
    }
}
  • CyclicBarrier

柵欄,功能相似於ConutDownLatch,不過能夠循環重置處理
ConutDownLatch是減數器爲0時觸發,並重置減數器循環等待下一次觸發,而且能夠提供觸發方法

/**
 * @author tomsun28
 * @date 23:59 2019-10-27
 */
public class CyclicBarrierDemo {

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(10, () -> {
            System.out.println("this cyclic ok");
        });
        AtomicInteger atomicInteger = new AtomicInteger(1);
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        IntStream.range(0, 100).forEach(i -> {
            executorService.execute(() -> {
                try {
                    Thread.sleep(1000);
                    System.out.println(atomicInteger.getAndIncrement());
                    cyclicBarrier.await();
                } catch (BrokenBarrierException | InterruptedException e) {
                    e.printStackTrace();
                }
            });
        });
        executorService.shutdown();
    }
}
  • LockSupport

同步工具類,比起wait notify要方便不少
LockSupport.park()是在當前線程阻塞掛起
LockSupport.unpark(thread)喚醒指定線程

/**
 * @author tomsun28
 * @date 00:39 2019-10-28
 */
public class LockSupportDemo {

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
           try {
               LockSupport.park();
               LockSupport.park();
               System.out.println("fin");
           } catch (Exception e) {
               e.printStackTrace();
           }
        });
        thread.start();
        Thread.sleep(3000);
        System.out.println("begin");
        LockSupport.unpark(thread);
        Thread.sleep(3000);
        LockSupport.unpark(thread);

    }
}

線程池

  • Executors 提供的線程池

    1. newFixedThreadPool(threadNum) 最大與核心線程數量爲threadNum的linked隊列線程池
    2. newSingleThreadExecutor() 最大與核心線程數量爲1的linked隊列線程池
    3. newCachedThreadPool 核心數量爲0,最大線程數量爲Integer.MAX_VALUE的線程池,線程數量會跟插入的任務同步增加
    4. newSingleThreadScheduledExecutor() 核心數量爲1,最大線程數量爲Integer.MAX_VALUE的延遲隊列線程池
    5. newWorkStealingPool() ForkJoin線程池
  • ThreadPoolExecutor 自定義構造線程池

    1. corePoolSize 核心線程數量
    2. maxPoolSize 最大線程數量
    3. keepAliveTime 超出核心線程數的線程的空閒存活時間
    4. unit 時間單位
    5. blockingQueue 存放未被執行的任務線程的阻塞隊列,當隊列滿時,纔會去從核心數量開始往最大線程數方向新增運行線程
    6. threadFactory 線程建立工廠
    7. rejectedHandler 任務滿拒絕策略
  • ForkJoinPool

    ForkJoinTask

    RecursiveTask<T>
    RecursiveAction

併發容器

  • 非併發容器轉併發處理 Collections.synchronziedXXX()

其本質是在容器外層加了synchronized(mutex)

  • 併發map - CpncurrentHashMap - 1.7 segment 1.8 synchronzied cas
  • 併發ArrayList - RCU - read copy update - CopyOnWriteArrayList
  • 併發Queue - ConcurrentLinkedQueue 非阻塞隊列
  • 阻塞隊列 BlockingQueue - ReentrantLock Condition

    1. ArrayBlockingQueue數組阻塞隊列
    2. LinkedBlockingQueue列表阻塞隊列
    3. SynchronousQueue此阻塞隊列無容量當即轉發
    4. PriorityBlockingQueue優先級阻塞隊列
    5. DelayedWorkQueue延遲阻塞隊列

原子類

  • AtomicBoolean
  • AtomicInteger
  • AtomicLong
  • LongAddr
  • AtomicRefrence
  • AtomicStampedRefreence

other

  • ThreadLocal線程本地變量
  • volatile可見關鍵字
  • Future非阻塞模式
  • CompletableFuture非阻塞


轉載請註明 from tomsun28

相關文章
相關標籤/搜索