目前咱們處理消息的同步,通常是落地到DB後,再同過異步的方式作數據的聚合和處理。至於DB的操做爲了簡單直接用了Hibernate提供的一套JPA接口,(老實說真的是不喜歡JPA,一是sql log很差分析沒法優化,二是必須很是瞭解JPA的全部關鍵字含義,否則就要出問題,因此我一直喜歡用mybatis這種更輕量的甚至spring-jdbc)。html
那麼使用JPA的過程就遇到了一些問題,見招拆招一件一件來。java
遇到的第一個問題就很是的要命,咱們的系統是一張單表須要支持multi-tenant多租戶,簡單說就是表中有個tenantId的字段來區分租戶,這是比較常見的設計。那麼在對DB作操做的時候,ORM框架應該提供分租戶的CURD接口,而不須要開發人員都本身在where
中加tenantId=***
。spring
這個問題其實沒有解決,由於Hibernate尚未實現單表的Multi-tenant(真是至關的坑)。官網文檔中說了這樣三種狀況sql
SCHEMA
Correlates to the separate schema approach. It is an error to attempt to open a session without a tenant identifier using this strategy. Additionally, a MultiTenantConnectionProvider must be specified.
DATABASE
Correlates to the separate database approach. It is an error to attempt to open a session without a tenant identifier using this strategy. Additionally, a MultiTenantConnectionProvider must be specified.
DISCRIMINATOR
Correlates to the partitioned (discriminator) approach. It is an error to attempt to open a session without a tenant identifier using this strategy. This strategy is not yet implemented in Hibernate as of 4.0 and 4.1. Its support is planned for 5.0.
能夠看到最後一種還不支持呢。沒辦法只有手寫where啦。數據庫
因爲是處理消息,即便收到DELETE的message也不能真的刪除,由於消息是亂序的,若是先來了DELETE再來UPDATE怎麼辦,其實是先UPDATE再DELETE,但因爲處理效率不一致,因此收到的消息順序也是沒法肯定的。基於這點,爲了保證數據的最終一致性,因此操做都做爲UPDATE處理。刪除操做必須是soft deletesession
能夠寫一個BaseEntity,都有isactive這個字段,默認都是truemybatis
@MappedSuperclass public class BaseEntity { @Column(name="isactive", columnDefinition="boolean DEFAULT true") private Boolean active = true; public Boolean getActive() { return active; } public void setActive(Boolean active) { this.active = active; } }
而後繼承一下併發
@Entity @Inheritance(strategy = InheritanceType.JOINED) @Table(name = "Product") @Where(clause="isactive = 1") public class ProductEntity extends BaseEntity {}
注意@Where
就是全部操做都會拼上這個condition從而實現soft delete。app
在處理相似外鍵關聯這種數據的時候,例如Product上有個CategoryId字段,那麼數據庫設計是一張Category表,一張Product表,Product表上CategoryId字段做爲外鍵關聯到Category表的ID字段。那麼做爲一個JPA的Entity,你們知道Entity是OO的,Product Entity下應該包含一個Category Entity,關係是ManyToOne的。框架
public class ProductEntity extends BaseEntity { @ManyToOne(fetch = FetchType.EAGER) @JoinColumn(name = "categoryId") private CategoryEntity category; }
(這裏要插一句,其實若是隻是把Category當普通字段,存一個CategoryId也是沒有問題的,可是在查詢的時候就須要用這個Product.CategoryId再去Category裏查一次。用了JPA以後,爲了減小一次查詢,有時候事情反而會複雜)。
至於消息,好比先收到Product的CREATE事件,這時候會拿消息體裏的categoryId去category表查一下有沒有這個Category Entity,若是有直接獲得後塞到Product的Category屬性上去,可是若是沒有這個Category怎麼辦?
若是沒有的話,按照JPA的外鍵關聯原則,咱們須要創建一個虛擬的Category,也就是說插入一條佔位數據到Category表中,只有ID有值。因此對ProductEntity作些改造。
public class ProductEntity extends BaseEntity { @ManyToOne(cascade = {CascadeType.PERSIST}, fetch = FetchType.EAGER) @NotFound(action= NotFoundAction.IGNORE) @JoinColumn(name = "categoryId") private CategoryEntity category; }
注意加了兩點,一是cascade = {CascadeType.PERSIST}
,意思是若是Persist了Product的話,發現categoryId不爲空而category表中又沒有該Category,那麼級聯插入這條數據(只有ID)。二是@NotFound(action= NotFoundAction.IGNORE)
,加這條是防止當收到一個Category.DELETE事件後軟刪除了Category,而讀取Product的時候就會Eager地得到Category,一旦獲取不到JPA會拋出EntityNotExist
的異常。加了這個註解,Product裏的category就爲null,不會出異常。
這其實是問題3的衍生,解決3的時候咱們使用了Cascade=PERSIST
,那麼在發現Category不存在的時候,JPA會發起一個insert,固然數據只有ID,其餘的字段等待真正的Category的CREATE事件來了再填充。可是併發的問題就出現了,若是正好就在發起insert以前,Category的CREATE事件來了(另外一個Worker在處理),那裏也發現表裏沒有這個Category,因此也隨即發起一個insert操做。conflict就這樣發生了,主鍵衝突!這時候怎麼辦?
我採起了一種比較粗暴的方式,就是retry,首先每次收到事件後的寫操做,都是查Entity是否存在,存在就Update,不存在就Insert。當兩個Worker同時作寫入操做,確定一個成功一個失敗,失敗的只要retry一次就會發現Entity有了(另外一個Worker寫入的),這時候變成Update操做就不會有conflict。
由於項目中依賴Spring,因此剛好有了spring-retry這個包,直接用起來。
public class RetryTemplateBuilder { protected RetryTemplate buildable; protected RetryTemplateBuilder builder; public RetryTemplateBuilder() { buildable = createBuildable(); builder = getBuilder(); } public static RetryTemplateBuilder retryTemplate() { return new RetryTemplateBuilder(); } public RetryTemplateBuilder withPolicies(RetryPolicy... policies) { CompositeRetryPolicy compositePolicy = new CompositeRetryPolicy(); compositePolicy.setPolicies(policies); buildable.setRetryPolicy(compositePolicy); return this; } public RetryTemplateBuilder withPolicies(RetryPolicy retryPolicy, BackOffPolicy backOffPolicy) { buildable.setRetryPolicy(retryPolicy); buildable.setBackOffPolicy(backOffPolicy); return this; } public RetryTemplateBuilder withPolicies(BackOffPolicy backOffPolicy) { buildable.setBackOffPolicy(backOffPolicy); return this; } public RetryTemplate build() { return buildable; } protected RetryTemplate createBuildable() { return new RetryTemplate(); } protected RetryTemplateBuilder getBuilder() { return this; } }
這是一個TemplateBuilder,能夠理解成retry的模板,一個retryTemplate能夠包含多個policy。
public class SimpleRetryPolicyBuilder { protected SimpleRetryPolicy buildable; protected SimpleRetryPolicyBuilder builder; public SimpleRetryPolicyBuilder() { buildable = createBuildable(); builder = getBuilder(); } public static SimpleRetryPolicyBuilder simpleRetryPolicy() { return new SimpleRetryPolicyBuilder(); } public static SimpleRetryPolicy simpleRetryPolicyWithRetryableExceptions(int maxAttempts, Map<Class<? extends Throwable>, Boolean> exception) { return new SimpleRetryPolicy(maxAttempts, exception); } public SimpleRetryPolicyBuilder withMaxAttempts(int maxAttempts) { buildable.setMaxAttempts(maxAttempts); return this; } public SimpleRetryPolicy build() { return buildable; } protected SimpleRetryPolicy createBuildable() { return new SimpleRetryPolicy(); } protected SimpleRetryPolicyBuilder getBuilder() { return this; } }
好比這種Policy,就是能夠定義須要重試幾回,在哪些異常發生的時候重試。
public class FixedBackOffPolicyBuilder { protected FixedBackOffPolicy buildable; protected FixedBackOffPolicyBuilder builder; private FixedBackOffPolicyBuilder() { buildable = createBuildable(); builder = getBuilder(); } public static FixedBackOffPolicyBuilder fixedBackOffPolicy() { return new FixedBackOffPolicyBuilder(); } public FixedBackOffPolicyBuilder withDelay(long delay) { buildable.setBackOffPeriod(delay); return this; } public FixedBackOffPolicy build() { return buildable; } protected FixedBackOffPolicy createBuildable() { return new FixedBackOffPolicy(); } protected FixedBackOffPolicyBuilder getBuilder() { return this; } }
還有這種能夠定義retry的間隔時間。
最後用起來就手到擒來了,
Map<Class<? extends Throwable>, Boolean> retryFor = new HashMap<>(); // 定義兩種異常發生時retry retryFor.put(DataIntegrityViolationException.class, Boolean.TRUE); retryFor.put(ConstraintViolationException.class, Boolean.TRUE); // 定義最大retry次數和間隔時間 RetryTemplate retryTemplate = RetryTemplateBuilder.retryTemplate() .withPolicies( SimpleRetryPolicyBuilder.simpleRetryPolicyWithRetryableExceptions(MAX_ATTEMPTS, retryFor), FixedBackOffPolicyBuilder.fixedBackOffPolicy().withDelay(RETRY_DELAY).build()) .build(); retryTemplate.execute(new RetryCallback() { public Void doWithRetry(RetryContext context) { log.info("Attempt times [" + context.getRetryCount() + "]"); // Your logic code return null; } });
在生產環境測試,99%的狀況一次retry就能夠解決問題,因此個人經驗值是設置了3次最大重試次數。