java線程深入學習

一、java中的線程是通過Thread類創建的,

 1 //下面是構造函數,一個共同的特點就是:都是調用init()進行創建的
 2    public Thread() {
 3         init(null, null, "Thread-" + nextThreadNum(), 0);
 4     }
 5 
 6     public Thread(Runnable target) {
 7         init(null, target, "Thread-" + nextThreadNum(), 0);
 8     }
 9 
10     Thread(Runnable target, AccessControlContext acc) {
11         init(null, target, "Thread-" + nextThreadNum(), 0, acc);
12     }
13 
14     public Thread(ThreadGroup group, Runnable target) {
15         init(group, target, "Thread-" + nextThreadNum(), 0);
16     }
17 
18     public Thread(String name) {
19         init(null, null, name, 0);
20     }
21 
22     public Thread(ThreadGroup group, String name) {
23         init(group, null, name, 0);
24     }
25 
26     public Thread(Runnable target, String name) {
27         init(null, target, name, 0);
28     }
29 
30     public Thread(ThreadGroup group, Runnable target, String name) {
31         init(group, target, name, 0);
32     }
33 
34     public Thread(ThreadGroup group, Runnable target, String name,
35                   long stackSize) {
36         init(group, target, name, stackSize);
37     }
38 
39 //init()方法有兩個
40     private void init(ThreadGroup g, Runnable target, String name,
41                       long stackSize) {
42         init(g, target, name, stackSize, null);
43     }
44 /*g:用於將線程分組管理
45  *target:用於指定線程將要執行的任務
46  *name:線程的名字
47  *stackSize:
48  *acc:
49  */
50     private void init(ThreadGroup g, Runnable target, String name,
51                       long stackSize, AccessControlContext acc) {
52         //線程必須有一個名字,默認情況下是Thread-x,x是從0開始的int型數
53         if (name == null) {
54             throw new NullPointerException("name cannot be null");
55         }
56 
57         this.name = name.toCharArray();
58 
59         Thread parent = currentThread();
60         SecurityManager security = System.getSecurityManager();
61         if (g == null) {
62             /* Determine if it's an applet or not */
63 
64             /* If there is a security manager, ask the security manager
65                what to do. */
66             if (security != null) {
67                 g = security.getThreadGroup();
68             }
69 
70             /* If the security doesn't have a strong opinion of the matter
71                use the parent thread group. */
72             if (g == null) {
73                 g = parent.getThreadGroup();
74             }
75         }
Thread源碼

從構造函數可以看出,創建一個有意義的線程(有可執行的任務),就是向其中傳遞一個實現Runnable的對象即可,但是也可以繼承Thread類(因爲該類已經實現了前一條),然後重寫run()即可。後一種方法不推薦,因爲這個類最重要的是提供需要執行的方法即可。

上圖顯示了線程狀態轉換的條件,這些方法的源碼見下:

  1 public synchronized void start() {
  2         /**
  3          * 0 狀態代表 "NEW".
  4          */
  5         if (threadStatus != 0)
  6             throw new IllegalThreadStateException();
  7 
  8         group.add(this);
  9 
 10         boolean started = false;
 11         try {
 12             start0();
 13             started = true;
 14         } finally {
 15             try {
 16                 if (!started) {
 17                     group.threadStartFailed(this);
 18                 }
 19             } catch (Throwable ignore) {
 20                 /* do nothing. If start0 threw a Throwable then
 21                   it will be passed up the call stack */
 22             }
 23         }
 24     }
 25 
 26     private native void start0();
 27 
 28     //執行target的run(),在start之後自動執行,
 29     public void run() {
 30         if (target != null) {
 31             target.run();
 32         }
 33     }
 34 
 35     //中斷線程
 36     public void interrupt() {
 37         //判斷是否是當前正在執行的線程,檢查權限
 38         if (this != Thread.currentThread())
 39             checkAccess();
 40 
 41         synchronized (blockerLock) {
 42             Interruptible b = blocker;
 43             if (b != null) {
 44                 interrupt0();           // Just to set the interrupt flag
 45                 b.interrupt(this);
 46                 return;
 47             }
 48         }
 49         interrupt0();
 50     }    
 51 
 52     //從源碼可以看到,該方法調用wait(),使alive狀態(start之後,die之前)線程進入等待狀態
 53     public final synchronized void join(long millis)
 54     throws InterruptedException {
 55         long base = System.currentTimeMillis();
 56         long now = 0;
 57 
 58         if (millis < 0) {
 59             throw new IllegalArgumentException("timeout value is negative");
 60         }
 61 
 62         if (millis == 0) {
 63             while (isAlive()) {
 64                 wait(0);
 65             }
 66         } else {
 67             while (isAlive()) {
 68                 long delay = millis - now;
 69                 if (delay <= 0) {
 70                     break;
 71                 }
 72                 wait(delay);
 73                 now = System.currentTimeMillis() - base;
 74             }
 75         }
 76     }
 77 
 78     public final synchronized void join(long millis, int nanos)
 79     throws InterruptedException {
 80 
 81         if (millis < 0) {
 82             throw new IllegalArgumentException("timeout value is negative");
 83         }
 84 
 85         if (nanos < 0 || nanos > 999999) {
 86             throw new IllegalArgumentException(
 87                                 "nanosecond timeout value out of range");
 88         }
 89 
 90         if (nanos >= 500000 || (nanos != 0 && millis == 0)) {
 91             millis++;
 92         }
 93 
 94         join(millis);
 95     }
 96 
 97     public final void join() throws InterruptedException {
 98         join(0);
 99     }
100 
101     //不能指定時間的休眠
102     public static native void yield();
103 
104     //讓線程休眠指定時間
105     public static native void sleep(long millis) throws InterruptedException;
106 
107     public static void sleep(long millis, int nanos)
108     throws InterruptedException {
109         if (millis < 0) {
110             throw new IllegalArgumentException("timeout value is negative");
111         }
112 
113         if (nanos < 0 || nanos > 999999) {
114             throw new IllegalArgumentException(
115                                 "nanosecond timeout value out of range");
116         }
117 
118         if (nanos >= 500000 || (nanos != 0 && millis == 0)) {
119             millis++;
120         }
121 
122         sleep(millis);
123     }
124 
125 
126 //java線程的幾種狀態判斷
127     //start之後die之前都是alive狀態
128     public final native boolean isAlive();
129 
130 Thread源碼
Thread源碼

  需要注意的是start()是啓動一個線程,而run只是執行一個普通方法

  一個線程要執行需要滿足兩個條件,一是獲取CPU,二是獲取鎖(在有同步情況下),這就是阻塞產生的原因。由前可知阻塞有兩種情況,一種是阻塞自身(當前)線程(等待阻塞),另一種就是阻塞其他線程(同步阻塞)。上面有些方法類似,但是就是有這些細小的區別。例如:

  ①sleep:java中該方法是靜態方法(所以一般應該有Thread而非實例進行調用),調用該方法會當前線程使釋放CPU,但是不釋放鎖。下面的例子中在main()中使用t1.sleep()不能使t1休眠,因爲t1只是一個普通的Thread對象,而不是線程,其在主線程中使用,所以是主線程休眠,所以出現以下效果。另外注意sleep是靜態方法,最好使用類名即Thread調用

 1     public static void main(String[] args) throws InterruptedException {
 2         Thread t1=new Thread(new Runnable(){
 3             @Override
 4             public void run() {
 5                 for(int i=0;i<10;i++){
 6                     System.out.println("i="+i);
 7                 }
 8             }
 9         });
10         Thread t2=new Thread(new Runnable(){
11             @Override
12             public void run() {
13                 for(int j=0;j<10;j++){
14                     System.out.println("j="+j);
15                 }
16             }
17         });
18         t1.start();
19 //        t1.sleep(5000);        //和下面效果一樣,但是最好使用下面的方式
20         Thread.sleep(5000);
21         System.out.println("a");
22         t2.start();
23     }
等待阻塞

  ②wait:當前線程休眠,釋放CPU,同時釋放鎖,wait是Object對象的方法,必須在同步塊中使用,並且由notify或者notifyAll進行喚醒。那麼需要使用那個對象的wait()呢,這個問題其實和在那個對象上同步是一樣的問題,其實就是在多個線程需要使用的那個對象,在該對象上加鎖,並且調用這個對象的wait()

更過關於這些方法的區別參見:http://blog.csdn.net/evane1890/article/details/3313416

 二、多線程:上面是線程的一些基本情況,但是通常都是有多個線程一起使用的。線程之間共享同一片內存區域,所以當多個線程訪問同一個數據時就可能出錯,爲了獲得最佳速度,java允許線程保存共享成員變量的私有拷貝,而且只當線程進入或者離開同步代碼塊時才與共享成員變量的原始值對比。這就引起了多線程的一個問題:可見性。另一個問題就是對於一段代碼,只允許一個線程操作,即互斥性。

  volatile關鍵字可以解決可見性的問題,所以對於多個線程訪問的變量可以使用。但是其並不能解決原子性,所以並不能保證在多線程中的安全。參考:http://blog.csdn.net/zmissm/article/details/23484457http://blog.csdn.net/ns_code/article/details/17101369  

三、Java中的鎖機制:

  1、同步鎖:

    Java中最早解決多線程訪問的鎖機制就是同步鎖,即通過synchronized關鍵字,並使用的是一個對象(本質上是對象的內置鎖)來對一段代碼進行鎖定,直到當前代碼執行完成才釋放鎖。而其他線程必須等到釋放鎖之後才能重新獲取執行。

  注意:①一個對象只有一個鎖,所以使用這個對象作爲鎖的代碼塊(可以是多個)只能有一個線程執行。例如有兩個線程a和b,分別執行x和y代碼塊,但是由於x和y同時使用o作爲鎖,所以當a執行x時,b不能執行y。

    ②synchronized可以用於方法上(此時默認使用this對象,不需要手動設置),也可以使用在方法中的一段代碼上,此時需要指定用哪個對象進行鎖定,通常是那個多個線程需要訪問的對象。也可以在一個靜態方法上使用synchronized,使用的是this.class對象作爲鎖,它會鎖住整個類中的代碼塊;相同的,如果要在一個靜態塊中使用同步,也必須使用this.class作爲鎖。

    ③如果在一段同步代碼之內進行多個線程間進行消息傳遞,使用的是wait()/notify()/notifyAll(),就是那個用來鎖定的對象Object的方法。

  2、Lock鎖:synchronized是語法上的實現,在Java1.5版本之後引入了Lock概念,使鎖作爲一種類的存在,在java.util.concurrent.locks包下。

    其中Lock和ReadWriteLock是接口,定義了鎖需要實現的功能,主要的就是lock()/unlock()和readLock()/writeLock()。這裏就和synchronized有一個很大的區別即是Lock需要手動的釋放鎖,並且通常需要在加鎖的代碼上使用try並將解鎖的部分在finally中執行

 1     public boolean offer(E e) {
 2         checkNotNull(e);
 3         final ReentrantLock lock = this.lock;
 4         lock.lock();
 5         try {
 6             if (count == items.length)
 7                 return false;
 8             else {
 9                 insert(e);
10                 return true;
11             }
12         } finally {
13             lock.unlock();
14         }
15     }
ArrayBlockingQueue中offer()

    在1.5版本中,提供了兩個實現:ReentrantLock,ReentrantReadWriteLock。其中,ReentrantLock的效果更類似與synchronized,但是增加了一些特性以提高性能;而ReentrantReadWriteLock則是讀寫鎖,在實現上如果寫鎖執行,則其他線程都阻塞,但是如果讀鎖執行則會阻塞寫鎖而不會阻塞其他的讀鎖

    同樣的,爲了在多個線程中實現通信,提供了Condition接口,其中await()/signal()/signalAll()對應了object中的三個方法,區別是這幾個方法可以不在鎖的範圍內進行操作。該接口的實現類由Lock的實現類提供,調用其newCondition()即可。也就是說Condition對象是綁定到Lock對象上的,而且一個Lock對象可以生成多個Condition。

 1     public ArrayBlockingQueue(int capacity, boolean fair) {
 2         if (capacity <= 0)
 3             throw new IllegalArgumentException();
 4         this.items = new Object[capacity];
 5         lock = new ReentrantLock(fair);
 6         notEmpty = lock.newCondition();
 7         notFull =  lock.newCondition();
 8     }
 9 
10     private void insert(E x) {
11         items[putIndex] = x;
12         putIndex = inc(putIndex);
13         ++count;
14         //可以看到這裏調用了signal來喚醒相關線程,但是鎖是在offer()中添加的
15         notEmpty.signal();
16     }
ArrayBlockingQueue部分函數

    下面是ReentrantLock的類圖關係,ReentrantLock中的實現都是依賴其中的Sync的有效子類NonfairSync(非公平鎖)和FairSync(公平鎖)。而鎖的實現依賴於其父類。

 

 1     private final Sync sync;
 2 
 3     //構造函數
 4     public ReentrantLock() {
 5         sync = new NonfairSync();
 6     }
 7     public ReentrantLock(boolean fair) {
 8         sync = fair ? new FairSync() : new NonfairSync();
 9     }
10 
11     //功能代碼
12     public void lock() {
13         sync.lock();
14     }
15 
16     public boolean tryLock() {
17         return sync.nonfairTryAcquire(1);
18     }
19 
20     public void unlock() {
21         sync.release(1);
22     }
23 
24     public Condition newCondition() {
25         return sync.newCondition();
26     }
ReetrantLock源碼1
  1     abstract static class Sync extends AbstractQueuedSynchronizer {
  2         private static final long serialVersionUID = -5179523762034025860L;
  3 
  4         abstract void lock();
  5 
  6         final boolean nonfairTryAcquire(int acquires) {
  7             final Thread current = Thread.currentThread();
  8             int c = getState();
  9             if (c == 0) {
 10                 if (compareAndSetState(0, acquires)) {
 11                     setExclusiveOwnerThread(current);
 12                     return true;
 13                 }
 14             }
 15             else if (current == getExclusiveOwnerThread()) {
 16                 int nextc = c + acquires;
 17                 if (nextc < 0) // overflow
 18                     throw new Error("Maximum lock count exceeded");
 19                 setState(nextc);
 20                 return true;
 21             }
 22             return false;
 23         }
 24 
 25         protected final boolean tryRelease(int releases) {
 26             int c = getState() - releases;
 27             if (Thread.currentThread() != getExclusiveOwnerThread())
 28                 throw new IllegalMonitorStateException();
 29             boolean free = false;
 30             if (c == 0) {
 31                 free = true;
 32                 setExclusiveOwnerThread(null);
 33             }
 34             setState(c);
 35             return free;
 36         }
 37 
 38         protected final boolean isHeldExclusively() {
 39             // While we must in general read state before owner,
 40             // we don't need to do so to check if current thread is owner
 41             return getExclusiveOwnerThread() == Thread.currentThread();
 42         }
 43 
 44         final ConditionObject newCondition() {
 45             return new ConditionObject();
 46         }
 47 
 48         // Methods relayed from outer class
 49 
 50         final Thread getOwner() {
 51             return getState() == 0 ? null : getExclusiveOwnerThread();
 52         }
 53 
 54         final int getHoldCount() {
 55             return isHeldExclusively() ? getState() : 0;
 56         }
 57 
 58         final boolean isLocked() {
 59             return getState() != 0;
 60         }
 61 
 62         private void readObject(java.io.ObjectInputStream s)
 63             throws java.io.IOException, ClassNotFoundException {
 64             s.defaultReadObject();
 65             setState(0); // reset to unlocked state
 66         }
 67     }
 68 
 69     /**
 70      * Sync object for non-fair locks
 71      */
 72     static final class NonfairSync extends Sync {
 73         private static final long serialVersionUID = 7316153563782823691L;
 74 
 75         /**
 76          * Performs lock.  Try immediate barge, backing up to normal
 77          * acquire on failure.
 78          */
 79         final void lock() {
 80             if (compareAndSetState(0, 1))
 81                 setExclusiveOwnerThread(Thread.currentThread());
 82             else
 83                 acquire(1);
 84         }
 85 
 86         protected final boolean tryAcquire(int acquires) {
 87             return nonfairTryAcquire(acquires);
 88         }
 89     }
 90 
 91     /**
 92      * Sync object for fair locks
 93      */
 94     static final class FairSync extends Sync {
 95         private static final long serialVersionUID = -3000897897090466540L;
 96 
 97         final void lock() {
 98             acquire(1);
 99         }
100 
101         /**
102          * Fair version of tryAcquire.  Don't grant access unless
103          * recursive call or no waiters or is first.
104          */
105         protected final boolean tryAcquire(int acquires) {
106             final Thread current = Thread.currentThread();
107             int c = getState();
108             if (c == 0) {
109                 if (!hasQueuedPredecessors() &&
110                     compareAndSetState(0, acquires)) {
111                     setExclusiveOwnerThread(current);
112                     return true;
113                 }
114             }
115             else if (current == getExclusiveOwnerThread()) {
116                 int nextc = c + acquires;
117                 if (nextc < 0)
118                     throw new Error("Maximum lock count exceeded");
119                 setState(nextc);
120                 return true;
121             }
122             return false;
123         }
124     }
ReetrantLock源碼2

有關AbstractQueuedSynchronizer實現原理的內容參考:http://www.infoq.com/cn/articles/jdk1.8-abstractqueuedsynchronizer,其中有兩個基本:1,使用volatile修飾的state變量用於設置是否有線程獲得鎖,2,一個FIFO的隊列用於保存掛起的線程。

四、有了以上的基礎,就可以實現多線程中的一個經典例子:生產者消費者模型,就是有一個倉庫,生產者將商品放入其中,當倉滿時則不能生產,並阻塞所有的生成線程;此時只能由消費者線程進行消費,但是當倉空時就需要阻塞消費者線程而喚醒生產者線程進行生產。

  1、使用同步鎖,以及wait/notifyAll機制實現,其中需要注意的是wait的使用規範,詳細參見http://www.importnew.com/16453.html,有一條重要的就是:在while中而不是if中使用wait.

  1 package thread;
  2 
  3 import java.util.LinkedList;
  4 import java.util.Queue;
  5 
  6 public class ProducerConsumer2 {
  7     public static void main(String[] args) {
  8         Storage s=new Storage();
  9         
 10         Producer p1=new Producer(s);
 11         Producer p2=new Producer(s);
 12         Producer p3=new Producer(s);
 13         
 14         Consumer c1=new Consumer(s);
 15         Consumer c2=new Consumer(s);
 16         Consumer c3=new Consumer(s);
 17         Consumer c4=new Consumer(s);
 18         Consumer c5=new Consumer(s);
 19         
 20         Thread t1=new Thread(p1,"p1");
 21         Thread t2=new Thread(p2,"p2");
 22         Thread t3=new Thread(p3,"p3");
 23         Thread t4=new Thread(c1,"c1");
 24         Thread t5=new Thread(c2,"c2");
 25         Thread t6=new Thread(c3,"c3");
 26         Thread t7=new Thread(c4,"c4");
 27         Thread t8=new Thread(c5,"c5");
 28         
 29         t1.start();
 30         t2.start();
 31         t3.start();
 32         t4.start();
 33         t5.start();
 34         t6.start();
 35         t7.start();
 36         t8.start();
 37     }
 38 }
 39 
 40 class Product{
 41     private int id;
 42     
 43     public Product(int id) {
 44         this.id=id;
 45     }
 46 
 47     @Override
 48     public String toString() {
 49         return "Product [id=" + id + "]";
 50     }
 51 }
 52 
 53 //倉庫對象,生產者和消費者之間的橋樑
 54 class Storage{
 55     //倉庫容量
 56     Queue<Product> queues = new LinkedList<Product>();
 57     
 58     //生產,即向倉庫中添加產品
 59     public void push(Product s){
 60         queues.add(s);
 61     }
 62     
 63     //消費
 64     public Product pop(){
 65         return queues.remove();
 66     }
 67 }
 68 
 69 class Producer implements Runnable{
 70     private Storage s;
 71     
 72     public Producer(Storage s) {
 73         this.s=s;
 74     }
 75     
 76     @Override
 77     public void run() {
 78             while(true){
 79                 synchronized(s.queues){
 80                     while(s.queues.size()>=10){
 81                         try{
 82                             System.out.println(Thread.currentThread().getName()+",隊滿,不能添加");
 83                             s.queues.wait();
 84                         }catch(InterruptedException e){
 85                             e.printStackTrace();
 86                         }
 87                     }
 88                     Product p=new Product((int)(Math.random()*10000));
 89                     s.push(p);
 90                     System.out.println(Thread.currentThread().getName()+",生產了產品,現在有"+s.queues.size());
 91                     System.out.println("============================================");
 92               span> Storage s;  71     
 72     public Producer(Storage s) {  73         this.s=s;  74  }  75     
 76  @Override  77     public void run() {  78             while(true){  79                 synchronized(s.queues){  80                     while(s.queues.size()>=10){  81                         try{  82                             System.out.println(Thread.currentThread().getName()+",隊滿,不能添加");  83  s.queues.wait();  84                         }catch(InterruptedException e){  85  e.printStackTrace();  86  }  87  }  88                     Product p=new Product((int)(Math.random()*10000));  89  s.push(p);  90                     System.out.println(Thread.currentThread().getName()+",生產了產品,現在有"+s.queues.size());  91                     System.out.println("============================================");  92  s.queues.notifyAll();  93  }  94  }  95  }  96 }  97 
 98 class Consumer implements Runnable{  99     private Storage s; 100     
101     public Consumer(Storage s) { 102         this.s=s; 103  } 104     
105  @Override 106     public void run() { 107             while(true){ 108                 synchronized(s.queues){ 109                     //注意這裏的while不能換爲if
110                     while(s.queues.isEmpty()){ 111                         try{ 112                             System.out.println("隊空,不能消費"); 113  s.queues.wait(); 114                         }catch(InterruptedException e){ 115  e.printStackTrace(); 116  } 117  } 118  s.pop(); 119                     System.out.println(Thread.currentThread().getName()+",消費了產品,現在有"+s.queues.size()); 120                     System.out.println("============================================"); 121  s.queues.notifyAll(); 122  } 123  } 124  } 125 }
生產者消費者模型-1

  2、使用Lock鎖實現

 1 class Sto{
 2     //使用Lock對象代替synchronized,使用Condition進行線程間通訊
 3     Lock lock=new ReentrantLock();
 4     Condition condition=lock.newCondition();
 5     
 6     //倉庫容量
 7     Queue<Pro> queues = new LinkedList<Pro>();
 8     
 9     //生產,即向倉庫中添加產品
10     public void push(){
11         lock.lock();
12 //        synchronized(queues){
13             try {
14                 whilewhile(s.queues.size()>=10){
 81                         try{
 82                             System.out.println(Thread.currentThread().getName()+",隊滿,不能添加");
 83                             s.queues.wait();
 84                         }catch(InterruptedException e){
 85                             e.printStackTrace();
 86                         }
 87                     }
 88                     Product p=new Product((int)(Math.random()*10000));
 89                     s.push(p);
 90                     System.out.println(Thread.currentThread().getName()+",生產了產品,現在有"+s.queues.size());
 91                     System.out.println("============================================");
 92                     s.queues.notifyAll();
 93                 }
 94             }
 95     }
 96 }
 97 
 98 class Consumer implements Runnable{
 99     private Storage s;
100     
101     public Consumer(Storage s) {
102         this.s=s;
103     }
104     
105     @Override
106     public void run() {
107             while(true){
108                 synchronized(s.queues){
109                     //注意這裏的while不能換爲if
110                     while(s.queues.isEmpty()){
111                         try{
112                             System.out.println("隊空,不能消費");
113                             s.queues.wait();
114                         }catch(InterruptedException e){
115                             e.printStackTrace();
116                         }
117                     }
118                     s.pop();
119                     System.out.println(Thread.currentThread().getName()+",消費了產品,現在有"+s.queues.size());
120                     System.out.println("============================================");
121                     s.queues.notifyAll();
122                 }
123             }
124     }
125 }
生產者消費者模型-1

  2、使用Lock鎖實現

 1 class Sto{
 2     //使用Lock對象代替synchronized,使用Condition進行線程間通訊
 3     Lock lock=new ReentrantLock();
 4     Condition condition=lock.newCondition();
 5     
 6     //倉庫容量
 7     Queue<Pro> queues = new LinkedList<Pro>();
 8     
 9     //生產,即向倉庫中添加產品
10     public void push(){
11         lock.lock();
12 //        synchronized(queues){
13             try {
14                 while(queues.size()>=10){
15                     System.out.println("隊滿,不能生成");
16                     try {

     
     
      
      );

      
      113 
      
       s.queues.wait(); 
      
      114                         }
      
      catch
      
      (InterruptedException e){ 
      
      115 
      
       e.printStackTrace(); 
      
      116 
      
       } 
      
      117 
      
       } 
      
      118 
      
       s.pop(); 
      
      119                     System.ou
     
     
相關文章
相關標籤/搜索