Spring事務事件監控

前面咱們講到了Spring在進行事務邏輯織入的時候,不管是事務開始,提交或者回滾,都會觸發相應的事務事件。本文首先會使用實例進行講解Spring事務事件是如何使用的,而後會講解這種使用方式的實現原理。mysql

1.示例

對於事務事件,Spring提供了一個註解@TransactionEventListener,將這個註解標註在某個方法上,那麼就將這個方法聲明爲了一個事務事件處理器,而具體的事件類型則是由TransactionalEventListener.phase屬性進行定義的。以下是TransactionalEventListener的聲明:spring

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EventListener
public @interface TransactionalEventListener {

// 指定當前標註方法處理事務的類型
TransactionPhase phase() default TransactionPhase.AFTER_COMMIT;

// 用於指定當前方法若是沒有事務,是否執行相應的事務事件監聽器
boolean fallbackExecution() default false;

// 與classes屬性同樣,指定了當前事件傳入的參數類型,指定了這個參數以後就能夠在監聽方法上
// 直接什麼一個這個參數了
@AliasFor(annotation = EventListener.class, attribute = "classes")
Class<?>[] value() default {};

// 做用於value屬性同樣,用於指定當前監聽方法的參數類型
@AliasFor(annotation = EventListener.class, attribute = "classes")
Class<?>[] classes() default {};

// 這個屬性使用Spring Expression Language對目標類和方法進行匹配,對於不匹配的方法將會過濾掉
String condition() default "";

}

關於這裏的classes屬性須要說明一下,若是指定了classes屬性,那麼當前監聽方法的參數類型就能夠直接使用所發佈的事件的參數類型,若是沒有指定,那麼這裏監聽的參數類型可使用兩種:ApplicationEvent和PayloadApplicationEvent。對於ApplicationEvent類型的參數,能夠經過其getSource()方法獲取發佈的事件參數,只不過其返回值是一個Object類型的,若是想獲取具體的類型還須要進行強轉;對於PayloadApplicationEvent類型,其能夠指定一個泛型參數,該泛型參數必須與發佈的事件的參數類型一致,這樣就能夠經過其getPayload()方法獲取事務事件發佈的數據了。關於上述屬性中的TransactionPhase,其能夠取以下幾個類型的值:sql

public enum TransactionPhase {
   // 指定目標方法在事務commit以前執行
   BEFORE_COMMIT,

   // 指定目標方法在事務commit以後執行
    AFTER_COMMIT,

    // 指定目標方法在事務rollback以後執行
    AFTER_ROLLBACK,

   // 指定目標方法在事務完成時執行,這裏的完成是指不管事務是成功提交仍是事務回滾了
   AFTER_COMPLETION
  }

這裏咱們假設數據庫有一個user表,對應的有一個UserService和User的model,用於往該表中插入數據,而且插入動做時使用註解標註目標方法。以下是這幾個類的聲明:數據庫

public class User {
private long id;
private String name;
private int age;

// getter and setter...
}

.express

@Service
@Transactional
public class UserServiceImpl implements UserService {
@Autowired
private JdbcTemplate jdbcTemplate;

 @Autowired
 private ApplicationEventPublisher publisher;

@Override
public void insert(User user) {
jdbcTemplate.update("insert into user (id, name, age) value (?, ?, ?)", 
    user.getId(), user.getName(), user.getAge());
publisher.publishEvent(user);
}
}

上述代碼中有一點須要注意的是,對於須要監控事務事件的方法,在目標方法執行的時候須要使用ApplicationEventPublisher發佈相應的事件消息。以下是對上述消息進行監控的程序:apache

@Component
public class UserTransactionEventListener {

@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
public void beforeCommit(PayloadApplicationEvent<User> event) {
System.out.println("before commit, id: " + event.getPayload().getId());
}

@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void afterCommit(PayloadApplicationEvent<User> event) {
System.out.println("after commit, id: " + event.getPayload().getId());
}

@TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
public void afterCompletion(PayloadApplicationEvent<User> event) {
System.out.println("after completion, id: " + event.getPayload().getId());
}

@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
public void afterRollback(PayloadApplicationEvent<User> event) {
System.out.println("after rollback, id: " + event.getPayload().getId());
}
}

