Spring Batch操作DB

1、需求分析

使用Spring Batch對DB進行讀寫操作: 從一個表中讀取數據, 然後批量地插入另外一張表中.


2、代碼實現

1. 代碼結構圖:


2. applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:tx="http://www.springframework.org/schema/tx"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
	http://www.springframework.org/schema/context
	http://www.springframework.org/schema/context/spring-context-3.1.xsd
	http://www.springframework.org/schema/tx
	http://www.springframework.org/schema/tx/spring-tx-3.1.xsd">

	<!-- Package scanned for Spring stereotype annotations -->
	<context:component-scan base-package="com.zdp" />

	<!-- Pooled data source (c3p0) -->
	<bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource" abstract="false" scope="singleton">
		<property name="driverClass" value="org.gjt.mm.mysql.Driver" />
		<!-- The '&' separating URL parameters must be written as '&amp;' inside an XML attribute -->
		<property name="jdbcUrl" value="jdbc:mysql://localhost:3306/test?useUnicode=true&amp;characterEncoding=UTF-8" />
		<property name="user" value="root" />
		<property name="password" value="root" />
		<property name="checkoutTimeout" value="30000" />
		<property name="maxIdleTime" value="120" />
		<property name="maxPoolSize" value="100" />
		<property name="minPoolSize" value="2" />
		<property name="initialPoolSize" value="2" />
		<property name="maxStatements" value="0" />
		<property name="maxStatementsPerConnection" value="0" />
		<property name="idleConnectionTestPeriod" value="30" />
	</bean>

	<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<!-- Job repository used by the launcher below -->
	<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
		<property name="transactionManager" ref="transactionManager" />
	</bean>

	<!-- Launches jobs -->
	<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>

	<!-- Transaction management on top of the data source -->
	<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<tx:annotation-driven transaction-manager="transactionManager" />
</beans>

base-package: 掃描spring註解

jobLauncher: 啓動Job

jobRepository: 爲Job提供持久化操作

transactionManager: 提供事務管理操作


3. springBatch.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:tx="http://www.springframework.org/schema/tx"
	xsi:schemaLocation="http://www.springframework.org/schema/beans	
	http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
	http://www.springframework.org/schema/batch 
	http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
	http://www.springframework.org/schema/context 
	http://www.springframework.org/schema/context/spring-context-3.1.xsd
	http://www.springframework.org/schema/tx 
	http://www.springframework.org/schema/tx/spring-tx-3.1.xsd">
	
	<!-- 引入spring核心配置文件 -->
	<import resource="applicationContext.xml"/>

	<batch:job id="ledgerJob">
		<!-- Job-level listener: observes the job's execution status -->
		<batch:listeners>
			<batch:listener ref="appJobExecutionListener" />
		</batch:listeners>
		<batch:step id="step">
			<!-- Run the step under transaction control -->
			<batch:tasklet transaction-manager="transactionManager">
				<batch:listeners>
					<batch:listener ref="itemFailureLoggerListener" />
				</batch:listeners>
				<!-- commit-interval: number of items written per commit; skip-limit: number of records allowed to be skipped -->
				<batch:chunk reader="ledgerReader" writer="ledgerWriter" commit-interval="1000" skip-limit="1000">
					<batch:skippable-exception-classes>
						<batch:include class="java.lang.Exception"/> <!-- On Exception or any subclass, the job keeps running -->
						<batch:exclude class="java.io.FileNotFoundException"/> <!-- On this exception, the job stops immediately -->
					</batch:skippable-exception-classes>
				</batch:chunk> 
			</batch:tasklet>
		</batch:step>
	</batch:job>
	 
	<!-- Reads every row of the ledger table through a JDBC cursor; rows are converted by the ledgerRowMapper bean -->
	<bean id="ledgerReader" class="org.springframework.batch.item.database.JdbcCursorItemReader">
		<property name="dataSource" ref="dataSource" />
		<property name="sql" value="select * from ledger" /> 
		<property name="rowMapper" ref="ledgerRowMapper" />
	</bean>
	 
	<!-- JobParametersBuilder bean; QuartzLedgerJob autowires it to build the parameters for each launch -->
	<bean id="jobParameterBulider" class="org.springframework.batch.core.JobParametersBuilder" />
	
	<!-- Scheduled task definition starts here -->  
	<bean id="ledgerJobDetail" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">  
		<property name="targetObject">  
			<!-- Bean whose method is invoked on schedule -->  
			<ref bean="quartzLedgerJob" />  
		</property>  
		<property name="targetMethod">  
			<!-- Name of the method invoked on the target bean -->  
			<value>execute</value>  
		</property>  
	</bean>  
  
	<bean id="ledgerCronTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
		<property name="jobDetail">
			<ref bean="ledgerJobDetail" />
		</property>
		<property name="cronExpression">
			<!-- Fire every day at 22:30 (fields: second minute hour day-of-month month day-of-week) -->
			<value>0 30 22 ? * *</value>
		</property>
	</bean>

	<!-- Scheduler factory: all scheduled triggers are injected here -->
	<bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
		<!-- Register triggers -->
		<property name="triggers">
			<list>
				<!-- The trigger defined above; several triggers may be listed at once -->
				<ref local="ledgerCronTrigger" />
			</list>
		</property>
	</bean>
