使用EntityFrameworkCore實現Repository, UnitOfWork,支持MySQL分庫分表

 

昨天(星期五)下班,19:00左右回到家,洗個澡,而後20:30左右開始寫代碼,寫完代碼以後,上牀看了《生活大爆炸10季》1七、18兩集,發現沒有更新到19集,瞄了一眼手機,居然已是凌晨02:00多了,關掉電視睡覺,10:30左右被老婆電話吵醒,洗漱完畢,去麥當勞吃了一個早餐,而後屁顛屁顛地坐地鐵到很遠的地方去登山。登山回來以後,閒來無事,寫篇文章記錄一下昨晚所花的幾個小時乾的事情——使用EntityFrameworkCore實現Repository<TEntity>UnitOfWork<TContext>,支持MySQL分庫分表。html

因爲是使用業餘時間寫來玩的,時間也有限,因此,所有代碼作了一個基本假設:Repository<TEntity>UnitOfWork<TContext>只支持同一個IP上的MySQL分庫分表,不一樣IP上的MySQL分庫分表,須要使用不一樣的Repository<TEntity>UnitOfWork<TContext>對象。如下示例代碼,假設數據庫是按年分庫按月分表。mysql

git-flow-hostory-branch

EntityFrameworkCore默認並不支持分庫分表,咱們看一眼EntityFrameworkCore默認生成的SQL:git

Executed DbCommand [Parameters=[@p2='?', @p4='?' (Size = 8000), @p6='?' (Size = 8000)], CommandType='Text', CommandTimeout='0']
INSERT INTO `t_user_201703` (`Fis_deleted`, `Fpassword`, `Fname`)
VALUES (@p2, @p4, @p6);
SELECT LAST_INSERT_ID();

默認生成的SQL並無帶上庫名,而想要讓EntityFrameworkCore支持MySQL分庫分表,首要條件是必須能作到能夠動態地改變庫名錶名。軟件界有一句老話叫:凡是作不到的就多抽象一層,因此,想要讓EntityFrameworkCore支持MySQL分庫分表,我抽象瞭如下兩個接口, IRepository<TEntity>IUnitOfWorkgithub

/// <summary>
/// Defines the interfaces for generic repository.
/// </summary>
/// <typeparam name="TEntity">The type of the entity.</typeparam>
public interface IRepository<TEntity> where TEntity : class
{
    /// <summary>
    /// Changes the table name. This require the tables in the same database.
    /// </summary>
    /// <param name="table"></param>
    /// <remarks>
    /// This only been used for supporting multiple tables in the same model. This require the tables in the same database.
    /// </remarks>
    void ChangeTable(string table);

    /// <summary>
    /// Filters a sequence of values based on a predicate. This method default no-tracking query.
    /// </summary>
    /// <param name="predicate">A function to test each element for a condition.</param>
    /// <param name="disableTracking"><c>True</c> to disable changing tracking; otherwise, <c>false</c>. Default to <c>true</c>.</param>
    /// <returns>An <see cref="IQueryable{TEntity}"/> that contains elements that satisfy the condition specified by <paramref name="predicate"/>.</returns>
    /// <remarks>This method default no-tracking query.</remarks>
    IQueryable<TEntity> Query(Expression<Func<TEntity, bool>> predicate, bool disableTracking = true);

    /// <summary>
    /// Uses raw SQL queries to fetch the specified <typeparamref name="TEntity" /> data.
    /// </summary>
    /// <param name="sql">The raw SQL.</param>
    /// <param name="parameters">The parameters.</param>
    /// <returns>An <see cref="IQueryable{TEntity}" /> that contains elements that satisfy the condition specified by raw SQL.</returns>
    IQueryable<TEntity> FromSql(string sql, params object[] parameters);

    /// <summary>
    /// Finds an entity with the given primary key values. If found, is attached to the context and returned. If no entity is found, then null is returned.
    /// </summary>
    /// <param name="keyValues">The values of the primary key for the entity to be found.</param>
    /// <returns>The found entity or null.</returns>
    TEntity Find(params object[] keyValues);

    /// <summary>
    /// Finds an entity with the given primary key values. If found, is attached to the context and returned. If no entity is found, then null is returned.
    /// </summary>
    /// <param name="keyValues">The values of the primary key for the entity to be found.</param>
    /// <param name="cancellationToken">A <see cref="CancellationToken"/> to observe while waiting for the task to complete.</param>
    /// <returns>A <see cref="Task{TEntity}"/> that represents the asynchronous find operation. The task result contains the found entity or null.</returns>
    Task<TEntity> FindAsync(object[] keyValues, CancellationToken cancellationToken);

    /// <summary>
    /// Inserts a new entity synchronously.
    /// </summary>
    /// <param name="entity">The entity to insert.</param>
    void Insert(TEntity entity);

    /// <summary>
    /// Inserts a new entity asynchronously.
    /// </summary>
    /// <param name="entity">The entity to insert.</param>
    /// <param name="cancellationToken">A <see cref="CancellationToken"/> to observe while waiting for the task to complete.</param>
    /// <returns>A <see cref="Task"/> that represents the asynchronous insert operation.</returns>
    Task InsertAsync(TEntity entity, CancellationToken cancellationToken = default(CancellationToken));

