24. 經常使用的多線程使用方式總結

(1)總論java

1.能夠不用多線程最好不要用算法

2.若是能夠不共享數據最好不要共享spring

3.服務器端最佳線程數量=((線程等待時間+線程cpu時間)/線程cpu時間) * cpu數量數據庫

由於數據庫訪問等待形成線程等待時間長比較長見,下面的例子就是以數據庫數據遷徙程序說明。編程

經常使用模式服務器

(2)分幾個線程處理不一樣數據多線程

適用場景:數據能夠容易的分開處理ide

 1 package me.jdk.thread;
 2 
 3 import java.util.concurrent.CountDownLatch;
 4 import java.util.concurrent.ExecutorService;
 5 import java.util.concurrent.Executors;
 6 
 7 import org.slf4j.Logger;
 8 import org.slf4j.LoggerFactory;
 9 
10 /**
11  * 多線程不一樣數據
12  * 適用場景:數據能夠容易的分開處理
13  * @author guanpanpan
14  *
15  */
16 public class MulThrDiffData {
17     protected final static Logger log = LoggerFactory.getLogger(MulThrDiffData.class);
18     private static CountDownLatch latch;
19 
20     public static void main(String[] args) {
21         int dbMax = 10;
22         int tableMax = 16;
23         //設置同時處理的線程數
24         ExecutorService executorService = Executors.newFixedThreadPool(40);
25         //設置等待計數器
26         latch = new CountDownLatch(dbMax * tableMax);
27         // 啓動線程
28         for (int tableNo = 1; tableNo <= tableMax; tableNo++) {
29             for (int dbNo = 1; dbNo <= dbMax; dbNo++) {
30                 executorService.execute(new UserinfoRunable(dbNo, tableNo));
31             }
32         }
33         //再也不接受新線程
34         executorService.shutdown();
35         //等待程序執行完
36         try {
37             latch.await();
38         } catch (InterruptedException e1) {
39             log.error("threadLatch.await()", e1);
40         }
41         log.info("main End");
42     }
43 
44     /**
45      * 處理用戶數據的線程
46      */
47     static class UserinfoRunable implements Runnable {
48         int dbNo;
49         int tableNo;
50 
51         public UserinfoRunable(int dbNo, int tableNo) {
52             this.dbNo = dbNo;
53             this.tableNo = tableNo;
54         }
55 
56         @Override
57         public void run() {
58             System.out.println("do something" + dbNo + "-" + tableNo);
59             try {
60                 Thread.sleep(1000);
61             } catch (InterruptedException e) {
62                 // TODO Auto-generated catch block
63                 e.printStackTrace();
64             }
65             //計數器減去一
66             latch.countDown();
67         }
68     }
69 }

(3)線程協做來處理同一批數據memcached

