本篇文章主要介紹的是SpringBoot整合kafka和storm以及在這過程遇到的一些問題和解決方案。java
若是你對kafka和storm熟悉的話,這一段能夠直接跳過!若是不熟,也能夠看看我以前寫的博客。一些相關博客以下。mysql
kafka 和 storm的環境安裝git
地址:http://www.panchengming.com/2018/01/26/pancm70/github
kafka的相關使用web
地址:http://www.panchengming.com/2018/01/28/pancm71/
http://www.panchengming.com/2018/02/08/pancm72/spring
storm的相關使用sql
地址:http://www.panchengming.com/2018/03/16/pancm75/shell
通常而言,使用kafka整合storm能夠應付大多數需求。可是在擴展性上來講,可能就不太好。目前主流的微服務框架SpringCloud是基於SpringBoot的,因此使用SpringBoot對kafka和storm進行整合,能夠進行統一配置,擴展性會更好。數據庫
通常來講,kafka和storm的整合,使用kafka進行數據的傳輸,而後使用storm實時的處理kafka中的數據。apache
在這裏咱們加入SpringBoot以後,也是作這些,只不過是由SpringBoot對kafka和storm進行統一的管理。
若是仍是很差理解的話,能夠經過下面這個簡單的業務場景瞭解下:
在數據庫中有一批大量的用戶數據,其中這些用戶數據中有不少是不須要的,也就是髒數據,咱們須要對這些用戶數據進行清洗,而後從新存入數據庫中,可是要求實時、延時低,而且便於管理。
因此這裏咱們就可使用SpringBoot+kafka+storm來進行相應的開發。
在進行代碼開發前,咱們要明確開發什麼。
在上述的業務場景中,須要大量的數據,可是咱們這裏只是簡單的進行開發,也就是寫個簡單的demo出來,可以簡單的實現這些功能,因此咱們只需知足以下條件就能夠了:
那麼根據上述要求咱們進行SpringBoot、kafka和storm的整合。
首先須要相應jar包,因此maven的依賴以下:
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <java.version>1.8</java.version> <springboot.version>1.5.9.RELEASE</springboot.version> <mybatis-spring-boot>1.2.0</mybatis-spring-boot> <mysql-connector>5.1.44</mysql-connector> <slf4j.version>1.7.25</slf4j.version> <logback.version>1.2.3</logback.version> <kafka.version>1.0.0</kafka.version> <storm.version>1.2.1</storm.version> <fastjson.version>1.2.41</fastjson.version> <druid>1.1.8</druid> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>${springboot.version}</version> </dependency> <!-- Spring Boot Mybatis 依賴 --> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>${mybatis-spring-boot}</version> </dependency> <!-- MySQL 鏈接驅動依賴 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>${mysql-connector}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>${slf4j.version}</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>${logback.version}</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>${logback.version}</version> </dependency> <!-- kafka --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>${kafka.version}</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>${kafka.version}</version> </dependency> <!--storm相關jar --> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>${storm.version}</version> <!--排除相關依賴 --> <exclusions> <exclusion> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> </exclusion> <exclusion> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-1.2-api</artifactId> </exclusion> <exclusion> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-web</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <artifactId>ring-cors</artifactId> <groupId>ring-cors</groupId> </exclusion> </exclusions> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka</artifactId> <version>${storm.version}</version> </dependency> <!--fastjson 相關jar --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>${fastjson.version}</version> </dependency> <!-- Druid 數據鏈接池依賴 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>${druid}</version> </dependency> </dependencies>
成功添加了相關依賴以後,這裏咱們再來添加相應的配置。
在application.properties中添加以下配置:
# log logging.config=classpath:logback.xml ## mysql spring.datasource.url=jdbc:mysql://localhost:3306/springBoot2?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true spring.datasource.username=root spring.datasource.password=123456 spring.datasource.driverClassName=com.mysql.jdbc.Driver ## kafka kafka.servers = 192.169.0.23\:9092,192.169.0.24\:9092,192.169.0.25\:9092 kafka.topicName = USER_TOPIC kafka.autoCommit = false kafka.maxPollRecords = 100 kafka.groupId = groupA kafka.commitRule = earliest
注:上述的配置只是一部分,完整的配置能夠在個人github中找到。
數據庫腳本:
-- springBoot2庫的腳本 CREATE TABLE `t_user` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id', `name` varchar(10) DEFAULT NULL COMMENT '姓名', `age` int(2) DEFAULT NULL COMMENT '年齡', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8
注:由於這裏咱們只是簡單的模擬一下業務場景,因此只是創建一張簡單的表。
說明:這裏我只對幾個關鍵的類進行說明,完整的項目工程連接能夠在博客底部找到。
在使用SpringBoot整合kafka和storm以前,咱們能夠先對kfaka和storm的相關代碼編寫,而後在進行整合。
首先是數據源的獲取,也就是使用storm中的spout從kafka中拉取數據。
在以前的storm入門中,講過storm的運行流程,其中spout是storm獲取數據的一個組件,其中咱們主要實現nextTuple方法,編寫從kafka中獲取數據的代碼就能夠在storm啓動後進行數據的獲取。
spout類的主要代碼以下:
@Override public void nextTuple() { for (;;) { try { msgList = consumer.poll(100); if (null != msgList && !msgList.isEmpty()) { String msg = ""; List<User> list=new ArrayList<User>(); for (ConsumerRecord<String, String> record : msgList) { // 原始數據 msg = record.value(); if (null == msg || "".equals(msg.trim())) { continue; } try{ list.add(JSON.parseObject(msg, User.class)); }catch(Exception e){ logger.error("數據格式不符!數據:{}",msg); continue; } } logger.info("Spout發射的數據:"+list); //發送到bolt中 this.collector.emit(new Values(JSON.toJSONString(list))); consumer.commitAsync(); }else{ TimeUnit.SECONDS.sleep(3); logger.info("未拉取到數據..."); } } catch (Exception e) { logger.error("消息隊列處理異常!", e); try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e1) { logger.error("暫停失敗!",e1); } } } }
注:若是spout在發送數據的時候發送失敗,是會重發的!
上述spout類中主要是將從kafka獲取的數據傳輸傳輸到bolt中,而後再由bolt類處理該數據,處理成功以後,寫入數據庫,而後給與sqout響應,避免重發。
bolt類主要處理業務邏輯的方法是execute,咱們主要實現的方法也是寫在這裏。須要注意的是這裏只用了一個bolt,所以也不用定義Field進行再次的轉發。
代碼的實現類以下:
@Override public void execute(Tuple tuple) { String msg=tuple.getStringByField(Constants.FIELD); try{ List<User> listUser =JSON.parseArray(msg,User.class); //移除age小於10的數據 if(listUser!=null&&listUser.size()>0){ Iterator<User> iterator = listUser.iterator(); while (iterator.hasNext()) { User user = iterator.next(); if (user.getAge()<10) { logger.warn("Bolt移除的數據:{}",user); iterator.remove(); } } if(listUser!=null&&listUser.size()>0){ userService.insertBatch(listUser); } } }catch(Exception e){ logger.error("Bolt的數據處理失敗!數據:{}",msg,e); } }
編寫完了spout和bolt以後,咱們再來編寫storm的主類。
storm的主類主要是對Topology(拓步)進行提交,提交Topology的時候,須要對spout和bolt進行相應的設置。Topology的運行的模式有兩種:
LocalCluster cluster = new LocalCluster(); cluster.submitTopology("TopologyApp", conf,builder.createTopology());
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
這裏爲了方便,兩種方法都編寫了,經過主方法的args參數來進行控制。
Topology相關的配置說明在代碼中的註釋寫的很詳細了,這裏我就再也不多說了。
代碼以下:
public void runStorm(String[] args) { // 定義一個拓撲 TopologyBuilder builder = new TopologyBuilder(); // 設置1個Executeor(線程),默認一個 builder.setSpout(Constants.KAFKA_SPOUT, new KafkaInsertDataSpout(), 1); // shuffleGrouping:表示是隨機分組 // 設置1個Executeor(線程),和兩個task builder.setBolt(Constants.INSERT_BOLT, new InsertBolt(), 1).setNumTasks(1).shuffleGrouping(Constants.KAFKA_SPOUT); Config conf = new Config(); //設置一個應答者 conf.setNumAckers(1); //設置一個work conf.setNumWorkers(1); try { // 有參數時,表示向集羣提交做業,並把第一個參數當作topology名稱 // 沒有參數時,本地提交 if (args != null && args.length > 0) { logger.info("運行遠程模式"); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } else { // 啓動本地模式 logger.info("運行本地模式"); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("TopologyApp", conf, builder.createTopology()); } } catch (Exception e) { logger.error("storm啓動失敗!程序退出!",e); System.exit(1); } logger.info("storm啓動成功..."); }
好了,編寫完了kafka和storm相關的代碼以後,咱們再來進行和SpringBoot的整合!
在進行和SpringBoot整合前,咱們先要解決下一下幾個問題。
1 在SpringBoot程序中如何提交storm的Topolgy?
storm是經過提交Topolgy來肯定如何啓動的,通常使用過運行main方法來啓動,可是SpringBoot啓動方式通常也是經過main方法啓動的。因此應該怎麼樣解決呢?
2 如何讓bolt和spout類使用spring註解?
3.有時啓動正常,有時沒法啓動,動態的bean也沒法獲取?
上面的三個問題是我在整合的時候遇到的,其中解決辦法在目前看來是可行的,或許其中的問題多是由於其餘的緣由致使的,不過目前就這樣整合以後,就沒出現過其餘的問題了。若上述問題和解決辦法有不妥以後,歡迎批評指正!
解決了上面的問題以後,咱們回到代碼這塊。
其中,程序的入口,也就是主類的代碼在進行整合後以下:
@SpringBootApplication public class Application{ public static void main(String[] args) { // 啓動嵌入式的 Tomcat 並初始化 Spring 環境及其各 Spring 組件 ConfigurableApplicationContext context = SpringApplication.run(Application.class, args); GetSpringBean springBean=new GetSpringBean(); springBean.setApplicationContext(context); TopologyApp app = context.getBean(TopologyApp.class); app.runStorm(args); } }
動態獲取bean的代碼以下:
public class GetSpringBean implements ApplicationContextAware{ private static ApplicationContext context; public static Object getBean(String name) { return context.getBean(name); } public static <T> T getBean(Class<T> c) { return context.getBean(c); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { if(applicationContext!=null){ context = applicationContext; } } }
主要的代碼的介紹就到這裏了,至於其它的,基本就和之前的同樣了。
成功啓動程序以後,咱們先調用接口新增幾條數據到kafka
新增請求:
POST http://localhost:8087/api/user {"name":"張三","age":20} {"name":"李四","age":10} {"name":"王五","age":5}
新增成功以後,咱們可使用xshell工具在kafka集羣中查看數據。
輸入:**kafka-console-consumer.sh --zookeeper master:2181 --topic USER_TOPIC --from-beginning**
而後能夠看到如下輸出結果。
上述也表示了數據成功的寫入了kafka。
由於是實時的從kafka那數據,咱們也能夠從控制檯查看打印的語句。
控制檯輸出:
INFO com.pancm.storm.spout.KafkaInsertDataSpout - Spout發射的數據:[{"age":5,"name":"王五"}, {"age":10,"name":"李四"}, {"age":20,"name":"張三"}] WARN com.pancm.storm.bolt.InsertBolt - Bolt移除的數據:{"age":5,"name":"王五"} INFO com.alibaba.druid.pool.DruidDataSource - {dataSource-1} inited DEBUG com.pancm.dao.UserDao.insertBatch - ==> Preparing: insert into t_user (name,age) values (?,?) , (?,?) DEBUG com.pancm.dao.UserDao.insertBatch - ==> Parameters: 李四(String), 10(Integer), 張三(String), 20(Integer) DEBUG com.pancm.dao.UserDao.insertBatch - <== Updates: 2 INFO com.pancm.service.impl.UserServiceImpl - 批量新增2條數據成功!
能夠在控制檯成功的看處處理的過程和結果。
而後咱們也能夠經過接口進行數據庫全部的數據查詢。
查詢請求:
GET http://localhost:8087/api/user
返回結果:
[{"id":1,"name":"李四","age":10},{"id":2,"name":"張三","age":20}]
上述代碼中測試返回的結果顯然是符合咱們預期的。
關於SpringBoot整合kafka和storm暫時就告一段落了。本篇文章只是簡單的介紹這些 相關的使用,在實際的應用可能會更復雜。若是有有更好的想法和建議,歡迎留言進行討論!
SpringBoot整合kafka和storm的工程我放在github上了,若是感受不錯的話請給個star吧。
Gihub地址:https://github.com/xuwujing/springBoot-study
對了,也有kafka整合storm的工程,也在個人github上。
地址:https://github.com/xuwujing/kafka-study
到此,本文結束,謝謝閱讀。