spring batch 與模擬實現

Spring Batchhtml

    A lightweight, comprehensive batch framework designed to enable the development of robust batch applications vital for the daily operations of enterprise systems.(一款輕量的、全面的批處理框架,用於開發強大的平常運營的企業級批處理應用程序。)
java

框架主要功能:spring

    Transaction management(事務管理)數據庫

    Chunk based processing(基於塊的處理)app

    Declarative I/O(聲明式的輸入輸出)框架

    Start/Stop/Restart(啓動/中止/再啓動)less

    Retry/Skip(重試/跳過)分佈式

    image.png

    框架一共有4個主要角色:ide

           JobLauncher是任務啓動器,經過它來啓動任務,能夠看作是程序的入口。工具

      Job表明着一個具體的任務。

      Step表明着一個具體的步驟,一個Job能夠包含多個Step。

      JobRepository是存儲數據的地方,能夠看作是一個數據庫的接口,在任務執行的時候須要經過它來記錄任務狀態等等信息。


JobLauncher

    JobLauncher是任務啓動器,該接口只有一個run方法:

         public interface JobLauncher {

            public JobExecution run(Job job, JobParameters jobParameters) throws Exception;

        }

    除了傳入Job對象以外,還須要傳入JobParameters對象。經過JobLauncher能夠在Java程序中調用批處理任務,也能夠經過命令行或者其餘框架(如定時調度框架Quartz、Web後臺框架Spring MVC)中調用批處理任務。Spring Batch框架提供了一個JobLauncher的實現類SimpleJobLauncher。


Job

    Job表明着一個任務,一個Job與一個或者多個JobInstance相關聯,而一個JobInstance又與一個或者多個JobExecution相關聯:

        image.png

    考慮到任務可能不是隻執行一次就不再執行了,更多的狀況多是定時任務,如天天執行一次,每一個星期執行一次等等,那麼爲了區分每次執行的任務,框架使用了JobInstance。

        如上圖所示,Job是一個EndOfDay(天天最後時刻執行的任務),其中一個JobInstance就表明着2007年5月5日那天執行的任務實例。框架經過在執行JobLauncher.run(Job, JobParameters)方法時傳入的JobParameters來區分是哪一天的任務。因爲2007年5月5日那天執行的任務可能不會一次就執行完成,好比中途被中止,或者出現異常致使中斷,須要多執行幾回才能完成,因此框架使用了JobExecution來表示每次執行的任務。


Step

    一個Job任務能夠分爲幾個Step步驟,與JobExection相同,每次執行Step的時候使用StepExecution來表示執行的步驟。每個Step還包含着一個ItemReader、ItemProcessor、ItemWriter:

        ItemReader

        public interface ItemReader<T> {

            T read() throws Exception;

        }

        ItemReader表明着讀操做,框架已經提供了多種ItemReader接口的實現類,包括對文本文件、XML文件、數據庫、JMS消息等讀的處理,固然咱們也能夠本身實現該接口

        ItemProcessor

        public interface ItemProcessor<I, O> {

            O process(I item) throws Exception;

        }

        ItemProcessor表明着處理操做,process方法的形參傳入I類型的對象,經過處理後返回O型的對象。開發者能夠實現本身的業務代碼來對數據進行處理

        ItemWriter

        public interface ItemWriter<T> {

            void write(List<? extends T> items) throws Exception;

        }

        ItemReader表明着寫操做,框架已經提供了多種ItemWriter接口的實現類,包括對文本文件、XML文件、數據庫、JMS消息等寫的處理,固然咱們也能夠本身實現該接口


