經過spring抽象路由數據源+MyBatis攔截器實現數據庫自動讀寫分離

前言

以前使用的讀寫分離的方案是在mybatis中配置兩個數據源,而後生成兩個不一樣的SqlSessionTemplate而後手動去識別執行sql語句是操做主庫仍是從庫。以下圖所示:java

技術分享

好處是,你能夠人爲的去控制操做的數據庫。缺點也顯而易見,就是代碼很是麻煩,老是須要去判斷使用什麼庫,並且遇到事務的時候還必須特別當心。mysql

此次咱們利用spring抽象路由數據源+MyBatis攔截器來實現自動的讀寫分離,而且保證在使用事務的狀況下也能正確。結構以下圖所示正則表達式

 

 咱們仍是按照老套路,首先我會直接進行代碼的實現,而後根據源碼進行分析,最後作一個總結。spring

 

代碼實現

咱們一共須要5個類和兩個配置文件sql

首先來講類數據庫

/**
 * 全局動態數據源實體
 * @author LinkinStar
 *
 */
public enum DynamicDataSourceGlobal {
    READ, WRITE;
}

這是一個枚舉的實體,後面會用到apache

/**
 * 動態數據源線程持有者
 * @author LinkinStar
 *
 */
public final class DynamicDataSourceHolder {

    private static final ThreadLocal<DynamicDataSourceGlobal> holder = new ThreadLocal<DynamicDataSourceGlobal>();

    /**
     * 設置當前線程使用的數據源
     */
    public static void putDataSource(DynamicDataSourceGlobal dataSource){
        holder.set(dataSource);
    }

    /**
     * 獲取當前線程須要使用的數據源
     */
    public static DynamicDataSourceGlobal getDataSource(){
        return holder.get();
    }

    /**
     * 清空使用的數據源
     */
    public static void clearDataSource() {
        holder.remove();
    }

}

以上是兩個工具,下面就是重點了緩存

一個是咱們的主角,動態數據源,它繼承自spring的抽象動態路由數據源session

/**
 * 動態數據源(繼承自spring抽象動態路由數據源)
 * @author LinkinStar
 *
 */
public class DynamicDataSource extends AbstractRoutingDataSource {

    private Object writeDataSource; //寫數據源

    private Object readDataSource; //讀數據源

    /**
     * 在初始化以前被調用,設置默認數據源,以及數據源資源(這裏的寫法是參考源碼中的)
     */
    @Override
    public void afterPropertiesSet() {
        //若是寫數據源不存在,則拋出非法異常
        if (this.writeDataSource == null) {
            throw new IllegalArgumentException("Property ‘writeDataSource‘ is required");
        }
        //設置默認目標數據源爲主庫
        setDefaultTargetDataSource(writeDataSource);
        //設置全部數據源資源,有從庫添加,沒有就添加
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put(DynamicDataSourceGlobal.WRITE.name(), writeDataSource);
        if(readDataSource != null) {
            targetDataSources.put(DynamicDataSourceGlobal.READ.name(), readDataSource);
        }
        setTargetDataSources(targetDataSources);
        super.afterPropertiesSet();
    }

    /**
     * 這是AbstractRoutingDataSource類中的一個抽象方法,而它的返回值是你所要用的數據源dataSource的key值
     */
    @Override
    protected Object determineCurrentLookupKey() {
        //根據當前線程所使用的數據源進行切換
        DynamicDataSourceGlobal dynamicDataSourceGlobal = DynamicDataSourceHolder.getDataSource();

        //若是沒有被賦值,那麼默認使用主庫
        if(dynamicDataSourceGlobal == null
                || dynamicDataSourceGlobal == DynamicDataSourceGlobal.WRITE) {
            return DynamicDataSourceGlobal.WRITE.name();
        }
        //其餘狀況使用從庫
        return DynamicDataSourceGlobal.READ.name();
    }

    public void setWriteDataSource(Object writeDataSource) {
        this.writeDataSource = writeDataSource;
    }

    public Object getWriteDataSource() {
        return writeDataSource;
    }

    public Object getReadDataSource() {
        return readDataSource;
    }

    public void setReadDataSource(Object readDataSource) {
        this.readDataSource = readDataSource;
    }
}

