SpringBatch批處理框架+mysql倉庫+web監控實錄

一、概念

Spring Batch 是一款輕量級地適合企業級應用的批處理框架,值得注意的是,不一樣於其餘調度框架,Spring Batch不提供調度功能。java

二、批處理過程

批處理能夠分爲如下幾個步驟:mysql

  1. 讀取數據
  2. 按照業務處理數據
  3. 歸檔數據的過程

三、Spring Batch給咱們提供了什麼?

  1. 統一的讀寫接口
  2. 豐富的任務處理方式
  3. 靈活的事務管理及併發處理
  4. 日誌、監控、任務重啓與跳過等特性

四、基礎組件

名稱 用途
JobRepository 用於註冊和存儲Job的容器
JobLauncher 用於啓動Job
Job 實際要執行的做業,包含一個或多個step
step 步驟,批處理的步驟通常包含ItemReader, ItemProcessor, ItemWriter
ItemReader 從給定的數據源讀取item
ItemProcessor 在item寫入數據源以前進行數據整理
ItemWriter 把Chunk中包含的item寫入數據源。
Chunk 數據塊,給定數量的item集合,讓item進行屢次讀和處理,當知足必定數量的時候再一次寫入。
TaskLet 子任務表, step的一個事務過程,包含重複執行,同步/異步規則等。

五、job, step, tasklet 和 chunk 關係

一個job對應至少一個step,一個step對應0或者1個TaskLet,一個taskLet對應0或者1個Chunkweb

SpringBatch批處理框架+mysql倉庫+web監控實錄

六、實戰:批處理excel插入數據庫

6.1:定義數據倉庫

<!-- 內存倉庫  -->
    <!--<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"/>-->

    <!-- 數據庫倉庫  -->
    <batch:job-repository id="jobRepository" data-source="dataRepDruidDataSource"
                          isolation-level-for-create="SERIALIZABLE" transaction-manager="transactionManager"
                          table-prefix="BATCH_" max-varchar-length="1000" />

6.2:定義啓動器

<!-- 做業調度器,用來啓動job,引用做業倉庫 -->
    <bean id="jobLauncher"
          class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository"/>
    </bean>

6.3:定義JOB

<batch:job id="userBatchJobName" restartable="true">
        <batch:step id="userStep">
            <batch:tasklet allow-start-if-complete="false"
                           start-limit="1" task-executor="taskExecutor" throttle-limit="5">
                <batch:chunk reader="userReader" writer="userWriter"
                             processor="userProcessor" commit-interval="5" retry-limit="10">
                    <batch:retryable-exception-classes>
                        <batch:include class="org.springframework.dao.DuplicateKeyException"/>
                        <batch:include class="java.sql.BatchUpdateException"/>
                        <batch:include class="java.sql.SQLException"/>
                    </batch:retryable-exception-classes>
                </batch:chunk>
            </batch:tasklet>
        </batch:step>
    </batch:job>

    <bean id="taskExecutor"
          class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <!-- 線程池維護線程的最少數量 -->
        <property name="corePoolSize" value="100"/>
        <!-- 線程池維護線程所容許的空閒時間 -->
        <property name="keepAliveSeconds" value="30000"/>
        <!-- 線程池維護線程的最大數量 -->
        <property name="maxPoolSize" value="300"/>
        <!-- 線程池所使用的緩衝隊列 -->
        <property name="queueCapacity" value="100"/>
    </bean>

6.4:定義ItemReader

<bean id="userReader" class="org.springframework.batch.item.file.FlatFileItemReader">
        <property name="lineMapper" ref="lineMapper"/>
        <property name="resource" value="classpath:message/batch-data-source.csv"/>
    </bean>
<!-- 將每行映射成對象 -->
    <bean id="lineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
        <property name="lineTokenizer">
            <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                <property name="delimiter" value=","/><!-- 根據某種分隔符分割 -->
                <property name="names" value="id,name" />
            </bean>
        </property>
        <property name="fieldSetMapper"><!-- 將拆分後的字段映射成對象 -->
            <bean class="com.hcw.core.batch.UserFieldSetMapper" />
        </property>
    </bean>

6.5:定義ItemWriter

<bean id="userWriter" class="com.hcw.core.batch.MyBatchItemWriter" scope="step">
        <property name="statementId" value="com.hcw.core.batch.dao.UserToMapper.batchInsert"/>
        <property name="sqlSessionFactory" ref="sqlSessionFactoryTo"/>
    </bean>

6.6:定義ItemProcessor

<bean id="userProcessor" class="com.hcw.core.batch.UserItemProcessor"/>

6.7: 定義jobRepository的數據源

<bean id="dataRepDruidDataSource" class="com.alibaba.druid.pool.DruidDataSource"
          init-method="init" destroy-method="close">
        <property name="url" value="${jdbc.mysql.rep.connection.url}" />
        <property name="username" value="${jdbc.mysql.rep.connection.username}" />
        <property name="password" value="${jdbc.mysql.rep.connection.password}" />
        <property name="filters" value="${jdbc.mysql.rep.connection.filters}" />
        <property name="maxActive" value="${jdbc.mysql.rep.connection.maxActive}" />
        <property name="initialSize" value="${jdbc.mysql.rep.connection.initialSize}" />
        <property name="maxWait" value="${jdbc.mysql.rep.connection.maxWait}" />
        <property name="minIdle" value="${jdbc.mysql.rep.connection.minIdle}" />
        <property name="timeBetweenEvictionRunsMillis"
                  value="${jdbc.mysql.rep.connection.timeBetweenEvictionRunsMillis}" />
        <property name="minEvictableIdleTimeMillis"
                  value="${jdbc.mysql.rep.connection.minEvictableIdleTimeMillis}" />
        <property name="validationQuery"
                  value="${jdbc.mysql.rep.connection.validationQuery}" />
        <property name="testWhileIdle"
                  value="${jdbc.mysql.rep.connection.testWhileIdle}" />
        <property name="testOnBorrow" value="${jdbc.mysql.rep.connection.testOnBorrow}" />
        <property name="testOnReturn" value="${jdbc.mysql.rep.connection.testOnReturn}" />
        <property name="poolPreparedStatements"
                  value="${jdbc.mysql.rep.connection.poolPreparedStatements}" />
        <property name="maxPoolPreparedStatementPerConnectionSize"
                  value="${jdbc.mysql.rep.connection.maxPoolPreparedStatementPerConnectionSize}" />
    </bean>

6.8: 啓動JOB

啓動tomcat,打開啓動頁面

SpringBatch批處理框架+mysql倉庫+web監控實錄

相關文章
相關標籤/搜索