JobRepository

        JobRepository用於存儲任務執行的狀態信息,好比什麼時間點執行了什麼任務、任務執行結果如何等等。框架提供了2種實現,一種是經過Map形式保存在內存中,當Java程序重啓後任務信息也就丟失了,而且在分佈式下沒法獲取其餘節點的任務執行狀況;另外一種是保存在數據庫中

        例子:

        引入依賴

        <dependency>    

                <groupId>org.springframework.batch</groupId>    

                <artifactId>spring-batch-core</artifactId>    

                <version>3.0.8.RELEASE</version> 

        </dependency>

        裝載bean

        <beans xmlns="http://www.springframework.org/schema/beans"

               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

               xsi:schemaLocation="

                                http://www.springframework.org/schema/beans

                                http://www.springframework.org/schema/beans/spring-beans.xsd">

            <!-- 事務管理器 -->

            <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>

            <!-- 任務倉庫 -->

            <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">

                <property name="transactionManager" ref="transactionManager"/>

            </bean>

            <!-- 任務加載器 -->

            <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">

                <property name="jobRepository" ref="jobRepository"/>

            </bean>

        </beans>

        建立Reader

        咱們直接在resources目錄下建立一個batch-data.csv文件,內容以下:

            1,PENDING

            2,PENDING

            3,PENDING

            4,PENDING

            5,PENDING

            6,PENDING

            7,PENDING

            8,PENDING

            9,PENDING

            10,PENDING

    讀操做須要實現ItemReader<T>接口,框架提供了一個現成的實現類FlatFileItemReader。使用該類須要設置Resource和LineMapper。Resource表明着數據源,即咱們的batch-data.csv文件;LineMapper則表示如何將文件的每行數據轉成對應的DTO對象。

        建立DTO對象

            public class DeviceCommand {

                private String id;

                private String status;

            }

       自定義LineMapper

    咱們須要本身實現一個LineMapper實現類,用於將batch-data.csv文件的每行數據,轉成程序方便處理的DeviceCommand對象。

    public class HelloLineMapper implements LineMapper<DeviceCommand> {

            @Override

            public DeviceCommand mapLine(String line, int lineNumber) throws Exception {

                // 逗號分割每一行數據

                String[] args = line.split(",");

                // 建立DeviceCommand對象

                DeviceCommand deviceCommand = new DeviceCommand();

                // 設置id值到對象中

                deviceCommand.setId(args[0]);

                // 設置status值到對象中

                deviceCommand.setStatus(args[1]);

                // 返回對象

                return deviceCommand;

            }

        }

      建立Processor

      讀完數據後,咱們就須要處理數據了。處理操做須要實現ItemProcessor<I, O>接口,咱們本身實現一個HelloItemProcessor.java便可,代碼以下:

        public class HelloItemProcessor implements ItemProcessor<DeviceCommand, DeviceCommand> {

            @Override

            public DeviceCommand process(DeviceCommand deviceCommand) throws Exception {

                // 模擬下發命令給設備

                System.out.println("send command to device, id=" + deviceCommand.getId());

                // 更新命令狀態

                deviceCommand.setStatus("SENT");

                // 返回命令對象

                return deviceCommand;     

            }     

        }

       建立Writer

    處理完數據後,咱們須要更新命令狀態到文件裏,用於記錄咱們已經下發。與讀文件相似,咱們須要實現ItemWriter<T>接口,框架也提供了一個現成的實現類FlatFileItemWriter。使用該類須要設置Resource和LineAggregator。Resource表明着數據源,即咱們的batch-data.csv文件;LineAggregator則表示如何將DTO對象轉成字符串保存到文件的每行。

       自定義LineAggregator

       咱們須要本身實現一個LineAggregator實現類,用於將DeviceCommand對象轉成字符串,保存到batch-data.csv文件。

        public class HelloLineAggregator implements LineAggregator<DeviceCommand> {

            @Override

            public String aggregate(DeviceCommand deviceCommand) {

                StringBuffer sb = new StringBuffer();

                sb.append(deviceCommand.getId());

                sb.append(",");

                sb.append(deviceCommand.getStatus());

                return sb.toString();

            }

        }

      主程序

        image.png

        public class Main {

            public static void main(String[] args) throws Exception {

                // 加載上下文

                String[] configLocations = {"applicationContext.xml"};

                ApplicationContext applicationContext = new ClassPathXmlApplicationContext(configLocations);

                // 獲取任務啓動器,任務倉庫,事物管理器

                JobLauncher jobLauncher = applicationContext.getBean(JobLauncher.class);

                JobRepository jobRepository = applicationContext.getBean(JobRepository.class);

                PlatformTransactionManager transactionManager = applicationContext.getBean(PlatformTransactionManager.class);

                // 建立reader

                FlatFileItemReader<DeviceCommand> flatFileItemReader = new FlatFileItemReader<>();

                flatFileItemReader.setResource(new FileSystemResource("src/main/resources/batch-data.csv"));

                flatFileItemReader.setLineMapper(new HelloLineMapper());

                // 建立processor

                HelloItemProcessor helloItemProcessor = new HelloItemProcessor();

                // 建立writer

                FlatFileItemWriter<DeviceCommand> flatFileItemWriter = new FlatFileItemWriter<>();

                flatFileItemWriter.setResource(new FileSystemResource("src/main/resources/batch-data.csv"));

                flatFileItemWriter.setLineAggregator(new HelloLineAggregator());

                // 建立Step

                StepBuilderFactory stepBuilderFactory = new StepBuilderFactory(jobRepository, transactionManager);

                Step step = stepBuilderFactory.get("step")

                           .<DeviceCommand, DeviceCommand>chunk(1)

                           .reader(flatFileItemReader)       // 讀操做

                           .processor(helloItemProcessor)    // 處理操做

                           .writer(flatFileItemWriter)       // 寫操做

                           .build();

                // 建立Job

                JobBuilderFactory jobBuilderFactory = new JobBuilderFactory(jobRepository);

                Job job = jobBuilderFactory.get("job")

                                           .start(step)

                                           .build();

                // 啓動任務

                jobLauncher.run(job, new JobParameters());

            }

        }

        執行main方法以後,屏幕將會輸出下面信息:

        send command to device, id=1

        send command to device, id=2

        send command to device, id=3

        send command to device, id=4

        send command to device, id=5

        send command to device, id=6

        send command to device, id=7

        send command to device, id=8

        send command to device, id=9

        send command to device, id=10

        再查看batch-data.csv文件,將會發現命令狀態所有更新爲SENT:

        1,SENT

        2,SENT

        3,SENT

        4,SENT

        5,SENT

        6,SENT

        7,SENT

        8,SENT

        9,SENT

        10,SENT


    引伸----實現批處理

     1.獲得要批處理的數據列表

        dataList-->Process.getNeedIds(Parm)

       2.生產者(準備數據)

                ProducerExecutor<T, Long> ProducerExecutor{

                    List<T> producer(List<Long> dataList){

                        Process.getNeedUpdate(dataList, Parm)根據列表獲得數據

                    }

                }

        3.消費者(執行數據的更新操做)

                CustomerExecutor<T> customerExecutor{

                    void execute(T data){

                        Process.updateOrAdd(data, Parm)數據執行更新操做

                    }

                }

        4.process(定義數據的執行邏輯)

              Process.getNeedIds

        Process.getNeedUpdate

        Process.updateOrAdd

        Process.after

        5.批量執行任務

                生產者:

            data = ProducerExecutor.producer(dataList.subList(e, 分批大小toIndex))

                //執行完的數據寫入LinkedBlockingQueue queue中

                queue.put(data)

            消費者:

            ExecutorService executorService = CustomerExecutor.newFixedThreadPool(consumeThreadPoolSize);

                //從queue中取出數據執行

                data = queue.poll()

       //當將一個任務(分批大小的data)添加到線程池中的時候,線程池會爲每一個任務建立一個線程

                executorService.submit

                        customerExecutor.execute(data)

                

    6.執行完畢後

        Process.after(Parm)


