直接入正題,如下用數據庫操做舉例。java
查看ArrayList.stream()方法的實現,如圖:mysql
發現方法實在Collection接口裏面實現的,能夠看出Stream對象是由StreamSupport.stream()方法初始化的。第一個入參是Spliterator實例,第二個入參,true表示執行的是並行操做,反之。查看api能夠發現StreamSupport.Stream()方法還有另一個以Suplier Function爲數據源的重載方法,這裏不舉例。sql
從新定位上圖spliterator()方法,如圖:數據庫
查看this的類型:api
能夠看到Spliterators.spliterator()方法的第一個入參是Iterator實例,第二個入參看文檔貌似是用於適當分割數據源,增長批處理大小(batch size)的,填0的話,內部會替換爲默認的數值,不然取自定義的值,這裏不深究,固然也但願有大牛不吝解惑。ide
好了,到這裏就能夠實現本身的Stream api了;測試
直接上碼:this
1. 建立實現Iterator接口:spa
package com.qkf.test.iterator; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Iterator; import com.qkf.test.bean.Record; /** * 用於構建Stream的Source * @author qkf */ public class ResultSetIterator implements Iterator<Record> { private Connection conn; private ResultSet rs; private PreparedStatement ps; public ResultSetIterator(Connection conn, String sql, Object... params) { assert conn != null; assert sql != null; this.conn = conn; try { ps = conn.prepareStatement(sql); if (params != null && params.length > 0) { for (int i = 1; i <= params.length; ++i) { ps.setObject(i, params[i-1]); } } rs = ps.executeQuery(); } catch (SQLException e) { closeRes(); e.printStackTrace(); } } @Override public boolean hasNext() { try { return rs.next(); } catch (SQLException e) { closeRes(); e.printStackTrace(); } return false; } @Override public Record next() { return new Record(rs); } private void close(AutoCloseable... closeable) { if (closeable != null) { for (AutoCloseable c : closeable) { try { c.close(); } catch (Exception e) { // nothing to do } } } } /** * 關閉資源 */ private void closeRes() { close(this.rs, this.ps, this.conn); } }
以上用到的Record類表示數據庫的一行記錄,上碼:code
package com.qkf.test.bean; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * 表示數據行記錄 * @author qkf */ public class Record { private Map<String, Object> columnMap = null; // 行記錄保存爲key-value形式 private List<Object> columnList = null; // 行記錄保存爲列表形式 private int columnCount; // 行記錄列數 public Record(ResultSet rs) { columnMap = new HashMap<>(); columnList = new ArrayList<>(); try { ResultSetMetaData metaData = rs.getMetaData(); columnCount = metaData.getColumnCount(); if (columnCount > 0) { for (int i = 1; i <= columnCount; ++i) { Object obj = rs.getObject(i); columnList.add(obj); columnMap.put(metaData.getColumnLabel(i), obj); } } } catch (SQLException e) { e.printStackTrace(); } } public Integer getAsInt(int index) { return Integer.valueOf(columnList.get(index).toString()); } public Integer getAsInt(String name) { return Integer.valueOf(columnMap.get(name).toString()); } public Double getAsDouble(int index) { return Double.valueOf(columnList.get(index).toString()); } public Double getAsDouble(String name) { return Double.valueOf(columnMap.get(name).toString()); } public String getAsString(int index) { return String.valueOf(columnList.get(index).toString()); } public String getAsString(String name) { return String.valueOf(columnMap.get(name).toString()); } public int getColumnSize() { return this.columnCount; } @Override public String toString() { return columnMap.toString(); } }
2.根據開頭閱讀源碼獲得的Api自定義Stream api:
package com.qkf.test; import java.sql.Connection; import java.sql.DriverManager; import java.util.Spliterators; import java.util.stream.Stream; import java.util.stream.StreamSupport; import com.qkf.test.bean.Record; import com.qkf.test.iterator.ResultSetIterator; /** * 數據庫操做Stream api */ public class Records { public static Stream<Record> stream(String sql, Object... params) { try { // 注意: 這裏使用try-with-resource處理Connection的話,數據庫鏈接會提早關閉而致使如下讀取數據庫的操做拋異常 Connection conn = DriverManager.getConnection( "jdbc:mysql://localhost:3306/users?useUnicode=true&characterEncoding=utf8", "qkf", "Fengqik@5811"); return StreamSupport.stream(Spliterators.spliteratorUnknownSize( new ResultSetIterator(conn, sql, params), 0), false); } catch(Throwable e) { System.err.println("Connection refused!"); return Stream.empty(); } } }
OK, 以上就是本篇的所有代碼,下面編寫測試代碼:
測試數據:
測試代碼:
package com.qkf.test; /** * test */ public class StreamTest { public static void before() { try { Class.forName("com.mysql.cj.jdbc.Driver"); } catch(Exception e) {} } public static void main(String[] args) { before(); String sql = "select * from users where age <= ?"; Records.stream( sql, 24 // sql params ).peek(r -> { System.out.println("Record: " + r); }).filter(r -> "男".equals(r.getAsString("sex")) ).forEach(r -> System.out.println( String.format("hit: name: %s, age: %d", r.getAsString("name"), r.getAsInt(2))) ); } }
運行結果:
運行成功。
總結一下:建立Stream實例由StreamSupport.stream(Spliterator實例, 是否並行); 而Spliterator實例能夠經過Spliterators工廠類構造獲得,本例使用的工廠方法入參是(自定義的Iterator類的實例, 特徵(整形, characteristics));
大概就這樣。
若有錯誤和不足,望斧正。
晚了,睡覺。謝謝閱讀。