(1)wait() / notify()方法dom
(2)await() / signal()方法ide
(3)BlockingQueue阻塞隊列方法this
(4)PipedInputStream / PipedOutputStreamspa
本文只介紹最經常使用的前三種,第四種暫不作討論線程
第一種:BlockingQueue阻塞隊列方法code
1 class Task { 2 3 private int id; 4 private int value; 5 6 public int getId() { 7 return id; 8 } 9 10 public void setId(int id) { 11 this.id = id; 12 } 13 14 public int getValue() { 15 return value; 16 } 17 18 public void setValue(int value) { 19 this.value = value; 20 } 21 } 22 23 /** 24 * 生產者 25 */ 26 class Provider implements Runnable{ 27 28 private BlockingQueue<Task> queue; 29 30 private static AtomicInteger newId = new AtomicInteger(); 31 32 private static Random random = new Random(); 33 34 private volatile boolean isRunning = true; 35 36 public Provider(BlockingQueue<Task> queue) { 37 this.queue = queue; 38 } 39 40 @Override 41 public void run() { 42 while (isRunning){ 43 try { 44 Thread.sleep(random.nextInt(1000)); 45 int id = newId.incrementAndGet(); 46 Task task = new Task(); 47 task.setId(id); 48 task.setValue(random.nextInt(1000)); 49 System.out.println("當前線程:" + Thread.currentThread().getName() + ", 獲取了數據,id爲:" + id + ", 進行裝載到公共緩衝區中..."); 50 this.queue.put(task); 51 } catch (Exception e){ 52 e.printStackTrace(); 53 } 54 } 55 } 56 57 public void stop() { 58 this.isRunning = false; 59 } 60 } 61 62 /** 63 * 消費者 64 */ 65 class Consumer implements Runnable{ 66 67 private BlockingQueue<Task> queue; 68 69 public Consumer(BlockingQueue queue) { 70 this.queue = queue; 71 } 72 73 @Override 74 public void run() { 75 while (true){ 76 try { 77 Task task = queue.take(); 78 Thread.sleep(new Random().nextInt(1000)); 79 System.out.println("當前消費線程:" + Thread.currentThread().getName() + ", 消費成功,消費數據爲id: " + task.getId()); 80 } catch (Exception e) { 81 e.printStackTrace(); 82 } 83 } 84 85 } 86 } 87 88 public static void main(String[] args) { 89 90 ArrayBlockingQueue<Task> queue = new ArrayBlockingQueue<Task>(10); 91 Provider provider = new Provider(queue); 92 Provider provider1 = new Provider(queue); 93 Provider provider2 = new Provider(queue); 94 95 Consumer consumer = new Consumer(queue); 96 Consumer consumer1 = new Consumer(queue); 97 Consumer consumer2 = new Consumer(queue); 98 99 ThreadPoolExecutor executor = new ThreadPoolExecutor(5, Integer.MAX_VALUE, 100 60L, TimeUnit.SECONDS, 101 new ArrayBlockingQueue<Runnable>(12)); 102 executor.execute(provider); 103 executor.execute(provider1); 104 executor.execute(provider2); 105 executor.execute(consumer); 106 executor.execute(consumer1); 107 executor.execute(consumer2); 108 109 110 try { 111 Thread.sleep(3000); 112 } catch (Exception e) { 113 e.printStackTrace(); 114 } 115 provider.stop(); 116 provider1.stop(); 117 provider2.stop(); 118 119 try { 120 Thread.sleep(2000); 121 } catch (InterruptedException e) { 122 e.printStackTrace(); 123 } 124 }
運行結果:
當前線程:pool-1-thread-2, 獲取了數據,id爲:1, 進行裝載到公共緩衝區中...
當前線程:pool-1-thread-1, 獲取了數據,id爲:2, 進行裝載到公共緩衝區中...
當前線程:pool-1-thread-3, 獲取了數據,id爲:3, 進行裝載到公共緩衝區中...
當前線程:pool-1-thread-3, 獲取了數據,id爲:4, 進行裝載到公共緩衝區中...
當前線程:pool-1-thread-3, 獲取了數據,id爲:5, 進行裝載到公共緩衝區中...
當前消費線程:pool-1-thread-5, 消費成功,消費數據爲id: 1
當前消費線程:pool-1-thread-4, 消費成功,消費數據爲id: 2
當前線程:pool-1-thread-2, 獲取了數據,id爲:6, 進行裝載到公共緩衝區中...
當前線程:pool-1-thread-1, 獲取了數據,id爲:7, 進行裝載到公共緩衝區中...
當前線程:pool-1-thread-2, 獲取了數據,id爲:8, 進行裝載到公共緩衝區中...
當前消費線程:pool-1-thread-4, 消費成功,消費數據爲id: 4
當前線程:pool-1-thread-1, 獲取了數據,id爲:9, 進行裝載到公共緩衝區中...
當前消費線程:pool-1-thread-5, 消費成功,消費數據爲id: 3
當前線程:pool-1-thread-3, 獲取了數據,id爲:10, 進行裝載到公共緩衝區中...
當前線程:pool-1-thread-2, 獲取了數據,id爲:11, 進行裝載到公共緩衝區中...
當前消費線程:pool-1-thread-5, 消費成功,消費數據爲id: 6
當前消費線程:pool-1-thread-4, 消費成功,消費數據爲id: 5
當前線程:pool-1-thread-3, 獲取了數據,id爲:12, 進行裝載到公共緩衝區中...
當前線程:pool-1-thread-1, 獲取了數據,id爲:13, 進行裝載到公共緩衝區中...
當前線程:pool-1-thread-2, 獲取了數據,id爲:14, 進行裝載到公共緩衝區中...
當前線程:pool-1-thread-2, 獲取了數據,id爲:15, 進行裝載到公共緩衝區中...
當前線程:pool-1-thread-3, 獲取了數據,id爲:16, 進行裝載到公共緩衝區中...
當前消費線程:pool-1-thread-4, 消費成功,消費數據爲id: 8
當前消費線程:pool-1-thread-4, 消費成功,消費數據爲id: 10
當前線程:pool-1-thread-1, 獲取了數據,id爲:17, 進行裝載到公共緩衝區中...
當前消費線程:pool-1-thread-5, 消費成功,消費數據爲id: 7
當前消費線程:pool-1-thread-3, 消費成功,消費數據爲id: 9
當前消費線程:pool-1-thread-3, 消費成功,消費數據爲id: 13
當前消費線程:pool-1-thread-4, 消費成功,消費數據爲id: 11
當前線程:pool-1-thread-2, 獲取了數據,id爲:18, 進行裝載到公共緩衝區中...
當前消費線程:pool-1-thread-3, 消費成功,消費數據爲id: 14
當前消費線程:pool-1-thread-5, 消費成功,消費數據爲id: 12
當前消費線程:pool-1-thread-3, 消費成功,消費數據爲id: 16
當前消費線程:pool-1-thread-3, 消費成功,消費數據爲id: 18
當前消費線程:pool-1-thread-4, 消費成功,消費數據爲id: 15
當前消費線程:pool-1-thread-5, 消費成功,消費數據爲id: 17blog
第二種:wait() / notify()方法隊列
1 /** 2 * 倉庫 3 */ 4 class Storage { 5 6 private LinkedList list = new LinkedList(); // 存放產品的 7 8 private final int MAX_NUM = 100; 9 10 11 public void product(int num) { 12 13 synchronized (list) { 14 15 while (list.size() + num > MAX_NUM) { 16 try { 17 list.wait(); 18 } catch (Exception e) { 19 e.printStackTrace(); 20 } 21 System.out.println("庫存達到最大,不能生產"); 22 23 } 24 25 // 說明庫存是夠得,那麼生產 26 for (int i = 0; i < num; i++) { 27 list.add(new Object()); 28 } 29 30 try { 31 Thread.sleep(2000); 32 } catch (Exception e) { 33 e.printStackTrace(); 34 } 35 36 System.out.println(Thread.currentThread().getName() + "生產完成,如今庫存" + list.size()); 37 38 list.notifyAll(); 39 40 } 41 } 42 43 public void consume(int num) { 44 45 synchronized (list) { 46 47 while (list.size() < num) { 48 System.out.println("庫存不足"); 49 try { 50 list.wait(); 51 } catch (Exception e) { 52 e.printStackTrace(); 53 } 54 } 55 56 for (int i = 0; i < num; i++) { 57 list.remove(); 58 } 59 try { 60 Thread.sleep(1000); 61 } catch (Exception e) { 62 e.printStackTrace(); 63 } 64 65 System.out.println(Thread.currentThread().getName() + "消費完成"+ num +",如今庫存" + list.size()); 66 list.notifyAll(); 67 } 68 } 69 70 public LinkedList getList() { 71 return list; 72 } 73 74 public void setList(LinkedList list) { 75 this.list = list; 76 } 77 78 } 79 80 class Producter1 implements Runnable { 81 82 private int num; 83 private Storage storage; 84 85 public Producter1(int num, Storage storage) { 86 this.num = num; 87 this.storage = storage; 88 } 89 90 @Override 91 public void run() { 92 storage.product(num); 93 } 94 95 public int getNum() { 96 return num; 97 } 98 99 public void setNum(int num) { 100 this.num = num; 101 } 102 103 public Storage getStorage() { 104 return storage; 105 } 106 107 public void setStorage(Storage storage) { 108 this.storage = storage; 109 } 110 } 111 112 class Consumer1 implements Runnable { 113 114 private int num; 115 116 private Storage storage; 117 118 public Consumer1(int num, Storage storage) { 119 this.num = num; 120 this.storage = storage; 121 } 122 123 @Override 124 public void run() { 125 storage.consume(num); 126 } 127 128 public int getNum() { 129 return num; 130 } 131 132 public void setNum(int num) { 133 this.num = num; 134 } 135 136 public Storage getStorage() { 137 return storage; 138 } 139 140 public void setStorage(Storage storage) { 141 this.storage = storage; 142 } 143 } 144 145 public static void main(String[] args) { 146 147 Storage storage = new Storage(); 148 149 Producter1 producter = new Producter1(10,storage); 150 Producter1 producter1 = new Producter1(10,storage); 151 Producter1 producter2 = new Producter1(10,storage); 152 Producter1 producter3 = new Producter1(10,storage); 153 Producter1 producter4 = new Producter1(10,storage); 154 Producter1 producter5 = new Producter1(10,storage); 155 Producter1 producter6 = new Producter1(10,storage); 156 157 Consumer1 consumer1 = new Consumer1(20,storage); 158 Consumer1 consumer2 = new Consumer1(30,storage); 159 Consumer1 consumer3 = new Consumer1(20,storage); 160 161 ThreadPoolExecutor executor = new ThreadPoolExecutor(10, Integer.MAX_VALUE, 162 60L, TimeUnit.SECONDS, 163 new ArrayBlockingQueue<Runnable>(12)); 164 165 executor.execute(producter); 166 executor.execute(producter1); 167 executor.execute(producter2); 168 executor.execute(producter3); 169 executor.execute(producter4); 170 executor.execute(producter5); 171 executor.execute(producter6); 172 executor.execute(consumer1); 173 executor.execute(consumer2); 174 executor.execute(consumer3); 175 }
運行結果:
pool-1-thread-1生產完成,如今庫存10
pool-1-thread-3生產完成,如今庫存20
pool-1-thread-7生產完成,如今庫存30
pool-1-thread-9消費完成30,如今庫存0
pool-1-thread-5生產完成,如今庫存10
庫存不足
pool-1-thread-6生產完成,如今庫存20
pool-1-thread-4生產完成,如今庫存30
pool-1-thread-10消費完成20,如今庫存10
pool-1-thread-2生產完成,如今庫存20
pool-1-thread-8消費完成20,如今庫存0ip
第三種:await() / signal()方法rem
class Storage1 { private final int MAX_NUM = 100; private LinkedList list = new LinkedList(); private final Lock lock = new ReentrantLock(); private final Condition notEmpty = lock.newCondition(); private final Condition notFull = lock.newCondition(); public void product(int num) { lock.lock(); try { while (list.size() + num > MAX_NUM) { notFull.await(); System.out.println("庫存達到最大,不能生產"); } } catch (Exception e) { e.printStackTrace(); } // 說明庫存是夠得,那麼生產 for (int i = 0; i < num; i++) { list.add(new Object()); } try { // Thread.sleep(2000); } catch (Exception e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "生產完成,如今庫存" + list.size()); notFull.signalAll(); notEmpty.signalAll(); lock.unlock(); } public void consume(int num) { lock.lock(); try { while (list.size() < num) { notEmpty.await(); System.out.println("庫存不足"); } } catch (Exception e) { e.printStackTrace(); } for (int i = 0; i < num; i++) { list.remove(); } try { Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "消費完成"+ num +",如今庫存" + list.size()); notFull.signalAll(); notEmpty.signalAll(); lock.unlock(); } } class Producter2 implements Runnable { private int num; private Storage1 storage; public Producter2(int num, Storage1 storage) { this.num = num; this.storage = storage; } @Override public void run() { storage.product(num); } public int getNum() { return num; } public void setNum(int num) { this.num = num; } public Storage1 getStorage() { return storage; } public void setStorage(Storage1 storage) { this.storage = storage; } } class Consumer2 implements Runnable { private int num; private Storage1 storage; public Consumer2(int num, Storage1 storage) { this.num = num; this.storage = storage; } @Override public void run() { storage.consume(num); } public int getNum() { return num; } public void setNum(int num) { this.num = num; } public Storage1 getStorage() { return storage; } public void setStorage(Storage1 storage) { this.storage = storage; } } public static void main(String[] args) { Storage1 storage = new Storage1(); Producter2 producter = new Producter2(10,storage); Producter2 producter1 = new Producter2(10,storage); Producter2 producter2 = new Producter2(10,storage); Producter2 producter3 = new Producter2(10,storage); Producter2 producter4 = new Producter2(10,storage); Producter2 producter5 = new Producter2(10,storage); Producter2 producter6 = new Producter2(10,storage); Consumer2 consumer1 = new Consumer2(20,storage); Consumer2 consumer2 = new Consumer2(30,storage); Consumer2 consumer3 = new Consumer2(20,storage); ThreadPoolExecutor executor = new ThreadPoolExecutor(10, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(12)); executor.execute(producter); executor.execute(producter1); executor.execute(producter2); executor.execute(producter3); executor.execute(producter4); executor.execute(producter5); executor.execute(producter6); executor.execute(consumer1); executor.execute(consumer2); executor.execute(consumer3); }
運行結果:
pool-1-thread-1生產完成,如今庫存10pool-1-thread-3生產完成,如今庫存20pool-1-thread-4生產完成,如今庫存30pool-1-thread-5生產完成,如今庫存40pool-1-thread-9消費完成30,如今庫存10pool-1-thread-7生產完成,如今庫存20pool-1-thread-8消費完成20,如今庫存0pool-1-thread-2生產完成,如今庫存10pool-1-thread-6生產完成,如今庫存20pool-1-thread-10消費完成20,如今庫存0