</beans>


4. AppJobExecutionListener.java

/**
 * 監聽job執行狀態
 */
@Component("appJobExecutionListener")
public class AppJobExecutionListener implements JobExecutionListener {
	private final static Logger logger = Logger.getLogger(AppJobExecutionListener.class);

	public void afterJob(JobExecution jobExecution) {
		if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
			logger.info("Job completed: " + jobExecution.getJobId());
		} else if (jobExecution.getStatus() == BatchStatus.FAILED) {
			logger.info("Job failed: " + jobExecution.getJobId());
		}
	}

	public void beforeJob(JobExecution jobExecution) {
		if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
			logger.info("Job completed: " + jobExecution.getJobId());
		} else if (jobExecution.getStatus() == BatchStatus.FAILED) {
			logger.info("Job failed: " + jobExecution.getJobId());
		}
	}
}


5. ItemFailureLoggerListener.java

/**
 * 檢查是讀出錯仍是寫出錯
 */
@Component("itemFailureLoggerListener")
public class ItemFailureLoggerListener extends ItemListenerSupport<Object, Object> {
	private final static Logger LOG = Logger.getLogger(ItemFailureLoggerListener.class);

	public void onReadError(Exception ex) {
		LOG.error("Encountered error on read", ex);
	}

	public void onWriteError(Exception ex, Object item) {
		LOG.error("Encountered error on write", ex);
	}

}

6. Ledger.java

/**
 * One ledger record. Each field corresponds to a column of the same meaning
 * in the ledger / ledger_temp tables (see LedgerRowMapper for the mapping).
 */
public class Ledger implements Serializable {
	private static final long serialVersionUID = 1L;
	// Primary key (column ID)
	private int id;
	// Column RECEIPT_DATE
	private Date receiptDate;
	// Column MEMBER_NAME
	private String memberName;
	// Column CHECK_NUMBER
	private String checkNumber;
	// Column CHECK_DATE
	private Date checkDate;
	// Column PAYMENT_TYPE
	private String paymentType;
	// Column DEPOSIT_AMOUNT
	private double depositAmount;
	// Column PAYMENT_AMOUNT
	private double paymentAmount;
	// Column COMMENTS
	private String comments;

	// getter and setter (not shown in this listing)
}

7. LedgerRowMapper.java

/**
 * ledger行的映射類
 */
@SuppressWarnings("rawtypes")
@Component("ledgerRowMapper")
public class LedgerRowMapper implements RowMapper {
	public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
		Ledger ledger = new Ledger();
		ledger.setId(rs.getInt("ID"));
		ledger.setReceiptDate(rs.getDate("RECEIPT_DATE"));
		ledger.setMemberName(rs.getString("MEMBER_NAME"));
		ledger.setCheckNumber(rs.getString("MEMBER_NAME"));
		ledger.setCheckDate(rs.getDate("CHECK_DATE"));
		ledger.setPaymentType(rs.getString("PAYMENT_TYPE"));
		ledger.setDepositAmount(rs.getDouble("DEPOSIT_AMOUNT"));
		ledger.setPaymentAmount(rs.getDouble("PAYMENT_AMOUNT"));
		ledger.setComments(rs.getString("COMMENTS"));
		return ledger;
	}
}

8. LedgerDao.java

/**
 * Persistence contract for {@link Ledger} records.
 */
public interface LedgerDao {
	/**
	 * Stores a single ledger record.
	 *
	 * @param item the record to store
	 */
	void save(Ledger item);
}

9.  LedgerDaoImpl.java
/**
 * ledger數據操做類
 */
@Repository
public class LedgerDaoImpl implements LedgerDao {

