1 package com.chunjiangchao.thread; 2 /** 3 * 多線程之間數據共享 4 * @author chunjiangchao 5 * 6 */ 7 public class MultiThreadShareDataDemo { 8 9 public static void main(String[] args) { 10 Data data = new Data(); 11 new Thread(new IncrementRunnable(data)).start(); 12 new Thread(new DecrementtRunnable(data)).start(); 13 14 final Data data2 = new Data(); 15 new Thread(new Runnable() { 16 17 @Override 18 public void run() { 19 data2.increment(); 20 } 21 }).start(); 22 new Thread(new Runnable() { 23 24 @Override 25 public void run() { 26 data2.decrement(); 27 } 28 }).start(); 29 } 30 //對共享數據進行增長 31 private static class IncrementRunnable implements Runnable{ 32 private Data data ; 33 public IncrementRunnable(Data data){ 34 this.data = data; 35 } 36 public void run() { 37 data.increment(); 38 } 39 } 40 //對共享數據進行減小 41 private static class DecrementtRunnable implements Runnable{ 42 private Data data ; 43 public DecrementtRunnable(Data data){ 44 this.data = data; 45 } 46 public void run() { 47 data.decrement(); 48 } 49 } 50 51 52 //共享數據 53 private static class Data{ 54 private int temp=0; 55 public synchronized void increment(){ 56 temp++; 57 System.out.println(Thread.currentThread()+"中temp的值爲:"+temp); 58 } 59 public synchronized void decrement(){ 60 temp--; 61 System.out.println(Thread.currentThread()+"中temp的值爲:"+temp); 62 } 63 } 64 65 }
static ExecutorService newFixedThreadPool(int nThreads) 建立一個可重用固定線程數的線程池,以共享的無界隊列方式來運行這些線程。(建立固定線程池)多線程
List<Runnable> shutdownNow()試圖中止全部正在執行的活動任務,暫停處理正在等待的任務,並返回等待執行的任務列表。dom
static ExecutorService newCachedThreadPool():
static ExecutorService newSingleThreadExecutor():建立單個線程,若是線程死掉了,它會自動找個替補線程補上去。(如何實現線程死掉以後從新啓動)?
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize):建立一個定時線程池
1 package com.chunjiangchao.thread; 2 3 import java.util.Date; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 import java.util.concurrent.TimeUnit; 7 8 /** 9 * 線程併發庫,線程池的使用 10 * @author chunjiangchao 11 * 12 */ 13 public class ExecuterDemo { 14 15 public static void main(String[] args) { 16 // ExecutorService threadPool = Executors.newFixedThreadPool(3);//開了固定的三個線程 17 // ExecutorService threadPool = Executors.newCachedThreadPool();//開了10個線程 18 ExecutorService threadPool = Executors.newSingleThreadExecutor();//開了一個固定的線程 19 for(int i=0;i<10;i++){ 20 final int loop = i; 21 threadPool.execute(new Runnable(){ 22 public void run() { 23 try { 24 Thread.sleep(1000); 25 } catch (InterruptedException e) { 26 // e.printStackTrace(); 27 } 28 System.out.println(Thread.currentThread().getName()+" outer "+loop); 29 } 30 31 }); 32 } 33 /* 34 shutdownNow執行的結果爲: 35 pool-1-thread-3 outer 36 pool-1-thread-1 outer 37 pool-1-thread-2 outer * */ 38 // threadPool.shutdownNow(); 39 /*shutdown會執行完全部已經提交的任務,不會處理shutdown後提交的任務,並且在後面提交Runnable的時候, 40 * 會拋出異常java.util.concurrent.RejectedExecutionException*/ 41 threadPool.shutdown(); 42 // threadPool.execute(new Runnable(){ 43 // 44 // @Override 45 // public void run() { 46 // System.out.println("不會進行處理"); 47 // } 48 // 49 // }); 50 //實現定時器效果 51 Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable(){ 52 53 @Override 54 public void run() { 55 System.out.println("執行定時器結果"+new Date().toLocaleString()); 56 } 57 58 }, 2, 4, TimeUnit.SECONDS);//每隔4s玩一次 59 } 60 61 }
take() 獲取並移除表示下一個已完成任務的 Future,若是目前不存在這樣的任務,則等待
1 package com.chunjiangchao.thread; 2 3 import java.util.Date; 4 import java.util.concurrent.Callable; 5 import java.util.concurrent.ExecutionException; 6 import java.util.concurrent.Executor; 7 import java.util.concurrent.ExecutorCompletionService; 8 import java.util.concurrent.ExecutorService; 9 import java.util.concurrent.Executors; 10 import java.util.concurrent.Future; 11 12 /** 13 * Callable&Future的使用 14 * @author chunjiangchao 15 * 16 */ 17 public class CallableAndFutureDemo { 18 19 public static void main(String[] args) { 20 ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor(); 21 //提交單一任務 22 Future<String> submit = newSingleThreadExecutor.submit(new Callable<String>(){ 23 24 @Override 25 public String call() throws Exception { 26 printTime(); 27 mSleep(3000); 28 printTime(); 29 return "我這有返回值,你看看是否是"; 30 } 31 32 }); 33 mSleep(500); 34 try { 35 String string = submit.get(); 36 System.out.println(string); 37 } catch (InterruptedException | ExecutionException e) { 38 e.printStackTrace(); 39 } 40 // submit.cancel(true);//能夠對任務進行取消 41 //提交多個任務 42 Executor executor = Executors.newCachedThreadPool(); 43 ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executor); 44 for(int i=0;i<10;i++){ 45 final int loop = i; 46 completionService.submit(new Callable<String>(){ 47 48 @Override 49 public String call() throws Exception { 50 mSleep(1000*loop); 51 return "提交多任務有返回結果"+loop; 52 } 53 54 }); 55 } 56 for(int i=0;i<10;i++){ 57 try { 58 Future<String> result = completionService.take(); 59 printTime(); 60 System.out.println(result.get()); 61 } catch (InterruptedException e) { 62 e.printStackTrace(); 63 } catch (ExecutionException e) { 64 e.printStackTrace(); 65 } 66 } 67 /* 68 * 打印 結果以下 69 2016-4-18 11:57:46 70 2016-4-18 11:57:49 71 我這有返回值,你看看是否是 72 2016-4-18 11:57:49 73 提交多任務有返回結果0 74 2016-4-18 11:57:50 75 提交多任務有返回結果1 76 2016-4-18 11:57:51 77 提交多任務有返回結果2 78 2016-4-18 11:57:52 79 提交多任務有返回結果3 80 2016-4-18 11:57:53 81 提交多任務有返回結果4 82 2016-4-18 11:57:54 83 提交多任務有返回結果5 84 2016-4-18 11:57:55 85 提交多任務有返回結果6 86 2016-4-18 11:57:56 87 提交多任務有返回結果7 88 2016-4-18 11:57:57 89 提交多任務有返回結果8 90 2016-4-18 11:57:58 91 提交多任務有返回結果9 92 */ 93 94 } 95 private static void mSleep(long time){ 96 try { 97 Thread.sleep(time); 98 } catch (InterruptedException e) { 99 e.printStackTrace(); 100 } 101 } 102 private static void printTime(){ 103 System.out.println(new Date().toLocaleString()); 104 } 105 106 }
1 package com.chunjiangchao.thread; 2 3 import java.util.concurrent.locks.Lock; 4 import java.util.concurrent.locks.ReentrantLock; 5 6 /** 7 * lock的使用 8 */ 9 public class LockDemo { 10 11 public static void main(String[] args) { 12 final Outputer outputer = new Outputer(); 13 for(int index=0;index<10;index++){ 14 final int loop = index; 15 new Thread(new Runnable() { 16 public void run() { 17 // outputer.print("chunjiangchao"+loop); 18 outputer.synPrint("chunjiangchao"+loop); 19 } 20 }).start(); 21 22 } 23 } 24 private static class Outputer{ 25 private Lock lock = new ReentrantLock(); 26 public void print(String name){ 27 int length = name.length(); 28 lock.lock(); 29 try { 30 for(int i=0;i<length;i++){ 31 Thread.sleep(100); 32 System.out.print(name.charAt(i)+" "); 33 } 34 System.out.println(); 35 } catch (Exception e) { 36 e.printStackTrace(); 37 }finally{ 38 lock.unlock(); 39 } 40 41 } 42 /** 43 * 同步代碼塊的做用,和上面添加Lock鎖的做用相同,只不過鎖的對象不同而已 44 * @param name 45 */ 46 public synchronized void synPrint(String name){ 47 int length = name.length(); 48 try { 49 for(int i=0;i<length;i++){ 50 Thread.sleep(100); 51 System.out.print(name.charAt(i)+" "); 52 } 53 System.out.println(); 54 } catch (Exception e) { 55 e.printStackTrace(); 56 } 57 } 58 } 59 60 }
Lock比傳統線程模型中的synchronized方式更加面向對象,與生活中的鎖相似,鎖自己也應該是一個對象。兩個線程執行的代碼片斷要實現同步互斥的效果,它們必須用同一個Lock對象。Lock lock= new ReentrantLock( )
ReadWriteLock rwl = new ReentrantReadWriteLock( )
1 package com.chunjiangchao.thread; 2 3 import java.util.Random; 4 import java.util.concurrent.locks.ReadWriteLock; 5 import java.util.concurrent.locks.ReentrantReadWriteLock; 6 /** 7 * 使用讀寫鎖 查看打印結果發現讀鎖與讀鎖之間併發,寫鎖與寫鎖間併發,讀與寫之間是互斥的 8 * @author chunjaingchao 9 * 10 */ 11 public class ReadWriteLockDemo { 12 public static void main(String[] args) { 13 final Queue q3 = new Queue(); 14 for(int i=0;i<3;i++) 15 { 16 new Thread(){ 17 public void run(){ 18 while(true){ 19 q3.get(); 20 } 21 } 22 }.start(); 23 new Thread(){ 24 public void run(){ 25 while(true){ 26 q3.put(new Random().nextInt(10000)); 27 } 28 } 29 }.start(); 30 } 31 } 32 static class Queue{ 33 private Integer integer = null;//共享數據,只能有一個線程能寫該數據,但能夠有多個線程同時讀該數據。 34 ReadWriteLock rwl = new ReentrantReadWriteLock(); 35 public void get(){ 36 rwl.readLock().lock(); 37 try { 38 System.out.println(Thread.currentThread().getName() + "*****讀取******"); 39 Thread.sleep(200); 40 System.out.println(Thread.currentThread().getName() + "******讀取*****" + integer); 41 } catch (InterruptedException e) { 42 e.printStackTrace(); 43 }finally{ 44 rwl.readLock().unlock(); 45 } 46 } 47 public void put(Integer data){ 48 rwl.writeLock().lock(); 49 try { 50 System.out.println(Thread.currentThread().getName() + "######寫數據#######"); 51 Thread.sleep(200); 52 this.integer = data; 53 System.out.println(Thread.currentThread().getName() + "#######寫數據#######" + data); 54 } catch (InterruptedException e) { 55 e.printStackTrace(); 56 }finally{ 57 rwl.writeLock().unlock(); 58 } 59 } 60 } 61 }
1 class CachedData { 2 Object data; 3 volatile boolean cacheValid; 4 ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); 5 void processCachedData() { 6 rwl.readLock().lock();//添加讀鎖 7 if (!cacheValid) { 8 // Must release read lock before acquiring write lock 9 rwl.readLock().unlock();//若是沒有數據,將讀鎖釋放 10 rwl.writeLock().lock();//添加寫鎖 11 // Recheck state because another thread might have acquired 12 // write lock and changed state before we did. 13 if (!cacheValid) { 14 data = ... 15 cacheValid = true; 16 } 17 // Downgrade by acquiring read lock before releasing write lock 18 rwl.readLock().lock();//添加讀鎖 19 rwl.writeLock().unlock(); // Unlock write, still hold read//釋放寫鎖 20 } 21 use(data); 22 rwl.readLock().unlock();//釋放讀鎖 23 } 24 }
緩存系統的概念:你要找數據不要直接去找數據庫,能夠直接找我。 我若是沒有,查找數據庫給你。與你直接查找是同樣的。好處就是下一次你再來的時候,我就不用操做數據庫了。我直接給你。
在等待 Condition 時,容許發生「虛假喚醒」,這一般做爲對基礎平臺語義的讓步。對於大多數應用程序,這帶來的實際影響很小,由於 Condition 應該老是在一個循環中被等待,並測試正被等待的狀態聲明。某個實現能夠隨意移除可能的虛假喚醒,但建議應用程序程序員老是假定這些虛假喚醒可能發生,所以老是在一個循環中等待。(記住:每次在等待的時候,都要將判斷放在while循環中,防止僞喚醒出現)
1 class BoundedBuffer { 2 final Lock lock = new ReentrantLock(); 3 final Condition notFull = lock.newCondition(); 4 final Condition notEmpty = lock.newCondition(); 5 final Object[] items = new Object[100]; 6 int putptr, takeptr, count; 7 public void put(Object x) throws InterruptedException { 8 lock.lock(); 9 try { 10 while (count == items.length) 11 notFull.await(); 12 items[putptr] = x; 13 if (++putptr == items.length) putptr = 0; 14 ++count; 15 notEmpty.signal(); 16 } finally { 17 lock.unlock(); 18 } 19 } 20 public Object take() throws InterruptedException { 21 lock.lock(); 22 try { 23 while (count == 0) 24 notEmpty.await(); 25 Object x = items[takeptr]; 26 if (++takeptr == items.length) takeptr = 0; 27 --count; 28 notFull.signal(); 29 return x; 30 } finally { 31 lock.unlock(); 32 } 33 } 34 }
1 public class BoundedBufferDemo { 2 3 public static void main(String[] args) { 4 final BoundedBuffer boundedBuffer = new BoundedBuffer(); 5 new Thread(new Runnable(){ 6 7 @Override 8 public void run() { 9 while(true){ 10 try { 11 Thread.sleep(1000); 12 int nextInt = new Random().nextInt(); 13 System.out.println(new Date().toLocaleString()+"存放數據"+nextInt); 14 boundedBuffer.put(nextInt); 15 } catch (InterruptedException e) { 16 e.printStackTrace(); 17 } 18 } 19 } 20 21 }).start(); 22 new Thread(new Runnable(){ 23 24 @Override 25 public void run() { 26 while(true){ 27 try { 28 Thread.sleep(new Random().nextInt(1000)); 29 System.out.println(new Date().toLocaleString()+"獲取數據"+boundedBuffer.take()); 30 } catch (InterruptedException e) { 31 e.printStackTrace(); 32 } 33 } 34 } 35 36 }).start(); 37 } 38 }
1 package com.chunjiangchao.thread; 2 3 import java.util.concurrent.locks.Condition; 4 import java.util.concurrent.locks.Lock; 5 import java.util.concurrent.locks.ReentrantLock; 6 7 /** 8 * 相似於生產者消費者 9 * 子線程循環10次,接着主線程循環100,接着又回到子線程循環10次,接着再回到主線程又循環100,如此循環50次,請寫出程序。 10 * @author chunjiangchao 11 */ 12 public class ConditionDemo { 13 14 public static void main(String[] args) { 15 final Business business = new Business(); 16 new Thread(new Runnable() { 17 public void run() { 18 for(int i = 0;i<10;i++){ 19 business.sub(i); 20 } 21 } 22 }).start(); 23 new Thread(new Runnable() { 24 public void run() { 25 for(int i = 0;i<10;i++){ 26 business.main(i); 27 } 28 } 29 }).start(); 30 } 31 private static class Business{ 32 private Lock lock = new ReentrantLock(); 33 private Condition condition = lock.newCondition(); 34 private boolean bShouldSub = true; 35 public void main(int loop){ 36 lock.lock(); 37 while(bShouldSub){ 38 try { 39 condition.await(); 40 } catch (InterruptedException e) { 41 e.printStackTrace(); 42 } 43 } 44 for(int i=0;i<100;i++){ 45 try { 46 Thread.sleep(10); 47 } catch (InterruptedException e) { 48 e.printStackTrace(); 49 } 50 System.out.println(loop+"……main……"+i); 51 } 52 bShouldSub = true; 53 condition.signal(); 54 lock.unlock();//應該寫在finally代碼塊裏面 55 } 56 public void sub(int loop){ 57 lock.lock(); 58 while(!bShouldSub){ 59 try { 60 condition.await(); 61 } catch (InterruptedException e) { 62 e.printStackTrace(); 63 } 64 } 65 for(int i=0;i<10;i++){ 66 try { 67 Thread.sleep(10); 68 } catch (InterruptedException e) { 69 e.printStackTrace(); 70 } 71 System.out.println(loop+"……sub……"+i); 72 } 73 bShouldSub = false; 74 condition.signal(); 75 lock.unlock();//應該寫在finally代碼塊裏 76 } 77 78 } 79 }
1 package com.chunjiangchao.thread; 2 3 import java.util.concurrent.locks.Condition; 4 import java.util.concurrent.locks.Lock; 5 import java.util.concurrent.locks.ReentrantLock; 6 7 /** 8 * 多線程之間的通訊 9 * 三個線程交互執行 A-B-C-A-B-C 10 * @author chunjiangchao 11 * 12 */ 13 public class ThreeConditionDemo { 14 15 public static void main(String[] args) { 16 final Business business = new Business(); 17 new Thread(new Runnable() { 18 public void run() { 19 for(int i = 0;i<10;i++){ 20 business.one(i); 21 } 22 } 23 }).start(); 24 new Thread(new Runnable() { 25 public void run() { 26 for(int i = 0;i<10;i++){ 27 business.two(i); 28 } 29 } 30 }).start(); 31 new Thread(new Runnable() { 32 public void run() { 33 for(int i = 0;i<10;i++){ 34 business.three(i); 35 } 36 } 37 }).start(); 38 } 39 private static class Business{ 40 private Lock lock = new ReentrantLock(); 41 private Condition condition1 = lock.newCondition(); 42 private Condition condition2 = lock.newCondition(); 43 private Condition condition3 = lock.newCondition(); 44 private int whichOne = 1; 45 public void one(int loop){ 46 try { 47 lock.lock(); 48 while(whichOne!=1){ 49 condition1.await(); 50 } 51 for(int i=0;i<10;i++){ 52 Thread.sleep(10); 53 System.out.println("one "+loop+" 當前執行 "+i); 54 } 55 whichOne = 2; 56 condition2.signal(); 57 } catch (Exception e) { 58 e.printStackTrace(); 59 }finally{ 60 lock.unlock(); 61 } 62 } 63 public void two(int loop){ 64 try { 65 lock.lock(); 66 while(whichOne!=2){ 67 condition2.await(); 68 } 69 for(int i=0;i<10;i++){ 70 Thread.sleep(10); 71 System.out.println("two "+loop+" 當前執行 "+i); 72 } 73 whichOne = 3; 74 condition3.signal(); 75 } catch (Exception e) { 76 e.printStackTrace(); 77 }finally{ 78 lock.unlock(); 79 } 80 } 81 public void three(int loop){ 82 try { 83 lock.lock(); 84 while(whichOne!=3){ 85 condition3.await(); 86 } 87 for(int i=0;i<10;i++){ 88 Thread.sleep(10); 89 System.out.println("three "+loop+" 當前執行 "+i); 90 } 91 whichOne = 1; 92 condition1.signal(); 93 } catch (Exception e) { 94 e.printStackTrace(); 95 }finally{ 96 lock.unlock(); 97 } 98 } 99 } 100 101 }