適用場景:數據處理是一個相似生產線狀況,每一個生產過程費時不一樣單元測試

  1 package me.jdk.thread;
  2 
  3 import java.util.ArrayList;
  4 import java.util.List;
  5 import java.util.concurrent.BlockingQueue;
  6 import java.util.concurrent.CountDownLatch;
  7 import java.util.concurrent.ExecutorService;
  8 import java.util.concurrent.Executors;
  9 import java.util.concurrent.LinkedBlockingQueue;
 10 
 11 import me.util.BlockingQueueUtil;
 12 import me.util.DateUtil;
 13 
 14 import org.slf4j.Logger;
 15 import org.slf4j.LoggerFactory;
 16 import org.springframework.util.CollectionUtils;
 17 
 18 /**
 19  * 線程協做來處理同一批數據
 20  * 本示例是針對只跑一次的job,若是須要一直在跑的woker,只須要作少量改動
 21  * @author guanpanpan
 22  *
 23  */
 24 public class ProductConsumeTh {
 25     protected final static Logger log = LoggerFactory.getLogger(ProductConsumeTh.class);
 26     public static int productThSize = 2;//生產者線程數
 27     public static int consumeThSize = 5;//消費者線程數
 28     public static int maxDealSize = 10;//單個消費者線程每次最多處理數量
 29     public static int maxQueueSize = 1000;//生產者隊列最多數量
 30     //用String做爲示例,實際使用換成實際類型
 31     public static BlockingQueue<String> consumeQueue = new LinkedBlockingQueue<String>(maxQueueSize);
 32     private static CountDownLatch productLatch;//生產者攔截計數
 33     private static CountDownLatch consumeLatch;//消費者攔截計數
 34 
 35     private static boolean productRun;//生產者是否在生產
 36 
 37     public static void main(String[] args) {
 38         productRun = true;
 39         ExecutorService executorService = Executors.newFixedThreadPool(consumeThSize + productThSize);
 40         consumeLatch = new CountDownLatch(consumeThSize);
 41         productLatch = new CountDownLatch(productThSize);
 42         //開啓讀線程
 43         for (int thNo = 1; thNo <= productThSize; thNo++) {
 44             executorService.execute(new ProductRunable(thNo));
 45         }
 46         DateUtil.sleepForOneSecond();//先讀一會
 47         //開啓寫線程
 48         for (int thNo = 1; thNo <= consumeThSize; thNo++) {
 49             executorService.execute(new ConsumeRunable(thNo));
 50         }
 51         executorService.shutdown();
 52         // 等待寫線程完
 53         try {
 54             productLatch.await();
 55         } catch (InterruptedException e) {
 56             log.error("error in getLatch", e);
 57         }
 58         productRun = false;
 59         // 等待寫線程完
 60         try {
 61             consumeLatch.await();
 62         } catch (InterruptedException e) {
 63             log.error("error in writeLatch", e);
 64         }
 65         System.out.println("main End");
 66     }
 67 
 68     static class ProductRunable implements Runnable {
 69         private int thNo;
 70 
 71         public ProductRunable(int thNo) {
 72             this.thNo = thNo;
 73 
 74         }
 75 
 76         @Override
 77         public void run() {
 78             for (int i = 0; i < 10; i++) {
 79                 //取數據,由於是示例因此直接內存構建
 80                 List<String> list = new ArrayList<String>();
 81                 list.add("th" + thNo + " a" + i);
 82                 //加入隊列
 83                 try {
 84                     BlockingQueueUtil.put(consumeQueue, list);
 85                 } catch (InterruptedException e) {
 86                     log.error("QueueUtil.put:", e);
 87                 }
 88                 //實際中能夠若是取不到數據就休息一會,或者退出本線程,視程序是一直在跑的work,仍是隻跑一次的job
 89                 System.out.println("geter " + thNo + " put" + list);
 90                 DateUtil.sleepForOneSecond();
 91             }
 92             System.out.println("geter " + thNo + " end");
 93             productLatch.countDown();
 94         }
 95 
 96     }
 97 
 98     static class ConsumeRunable implements Runnable {
 99         private int thNo;
100 
101         public ConsumeRunable(int thNo) {
102             this.thNo = thNo;
103         }
104 
105         @Override
106         public void run() {
107             int dealSize = 0;
108             //只有在生產者中止生產時,而且處理完全部數據纔會退出
109             while (productRun || dealSize > 0) {
110                 //獲得當前線程要處理的數據
111                 List<String> sourceDatas = new ArrayList<String>();
112                 consumeQueue.drainTo(sourceDatas, maxDealSize);
113                 //進行處理
114                 if (CollectionUtils.isEmpty(sourceDatas)) {
115                     dealSize = 0;
116                     continue;
117                 }
118                 dealSize = sourceDatas.size();
119                 try {
120                     System.out.println("writer " + thNo + " deal:" + sourceDatas);
121                 } catch (Exception e) {
122                     e.printStackTrace();
123                     log.error("dtData", e);
124                 }
125                 //本處可考慮無數據處理時sleep休息一會
126 
127             }
128             System.out.println("writer " + thNo + " end");
129             consumeLatch.countDown();
130         }
131     }
132 }
 1 package me.util;
 2 
 3 import java.util.Collection;
 4 import java.util.concurrent.BlockingQueue;
 5 
 6 /**
 7  * 阻塞隊列
 8  * @author guanpanpan
 9  *
10  */
11 public class BlockingQueueUtil {
12     /**
13      * 向隊列加入一組數據,若是對隊已滿會阻塞
14      */
15     public static <T> void put(BlockingQueue<T> blockingQueue, Collection<T> collection) throws InterruptedException {
16         for (T object : collection) {
17             blockingQueue.put(object);
18         }
19     }
20 
21     /**
22      * 向隊列加入單個數據,若是對隊已滿會阻塞
23      */
24     public static <T> void put(BlockingQueue<T> blockingQueue, T object) throws InterruptedException {
25         blockingQueue.put(object);
26     }
27 }

 (3)使用取模來實現多線程處理不一樣數據