這裏對於事件的監控,只須要在監聽方法上添加@TransactionalEventListener註解便可。這裏須要注意的一個問題,在實際使用過程當中,對於監聽的事務事件,須要使用其餘的參數進行事件的過濾,由於這裏的監聽仍是會監聽全部事件參數爲User類型的事務,而不管其是哪一個位置發出來的。若是須要對事件進行過濾,這裏能夠封裝一個UserEvent對象,其內保存一個相似EventType的屬性和一個User對象,這樣在發佈消息的時候就能夠指定EventType屬性,而在監聽消息的時候判斷當前方法監聽的事件對象的EventType是否爲目標type,若是是,則對其進行處理,不然直接略過。下面是上述程序的xml文件配置和驅動程序:數組

<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource">
<property name="url" value="jdbc:mysql://localhost/test?useUnicode=true"/>
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="username" value="******"/>
<property name="password" value="******"/>
</bean>

<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
<property name="dataSource" ref="dataSource"/>
</bean>

<bean id="transactionManager" 
    class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>

<context:component-scan base-package="com.transaction"/>
<tx:annotation-driven/>

.性能優化

public class TransactionApp {
@Test
public void testTransaction() {
ApplicationContext ac = new ClassPathXmlApplicationContext("applicationContext.xml");
UserService userService = context.getBean(UserService.class);
User user = getUser();
userService.insert(user);
}

private User getUser() {
int id = new Random()
  .nextInt(1000000);
User user = new User();
user.setId(id);
user.setName("Mary");
user.setAge(27);
return user;
}
}

運行上述程序,其執行結果以下:架構

before commit, id: 935052
after commit, id: 935052
after completion, id: 935052

能夠看到,這裏確實成功監聽了目標程序的相關事務行爲。併發

2.實現原理

關於事務的實現原理,這裏實際上是比較簡單的,在前面的文章中,咱們講解到,Spring對事務監控的處理邏輯在TransactionSynchronization中,以下是該接口的聲明:

public interface TransactionSynchronization extends Flushable {

// 在當前事務掛起時執行
default void suspend() {
}

// 在當前事務從新加載時執行
default void resume() {
}

// 在當前數據刷新到數據庫時執行
default void flush() {
}

// 在當前事務commit以前執行
default void beforeCommit(boolean readOnly) {
}

// 在當前事務completion以前執行
default void beforeCompletion() {
}

// 在當前事務commit以後實質性
default void afterCommit() {
}

// 在當前事務completion以後執行
default void afterCompletion(int status) {
}
}

很明顯,這裏的TransactionSynchronization接口只是抽象了一些行爲,用於事務事件發生時觸發,這些行爲在Spring事務中提供了內在支持,即在相應的事務事件時,其會獲取當前全部註冊的TransactionSynchronization對象,而後調用其相應的方法。那麼這裏TransactionSynchronization對象的註冊點對於咱們瞭解事務事件觸發有相當重要的做用了。這裏咱們首先回到事務標籤的解析處,在前面講解事務標籤解析時,咱們講到Spring會註冊一個TransactionalEventListenerFactory類型的bean到Spring容器中,這裏關於標籤的解析讀者能夠閱讀本人前面的文章Spring事務用法示例與實現原理。這裏註冊的TransactionalEventListenerFactory實現了EventListenerFactory接口,這個接口的主要做用是先判斷目標方法是不是某個監聽器的類型,而後爲目標方法生成一個監聽器,其會在某個bean初始化以後由Spring調用其方法用於生成監聽器。以下是該類的實現:

public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered {

// 指定當前監聽器的順序
private int order = 50;

public void setOrder(int order) {
    this.order = order;
}

@Override
public int getOrder() {
    return this.order;
}


// 指定目標方法是不是所支持的監聽器的類型,這裏的判斷邏輯就是若是目標方法上包含有
// TransactionalEventListener註解,則說明其是一個事務事件監聽器
@Override
public boolean supportsMethod(Method method) {
    return (AnnotationUtils.findAnnotation(method, TransactionalEventListener.class) != null);
}

// 爲目標方法生成一個事務事件監聽器,這裏ApplicationListenerMethodTransactionalAdapter實現了
// ApplicationEvent接口
@Override
public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
    return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method);
}

}

