Java 線程併發策略

1 什麼是併發問題。

  多個進程或線程同時(或着說在同一段時間內)訪問同一資源會產生併發問題。java

  

2 java中synchronized的用法

  1. 用法1
    public class Test{
        public synchronized void print(){
            ....;
        } 
    }

    某線程執行print()方法,則該對象將加鎖。其它線程將沒法執行該對象的全部synchronized塊。linux

  2. 用法2
    public class Test{
        public void print(){
            synchronized(this){//鎖住本對象
                ...;
            }
        }
    }

    同用法1, 但更能體現synchronized用法的本質。數據庫

  3. 用法3
    public class Test{
        private String a = "test";
        public void print(){
            synchronized(a){//鎖住a對象
                ...;
            }
        }
        public synchronized void t(){
            ...; //這個同步代碼塊不會由於print()而鎖定.
        }
    }

    執行print(),會給對象a加鎖,注意不是給Test的對象加鎖,也就是說 Test對象的其它synchronized方法不會由於print()而被鎖。同步代碼塊執行完,則釋放對a的鎖。緩存

    爲了鎖住一個對象的代碼塊而不影響該對象其它 synchronized塊的高性能寫法:服務器

    public class Test{
        private byte[] lock = new byte[0];
        public void print(){
            synchronized(lock){
                ...;
            }
        }
        public synchronized void t(){
            ...; 
        }
    }

     

  4. 靜態方法的鎖
    public class Test{
        public synchronized static void execute(){
            ...;
        }
    }

     

    效果同多線程

    public class Test{
        public static void execute(){
            synchronized(TestThread.class){
                ...;
            }
        }
    }

     

 

3 Java中的鎖與排隊機制。

鎖就是阻止其它進程或線程進行資源訪問的一種方式,即鎖住的資源不能被其它請求訪問。在JAVA中,sychronized關鍵字用來對一個對象加鎖。好比:併發

public class MyStack {
    int idx = 0;
    char [] data = new char[6];

    public synchronized void push(char c) {
        data[idx] = c;
        idx++;
    }

    public synchronized char pop() {
        idx--;
        return data[idx];
    }

    public static void main(String args[]){
        MyStack m = new MyStack();
        /**
           下面對象m被加鎖。嚴格的說是對象m的全部synchronized塊被加鎖。
           若是存在另外一個試圖訪問m的線程T,那麼T沒法執行m對象的push和
           pop方法。
        */
        m.pop();//對象m被加鎖。
    }
}

 

   Java的加鎖解鎖跟多我的排隊等一個公共廁位徹底同樣。第一我的進去後順手把門從裏面鎖住,其它人只好排隊等。第一我的結束後出來時,門纔會打開(解鎖)。輪到第二我的進去,一樣他又會把門從裏面鎖住,其它人繼續排隊等待。socket

  用廁所理論能夠很容易明白: 一我的進了一個廁位,這個廁位就會鎖住,但不會致使另外一個廁位也被鎖住,由於一我的不能同時蹲在兩個廁位裏。對於Java 就是說:Java中的鎖是針對同一個對象的,不是針對class的。看下例:分佈式

MyStatck m1 = new MyStack();
MyStatck m2 = new Mystatck();
m1.pop();
m2.pop();  

 

  m1對象的鎖是不會影響m2的鎖的,由於它們不是同一個廁位。就是說,假設有 3線程t1,t2,t3操做m1,那麼這3個線程只可能在m1上排隊等,假設另2個線程 t8,t9在操做m2,那麼t8,t9只會在m2上等待。而t2和t8則沒有關係,即便m2上的鎖釋放了,t1,t2,t3可能仍要在m1上排隊。緣由無它,不是同一個廁位耳。函數

  Java不能同時對一個代碼塊加兩個鎖,這和數據庫鎖機制不一樣,數據庫能夠對一條記錄同時加好幾種不一樣的鎖,

 

