5 並行模式與算法

單例模式

單例模式的好處:java

  • 節省系統開銷(省略new花費的時間)
  • 減輕GC壓力(new次數減少,內存的使用頻率也隨之減小)

簡單的單例:設計模式

public class Singleton {
    private static Singleton instance = new Singleton();
    private Singleton (){}
    public static Singleton getInstance() {
        return instance;
    }
}

上面代碼須要注意幾點:緩存

  • Singleton構造函數設置爲private。防止開發人員隨意建立多餘的實例。
  • instance對象必須爲private(保證instance的安全)且是static的。工程方法getInstance()必須是static的。

缺點:Singleton實例在什麼時候建立不受控制,instance對象會在第一次初始化時被建立安全

好比,在Singleton中加入一個屬性:數據結構

public class Singleton {
    public static int STATUS = 1;
    //...和上面代碼一致
}

那麼,只要調用Singleton.STATUS就會實例化Singleton。而不用等到調用getInstance()方法。多線程

延遲加載的單例:併發

特色:只會在instance第一次使用時建立對象。框架

缺點:使用了synchronized鎖,併發壞境下對性能產生必定影響。dom

/**
 * 延遲的單例
 */
public class LazySingleton {
    private LazySingleton(){
    }
    private static LazySingleton instance = null;
    public static synchronized LazySingleton getInstance(){
        if (instance == null)
            instance = new LazySingleton();
        return instance;
    }
}

使用內部類實現的單例:異步

優勢:在真正須要時建立對象;性能優越。

public class StaticSingleton {

    private StaticSingleton(){
        System.out.println("StaticSingleton is created!");
    }

    private static class SingletonHolder{
        private static StaticSingleton instance = new StaticSingleton();
    }

    public static StaticSingleton getInstance(){
        return SingletonHolder.instance;
    }
    
}

不變模式

什麼是不變模式:經過使用一種不可改變的對象,依靠對象的不變性,達到在沒有同步操做的多線程環境下依然保持內部狀態的一致性和正確性。這就是不變模式。

核心思想:天生對多線程友好,對象一旦被建立,它的內部狀態將永遠不會發生改變。別的線程不能改變,它本身也不會改變本身。這和只讀屬性有點區別,只讀屬性能夠本身改變本身。

應用場景知足兩個條件:

  • 當對象建立後,其內部狀態和數據再也不發生任何變化。
  • 當對象須要被共享,被多線程頻繁訪問。

如何實現?須要注意4點:

  • 去除setter方法已經全部修改自身屬性的方法。
  • 將全部屬性設置爲私有,並用final標記,確保其不可修改。
  • 確保沒有子類能夠重載修改它的行爲。
  • 有一個能夠建立完整對象的構造函數。
public final class Product {
    //一下全部final定義的對象都只賦值一次,隨後不會改變
    private final String no;
    private final String name;
    private final double price;

    public Product(String no, String name, double price){
        this.no = no;
        this.name = name;
        this.price = price;
    }
    public String getNo() {
        return no;
    }
    public String getName() {
        return name;
    }
    public double getPrice() {
        return price;
    }
}

屬性中的final確保全部數據都只賦值一次,定義class爲final是不但願被繼承。以避免子類破壞該類的不變模式。

JDK中的例子:最典型的是String類,全部元數據的包裝類都是不變模式。

注意:不變模式是經過迴避問題而不是解決問題來處理多線程併發訪問控制的,不變對象不須要同步操做。併發中性能上會提升。

生產者-消費者模式

介紹:生產者-消費者模式是一個典型的多線程設計模式,它爲多線程間的協做提供了良好的解決方案。在這個模式中,通產有兩類線程,若干個生產者線程和若干個消費者線程,還有一個共享內存緩衝區。生產者線程負責提交用戶請求,消費者線程則負責具體處理生產者提交的任務。生產者和消費者之間經過共享內存緩衝區進行通訊

基本結構:

特色:生產者和消費者解耦,二者都不須要知道對方的存在。容許消費者和生產者之間存在時間差。

具體結構:

[生-消_2019-09-27_15-09-37](D:\user\80004133\Pictures\生-消_[](https://img2018.cnblogs.com/blog/1541399/201911/1541399-20191129145700779-1544050544.png

  • 生產者:提交用戶請求,提取用戶任務,並裝入內存緩衝區
  • 消費者:消費任務
  • 內存緩衝區:緩存任務,供生產者提交,消費者消費的緩衝區域。
  • 任務:緩衝區緩衝的數據結構
  • Main:使用生產者和消費者的客戶端

例子:實現一個基於生產者-消費者求整數平方的並行程序

生產者:它構建PCData對象,並放入BlockingQueue隊列中。

public class Producer implements Runnable {
    private volatile boolean isRunning = true;
    private BlockingDeque<PCData> queue; //內存緩衝區,經過構造時外部引入,保證和消費者用的是一樣的內存緩衝區.
    private static AtomicInteger count = new AtomicInteger(); //總數,原子操做.
    private static final int SLEEPTIME = 1000;

    public Producer(BlockingDeque<PCData> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        PCData data = null;
        Random random = new Random();
        System.out.println("start producter .." + Thread.currentThread().getId());
        try {
            while (isRunning) {
                Thread.sleep(random.nextInt(SLEEPTIME));
                data = new PCData(count.incrementAndGet());
                System.out.println(data + " is put into Queue");
                if (!queue.offer(data, 2, TimeUnit.SECONDS)) {      //插入一個數據到緩衝區
                    System.out.println("failed to put data " + data);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }

    public void stop() {
        isRunning = false;
    }
}

消費者:從BlockingQueue中取得PCData對象,並進行相應計算。

public class Consumer implements Runnable {
    private BlockingDeque<PCData> queue;    //共享緩衝區
    private static final int SLEEPTIME = 1000;

    public Consumer(BlockingDeque<PCData> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println("start Consumer id : " + Thread.currentThread().getId());
        Random r = new Random();
        try {
            while (true) {
                PCData data = queue.take();     //從共享緩衝區中拿到一個數據消費
                if (null != data) {
                    int re = data.getIntData() * data.getIntData();
                    System.out.println(MessageFormat.format("{0} * {0} = {1}", data.getIntData(), re));
                    Thread.sleep(r.nextInt(SLEEPTIME));
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
}

共享數據模型:PCData

public class PCData {
    private final int intData;

    public PCData(int d) {
        intData = d;
    }

    public PCData(String d) {
        intData = Integer.parseInt(d);
    }

    public int getIntData() {
        return intData;
    }

    @Override
    public String toString() {
        return "PCData{" +
                "intData=" + intData +
                '}';
    }
}

主函數:建立一個生產者和消費者共用的共享緩衝區,建立生產者和消費者線程,並提交到線程池。

class Main {
    public static void main(String[] a) throws InterruptedException {
        BlockingDeque<PCData> queue = new LinkedBlockingDeque<>(10);
        Producer producter1 = new Producer(queue);
        Producer producter2 = new Producer(queue);
        Producer producter3 = new Producer(queue);
        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);
        Consumer consumer3 = new Consumer(queue);
        ExecutorService es = Executors.newCachedThreadPool();
        es.execute(producter1);
        es.execute(producter2);
        es.execute(producter3);
        es.execute(consumer1);
        es.execute(consumer2);
        es.execute(consumer3);
        //運行時間
        Thread.sleep(1000 * 10);
        //中止生產者
        producter1.stop();
        producter2.stop();
        producter3.stop();
        //中止生產者後,預留時間給消費者執行
        Thread.sleep(1000 * 5);
        es.shutdown();
    }
}

高性能的生產者-消費者:無鎖的實現

現成的Disruptor框架是一個無鎖的緩存框架。

無鎖的緩存框架:Disruptor

介紹:Disruptor框架使用無鎖的方式實現了一個環形隊列,很是適用於生產者-消費者模式,好比事件和消息發佈。

用Disruptor實現生產者-消費者案例

提升消費者的響應時間:選擇合適的策略

消費者如何監控緩衝區中的信息呢?,Disruptor提供了幾種策略:

  • BlockingWaitStrategy(默認):使用BlockingWaitStrategy和使用BlockingQueue很是相似的,它們都使用鎖和條件進行數據的監控和線程的喚醒。最節省cup,但在高併發下性能最糟。
  • SleepingWaitStrategy:它先使用自旋,不成功再使用Thread.yield()讓出CPU,並最終使用LockSupport.parkNanos(1)進行線程休眠,以確保不佔用太多CPU。不太佔CPU,適用於對延時要求不高的場合。對生產者影響最小。典型的應用場景是異步日誌
  • YieldingWaitStrategy適用於低延時場景。很佔CPU(最好邏輯CPU數量大於消費者線程數)。消費者線程會不斷循環監控緩衝區變化,在循環內部,會使用Thread.yield()讓出CPU給別的線程執行時間。
  • BusySpinWaitStrategy最瘋狂的等待策略。特耗CPU,特低延遲。它就是一個死循環!只有在延遲很是苛刻的場合才考慮使用它。

CPU Cache的優化:解決僞共享問題

什麼是僞共享問題?

爲了提升CPU速度,CPU有一個高速緩存Cache。在高速緩存中,讀寫數據的最小單位爲緩存行,它是從主存(memory)複製到緩存(Cache)的最小單位。

若是兩個變量存放在一個緩存行時,在多線程訪問中,可能會相互影響彼此的性能。如圖,當cpu1的x被修改,cpu2上的緩存行就會變成無效,致使Cache沒法命中。若是CPU不能命中緩存,系統的吞吐量就會降低。

解決辦法

讓x變量一行,行的空位置讓padding填充。這樣,就能保證x被修改時,cpu2的緩存不會失效。

相關文章
相關標籤/搜索