Hmily: A High-Performance Asynchronous Distributed Transaction TCC Framework

Hmily framework features (github.com/yu199195/hm…)

  • Seamless integration with Spring and the Spring Boot starter.
  • Seamless integration with RPC frameworks such as Dubbo, Spring Cloud, and Motan.
  • Multiple storage options for the transaction log (Redis, MongoDB, MySQL, etc.).
  • Multiple log serialization options (Kryo, Protostuff, Hessian).
  • Automatic transaction recovery.
  • Support for dependency propagation of nested transactions.
  • Zero code intrusion, simple and flexible configuration.

Why is Hmily so performant?

### 1. Asynchronous reads and writes of the transaction log via the Disruptor (the Disruptor is a lock-free, GC-free concurrent programming framework)

```java
package com.hmily.tcc.core.disruptor.publisher;

import com.hmily.tcc.common.bean.entity.TccTransaction;
import com.hmily.tcc.common.enums.EventTypeEnum;
import com.hmily.tcc.core.concurrent.threadpool.HmilyThreadFactory;
import com.hmily.tcc.core.coordinator.CoordinatorService;
import com.hmily.tcc.core.disruptor.event.HmilyTransactionEvent;
import com.hmily.tcc.core.disruptor.factory.HmilyTransactionEventFactory;
import com.hmily.tcc.core.disruptor.handler.HmilyConsumerDataHandler;
import com.hmily.tcc.core.disruptor.translator.HmilyTransactionEventTranslator;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.IgnoreExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * event publisher.
 *
 * @author xiaoyu(Myth)
 */
@Component
public class HmilyTransactionEventPublisher implements DisposableBean {

    private Disruptor<HmilyTransactionEvent> disruptor;

    private final CoordinatorService coordinatorService;

    @Autowired
    public HmilyTransactionEventPublisher(final CoordinatorService coordinatorService) {
        this.coordinatorService = coordinatorService;
    }

    /**
     * disruptor start.
     *
     * @param bufferSize this is disruptor buffer size.
     * @param threadSize this is disruptor consumer thread size.
     */
    public void start(final int bufferSize, final int threadSize) {
        disruptor = new Disruptor<>(new HmilyTransactionEventFactory(), bufferSize, r -> {
            AtomicInteger index = new AtomicInteger(1);
            return new Thread(null, r, "disruptor-thread-" + index.getAndIncrement());
        }, ProducerType.MULTI, new BlockingWaitStrategy());

        final Executor executor = new ThreadPoolExecutor(threadSize, threadSize, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                HmilyThreadFactory.create("hmily-log-disruptor", false),
                new ThreadPoolExecutor.AbortPolicy());

        HmilyConsumerDataHandler[] consumers = new HmilyConsumerDataHandler[threadSize];
        for (int i = 0; i < threadSize; i++) {
            consumers[i] = new HmilyConsumerDataHandler(executor, coordinatorService);
        }
        disruptor.handleEventsWithWorkerPool(consumers);
        disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
        disruptor.start();
    }

    /**
     * publish disruptor event.
     *
     * @param tccTransaction {@linkplain com.hmily.tcc.common.bean.entity.TccTransaction }
     * @param type {@linkplain EventTypeEnum}
     */
    public void publishEvent(final TccTransaction tccTransaction, final int type) {
        final RingBuffer<HmilyTransactionEvent> ringBuffer = disruptor.getRingBuffer();
        ringBuffer.publishEvent(new HmilyTransactionEventTranslator(type), tccTransaction);
    }

    @Override
    public void destroy() {
        disruptor.shutdown();
    }
}
```
  • The default value of bufferSize here is 4096 * 4; users can adjust it to suit their own situation.

```java
HmilyConsumerDataHandler[] consumers = new HmilyConsumerDataHandler[threadSize];
for (int i = 0; i < threadSize; i++) {
    consumers[i] = new HmilyConsumerDataHandler(executor, coordinatorService);
}
disruptor.handleEventsWithWorkerPool(consumers);
```

  • Multiple consumers are used here to process the tasks in the queue.
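Because handleEventsWithWorkerPool requires WorkHandler instances, each event is claimed by exactly one consumer. The HmilyConsumerDataHandler class itself is not reproduced in this article; a rough sketch of what such a consumer has to look like (the delegation details inside the task are an assumption) is:

```java
import com.hmily.tcc.core.disruptor.event.HmilyTransactionEvent;
import com.lmax.disruptor.WorkHandler;

import java.util.concurrent.Executor;

// Sketch only, not the verbatim HmilyConsumerDataHandler: a WorkHandler claims each event
// exclusively and hands the transaction-log work off to the thread pool created in start().
public class TransactionLogWorkHandlerSketch implements WorkHandler<HmilyTransactionEvent> {

    private final Executor executor;

    public TransactionLogWorkHandlerSketch(final Executor executor) {
        this.executor = executor;
    }

    @Override
    public void onEvent(final HmilyTransactionEvent event) {
        executor.execute(() -> {
            // persist or update the transaction log according to the event's EventTypeEnum code,
            // typically by delegating to CoordinatorService (assumed; not shown in the article)
        });
    }
}
```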

### 2. Asynchronous execution of the confirm and cancel methods

```java
package com.hmily.tcc.core.service.handler;

import com.hmily.tcc.common.bean.context.TccTransactionContext;
import com.hmily.tcc.common.bean.entity.TccTransaction;
import com.hmily.tcc.common.enums.TccActionEnum;
import com.hmily.tcc.core.concurrent.threadpool.HmilyThreadFactory;
import com.hmily.tcc.core.service.HmilyTransactionHandler;
import com.hmily.tcc.core.service.executor.HmilyTransactionExecutor;
import org.aspectj.lang.ProceedingJoinPoint;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * this is transaction starter.
 *
 * @author xiaoyu
 */
@Component
public class StarterHmilyTransactionHandler implements HmilyTransactionHandler {

    private static final int MAX_THREAD = Runtime.getRuntime().availableProcessors() << 1;

    private final HmilyTransactionExecutor hmilyTransactionExecutor;

    private final Executor executor = new ThreadPoolExecutor(MAX_THREAD, MAX_THREAD, 0, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            HmilyThreadFactory.create("hmily-execute", false),
            new ThreadPoolExecutor.AbortPolicy());

    @Autowired
    public StarterHmilyTransactionHandler(final HmilyTransactionExecutor hmilyTransactionExecutor) {
        this.hmilyTransactionExecutor = hmilyTransactionExecutor;
    }

    @Override
    public Object handler(final ProceedingJoinPoint point, final TccTransactionContext context)
            throws Throwable {
        Object returnValue;
        try {
            TccTransaction tccTransaction = hmilyTransactionExecutor.begin(point);
            try {
                //execute try
                returnValue = point.proceed();
                tccTransaction.setStatus(TccActionEnum.TRYING.getCode());
                hmilyTransactionExecutor.updateStatus(tccTransaction);
            } catch (Throwable throwable) {
                //if exception ,execute cancel
                final TccTransaction currentTransaction = hmilyTransactionExecutor.getCurrentTransaction();
                executor.execute(() -> hmilyTransactionExecutor
                        .cancel(currentTransaction));
                throw throwable;
            }
            //execute confirm
            final TccTransaction currentTransaction = hmilyTransactionExecutor.getCurrentTransaction();
            executor.execute(() -> hmilyTransactionExecutor.confirm(currentTransaction));
        } finally {
            hmilyTransactionExecutor.remove();
        }
        return returnValue;
    }
}
```
  • When an exception is thrown in the AOP aspect around the try method, a thread pool executes cancel asynchronously; when there is no exception, confirm is executed asynchronously.

Someone might ask here: what happens if the cancel method, or the confirm method, itself throws an exception?

Answer: first, this situation is very rare, because try has only just finished executing. Second, even if it does happen, the log has already been saved during the try phase, and Hmily has a built-in scheduled thread pool that performs recovery, so there is no need to worry.

Someone might then ask: what if saving the log itself fails?

Answer: first of all, this is a rather hair-splitting question: the log storage parameters are required when the framework starts up. Second, even if saving the log fails at runtime, the framework reads from the cache instead, so correct execution of the program is not affected. Finally, if log saving fails and the system also goes down in some extremely unlikely scenario at the same time, congratulations, you can go buy a lottery ticket; the best way to handle that case is not to handle it.
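To make the recovery path concrete, here is a minimal sketch of the idea behind a built-in scheduled recovery loop. It is not Hmily's actual scheduler: findUnfinishedTransactions and retryConfirmOrCancel are hypothetical placeholders, and the real implementation drives these steps from the TccConfig parameters (scheduledDelay, recoverDelayTime, retryMax) shown later in this article.

```java
import com.hmily.tcc.common.bean.entity.TccTransaction;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of the recovery idea: every scheduledDelay seconds, re-read transactions whose
// confirm/cancel did not complete from the log store and retry them, up to retryMax times.
public class RecoveryScheduleSketch {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public void start(final int recoverDelayTime, final int scheduledDelay, final int retryMax) {
        scheduler.scheduleWithFixedDelay(() -> {
            // only touch logs older than recoverDelayTime seconds, so in-flight transactions are left alone
            List<TccTransaction> unfinished = findUnfinishedTransactions(recoverDelayTime);
            for (TccTransaction transaction : unfinished) {
                retryConfirmOrCancel(transaction, retryMax);
            }
        }, scheduledDelay, scheduledDelay, TimeUnit.SECONDS);
    }

    private List<TccTransaction> findUnfinishedTransactions(final int recoverDelayTime) {
        return Collections.emptyList(); // placeholder: query the transaction-log store
    }

    private void retryConfirmOrCancel(final TccTransaction transaction, final int retryMax) {
        // placeholder: re-invoke confirm or cancel, and skip once retryMax has been exceeded
    }
}
```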

### 3. Use of the ThreadLocal cache

```java
    /**
     * transaction begin.
     *
     * @param point cut point.
     * @return TccTransaction
     */
    public TccTransaction begin(final ProceedingJoinPoint point) {
        LogUtil.debug(LOGGER, () -> "......hmily transaction!start....");
        //build tccTransaction
        final TccTransaction tccTransaction = buildTccTransaction(point, TccRoleEnum.START.getCode(), null);
        //save tccTransaction in threadLocal
        CURRENT.set(tccTransaction);
        //publishEvent
        hmilyTransactionEventPublisher.publishEvent(tccTransaction, EventTypeEnum.SAVE.getCode());
        //set TccTransactionContext this context transfer remote
        TccTransactionContext context = new TccTransactionContext();
        //set action is try
        context.setAction(TccActionEnum.TRYING.getCode());
        context.setTransId(tccTransaction.getTransId());
        context.setRole(TccRoleEnum.START.getCode());
        TransactionContextLocal.getInstance().set(context);
        return tccTransaction;
    }
```
  • First, understand that the ThreadLocal stores the transaction information of the initiator side. This is important, otherwise things get confusing. RPC calls form a call chain, which is saved as well.
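The TransactionContextLocal used in begin() above is, in essence, a singleton wrapping a ThreadLocal of the transaction context. A sketch of that idea (the method names mirror the calls shown above; remove() is an assumption):

```java
import com.hmily.tcc.common.bean.context.TccTransactionContext;

// Sketch only: a singleton holder around a ThreadLocal<TccTransactionContext>,
// which is how the initiator's context stays bound to the current thread.
public final class TransactionContextLocalSketch {

    private static final TransactionContextLocalSketch INSTANCE = new TransactionContextLocalSketch();

    private final ThreadLocal<TccTransactionContext> currentLocal = new ThreadLocal<>();

    private TransactionContextLocalSketch() {
    }

    public static TransactionContextLocalSketch getInstance() {
        return INSTANCE;
    }

    public void set(final TccTransactionContext context) {
        currentLocal.set(context);
    }

    public TccTransactionContext get() {
        return currentLocal.get();
    }

    public void remove() {
        currentLocal.remove();
    }
}
```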
```java
    /**
     * add participant.
     *
     * @param participant {@linkplain Participant}
     */
    public void enlistParticipant(final Participant participant) {
        if (Objects.isNull(participant)) {
            return;
        }
        Optional.ofNullable(getCurrentTransaction())
                .ifPresent(c -> {
                    c.registerParticipant(participant);
                    updateParticipant(c);
                });
    }
```

### 4. Use of Guava Cache

```java
package com.hmily.tcc.core.cache;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.Weigher;
import com.hmily.tcc.common.bean.entity.TccTransaction;
import com.hmily.tcc.core.coordinator.CoordinatorService;
import com.hmily.tcc.core.helper.SpringBeanUtils;
import org.apache.commons.lang3.StringUtils;

import java.util.Optional;
import java.util.concurrent.ExecutionException;

/**
 * use google guava cache.
 *
 * @author xiaoyu
 */
public final class TccTransactionCacheManager {

    private static final int MAX_COUNT = 10000;

    private static final LoadingCache<String, TccTransaction> LOADING_CACHE =
            CacheBuilder.newBuilder().maximumWeight(MAX_COUNT)
                    .weigher((Weigher<String, TccTransaction>) (string, tccTransaction) -> getSize())
                    .build(new CacheLoader<String, TccTransaction>() {
                        @Override
                        public TccTransaction load(final String key) {
                            return cacheTccTransaction(key);
                        }
                    });

    private static CoordinatorService coordinatorService = SpringBeanUtils.getInstance().getBean(CoordinatorService.class);

    private static final TccTransactionCacheManager TCC_TRANSACTION_CACHE_MANAGER = new TccTransactionCacheManager();

    private TccTransactionCacheManager() {

    }

    /**
     * TccTransactionCacheManager.
     *
     * @return TccTransactionCacheManager
     */
    public static TccTransactionCacheManager getInstance() {
        return TCC_TRANSACTION_CACHE_MANAGER;
    }

    private static int getSize() {
        return (int) LOADING_CACHE.size();
    }

    private static TccTransaction cacheTccTransaction(final String key) {
        return Optional.ofNullable(coordinatorService.findByTransId(key)).orElse(new TccTransaction());
    }

    /**
     * cache tccTransaction.
     *
     * @param tccTransaction {@linkplain TccTransaction}
     */
    public void cacheTccTransaction(final TccTransaction tccTransaction) {
        LOADING_CACHE.put(tccTransaction.getTransId(), tccTransaction);
    }

    /**
     * acquire TccTransaction.
     *
     * @param key this guava key.
     * @return {@linkplain TccTransaction}
     */
    public TccTransaction getTccTransaction(final String key) {
        try {
            return LOADING_CACHE.get(key);
        } catch (ExecutionException e) {
            return new TccTransaction();
        }
    }

    /**
     * remove guava cache by key.
     *
     * @param key guava cache key.
     */
    public void removeByKey(final String key) {
        if (StringUtils.isNotEmpty(key)) {
            LOADING_CACHE.invalidate(key);
        }
    }

}
```
  • In the initiator we use ThreadLocal, so why don't we use it in the participant as well? There are two reasons: first, try and confirm may not run in the same thread, which would make the ThreadLocal ineffective; second, once an RPC cluster is involved, calls may be load-balanced to different machines. There is one detail worth noting here:

```java
    private static TccTransaction cacheTccTransaction(final String key) {
        return Optional.ofNullable(coordinatorService.findByTransId(key)).orElse(new TccTransaction());
    }
```

When the entry is not in the Guava cache, the transaction log is queried and returned, which guarantees support for clustered environments.
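For illustration, here is a hypothetical helper showing how a participant would read the transaction through this cache; getInstance and getTccTransaction come from the TccTransactionCacheManager class quoted above, while the helper class itself is made up.

```java
import com.hmily.tcc.common.bean.entity.TccTransaction;
import com.hmily.tcc.core.cache.TccTransactionCacheManager;

// Usage sketch: look the transaction up by its transId; on a cache miss the LoadingCache
// transparently falls back to coordinatorService.findByTransId, as shown above.
public final class ParticipantLookupSketch {

    public static TccTransaction lookup(final String transId) {
        return TccTransactionCacheManager.getInstance().getTccTransaction(transId);
    }
}
```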

The four points above are what make Hmily an asynchronous, high-performance distributed transaction TCC framework.

How to use Hmily? (github.com/yu199195/hm…

First, because of earlier package-naming issues, the framework artifacts have not been uploaded to the Maven Central repository, so users need to pull the code themselves, build it, and deploy it to their own private repository.

1. Dubbo users

  • In your API interface project, add:

```xml
<dependency>
    <groupId>com.hmily.tcc</groupId>
    <artifactId>hmily-tcc-annotation</artifactId>
    <version>{you version}</version>
</dependency>
```

  • In your service-provider project, add:

```xml
<dependency>
    <groupId>com.hmily.tcc</groupId>
    <artifactId>hmily-tcc-dubbo</artifactId>
    <version>{you version}</version>
</dependency>
```
  • Configure the startup bean.
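The demo XML for this bean did not survive in this copy of the article; as a stand-in, here is a minimal sketch of what such a bean definition could look like. The bean class name is an assumption, and the property names mirror fields of the TccConfig class referenced below.

```xml
<!-- Sketch only: the bean class name is assumed; the properties mirror TccConfig fields. -->
<bean id="hmilyTransactionBootstrap" class="com.hmily.tcc.core.bootstrap.HmilyTransactionBootstrap">
    <property name="serializer" value="kryo"/>
    <property name="repositorySupport" value="db"/>
    <property name="retryMax" value="3"/>
    <property name="scheduledDelay" value="60"/>
</bean>
```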
  • There are of course many configuration properties; only a demo is given here. For details, refer to this class:

```java
package com.hmily.tcc.common.config;

import com.hmily.tcc.common.enums.RepositorySupportEnum;
import lombok.Data;

/**
 * hmily config.
 *
 * @author xiaoyu
 */
@Data
public class TccConfig {


    /**
     * Resource suffix this parameter please fill in about is the transaction store path.
     * If it's a table store this is a table suffix, it's stored the same way.
     * If this parameter is not filled in, the applicationName of the application is retrieved by default
     */
    private String repositorySuffix;

    /**
     * log serializer.
     * {@linkplain com.hmily.tcc.common.enums.SerializeEnum}
     */
    private String serializer = "kryo";

    /**
     * scheduledPool Thread size.
     */
    private int scheduledThreadMax = Runtime.getRuntime().availableProcessors() << 1;

    /**
     * scheduledPool scheduledDelay unit SECONDS.
     */
    private int scheduledDelay = 60;

    /**
     * retry max.
     */
    private int retryMax = 3;

    /**
     * recoverDelayTime Unit seconds
     * (note that this time represents how many seconds after the local transaction was created before execution).
     */
    private int recoverDelayTime = 60;

    /**
     * Parameters when participants perform their own recovery.
     * 1.such as RPC calls time out
     * 2.such as the starter down machine
     */
    private int loadFactor = 2;

    /**
     * repositorySupport.
     * {@linkplain RepositorySupportEnum}
     */
    private String repositorySupport = "db";

    /**
     * disruptor bufferSize.
     */
    private int bufferSize = 4096 * 2 * 2;

    /**
     * this is disruptor consumerThreads.
     */
    private int consumerThreads = Runtime.getRuntime().availableProcessors() << 1;

    /**
     * db config.
     */
    private TccDbConfig tccDbConfig;

    /**
     * mongo config.
     */
    private TccMongoConfig tccMongoConfig;

    /**
     * redis config.
     */
    private TccRedisConfig tccRedisConfig;

    /**
     * zookeeper config.
     */
    private TccZookeeperConfig tccZookeeperConfig;

    /**
     * file config.
     */
    private TccFileConfig tccFileConfig;

}
```

Spring Cloud users

```xml
<dependency>
    <groupId>com.hmily.tcc</groupId>
    <artifactId>hmily-tcc-springcloud</artifactId>
    <version>{you version}</version>
</dependency>
```

Motan users

```xml
<dependency>
    <groupId>com.hmily.tcc</groupId>
    <artifactId>hmily-tcc-motan</artifactId>
    <version>{you version}</version>
</dependency>
```

With hmily-spring-boot-starter it is even easier: you only need to pull in a different jar depending on your RPC framework.

  • If you are a Dubbo user, add:

```xml
<dependency>
    <groupId>com.hmily.tcc</groupId>
    <artifactId>hmily-tcc-spring-boot-starter-dubbo</artifactId>
    <version>${your version}</version>
</dependency>
```
  • If you are a Spring Cloud user, add:
```xml
<dependency>
    <groupId>com.hmily.tcc</groupId>
    <artifactId>hmily-tcc-spring-boot-starter-springcloud</artifactId>
    <version>${your version}</version>
</dependency>
```
  • If you are a Motan user, add:

```xml
<dependency>
    <groupId>com.hmily.tcc</groupId>
    <artifactId>hmily-tcc-spring-boot-starter-motan</artifactId>
    <version>${your version}</version>
</dependency>
```
  • Then add the following configuration to your yml file:

```yaml
hmily:
  tcc:
    serializer: kryo
    recoverDelayTime: 128
    retryMax: 3
    scheduledDelay: 128
    scheduledThreadMax: 10
    repositorySupport: db
    tccDbConfig:
      driverClassName: com.mysql.jdbc.Driver
      url: jdbc:mysql://192.168.1.98:3306/tcc?useUnicode=true&characterEncoding=utf8
      username: root
      password: 123456
    #repositorySupport: redis
    #tccRedisConfig:
    #  masterName: mymaster
    #  sentinel: true
    #  sentinelUrl: 192.168.1.91:26379;192.168.1.92:26379;192.168.1.93:26379
    #  password: foobaredbbexONE123
    #repositorySupport: zookeeper
    #  host: 92.168.1.73:2181
    #  sessionTimeOut: 100000
    #  rootPath: /tcc
    #repositorySupport: mongodb
    #  mongoDbUrl: 192.168.1.68:27017
    #  mongoDbName: happylife
    #  mongoUserName: xiaoyu
    #  mongoUserPwd: 123456
    #repositorySupport: file
    #  path: /account
    #  prefix: account
```
  • It's that simple: you can then add the @Tcc annotation to your interface methods and start using the framework (a minimal sketch of such an interface follows after this list).
  • Of course, due to space constraints, many things are only briefly described here, especially the internal logic.
  • If you are interested, you can star and fork the project on GitHub, or join the WeChat and QQ groups for discussion.
  • GitHub address: github.com/yu199195/hm…
  • Finally, thanks again everyone. If you are interested, feel free to contribute your excellent PRs.
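As promised in the list above, here is a minimal sketch of a TCC-annotated service interface, assuming the @Tcc annotation (with confirmMethod and cancelMethod attributes) from the hmily-tcc-annotation module; the service and method names are illustrative only.

```java
import com.hmily.tcc.annotation.Tcc;

// Illustrative only: AccountService and its methods are made up. The confirmMethod/cancelMethod
// attributes name the methods the framework invokes in the confirm and cancel phases.
public interface AccountService {

    @Tcc(confirmMethod = "confirmPayment", cancelMethod = "cancelPayment")
    boolean payment(String userId, double amount);           // try phase: reserve the amount

    boolean confirmPayment(String userId, double amount);    // confirm phase: commit the deduction

    boolean cancelPayment(String userId, double amount);     // cancel phase: release the reservation
}
```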