4 什麼時候釋放鎖?

  通常是執行完畢同步代碼塊(鎖住的代碼塊)後就釋放鎖,也能夠用wait()方式半路上釋放鎖。wait()方式就比如蹲廁所到一半,忽然發現下水道堵住了,不得已必須出來站在一邊,好讓修下水道師傅(準備執行notify的一個線程)進去疏通馬桶,疏通完畢,師傅大喊一聲: "已經修好了"(notify),剛纔出來的同志聽到後就從新排隊。注意啊,必須等師傅出來啊,師傅不出來,誰也進不去。也就是說notify後,不是其它線程立刻能夠進入封鎖區域活動了,而是必須還要等notify代碼所在的封鎖區域執行完畢從而釋放鎖之後,其它線程纔可進入。

這裏是wait與notify代碼示例:

public synchronized char pop() {
    char c;
    while (buffer.size() == 0) {
        try {
            this.wait(); //從廁位裏出來
        } catch (InterruptedException e) {
            // ignore it...
        }
    }
    c = ((Character)buffer.remove(buffer.size()-1)).
        charValue();
    return c;
}

public synchronized void push(char c) {
    this.notify(); //通知那些wait()的線程從新排隊。注意:僅僅是通知它們從新排隊。
    Character charObj = new Character(c);
    buffer.addElement(charObj);
}//執行完畢,釋放鎖。那些排隊的線程就能夠進來了。

再深刻一些。

因爲wait()操做而半路出來的同志沒收到notify信號前是不會再排隊的,他會在旁邊看着這些排隊的人(其中修水管師傅也在其中)。注意,修水管的師傅不能插隊,也得跟那些上廁所的人同樣排隊,不是說一我的蹲了一半出來後,修水管師傅就能夠忽然冒出來而後馬上進去搶修了,他要和原來排隊的那幫人公平競爭,由於他也是個普通線程。若是修水管師傅排在後面,則前面的人進去後,發現堵了,就wait,而後出來站到一邊,再進去一個,再wait,出來,站到一邊,只到師傅進去執行notify. 這樣,一下子功夫,排隊的旁邊就站了一堆人,等着notify.

終於,師傅進去,而後notify了,接下來呢?

1. 有一個wait的人(線程)被通知到。
2. 爲何被通知到的是他而不是另一個wait的人?取決於JVM.咱們沒法預先
   判斷出哪個會被通知到。也就是說,優先級高的不必定被優先喚醒,等待
   時間長的也不必定被優先喚醒,一切不可預知!(固然,若是你瞭解該JVM的
   實現,則能夠預知)。
3. 他(被通知到的線程)要從新排隊。
4. 他會排在隊伍的第一個位置嗎?回答是:不必定。他會排最後嗎?也不必定。
   但若是該線程優先級設的比較高,那麼他排在前面的機率就比較大。
5. 輪到他從新進入廁位時,他會從上次wait()的地方接着執行,不會從新執行。
   噁心點說就是,他會接着拉巴巴,不會從新拉。
6. 若是師傅notifyAll(). 則那一堆半途而廢出來的人所有從新排隊。順序不可知。

 

Java DOC 上說,The awakened threads will not be able to proceed until the current thread relinquishes the lock on this object(當前線程釋放鎖前,喚醒的線程不能去執行)。

這用廁位理論解釋就是顯而易見的事。

 

 

5 Lock的使用

 

用synchronized關鍵字能夠對資源加鎖。用Lock關鍵字也能夠。它是JDK1.5中新增內容。用法以下:

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull  = lock.newCondition(); 
    final Condition notEmpty = lock.newCondition(); 

    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) 
                notFull.await();
            items[putptr] = x; 
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) 
                notEmpty.await();
            Object x = items[takeptr]; 
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    } 
}

 

(注:這是JavaDoc裏的例子,是一個阻塞隊列的實現例子。所謂阻塞隊列,就是一個隊列若是滿了或者空了,都會致使線程阻塞等待。Java裏的 ArrayBlockingQueue提供了現成的阻塞隊列,不須要本身專門再寫一個了。)

一個對象的lock.lock()和lock.unlock()之間的代碼將會被鎖住。這種方式比起synchronize好在什麼地方?簡而言之,就是對wait的線程進行了分類。用廁位理論來描述,則是那些蹲了一半而從廁位裏出來等待的人緣由可能不同,有的是由於馬桶堵了,有的是由於馬桶沒水了。通知(notify)的時候,就能夠喊:由於馬桶堵了而等待的過來從新排隊(好比馬桶堵塞問題被解決了),或者喊,由於馬桶沒水而等待的過來從新排隊(好比馬桶沒水問題被解決了)。這樣能夠控制得更精細一些。不像synchronize裏的wait和notify,不論是馬桶堵塞仍是馬桶沒水都只能喊:剛纔等待的過來排隊!假如排隊的人進來一看,發現原來只是馬桶堵塞問題解決了,而本身渴望解決的問題(馬桶沒水)還沒解決,只好再回去等待(wait),白進來轉一圈,浪費時間與資源。

 Lock方式與synchronized對應關係:

Lock await signal signalAll
synchronized wait notify notifyAll

注意:不要在Lock方式鎖住的塊裏調用wait、notify、notifyAll

 

 

6 利用管道進行線程間通訊 

  原理簡單。兩個線程,一個操做PipedInputStream,一個操做 PipedOutputStream。PipedOutputStream寫入的數據先緩存在Buffer中,若是 Buffer滿,此線程wait。PipedInputStream讀出Buffer中的數據,若是Buffer 沒數據,此線程wait。

jdk1.5中的阻塞隊列可實現一樣功能。

 例1 這個例子實際上只是單線程,還談不上線程間通訊,但不妨一看。 

package io;
import java.io.*;
public class PipedStreamTest {
    public static void main(String[] args) {
        PipedOutputStream ops=new PipedOutputStream();
        PipedInputStream pis=new PipedInputStream();
        try{
            ops.connect(pis);//實現管道鏈接
            new Producer(ops).run();
            new Consumer(pis).run();
        }catch(Exception e){
            e.printStackTrace();
        }

    }
}

//生產者
class Producer implements Runnable{
    private PipedOutputStream ops;
    public Producer(PipedOutputStream ops)
    {
        this.ops=ops;
    }

    public void run()
    {
        try{
            ops.write("hell,spell".getBytes());
            ops.close();
        }catch(Exception e)
            {e.printStackTrace();}
    }
}

//消費者
class Consumer implements Runnable{
    private PipedInputStream pis;
    public Consumer(PipedInputStream pis)
    {
        this.pis=pis;
    }

    public void run()
    {
        try{
            byte[] bu=new byte[100];
            int len=pis.read(bu);
            System.out.println(new String(bu,0,len));
            pis.close();
        }catch(Exception e)
            {e.printStackTrace();}
    }
} 

 

 例2 對上面的程序作少量改動就成了兩個線程。

package io;
import java.io.*;
public class PipedStreamTest {
    public static void main(String[] args) {
        PipedOutputStream ops=new PipedOutputStream();
        PipedInputStream pis=new PipedInputStream();
        try{
            ops.connect(pis);//實現管道鏈接
            Producer p = new Producer(ops);
            new Thread(p).start();
            Consumer c = new Consumer(pis);
            new Thread(c).start();
        }catch(Exception e){
            e.printStackTrace();
        }

    }
}

//生產者
class Producer implements Runnable{
    private PipedOutputStream ops;
    public Producer(PipedOutputStream ops)
    {
        this.ops=ops;
    }

    public void run()
    {
        try{
            for(;;){
                ops.write("hell,spell".getBytes());
                ops.close();
            }
        }catch(Exception e)
            {e.printStackTrace();}
    }
}

//消費者
class Consumer implements Runnable{
    private PipedInputStream pis;
    public Consumer(PipedInputStream pis)
    {
        this.pis=pis;
    }

    public void run()
    {
        try{
            for(;;){
                byte[] bu=new byte[100];
                int len=pis.read(bu);
                System.out.println(new String(bu,0,len));
            }
            pis.close();
        }catch(Exception e)
            {e.printStackTrace();}
    }
}

 

 例3. 這個例子更加貼進應用

import java.io.*;
       
