還在爲開發Flink流處理應用程序時沒法像開發Spring Boot程序那麼優雅的分層以及裝配Bean而煩惱嗎?java
GitHub最近超火的一款開源框架,懶松鼠Flink-Boot腳手架,該腳手架簡直是Spring開發工程師的福音,完美融合Spring生態體系,不再須要手動在Java類中建立臃腫的Java對象,簡直是開發大型流處理應用程序的必不可少的工具。地址:懶松鼠Flink-Boot 腳手架由《深刻理解Flink核心設計與實踐原理》做者開發。
mysql
你的現狀git
static Map<String,String> cache=new HashMap<String,String>(); public String findUUID(FlowData flowData) { String value=cache.get(flowData.getSubTestItem()); if(value==null) { String uuid=userMapper.findUUID(flowData); cache.put(uuid,value); return uuid; } return value; }
你想要的是這樣github
@Cacheable(value = "FlowData.findUUID", key = "#flowData.subTestItem") public String findUUID(FlowData flowData) { return userMapper.findUUID(flowData); }
你的現狀spring
public void insertFlow(FlowData flowData) { try{ userMapper.insertFlow(flowData); }Cache(Exception e) { Thread.sleep(10000); userMapper.insertFlow(flowData); } }
你想要的是這樣sql
@Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000L, multiplier = 1.5)) @Override public void insertFlow(FlowData flowData) { userMapper.insertFlow(flowData); }
你的現狀數據庫
if(flowData.getSubTestItem().length()<2&&flowData.getSubTestItem().length()>7) { return null; } if(flowData.getBillNumber()==null) { return null; }
你想要的是這樣編程
Map<String, StringBuffer> validate = ValidatorUtil.validate(flowData); if (validate != null) { System.out.println(validate); return null; } public class FlowData { private String uuid; //聲明該參數的校驗規則字符串長度必須在7到20之間 @Size(min = 7, max = 20, message = "長度必須在{min}-{max}之間") private String subTestItem; //聲明該參數的校驗規則字符串不能爲空 @NotBlank(message = "billNumber不能爲空") private String billNumber; }
GitHub最近超火的一款開源框架,懶松鼠Flink-Boot腳手架,該腳手架簡直是Spring開發工程師的福音,完美融合Spring生態體系,不再須要手動在Java類中建立臃腫的Java對象,簡直是開發大型流處理應用程序的必不可少的工具。懶松鼠Flink-Boot 腳手架由《深刻理解Flink核心設計與實踐原理》做者開發。緩存
/** * github地址: https://github.com/intsmaze * 博客地址:https://www.cnblogs.com/intsmaze/ * 出版書籍《深刻理解Flink核心設計與實踐原理》 隨書代碼 * RichFlatMapFunction爲Flink框架的一個通用型操做符(算子),開發者通常在該算子的flatMap方法中編寫業務邏輯 * @auther: intsmaze(劉洋) * @date: 2020/10/15 18:33 */ public class MybatisFlatMap extends RichFlatMapFunction<String, String> { private static Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create(); protected ApplicationContext beanFactory; //mybatis的Service對象,操做數據庫的user表 private UserService userService; @Override public void open(Configuration parameters) { ExecutionConfig.GlobalJobParameters globalJobParameters = getRuntimeContext() .getExecutionConfig().getGlobalJobParameters(); beanFactory = BeanFactory.getBeanFactory((Configuration) globalJobParameters); userService = beanFactory.getBean(UserServiceImpl.class); } @Override public void flatMap(String value, Collector<String> out){ FlowData flowData = gson.fromJson(message, new TypeToken<FlowData>() { }.getType()); Map<String, StringBuffer> validate = ValidatorUtil.validate(flowData); if (validate != null) { System.out.println(validate); return null; } //數據庫查詢,屏蔽掉獲取數據庫鏈接,是否數據庫鏈接,事務的聲明等 String flowUUID = userService.findUUID(flowData); if (StringUtils.isBlank(flowUUID)) { flowUUID = UUID.randomUUID().toString(); flowData.setUuid(flowUUID); //數據庫插入,屏蔽掉獲取數據庫鏈接,是否數據庫鏈接,事務的聲明等 userService.insertFlow(flowData); } out.collect(gson.toJson(flowData)); } } public interface UserService { String findUUID(FlowData flowData); void insertFlow(FlowData flowData); } //經過註解實例化Bean對象。 @Service //經過註解聲明進行事務管理 @Transactional //經過註解聲明方法具備異常重試機制 @EnableRetry public class UserServiceImpl implements UserService { //經過註解進行依賴注入 @Resource private UserMapper userMapper; @Cacheable(value = "FlowData.findUUID", key = "#flowData.subTestItem") @Override public String findUUID(FlowData flowData) { return userMapper.findUUID(flowData); } //經過註解聲明該方法異常後的重試機制,無需手動編程 @Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000L, multiplier = 1.5)) @Override public void insertFlow(FlowData flowData) { userMapper.insertFlow(flowData); } } public interface UserMapper { String findUUID(FlowData flowData); void insertFlow(FlowData flowData); } //註解式聲明參數校驗規則 public class FlowData { private String uuid; //聲明該參數的校驗規則字符串長度必須在7到20之間 @Size(min = 7, max = 20, message = "長度必須在{min}-{max}之間") private String subTestItem; //聲明該參數的校驗規則字符串不能爲空 @NotBlank(message = "billNumber不能爲空") private String billNumber; @NotBlank(message = "barcode不能爲空") private String barcode; private String flowName; private String flowStatus; ...... }
倉庫地址:懶松鼠Flink-Boot腳手架由《深刻理解Flink核心設計與實踐原理》做者開發。mybatis
該腳手架屏蔽掉組裝Flink API細節,讓跨界變得簡單,使得開發者能以傳統Java WEB模式的開發方式開發出具有分佈式計算能力的流處理程序。
開發者徹底不須要理解分佈式計算的理論知識和Flink框架的細節,即可以快速編寫業務代碼實現。
爲了進一步提高開發者使用該腳手架開發大型項目的敏捷的程度,該腳手架工程默認集成Spring框架進行Bean管理,同時將微服務以及WEB開發領域中常常用到的框架集成進來,進一步提高開發速度。
除此以外針對目前流行的各大Java框架,該Flink腳手架工程也進行了集成,加快開發人員的編碼速度,好比:
Flink-Boot ├── Flink-Base -- Flink-Boot工程基礎模塊 ├── Flink-Client -- Flink-Boot 客戶端模塊 ├── flink-annotation -- 註解生效模塊 ├── flink-mybatis -- mybatis orm模塊 ├── flink-retry -- 註解重試機制模式 ├── flink-validate -- 校驗模塊 ├── flink-sql -- Flink SQL解耦至XML配置模塊 ├── flink-cache-annotation -- 接口緩衝模塊 ├── flink-junit -- 單元測試模塊 ├── flink-apollo -- 阿波羅配置客戶端模塊
技術 | 名稱 | 狀態 |
---|---|---|
Spring Framework | 容器 | 已集成 |
Spring 基於XML方式配置Bean | 裝配Bean | 已集成 |
Spring 基於註解方式配置Bean | 裝配Bean | 已集成 |
Spring 基於註解聲明方法重試機制 | Retry註解 | 已集成 |
Spring 基於註解聲明方法緩存 | Cache註解 | 已集成 |
Hibernate Validator | 校驗框架 | 已集成 |
Druid | 數據庫鏈接池 | 已集成 |
MyBatis | ORM框架 | 已集成 |
Kafka | 消息隊列 | 已集成 |
HDFS | 分佈式文件系統 | 已集成 |
Log4J | 日誌組件 | 已集成 |
Junit | 單元測試 | 已集成 |
Mybatis-Plus | MyBatis擴展包 | 進行中 |
PageHelper | MyBatis物理分頁插件 | 進行中 |
ZooKeeper | 分佈式協調服務 | 進行中 |
Dubbo | 分佈式服務框架 | 進行中 |
Redis | 分佈式緩存數據庫 | 進行中 |
Solr & Elasticsearch | 分佈式全文搜索引擎 | 進行中 |
Ehcache | 進程內緩存框架 | 進行中 |
sequence | 分佈式高效ID生產 | 進行中 |
Dubbole消費者 | 服務消費者 | 進行中 |
Spring eurake消費者 | 服務消費者 | 進行中 |
Apollo配置中心 | 攜程阿波羅配置中心 | 進行中 |
Spring Config配置中心 | Spring Cloud Config配置中心 | 進行中 |
下面是集成Spring生態的基礎手冊.
該容器模式配置了JdbcTemplate實例,數據庫鏈接池採用Druid,在業務方法中只須要獲取容器中的JdbcTemplate實例即可以快速與關係型數據庫進行交互,dataService實例封裝了一些訪問數據庫表的方法。
<beans ...... default-lazy-init="true" default-init-method="init"> <context:property-placeholder location="classpath:config.properties"/> <bean id="druidDataSource" class="com.alibaba.druid.pool.DruidDataSource"> <property name="driverClassName" value="com.mysql.jdbc.Driver"></property> <property name="url" value="${jdbc.url}"></property> <property name="username" value="${jdbc.user}"></property> <property name="password" value="${jdbc.password}"></property> </bean> <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"> <constructor-arg ref="druidDataSource"></constructor-arg> </bean> <bean id="dataService" class="com.intsmaze.flink.base.service.DataService"> <property name="jdbcTemplate" ref="jdbcTemplate"></property> </bean> </beans>
jdbc.user = intsmaze jdbc.password = intsmaze jdbc.url = jdbc:mysql://127.0.0.1:3306/flink-boot?useUnicode=true&characterEncoding=UTF-8
以下是SimpleClient(com.intsmaze.flink.client.SimpleClient)類的示例代碼,該類繼承了BaseFlink,能夠看到對應實現的方法中分別設置以下:
/** * github地址: https://github.com/intsmaze * 博客地址:https://www.cnblogs.com/intsmaze/ * 出版書籍《深刻理解Flink核心設計與實踐原理》 隨書代碼 * * @auther: intsmaze(劉洋) * @date: 2020/10/15 18:33 */ public class SimpleClient extends BaseFlink { public static void main(String[] args) throws Exception { SimpleClient topo = new SimpleClient(); topo.run(ParameterTool.fromArgs(args)); } @Override public String getTopoName() { return "SimpleClient"; } @Override public String getConfigName() { return "topology-base.xml"; } @Override public String getPropertiesName() { return "config.properties"; } @Override public void createTopology(StreamExecutionEnvironment builder) { DataStream<String> inputDataStrem = env.addSource(new SimpleDataSource()); DataStream<String> processDataStream = inputDataStrem.flatMap(new SimpleFunction()); processDataStream.print("輸出結果"); } }
採用自定義數據源,用戶須要編寫自定義DataSource類,該類須要繼承XXX抽象類,實現以下方法。
public class SimpleDataSource extends CommonDataSource { private static Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create(); ...... @Override public void open(Configuration parameters) throws Exception { super.open(parameters); ...//構造讀取各種外部系統數據的鏈接實例 } @Override public String sendMess() throws InterruptedException { Thread.sleep(1000); ...... MainData mainData = new MainData(); ......//經過外部系統數據的鏈接實例讀取外部系統數據,封裝進MainData對象中,而後返回便可。 return gson.toJson(mainData); } }
本做業計算的業務邏輯在Flink轉換操做符中進行實現,通常來講開發者只須要實現flatMap算子便可以知足大部分算子的使用。
用戶編寫的自定義類須要繼承com.intsmaze.flink.base.transform.CommonFunction抽象類,均需實現以下方法。
public class SimpleFunction extends CommonFunction { private static Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create(); @Override public String execute(String message) throws Exception { FlowData flowData = gson.fromJson(message, new TypeToken<FlowData>() { }.getType()); String flowUUID = dataService.findUUID(flowData); if (StringUtils.isBlank(flowUUID)) { flowUUID = UUID.randomUUID().toString(); flowData.setUuid(flowUUID); dataService.insertFlow(flowData); } return gson.toJson(flowData); } }
CommonFunction抽象類中默認在open方法中經過BeanFactory對象獲取到了Spring容器中對於的dataService實例,對於Spring中的其餘實例同理在SimpleFunction類中的open方法中獲取便可。
public abstract class CommonFunction extends RichFlatMapFunction<String, String> { private IntCounter numLines = new IntCounter(); protected DataService dataService; protected ApplicationContext beanFactory; @Override public void open(Configuration parameters) { getRuntimeContext().addAccumulator("num-FlatMap", this.numLines); ExecutionConfig.GlobalJobParameters globalJobParameters = getRuntimeContext() .getExecutionConfig().getGlobalJobParameters(); beanFactory = BeanFactory.getBeanFactory((Configuration) globalJobParameters); dataService = beanFactory.getBean(DataService.class); } @Override public void flatMap(String value, Collector<String> out) throws Exception { this.numLines.add(1); String execute = execute(value); if (StringUtils.isNotBlank(execute)) { out.collect(execute); } } public abstract String execute(String message) throws Exception; }
能夠根據狀況選擇重寫open(Configuration parameters)方法,同時重寫的open(Configuration parameters)方法的第一行要調用父類的open(Configuration parameters)方法。
public void open(Configuration parameters){ super.open(parameters); ...... //獲取在Spring配置文件中配置的實例 XXX xxx=beanFactory.getBean(XXX.class); }
在自定義的Topology類編寫Main方法,建立自定義的Topology對象後,調用對象的run(...)方法。
public class SimpleClient extends BaseFlink {
/** * 本地啓動參數 -isLocal local * 集羣啓動參數 -isIncremental isIncremental */ public static void main(String[] args) throws Exception { SimpleClient topo = new SimpleClient(); topo.run(ParameterTool.fromArgs(args)); } .......