摘要: 原創出處 www.iocoder.cn/Sharding-JD… 「芋道源碼」歡迎轉載,保留摘要,謝謝!java
本文主要基於 Sharding-JDBC 1.5.0 正式版 git
🙂🙂🙂關注微信公衆號:【芋道源碼】有福利: github
- RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
- 您對於源碼的疑問每條留言都將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢。
- 新的源碼解析文章實時收到通知。每週更新一篇左右。
- 認真的源碼交流微信羣。
- 掘金Java羣:217878901。
本文主要分享 JDBC 與 讀寫分離 的實現。爲何會把這兩個東西放在一塊兒講呢?客戶端直連數據庫的讀寫分離主要經過獲取讀庫和寫庫的不一樣鏈接來實現,和 JDBC Connection 恰好放在一塊。sql
OK,咱們先來看一段 Sharding-JDBC 官方對本身的定義和定位mongodb
Sharding-JDBC定位爲輕量級java框架,使用客戶端直連數據庫,以jar包形式提供服務,未使用中間層,無需額外部署,無其餘依賴,DBA也無需改變原有的運維方式,可理解爲加強版的JDBC驅動,舊代碼遷移成本幾乎爲零。數據庫
能夠看出,Sharding-JDBC 經過實現 JDBC規範,對上層提供透明化數據庫分庫分表的訪問。😈 黑科技?實際咱們使用的數據庫鏈接池也是經過這種方式實現對上層無感知的提供鏈接池。甚至還能夠經過這種方式實現對 Lucene、MongoDB 等等的訪問。數組
扯遠了,下面來看看 Sharding-JDBC jdbc
包的結構:緩存
unsupported
:聲明不支持的數據操做方法adapter
:適配類,實現和分庫分表無關的方法core
:核心類,實現和分庫分表相關的方法根據 core
包,能夠看出分到四種咱們超級熟悉的對象 微信
Datasourceapp
Connection
Statement
ResultSet
實現層級以下:JDBC 接口 <=(繼承)== unsupported
抽象類 <=(繼承)== unsupported
抽象類 <=(繼承)== core
類。
本文內容順序
unspported
包adapter
包Sharding-JDBC 正在收集使用公司名單:傳送門。
🙂 你的登記,會讓更多人蔘與和使用 Sharding-JDBC。傳送門
Sharding-JDBC 也會所以,可以覆蓋更多的業務場景。傳送門
登記吧,騷年!傳送門
unspported
包內的抽象類,聲明不支持操做的數據對象,全部方法都是 throw new SQLFeatureNotSupportedException()
方式。
public abstract class AbstractUnsupportedGeneratedKeysResultSet extends AbstractUnsupportedOperationResultSet {
@Override
public boolean getBoolean(final int columnIndex) throws SQLException {
throw new SQLFeatureNotSupportedException("getBoolean");
}
// .... 省略其它相似方法
}
public abstract class AbstractUnsupportedOperationConnection extends WrapperAdapter implements Connection {
@Override
public final CallableStatement prepareCall(final String sql) throws SQLException {
throw new SQLFeatureNotSupportedException("prepareCall");
}
// .... 省略其它相似方法
}複製代碼
adapter
包內的抽象類,實現和分庫分表無關的方法。
考慮到第四、5兩小節更容易理解,本小節貼的代碼會相對多
WrapperAdapter,JDBC Wrapper 適配類。
對 Wrapper 接口實現以下兩個方法:
@Override
public final <T> T unwrap(final Class<T> iface) throws SQLException {
if (isWrapperFor(iface)) {
return (T) this;
}
throw new SQLException(String.format("[%s] cannot be unwrapped as [%s]", getClass().getName(), iface.getName()));
}
@Override
public final boolean isWrapperFor(final Class<?> iface) throws SQLException {
return iface.isInstance(this);
}複製代碼
提供子類 #recordMethodInvocation()
記錄方法調用,#replayMethodsInvocation()
回放記錄的方法調用:
/** * 記錄的方法數組 */
private final Collection<JdbcMethodInvocation> jdbcMethodInvocations = new ArrayList<>();
/** * 記錄方法調用. * * @param targetClass 目標類 * @param methodName 方法名稱 * @param argumentTypes 參數類型 * @param arguments 參數 */
public final void recordMethodInvocation(final Class<?> targetClass, final String methodName, final Class<?>[] argumentTypes, final Object[] arguments) {
try {
jdbcMethodInvocations.add(new JdbcMethodInvocation(targetClass.getMethod(methodName, argumentTypes), arguments));
} catch (final NoSuchMethodException ex) {
throw new ShardingJdbcException(ex);
}
}
/** * 回放記錄的方法調用. * * @param target 目標對象 */
public final void replayMethodsInvocation(final Object target) {
for (JdbcMethodInvocation each : jdbcMethodInvocations) {
each.invoke(target);
}
}複製代碼
這兩個方法有什麼用途呢?例以下文會提到的 AbstractConnectionAdapter 的 #setAutoCommit()
,當它無數據庫鏈接時,先記錄;等得到到數據鏈接後,再回放:
// AbstractConnectionAdapter.java
@Override
public final void setAutoCommit(final boolean autoCommit) throws SQLException {
this.autoCommit = autoCommit;
if (getConnections().isEmpty()) { // 無數據鏈接時,記錄方法調用
recordMethodInvocation(Connection.class, "setAutoCommit", new Class[] {boolean.class}, new Object[] {autoCommit});
return;
}
for (Connection each : getConnections()) {
each.setAutoCommit(autoCommit);
}
}複製代碼
JdbcMethodInvocation,反射調用JDBC相關方法的工具類:
public class JdbcMethodInvocation {
/** * 方法 */
@Getter
private final Method method;
/** * 方法參數 */
@Getter
private final Object[] arguments;
/** * 調用方法. * * @param target 目標對象 */
public void invoke(final Object target) {
try {
method.invoke(target, arguments); // 反射調用
} catch (final IllegalAccessException | InvocationTargetException ex) {
throw new ShardingJdbcException("Invoke jdbc method exception", ex);
}
}
}複製代碼
提供子類 #throwSQLExceptionIfNecessary()
拋出異常鏈:
protected void throwSQLExceptionIfNecessary(final Collection<SQLException> exceptions) throws SQLException {
if (exceptions.isEmpty()) { // 爲空不拋出異常
return;
}
SQLException ex = new SQLException();
for (SQLException each : exceptions) {
ex.setNextException(each); // 異常鏈
}
throw ex;
}複製代碼
AbstractDataSourceAdapter,數據源適配類。
直接點擊連接查看源碼。
AbstractConnectionAdapter,數據庫鏈接適配類。
咱們來瞅瞅你們最關心的事務相關方法的實現。
/** * 是否自動提交 */
private boolean autoCommit = true;
/** * 得到連接 * * @return 連接 */
protected abstract Collection<Connection> getConnections();
@Override
public final boolean getAutoCommit() throws SQLException {
return autoCommit;
}
@Override
public final void setAutoCommit(final boolean autoCommit) throws SQLException {
this.autoCommit = autoCommit;
if (getConnections().isEmpty()) { // 無數據鏈接時,記錄方法調用
recordMethodInvocation(Connection.class, "setAutoCommit", new Class[] {boolean.class}, new Object[] {autoCommit});
return;
}
for (Connection each : getConnections()) {
each.setAutoCommit(autoCommit);
}
}複製代碼
#setAutoCommit()
調用時,實際會設置其所持有的 Connection 的 autoCommit
屬性#getConnections()
和分庫分表相關,於是僅抽象該方法,留給子類實現@Override
public final void commit() throws SQLException {
for (Connection each : getConnections()) {
each.commit();
}
}
@Override
public final void rollback() throws SQLException {
Collection<SQLException> exceptions = new LinkedList<>();
for (Connection each : getConnections()) {
try {
each.rollback();
} catch (final SQLException ex) {
exceptions.add(ex);
}
}
throwSQLExceptionIfNecessary(exceptions);
}複製代碼
#commit()
、#rollback()
調用時,實際調用其所持有的 Connection 的方法異常狀況下,#commit()
和 #rollback()
處理方式不一樣,筆者暫時不知道答案,求證後會進行更新
#commit()
處理方式須要改爲和 #rollback()
同樣。代碼以下:
@Override
public final void commit() throws SQLException {
Collection<SQLException> exceptions = new LinkedList<>();
for (Connection each : getConnections()) {
try {
each.commit();
} catch (final SQLException ex) {
exceptions.add(ex);
}
}
throwSQLExceptionIfNecessary(exceptions);
}複製代碼
事務級別和是否只讀相關代碼以下:
/** * 只讀 */
private boolean readOnly = true;
/** * 事務級別 */
private int transactionIsolation = TRANSACTION_READ_UNCOMMITTED;
@Override
public final void setReadOnly(final boolean readOnly) throws SQLException {
this.readOnly = readOnly;
if (getConnections().isEmpty()) {
recordMethodInvocation(Connection.class, "setReadOnly", new Class[] {boolean.class}, new Object[] {readOnly});
return;
}
for (Connection each : getConnections()) {
each.setReadOnly(readOnly);
}
}
@Override
public final void setTransactionIsolation(final int level) throws SQLException {
transactionIsolation = level;
if (getConnections().isEmpty()) {
recordMethodInvocation(Connection.class, "setTransactionIsolation", new Class[] {int.class}, new Object[] {level});
return;
}
for (Connection each : getConnections()) {
each.setTransactionIsolation(level);
}
}複製代碼
AbstractStatementAdapter,靜態語句對象適配類。
@Override
public final int getUpdateCount() throws SQLException {
long result = 0;
boolean hasResult = false;
for (Statement each : getRoutedStatements()) {
if (each.getUpdateCount() > -1) {
hasResult = true;
}
result += each.getUpdateCount();
}
if (result > Integer.MAX_VALUE) {
result = Integer.MAX_VALUE;
}
return hasResult ? Long.valueOf(result).intValue() : -1;
}
/** * 獲取路由的靜態語句對象集合. * * @return 路由的靜態語句對象集合 */
protected abstract Collection<? extends Statement> getRoutedStatements();複製代碼
#getUpdateCount()
調用持有的 Statement 計算更新數量#getRoutedStatements()
和分庫分表相關,於是僅抽象該方法,留給子類實現AbstractPreparedStatementAdapter,預編譯語句對象的適配類。
#recordSetParameter()
實現對佔位符參數的設置:
/** * 記錄的設置參數方法數組 */
private final List<SetParameterMethodInvocation> setParameterMethodInvocations = new LinkedList<>();
/** * 參數 */
@Getter
private final List<Object> parameters = new ArrayList<>();
@Override
public final void setInt(final int parameterIndex, final int x) throws SQLException {
setParameter(parameterIndex, x);
recordSetParameter("setInt", new Class[]{int.class, int.class}, parameterIndex, x);
}
/** * 記錄佔位符參數 * * @param parameterIndex 佔位符參數位置 * @param value 參數 */
private void setParameter(final int parameterIndex, final Object value) {
if (parameters.size() == parameterIndex - 1) {
parameters.add(value);
return;
}
for (int i = parameters.size(); i <= parameterIndex - 1; i++) { // 用 null 填充前面未設置的位置
parameters.add(null);
}
parameters.set(parameterIndex - 1, value);
}
/** * 記錄設置參數方法調用 * * @param methodName 方法名,例如 setInt、setLong 等 * @param argumentTypes 參數類型 * @param arguments 參數 */
private void recordSetParameter(final String methodName, final Class[] argumentTypes, final Object... arguments) {
try {
setParameterMethodInvocations.add(new SetParameterMethodInvocation(PreparedStatement.class.getMethod(methodName, argumentTypes), arguments, arguments[1]));
} catch (final NoSuchMethodException ex) {
throw new ShardingJdbcException(ex);
}
}
/** * 回放記錄的設置參數方法調用 * * @param preparedStatement 預編譯語句對象 */
protected void replaySetParameter(final PreparedStatement preparedStatement) {
addParameters();
for (SetParameterMethodInvocation each : setParameterMethodInvocations) {
updateParameterValues(each, parameters.get(each.getIndex() - 1)); // 同一個位置屢次設置,值可能不同,須要更新下
each.invoke(preparedStatement);
}
}
/** * 當使用分佈式主鍵時,生成後會添加到 parameters,此時 parameters 數量多於 setParameterMethodInvocations,須要生成該分佈式主鍵的 SetParameterMethodInvocation */
private void addParameters() {
for (int i = setParameterMethodInvocations.size(); i < parameters.size(); i++) {
recordSetParameter("setObject", new Class[]{int.class, Object.class}, i + 1, parameters.get(i));
}
}
private void updateParameterValues(final SetParameterMethodInvocation setParameterMethodInvocation, final Object value) {
if (!Objects.equals(setParameterMethodInvocation.getValue(), value)) {
setParameterMethodInvocation.changeValueArgument(value); // 修改佔位符參數
}
}複製代碼
邏輯相似 WrapperAdapter
的 #recordMethodInvocation()
,#replayMethodsInvocation()
,請認真閱讀代碼註釋
SetParameterMethodInvocation,繼承 JdbcMethodInvocation,反射調用參數設置方法的工具類:
public final class SetParameterMethodInvocation extends JdbcMethodInvocation {
/** * 位置 */
@Getter
private final int index;
/** * 參數值 */
@Getter
private final Object value;
/** * 設置參數值. * * @param value 參數值 */
public void changeValueArgument(final Object value) {
getArguments()[1] = value;
}
}複製代碼
AbstractResultSetAdapter,代理結果集適配器。
public abstract class AbstractResultSetAdapter extends AbstractUnsupportedOperationResultSet {
/** * 結果集集合 */
@Getter
private final List<ResultSet> resultSets;
@Override
// TODO should return sharding statement in future
public final Statement getStatement() throws SQLException {
return getResultSets().get(0).getStatement();
}
@Override
public final ResultSetMetaData getMetaData() throws SQLException {
return getResultSets().get(0).getMetaData();
}
@Override
public int findColumn(final String columnLabel) throws SQLException {
return getResultSets().get(0).findColumn(columnLabel);
}
// .... 省略其它方法
}複製代碼
插入使用分佈式主鍵例子代碼以下:
// 代碼僅僅是例子,生產環境下請注意異常處理和資源關閉
String sql = "INSERT INTO t_order(uid, nickname, pid) VALUES (1, '2', ?)";
DataSource dataSource = new ShardingDataSource(shardingRule);
Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS); // 返回主鍵須要 Statement.RETURN_GENERATED_KEYS
ps.setLong(1, 100);
ps.executeUpdate();
ResultSet rs = ps.getGeneratedKeys();
if (rs.next()) {
System.out.println("id:" + rs.getLong(1));
}複製代碼
調用 #executeUpdate()
方法,內部過程以下:
是否是對上層徹底透明?!咱們來看看內部是怎麼實現的。
// ShardingPreparedStatement.java
@Override
public int executeUpdate() throws SQLException {
try {
Collection<PreparedStatementUnit> preparedStatementUnits = route();
return new PreparedStatementExecutor(
getShardingConnection().getShardingContext().getExecutorEngine(), getRouteResult().getSqlStatement().getType(), preparedStatementUnits, getParameters()).executeUpdate();
} finally {
clearBatch();
}
}複製代碼
#route()
分庫分表路由,得到預編譯語句對象執行單元( PreparedStatementUnit )集合。
public final class PreparedStatementUnit implements BaseStatementUnit {
/** * SQL 執行單元 */
private final SQLExecutionUnit sqlExecutionUnit;
/** * 預編譯語句對象 */
private final PreparedStatement statement;
}複製代碼
#executeUpdate()
調用執行引擎並行執行多個預編譯語句對象。執行時,最終調用預編譯語句對象( PreparedStatement )。咱們來看一個例子:
// PreparedStatementExecutor.java
public int executeUpdate() {
Context context = MetricsContext.start("ShardingPreparedStatement-executeUpdate");
try {
List<Integer> results = executorEngine.executePreparedStatement(sqlType, preparedStatementUnits, parameters, new ExecuteCallback<Integer>() {
@Override
public Integer execute(final BaseStatementUnit baseStatementUnit) throws Exception {
// 調用 PreparedStatement#executeUpdate()
return ((PreparedStatement) baseStatementUnit.getStatement()).executeUpdate();
}
});
return accumulate(results);
} finally {
MetricsContext.stop(context);
}
}複製代碼
// ShardingPreparedStatement.java
private Collection<PreparedStatementUnit> route() throws SQLException {
Collection<PreparedStatementUnit> result = new LinkedList<>();
// 路由
setRouteResult(routingEngine.route(getParameters()));
// 遍歷 SQL 執行單元
for (SQLExecutionUnit each : getRouteResult().getExecutionUnits()) {
SQLType sqlType = getRouteResult().getSqlStatement().getType();
Collection<PreparedStatement> preparedStatements;
// 建立實際的 PreparedStatement
if (SQLType.DDL == sqlType) {
preparedStatements = generatePreparedStatementForDDL(each);
} else {
preparedStatements = Collections.singletonList(generatePreparedStatement(each));
}
getRoutedStatements().addAll(preparedStatements);
// 回放設置佔位符參數到 PreparedStatement
for (PreparedStatement preparedStatement : preparedStatements) {
replaySetParameter(preparedStatement);
result.add(new PreparedStatementUnit(each, preparedStatement));
}
}
return result;
}
/** * 建立 PreparedStatement * * @param sqlExecutionUnit SQL 執行單元 * @return PreparedStatement * @throws SQLException 當 JDBC 操做發生異常時 */
private PreparedStatement generatePreparedStatement(final SQLExecutionUnit sqlExecutionUnit) throws SQLException {
Optional<GeneratedKey> generatedKey = getGeneratedKey();
// 得到鏈接
Connection connection = getShardingConnection().getConnection(sqlExecutionUnit.getDataSource(), getRouteResult().getSqlStatement().getType());
// 聲明返回主鍵
if (isReturnGeneratedKeys() || isReturnGeneratedKeys() && generatedKey.isPresent()) {
return connection.prepareStatement(sqlExecutionUnit.getSql(), RETURN_GENERATED_KEYS);
}
return connection.prepareStatement(sqlExecutionUnit.getSql(), getResultSetType(), getResultSetConcurrency(), getResultSetHoldability());
}複製代碼
#generatePreparedStatement()
建立 PreparedStatement,後調用 #replaySetParameter()
回放設置佔位符參數到 PreparedStatement當 聲明返回主鍵 時,即 #isReturnGeneratedKeys()
返回 true
時,調用 connection.prepareStatement(sqlExecutionUnit.getSql(), RETURN_GENERATED_KEYS)
。爲何該方法會返回 true
?上文例子 conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)
// ShardingConnection.java
@Override
public PreparedStatement prepareStatement(final String sql, final String[] columnNames) throws SQLException {
return new ShardingPreparedStatement(this, sql, Statement.RETURN_GENERATED_KEYS);
}
// ShardingPreparedStatement.java
public ShardingPreparedStatement(final ShardingConnection shardingConnection, final String sql, final int autoGeneratedKeys) {
this(shardingConnection, sql);
if (RETURN_GENERATED_KEYS == autoGeneratedKeys) {
markReturnGeneratedKeys();
}
}
protected final void markReturnGeneratedKeys() {
returnGeneratedKeys = true;
}複製代碼
聲明返回主鍵後,插入執行完成,咱們調用 #getGeneratedKeys()
能夠得到主鍵 :
// ShardingStatement.java
@Override
public ResultSet getGeneratedKeys() throws SQLException {
Optional<GeneratedKey> generatedKey = getGeneratedKey();
// 分佈式主鍵
if (generatedKey.isPresent() && returnGeneratedKeys) {
return new GeneratedKeysResultSet(routeResult.getGeneratedKeys().iterator(), generatedKey.get().getColumn(), this);
}
// 數據庫自增
if (1 == getRoutedStatements().size()) {
return getRoutedStatements().iterator().next().getGeneratedKeys();
}
return new GeneratedKeysResultSet();
}複製代碼
調用 ShardingConnection#getConnection()
方法得到該 PreparedStatement 對應的真實數據庫鏈接( Connection ):
// ShardingConnection.java
/** * 根據數據源名稱獲取相應的數據庫鏈接. * * @param dataSourceName 數據源名稱 * @param sqlType SQL語句類型 * @return 數據庫鏈接 * @throws SQLException SQL異常 */
public Connection getConnection(final String dataSourceName, final SQLType sqlType) throws SQLException {
// 從鏈接緩存中獲取鏈接
Optional<Connection> connection = getCachedConnection(dataSourceName, sqlType);
if (connection.isPresent()) {
return connection.get();
}
Context metricsContext = MetricsContext.start(Joiner.on("-").join("ShardingConnection-getConnection", dataSourceName));
//
DataSource dataSource = shardingContext.getShardingRule().getDataSourceRule().getDataSource(dataSourceName);
Preconditions.checkState(null != dataSource, "Missing the rule of %s in DataSourceRule", dataSourceName);
String realDataSourceName;
if (dataSource instanceof MasterSlaveDataSource) {
dataSource = ((MasterSlaveDataSource) dataSource).getDataSource(sqlType);
realDataSourceName = MasterSlaveDataSource.getDataSourceName(dataSourceName, sqlType);
} else {
realDataSourceName = dataSourceName;
}
Connection result = dataSource.getConnection();
MetricsContext.stop(metricsContext);
// 添加到鏈接緩存
connectionMap.put(realDataSourceName, result);
// 回放 Connection 方法
replayMethodsInvocation(result);
return result;
}
private Optional<Connection> getCachedConnection(final String dataSourceName, final SQLType sqlType) {
String key = connectionMap.containsKey(dataSourceName) ? dataSourceName : MasterSlaveDataSource.getDataSourceName(dataSourceName, sqlType);
return Optional.fromNullable(connectionMap.get(key));
}複製代碼
#getCachedConnection()
嘗試得到已緩存的數據庫鏈接;若是緩存中不存在,獲取到鏈接後會進行緩存#replayMethodsInvocation()
回放記錄的 Connection 方法插入實現的代碼基本分享完了,由於是不斷代碼下鑽的方式分析,能夠反向向上在理理,會更加清晰。
單純從 core
包裏的 JDBC 實現,查詢流程 #executeQuery()
和 #execute()
基本一致,差異在於執行和多結果集歸併。
@Override
public ResultSet executeQuery() throws SQLException {
ResultSet result;
try {
// 路由
Collection<PreparedStatementUnit> preparedStatementUnits = route();
// 執行
List<ResultSet> resultSets = new PreparedStatementExecutor(
getShardingConnection().getShardingContext().getExecutorEngine(), getRouteResult().getSqlStatement().getType(), preparedStatementUnits, getParameters()).executeQuery();
// 結果歸併
result = new ShardingResultSet(resultSets, new MergeEngine(
getShardingConnection().getShardingContext().getDatabaseType(), resultSets, (SelectStatement) getRouteResult().getSqlStatement()).merge());
} finally {
clearBatch();
}
// 設置結果集
setCurrentResultSet(result);
return result;
}複製代碼
結果歸併 #merge()
完後,建立分片結果集( ShardingResultSet )
public final class ShardingResultSet extends AbstractResultSetAdapter {
/** * 歸併結果集 */
private final ResultSetMerger mergeResultSet;
@Override
public int getInt(final int columnIndex) throws SQLException {
Object result = mergeResultSet.getValue(columnIndex, int.class);
wasNull = null == result;
return (int) ResultSetUtil.convertValue(result, int.class);
}
@Override
public int getInt(final String columnLabel) throws SQLException {
Object result = mergeResultSet.getValue(columnLabel, int.class);
wasNull = null == result;
return (int) ResultSetUtil.convertValue(result, int.class);
}
// .... 隱藏其餘相似 getXXXX() 方法
}複製代碼
建議前置閱讀:《官方文檔 —— 讀寫分離》
當你有讀寫分離的需求時,將 ShardingRule 配置對應的數據源 從 ShardingDataSource 替換成 MasterSlaveDataSource。咱們來看看 MasterSlaveDataSource 的功能和實現。
支持一主多從的讀寫分離配置,可配合分庫分表使用
// MasterSlaveDataSourceFactory.java
public final class MasterSlaveDataSourceFactory {
/** * 建立讀寫分離數據源. * * @param name 讀寫分離數據源名稱 * @param masterDataSource 主節點數據源 * @param slaveDataSource 從節點數據源 * @param otherSlaveDataSources 其餘從節點數據源 * @return 讀寫分離數據源 */
public static DataSource createDataSource(final String name, final DataSource masterDataSource, final DataSource slaveDataSource, final DataSource... otherSlaveDataSources) {
return new MasterSlaveDataSource(name, masterDataSource, Lists.asList(slaveDataSource, otherSlaveDataSources));
}
}
// MasterSlaveDataSource.java
public final class MasterSlaveDataSource extends AbstractDataSourceAdapter {
/** * 數據源名 */
private final String name;
/** * 主數據源 */
@Getter
private final DataSource masterDataSource;
/** * 從數據源集合 */
@Getter
private final List<DataSource> slaveDataSources;
}複製代碼
同一線程且同一數據庫鏈接內,若有寫入操做,之後的讀操做均從主庫讀取,用於保證數據一致性。
// ShardingConnection.java
public Connection getConnection(final String dataSourceName, final SQLType sqlType) throws SQLException {
// .... 省略部分代碼
String realDataSourceName;
if (dataSource instanceof MasterSlaveDataSource) { // 讀寫分離
dataSource = ((MasterSlaveDataSource) dataSource).getDataSource(sqlType);
realDataSourceName = MasterSlaveDataSource.getDataSourceName(dataSourceName, sqlType);
} else {
realDataSourceName = dataSourceName;
}
Connection result = dataSource.getConnection();
// .... 省略部分代碼
}
// MasterSlaveDataSource.java
/** * 當前線程是不是 DML 操做標識 */
private static final ThreadLocal<Boolean> DML_FLAG = new ThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return false;
}
};
/** * 從庫負載均衡策略 */
private final SlaveLoadBalanceStrategy slaveLoadBalanceStrategy = new RoundRobinSlaveLoadBalanceStrategy();
/** * 獲取主或從節點的數據源. * * @param sqlType SQL類型 * @return 主或從節點的數據源 */
public DataSource getDataSource(final SQLType sqlType) {
if (isMasterRoute(sqlType)) {
DML_FLAG.set(true);
return masterDataSource;
}
return slaveLoadBalanceStrategy.getDataSource(name, slaveDataSources);
}
private static boolean isMasterRoute(final SQLType sqlType) {
return SQLType.DQL != sqlType || DML_FLAG.get() || HintManagerHolder.isMasterRouteOnly();
}複製代碼
MasterSlaveDataSource#getConnection()
方法獲取真實的數據源#isMasterRoute()
判斷是否讀取主庫,如下三種狀況會訪問主庫:
DML_FLAG
實現HintManager.getInstance().setMasterRouteOnly()
實現訪問從庫時,會經過負載均衡策略( SlaveLoadBalanceStrategy ) 選擇一個從庫
// SlaveLoadBalanceStrategy.java
public interface SlaveLoadBalanceStrategy {
/** * 根據負載均衡策略獲取從庫數據源. * * @param name 讀寫分離數據源名稱 * @param slaveDataSources 從庫數據源列表 * @return 選中的從庫數據源 */
DataSource getDataSource(String name, List<DataSource> slaveDataSources);
}
// RoundRobinSlaveLoadBalanceStrategy.java
public final class RoundRobinSlaveLoadBalanceStrategy implements SlaveLoadBalanceStrategy {
private static final ConcurrentHashMap<String, AtomicInteger> COUNT_MAP = new ConcurrentHashMap<>();
@Override
public DataSource getDataSource(final String name, final List<DataSource> slaveDataSources) {
AtomicInteger count = COUNT_MAP.containsKey(name) ? COUNT_MAP.get(name) : new AtomicInteger(0);
COUNT_MAP.putIfAbsent(name, count);
count.compareAndSet(slaveDataSources.size(), 0);
return slaveDataSources.get(count.getAndIncrement() % slaveDataSources.size());
}
}複製代碼
沒有彩蛋
沒有彩
沒有
沒
下一篇,《分佈式事務(一)之最大努力型》走起。老司機,趕忙上車。
道友,分享一個朋友圈可好?否則交個道姑那敏感詞你。