基於spring環境的eventbus

eventbus做爲事件總線能夠提供消費者的註冊和事件的分發功能,是實現事件驅動的必要工具也是項目中的解耦利器。html

本組件是基於greenrobot的eventbus包裝,考慮到大部分項目都是基於spring開發,消費者提供了bean的自動註冊。事件發佈者也以單例bean的形式建立。java

使用示例:https://github.com/neuSnail/s...git

快速開始
maven依賴:github

<dependency>
    <groupId>com.github.neusnail</groupId>
    <artifactId>spring-eventbus</artifactId>
    <version>1.0</version>
</dependency>

配置發佈器web

@Configuration
public class EventBusConfig {
    @Bean
    public EventPublisher eventPublisher() {
        return EventPublisherBuilder.createDefault();
    }
}

建立消費者和事件spring

@EventSubscriber
@Slf4j
//用戶行爲事件消費者
public class UserBehaviorSubscriber {
    @Subscribe()
    public void userLeaveSubscriber(UserEntryEvent event) {
        log.info("get userLeaveEvent userName:{}", event.getUserName());
    }
 
    @Subscribe(threadMode = ThreadMode.ASYNC, priority = 1)
    public void userEntrySubscriber(UserLeaveEvent event) {
        log.info("get userEntryEvent userName:{}", event.getUserName());
    }
}
@Data
@EqualsAndHashCode(callSuper = true)
public class UserEntryEvent extends BaseEvent {
    private String userId;
    private String userName;
}

開啓消費者掃描異步

建議在main方法以後直接調用註冊,也可使用@EventListener、CommandLineRunner、實現aware等方式在容器啓動後註冊,但注意確保不要重複註冊async

@SpringBootApplication
public class DemoApplication{
 
    public static void main(String[] args) {
        ApplicationContext context = SpringApplication.run(DemoApplication.class, args);
        SubscriberRegister.register(context);
    }
}

發送事件maven

public void postEvent() {
    UserEntryEvent userEntryEvent = new UserEntryEvent();
    userEntryEvent.setUserName("bigBoss");
    eventPublisher.post(userEntryEvent);
}

注意這裏提供的postAsync是生產方的異步,決定權在生產者這裏,threadmode是消費方的異步,決定權在消費者裏工具

參數配置
在註冊publisher bean時候能夠配置部分參數

@Configuration
public class EventBusConfig {
    @Bean
    public EventPublisher eventPublisher() {
        return EventPublisherBuilder.builder()
                .asyncPostExecutor(Executors.newFixedThreadPool(1))
                .EventBus(EventBus.builder()
                        .throwSubscriberException(false)
                        .sendNoSubscriberEvent(true)
                        .executorService(Executors.newCachedThreadPool())
                        .build()
                ).build();
    }
}

其中asyncPostExecutor是發佈者異步發送時使用的線程池,eventbus中的executorService是消費者異步消費的線程池

關於eventbus builder的具體參數文檔可參照:https://greenrobot.org/files/...

默認的發送者和消費者線程池都使用了阿里的ttl線程池,避免父子線程傳遞問題

private ExecutorService defaultExecutor() {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            4, 8,
            5L, TimeUnit.MINUTES,
            new LinkedBlockingQueue<Runnable>(1000),
            new ThreadPoolExecutor.CallerRunsPolicy()
    );
    return TtlExecutors.getTtlExecutorService(executor);
}

消費者線程模式和優先級

@subscribe中的threadmode有如下選項

ThreadMode.POSTING:和發送事件在同一個線程
ThreadMode.MAIN:主線程
ThreadMode.BACKGROUND:子線程
ThreadMode.ASYNC:異步線程
咱們在javaweb的開發中通常只會使用ThreadMode.POSTING(同步)和ThreadMode.ASYNC(異步)

priority是優先級,數值越大越先被消費

相關文章
相關標籤/搜索