EF6的多線程與分庫架構設計實現

1.項目背景 數據庫

這裏簡單介紹一下項目需求背景,以前公司的項目基於EF++Repository+UnitOfWork的框架設計的,其中涉及到的技術有RabbitMq消息隊列,Autofac依賴注入等經常使用的.net插件。因爲公司的發展,業務不斷更新,變得複雜起來,對於數據的實時性、存儲容量要求也提升了一個新的高度。數據庫上下文DbContext設計的是單例模式,基本上告別了多線程高併發的數據讀寫能力,看了園子裏不少大神的博客,均爲找到適合本身當前需求的DbContext的管理方式。總結目前主要的管理方式:緩存

1)DbContext單例模式(長鏈接)。即公司以前的設計。很明顯,這種設計方式沒法支持多線程同步數據操做。報各類錯誤,最多見的,好比:集合已修改,沒法進行枚舉操做。---棄用多線程

2)Using模式(短鏈接)。這種模式適合一些對於外鍵,導航屬性不常用的場合,因爲導航屬性是放在上下文緩存中的,一旦上下文釋放掉,導航屬性就爲null。固然,也嘗試了其餘大神的作法,好比,在上下文釋放以前轉換爲        ToList或者使用飢餓加載的方式(ps:這種方式很不靈活,你總不可能遇到一個類類型就去利用反射加載找到它具備的導航屬性吧或者直接InCluding),這些方法依舊沒有辦法解決目前的困境。也嘗試這直接賦值給一個定義的同類      型的變量,可是對於這種帶導航的導航的複雜類的深拷貝,沒有找到合適的路子,有知道的能夠告訴我,很是感謝!架構

以上兩種方式及網上尋找的其餘方式都沒有解決個人問題。這裏先上一下以前的Repository:併發

 1 using System.Data.Entity;
 2 using System.Data.Entity.Validation;
 3 
 4 namespace MM.Data.Library.Entityframework
 5 {
 6     public class EntityFrameworkRepositoryContext : RepositoryContext, IEntityFrameworkRepositoryContext
 7     {
 8         protected DbContext container;
 9 
10         public EntityFrameworkRepositoryContext(DbContext container)
11         {
12             this.container = container;
13         }
14 
15         public override void RegisterNew<TAggregateRoot>(TAggregateRoot obj)
16         {            
17             this.container.Set<TAggregateRoot>().Add(obj);
18             this.IsCommit = false;
19         }
20 
21         public override void RegisterModified<TAggregateRoot>(TAggregateRoot obj)
22         {
23             if (this.container.Entry<TAggregateRoot>(obj).State == EntityState.Detached)
24             {
25                 this.container.Set<TAggregateRoot>().Attach(obj);
26             }
27             this.container.Entry<TAggregateRoot>(obj).State = EntityState.Modified;
28             this.IsCommit = false;
29         }
30 
31         public override void RegisterDeleted<TAggregateRoot>(TAggregateRoot obj)
32         {
33             this.container.Set<TAggregateRoot>().Remove(obj);
34             this.IsCommit = false;
35         }
36 
37         public override void Rollback()
38         {
39             this.IsCommit = false;
40         }
41 
42         protected override void DoCommit()
43         {
44             if (!IsCommit)
45             {
46                 //var count = container.SaveChanges();
47                 //IsCommit = true;
48                 try
49                 {
50                     var count = container.SaveChanges();
51                     IsCommit = true;
52                 }
53                 catch (DbEntityValidationException dbEx)
54                 {
55                     foreach (var validationErrors in dbEx.EntityValidationErrors)
56                     {
57                         foreach (var validationError in validationErrors.ValidationErrors)
58                         { 
59                         }
60                     }
61                     IsCommit = false;
62                 }
63             }
64         }
65 
66         public System.Data.Entity.DbContext DbContext
67         {
68             get { return container; }
69         }
70 
71         public override void Dispose()
72         {
73             if (container != null)
74                 container.Dispose();
75         }
76     }
77 }
View Code

2.設計思路及方法框架

 從上下文的單例模式來看,所要解決的問題無非就是在多線程對數據庫寫操做上面。只要在這上面作手腳,問題應該就能引刃而解。個人想法是將全部的要修改的數據分別放入UpdateList,InsertList,DeleteList三個集合中去,而後提交到數據庫保存。至於DbContext的管理,經過一個數據庫工廠獲取,保證每個數據庫的鏈接都是惟一的,不重複的(防止發生相似這種錯誤:正在建立模型,此時不可以使用上下文。),用的時候直接去Factory拿。等到數據庫提交成功後,清空集合數據。看起來,實現起來很容易,可是由於還涉及到其餘技術,好比Redis。因此實現過程費勁。也許個人能力還差不少。總之,廢話很少說,直接上部分實現代碼:async