而後是咱們的另外一個主角,動態數據源插件,實現MyBatis攔截器接口mybatis

import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.ibatis.executor.Executor;
import org.apache.ibatis.executor.keygen.SelectKeyGenerator;
import org.apache.ibatis.mapping.BoundSql;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.mapping.SqlCommandType;
import org.apache.ibatis.plugin.Interceptor;
import org.apache.ibatis.plugin.Intercepts;
import org.apache.ibatis.plugin.Invocation;
import org.apache.ibatis.plugin.Plugin;
import org.apache.ibatis.plugin.Signature;
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.RowBounds;
import org.springframework.transaction.support.TransactionSynchronizationManager;

/**
 * 動態數據源插件,實現MyBatis攔截器接口
 * @author LinkinStar
 *
 */
@Intercepts({
@Signature(type = Executor.class, method = "update", args = {
        MappedStatement.class, Object.class }),
@Signature(type = Executor.class, method = "query", args = {
        MappedStatement.class, Object.class, RowBounds.class,
        ResultHandler.class }) })
public class DynamicPlugin implements Interceptor {

    /**
     * 匹配SQL語句的正則表達式
     */
    private static final String REGEX = ".*insert\\u0020.*|.*delete\\u0020.*|.*update\\u0020.*";

    /**
     * 這個map用於存放已經執行過的sql語句所對應的數據源
     */
    private static final Map<String, DynamicDataSourceGlobal> cacheMap = new ConcurrentHashMap<>();

    @Override
    public Object intercept(Invocation invocation) throws Throwable {
     boolean actualTransactionActive = TransactionSynchronizationManager.isActualTransactionActive();
        if (!actualTransactionActive) {
            return invocation.proceed();
        }
            
        //從代理類參數中獲取參數
        Object[] objects = invocation.getArgs();
        //其中參數的第一個值爲執行的sql語句
        MappedStatement ms = (MappedStatement) objects[0];

        //當前sql語句所應該使用的數據源,經過sql語句的id從map中獲取,若是獲取到,則以前已經執行過直接取,
        DynamicDataSourceGlobal dynamicDataSourceGlobal = cacheMap.get(ms.getId());
        if (dynamicDataSourceGlobal != null) {
            DynamicDataSourceHolder.putDataSource(dynamicDataSourceGlobal);
            return invocation.proceed();
        }

        //若是沒有,則從新進行存放
        //ms中獲取方法,若是是查詢方法
        if (ms.getSqlCommandType().equals(SqlCommandType.SELECT)) {
            //!selectKey 爲自增id查詢主鍵(SELECT LAST_INSERT_ID() )方法,使用主庫
            if(ms.getId().contains(SelectKeyGenerator.SELECT_KEY_SUFFIX)) {
                dynamicDataSourceGlobal = DynamicDataSourceGlobal.WRITE;
            } else {
                BoundSql boundSql = ms.getSqlSource().getBoundSql(objects[1]);
                //經過正則表達式匹配,肯定使用那一個數據源
                String sql = boundSql.getSql().toLowerCase(Locale.CHINA).replaceAll("[\\t\\n\\r]", " ");
                if(sql.matches(REGEX)) {
                    dynamicDataSourceGlobal = DynamicDataSourceGlobal.WRITE;
                } else {
                    dynamicDataSourceGlobal = DynamicDataSourceGlobal.READ;
                }
            }
        } else {
            dynamicDataSourceGlobal = DynamicDataSourceGlobal.WRITE;
        }
        //將sql對應使用的數據源放進map中存放
        cacheMap.put(ms.getId(), dynamicDataSourceGlobal);
        
        //最後設置使用的數據源
        DynamicDataSourceHolder.putDataSource(dynamicDataSourceGlobal);
        
        //執行代理以後的方法
        return invocation.proceed();
    }
    
    @Override
    public Object plugin(Object target) {
        if (target instanceof Executor) {
            return Plugin.wrap(target, this);
        } else {
            return target;
        }
    }

    @Override
    public void setProperties(Properties properties) {
    }
}

最後是咱們的配角,動態數據源的事務管理器

import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.TransactionDefinition;

/**
 * 動態數據源事務管理器
 * @author LinkinStar
 *
 */
