package com.sohu.hot.vis.servlet; import java.util.concurrent.*; /** * 多線程學習之Callable * * @author liweihan * @time 2016-12-29 14:44 */ public class TestCallableAndFuture { /** * Callable 和 Future接口 * Callable是相似於Runnable的接口,實現Callable接口的類和實現Runnable的類都是可被其它線程執行的任務。 * Callable和Runnable有幾點不一樣: * (1)Callable規定的方法是call(),而Runnable規定的方法是run(). * (2)Callable的任務執行後可返回值,而Runnable的任務是不能返回值的。 * (3)call()方法可拋出異常,而run()方法是不能拋出異常的。 * (4)運行Callable任務可拿到一個Future對象, * Future 表示異步計算的結果。它提供了檢查計算是否完成的方法,以等待計算的完成,並檢索計算的結果。 * 經過Future對象可瞭解任務執行狀況,可取消任務的執行,還可獲取任務執行的結果。 * Future的cancel方法能夠取消任務的執行,它有一布爾參數,參數爲 true 表示當即中斷任務的執行, * 參數爲 false 表示容許正在運行的任務運行完成。Future的 get 方法等待計算完成,獲取計算結果 */ public static class MyCallable implements Callable { private int flag = 0; public MyCallable(int flag) { this.flag = flag; } @Override public Object call() throws Exception { if (this.flag == 0) { return "flag = 0"; }else if (this.flag == 1) { try { while (true) { System.out.println("循環。。。"); Thread.sleep(2000); } } catch (InterruptedException e) { System.out.println("Interruptered"); } return false; } else { throw new Exception("Error flag value!!"); } } } public static void main(String[] args) { //定義三個Callable類型的任務 MyCallable task1 = new MyCallable(0); MyCallable task2 = new MyCallable(1); MyCallable task3 = new MyCallable(2); //定義一個執行任務的服務 ExecutorService es = Executors.newFixedThreadPool(3); try { /** * 提交併執行任務,任務啓動時返回了一個Future對象。 * 若是想獲得任務執行的結果或者是異常可對這個Future對象進行操做 */ Future future1 = es.submit(task1); //得到第一個任務的結果,若是調用get方法,當前線程會等待任務執行完畢後才往下執行 System.out.println("task1:" + future1.get()); Future future2 = es.submit(task2); //等待5秒後,再中止第二個任務。由於第二個任務進行的是無限循環 Thread.sleep(5000); System.out.println("task2 cancel:" + future2.cancel(true)); //獲取第三個任務的輸出,由於執行第三個任務會引發異常 //因此下面的語句將引發異常 Future future3 = es.submit(task3); System.out.println("task3:" + future3.get()); } catch (Exception e) { System.out.println(e.toString()); } //中止任務執行服務 es.shutdownNow(); } }
例子2:html
package com.sohu.hot.vis.servlet; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * 多線程學習之Callable * * @author liweihan * @time 2016-12-29 15:44 */ public class TestCallable2 { static class StarRelationThread implements Callable<Boolean> { private Map<String, String> mapThread ; private Map<String, String> map ; private int threadNum; public StarRelationThread(Map<String, String> mapThread ,Map<String, String> map,int threadNum) { this.map = map; this.threadNum = threadNum; this.mapThread = mapThread; } @Override public Boolean call() throws Exception { System.out.println(" 第 " + threadNum + " 個線程處理-開始 ,此線程處理的數量 " + mapThread.size() + ",總的數量爲:"+map.size()); System.out.println("處理數據 ,並寫入redis中"); if (threadNum > 3) { try { Thread.sleep(20000); } catch (InterruptedException e) { e.printStackTrace(); } } int sync = 0; for (Map.Entry<String, String> en : mapThread.entrySet()) { sync++; if (sync < 2) { System.out.println("key :" + en.getKey() + ", value :" + en.getValue()); } } System.out.println(" 第 " + threadNum + " 個線程執行完畢!"); return true; //true和flase,能夠根據具體業務再作處理 } } public static void main(String[] args) { Map<String, String> map = new HashMap<String, String>(); //測試數據 for (int i = 0; i < 300000; i++) { map.put("key" + i, "value"+i); } //5.分割map+多線程 int totalSize = map.size(); System.out.println("Map totalSize : " + totalSize); //線程的數量 int threadNum = 16; //每一個線程處理的數量 int threadSize = totalSize / threadNum; System.out.println("每一個線程處理的數量:" + threadSize); List<StarRelationThread> threadList = new ArrayList<StarRelationThread>(); for (int i = 0; i < threadNum; i++) { int end ; if (i == threadNum - 1) { //最後一個線程 end = threadSize + totalSize % threadNum; } else { end = threadSize; } int beginNum = i * threadSize; int endNum = i * threadSize + end; System.out.println(i + " begin : " + beginNum + " , " + endNum); int sync = 0; //分割map Map<String, String> mapThread = new HashMap<String, String>(); for(Map.Entry<String, String> entry : map.entrySet()) { sync++; if (sync > beginNum && sync <= endNum) { mapThread.put(entry.getKey(), entry.getValue()); } } StarRelationThread st = new StarRelationThread(mapThread,map,i); threadList.add(st); } //執行任務 try { /** * 線程池的瞭解:http://blog.csdn.net/coding_or_coded/article/details/6856014 * http://www.cnblogs.com/yezhenhan/archive/2012/01/07/2315645.html * http://hbiao68.iteye.com/blog/1929245 * * https://my.oschina.net/u/1419751/blog/359263 * http://blog.csdn.net/linghu_java/article/details/17123057 */ ExecutorService executorService = Executors.newFixedThreadPool( 4 ); List<Future<Boolean>> threadFutureList = executorService.invokeAll( threadList ); executorService.shutdownNow(); boolean hasError = false; for ( Future<Boolean> threadFuture : threadFutureList ) { boolean optSuccess = threadFuture.get(); if ( !optSuccess ) { hasError = true; } } if (hasError) { System.out.println(" FAIL---------------"); } else { System.out.println(" SUCCESS ------------------"); } } catch (Exception e) { e.printStackTrace(); } } }
當用完一個線程池後,應該調用該線程池的shutdown()方法,該方法將啓動線程池的關閉序列,調用shutdown()方法後的線程池再也不接受新任務,但將之前全部已提交任務執行完。當線程池中的全部任務都執行完成後,池中的全部線程都會死亡;java
void shutdown();c++
另外也能夠調用線程池中的shutdownNow()方法來關閉線程池,該方法試圖中止全部正在執行的活動任務,暫停處理正在等待的任務,並返回等待執行任務列表。redis
List<Runnable> shutdownNow();多線程
例子3:Semaphoredom
一個計數信號量。從概念上講,信號量維護了一個許可集。若有必要,在許可可用前會阻塞每個 acquire(),而後再獲取該許可。每一個 release() 添加一個許可,從而可能釋放一個正在阻塞的獲取者。可是,不使用實際的許可對象,Semaphore 只對可用許可的號碼進行計數,並採起相應的行動。拿到信號量的線程能夠進入代碼,不然就等待。經過acquire()和release()獲取和釋放訪問許可。異步
<1.>
ide
public void acquire() throws InterruptedException
今後信號量獲取一個許可,在提供一個許可前一直將線程阻塞,不然線程被中斷。獲取一個許可(若是提供了一個)並當即返回,將可用的許可數減 1。學習
<2.>
測試
public void release()
釋放一個許可,將其返回給信號量。釋放一個許可,將可用的許可數增長 1。若是任意線程試圖獲取許可,則選中一個線程並將剛剛釋放的許可給予它。而後針對線程安排目的啓用(或再啓用)該線程。
package com.book.admin.test; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class SemaphoreTest { public static void main(String[] args) { //線程池 ExecutorService exec = Executors.newCachedThreadPool(); //只能5個線程同時訪問 final Semaphore semp = new Semaphore(5); for (int i = 0; i < 20; i++) { final int no = i; Runnable runnable = new Runnable() { @Override public void run() { try { //獲取許可 semp.acquire(); System.out.println("Accessing: " + no); Thread.sleep((long) Math.random() * 10000); //訪問完後,釋放許可,若是註釋掉下面的語句,則控制檯只能打印5條記錄,以後線程一直阻塞 semp.release(); } catch (InterruptedException e) { e.printStackTrace(); } } }; //執行線程 exec.execute(runnable); } //退出線程池 exec.shutdown(); } }