	private static final String SAVE_SQL = "INSERT INTO LEDGER_TEMP (RECEIPT_DATE, MEMBER_NAME, CHECK_NUMBER, CHECK_DATE, PAYMENT_TYPE, DEPOSIT_AMOUNT, PAYMENT_AMOUNT, COMMENTS) VALUES(?,?

,?

,?,?

,?,?

,?)"; @Autowired private JdbcTemplate jdbcTemplate; @Override public void save(final Ledger item) { jdbcTemplate.update(SAVE_SQL, new PreparedStatementSetter() { public void setValues(PreparedStatement stmt) throws SQLException { stmt.setDate(1, new java.sql.Date(item.getReceiptDate().getTime())); stmt.setString(2, item.getMemberName()); stmt.setString(3, item.getCheckNumber()); stmt.setDate(4, new java.sql.Date(item.getCheckDate().getTime())); stmt.setString(5, item.getPaymentType()); stmt.setDouble(6, item.getDepositAmount()); stmt.setDouble(7, item.getPaymentAmount()); stmt.setString(8, item.getComments()); } }); } }


10. LedgerWriter.java

/**
 * Item writer that stores each ledger record of a chunk through {@link LedgerDao}.
 */
@Component("ledgerWriter")
public class LedgerWriter implements ItemWriter<Ledger> {

	@Autowired
	private LedgerDao ledgerDao;

	/**
	 * Saves the given records one by one, in list order.
	 *
	 * @param ledgers the records of the current chunk
	 * @throws Exception if saving a record fails
	 */
	public void write(List<? extends Ledger> ledgers) throws Exception {
		java.util.Iterator<? extends Ledger> cursor = ledgers.iterator();
		while (cursor.hasNext()) {
			Ledger current = cursor.next();
			ledgerDao.save(current);
		}
	}

}

11.  QuartzLedgerJob.java

/**
 * Scheduling entry point: its {@link #execute()} method launches the ledger batch job.
 */
@Component("quartzLedgerJob")
public class QuartzLedgerJob {

	private static final Logger LOG = LoggerFactory.getLogger(QuartzLedgerJob.class);

	/** Incremented after each launch that returns normally; reported in the debug log. */
	private static long counter = 0L;

	@Autowired
	private JobLauncher jobLauncher;

	@Autowired
	private Job ledgerJob;

	@Autowired
	JobParametersBuilder jobParameterBulider;

	/**
	 * Runs the business method: launches the ledger job and logs the elapsed time.
	 *
	 * <p>A Spring Batch job instance that has completed successfully may not be
	 * run again; whether a failed instance may be rerun is controlled by the
	 * job's restartable setting (true by default). As a workaround, the
	 * current time is added as a job parameter so that, with all other
	 * parameters equal, every launch is a different job instance.
	 *
	 * @throws Exception if launching the job fails
	 */
	public void execute() throws Exception {
		LOG.debug("start...");

		StopWatch timer = new StopWatch();
		timer.start();
		jobLauncher.run(ledgerJob, jobParameterBulider.addDate("date", new Date()).toJobParameters());
		timer.stop();

		LOG.debug("Time elapsed:{},Execute quartz ledgerJob:{}", timer.prettyPrint(), ++counter);
	}
}


12. StartQuartz.java

/**
 * Starts the scheduled job by loading the Spring context.
 * Requirement: periodically read data from table ledger, then batch-write it into table ledger_temp.
 */
public class StartQuartz {
	/** Classpath location of the batch configuration loaded at startup. */
	private static final String CONFIG_LOCATION = "/com/zdp/resources/springBatch.xml";

	/**
	 * Creates the application context from {@link #CONFIG_LOCATION}.
	 *
	 * @param args command-line arguments (unused)
	 * @throws FileNotFoundException declared by the original signature
	 */
	public static void main(String[] args) throws FileNotFoundException {
		new ClassPathXmlApplicationContext(CONFIG_LOCATION);
	}
}

13. sql:

-- Source table: the batch job's reader selects all rows from it ("select * from ledger").
create table ledger(
	ID int(10) not null AUTO_INCREMENT PRIMARY KEY,
	RECEIPT_DATE date,
	MEMBER_NAME varchar(10) ,
	CHECK_NUMBER varchar(10) ,
	CHECK_DATE date,
	PAYMENT_TYPE varchar(10) ,
	DEPOSIT_AMOUNT double(10,3),
	PAYMENT_AMOUNT double(10,3),
	COMMENTS varchar(100) 
);

-- Target table: LedgerDaoImpl inserts into it; ID is not supplied by the insert and is auto-generated.
create table ledger_temp(
	ID int(10) not null AUTO_INCREMENT PRIMARY KEY,
	RECEIPT_DATE date,
	MEMBER_NAME varchar(10) ,
	CHECK_NUMBER varchar(10) ,
	CHECK_DATE date,
	PAYMENT_TYPE varchar(10) ,
	DEPOSIT_AMOUNT double(10,3),
	PAYMENT_AMOUNT double(10,3),
	COMMENTS varchar(100) 
);
相關文章
相關標籤/搜索