public class DynamicDataSourceTransactionManager extends DataSourceTransactionManager {
    
    private static final long serialVersionUID = 1L;

    /**
     * 只讀事務到讀庫,讀寫事務到寫庫
     */
    @Override
    protected void doBegin(Object transaction, TransactionDefinition definition) {
        //根據事務可讀性進行判斷
        boolean readOnly = definition.isReadOnly();
        //只讀類型事務能夠只用從庫
        if(readOnly) {
            DynamicDataSourceHolder.putDataSource(DynamicDataSourceGlobal.READ);
        } else {
            DynamicDataSourceHolder.putDataSource(DynamicDataSourceGlobal.WRITE);
        }
        super.doBegin(transaction, definition);
    }

    /**
     * 清理本地線程的數據源(會被默認調用,調用時清除相應數據源)
     */
    @Override
    protected void doCleanupAfterCompletion(Object transaction) {
        super.doCleanupAfterCompletion(transaction);
        DynamicDataSourceHolder.clearDataSource();
    }
}

而後是兩個配置文件,根據你本身的須要進行修改

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/tx
           http://www.springframework.org/schema/tx/spring-tx.xsd">
 
    <context:property-placeholder location="classpath:resources/jdbc.properties"/>
    
    <bean id="abstractDataSource" abstract="true" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="${jdbc.driverClassName}" />
        <property name="minIdle" value="${jdbc.minIdle}"></property>
        <property name="maxIdle" value="${jdbc.maxIdle}"></property>
        <property name="maxWait" value="${jdbc.maxWait}"></property>
        <property name="maxActive" value="${jdbc.maxActive}"></property>
        <property name="initialSize" value="${jdbc.initialSize}"></property>
        <property name="testWhileIdle"><value>true</value></property>
        <property name="testOnBorrow"><value>true</value></property>
        <property name="testOnReturn"><value>false</value></property>
        <property name="validationQuery"><value>SELECT 1 FROM DUAL</value></property>
        <property name="validationQueryTimeout"><value>1</value></property>
        <property name="timeBetweenEvictionRunsMillis"><value>3000</value></property>
        <property name="numTestsPerEvictionRun"><value>2</value></property>
    </bean>
    
    <bean id="dataSourceRead"  parent="abstractDataSource">
        <property name="url" value="${jdbc.url.read}" />
        <property name="username" value="${jdbc.username.read}"/>
        <property name="password" value="${jdbc.password.read}"/>
    </bean>
    
    <bean id="dataSourceWrite"  parent="abstractDataSource">
        <property name="url" value="${jdbc.url}" />
        <property name="username" value="${jdbc.username}"/>
        <property name="password" value="${jdbc.password}"/>
    </bean>
    
    <bean id="dataSource"  class="com.ssm.dao.data.DynamicDataSource">
         <property name="writeDataSource"  ref="dataSourceWrite"></property>
         <property name="readDataSource" ref="dataSourceRead"></property>
    </bean>

    <!--配置基於註解的聲明式事務,默認使用註解來管理事務行爲-->
    <tx:annotation-driven transaction-manager="transactionManager"/>

    <!--配置事務管理器(mybatis採用的是JDBC的事務管理器)-->
    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource"></property>
    </bean>

    <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
        <!--注入數據庫鏈接池-->
        <property name="dataSource" ref="dataSource" />
        <!--掃描entity包,使用別名,多個用;隔開-->
        <property name="typeAliasesPackage" value="com/ssm/entity" />
        <!--掃描sql配置文件:mapper須要的xml文件-->
        <property name="mapperLocations" value="classpath*:com/ssm/dao/sqlxml/*.xml"></property>
        <property name="plugins">
            <array>
                <bean class="com.ssm.dao.data.DynamicPlugin" />
            </array>
        </property>
    </bean>
    
    <bean id="sqlSessionTemplate" class="org.mybatis.spring.SqlSessionTemplate">
        <constructor-arg name="sqlSessionFactory" ref="sqlSessionFactory" />
    </bean>

    <!--配置掃描Dao接口包,動態實現DAO接口,注入到spring容器-->
    <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
        <!--注入SqlSessionFactory-->
        <property name="sqlSessionFactoryBeanName" value="sqlSessionFactory"/>
        <!-- 給出須要掃描的Dao接口-->
        <property name="basePackage" value="com.ssm.dao"/>
    </bean>