public class PipedIO { //程序運行後將sendFile文件的內容拷貝到receiverFile文件中
    public static void main(String args[]){       
        try{//構造讀寫的管道流對象       
            PipedInputStream pis=new PipedInputStream();       
            PipedOutputStream pos=new PipedOutputStream();       
            //實現關聯       
            pos.connect(pis);       
            //構造兩個線程,而且啓動。           
            new Sender(pos,"c:\\text2.txt").start();           
            new Receiver(pis,"c:\\text3.txt").start();         
        }catch(IOException e){       
            System.out.println("Pipe Error"+ e);       
        }       
    }       
}       
//線程發送       
class Sender extends Thread{           
    PipedOutputStream pos;       
    File file;       
    //構造方法       
    Sender(PipedOutputStream pos, String fileName){       
        this.pos=pos;       
        file=new File(fileName);       
    }          
    //線程運行方法       
    public void run(){          
        try{       
            //讀文件內容       
            FileInputStream fs=new FileInputStream(file);       
            int data;       
            while((data=fs.read())!=-1){       
                //寫入管道始端       
                pos.write(data);       
            }       
            pos.close();                        
        }       
        catch(IOException e) {       
            System.out.println("Sender Error" +e);       
        }       
    }       
}
       
//線程讀       
class Receiver extends Thread{       
    PipedInputStream pis;       
    File file;       
    //構造方法       
    Receiver(PipedInputStream pis, String fileName){         
        this.pis=pis;       
        file=new File(fileName);       
    }          
    //線程運行       
    public void run(){          
        try {       
            //寫文件流對象       
            FileOutputStream fs=new FileOutputStream(file);       
            int data;       
            //從管道末端讀       
            while((data=pis.read())!=-1){
       
                //寫入本地文件       
                fs.write(data);       
            }       
            pis.close();            
        }       
        catch(IOException e){       
            System.out.println("Receiver Error" +e);       
        }       
    }       
}

 

 

7 阻塞隊列

 

阻塞隊列能夠代替管道流方式來實現進水管/排水管模式(生產者/消費者).JDK1.5提供了幾個現成的阻塞隊列. 如今來看ArrayBlockingQueue的代碼以下:

這裏是一個阻塞隊列 

BlockingQueue<Object> blockingQ = new ArrayBlockingQueue<Object> 10;

 

 一個線程從隊列裏取

for(;;){
    Object o = blockingQ.take();//隊列爲空,則等待(阻塞)
}

 

 另外一個線程往隊列存

for(;;){
    blockingQ.put(new Object());//隊列滿,則等待(阻塞)
}

 

 可見,阻塞隊列使用起來比管道簡單

 

8 使用Executors、Executor、ExecutorService、ThreadPoolExecutor

可使用線程管理任務。還可使用jdk1.5提供的一組類來更方便的管理任務。從這些類裏咱們能夠體會一種面向任務的思惟方式。這些類是:

  1. Executor接口。使用方法:
    Executor executor = anExecutor;//生成一個Executor實例。
    executor.execute(new RunnableTask1());

    用意:使用者只關注任務執行,不用操心去關注任務的建立、以及執行細節等這些第三方實現者關心的問題。也就是說,把任務的調用執行和任務的實現解耦。

    實際上,JDK1.5中已經有該接口出色的實現。夠用了。

  2. Executors是一個如同Collections同樣的工廠類或工具類,用來產生各類不一樣接口的實例。
  3. ExecutorService接口它繼承自Executor. Executor只管把任務扔進 executor()裏去執行,剩餘的事就無論了。而ExecutorService則不一樣,它會多作點控制工做。好比:
    class NetworkService {
        private final ServerSocket serverSocket;
        private final ExecutorService pool;
    
        public NetworkService(int port, int poolSize) throws IOException {
            serverSocket = new ServerSocket(port);
            pool = Executors.newFixedThreadPool(poolSize);
        }
     
        public void serve() {
            try {
                for (;;) {
                    pool.execute(new Handler(serverSocket.accept()));
                }
            } catch (IOException ex) {
                pool.shutdown(); //再也不執行新任務
            }
        }
    }
    
    class Handler implements Runnable {
        private final Socket socket;
        Handler(Socket socket) { this.socket = socket; }
        public void run() {
            // read and service request
        }
    }

     ExecutorService(也就是代碼裏的pool對象)執行shutdown後,它就不能再執行新任務了,但老任務會繼續執行完畢,那些等待執行的任務也再也不等待了。

  4. 任務提交者與執行者通信
    public static void main(String args[])throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Callable<String> task = new Callable<String>(){
            public String call()throws Exception{
                return "test";
            }
        };
        Future<String> f = executor.submit(task); 
        String result = f.get();//等待(阻塞)返回結果
        System.out.println(result);
        executor.shutdown();                
    }

     

    Executors.newSingleThreadExecutor()取得的Executor實例有如下特性:

    1. 任務順序執行. 好比:
      executor.submit(task1);
      executor.submit(task2);

       

      必須等task1執行完,task2才能執行。

    2. task1和task2會被放入一個隊列裏,由一個工做線程來處理。即:一共有2個線程(主線程、處理任務的工做線程)。
  5. 其它的類請參考Java Doc

 

 

