利用JPA實現消息落地的一些問題

目前咱們處理消息的同步,通常是落地到DB後,再同過異步的方式作數據的聚合和處理。至於DB的操做爲了簡單直接用了Hibernate提供的一套JPA接口,(老實說真的是不喜歡JPA,一是sql log很差分析沒法優化,二是必須很是瞭解JPA的全部關鍵字含義,否則就要出問題,因此我一直喜歡用mybatis這種更輕量的甚至spring-jdbc)。html

那麼使用JPA的過程就遇到了一些問題,見招拆招一件一件來。java

問題1

遇到的第一個問題就很是的要命,咱們的系統是一張單表須要支持multi-tenant多租戶,簡單說就是表中有個tenantId的字段來區分租戶,這是比較常見的設計。那麼在對DB作操做的時候,ORM框架應該提供分租戶的CURD接口,而不須要開發人員都本身在where中加tenantId=***spring

解決

這個問題其實沒有解決,由於Hibernate尚未實現單表的Multi-tenant(真是至關的坑)。官網文檔中說了這樣三種狀況sql

SCHEMA
Correlates to the separate schema approach. It is an error to attempt to open a session without a tenant identifier using this strategy. Additionally, a MultiTenantConnectionProvider must be specified.
DATABASE
Correlates to the separate database approach. It is an error to attempt to open a session without a tenant identifier using this strategy. Additionally, a MultiTenantConnectionProvider must be specified.
DISCRIMINATOR
Correlates to the partitioned (discriminator) approach. It is an error to attempt to open a session without a tenant identifier using this strategy. This strategy is not yet implemented in Hibernate as of 4.0 and 4.1. Its support is planned for 5.0.

能夠看到最後一種還不支持呢。沒辦法只有手寫where啦。數據庫

問題2

因爲是處理消息,即便收到DELETE的message也不能真的刪除,由於消息是亂序的,若是先來了DELETE再來UPDATE怎麼辦,其實是先UPDATE再DELETE,但因爲處理效率不一致,因此收到的消息順序也是沒法肯定的。基於這點,爲了保證數據的最終一致性,因此操做都做爲UPDATE處理。刪除操做必須是soft deletesession

解決

能夠寫一個BaseEntity,都有isactive這個字段,默認都是truemybatis

@MappedSuperclass
public class BaseEntity {

    @Column(name="isactive", columnDefinition="boolean DEFAULT true")
    private Boolean active = true;

    public Boolean getActive() {
        return active;
    }

    public void setActive(Boolean active) {
        this.active = active;
    }
}

而後繼承一下併發

@Entity
@Inheritance(strategy = InheritanceType.JOINED)
@Table(name = "Product")
@Where(clause="isactive = 1")
public class ProductEntity extends BaseEntity {}

注意@Where就是全部操做都會拼上這個condition從而實現soft delete。app

問題3

在處理相似外鍵關聯這種數據的時候,例如Product上有個CategoryId字段,那麼數據庫設計是一張Category表,一張Product表,Product表上CategoryId字段做爲外鍵關聯到Category表的ID字段。那麼做爲一個JPA的Entity,你們知道Entity是OO的,Product Entity下應該包含一個Category Entity,關係是ManyToOne的。框架

public class ProductEntity extends BaseEntity {

    @ManyToOne(fetch = FetchType.EAGER)
    @JoinColumn(name = "categoryId")
    private CategoryEntity category;
    
}

(這裏要插一句,其實若是隻是把Category當普通字段,存一個CategoryId也是沒有問題的,可是在查詢的時候就須要用這個Product.CategoryId再去Category裏查一次。用了JPA以後,爲了減小一次查詢,有時候事情反而會複雜)。

至於消息,好比先收到Product的CREATE事件,這時候會拿消息體裏的categoryId去category表查一下有沒有這個Category Entity,若是有直接獲得後塞到Product的Category屬性上去,可是若是沒有這個Category怎麼辦?

解決

若是沒有的話,按照JPA的外鍵關聯原則,咱們須要創建一個虛擬的Category,也就是說插入一條佔位數據到Category表中,只有ID有值。因此對ProductEntity作些改造。

public class ProductEntity extends BaseEntity {

    @ManyToOne(cascade = {CascadeType.PERSIST}, fetch = FetchType.EAGER)
    @NotFound(action= NotFoundAction.IGNORE)
    @JoinColumn(name = "categoryId")
    private CategoryEntity category;
    
}

注意加了兩點,一是cascade = {CascadeType.PERSIST},意思是若是Persist了Product的話,發現categoryId不爲空而category表中又沒有該Category,那麼級聯插入這條數據(只有ID)。二是@NotFound(action= NotFoundAction.IGNORE),加這條是防止當收到一個Category.DELETE事件後軟刪除了Category,而讀取Product的時候就會Eager地得到Category,一旦獲取不到JPA會拋出EntityNotExist的異常。加了這個註解,Product裏的category就爲null,不會出異常。

問題4

這其實是問題3的衍生,解決3的時候咱們使用了Cascade=PERSIST,那麼在發現Category不存在的時候,JPA會發起一個insert,固然數據只有ID,其餘的字段等待真正的Category的CREATE事件來了再填充。可是併發的問題就出現了,若是正好就在發起insert以前,Category的CREATE事件來了(另外一個Worker在處理),那裏也發現表裏沒有這個Category,因此也隨即發起一個insert操做。conflict就這樣發生了,主鍵衝突!這時候怎麼辦?

