(1)總論java
1.能夠不用多線程最好不要用算法
2.若是能夠不共享數據最好不要共享spring
3.服務器端最佳線程數量=((線程等待時間+線程cpu時間)/線程cpu時間) * cpu數量數據庫
由於數據庫訪問等待形成線程等待時間長比較長見,下面的例子就是以數據庫數據遷徙程序說明。編程
經常使用模式服務器
(2)分幾個線程處理不一樣數據多線程
適用場景:數據能夠容易的分開處理ide
1 package me.jdk.thread; 2 3 import java.util.concurrent.CountDownLatch; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 7 import org.slf4j.Logger; 8 import org.slf4j.LoggerFactory; 9 10 /** 11 * 多線程不一樣數據 12 * 適用場景:數據能夠容易的分開處理 13 * @author guanpanpan 14 * 15 */ 16 public class MulThrDiffData { 17 protected final static Logger log = LoggerFactory.getLogger(MulThrDiffData.class); 18 private static CountDownLatch latch; 19 20 public static void main(String[] args) { 21 int dbMax = 10; 22 int tableMax = 16; 23 //設置同時處理的線程數 24 ExecutorService executorService = Executors.newFixedThreadPool(40); 25 //設置等待計數器 26 latch = new CountDownLatch(dbMax * tableMax); 27 // 啓動線程 28 for (int tableNo = 1; tableNo <= tableMax; tableNo++) { 29 for (int dbNo = 1; dbNo <= dbMax; dbNo++) { 30 executorService.execute(new UserinfoRunable(dbNo, tableNo)); 31 } 32 } 33 //再也不接受新線程 34 executorService.shutdown(); 35 //等待程序執行完 36 try { 37 latch.await(); 38 } catch (InterruptedException e1) { 39 log.error("threadLatch.await()", e1); 40 } 41 log.info("main End"); 42 } 43 44 /** 45 * 處理用戶數據的線程 46 */ 47 static class UserinfoRunable implements Runnable { 48 int dbNo; 49 int tableNo; 50 51 public UserinfoRunable(int dbNo, int tableNo) { 52 this.dbNo = dbNo; 53 this.tableNo = tableNo; 54 } 55 56 @Override 57 public void run() { 58 System.out.println("do something" + dbNo + "-" + tableNo); 59 try { 60 Thread.sleep(1000); 61 } catch (InterruptedException e) { 62 // TODO Auto-generated catch block 63 e.printStackTrace(); 64 } 65 //計數器減去一 66 latch.countDown(); 67 } 68 } 69 }
(3)線程協做來處理同一批數據memcached
適用場景:數據處理是一個相似生產線狀況,每一個生產過程費時不一樣單元測試
1 package me.jdk.thread; 2 3 import java.util.ArrayList; 4 import java.util.List; 5 import java.util.concurrent.BlockingQueue; 6 import java.util.concurrent.CountDownLatch; 7 import java.util.concurrent.ExecutorService; 8 import java.util.concurrent.Executors; 9 import java.util.concurrent.LinkedBlockingQueue; 10 11 import me.util.BlockingQueueUtil; 12 import me.util.DateUtil; 13 14 import org.slf4j.Logger; 15 import org.slf4j.LoggerFactory; 16 import org.springframework.util.CollectionUtils; 17 18 /** 19 * 線程協做來處理同一批數據 20 * 本示例是針對只跑一次的job,若是須要一直在跑的woker,只須要作少量改動 21 * @author guanpanpan 22 * 23 */ 24 public class ProductConsumeTh { 25 protected final static Logger log = LoggerFactory.getLogger(ProductConsumeTh.class); 26 public static int productThSize = 2;//生產者線程數 27 public static int consumeThSize = 5;//消費者線程數 28 public static int maxDealSize = 10;//單個消費者線程每次最多處理數量 29 public static int maxQueueSize = 1000;//生產者隊列最多數量 30 //用String做爲示例,實際使用換成實際類型 31 public static BlockingQueue<String> consumeQueue = new LinkedBlockingQueue<String>(maxQueueSize); 32 private static CountDownLatch productLatch;//生產者攔截計數 33 private static CountDownLatch consumeLatch;//消費者攔截計數 34 35 private static boolean productRun;//生產者是否在生產 36 37 public static void main(String[] args) { 38 productRun = true; 39 ExecutorService executorService = Executors.newFixedThreadPool(consumeThSize + productThSize); 40 consumeLatch = new CountDownLatch(consumeThSize); 41 productLatch = new CountDownLatch(productThSize); 42 //開啓讀線程 43 for (int thNo = 1; thNo <= productThSize; thNo++) { 44 executorService.execute(new ProductRunable(thNo)); 45 } 46 DateUtil.sleepForOneSecond();//先讀一會 47 //開啓寫線程 48 for (int thNo = 1; thNo <= consumeThSize; thNo++) { 49 executorService.execute(new ConsumeRunable(thNo)); 50 } 51 executorService.shutdown(); 52 // 等待寫線程完 53 try { 54 productLatch.await(); 55 } catch (InterruptedException e) { 56 log.error("error in getLatch", e); 57 } 58 productRun = false; 59 // 等待寫線程完 60 try { 61 consumeLatch.await(); 62 } catch (InterruptedException e) { 63 log.error("error in writeLatch", e); 64 } 65 System.out.println("main End"); 66 } 67 68 static class ProductRunable implements Runnable { 69 private int thNo; 70 71 public ProductRunable(int thNo) { 72 this.thNo = thNo; 73 74 } 75 76 @Override 77 public void run() { 78 for (int i = 0; i < 10; i++) { 79 //取數據,由於是示例因此直接內存構建 80 List<String> list = new ArrayList<String>(); 81 list.add("th" + thNo + " a" + i); 82 //加入隊列 83 try { 84 BlockingQueueUtil.put(consumeQueue, list); 85 } catch (InterruptedException e) { 86 log.error("QueueUtil.put:", e); 87 } 88 //實際中能夠若是取不到數據就休息一會,或者退出本線程,視程序是一直在跑的work,仍是隻跑一次的job 89 System.out.println("geter " + thNo + " put" + list); 90 DateUtil.sleepForOneSecond(); 91 } 92 System.out.println("geter " + thNo + " end"); 93 productLatch.countDown(); 94 } 95 96 } 97 98 static class ConsumeRunable implements Runnable { 99 private int thNo; 100 101 public ConsumeRunable(int thNo) { 102 this.thNo = thNo; 103 } 104 105 @Override 106 public void run() { 107 int dealSize = 0; 108 //只有在生產者中止生產時,而且處理完全部數據纔會退出 109 while (productRun || dealSize > 0) { 110 //獲得當前線程要處理的數據 111 List<String> sourceDatas = new ArrayList<String>(); 112 consumeQueue.drainTo(sourceDatas, maxDealSize); 113 //進行處理 114 if (CollectionUtils.isEmpty(sourceDatas)) { 115 dealSize = 0; 116 continue; 117 } 118 dealSize = sourceDatas.size(); 119 try { 120 System.out.println("writer " + thNo + " deal:" + sourceDatas); 121 } catch (Exception e) { 122 e.printStackTrace(); 123 log.error("dtData", e); 124 } 125 //本處可考慮無數據處理時sleep休息一會 126 127 } 128 System.out.println("writer " + thNo + " end"); 129 consumeLatch.countDown(); 130 } 131 } 132 }
1 package me.util; 2 3 import java.util.Collection; 4 import java.util.concurrent.BlockingQueue; 5 6 /** 7 * 阻塞隊列 8 * @author guanpanpan 9 * 10 */ 11 public class BlockingQueueUtil { 12 /** 13 * 向隊列加入一組數據,若是對隊已滿會阻塞 14 */ 15 public static <T> void put(BlockingQueue<T> blockingQueue, Collection<T> collection) throws InterruptedException { 16 for (T object : collection) { 17 blockingQueue.put(object); 18 } 19 } 20 21 /** 22 * 向隊列加入單個數據,若是對隊已滿會阻塞 23 */ 24 public static <T> void put(BlockingQueue<T> blockingQueue, T object) throws InterruptedException { 25 blockingQueue.put(object); 26 } 27 }
(3)使用取模來實現多線程處理不一樣數據
下面代碼順帶演示了下,線程編程和單元測試的一些關係,有時須要爲單元測試改變一些原代碼
1 package me.jdk.thread; 2 3 import java.util.ArrayList; 4 import java.util.Collection; 5 import java.util.List; 6 import java.util.concurrent.ExecutorService; 7 import java.util.concurrent.Executors; 8 9 import me.util.DateUtil; 10 import me.util.ModeUtil; 11 12 import org.slf4j.Logger; 13 import org.slf4j.LoggerFactory; 14 15 /** 16 * 多線程處理同一批數據 17 * 好處:不一樣線程處理固定數據,不會有重複取的問題 18 * @author guanpanpan 19 * 20 */ 21 public class MulThreadSameData_Mod { 22 protected final static Logger log = LoggerFactory.getLogger(MulThreadSameData_Mod.class); 23 //若是想多個線程擁有啓動,中止等操做就持有它 24 private static List<DataDealRunable> jobRunables = new ArrayList<DataDealRunable>(); 25 private static int threadSize = 10; 26 27 public static void main(String[] args) { 28 29 //初始化相應線程 30 for (int thNo = 1; thNo <= threadSize; thNo++) { 31 jobRunables.add(new DataDealRunable(thNo)); 32 } 33 //啓動相應服務線程 34 ExecutorService executorService = Executors.newFixedThreadPool(threadSize); 35 for (DataDealRunable jobRunable : jobRunables) { 36 executorService.execute(jobRunable); 37 } 38 executorService.shutdown(); 39 //在單元測試時可使用,在本處只爲展現,移植到例子以前是沒有的 40 sleepStopAndWait(); 41 log.info("main End"); 42 } 43 44 static class DataDealRunable implements Runnable { 45 protected boolean run = true;//控制執行 46 public boolean runing = true;//執行狀態,用於單元測試 47 protected int threadId; 48 49 public DataDealRunable(int threadId) { 50 this.threadId = threadId; 51 } 52 53 public void stop() { 54 this.run = false; 55 } 56 57 @Override 58 public void run() { 59 runing = true; 60 while (run) { 61 //示意從數據庫取到數據 62 List<String> datas = getDatasFromDb(); 63 //獲得當前線程要處理的數據 64 Collection<String> datasToDeal = getDealData(datas, threadSize, threadId); 65 System.out.println("th" + threadId + "do something" + datasToDeal.size()); 66 DateUtil.sleepForOneSecond(); 67 } 68 runing = false; 69 } 70 } 71 72 public static void stopService() { 73 for (DataDealRunable jobRunable : jobRunables) { 74 jobRunable.stop(); 75 } 76 77 } 78 79 public static void sleepStopAndWait() { 80 //先讓執行一會,使線程獲得執行 81 DateUtil.sleepForOneSecond(); 82 //中止線程 83 stopService(); 84 //等待線程結束,只因此未使用latch來作,是由於本程序是一直在跑的worker,只有在集成測試時才須要中止 85 waitStop(); 86 } 87 88 /** 89 * 在stopService後調用,等待線程退出,用於測試 90 */ 91 private static void waitStop() { 92 boolean running = true; 93 while (running) { 94 running = false; 95 for (DataDealRunable jobRunable : jobRunables) { 96 if (jobRunable.runing) { 97 running = true; 98 } 99 } 100 if (running) { 101 DateUtil.sleepForOneSecond(); 102 } 103 104 } 105 106 } 107 108 public static Collection<String> getDealData(Collection<String> datas, int modeSize, int modeNo) { 109 Collection<String> modedatas = new ArrayList<String>(); 110 for (String data : datas) { 111 if (modeNo == ModeUtil.getRandMode(data, modeSize)) { 112 modedatas.add(data); 113 } 114 } 115 return modedatas; 116 } 117 118 /** 119 * 模擬從數據庫取得數據 120 */ 121 private static List<String> getDatasFromDb() { 122 /**要處理的數據*/ 123 List<String> datas = new ArrayList<String>(); 124 for (int i = 0; i < 1000; i++) { 125 datas.add("data" + i); 126 } 127 return datas; 128 } 129 }
1 package me.util; 2 3 import me.arithmetic.hash.HashAlgorithm; 4 5 /** 6 * 分庫分表算法 7 * 計算庫和表的hash種子不能同樣,避免分庫和分表奇偶問題形成的不平均 8 * @author guanpanpan 9 * 10 */ 11 public class ModeUtil { 12 /** 13 * hash算法的實現類,採用的是memcached的實現類。 14 */ 15 private static HashAlgorithm ketemaHash = HashAlgorithm.KETAMA_HASH; 16 17 /** 18 * 計算一個字符串的hash碼,原理爲先計算md5碼,再計算md5碼得hash。 這種算法能保證hash數據的均勻分佈 19 * 在計算hash前,傳入的Pin會被轉成小寫 20 * 21 * @param pin 22 * 用戶登錄名(也能夠是其餘字符串) 23 * @return long型的hashcode 24 */ 25 public static long getHash(String pin) { 26 String lcasePin = pin.toLowerCase().trim(); 27 long hashCode = Math.abs(ketemaHash.hash(lcasePin)); 28 return hashCode; 29 } 30 31 public static int getRandMode(String pin, int modeSize) { 32 return (int) (getHash(pin) % modeSize + 1); 33 } 34 35 }