9 併發流程控制

 CountDownLatch 門插銷計數器

  1. 啓動線程,而後等待線程結束。即經常使用的主線程等全部子線程結束後再執行的問題。
    public static void main(String[] args)throws Exception {
        // TODO Auto-generated method stub
        final int count=10;
        final CountDownLatch completeLatch = new CountDownLatch(count);//定義了門插銷的數目是10
                    
        for(int i=0;i<count;i++){
            Thread thread = new Thread("worker thread"+i){
                    public void run(){
                        //do xxxx                                   
                        completeLatch.countDown();//減小一根門插銷
                    }
                };
            thread.start();
        }           
        completeLatch.await();//若是門插銷還沒減完則等待。
    } 

     

    JDK1.4時,經常使用辦法是給子線程設置狀態,主線程循環檢測。易用性和效率都很差。

  2. 啓動不少線程,等待通知才能開始
    public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub
        final CountDownLatch startLatch = new CountDownLatch(1);//定義了一根門插銷
    
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread("worker thread" + i) {
                    public void run() {
                        try {
                            startLatch.await();//若是門插銷還沒減完則等待
                        } catch (InterruptedException e) {
    
                        }
                        // do xxxx
                    }
                };
            thread.start();
        }
        startLatch.countDown();//減小一根門插銷
    }

     

 

 

 CycliBarrier. 等全部線程都達到一個起跑線後才能開始繼續運行。 

public class CycliBarrierTest implements Runnable {
    private CyclicBarrier barrier;

    public CycliBarrierTest(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    public void run() {
        //do xxxx;
        try {
            this.barrier.await();//線程運行至此會檢查是否其它線程都到齊了,沒到齊就繼續等待。到齊了就執行barrier的run函數體裏的內容
        } catch (Exception e) {

        }
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        //參數2表明兩個線程都達到起跑線纔開始一塊兒繼續往下執行
        CyclicBarrier barrier = new CyclicBarrier(2, new Runnable() {
                public void run() {
                    //do xxxx;
                }
            });
        Thread t1 = new Thread(new CycliBarrierTest(barrier));         
        Thread t2 = new Thread(new CycliBarrierTest(barrier));
        t1.start();
        t2.start();
    }

}

 

 簡化了傳統的用計數器+wait/notifyAll來實現該功能的方式。

 

10 併發3定律

  1. Amdahl定律. 給定問題規模,可並行化部分佔12%,那麼即便把並行運用到極致,系統的性能最多也只能提升1/(1-0.12)=1.136倍。即:並行對提升系統性能有上限。
  2. Gustafson定律. Gustafson定律說Amdahl定律沒有考慮隨着cpu的增多而有更多的計算能力可被使用。其本質在於更改問題規模從而能夠把Amdahl定律中那剩下的88%的串行處理並行化,從而能夠突破性能門檻。本質上是一種空間換時間。
  3. Sun-Ni定律. 是前兩個定律的進一步推廣。其主要思想是計算的速度受限於存儲而不是CPU的速度. 因此要充分利用存儲空間等計算資源,儘可能增大問題規模以產生更好/更精確的解.

 

11 由併發到並行

  計算機識別物體須要飛速的計算,以致於芯片發熱發燙,而人在識別物體時卻一目瞭然,卻並不會致使某個腦細胞被燒熱燒焦(誇張)而感到不適,是因爲大腦是一個分佈式並行運行系統,就像google用一些廉價的linux服務器能夠進行龐大複雜的計算同樣,大腦內部無數的神經元的獨自計算,互相分享成果,從而瞬間完成須要單個cpu萬億次運算纔能有的效果。試想,若是在並行處理領域有所建立,將對計算機的發展和將來產生不可估量的影響。固然,其中的挑戰也可想而知:許多的問題是並不容易輕易就「分割」的了的。

相關文章
相關標籤/搜索