Storm框架:Storm整合springboot

咱們知道Storm自己是一個獨立運行的分佈式流式數據處理框架,Springboot也是一個獨立運行的web框架。那麼如何在Strom框架中集成Springboot使得咱們可以在Storm開發中運用Spring的Ioc容器及其餘如Spring Jpa等功能呢?咱們先來了解如下概念:
Storm主要的三個Component:Topology、Spout、Bolt。Topology做爲主進程控制着spout、bolt線程的運行,他們至關於獨立運行的容器分佈於storm集羣中的各個機器節點。
SpringApplication:是配置Spring應用上下文的起點。經過調用SpringApplication.run()方法它將建立ApplicationContext實例,這是咱們可以使用Ioc容器的主要BeanFactory。以後Spring將會加載全部單例模式的beans,並啓動後臺運行的CommandLineRunner beans等。
ApplicationContextAware:這是咱們可以在普通Java類中調用Spring容器裏的beans的關鍵接口。html

實現原理git

Storm框架中的每一個Spout和Bolt都至關於獨立的應用,Strom在啓動spout和bolt時提供了一個open方法(spout)和prepare方法(bolt)。咱們能夠把初始化Spring應用的操做放在這裏,這樣能夠保證每一個spout/bolt應用在後續執行過程當中都能獲取到Spring的ApplicationContext,有了ApplicationContext實例對象,Spring的全部功能就都能用上了。github

Spout.open方法實現br/>@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
//啓動Springboot應用
SpringStormApplication.run();web

this.map = map;
this.topologyContext = topologyContext;
this.spoutOutputCollector = spoutOutputCollector;

}br/>Bolt.prepare方法實現
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
//啓動Springboot應用
SpringStormApplication.run();spring

this.map = map;
this.topologyContext = topologyContext;
this.outputCollector = outputCollector;

}br/>SpringStormApplication啓動類
@SpringBootApplication
@ComponentScan(value = "com.xxx.storm")
public class SpringStormApplication {
/**apache

  • 非工程啓動入口,因此不用main方法
  • 加上synchronized的做用是因爲storm在啓動多個bolt線程實例時,若是Springboot用到Apollo分佈式配置,會報ConcurrentModificationException錯誤
  • 詳見:https://github.com/ctripcorp/apollo/issues/1658
  • @param args
    */
    public synchronized static void run(String ...args) {
    SpringApplication app = new SpringApplication(SpringStormApplication.class);
    //咱們並不須要web servlet功能,因此設置爲WebApplicationType.NONE
    app.setWebApplicationType(WebApplicationType.NONE);
    //忽略掉banner輸出
    app.setBannerMode(Banner.Mode.OFF);
    //忽略Spring啓動信息日誌
    app.setLogStartupInfo(false);
    app.run(args);
    }
    }
    與咱們傳統的Springboot應用啓動入口稍微有點區別,主要禁用了web功能,看下正常的啓動方式:

@SpringBootApplication
@ComponentScan(value = "com.xxx.web")
public class PlatformApplication {
public static void main(String[] args) {
SpringApplication.run(PlatformApplication.class, args);br/>}
}
在spout/bolt中調用了SpringStormApplication.run方法後,咱們還須要可以拿到ApplicationContext容器對象,這時候咱們還須要實現ApplicationContextAware接口,寫個工具類BeanUtils:
@Component
public class BeanUtils implements ApplicationContextAware {
private static ApplicationContext applicationContext = null;springboot

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    if (BeanUtils.applicationContext == null) {
        BeanUtils.applicationContext = applicationContext;
    }
}

public static ApplicationContext getApplicationContext() {
    return applicationContext;
}

public static Object getBean(String name) {
    return getApplicationContext().getBean(name);
}

public static <T> T getBean(Class<T> clazz) {
    return getApplicationContext().getBean(clazz);
}

public static <T> T getBean(String name, Class<T> clazz) {
    return getApplicationContext().getBean(name, clazz);
}

}
經過@Component註解使得Spring在啓動時可以掃描到該bean,由於BeanUtils實現了ApplicationContextAware接口,Spring會在啓動成功時自動調用BeanUtils.setApplicationContext方法,將ApplicationContext對象保存到工具類的靜態變量中,以後咱們就可使用BeanUtils.getBean()去獲取Spring容器中的bean了。app

寫個簡單例子框架

在FilterBolt的execute方法中獲取Spring beanbr/>@Override
public void execute(Tuple tuple) {
FilterService filterService = (FilterService) BeanUtils.getBean("filterService");
filterService.deleteAll();
}
定義FilterService類,這時候咱們就可使用Spring的相關注解,自動注入,Spring Jpa等功能了。br/>@Service("filterService")
public class FilterService {br/>@Autowired
UserRepository userRepository;分佈式

public void deleteAll() {
    userRepository.deleteAll();
}

}
將storm應用做爲Springboot工程的一個子模塊

工程主目錄的pom文件仍是springboot相關的依賴,在storm子模塊中引入storm依賴,這時候啓動Strom的topology應用會有一個日誌包依賴衝突。

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Applications/IntelliJ%20IDEA.app/Contents/bin/~/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.11.1/log4j-slf4j-impl-2.11.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Applications/IntelliJ%20IDEA.app/Contents/bin/~/.m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
咱們須要在storm子模塊的pom文件中重寫org.springframework.boot:spring-boot-starter包依賴,將Springboot的相關日誌包排除掉,以下:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId><exclusions><exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j2</artifactId></exclusion><exclusion><groupId>ch.qos.logback</groupId><artifactId>logback-classic2</artifactId></exclusion></exclusions></dependency>OK,完美整合!

相關文章
相關標籤/搜索