數據庫上下文創建工廠:ide

 1     /// <summary>
 2     /// 數據庫創建工廠
 3     /// Modify By:
 4     /// Modify Date:
 5     /// Modify Reason:
 6     /// </summary>
 7     public sealed class DbFactory
 8     {
 9         public static IDbContext GetCurrentDbContext(string connectstring,string threadName)
10         {
11             lock (threadName)
12             {
13                 //CallContext:是線程內部惟一的獨用的數據槽(一塊內存空間)  
14                 //傳遞Context進去獲取實例的信息,在這裏進行強制轉換。  
15                 var Context = CallContext.GetData("Context") as IDbContext;
16 
17                 if (Context == null)  //線程在內存中沒有此上下文  
18                 {
19                     var Scope = UnitoonIotContainer.Container.BeginLifetimeScope();
20                     //若是不存在上下文 建立一個(自定義)EF上下文  而且放在數據內存中去  
21                     Context = Scope.Resolve<IDbContext>(new NamedParameter("connectionString", connectstring));
22                     CallContext.SetData("Context", Context);
23                 }
24                 else
25                 {
26 
27                     if (!Context.ConnectionString.Equals(connectstring))
28                     {
29                         var Scope = UnitoonIotContainer.Container.BeginLifetimeScope();
30                         //若是不存在上下文 建立一個(自定義)EF上下文  而且放在數據內存中去  
31                         Context = Scope.Resolve<IDbContext>(new NamedParameter("connectionString", connectstring));
32                         CallContext.SetData("Context", Context);
33                     }
34                 }
35                 return Context;
36             }
37         }
38 
39     }
View Code