關於ExecutorService接口:

    它擴展自Executor接口,Executor接口僅有一個方法:execute(runnable)

    ExecutorService在Executor的基礎上增長了「service」特性的方法:

        shutdown()、shutdownNow():都是關閉當前service服務,釋放Executor的全部資源;它所觸發的動做就是取消隊列中任務的執行

            shutdown是一種「友好」的關閉,它將再也不接受新的任務提交,同時把已經提交到隊列中的任務執行完畢。

            shutdownNow更加直接一些,它將會把還沒有執行的任務再也不執行。shutdowNow是個有返回類型的方法,它返回那些等待執行的任務列表(List<Runnable>)

        Future submit(callable/runnale):向Executor提交任務,並返回一個結果未定的Future。


    經過 Executor來啓動線程,比用Thread的start()更好:能夠很容易控制線程的啓動、執行和關閉過程,還能夠很容易使用線程池的特性。

    

    1.建立ExecutorService

        經過工具類java.util.concurrent.Executors的靜態方法建立。

        Executors包中所定義了 Executor、ExecutorService、ScheduledExecutorService、ThreadFactory和Callable類的工廠和實用方法。

        好比建立一個ExecutorService的實例,ExecutorService其實是一個線程池的管理工具:

                ExecutorService executorService = Executors.newCachedThreadPool();

                ExecutorService executorService = Executors.newFixedThreadPool(3);

                ExecutorService executorService = Executors.newSingleThreadExecutor();

    2.將任務添加到線程去執行

        當將一個任務添加到線程池中的時候,線程池會爲每一個任務建立一個線程,該線程會在以後的某個時刻自動執行。

    3.關閉執行服務對象

        executorService.shutdown();

    4.獲取任務的執行的返回值

        任務分兩類:一類是實現Runnable接口的類,一類是實現了Callable接口的類。

        二者均可以被 ExecutorService執行,可是Runnable任務沒有返回值,而Callable任務有返回值。

        而且Callable的call()方法只能經過ExecutorService的(<T> task) 方法來執行,而且返回一個 <T>,<T>是表示任務等待完成的 Future。

        例子:

                ExecutorService executorService = Executors.newCachedThreadPool();

                //建立10個任務並執行

                for (int i = 0; i < 10; i++) {

                    //使用ExecutorService執行Callable類型的任務,並將結果保存在future中

                    Future<String> future = executorService.submit(new TaskWithResult(i));

                    System.out.println(future.get()); //打印各個線程(任務)執行的結果

                    //啓動一次順序關閉,執行之前提交的任務,但不接受新任務。

                    若是已經關閉,則調用沒有其餘做用。

                    executorService.shutdown();

                }

                定義任務:

                class TaskWithResult implements Callable<String> {

                    privateint id;

                    public TaskWithResult(int id) {

                        this.id = id;

                    }

                    public String call() throws Exception {

                        System.out.println("call()方法被自動調用));

                        //一個模擬耗時的操做

                        for (int i = 999999; i > 0; i--) ;

                        return"call()方法的返回結果,保存在future";

                    }

                }

        


部分轉載 http://www.importnew.com/26177.html

相關文章
相關標籤/搜索