懶松鼠Flink-Boot(Flink+Spring):一款將Flink與Spring生態完美融合的腳手架工程

還在爲開發Flink流處理應用程序時沒法像開發Spring Boot程序那麼優雅的分層以及裝配Bean而煩惱嗎?java

你可能面臨以下苦惱:

  1. 開發的Flink流處理應用程序,業務邏輯所有寫在Flink的操做符中,代碼沒法服用,沒法分層
  2. 要是有一天它能夠像開發Spring Boot程序那樣能夠優雅的分層,優雅的裝配Bean,不須要本身new對象好了
  3. 可使用各類Spring生態的框架,一些瑣碎的邏輯再也不硬編碼到代碼中。

GitHub最近超火的一款開源框架,懶松鼠Flink-Boot腳手架,該腳手架簡直是Spring開發工程師的福音,完美融合Spring生態體系,不再須要手動在Java類中建立臃腫的Java對象,簡直是開發大型流處理應用程序的必不可少的工具。地址:懶松鼠Flink-Boot 腳手架由《深刻理解Flink核心設計與實踐原理》做者開發。
mysql

接口緩存

你的現狀git

static Map<String,String> cache=new HashMap<String,String>();

public String findUUID(FlowData flowData) {
    String value=cache.get(flowData.getSubTestItem());
    if(value==null)
    {
        String uuid=userMapper.findUUID(flowData);
        cache.put(uuid,value);
        return uuid;
    }
    return value;
}

你想要的是這樣github

@Cacheable(value = "FlowData.findUUID", key = "#flowData.subTestItem")
public String findUUID(FlowData flowData) {
    return userMapper.findUUID(flowData);
}

重試機制

你的現狀spring

public void insertFlow(FlowData flowData) {
    try{
        userMapper.insertFlow(flowData);
      }Cache(Exception e)
      {
         Thread.sleep(10000);
         userMapper.insertFlow(flowData);
      }
}

你想要的是這樣sql

@Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000L, multiplier = 1.5))
    @Override
    public void insertFlow(FlowData flowData) {
        userMapper.insertFlow(flowData);
    }

Bean校驗

你的現狀數據庫

if(flowData.getSubTestItem().length()<2&&flowData.getSubTestItem().length()>7)
{
    return null;
}
if(flowData.getBillNumber()==null)
{
    return null;
}

你想要的是這樣編程

Map<String, StringBuffer> validate = ValidatorUtil.validate(flowData);
if (validate != null) {
    System.out.println(validate);
    return null;
}

public class FlowData {

    private String uuid;
    //聲明該參數的校驗規則字符串長度必須在7到20之間
    @Size(min = 7, max = 20, message = "長度必須在{min}-{max}之間")
    private String subTestItem;
    //聲明該參數的校驗規則字符串不能爲空
    @NotBlank(message = "billNumber不能爲空")
    private String billNumber;
}

等等......

GitHub最近超火的一款開源框架,懶松鼠Flink-Boot腳手架,該腳手架簡直是Spring開發工程師的福音,完美融合Spring生態體系,不再須要手動在Java類中建立臃腫的Java對象,簡直是開發大型流處理應用程序的必不可少的工具。懶松鼠Flink-Boot 腳手架由《深刻理解Flink核心設計與實踐原理》做者開發。緩存

它爲流計算開發工程師解決了

  1. 將全部對象的建立和依賴關係的維護工做都交給Spring容器的管理,下降了對象之間的耦合性,使代碼變得更簡潔,拒絕臃腫。
  2. 消除在工程中對單例的過多使用。
  3. 聲明式事務處理,經過配置就能夠完成對事物的管理,而無須手動編程。
  4. 聲明式註解,能夠經過註解定義方法的緩衝功能,無序手動編程。
  5. 註解式定義Bean對象的校驗規則,經過註解便可完成對對象的參數校驗,無序手動編程。
  6. 集成MyBatis ORM框架,註解式維護實例對象的依賴關係。
  7. 解耦Flink SQL,SQL語句剝離出JAVA文件,以簡潔的模式表如今XML文件中。
  8. 封裝Flink API,僅提供業務方法去編寫,Spring生態融合所有搞定,無需操心。

有了它你的代碼就像這樣子:

/**
* github地址: https://github.com/intsmaze
* 博客地址:https://www.cnblogs.com/intsmaze/
* 出版書籍《深刻理解Flink核心設計與實踐原理》 隨書代碼
* RichFlatMapFunction爲Flink框架的一個通用型操做符(算子),開發者通常在該算子的flatMap方法中編寫業務邏輯
* @auther: intsmaze(劉洋)
* @date: 2020/10/15 18:33
*/
public class MybatisFlatMap extends RichFlatMapFunction<String, String> {