這裏關於事務事件監聽的邏輯其實已經比較清楚了。ApplicationListenerMethodTransactionalAdapter本質上是實現了ApplicationListener接口的,也就是說,其是Spring的一個事件監聽器,這也就是爲何進行事務處理時須要使用ApplicationEventPublisher.publish()方法發佈一下當前事務的事件。

ApplicationListenerMethodTransactionalAdapter在監聽到發佈的事件以後會生成一個TransactionSynchronization對象,而且將該對象註冊到當前事務邏輯中,以下是監聽事務事件的處理邏輯:

@Override
 public void onApplicationEvent(ApplicationEvent event) {
// 若是當前TransactionManager已經配置開啓事務事件監聽,
// 此時纔會註冊TransactionSynchronization對象
if (TransactionSynchronizationManager.isSynchronizationActive()) {
    // 經過當前事務事件發佈的參數,建立一個TransactionSynchronization對象
    TransactionSynchronization transactionSynchronization = 
        createTransactionSynchronization(event);
    // 註冊TransactionSynchronization對象到TransactionManager中
    TransactionSynchronizationManager
        .registerSynchronization(transactionSynchronization);
} else if (this.annotation.fallbackExecution()) {
    // 若是當前TransactionManager沒有開啓事務事件處理,可是當前事務監聽方法中配置了
    // fallbackExecution屬性爲true,說明其須要對當前事務事件進行監聽,不管其是否有事務
    if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK 
        && logger.isWarnEnabled()) {
        logger.warn("Processing " 
                    + event + " as a fallback execution on AFTER_ROLLBACK phase");
    }
    processEvent(event);
} else {
    // 走到這裏說明當前是不須要事務事件處理的,於是直接略過
    if (logger.isDebugEnabled()) {
        logger.debug("No transaction is active - skipping " + event);
    }
}
}

這裏須要說明的是,上述annotation屬性就是在事務監聽方法上解析的TransactionalEventListener註解中配置的屬性。能夠看到,對於事務事件的處理,這裏建立了一個TransactionSynchronization對象,其實主要的處理邏輯就是在返回的這個對象中,而createTransactionSynchronization()方法內部只是建立了一個TransactionSynchronizationEventAdapter對象就返回了。這裏咱們直接看該對象的源碼:

private static class TransactionSynchronizationEventAdapter 
    extends TransactionSynchronizationAdapter {

    private final ApplicationListenerMethodAdapter listener;
    private final ApplicationEvent event;
    private final TransactionPhase phase;

public TransactionSynchronizationEventAdapter(ApplicationListenerMethodAdapter 
    listener, ApplicationEvent event, TransactionPhase phase) {
    this.listener = listener;
    this.event = event;
    this.phase = phase;
}

@Override
public int getOrder() {
    return this.listener.getOrder();
}

// 在目標方法配置的phase屬性爲BEFORE_COMMIT時,處理before commit事件
public void beforeCommit(boolean readOnly) {
    if (this.phase == TransactionPhase.BEFORE_COMMIT) {
        processEvent();
    }
}

// 這裏對於after completion事件的處理,雖然分爲了三個if分支,可是實際上都是執行的processEvent()
// 方法,由於after completion事件是事務事件中必定會執行的,於是這裏對於commit,
// rollback和completion事件都在當前方法中處理也是沒問題的
public void afterCompletion(int status) {
    if (this.phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
        processEvent();
    } else if (this.phase == TransactionPhase.AFTER_ROLLBACK 
               && status == STATUS_ROLLED_BACK) {
        processEvent();
    } else if (this.phase == TransactionPhase.AFTER_COMPLETION) {
        processEvent();
    }
}

// 執行事務事件
protected void processEvent() {
    this.listener.processEvent(this.event);
}
}

能夠看到,對於事務事件的處理,最終都是委託給了ApplicationListenerMethodAdapter.processEvent()方法進行的。以下是該方法的源碼:

public void processEvent(ApplicationEvent event) {
// 處理事務事件的相關參數,這裏主要是判斷TransactionalEventListener註解中是否配置了value
// 或classes屬性,若是配置了,則將方法參數轉換爲該指定類型傳給監聽的方法;若是沒有配置,則判斷
// 目標方法是ApplicationEvent類型仍是PayloadApplicationEvent類型,是則轉換爲該類型傳入
Object[] args = resolveArguments(event);
// 這裏主要是獲取TransactionalEventListener註解中的condition屬性,而後經過
// Spring expression language將其與目標類和方法進行匹配
if (shouldHandle(event, args)) {
    // 經過處理獲得的參數藉助於反射調用事務監聽方法
    Object result = doInvoke(args);
    if (result != null) {
        // 對方法的返回值進行處理
        handleResult(result);
    } else {
        logger.trace("No result object given - no result to handle");
    }
}
 }

 // 處理事務監聽方法的參數