    /// <summary>
    /// Updates the specified entity.
    /// </summary>
    /// <param name="entity">The entity.</param>
    void Update(TEntity entity);

    /// <summary>
    /// Deletes the entity by the specified primary key.
    /// </summary>
    /// <param name="id">The primary key value.</param>
    void Delete(object id);
}

/// <summary>
/// Defines the interfaces for unit of work.
/// </summary>
public interface IUnitOfWork : IDisposable
{
    /// <summary>
    /// Changes the database name. This require the databases in the same machine.
    /// </summary>
    /// <param name="database">The database name.</param>
    /// <remarks>
    /// This only been used for supporting multiple databases in the same model. This require the databases in the same machine.
    /// </remarks>
    void ChangeDatabase(string database);

    /// <summary>
    /// Saves all changes made in this context to the database.
    /// </summary>
    /// <returns>The number of state entries written to the database.</returns>
    int SaveChanges();

    /// <summary>
    /// Asynchronously saves all changes made in this unit of work to the database.
    /// </summary>
    /// <returns>A <see cref="Task{Int32}"/> that represents the asynchronous save operation. The task result contains the number of state entities written to database.</returns>
    Task<int> SaveChangesAsync();

    /// <summary>
    /// Executes the specified raw SQL command.
    /// </summary>
    /// <param name="sql">The raw SQL.</param>
    /// <param name="parameters">The parameters.</param>
    /// <returns>The number of state entities written to database.</returns>
    int ExecuteSqlCommand(string sql, params object[] parameters);

    /// <summary>
    /// Uses raw SQL queries to fetch the specified <typeparamref name="TEntity"/> data.
    /// </summary>
    /// <typeparam name="TEntity">The type of the entity.</typeparam>
    /// <param name="sql">The raw SQL.</param>
    /// <param name="parameters">The parameters.</param>
    /// <returns>An <see cref="IQueryable{TEntity}"/> that contains elements that satisfy the condition specified by raw SQL.</returns>
    IQueryable<TEntity> FromSql<TEntity>(string sql, params object[] parameters) where TEntity : class;
}

不少人都本身動手實現過RepositoryUnitOfWork,雖然各自實現不盡相同,可是其實現自己並無難度,但在這裏,咱們須要特別關注兩個方法:void ChangeTable(string table)void ChangeDatabase(string database)sql

/// <summary>
/// Changes the table name. This require the tables in the same database.
/// </summary>
/// <param name="table"></param>
/// <remarks>
/// This only been used for supporting multiple tables in the same model. This require the tables in the same database.
/// </remarks>
void ChangeTable(string table);
/// <summary>
/// Changes the database name. This require the databases in the same machine.
/// </summary>
/// <param name="database">The database name.</param>
/// <remarks>
/// This only been used for supporting multiple databases in the same model. This require the databases in the same machine.
/// </remarks>
void ChangeDatabase(string database);

怎麼實現這兩個方法,就須要必定的技術功底了,我之前在一家創業公司的時候,由於看不慣架構師自覺得是的樣子,本身動手寫了一個輕量級的ORM框架,若是之後有時間,我打算寫一篇《如何基於Dapper實現一個輕量級的ORM框架》的文章。ORM框架背後的動機很單純,就是數據庫Domain之間的一種雙向映射,真正把這種單純的動機搞複雜是的那些性能優化,各類緩存實現。而從Domain到數據庫這一單方向上的映射,在.NET領域藉助了一種代碼即數據的思想,再細化到C#語言代碼即數據就是表達式樹。因此,咱們有理由相信:SQL是根據表達式樹生成的。如今咱們已經找準了方向,那麼咱們看看EntityFrameworkCore在什麼地方生成表名的,也就是說,咱們只須要修改一下生成表名的代碼,就能夠作到動態生成database.table SQL。EntityFrameworkCore是經過TableExpression來生成表名的:數據庫

public class TableExpression
{
    public virtual string Table { get; }
    public virtual string Schema { get; }
}

若是你MySQL知識至少跟我同樣的水平的話,看到TableExpression表達式有一個Schema是否是當即就能夠想到:哈哈,太好了,我壓根就不用修改EntityFrameworkCore自己的代碼就能夠實現。爲何呢?好吧,看看MySQL官網怎麼說Schema的:api

In MySQL, physically, a schema is synonymous with a database. You can substitute the keyword SCHEMA instead of DATABASE in MySQL SQL syntax, for example using CREATE SCHEMA instead of CREATE DATABASE. Some other database products draw a distinction. For example, in the Oracle Database product, a schema represents only a part of a database: the tables and other objects owned by a single user.緩存

好吧,Schema就是Database,那麼咱們就用Schema.Table來表示database.table。如今事情就變得簡單了,變成了咱們如何動態地改變SchemaTable了,如下是我提供的簡化實現:性能優化