   private static Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create();

    protected ApplicationContext beanFactory;
    //mybatis的Service對象,操做數據庫的user表
    private UserService userService;

    @Override
    public void open(Configuration parameters) {
        ExecutionConfig.GlobalJobParameters globalJobParameters = getRuntimeContext()
                .getExecutionConfig().getGlobalJobParameters();
        beanFactory = BeanFactory.getBeanFactory((Configuration) globalJobParameters);
        
        userService = beanFactory.getBean(UserServiceImpl.class);
    }

    @Override
    public void flatMap(String value, Collector<String> out){

        FlowData flowData = gson.fromJson(message, new TypeToken<FlowData>() {
        }.getType());
        Map<String, StringBuffer> validate = ValidatorUtil.validate(flowData);
        if (validate != null) {
            System.out.println(validate);
            return null;
        }
        //數據庫查詢,屏蔽掉獲取數據庫鏈接,是否數據庫鏈接,事務的聲明等
        String flowUUID = userService.findUUID(flowData);
        if (StringUtils.isBlank(flowUUID)) {
            flowUUID = UUID.randomUUID().toString();
            flowData.setUuid(flowUUID);
            //數據庫插入,屏蔽掉獲取數據庫鏈接,是否數據庫鏈接,事務的聲明等
            userService.insertFlow(flowData);
        }
        out.collect(gson.toJson(flowData));
    }
}


public interface UserService {

    String findUUID(FlowData flowData);

    void insertFlow(FlowData flowData);
}

//經過註解實例化Bean對象。
@Service
//經過註解聲明進行事務管理
@Transactional
//經過註解聲明方法具備異常重試機制
@EnableRetry
public class UserServiceImpl implements UserService {
   //經過註解進行依賴注入
    @Resource
    private UserMapper userMapper;

    @Cacheable(value = "FlowData.findUUID", key = "#flowData.subTestItem")
    @Override
    public String findUUID(FlowData flowData) {
        return userMapper.findUUID(flowData);
    }
    
   //經過註解聲明該方法異常後的重試機制,無需手動編程
    @Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000L, multiplier = 1.5))
    @Override
    public void insertFlow(FlowData flowData) {
        userMapper.insertFlow(flowData);
    }
}

public interface UserMapper {

    String findUUID(FlowData flowData);

    void insertFlow(FlowData flowData);
}

//註解式聲明參數校驗規則
public class FlowData {

    private String uuid;
    //聲明該參數的校驗規則字符串長度必須在7到20之間
    @Size(min = 7, max = 20, message = "長度必須在{min}-{max}之間")
    private String subTestItem;
    //聲明該參數的校驗規則字符串不能爲空
    @NotBlank(message = "billNumber不能爲空")
    private String billNumber;

    @NotBlank(message = "barcode不能爲空")
    private String barcode;

    private String flowName;

    private String flowStatus;

    ......
}

倉庫地址:懶松鼠Flink-Boot腳手架由《深刻理解Flink核心設計與實踐原理》做者開發。mybatis

  1. 該腳手架屏蔽掉組裝Flink API細節,讓跨界變得簡單,使得開發者能以傳統Java WEB模式的開發方式開發出具有分佈式計算能力的流處理程序。

  2. 開發者徹底不須要理解分佈式計算的理論知識和Flink框架的細節,即可以快速編寫業務代碼實現。

  3. 爲了進一步提高開發者使用該腳手架開發大型項目的敏捷的程度,該腳手架工程默認集成Spring框架進行Bean管理,同時將微服務以及WEB開發領域中常常用到的框架集成進來,進一步提高開發速度。

  4. 除此以外針對目前流行的各大Java框架,該Flink腳手架工程也進行了集成,加快開發人員的編碼速度,好比:

  • 集成Jbcp-template對Mysql,Oracle,SQLServer等關係型數據庫的快速訪問。
  • 集成Hibernate Validator框架進行參數校驗。
  • 集成Spring Retry框架進行重試標誌。
  • 集成Mybatis框架,提升對關係型數據庫增,刪,改,查的開發速度。
  • 集成Spring Cache框架,實現註解式定義方法緩存。
  • ......

1. 組織結構