protected Object[] resolveArguments(ApplicationEvent event) {
// 獲取發佈事務事件時傳入的參數類型
ResolvableType declaredEventType = getResolvableType(event);
if (declaredEventType == null) {
    return null;
}

// 若是事務監聽方法的參數個數爲0,則直接返回
if (this.method.getParameterCount() == 0) {
    return new Object[0];
}

// 若是事務監聽方法的參數不爲ApplicationEvent或PayloadApplicationEvent,則直接將發佈事務
// 事件時傳入的參數當作事務監聽方法的參數傳入。從這裏能夠看出,若是事務監聽方法的參數不是
// ApplicationEvent或PayloadApplicationEvent類型,那麼其參數必須只能有一個,而且這個
// 參數必須與發佈事務事件時傳入的參數一致
Class<?> eventClass = declaredEventType.getRawClass();
if ((eventClass == null || !ApplicationEvent.class.isAssignableFrom(eventClass)) &&
    event instanceof PayloadApplicationEvent) {
    return new Object[] {((PayloadApplicationEvent) event).getPayload()};
} else {
    // 若是參數類型爲ApplicationEvent或PayloadApplicationEvent,則直接將其傳入事務事件方法
    return new Object[] {event};
}
 }

 // 判斷事務事件方法方法是否須要進行事務事件處理
private boolean shouldHandle(ApplicationEvent event, @Nullable Object[] args) {
if (args == null) {
    return false;
}
String condition = getCondition();
if (StringUtils.hasText(condition)) {
    Assert.notNull(this.evaluator, "EventExpressionEvaluator must no be null");
    EvaluationContext evaluationContext = this.evaluator.createEvaluationContext(
        event, this.targetClass, this.method, args, this.applicationContext);
    return this.evaluator.condition(condition, this.methodKey, evaluationContext);
}
return true;
 }

 // 對事務事件方法的返回值進行處理,這裏的處理方式主要是將其做爲一個事件繼續發佈出去,這樣就能夠在
// 一個統一的位置對事務事件的返回值進行處理
protected void handleResult(Object result) {
// 若是返回值是數組類型,則對數組元素一個一個進行發佈
if (result.getClass().isArray()) {
    Object[] events = ObjectUtils.toObjectArray(result);
    for (Object event : events) {
        publishEvent(event);
    }
} else if (result instanceof Collection<?>) {
    // 若是返回值是集合類型,則對集合進行遍歷,而且發佈集合中的每一個元素
    Collection<?> events = (Collection<?>) result;
    for (Object event : events) {
        publishEvent(event);
    }
} else {
    // 若是返回值是一個對象,則直接將其進行發佈
    publishEvent(result);
}
}

對於事務事件的處理,總結而言,就是爲每一個事務事件監聽方法建立了一個TransactionSynchronizationEventAdapter對象,經過該對象在發佈事務事件的時候,會在當前線程中註冊該對象,這樣就能夠保證每一個線程每一個監聽器中只會對應一個TransactionSynchronizationEventAdapter對象。在Spring進行事務事件的時候會調用該對象對應的監聽方法,從而達到對事務事件進行監聽的目的。

在此我向你們推薦一個架構學習交流羣。交流學習羣號:478030634 裏面會分享一些資深架構師錄製的視頻錄像:有Spring,MyBatis,Netty源碼分析,高併發、高性能、分佈式、微服務架構的原理,JVM性能優化、分佈式架構等這些成爲架構師必備的知識體系。還能領取免費的學習資源,目前受益良多

3.小結

本文首先對事務事件監聽程序的使用方式進行了講解,而後在源碼層面講解了Spring事務監聽器是如何實現的。在Spring事務監聽器使用過程當中,須要注意的是要對當前接收到的事件類型進行判斷,由於不一樣的事務可能會發布一樣的消息對象過來。

相關文章
相關標籤/搜索