下面代碼順帶演示了下,線程編程和單元測試的一些關係,有時須要爲單元測試改變一些原代碼

  1 package me.jdk.thread;
  2 
  3 import java.util.ArrayList;
  4 import java.util.Collection;
  5 import java.util.List;
  6 import java.util.concurrent.ExecutorService;
  7 import java.util.concurrent.Executors;
  8 
  9 import me.util.DateUtil;
 10 import me.util.ModeUtil;
 11 
 12 import org.slf4j.Logger;
 13 import org.slf4j.LoggerFactory;
 14 
 15 /**
 16  * 多線程處理同一批數據
 17  * 好處:不一樣線程處理固定數據,不會有重複取的問題
 18  * @author guanpanpan
 19  *
 20  */
 21 public class MulThreadSameData_Mod {
 22     protected final static Logger log = LoggerFactory.getLogger(MulThreadSameData_Mod.class);
 23     //若是想多個線程擁有啓動,中止等操做就持有它
 24     private static List<DataDealRunable> jobRunables = new ArrayList<DataDealRunable>();
 25     private static int threadSize = 10;
 26 
 27     public static void main(String[] args) {
 28 
 29         //初始化相應線程
 30         for (int thNo = 1; thNo <= threadSize; thNo++) {
 31             jobRunables.add(new DataDealRunable(thNo));
 32         }
 33         //啓動相應服務線程
 34         ExecutorService executorService = Executors.newFixedThreadPool(threadSize);
 35         for (DataDealRunable jobRunable : jobRunables) {
 36             executorService.execute(jobRunable);
 37         }
 38         executorService.shutdown();
 39         //在單元測試時可使用,在本處只爲展現,移植到例子以前是沒有的
 40         sleepStopAndWait();
 41         log.info("main End");
 42     }
 43 
 44     static class DataDealRunable implements Runnable {
 45         protected boolean run = true;//控制執行 
 46         public boolean runing = true;//執行狀態,用於單元測試
 47         protected int threadId;
 48 
 49         public DataDealRunable(int threadId) {
 50             this.threadId = threadId;
 51         }
 52 
 53         public void stop() {
 54             this.run = false;
 55         }
 56 
 57         @Override
 58         public void run() {
 59             runing = true;
 60             while (run) {
 61                 //示意從數據庫取到數據
 62                 List<String> datas = getDatasFromDb();
 63                 //獲得當前線程要處理的數據
 64                 Collection<String> datasToDeal = getDealData(datas, threadSize, threadId);
 65                 System.out.println("th" + threadId + "do something" + datasToDeal.size());
 66                 DateUtil.sleepForOneSecond();
 67             }
 68             runing = false;
 69         }
 70     }
 71 
 72     public static void stopService() {
 73         for (DataDealRunable jobRunable : jobRunables) {
 74             jobRunable.stop();
 75         }
 76 
 77     }
 78 
 79     public static void sleepStopAndWait() {
 80         //先讓執行一會,使線程獲得執行
 81         DateUtil.sleepForOneSecond();
 82         //中止線程
 83         stopService();
 84         //等待線程結束,只因此未使用latch來作,是由於本程序是一直在跑的worker,只有在集成測試時才須要中止
 85         waitStop();
 86     }
 87 
 88     /**
 89      * 在stopService後調用,等待線程退出,用於測試
 90      */
 91     private static void waitStop() {
 92         boolean running = true;
 93         while (running) {
 94             running = false;
 95             for (DataDealRunable jobRunable : jobRunables) {
 96                 if (jobRunable.runing) {
 97                     running = true;
 98                 }
 99             }
100             if (running) {
101                 DateUtil.sleepForOneSecond();
102             }
103 
104         }
105 
106     }
107 
108     public static Collection<String> getDealData(Collection<String> datas, int modeSize, int modeNo) {
109         Collection<String> modedatas = new ArrayList<String>();
110         for (String data : datas) {
111             if (modeNo == ModeUtil.getRandMode(data, modeSize)) {
112                 modedatas.add(data);
113             }
114         }
115         return modedatas;
116     }
117 
118     /**
119      * 模擬從數據庫取得數據
120      */
121     private static List<String> getDatasFromDb() {
122         /**要處理的數據*/
123         List<String> datas = new ArrayList<String>();
124         for (int i = 0; i < 1000; i++) {
125             datas.add("data" + i);
126         }
127         return datas;
128     }
129 }
 1 package me.util;
 2 
 3 import me.arithmetic.hash.HashAlgorithm;
 4 
 5 /**
 6  * 分庫分表算法
 7  * 計算庫和表的hash種子不能同樣,避免分庫和分表奇偶問題形成的不平均
 8  * @author guanpanpan
 9  *
10  */
11 public class ModeUtil {
12     /**
13      * hash算法的實現類,採用的是memcached的實現類。
14      */
15     private static HashAlgorithm ketemaHash = HashAlgorithm.KETAMA_HASH;
16 
17     /**
18      * 計算一個字符串的hash碼,原理爲先計算md5碼,再計算md5碼得hash。 這種算法能保證hash數據的均勻分佈
19      * 在計算hash前,傳入的Pin會被轉成小寫
20      * 
21      * @param pin
22      *            用戶登錄名(也能夠是其餘字符串)
23      * @return long型的hashcode
24      */
25     public static long getHash(String pin) {
26         String lcasePin = pin.toLowerCase().trim();
27         long hashCode = Math.abs(ketemaHash.hash(lcasePin));
28         return hashCode;
29     }
30 
31     public static int getRandMode(String pin, int modeSize) {
32         return (int) (getHash(pin) % modeSize + 1);
33     }
34 
35 }
相關文章
相關標籤/搜索