/// <summary>
/// Changes the database name. This require the databases in the same machine.
/// </summary>
/// <param name="database">The database name.</param>
/// <remarks>
/// This only been used for supporting multiple databases in the same model. This require the databases in the same machine.
/// </remarks>
public void ChangeDatabase(string database)
{
    if (_context.Model.Relational() is RelationalModelAnnotations relational)
    {
        relational.DatabaseName = database;
    }

    var connection = _context.Database.GetDbConnection();
    if (connection.State.HasFlag(ConnectionState.Open))
    {
        connection.ChangeDatabase(database);
    }

    var items = _context.Model.GetEntityTypes();
    foreach (var item in items)
    {
        if (item.Relational() is RelationalEntityTypeAnnotations extensions)
        {
            extensions.Schema = database;
        }
    }
}
/// <summary>
/// Changes the table name. This require the tables in the same database.
/// </summary>
/// <param name="table"></param>
/// <remarks>
/// This only been used for supporting multiple tables in the same model. This require the tables in the same database.
/// </remarks>
public void ChangeTable(string table)
{
    if (_dbContext.Model.FindEntityType(typeof(TEntity)).Relational() is RelationalEntityTypeAnnotations relational)
    {
        relational.TableName = table;
    }
}

OK, 雖然有點low,可是畢竟支持了MySQL分庫分表,看看怎麼用:架構

namespace QuickStart.Controllers
{
    [Route("api/[controller]")]
    public class UserController : ApiController
    {
        private readonly IUnitOfWork _unitOfWork;

        // 1. IRepositoryFactory used for readonly scenario;
        // 2. IUnitOfWork used for read/write scenario;
        // 3. IUnitOfWork<TContext> used for multiple databases scenario;

        public UserController(IUnitOfWork unitOfWork)
        {
            _unitOfWork  = unitOfWork;

            unitOfWork.ChangeDatabase($"rigofunc_{DateTime.Now.Year}");

            var userRepo = unitOfWork.GetRepository<User>();
            var postRepo = unitOfWork.GetRepository<Post>();

            var ym = DateTime.Now.ToString("yyyyMM");

            userRepo.ChangeTable($"t_user_{ym}");
            postRepo.ChangeTable($"t_post_{ym}");

            var user = new User
            {
                //UserId = 123,
                UserName = "rigofunc",
                Password = "password"
            };

            userRepo.Insert(user);

            var post = new Post
            {
                //PostId = 123,
                UserId = user.UserId,
                Content = "What a piece of junk!"
            };

            postRepo.Insert(post);

            unitOfWork.SaveChanges();

            var find = userRepo.Find(user.UserId);

            find.Password = "p@ssword";

            unitOfWork.SaveChanges();
        }

        [HttpGet]
        public IPagedList<User> Get()
        {
            _unitOfWork.ChangeDatabase($"rigofunc_2018");

            var userRepo = _unitOfWork.GetRepository<User>();

            return userRepo.Query(u => true).OrderBy(u => u.UserId).ToPagedList(0, 20);
        }
    }
}

如下是生成的SQL:

Executed DbCommand [Parameters=[@p2='?', @p4='?' (Size = 8000), @p6='?' (Size = 8000)], CommandType='Text', CommandTimeout='0']
INSERT INTO `rigofunc_2017`.`t_user_201703` (`Fis_deleted`, `Fpassword`, `Fname`)
VALUES (@p2, @p4, @p6);
SELECT LAST_INSERT_ID()
Executed DbCommand [Parameters=[@p10='?' (Size = 8000), @p12='?', @p14='?'], CommandType='Text', CommandTimeout='0']
INSERT INTO `rigofunc_2017`.`t_post_201703` (`Fcontent`, `Fis_deleted`, `Fuser_id`)
VALUES (@p10, @p12, @p14);
SELECT LAST_INSERT_ID()
Executed DbCommand [Parameters=[@p0='?', @p3='?', @p4='?' (Size = 8000)], CommandType='Text', CommandTimeout='0']
UPDATE `rigofunc_2017`.`t_user_201703` SET `Fpassword` = @p4
WHERE `Fid` = @p0 AND `Fis_deleted` = @p3;
SELECT ROW_COUNT()
Executed DbCommand [Parameters=[], CommandType='Text', CommandTimeout='0']
SELECT `u`.`Fid`, `u`.`Fis_deleted`, `u`.`Fpassword`, `u`.`Fname`
FROM `rigofunc_2017`.`t_user_201703` AS `u`
ORDER BY `u`.`Fid
Executed DbCommand [Parameters=[], CommandType='Text', CommandTimeout='0']
SELECT `u`.`Fid`, `u`.`Fis_deleted`, `u`.`Fpassword`, `u`.`Fname`
FROM `rigofunc_2018`.`t_user_201703` AS `u`
ORDER BY `u`.`Fid`

以上代碼,自己作了簡化,同時也採用了最小改動的實現,因此比較low,可是提供了最基本的實現思路,感興趣的同窗能夠本身再從EntityFrameworkCore內部改改,我以後會用一些時間實現一個高級一點的版本,而後放到個人GitHub UnitOfWork.

相關文章
相關標籤/搜索