</beans>

 

另外就是jdbc的配置文件,也須要根據本身進行修改,這邊使用兩個

jdbc.driverClassName=com.mysql.jdbc.Driver

jdbc.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8
jdbc.username=root
jdbc.password=123456

jdbc.url.read=jdbc:mysql://localhost:3306/xxx?useUnicode=true&characterEncoding=UTF-8
jdbc.username.read=root
jdbc.password.read=123456

jdbc.maxActive = 2
jdbc.maxIdle =5
jdbc.minIdle=1
jdbc.initialSize =3
jdbc.maxWait =3000

 

至此全部的配置都已經完成,如今你已經能夠進行測試,看看在查詢和新增的時候是否使用的是不一樣的數據庫。

看看在使用事務的狀況下,是否使用相同的數據庫。

 

實現分析

首先咱們來分析兩個主角

動態數據源(繼承自spring抽象動態路由數據源)

先看一下源碼中父類的說明

/**
 * Abstract {@link javax.sql.DataSource} implementation that routes {@link #getConnection()}
 * calls to one of various target DataSources based on a lookup key. The latter is usually
 * (but not necessarily) determined through some thread-bound transaction context.
 *
 * @author Juergen Hoeller
 * @since 2.0.1
 * @see #setTargetDataSources
 * @see #setDefaultTargetDataSource
 * @see #determineCurrentLookupKey()
 */
public abstract class AbstractRoutingDataSource extends AbstractDataSource implements InitializingBean {

 

咱們寫的這個類中重寫了父類兩個重要的方法

一、afterPropertiesSet

首先源碼中是這樣的:

@Override
    public void afterPropertiesSet() {
        if (this.targetDataSources == null) {
            throw new IllegalArgumentException("Property ‘targetDataSources‘ is required");
        }
        this.resolvedDataSources = new HashMap<Object, DataSource>(this.targetDataSources.size());
        for (Map.Entry<Object, Object> entry : this.targetDataSources.entrySet()) {
            Object lookupKey = resolveSpecifiedLookupKey(entry.getKey());
            DataSource dataSource = resolveSpecifiedDataSource(entry.getValue());
            this.resolvedDataSources.put(lookupKey, dataSource);
        }
        if (this.defaultTargetDataSource != null) {
            this.resolvedDefaultDataSource = resolveSpecifiedDataSource(this.defaultTargetDataSource);
        }
    }

而咱們重寫的目的就是爲了設置默認咱們的主庫和從庫

 

二、determineCurrentLookupKey

這是AbstractRoutingDataSource類中的一個抽象方法,而它的返回值是你所要用的數據源dataSource的key值

在這個方法中咱們經過DynamicDataSourceHolder獲取當前線程所應該使用的數據源,而後將數據源的名字返回。也就是dataSource的key值。

 

而後是下一個主角,動態數據源插件,實現MyBatis攔截器接口,這個類一共幹了下面幾個事情

(當咱們實現了MyBatis攔截器接口以後就能在數據庫執行sql以前作操做,具體請參考別的博客,這裏不細說)

一、經過當前是否使用事務肯定數據源,若是使用事務,那麼默認使用主庫

二、從sql語句中獲取sql執行的類型,根據具體的類型肯定使用的數據源

三、利用cacheMap緩存已經進行判斷過的sql和對應執行時使用的數據源

四、經過DynamicDataSourceHolder保存當前線程所須要使用的數據源

 

最後一個是動態數據源事務管理器

這個類主要是保證,當一些事務是隻讀類型的事務時,使用的數據源是從庫。

而後保存到DynamicDataSourceHolder中

總結

一、使用此種方式實現數據庫讀寫分離,對於代碼來講不會對現有代碼形成影響,沒有入侵性,容易剝離和加入。

二、對於事務使用同一個數據庫能保證讀寫的一致性。

三、不須要人爲去判斷使用哪個數據庫,不用擔憂會出現人物問題。

四、擴展性上面,當有多個從庫的時候,不要想着配置多個從庫數據源解決問題,而是應該配置數據庫負載均衡而後實現多個從數據庫的訪問。

相關文章
相關標籤/搜索