Flink-Boot
├── Flink-Base -- Flink-Boot工程基礎模塊
├── Flink-Client -- Flink-Boot 客戶端模塊
├── flink-annotation -- 註解生效模塊
├── flink-mybatis -- mybatis orm模塊
├── flink-retry -- 註解重試機制模式
├── flink-validate -- 校驗模塊
├── flink-sql -- Flink SQL解耦至XML配置模塊
├── flink-cache-annotation -- 接口緩衝模塊
├── flink-junit -- 單元測試模塊
├── flink-apollo -- 阿波羅配置客戶端模塊

2. 技術選項和集成狀況

技術 名稱 狀態
Spring Framework 容器 已集成
Spring 基於XML方式配置Bean 裝配Bean 已集成
Spring 基於註解方式配置Bean 裝配Bean 已集成
Spring 基於註解聲明方法重試機制 Retry註解 已集成
Spring 基於註解聲明方法緩存 Cache註解 已集成
Hibernate Validator 校驗框架 已集成
Druid 數據庫鏈接池 已集成
MyBatis ORM框架 已集成
Kafka 消息隊列 已集成
HDFS 分佈式文件系統 已集成
Log4J 日誌組件 已集成
Junit 單元測試 已集成
Mybatis-Plus MyBatis擴展包 進行中
PageHelper MyBatis物理分頁插件 進行中
ZooKeeper 分佈式協調服務 進行中
Dubbo 分佈式服務框架 進行中
Redis 分佈式緩存數據庫 進行中
Solr & Elasticsearch 分佈式全文搜索引擎 進行中
Ehcache 進程內緩存框架 進行中
sequence 分佈式高效ID生產 進行中
Dubbole消費者 服務消費者 進行中
Spring eurake消費者 服務消費者 進行中
Apollo配置中心 攜程阿波羅配置中心 進行中
Spring Config配置中心 Spring Cloud Config配置中心 進行中

3. 快速開始

下面是集成Spring生態的基礎手冊.

3.1 核心基礎工程

  • flink-base :基礎工程,封裝了開發Flink工程的必須參數,同時集成Spring容器,爲後續集成Spring各種框架提供了支撐。
    1. 能夠在本地開發環境和Flink集羣運行環境中隨意切換。
    2. 能夠在增量檢查點和全量檢查點之間隨意切換。
    3. 內置使用HDFS做爲檢查點的持久存儲介質。
    4. 默認使用Kafka做爲數據源
    5. 內置實現了任務的暫停機制-達到任務仍在運行但再也不接收Kafka數據源中的數據,代替了中止任務後再從新部署任務這一繁瑣流程。
  • flink-client:業務工程,該工程依賴flink-base工程,開發任務在該工程中進行業務邏輯的開發。

3.2 Spring容器

該容器模式配置了JdbcTemplate實例,數據庫鏈接池採用Druid,在業務方法中只須要獲取容器中的JdbcTemplate實例即可以快速與關係型數據庫進行交互,dataService實例封裝了一些訪問數據庫表的方法。

topology-base.xml
<beans ......
       default-lazy-init="true" default-init-method="init">

    <context:property-placeholder location="classpath:config.properties"/>

    <bean id="druidDataSource" class="com.alibaba.druid.pool.DruidDataSource">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"></property>
        <property name="url"
                  value="${jdbc.url}"></property>
        <property name="username" value="${jdbc.user}"></property>
        <property name="password" value="${jdbc.password}"></property>
    </bean>
    
    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
        <constructor-arg ref="druidDataSource"></constructor-arg>
    </bean>
    
    <bean id="dataService" class="com.intsmaze.flink.base.service.DataService">
        <property name="jdbcTemplate" ref="jdbcTemplate"></property>
    </bean>

</beans>
config.properties
jdbc.user = intsmaze
jdbc.password = intsmaze
jdbc.url = jdbc:mysql://127.0.0.1:3306/flink-boot?useUnicode=true&characterEncoding=UTF-8

3.3 啓動類示例

以下是SimpleClient(com.intsmaze.flink.client.SimpleClient)類的示例代碼,該類繼承了BaseFlink,能夠看到對應實現的方法中分別設置以下:

  • public String getTopoName():定義本做業的名稱。
  • public String getConfigName():定義本做業須要讀取的spring配置文件的名稱
  • public String getPropertiesName():定義本做業須要讀取的properties配置文件的名稱。
  • public void createTopology(StreamExecutionEnvironment builder):構造本做業的拓撲結構。
/**
 * github地址: https://github.com/intsmaze
 * 博客地址:https://www.cnblogs.com/intsmaze/
 * 出版書籍《深刻理解Flink核心設計與實踐原理》 隨書代碼
 *
 * @auther: intsmaze(劉洋)
 * @date: 2020/10/15 18:33
 */
public class SimpleClient extends BaseFlink {

