Guava EventBus集成spring

EventBus 不是通用的消息系統,也不是用來作進程間的通訊的,而是在進程內,用於解耦兩段直接調用的業務邏輯;java

一、代碼結構

  • event:eventbus中流轉的事件(消息),包結構按照業務模塊在細分(好比應用部署模塊就是deployment);
  • subscriber:消費者,和event 是一一對應的,一個event 對應一個消費者,包結構按照業務模塊在細分(好比應用部署模塊就是deployment);
  • poster:生產者,這邊把生產者單獨出來是爲了收斂入口,這樣能夠方便的知道有哪些地方在生產消息,按照業務模塊分爲不一樣的類(由於生產消息的功能比較單薄);

二、代碼實現

在applicationContext.xml 中定義好EventBusspring

asyncEventBus
< bean  id = "taskExecutor"  class = "org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"  lazy-init = "true" >
     < property  name = "corePoolSize"  value = "10" />
     < property  name = "maxPoolSize"  value = "50" />
     < property  name = "queueCapacity"  value = "10000" />
     < property  name = "keepAliveSeconds"  value = "300" />
     < property  name = "rejectedExecutionHandler" >
         < bean  class = "java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
     </ property >
</ bean >
< bean  id = "asyncEventBus"  class = "com.google.common.eventbus.AsyncEventBus" >
     < constructor-arg  name = "executor"  ref = "taskExecutor" />
</ bean >

2.一、標準化subscriber

全部的subscriber都要實現 BaseSubscriber這個 interfaceapi

BaseSubscriber
public  interface  BaseSubscriber<E> {
 
     /**
      * event 處理邏輯入口
      **/
     void  subscribe(E event);
}

全部的subscriber在類上加上EventBusRegister 這個annotationapp

EventBusRegister
@Target ({ElementType.TYPE})
@Retention (RetentionPolicy.RUNTIME)
@Documented
public  @interface  EventBusRegister {
}

實現EventBusAdapter用於自動註冊subscriberasync

EventBusAdapter
@Component
public  class  EventBusAdapter  implements  ApplicationContextAware, InitializingBean {
     @Autowired
     private  AsyncEventBus asyncEventBus;
 
     private  ApplicationContext applicationContext;
 
     @Override
     public  void  afterPropertiesSet()  throws  Exception {
         this .applicationContext.getBeansWithAnnotation(EventBusRegister. class ).forEach((name, bean) -> {
             asyncEventBus.register(bean);
         });
     }
 
     @Override
     public  void  setApplicationContext(ApplicationContext applicationContext)  throws  BeansException {
         this .applicationContext = applicationContext;
     }
}

舉個例子ide

BuildUpdateSubscriber
@Component
@EventBusRegister
public  class  BuildUpdateSubscriber  implements  BaseSubscriber<BuildUpdateEvent> {
     @Autowired
     private  BuildService buildService;
 
     @Subscribe
     @Override
     public  void  subscribe(BuildUpdateEvent event) {
         switch  (event.getEventType()) {
             case  BUILD_CONNECTED:
                 List<BuildVo> buildVos = (List<BuildVo>) event.getData();
                 buildService.addBuildVosAndTriggerConnectEvent(buildVos);
                 break ;
             case  BUILD_ADD:
                 BuildVo addedBuildVo = (BuildVo) event.getData();
                 buildService.addBuildVoAndTriggerClientEvent(addedBuildVo);
                 break ;
             case  BUILD_MODIFY:
                 BuildVo modifiedBuildVo = (BuildVo) event.getData();
                 buildService.modifyBuildVoAndTriggerEvent(modifiedBuildVo);
                 break ;
             case  BUILD_DELETE:
                 BuildVo deletedBuildVo = (BuildVo) event.getData();
                 buildService.deleteBuildVoAndTriggerClientEvent(deletedBuildVo);
                 break ;
             default :
                 // ignore
                 break ;
         }
     }
}

三、代碼實現改進

前面經過規範代碼的包結構、加了一些trick使得咱們能夠方便的使用eventbus解耦咱們的業務邏輯,可是有時候咱們須要的bean被註冊 的先後作一些業務邏輯,因此咱們在bean 被註冊到eventbus先後加了兩個hook:AfterRegisterProcessor、BeforeRegisterProcessor;實現這兩個interface而且實現對於的方法,會在bean 被註冊先後被調用post

bean 註冊到eventbus前的hookui

BeforeRegisterProcessor
public  interface  BeforeRegisterProcessor {
     void  beforeRegister();
}

bean 註冊到eventbus後的hookthis

AfterRegisterProcessor
public  interface  AfterRegisterProcessor {
     void  afterRegister();
}

實現:保證在 client.watch 以前,註冊已經完成,這樣watch產生的消息就可以保證被成功消費google

GlueService
@Service
public  class  GlueService  implements  AfterRegisterProcessor {
     @Autowired
     private  PodListener podListener;
 
     @Autowired
     private  RouteListener routerListener;
 
     @Autowired
     private  BuildListener buildListener;
 
     @Autowired
     private  DeploymentListener deploymentListener;
 
     @Autowired
     private  OpenShiftClient openShiftClient;
 
     @Override
     public  void  afterRegister() {
         IClient client = openShiftClient.getClient();
         podWatch = client.watch(podListener, ResourceKind.POD);
         routeWatch = client.watch(routerListener, ResourceKind.ROUTE);
         buildWatch = client.watch(buildListener, ResourceKind.BUILD);
         deploymentWatch = client.watch(deploymentListener, ResourceKind.REPLICATION_CONTROLLER);
     }
}
相關文章
相關標籤/搜索