摘要:經過一次併發處理數據集的Java代碼重構之旅,展現函數式編程如何使得代碼更加精練。html
難度:中級
java
在開始以前,瞭解「高階函數」和「泛型」這兩個概念是必要的。python
高階函數就是接收函數參數的函數,可以根據傳入的函數參數調節本身的行爲。相似C語言中接收函數指針的函數。最經典的就是接收排序比較函數的排序函數。高階函數不神祕哦!在Java8以前,就是那些能夠接收回調接口做爲參數的方法;在本文中,那麼接收 Function, Consumer, Supplier 做爲參數的函數都是高階函數。高階函數使得函數的能力更加靈活多變。spring
泛型是可以接納多種類型做爲參數進行處理的能力。不少函數的功能並不限於某一種具體的類型,好比快速排序,不只能夠用於整型,也能夠用於字符串,甚至可用於對象。泛型使得函數在類型處理上更加靈活。編程
高階函數和泛型兩個特色結合起來,可以使得函數具有強大的抽象表達能力。
api
基本代碼以下。主要用途是根據具體的業務數據獲取接口 IGetBizData ,併發地獲取指定Keys值對應的業務數據集。網絡
package zzz.study.function.refactor.before; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import zzz.study.function.refactor.TaskUtil; /** * Created by shuqin on 17/6/23. */ public class ConcurrentDataHandlerFrame { public static void main(String[] args) { List<Integer> allData = getAllData(getKeys(), new GetTradeData()); System.out.println(allData); } public static List<String> getKeys() { List<String> keys = new ArrayList<String>(); for (int i=0; i< 20000; i++) { keys.add(String.valueOf(i)); } return keys; } /** * 獲取全部業務數據 */ public static <T> List<T> getAllData(List<String> allKeys, final IGetBizData iGetBizData) { List<String> parts = TaskUtil.divide(allKeys.size(), 1000); System.out.println(parts); ExecutorService executor = Executors.newFixedThreadPool(parts.size()); CompletionService<List<T>> completionService = new ExecutorCompletionService<List<T>>(executor); for (String part: parts) { int start = Integer.parseInt(part.split(":")[0]); int end = Integer.parseInt(part.split(":")[1]); if (end > allKeys.size()) { end = allKeys.size(); } final List<String> tmpRowkeyList = allKeys.subList(start, end); completionService.submit(new Callable<List<T>>() { public List<T> call() throws Exception { return iGetBizData.getData(tmpRowkeyList); } }); } List<T> result = new ArrayList<T>(); for (int i=0; i< parts.size(); i++) { try { result.addAll(completionService.take().get()); } catch (Exception e) { e.printStackTrace(); } } executor.shutdown(); return result; } } /** 業務數據接口 */ interface IGetBizData<T> { List<T> getData(List<String> keys); } /** 獲取業務數據具體實現 */ class GetTradeData implements IGetBizData<Integer> { public List<Integer> getData(List<String> keys) { // maybe xxxService.getData(keys); List<Integer> result = new ArrayList<Integer>(); for (String key: keys) { result.add(Integer.valueOf(key) % 1000000000); } return result; } }
代碼自己寫得不壞,沒有拗口的地方,讀起來也比較流暢。美中不足的是,不夠通用化。 心急的讀者能夠看看最後面重構後的代碼。這裏仍是從重構過程開始。
併發
若是面對一大塊代碼不知如何下手,那麼就從小處着手,先動起來。 對於以下代碼,瞭解 Java8 Stream api 的同窗確定知道怎麼作了:app
public List<Integer> getData(List<String> keys) { // maybe xxxService.getData(keys); List<Integer> result = new ArrayList<Integer>(); for (String key: keys) { result.add(Integer.valueOf(key) % 1000000000); } return result; }
能夠寫成一行代碼:框架
return keys.stream().map(key -> Integer.valueOf(key) % 1000000000).collect(Collectors.toList());
不過, 寫多了, collect(Collectors.toList()) 會大量出現,佔篇幅,並且當 map 裏的函數比較複雜時,IDE 有時不能自動補全。注意到這個函數其實就是傳一個列表和一個數據處理函數,所以,能夠抽離出一個 StreamUtil ,以前的代碼能夠寫成:
public static <T,R> List<R> map(List<T> data, Function<T, R> mapFunc) { return data.stream().map(mapFunc).collect(Collectors.toList()); // stream replace foreach } return StreamUtil.map(keys, key -> Integer.valueOf(key) % 1000000000);
看上去是一個很日常的改動,其實是一大步。注意到 map(keys, key -> Integer.valueOf(key) % 1000000000) 並無展現該如何去計算,只是表達了要作什麼計算。 從「關注計算過程」 到「描述計算內容」,實現了計算「描述」 與「執行」的關注點分離。
好滴,已經走出了第一步!
自從瞭解了函數編程,彷佛對重複的foreach代碼生出「仇」了,巴不得消滅乾淨。 讀者能夠看到方法 getKeys 和 getAllData (從completionService獲取數據時) 分別有一段foreach循環,經過計數而後添加元素並返回一個列表(具體就不貼代碼了)。這樣的代碼看多了也會厭倦的。 實際上,能夠抽離出一個 ForeachUtil 的公用類來作這個事情。爲避免代碼佔篇幅,讀者能夠看重構後的 ForeachUtil, 而後 getKeys 的實現就能夠凝練爲一行代碼:
getKeys: return ForeachUtil.foreachAddWithReturn(2000, (ind -> Arrays.asList(String.valueOf(ind)))); getAllData: List<R> result = ForeachUtil.foreachAddWithReturn(parts.size(), (ind) -> get(ind, completionService));
棒! 每次將多行代碼變成一行代碼是否是很爽?更重要的是,每次都抽離出了通用的部分,可讓後面的代碼更好寫。
注意到,因爲 lambda 表達式沒法處理受檢異常,所以,將 get 函數抽離出來成爲一個函數,lambda 表達式就顯得更好看一點。
注意到 getAllData 裏有一個比較難看的內部類,是爲了根據一段邏輯生成一個任務類:
completionService.submit(new Callable<List<T>>() { public List<T> call() throws Exception { return iGetBizData.getData(tmpRowkeyList); } });
實際上,優秀的IDE工具好比 Intellj 會自動提示要不要替換成 lambda 。 就依它的建議:
completionService.submit(() -> iGetBizData.getData(tmpRowkeyList));
又是一行代碼! 乾淨利落!
這裏有一段代碼,根據任務劃分的區段範圍,獲取數據集的指定子集:
for (String part: parts) { int start = Integer.parseInt(part.split(":")[0]); int end = Integer.parseInt(part.split(":")[1]); if (end > allKeys.size()) { end = allKeys.size(); } final List<String> tmpRowkeyList = allKeys.subList(start, end); // submit tasks }
原本是一段容易編寫單測的獨立邏輯塊,混在 getAllData 方法裏,一來讓這段代碼的單測難寫了,二來增長了整個方法 getAllData 的單測編寫麻煩度。真是兩不討好。抽離出去更好。可參見重構後的TaskUtil. 不少程序猿都有這個容易致使單測難寫的不良習慣。
接下來作什麼呢? 看上去小的改動彷佛到盡頭了。 如今,能夠考慮改造回調接口了。實際上,函數接口是回調接口的很是有效的替代者。能夠把 getAllData 的參數 final IGetBizData iGetBizData 改爲 Function<List<String>, List<T>> iGetBizDataFunc
,表示這個函數將做用於一個列表keys,返回指定的數據集。相應的,iGetBizData.getData(tmpRowkeyList) 就能夠改爲 iGetBizDataFunc.apply(tmpRowkeyList) 。 就是這麼簡單!
讀者可能會疑惑,這樣改究竟有什麼益處呢?第一個好處就是能夠移除 iGetBizData 接口定義了。 java8以前,每次寫回調,都得定義一個接口,再寫實現類,煩不煩?
假設如今我不只須要併發獲取數據,還須要併發處理數據獲得一個數據列表,該怎麼辦呢?看上去 getAllData 已經有潛力知足需求了,但是還有一些細節要處理。實際上,無非就是給定一個T類型列表,以及一個處理列表並返回另外一個R類型列表的函數,而後利用 getAllData 已有的功能就能夠實現。 能夠抽離出一個底層的 public static <T,R> List<R> handleAllData(List<T> allKeys, Function<List<T>, List<R>> handleBizDataFunc)
方法,而後將 getAllData 的實現移入其中,對類型略加改造,就能夠實現。而後 getAllData 就能夠依賴 handleAllData 來實現了。泛型很強大!
咱們經常會在代碼裏看到不少 try-catch 語句塊。大多數程序猿可能並不以爲有什麼,但是,重複就是代碼罪惡之源。實際上,消除這些重複有一個簡單的技巧:首先看這些重複函數裏有哪幾行語句是不同的(一般是一行或兩三行),抽離出 Function (單參數單返回函數) 或 Consumer (單參數無返回函數) 或 BiFunction (雙參數單返回函數) 或 BiConsumer (雙參數無返回函數) , 而後將這個函數接口做爲參數傳進去。 function 的方法是 apply, consumer 的方法是 accept ;
重構後的代碼可見 CatchUtil 。 實際上很像 Python 裏的裝飾器,經過封裝函數的 try-catch ,給任何函數添加異常處理。 不過 Python 有萬能函數 func(*args, **keyargs) , Java 沒有能夠表示全部函數接口的萬能函數。可參見文章: python使用裝飾器捕獲異常。
接下來,咱們須要抽離出併發處理。客戶端代碼不須要知道數據處理的細節,它只須要傳一個數據列表和一個數據處理函數,其餘都交給框架層。略加修改後,可參見重構後的代碼 ExecutorUtil. 原來一團代碼通過精練後,長度減小了不少。handleAllData 如今變成了這樣:
public static <T,R> List<R> handleAllData(List<T> allKeys, Function<List<T>, List<R>> handleBizDataFunc) { return ExecutorUtil.exec(allKeys, handleBizDataFunc); }
抽離併發處理的益處在於,能夠在後續使用策略模式,提供串行計算策略和併發計算策略,在不一樣場景下選擇不一樣的計算策略。重構後的代碼沒有展現這一點。讀者能夠一試。
注意到有 System.out.println(allData); 嗯,怎麼感受有點不順眼呢?其實能夠編寫一個消費函數,改爲函數式,見以下代碼。YY: 這都要改,過不過度 ? 但是如今要方便地進行其餘簡單處理,就更容易了,沒必要編寫函數,而是編寫和傳入不一樣的 lambda 表達式便可:
public static <T> void consumer(List<T> data, Consumer<T> consumer) { data.forEach( (t) -> CatchUtil.tryDo(t, consumer) ); } consumer(allData, System.out::println); consumer(allData, (s) -> System.out.println(s*3));
注意到 handleAllData 須要傳一個數據列表 allKeys ; 更函數式的風格,這個列表應該是一個數據提供函數來得到的。可使用 Supplier 來抽象。它有一個 get 函數。 能夠將 參數改爲 Supplier getAllKeysFunc,而後用 getAllKeysFunc.get() 來取代以前的列表 allKeys.
public static <T,R> List<R> handleAllData(Supplier<List<T>> getAllKeysFunc, Function<List<T>, List<R>> handleBizDataFunc) { return handleAllData(getAllKeysFunc.get(), handleBizDataFunc); }
這樣有什麼益處呢? 抽離了列表 allKeys 的來源,如今能夠從任意地方獲取,好比從文件或網絡中獲取,只要傳入一個數據提供函數便可,這使得 handleAllData 的處理範圍更加靈活了。
瞭解柯里化的同窗知道,柯里化是將多元函數分解爲多個單元函數的屢次調用的過程,在每一次分解的過程當中,都會產生大量的副產品函數,是一個強大的函數工廠。柯里化的簡單介紹可參見文章: 函數柯里化(Currying)示例 。
如何使用 Java 模擬柯里化呢? 這要求一個併發數據處理函數返回一個函數 Function 而不是一個值列表,而返回的函數是可定製化的。這部分經過嘗試及IDE的提示,而完成的。見以下代碼:
/** * 傳入一個數據處理函數,返回一個能夠併發處理數據集的函數, 該函數接受一個指定數據集 * Java 模擬柯里化: 函數工廠 */ public static <T,R> Function<List<T>, List<R>> handleAllData(Function<List<T>, List<R>> handleBizDataFunc) { return ts -> handleAllData(ts, handleBizDataFunc); } /** * 傳入一個數據提供函數,返回一個能夠併發處理獲取的數據集的函數, 該函數接受一個數據處理函數 * Java 模擬柯里化: 函數工廠 */ public static <T,R> Function<Function<List<T>, List<R>>, List<R>> handleAllData(Supplier<List<T>> getAllKeysFunc) { return handleBizDataFunc -> handleAllData(getAllKeysFunc.get(), handleBizDataFunc); }
而後,客戶端的代碼就更加有函數式風格了(甚至顯得有點「另類」)。 第一個 handleAllData 接受一個數據處理函數,並返回一個封裝了併發處理的數據處理函數,能夠對任意指定數據集進行處理; 第二個 handleAllData 接受一個數據提供函數, 並返回一個封裝了併發處理的數據處理函數,經過指定定製化的數據處理函數來實現計算。apply 裏的對象是一個 Function ! 是否是有點思惟反轉 ? ^_^ 仔細再體味一下~~
List<Object> objs = StreamUtil.map(DataSupplier.getKeys(), s->Double.valueOf(s)); List<Double> handledData2 = handleAllData((numbers) -> StreamUtil.map(numbers, (num) -> Math.pow((double)num,2))).apply(objs); Function<List<String>, List<Object>> func = (numbers) -> StreamUtil.map(numbers, (num) -> Integer.parseInt(num)*2); List<Object> handledData3 = handleAllData(DataSupplier::getKeys).apply(func);
固然,這裏並非真正的柯里化,由於參數只有一個。Scala 的柯里化是指 f(x)(y) = x+y ; f(x) = f(x)(1) = x+1 ; f(y) = f(1)(y) = 1+y ; 能夠經過 f(x)(y) 將x或y代入不一樣的變量獲得任意多的函數。利用柯里化很容易寫成簡潔的微框架,好比一個文件集合處理框架。 filesHandler(files)(handler) 與 filesHandler(hanler)(files) 是不同的。這裏再也不過多討論。
經過使用函數式編程對過程/對象混合式代碼進行重構,使得代碼更凝練而有表達力了。雖然函數式編程還沒有普遍推廣於大型工程中,只有一部分程序猿開始嘗試使用,在理解上也須要必定的思惟轉換,不過適度使用確實能加強代碼的抽象表達力。僅僅是「高階函數+泛型+惰性求值」的基本使用,就能產生強大而凝練的表達效果。 函數式編程確有一套本身獨特的編程設計理念。 推薦閱讀《Scala函數式編程》。
現代軟件開發已經不只僅是單純地編寫代碼實現邏輯,而是含有很強的設計過程。須要仔細提煉概念、對象、操做,仔細設計對象之間的交互,有效地組合一系列關聯對象成爲高內聚低耦合的模塊,有效地隔離對象關聯最小化依賴關係,如此才能構建出容易理解和擴展、更容易演進的長久發展的軟件。編程便是設計,從具象到抽象再到具象的過程。
重構後的代碼是這樣子滴:
package zzz.study.function.refactor.result; import java.util.Arrays; import java.util.List; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import zzz.study.function.refactor.CatchUtil; import zzz.study.function.refactor.ExecutorUtil; import zzz.study.function.refactor.ForeachUtil; import zzz.study.function.refactor.StreamUtil; /** * Created by shuqin on 17/6/23. */ public class ConcurrentDataHandlerFrameRefactored { public static void main(String[] args) { List<Integer> allData = getAllData(DataSupplier::getKeys, GetTradeData::getData); consumer(allData, System.out::println); List<Double> handledData = handleAllData(allData, (numbers) -> StreamUtil.map(numbers, (num) -> Math.sqrt(num)) ); consumer(handledData, System.out::println); List<Object> objs = StreamUtil.map(DataSupplier.getKeys(), s->Double.valueOf(s)); List<Double> handledData2 = handleAllData((numbers) -> StreamUtil.map(numbers, (num) -> Math.pow((double)num,2))).apply(objs); consumer(handledData2, System.out::println); Function<List<String>, List<Object>> func = (numbers) -> StreamUtil.map(numbers, (num) -> Integer.parseInt(num)*2); List<Object> handledData3 = handleAllData(DataSupplier::getKeys).apply(func); consumer(handledData3, System.out::println); } /** * 獲取全部業務數據 * * 回調的替換 */ public static <T> List<T> getAllData(Supplier<List<String>> getAllKeysFunc, Function<List<String>, List<T>> iGetBizDataFunc) { return getAllData(getAllKeysFunc.get(), iGetBizDataFunc); } public static <T> List<T> getAllData(List<String> allKeys, Function<List<String>, List<T>> iGetBizDataFunc) { return handleAllData(allKeys, iGetBizDataFunc); } public static <T,R> List<R> handleAllData(Supplier<List<T>> getAllKeysFunc, Function<List<T>, List<R>> handleBizDataFunc) { return handleAllData(getAllKeysFunc.get(), handleBizDataFunc); } /** * 傳入一個數據處理函數,返回一個能夠併發處理數據集的函數, 該函數接受一個指定數據集 * Java 模擬柯里化: 函數工廠 */ public static <T,R> Function<List<T>, List<R>> handleAllData(Function<List<T>, List<R>> handleBizDataFunc) { return ts -> handleAllData(ts, handleBizDataFunc); } /** * 傳入一個數據提供函數,返回一個能夠併發處理獲取的數據集的函數, 該函數接受一個數據處理函數 * Java 模擬柯里化: 函數工廠 */ public static <T,R> Function<Function<List<T>, List<R>>, List<R>> handleAllData(Supplier<List<T>> getAllKeysFunc) { return handleBizDataFunc -> handleAllData(getAllKeysFunc.get(), handleBizDataFunc); } public static <T,R> List<R> handleAllData(List<T> allKeys, Function<List<T>, List<R>> handleBizDataFunc) { return ExecutorUtil.exec(allKeys, handleBizDataFunc); } public static <T> void consumer(List<T> data, Consumer<T> consumer) { data.forEach( (t) -> CatchUtil.tryDo(t, consumer) ); } public static class DataSupplier { public static List<String> getKeys() { // foreach code refining return ForeachUtil.foreachAddWithReturn(2000, (ind -> Arrays.asList(String.valueOf(ind)))); } } /** 獲取業務數據具體實現 */ public static class GetTradeData { public static List<Integer> getData(List<String> keys) { // maybe xxxService.getData(keys); return StreamUtil.map(keys, key -> Integer.valueOf(key) % 1000000000); // stream replace foreach } } }
package zzz.study.function.refactor; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; /** * Created by shuqin on 17/6/25. */ public class ExecutorUtil { private ExecutorUtil() {} private static final int CORE_CPUS = Runtime.getRuntime().availableProcessors(); private static final int TASK_SIZE = 1000; // a throol pool may be managed by spring private static ExecutorService executor = new ThreadPoolExecutor( CORE_CPUS, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(60)); /** * 根據指定的列表關鍵數據及列表數據處理器,併發地處理並返回處理後的列表數據集合 * @param allKeys 列表關鍵數據 * @param handleBizDataFunc 列表數據處理器 * @param <T> 待處理的數據參數類型 * @param <R> 待返回的數據結果類型 * @return 處理後的列表數據集合 * * NOTE: 相似實現了 stream.par.map 的功能,不帶延遲計算 */ public static <T,R> List<R> exec(List<T> allKeys, Function<List<T>, List<R>> handleBizDataFunc) { List<String> parts = TaskUtil.divide(allKeys.size(), TASK_SIZE); //System.out.println(parts); CompletionService<List<R>> completionService = new ExecutorCompletionService<>(executor); ForeachUtil.foreachDone(parts, (part) -> { final List<T> tmpRowkeyList = TaskUtil.getSubList(allKeys, part); completionService.submit( () -> handleBizDataFunc.apply(tmpRowkeyList)); // lambda replace inner class }); // foreach code refining List<R> result = ForeachUtil.foreachAddWithReturn(parts.size(), (ind) -> get(ind, completionService)); return result; } /** * 根據指定的列表關鍵數據及列表數據處理器,併發地處理 * @param allKeys 列表關鍵數據 * @param handleBizDataFunc 列表數據處理器 * @param <T> 待處理的數據參數類型 * * NOTE: foreachDone 的併發版 */ public static <T> void exec(List<T> allKeys, Consumer<List<T>> handleBizDataFunc) { List<String> parts = TaskUtil.divide(allKeys.size(), TASK_SIZE); //System.out.println(parts); ForeachUtil.foreachDone(parts, (part) -> { final List<T> tmpRowkeyList = TaskUtil.getSubList(allKeys, part); executor.execute( () -> handleBizDataFunc.accept(tmpRowkeyList)); // lambda replace inner class }); } public static <T> List<T> get(int ind, CompletionService<List<T>> completionService) { // lambda cannot handler checked exception try { return completionService.take().get(); } catch (Exception e) { e.printStackTrace(); // for log throw new RuntimeException(e.getCause()); } } }
package zzz.study.function.refactor; import java.util.ArrayList; import java.util.List; /** * Created by shuqin on 17/1/5. */ public class TaskUtil { private TaskUtil() {} public static List<String> divide(int totalSize, int persize) { List<String> parts = new ArrayList<String>(); if (totalSize <= 0 || persize <= 0) { return parts; } if (persize >= totalSize) { parts.add("0:" + totalSize); return parts; } int num = totalSize / persize + (totalSize % persize == 0 ? 0 : 1); for (int i=0; i<num; i++) { int start = persize*i; int end = persize*i+persize; if (end > totalSize) { end = totalSize; } parts.add(start + ":" + end); } return parts; } public static <T> List<T> getSubList(List<T> allKeys, String part) { int start = Integer.parseInt(part.split(":")[0]); int end = Integer.parseInt(part.split(":")[1]); if (end > allKeys.size()) { end = allKeys.size(); } return allKeys.subList(start, end); } }
package zzz.study.function.refactor; import java.util.ArrayList; import java.util.List; import java.util.function.Consumer; import java.util.function.Function; /** * Created by shuqin on 17/6/24. * * foreach 代碼通用模板 */ public class ForeachUtil { public static <T> List<T> foreachAddWithReturn(int num, Function<Integer, List<T>> getFunc) { List<T> result = new ArrayList<T>(); for (int i=0; i< num; i++) { result.addAll(CatchUtil.tryDo(i, getFunc)); } return result; } public static <T> void foreachDone(List<T> data, Consumer<T> doFunc) { for (T part: data) { CatchUtil.tryDo(part, doFunc); } } }
package zzz.study.function.refactor; import java.util.function.Consumer; import java.util.function.Function; /** * Created by shuqin on 17/6/24. */ public class CatchUtil { public static <T,R> R tryDo(T t, Function<T,R> func) { try { return func.apply(t); } catch (Exception e) { e.printStackTrace(); // for log throw new RuntimeException(e.getCause()); } } public static <T> void tryDo(T t, Consumer<T> func) { try { func.accept(t); } catch (Exception e) { e.printStackTrace(); // for log throw new RuntimeException(e.getCause()); } } }
package zzz.study.function.refactor; import java.util.List; import java.util.function.Function; import java.util.stream.Collectors; /** * Created by shuqin on 17/6/24. */ public class StreamUtil { public static <T,R> List<R> map(List<T> data, Function<T, R> mapFunc) { return data.stream().map(mapFunc).collect(Collectors.toList()); // stream replace foreach } }