Repository:函數

  1     public class RepositoryBase<T, TContext> : IRepositoryBase<T> where T : BaseEntity
  2 
  3         where TContext : ContextBase, IDbContext, IDisposable, new()
  4     {
  5         public List<T> InsertList { get; set; }
  6         public List<T> DeleteList { get; set; }
  7         public List<T> UpdateList { get; set; }
  8 
  9         #region field
 10 
 11         protected readonly string Connectstring;
 12         ///// <summary>
 13         ///// </summary>
 14         //protected static IDbContext Context;
 15         protected  IDbContext dbContext;
 16         private static readonly ILifetimeScope Scope;
 17         public static int xcount = 0;
 18         /////// <summary>
 19         /////// </summary>
 20         //protected readonly DbSet<T> Dbset;
 21 
 22         #endregion
 23 
 24         #region ctor
 25 
 26         static RepositoryBase()
 27         {
 28             Scope = UnitoonIotContainer.Container.BeginLifetimeScope();
 29         }
 30 
 31         /// <summary>
 32         ///     使用默認鏈接字符串name=connName
 33         /// </summary>
 34         public RepositoryBase() : this("")
 35         {
 36         }
 37 
 38         /// <summary>
 39         /// 構造函數
 40         /// </summary>
 41         /// <param name="connectionString">鏈接字符串</param>
 42         public RepositoryBase(string connectionString)
 43         {
 44             InsertList = new List<T>();
 45             DeleteList = new List<T>();
 46             UpdateList = new List<T>();
 47 
 48 
 49             //*****作如下調整,初始化,創建全部數據庫鏈接,保持長鏈接狀態,在用的時候去判斷使用鏈接
 50 
 51 
 52             //todo 待處理
 53 
 54 
 55             if (string.IsNullOrWhiteSpace(connectionString))
 56             {
 57                 var name = DataBase.GetConnectionString(Activator.CreateInstance<T>().DbType);
 58                 //Context= ContextHelper.GetDbContext(Activator.CreateInstance<T>().DbType);
 59                 connectionString = name;
 60             }
 61             Connectstring = connectionString;
 62             
 63            // Context = Scope.Resolve<IDbContext>(new NamedParameter("connectionString", Connectstring));
 64 
 65             //Context = new TContext { ConnectionString = connectionString };
 66 
 67 
 68             //  Dbset = Context.Set<T>();
 69 
 70             //var loggerFactory = ((DbContext)Context).GetService<ILoggerFactory>();
 71             //loggerFactory.AddProvider(new DbLoggerProvider(Console.WriteLine));
 72             //loggerFactory.AddConsole(minLevel: LogLevel.Warning);
 73         }
 74 
 75         //public RepositoryBase(TContext context)
 76         //{
 77         //    Context = context;
 78         //    Dbset = context.Set<T>();
 79         //}
 80 
 81         #endregion
 82 
 83         #region Method
 84 
 85         //public virtual IDbContext GetDbContext(ILifetimeScope scope)
 86         //{
 87 
 88         //}
 89 
 90         #region Check Model
 91 
 92         /// <summary>
 93         ///     校訂Model
 94         /// </summary>
 95         protected virtual void ValidModel()
 96         {
 97         }
 98 
 99         #endregion
100 
101         #region Update
102 
103         public virtual void Update(T entity)
104         {
105             Check.NotNull(entity, "entity");
106             UpdateList.Add(entity);
107             //context.Set<T>().Update(entity);
108         }
109 
110         public virtual void Update(IEnumerable<T> entities)
111         {
112             Check.NotNull(entities, "entities");
113             UpdateList.AddRange(entities);
114         }
115 
116         #endregion
117 
118         #region PageList
119 
120         public virtual IEnumerable<T> GetPageList(Expression<Func<T, bool>> where, Expression<Func<T, object>> orderBy,
121             int pageIndex, int pageSize)
122         {
123            //todo
124         }
125 
126         #endregion
127 
128         #region Insert
129 
130         public virtual void Add(T entity)
131         {
132             Check.NotNull(entity, "entity");
133             //排除已經存在的項(對於多線程沒有任何用處)
134             if (!InsertList.Exists(e => e.Equals(entity)))
135             {
136                 InsertList.Add(entity);
137             }
138 
139         }
140 
141         public virtual void Add(IEnumerable<T> entities)
142         {
143             Check.NotNull(entities, "entities");
144             InsertList.AddRange(entities);
145         }
146 
147         public void BulkInsert(IEnumerable<T> entities)
148         {
149             Check.NotNull(entities, "entities");
150             InsertList.AddRange(entities);
151         }
152 
153         #endregion
154 
155         #region Delete
156 
157         public virtual void Delete(int id)
158         {
159             var entity = GetById(id);
160             Delete(entity);
161             // throw new NotImplementedException("Delete(int id)");
162         }
163 
164         public virtual void Delete(string id)
165         {
166             throw new NotImplementedException("Delete(string id)");
167         }
168 
169         public virtual void Delete(T entity)
170         {
171             Check.NotNull(entity, "entity");
172             DeleteList.Add(entity);
173         }
174 
175         public virtual void Delete(IEnumerable<T> entities)
176         {
177             Check.NotNull(entities, "entities");
178             foreach (var x1 in DeleteList)
179             {
180                 DeleteList.Add(x1);
181             }
182         }
183 
184         public virtual void Delete(Expression<Func<T, bool>> where)
185         {
186             var list = DeleteList.Where(where.Compile());
187             Delete(list);
188         }
189 
190         #endregion
191 
192         #region Commit
193 
194         public int Commit()
195         {
196             ValidModel();
197             //var x = Activator.CreateInstance<T>();
198             //Context = ContextHelper.GetDbContext(x.DbType);
199             //using (var scope = UnitoonIotContainer.Container.BeginLifetimeScope())
200             //{
201             // var context = scope.Resolve<IDbContext>(new NamedParameter("connectionString", Connectstring));
202 
203             //var loggerFactory = Activator.CreateInstance<ILoggerFactory>();// ((DbContext)context).GetService<ILoggerFactory>();
204             //loggerFactory.AddProvider(new DbLoggerProvider(Console.WriteLine));
205             dbContext = DbFactory.GetCurrentDbContext(Connectstring, Thread.CurrentThread.Name);
206             var dbset = dbContext.Set<T>();
207                 if (InsertList != null && InsertList.Any())
208                 {
209                     List<T> InsertNewList = InsertList.Distinct(new PropertyComparer<T>("Id")).ToList();//按照特定規則排除重複項
210                     dbset.AddRange(InsertNewList);
211                 }
212 
213                 if (DeleteList != null && DeleteList.Any())
214                     DeleteList.ForEach(t =>
215                     {
216                        // Context.Entry(t).State = EntityState.Detached;//將以前上下文跟蹤的狀態丟棄
217                         //dbContext.Entry(t).State = EntityState.Detached;//將以前上下文跟蹤的狀態丟棄
218                         dbset.Attach(t);
219                         dbset.Remove(t);
220                     });
221                 if (UpdateList != null && UpdateList.Any())
222                 {
223                     List<T> UpdateNewList = UpdateList.Distinct(new PropertyComparer<T>("Id")).ToList();//按照特定規則排除重複項
224                     UpdateNewList.ForEach(t =>
225                     {
226                         //Context.Entry(t).State = EntityState.Detached;//將以前上下文跟蹤的狀態丟棄
227                        // dbContext.Entry(t).State = EntityState.Detached;//將以前上下文跟蹤的狀態丟棄
228                         dbContext.Entry(t).State = EntityState.Modified;
229                     });//.UpdateRange(UpdateNewList);
230                 }
231                 var result = 0;
232                 try
233                 {
234                     result = dbContext.SaveChanges();
235                 }
236                 catch (Exception ex)
237                 {
238 
239                     //  throw;
240                 }
241 
242                 if (InsertList != null && InsertList.Any())
243                     InsertList.Clear();
244                 if (DeleteList != null && DeleteList.Any())
245                     DeleteList.Clear();
246                 if (UpdateList != null && UpdateList.Any())
247                     UpdateList.Clear();
248                 return result;
249             //}
250         }
251 
252         public async Task<int> CommitAsync()
253         {
254             ValidModel();
255            //todo
256            
257         }
258 
259         #endregion
260 
261         #region Query
262         public IQueryable<T> Get()
263         {
264             return GetAll().AsQueryable();
265         }
266         //public virtual T Get(Expression<Func<T, bool>> @where)
267         //{
268         //    using (var scope = UnitoonIotContainer.Container.BeginLifetimeScope())
269         //    {
270 
271         //        var context = scope.Resolve<IDbContext>(new NamedParameter("connectionString", Connectstring));
272         //        var dbset = context.Set<T>();
273         //        return dbset.FirstOrDefault(where);
274         //    }
275         //}
276 
277         public virtual async Task<T> GetAsync(Expression<Func<T, bool>> @where)
278         {
279              //todo
280         }
281 
282         public virtual T GetById(int id)
283         {
284             throw new NotImplementedException("GetById(int id)");
285         }
286 
287         public virtual async Task<T> GetByIdAsync(int id)
288         {
289             throw new NotImplementedException("GetById(int id)");
290         }
291 
292         public virtual T GetById(string id)
293         {
294             throw new NotImplementedException("GetById(int id)");
295         }
296 
297         public virtual async Task<T> GetByIdAsync(string id)
298         {
299             throw new NotImplementedException("GetById(int id)");
300         }
301 
302         public virtual T Get(Expression<Func<T, bool>> @where)//, params string[] includeProperties
303         {
304             //var scope = UnitoonIotContainer.Container.BeginLifetimeScope();
305             //{
306             //    var context = scope.Resolve<IDbContext>(new NamedParameter("connectionString", Connectstring));
307             //Thread.Sleep(50);
308             //lock (Context)
309             {
310                 dbContext= DbFactory.GetCurrentDbContext(Connectstring, Thread.CurrentThread.Name);
311                 var dbset = dbContext.Set<T>().Where(e => !e.IsDeleted).AsQueryable();
312                 var entity = dbset.FirstOrDefault(where);
313                 //test
314                 // Context.Entry(entity).State = EntityState.Detached;//將以前上下文跟蹤的狀態丟棄
315                 return entity;
316             }
317            
318 
319             //}
320         }
321 
322         public virtual IEnumerable<T> GetAll()
323         {
324             //Thread.Sleep(50);
325             //lock (Context)
326             {
327                 dbContext = DbFactory.GetCurrentDbContext(Connectstring, Thread.CurrentThread.Name);
328                 var dbset = dbContext.Set<T>().Where(e => !e.IsDeleted);
329                 //test
330                 //dbset.ToList().ForEach(t =>
331                 //{
332                 //    Context.Entry(t).State = EntityState.Detached;//將以前上下文跟蹤的狀態丟棄
333                 //});
334 
335                 return dbset;
336             }
337                
338 
339            
340             
341             //var scope = UnitoonIotContainer.Container.BeginLifetimeScope();
342 
343             // var context = scope.Resolve<IDbContext>(new NamedParameter("connectionString", Connectstring));
344            
345         }
346 
347         public async virtual Task<IEnumerable<T>> GetAllAsync()
348         {
349             //todo
350         }
351 
352         public virtual IEnumerable<T> GetMany(Expression<Func<T, bool>> where)
353         {
354             //using (var scope = UnitoonIotContainer.Container.BeginLifetimeScope())
355             //{
356             //    var context = scope.Resolve<IDbContext>(new NamedParameter("connectionString", Connectstring));
357 
358             //    var dbset = context.Set<T>();
359             //Thread.Sleep(50);
360             //lock (Context)
361             {
362                 dbContext = DbFactory.GetCurrentDbContext(Connectstring, Thread.CurrentThread.Name);
363                 var dbset = dbContext.Set<T>().Where(e => !e.IsDeleted);
364                 //test
365                 //dbset.ToList().ForEach(t =>
366                 //{
367                 //    Context.Entry(t).State = EntityState.Detached;//將以前上下文跟蹤的狀態丟棄
368                 //});
369                 return dbset.Where(@where).ToList();
370             }
371           
372             //}
373         }
374 
375         public virtual async Task<IEnumerable<T>> GetManyAsync(Expression<Func<T, bool>> where)
376         {
377            //todo
378         }
379 
380         public virtual IEnumerable<T> IncludeSubSets(params Expression<Func<T, object>>[] includeProperties)
381         {
382            //todo
383         }
384 
385         #region  navigation
386         /// <summary>
387         /// 加載導航
388         /// </summary>
389         /// <param name="where"></param>
390         /// <param name="includeProperties"></param>
391         /// <returns></returns>
392         //public virtual T Get(Expression<Func<T, bool>> @where, params Expression<Func<T, object>>[] includeProperties)
393         //{
394         //    using (var scope = UnitoonIotContainer.Container.BeginLifetimeScope())
395         //    {
396 
397         //        var context = scope.Resolve<IDbContext>(new NamedParameter("connectionString", Connectstring));
398         //        var dbset = context.Set<T>();
399         //        var query = includeProperties.Aggregate<Expression<Func<T, object>>, IQueryable<T>>(dbset,
400         //            (current, includeProperty) => current.Include(includeProperty));
401         //        return query.FirstOrDefault(where);
402         //    }
403         //}
404 
405         //public virtual T Get(Expression<Func<T, bool>> @where)//, params string[] includeProperties
406         //{
407         //    //反射獲取導航
408         //    var includeProperties =
409         //        Activator.CreateInstance<T>().GetType().GetProperties().Where(p => p.GetMethod.IsVirtual).Select(e => e.Name).ToArray();
410 
411         //todo
412         //}
413         #endregion
414         public List<TDynamicEntity> GetDynamic<TTable, TDynamicEntity>(Expression<Func<TTable, object>> selector,
415             Func<object, TDynamicEntity> maker) where TTable : class
416         {
417              //todo
418         }
419 
420         public List<TDynamicEntity> GetDynamic<TTable, TDynamicEntity>(Func<TTable, object> selector,
421             Func<object, TDynamicEntity> maker) where TTable : class
422         {
423            //todo
424            
425         }
426 
427         #endregion
428 
429         #region Count
430 
431         public virtual async Task<int> CountAsync()
432         {
433            //todo
434         }
435 
436         public virtual async Task<int> CountByAsync(Expression<Func<T, bool>> where)
437         {
438             //todo
439         }
440 
441         #endregion
442 
443         #region Exists
444 
445         public virtual bool Exists(string id)
446         {
447             throw new NotImplementedException();
448         }
449 
450         public virtual bool Exists(int id)
451         {
452             throw new NotImplementedException();
453         }
454 
455         public virtual async Task<bool> ExistsAsync(string id)
456         {
457             throw new NotImplementedException();
458         }
459 
460         public virtual async Task<bool> ExistsAsync(int id)
461         {
462             throw new NotImplementedException();
463         }
464 
465         public virtual bool Exists(Expression<Func<T, bool>> @where)
466         {
467             throw new NotImplementedException();
468         }
469 
470         public virtual async Task<bool> ExistsAsync(Expression<Func<T, bool>> @where)
471         {
472             throw new NotImplementedException();
473         }
474 
475         #endregion
476 
477 
478         #endregion
479     }
View Code

 

以上就是EF6的多線程與分庫架構設計實現的部分相關內容。高併發

若是有須要所有源代碼或者交流的,直接聯繫我QQ:694666781

另外,該場景下,Redis相關使用方法及可能遇到的問題及解決方法我會另寫一篇進行展開。若有不妥不正之處,請大神指正,謝謝!

相關文章
相關標籤/搜索