解決

我採起了一種比較粗暴的方式,就是retry,首先每次收到事件後的寫操做,都是查Entity是否存在,存在就Update,不存在就Insert。當兩個Worker同時作寫入操做,確定一個成功一個失敗,失敗的只要retry一次就會發現Entity有了(另外一個Worker寫入的),這時候變成Update操做就不會有conflict。

由於項目中依賴Spring,因此剛好有了spring-retry這個包,直接用起來。

public class RetryTemplateBuilder {

    protected RetryTemplate buildable;
    protected RetryTemplateBuilder builder;

    public RetryTemplateBuilder() {
        buildable = createBuildable();
        builder = getBuilder();
    }

    public static RetryTemplateBuilder retryTemplate() {
        return new RetryTemplateBuilder();
    }

    public RetryTemplateBuilder withPolicies(RetryPolicy... policies) {
        CompositeRetryPolicy compositePolicy = new CompositeRetryPolicy();
        compositePolicy.setPolicies(policies);
        buildable.setRetryPolicy(compositePolicy);
        return this;
    }

    public RetryTemplateBuilder withPolicies(RetryPolicy retryPolicy, BackOffPolicy backOffPolicy) {
        buildable.setRetryPolicy(retryPolicy);
        buildable.setBackOffPolicy(backOffPolicy);
        return this;
    }

    public RetryTemplateBuilder withPolicies(BackOffPolicy backOffPolicy) {
        buildable.setBackOffPolicy(backOffPolicy);
        return this;
    }

    public RetryTemplate build() {
        return buildable;
    }

    protected RetryTemplate createBuildable() {
        return new RetryTemplate();
    }

    protected RetryTemplateBuilder getBuilder() {
        return this;
    }

}

這是一個TemplateBuilder,能夠理解成retry的模板,一個retryTemplate能夠包含多個policy。

public class SimpleRetryPolicyBuilder {

    protected SimpleRetryPolicy buildable;
    protected SimpleRetryPolicyBuilder builder;

    public SimpleRetryPolicyBuilder() {
        buildable = createBuildable();
        builder = getBuilder();
    }

    public static SimpleRetryPolicyBuilder simpleRetryPolicy() {
        return new SimpleRetryPolicyBuilder();
    }

    public static SimpleRetryPolicy simpleRetryPolicyWithRetryableExceptions(int maxAttempts,
                                                                             Map<Class<? extends Throwable>, Boolean> exception) {
        return new SimpleRetryPolicy(maxAttempts, exception);
    }

    public SimpleRetryPolicyBuilder withMaxAttempts(int maxAttempts) {
        buildable.setMaxAttempts(maxAttempts);
        return this;
    }

    public SimpleRetryPolicy build() {
        return buildable;
    }

    protected SimpleRetryPolicy createBuildable() {
        return new SimpleRetryPolicy();
    }

    protected SimpleRetryPolicyBuilder getBuilder() {
        return this;
    }

}

好比這種Policy,就是能夠定義須要重試幾回,在哪些異常發生的時候重試。

public class FixedBackOffPolicyBuilder {


    protected FixedBackOffPolicy buildable;
    protected FixedBackOffPolicyBuilder builder;

    private FixedBackOffPolicyBuilder() {
        buildable = createBuildable();
        builder = getBuilder();
    }

    public static FixedBackOffPolicyBuilder fixedBackOffPolicy() {
        return new FixedBackOffPolicyBuilder();
    }

    public FixedBackOffPolicyBuilder withDelay(long delay) {
        buildable.setBackOffPeriod(delay);
        return this;
    }

    public FixedBackOffPolicy build() {
        return buildable;
    }

    protected FixedBackOffPolicy createBuildable() {
        return new FixedBackOffPolicy();
    }

    protected FixedBackOffPolicyBuilder getBuilder() {
        return this;
    }
}

還有這種能夠定義retry的間隔時間。

最後用起來就手到擒來了,

Map<Class<? extends Throwable>, Boolean> retryFor = new HashMap<>();
// 定義兩種異常發生時retry
retryFor.put(DataIntegrityViolationException.class, Boolean.TRUE);
retryFor.put(ConstraintViolationException.class, Boolean.TRUE);
// 定義最大retry次數和間隔時間
RetryTemplate retryTemplate = RetryTemplateBuilder.retryTemplate()
        .withPolicies(
                SimpleRetryPolicyBuilder.simpleRetryPolicyWithRetryableExceptions(MAX_ATTEMPTS, retryFor),
                FixedBackOffPolicyBuilder.fixedBackOffPolicy().withDelay(RETRY_DELAY).build())
        .build();

retryTemplate.execute(new RetryCallback() {
    public Void doWithRetry(RetryContext context) {
        log.info("Attempt times [" + context.getRetryCount() + "]");
        // Your logic code
        return null;
    }
});

在生產環境測試,99%的狀況一次retry就能夠解決問題,因此個人經驗值是設置了3次最大重試次數。

相關文章
相關標籤/搜索