    public static void main(String[] args) throws Exception {
        SimpleClient topo = new SimpleClient();
        topo.run(ParameterTool.fromArgs(args));
    }

    @Override
    public String getTopoName() {
        return "SimpleClient";
    }

    @Override
    public String getConfigName() {
        return "topology-base.xml";
    }

    @Override
    public String getPropertiesName() {
        return "config.properties";
    }

    @Override
    public void createTopology(StreamExecutionEnvironment builder) {

        DataStream<String> inputDataStrem = env.addSource(new SimpleDataSource());

        DataStream<String> processDataStream = inputDataStrem.flatMap(new SimpleFunction());

        processDataStream.print("輸出結果");
    }

}

3.4 數據源

採用自定義數據源,用戶須要編寫自定義DataSource類,該類須要繼承XXX抽象類,實現以下方法。

  • public abstract void open(StormBeanFactory beanFactory):獲取本做業在Spring配置文件中配置的bean對象。
  • public abstract String sendMessage():本做業spout生成數據的方法,在該方法內編寫業務邏輯產生源數據,產生的數據以String類型進行返回。
public class SimpleDataSource extends CommonDataSource {

    private static Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create();
	......

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ...//構造讀取各種外部系統數據的鏈接實例
    }

    @Override
    public String sendMess() throws InterruptedException {
        Thread.sleep(1000);
		......
        MainData mainData = new MainData();
        ......//經過外部系統數據的鏈接實例讀取外部系統數據,封裝進MainData對象中,而後返回便可。
        return gson.toJson(mainData);
    }
}

3.5 業務邏輯實現

本做業計算的業務邏輯在Flink轉換操做符中進行實現,通常來講開發者只須要實現flatMap算子便可以知足大部分算子的使用。

用戶編寫的自定義類須要繼承com.intsmaze.flink.base.transform.CommonFunction抽象類,均需實現以下方法。

  • public abstract String execute(String message):本做業業務邏輯計算的方法,參數message爲Kafka主題中讀取過來的參數,默認參數爲String類型,若是須要將處理的數據發送給Kakfa主題中,則要經過return將處理的數據返回便可。
public class SimpleFunction extends CommonFunction {

    private static Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create();
    
    @Override
    public String execute(String message) throws Exception {
        FlowData flowData = gson.fromJson(message, new TypeToken<FlowData>() {
        }.getType());

        String flowUUID = dataService.findUUID(flowData);
        if (StringUtils.isBlank(flowUUID)) {
            flowUUID = UUID.randomUUID().toString();
            flowData.setUuid(flowUUID);
            dataService.insertFlow(flowData);
        }
        return gson.toJson(flowData);
    }
}
CommonFunction

CommonFunction抽象類中默認在open方法中經過BeanFactory對象獲取到了Spring容器中對於的dataService實例,對於Spring中的其餘實例同理在SimpleFunction類中的open方法中獲取便可。

public abstract class CommonFunction extends RichFlatMapFunction<String, String> {

    private IntCounter numLines = new IntCounter();

    protected DataService dataService;

    protected ApplicationContext beanFactory;

    @Override
    public void open(Configuration parameters) {
        getRuntimeContext().addAccumulator("num-FlatMap", this.numLines);

        ExecutionConfig.GlobalJobParameters globalJobParameters = getRuntimeContext()
                .getExecutionConfig().getGlobalJobParameters();
        beanFactory = BeanFactory.getBeanFactory((Configuration) globalJobParameters);

        dataService = beanFactory.getBean(DataService.class);
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        this.numLines.add(1);
        String execute = execute(value);
        if (StringUtils.isNotBlank(execute)) {
            out.collect(execute);
        }
    }

    public abstract String execute(String message) throws Exception;

}

能夠根據狀況選擇重寫open(Configuration parameters)方法,同時重寫的open(Configuration parameters)方法的第一行要調用父類的open(Configuration parameters)方法。

public void open(Configuration parameters){
	super.open(parameters);
	......
	//獲取在Spring配置文件中配置的實例
	XXX xxx=beanFactory.getBean(XXX.class);
}

3.6 集羣/本地運行

在自定義的Topology類編寫Main方法,建立自定義的Topology對象後,調用對象的run(...)方法。

public class SimpleClient extends BaseFlink {

/**
 * 本地啓動參數  -isLocal local
 * 集羣啓動參數  -isIncremental isIncremental
 */
public static void main(String[] args) throws Exception {
    SimpleClient topo = new SimpleClient();
    topo.run(ParameterTool.fromArgs(args));
}

.......
相關文章
相關標籤/搜索