jdk1.8自定義Stream api.

    直接入正題,如下用數據庫操做舉例。java

    查看ArrayList.stream()方法的實現,如圖:mysql

    發現方法實在Collection接口裏面實現的,能夠看出Stream對象是由StreamSupport.stream()方法初始化的。第一個入參是Spliterator實例,第二個入參,true表示執行的是並行操做,反之。查看api能夠發現StreamSupport.Stream()方法還有另一個以Suplier Function爲數據源的重載方法,這裏不舉例。sql

    從新定位上圖spliterator()方法,如圖:數據庫

    查看this的類型:api

    能夠看到Spliterators.spliterator()方法的第一個入參是Iterator實例,第二個入參看文檔貌似是用於適當分割數據源,增長批處理大小(batch size)的,填0的話,內部會替換爲默認的數值,不然取自定義的值,這裏不深究,固然也但願有大牛不吝解惑。ide

    好了,到這裏就能夠實現本身的Stream api了;測試

    直接上碼:this

    1. 建立實現Iterator接口:spa

package com.qkf.test.iterator;


import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;

import com.qkf.test.bean.Record;

/**
 * 用於構建Stream的Source
 * @author qkf
 */
public class ResultSetIterator implements Iterator<Record> {

	private Connection conn;
	private ResultSet rs;
	private PreparedStatement ps;
	
	public ResultSetIterator(Connection conn, String sql, Object... params) {
		assert conn != null;
		assert sql != null;
		this.conn = conn;
		try {
			ps = conn.prepareStatement(sql);
			if (params != null && params.length > 0) {
				for (int i = 1; i <= params.length; ++i) {
					ps.setObject(i, params[i-1]);
				}
			}
			rs = ps.executeQuery();
		} catch (SQLException e) {
			closeRes();
			e.printStackTrace();
		}
	}
	
	@Override
	public boolean hasNext() {
		try {
			return rs.next();
		} catch (SQLException e) {
			closeRes();
			e.printStackTrace();
		}
		return false;
	}

	@Override
	public Record next() {
		return new Record(rs);
	}
	
	private void close(AutoCloseable... closeable) {
		if (closeable != null) {
			for (AutoCloseable c : closeable) {
					try {
						c.close();
					} catch (Exception e) {
						// nothing to do
					}
			}
		}
	}
	
	/**
	 * 關閉資源
	 */
	private void closeRes() {
		close(this.rs, this.ps, this.conn);
	}

}

    以上用到的Record類表示數據庫的一行記錄,上碼:code

package com.qkf.test.bean;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * 表示數據行記錄
 * @author qkf
 */
public class Record {
	
	private Map<String, Object> columnMap = null; // 行記錄保存爲key-value形式
	private List<Object> columnList = null; // 行記錄保存爲列表形式
	private int columnCount; // 行記錄列數
	
	public Record(ResultSet rs) {
		columnMap = new HashMap<>();
		columnList = new ArrayList<>();
		try {
			ResultSetMetaData metaData = rs.getMetaData();
			columnCount = metaData.getColumnCount();
			if (columnCount > 0) {
				for (int i = 1; i <= columnCount; ++i) {
					Object obj = rs.getObject(i);
					columnList.add(obj);
					columnMap.put(metaData.getColumnLabel(i), obj);
				}
			}
		} catch (SQLException e) {
			e.printStackTrace();
		}
	}
	
	public Integer getAsInt(int index) {
		return Integer.valueOf(columnList.get(index).toString());
	}
	
	public Integer getAsInt(String name) {
		return Integer.valueOf(columnMap.get(name).toString());
	}
	
	public Double getAsDouble(int index) {
		return Double.valueOf(columnList.get(index).toString());
	}
	
	public Double getAsDouble(String name) {
		return Double.valueOf(columnMap.get(name).toString());
	}
	
	public String getAsString(int index) {
		return String.valueOf(columnList.get(index).toString());
	}
	
	public String getAsString(String name) {
		return String.valueOf(columnMap.get(name).toString());
	}
	
	public int getColumnSize() {
		return this.columnCount;
	}
	
	@Override
	public String toString() {
		return columnMap.toString();
	}
}

    2.根據開頭閱讀源碼獲得的Api自定義Stream api:

package com.qkf.test;

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import com.qkf.test.bean.Record;
import com.qkf.test.iterator.ResultSetIterator;

/**
 * 數據庫操做Stream api
 */
public class Records {
	
	public static Stream<Record> stream(String sql, Object... params) {
		try { // 注意: 這裏使用try-with-resource處理Connection的話,數據庫鏈接會提早關閉而致使如下讀取數據庫的操做拋異常
			Connection conn = DriverManager.getConnection(
					"jdbc:mysql://localhost:3306/users?useUnicode=true&characterEncoding=utf8", 
					"qkf", 
					"Fengqik@5811");
			return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
					new ResultSetIterator(conn, sql, params), 0), 
					false);
		} catch(Throwable e) {
			System.err.println("Connection refused!");
			return Stream.empty();
		}
	}
}

    OK, 以上就是本篇的所有代碼,下面編寫測試代碼:

 測試數據:

    

    測試代碼:

package com.qkf.test;

/**
 * test
 */
public class StreamTest {
	
	public static void before() {
		try {
			Class.forName("com.mysql.cj.jdbc.Driver");
		} catch(Exception e) {}
	}
	
	public static void main(String[] args) {
		before();
		String sql = "select * from users where age <= ?";
		Records.stream(
				sql, 
				24 // sql params
		).peek(r -> {
			System.out.println("Record: " + r);
		}).filter(r -> 
			"男".equals(r.getAsString("sex"))
		).forEach(r -> System.out.println(
				String.format("hit: name: %s, age: %d", 
						r.getAsString("name"), 
						r.getAsInt(2)))
		);
	}
}

    運行結果:

    運行成功。

    總結一下:建立Stream實例由StreamSupport.stream(Spliterator實例, 是否並行); 而Spliterator實例能夠經過Spliterators工廠類構造獲得,本例使用的工廠方法入參是(自定義的Iterator類的實例, 特徵(整形, characteristics));

     大概就這樣。

    若有錯誤和不足,望斧正。

    晚了,睡覺。謝謝閱讀。

相關文章
相關標籤/搜索