前二天寫了一篇《Java 多線程併發編程》(點我直達),放國慶,在家閒着沒事,繼續寫剩下的東西,開幹!
例如web服務器、數據庫服務器、文件服務器或郵件服務器之類的。請求的時候,單個任務時間很短,可是請求數量巨大。每一次請求,就會建立一個新線程,而後在新線程中請求服務,頻繁的建立線程,銷燬線程形成系統很大的開銷,資源的浪費。java
線程池爲線程生命週期開銷問題和資源不足問題提供了解決方案。經過對多個任務重用線程,線程建立的開銷被分攤到多個任務上。
對具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成、獲取結果、設置結果。get方法會阻塞,直到任務返回結果。web
Callable與Runnable功能類似,Callable有返回值;Runnable沒有返回值;通常狀況下,Callable與FutureTask一塊兒使用,或者與線程池一塊兒使用spring
通常系統,多數會與第三方系統的數據進行打交道,而第三方的生產庫,並不容許咱們直接操做。在企業裏面,通常都是經過中間表進行同步,即第三方系統將生產數據放入一張與其生產環境隔離的另外一個獨立數據庫中的獨立表,再根據接口協議,增長相應的字段。而我方須要讀取該中間表中的數據,並對數據進行同步操做。此時就須要編寫相應的程序進行數據同步。
讀取中間表的數據,並同步到業務系統中數據庫
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cyb</groupId>
    <artifactId>ybchen_syn</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- The Java sources use lambdas and contain non-ASCII string literals, so the
         language level and source encoding are pinned here instead of being left
         to the compiler plugin and platform defaults. -->
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- MyBatis framework (adjust the version as needed) -->
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>3.5.6</version>
        </dependency>
        <!-- MySQL JDBC driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.21</version>
        </dependency>
        <!-- Connection pool -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.2.1</version>
        </dependency>
        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.30</version>
        </dependency>
        <!-- Unit testing -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
### Root logger: level debug, routed to the appenders stdout, D and E ###
log4j.rootLogger = debug,stdout,D,E
### Appender stdout: console output on System.out ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
### Appender D: DEBUG level and above, appended to ./logs/debug.log ###
log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
log4j.appender.D.File = ./logs/debug.log
log4j.appender.D.Append = true
log4j.appender.D.Threshold = DEBUG
log4j.appender.D.layout = org.apache.log4j.PatternLayout
log4j.appender.D.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
### Appender E: ERROR level and above, appended to ./logs/error.log ###
log4j.appender.E = org.apache.log4j.DailyRollingFileAppender
log4j.appender.E.File =./logs/error.log
log4j.appender.E.Append = true
log4j.appender.E.Threshold = ERROR
log4j.appender.E.layout = org.apache.log4j.PatternLayout
log4j.appender.E.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="middle-student">

    <!-- Maps one row of the intermediate `student` table onto com.cyb.entity.middle.Student.
         The columns listed here are the ones the table defines and the entity exposes:
         `address` is mapped explicitly, and there is no `birth` column or property. -->
    <resultMap id="BaseResultMap" type="com.cyb.entity.middle.Student">
        <id column="id" property="id" jdbcType="INTEGER"/>
        <result column="name" property="name" jdbcType="VARCHAR"/>
        <result column="sex" property="sex" jdbcType="VARCHAR"/>
        <result column="address" property="address" jdbcType="VARCHAR"/>
        <result column="department" property="department" jdbcType="VARCHAR"/>
        <result column="add_time" property="addTime" jdbcType="TIMESTAMP"/>
        <result column="data_status" property="dataStatus" jdbcType="VARCHAR"/>
        <result column="deal_time" property="dealTime" jdbcType="TIMESTAMP"/>
    </resultMap>

    <!-- Reads up to #{count} rows that are still in the initial status 'I'. -->
    <select id="selectList" resultMap="BaseResultMap">
        SELECT * FROM student WHERE data_status = 'I' limit #{count}
    </select>

    <!-- Sets the status and the processing time of the row with the given id. -->
    <update id="updateStatusById">
        update student set data_status = #{dataStatus}, deal_time = #{dealTime} where id = #{id}
    </update>

</mapper>
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="test-student">

    <!-- Inserts one row into the business `student` table.
         Only name, sex and department are written; no id value is supplied. -->
    <insert id="addStudent">
        insert into student (name,sex,department)
        values (#{name},#{sex},#{department})
    </insert>

</mapper>
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN" "http://mybatis.org/dtd/mybatis-3-config.dtd">
<!-- MyBatis global configuration for the intermediate ("middle") database -->
<configuration>
    <!-- 1. Environments; several may be declared here (develop, test, ...) -->
    <environments default="develop">
        <environment id="develop">
            <!-- 1.1 Transaction management is delegated to JDBC -->
            <transactionManager type="JDBC"/>
            <!-- 1.2 Data source: a custom factory class is used instead of the
                 built-in JNDI/POOLED/UNPOOLED types -->
            <dataSource type="com.cyb.datasource.DruidDataSourceFactory">
                <property name="driverClass" value="com.mysql.cj.jdbc.Driver"/>
                <!-- '&' must be written as '&amp;' inside an attribute value,
                     otherwise the document is not well-formed XML -->
                <property name="jdbcUrl" value="jdbc:mysql://localhost:3306/middle?characterEncoding=UTF-8&amp;serverTimezone=Asia/Shanghai&amp;autoReconnect=true"/>
                <property name="username" value="root"/>
                <property name="password" value="root"/>
                <property name="initialSize" value="2"/>
                <property name="maxActive" value="300"/>
                <property name="maxWait" value="60000"/>
                <property name="poolPreparedStatements" value="true"/>
                <property name="maxPoolPreparedStatementPerConnectionSize" value="200"/>
            </dataSource>
        </environment>
    </environments>
    <!-- 2. Mapper files, given as slash-separated resource paths: xx/xx/../xx.xml -->
    <mappers>
        <mapper resource="middle-student.xml"/>
    </mappers>
</configuration>
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN" "http://mybatis.org/dtd/mybatis-3-config.dtd">
<!-- MyBatis global configuration for the business ("test") database -->
<configuration>
    <!-- 1. Environments; several may be declared here (develop, test, ...) -->
    <environments default="develop">
        <environment id="develop">
            <!-- 1.1 Transaction management is delegated to JDBC -->
            <transactionManager type="JDBC"/>
            <!-- 1.2 Data source: a custom factory class is used instead of the
                 built-in JNDI/POOLED/UNPOOLED types -->
            <dataSource type="com.cyb.datasource.DruidDataSourceFactory">
                <property name="driverClass" value="com.mysql.cj.jdbc.Driver"/>
                <!-- '&' must be written as '&amp;' inside an attribute value,
                     otherwise the document is not well-formed XML -->
                <property name="jdbcUrl" value="jdbc:mysql://localhost:3306/test?characterEncoding=UTF-8&amp;serverTimezone=Asia/Shanghai&amp;autoReconnect=true"/>
                <property name="username" value="root"/>
                <property name="password" value="root"/>
                <property name="initialSize" value="2"/>
                <property name="maxActive" value="300"/>
                <property name="maxWait" value="60000"/>
                <property name="poolPreparedStatements" value="true"/>
                <property name="maxPoolPreparedStatementPerConnectionSize" value="200"/>
            </dataSource>
        </environment>
    </environments>
    <!-- 2. Mapper files, given as slash-separated resource paths: xx/xx/../xx.xml -->
    <mappers>
        <mapper resource="test-student.xml"/>
    </mappers>
</configuration>
package com.cyb.cost; public class StudentConst { //I:第三方系統入庫;D:處理中;F:處理完成;E:發生錯誤或異常 public static final String INIT="I"; public static final String DEALING="D"; public static final String FINISH="F"; public static final String ERROR="E"; }
package com.cyb.datasource;

import com.alibaba.druid.pool.DruidDataSource;
import org.apache.ibatis.datasource.unpooled.UnpooledDataSourceFactory;

/**
 * Data source factory that installs a {@link DruidDataSource} as the data
 * source held by the inherited {@code dataSource} field.
 *
 * <p>Referenced by class name from the {@code dataSource type="..."} attribute
 * of the MyBatis configuration files.
 *
 * @author chenyb
 * @version 1.0 (2020/10/7)
 */
public class DruidDataSourceFactory extends UnpooledDataSourceFactory {

    /** Replaces the data source set up by the superclass with a Druid one. */
    public DruidDataSourceFactory() {
        super();
        DruidDataSource druid = new DruidDataSource();
        this.dataSource = druid;
    }
}
package com.cyb.entity.middle;

import java.io.Serializable;
import java.util.Date;

/**
 * Row of the intermediate {@code student} table: the student's own fields plus
 * the bookkeeping fields {@code addTime}, {@code dataStatus} and {@code dealTime}.
 */
public class Student implements Serializable {

    private Integer id;
    private String name;
    private String sex;
    private String address;
    private String department;
    private Date addTime;
    private String dataStatus;
    private Date dealTime;

    public Integer getId() {
        return this.id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getSex() {
        return this.sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public String getAddress() {
        return this.address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public String getDepartment() {
        return this.department;
    }

    public void setDepartment(String department) {
        this.department = department;
    }

    public Date getAddTime() {
        return this.addTime;
    }

    public void setAddTime(Date addTime) {
        this.addTime = addTime;
    }

    public String getDataStatus() {
        return this.dataStatus;
    }

    public void setDataStatus(String dataStatus) {
        this.dataStatus = dataStatus;
    }

    public Date getDealTime() {
        return this.dealTime;
    }

    public void setDealTime(Date dealTime) {
        this.dealTime = dealTime;
    }

    /** Renders every field; string fields are wrapped in single quotes. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Student{");
        text.append("id=").append(id);
        text.append(", name='").append(name).append('\'');
        text.append(", sex='").append(sex).append('\'');
        text.append(", address='").append(address).append('\'');
        text.append(", department='").append(department).append('\'');
        text.append(", addTime=").append(addTime);
        text.append(", dataStatus='").append(dataStatus).append('\'');
        text.append(", dealTime=").append(dealTime);
        text.append('}');
        return text.toString();
    }
}
package com.cyb.entity.test;

import java.io.Serializable;

/**
 * Row of the business {@code student} table: id, name, sex and department.
 */
public class Student implements Serializable {

    private Integer id;
    private String name;
    private String sex;
    private String department;

    public Integer getId() {
        return this.id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getSex() {
        return this.sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public String getDepartment() {
        return this.department;
    }

    public void setDepartment(String department) {
        this.department = department;
    }

    /** Renders every field; string fields are wrapped in single quotes. */
    @Override
    public String toString() {
        return String.format(
                "student{id=%s, name='%s', sex='%s', department='%s'}",
                id, name, sex, department);
    }
}
package com.cyb.process;

import com.cyb.entity.middle.Student;

import java.util.List;

/**
 * Read and status-update operations on {@link Student} records of the
 * intermediate table.
 */
public interface MiddleProcess {
    /**
     * Queries records.
     *
     * @param count number of records to fetch in one query
     * @return the records found
     */
    List<Student> queryList(int count);

    /**
     * Changes the status of the given records.
     *
     * @param data   records whose status is to be changed
     * @param status status to change them to
     * @return an int result; its meaning is defined by the implementation
     */
    int modifyListStatus(List<Student> data, String status);
}
package com.cyb.process;

import com.cyb.entity.middle.Student;

import java.util.List;

/**
 * Processing step applied to a batch of {@link Student} records.
 */
public interface TestProcess {
    /**
     * Processes the given records.
     *
     * @param data records to process
     */
    void hand(List<Student> data);
}
package com.cyb.process.impl;

import com.cyb.entity.middle.Student;
import com.cyb.process.MiddleProcess;
import com.cyb.util.SqlSessionUtil;
import org.apache.ibatis.session.SqlSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.List;

/**
 * {@link MiddleProcess} implementation that runs the {@code middle-student}
 * mapper statements through sessions obtained from {@link SqlSessionUtil}.
 */
public class MiddleProcessImpl implements MiddleProcess {
    private static final Logger LOGGER = LoggerFactory.getLogger(MiddleProcessImpl.class);

    /**
     * Runs {@code middle-student.selectList} with the given limit.
     *
     * @param count maximum number of rows to fetch
     * @return the rows found, or {@code null} when the query threw an exception
     *         (the exception is logged, not rethrown)
     */
    @Override
    public List<Student> queryList(int count) {
        SqlSession middleSqlSession = SqlSessionUtil.getSqlSession("middle");
        try {
            return middleSqlSession.selectList("middle-student.selectList", count);
        } catch (Exception e) {
            LOGGER.error("查詢發生異常=======》", e);
            return null;
        } finally {
            // Always release the session, whether the query succeeded or not.
            middleSqlSession.close();
        }
    }

    /**
     * Sets {@code status} on every record and persists it with
     * {@code middle-student.updateStatusById}. Each record is updated and
     * committed in its own session, so one failing record does not undo the
     * others; failures are logged and skipped.
     *
     * @param data   records to update; {@code null} or empty is a no-op
     * @param status status to write
     * @return number of rows reported as updated by the committed statements
     */
    @Override
    public int modifyListStatus(List<Student> data, String status) {
        if (data == null || data.isEmpty()) {
            return 0;
        }
        int updated = 0;
        for (Student stu : data) {
            stu.setDataStatus(status);
            SqlSession middleSqlSession = SqlSessionUtil.getSqlSession("middle");
            try {
                int rows = middleSqlSession.update("middle-student.updateStatusById", stu);
                middleSqlSession.commit();
                // Count only after the commit went through.
                updated += rows;
            } catch (Exception e) {
                // Undo the uncommitted update of this record.
                middleSqlSession.rollback();
                LOGGER.error("修改狀態失敗=======》", e);
            } finally {
                middleSqlSession.close();
            }
        }
        return updated;
    }
}
package com.cyb.process.impl;

import com.cyb.cost.StudentConst;
import com.cyb.entity.middle.Student;
import com.cyb.process.TestProcess;
import com.cyb.util.SqlSessionUtil;
import org.apache.ibatis.session.SqlSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * {@link TestProcess} implementation that copies intermediate-table records
 * into the business table and then records the outcome on the intermediate row.
 */
public class TestProcessImpl implements TestProcess {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestProcessImpl.class);

    /**
     * Converts each record to the business entity, appends {@code "_test"} to
     * its name, inserts it through {@code test-student.addStudent}, and then
     * marks the intermediate row {@link StudentConst#FINISH} on success or
     * {@link StudentConst#ERROR} on failure, so a failed row does not stay in
     * the "dealing" status indefinitely.
     *
     * @param data records to process; {@code null} or empty is a no-op
     */
    @Override
    public void hand(List<Student> data) {
        if (data == null || data.isEmpty()) {
            return;
        }
        // Convert the intermediate entities into business entities first.
        for (com.cyb.entity.test.Student stu : adapter(data)) {
            stu.setName(stu.getName() + "_test");
            String outcome = insertStudent(stu) ? StudentConst.FINISH : StudentConst.ERROR;
            modifyMiddle(stu.getId(), outcome);
        }
    }

    /**
     * Inserts one record into the business table and commits.
     *
     * @return {@code true} when the insert was committed, {@code false} when an
     *         exception occurred (it is logged and the session rolled back)
     */
    private boolean insertStudent(com.cyb.entity.test.Student stu) {
        SqlSession testSqlSession = null;
        try {
            testSqlSession = SqlSessionUtil.getSqlSession("test");
            testSqlSession.insert("test-student.addStudent", stu);
            testSqlSession.commit();
            return true;
        } catch (Exception e) {
            if (testSqlSession != null) {
                testSqlSession.rollback();
            }
            LOGGER.error("處理數據發生異常============》", e);
            return false;
        } finally {
            if (testSqlSession != null) {
                testSqlSession.close();
            }
        }
    }

    /**
     * Data adapter: copies id, name, department and sex of every intermediate
     * record into a new business entity.
     *
     * @param data intermediate records
     * @return the converted records, in the same order
     */
    public List<com.cyb.entity.test.Student> adapter(List<Student> data) {
        List<com.cyb.entity.test.Student> result = new ArrayList<>();
        for (Student stu : data) {
            com.cyb.entity.test.Student student = new com.cyb.entity.test.Student();
            student.setId(stu.getId());
            student.setName(stu.getName());
            student.setDepartment(stu.getDepartment());
            student.setSex(stu.getSex());
            result.add(student);
        }
        return result;
    }

    /**
     * Writes the status and the current time as processing time onto the
     * intermediate row. Failures are logged and not propagated.
     *
     * @param id     id of the intermediate row (boxed, so a missing id does not
     *               fail on unboxing)
     * @param status status to write
     */
    private void modifyMiddle(Integer id, String status) {
        Student student = new Student();
        student.setId(id);
        student.setDataStatus(status);
        student.setDealTime(new Date());
        SqlSession middleSqlSession = null;
        try {
            middleSqlSession = SqlSessionUtil.getSqlSession("middle");
            middleSqlSession.update("middle-student.updateStatusById", student);
            middleSqlSession.commit();
        } catch (Exception e) {
            if (middleSqlSession != null) {
                middleSqlSession.rollback();
            }
            LOGGER.error("修改中間表狀態失敗===========》", e);
        } finally {
            if (middleSqlSession != null) {
                middleSqlSession.close();
            }
        }
    }
}
package com.cyb.start;

import com.cyb.entity.middle.Student;
import com.cyb.process.TestProcess;

import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

/**
 * Consumer: processes one batch of records and then puts itself back into the
 * shared queue of idle consumers.
 *
 * @author chenyb
 * @version 1.0 (2020/10/7)
 */
public class Consumer implements Runnable {
    private List<Student> data;
    private TestProcess testProcess;
    private LinkedBlockingDeque<Runnable> consumer;

    /**
     * @param testProcess processing step applied to each batch
     * @param consumer    queue this consumer returns itself to after a run
     */
    public Consumer(TestProcess testProcess, LinkedBlockingDeque<Runnable> consumer) {
        this.testProcess = testProcess;
        this.consumer = consumer;
    }

    /**
     * Hands the current batch to {@link TestProcess#hand} and, whether or not
     * that succeeds, re-queues this consumer.
     */
    @Override
    public void run() {
        try {
            testProcess.hand(data);
        } finally {
            // Drop the processed batch before becoming available again.
            data = null;
            try {
                // put() blocks while the queue is full.
                consumer.put(this);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the code running this task.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    /**
     * Sets the batch to be processed by the next {@link #run()}.
     *
     * @param data records to process
     */
    public void setData(List<Student> data) {
        this.data = data;
    }
}
package com.cyb.start;

import com.cyb.cost.StudentConst;
import com.cyb.entity.middle.Student;
import com.cyb.process.MiddleProcess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Producer: repeatedly fetches a batch of records, marks it as being
 * processed, and hands it to an idle {@link Consumer} on the executor.
 *
 * @author chenyb
 * @version 1.0 (2020/10/7)
 */
public class Producer implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);

    /** Number of records fetched per round. */
    private static final int BATCH_SIZE = 10;
    /** Pause, in milliseconds, when a round finds no records. */
    private static final long IDLE_SLEEP_MILLIS = 5000L;

    private MiddleProcess middleProcess;
    private LinkedBlockingDeque<Runnable> consumer;
    private ThreadPoolExecutor executor;

    /**
     * @param middleProcess source of records and status updates
     * @param consumer      queue of idle consumers
     * @param executor      pool the consumers are run on
     */
    public Producer(MiddleProcess middleProcess, LinkedBlockingDeque<Runnable> consumer, ThreadPoolExecutor executor) {
        this.middleProcess = middleProcess;
        this.consumer = consumer;
        this.executor = executor;
    }

    /**
     * Runs the produce loop until the thread is interrupted. Any other
     * exception is logged and the loop continues with the next round.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                List<Student> students = middleProcess.queryList(BATCH_SIZE);
                if (students == null || students.isEmpty()) {
                    // Nothing to do: wait before polling again.
                    Thread.sleep(IDLE_SLEEP_MILLIS);
                    continue;
                }
                // Mark the batch as "dealing" before handing it over.
                middleProcess.modifyListStatus(students, StudentConst.DEALING);
                // Blocks until a consumer is idle.
                Consumer con = (Consumer) consumer.take();
                con.setData(students);
                submit(con);
            } catch (InterruptedException e) {
                // Restore the flag so the loop condition ends the thread.
                Thread.currentThread().interrupt();
                LOGGER.warn("Producer interrupted, stopping", e);
            } catch (Exception e) {
                LOGGER.error("生產者發生異常========>", e);
            }
        }
    }

    /**
     * Submits the consumer to the executor; when the executor refuses it, the
     * consumer is returned to the idle queue so it is not lost.
     */
    private void submit(Consumer con) {
        try {
            executor.execute(con);
        } catch (RuntimeException e) {
            if (!consumer.offer(con)) {
                LOGGER.error("Could not return consumer to the idle queue");
            }
            throw e;
        }
    }
}
package com.cyb.start;

import com.cyb.process.MiddleProcess;
import com.cyb.process.TestProcess;
import com.cyb.process.impl.MiddleProcessImpl;
import com.cyb.process.impl.TestProcessImpl;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Entry point wiring one producer to ten consumers.
 */
public class Main {

    /**
     * Creates the process implementations, a queue holding ten idle
     * {@link Consumer}s and a thread pool, then starts the {@link Producer}
     * on its own thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        TestProcess testProcess = new TestProcessImpl();
        MiddleProcess middleProcess = new MiddleProcessImpl();
        LinkedBlockingDeque<Runnable> runnables = new LinkedBlockingDeque<>(10);
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                10, 20, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(20));

        /* Seed the queue with 10 idle consumers. */
        for (int i = 0; i < 10; i++) {
            try {
                runnables.put(new Consumer(testProcess, runnables));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        /* Start a single producer thread. */
        Producer producer = new Producer(middleProcess, runnables, threadPoolExecutor);
        new Thread(producer).start();
    }
}
package com.cyb.util;

import org.apache.ibatis.io.Resources;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;
import org.apache.ibatis.session.SqlSession;

import java.io.IOException;
import java.io.Reader;

/**
 * SqlSession helper: builds one {@link SqlSessionFactory} per MyBatis
 * configuration file at class-load time and opens sessions from them.
 *
 * @author chenyb
 * @version 1.0 (2020/10/7)
 */
public class SqlSessionUtil {
    private static final String MYBATIS_CONFIG_MIDDLE = "mybatis-config-middle.xml";
    private static final String MYBATIS_CONFIG_TEST = "mybatis-config-test.xml";

    private static final SqlSessionFactory middleSqlSessionFactory;
    private static final SqlSessionFactory testSqlSessionFactory;

    static {
        middleSqlSessionFactory = buildFactory(MYBATIS_CONFIG_MIDDLE);
        testSqlSessionFactory = buildFactory(MYBATIS_CONFIG_TEST);
    }

    /**
     * Loads the named configuration resource and builds a factory from it.
     * A resource that cannot be read fails class initialization with the
     * original I/O error as the cause, instead of leaving a null factory
     * behind or failing later on an unopened reader.
     *
     * @param resource classpath name of the MyBatis configuration file
     * @return the factory built from that configuration
     * @throws IllegalStateException when the resource cannot be read
     */
    private static SqlSessionFactory buildFactory(String resource) {
        try (Reader reader = Resources.getResourceAsReader(resource)) {
            return new SqlSessionFactoryBuilder().build(reader);
        } catch (IOException e) {
            throw new IllegalStateException("Failed to load MyBatis configuration: " + resource, e);
        }
    }

    /**
     * Opens a new session.
     *
     * @param type {@code "test"} selects the factory built from
     *             {@code mybatis-config-test.xml}; any other value selects the
     *             one built from {@code mybatis-config-middle.xml}
     * @return a newly opened session; the caller is responsible for closing it
     */
    public static SqlSession getSqlSession(String type) {
        if ("test".equals(type)) {
            return testSqlSessionFactory.openSession();
        }
        return middleSqlSessionFactory.openSession();
    }
}
/*
 Navicat Premium Data Transfer

 Source Server         : localhost
 Source Server Type    : MySQL
 Source Server Version : 50728
 Source Host           : localhost:3306
 Source Schema         : middle

 Target Server Type    : MySQL
 Target Server Version : 50728
 File Encoding         : 65001

 Date: 07/10/2020 22:42:55
*/

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for student
-- ----------------------------
DROP TABLE IF EXISTS `student`;
-- add_time records when the row entered the intermediate table, so it only
-- defaults to the insert time and is not refreshed by later status updates.
CREATE TABLE `student` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
  `name` varchar(255) NOT NULL COMMENT '姓名',
  `sex` varchar(255) DEFAULT NULL COMMENT '性別',
  `address` varchar(255) DEFAULT NULL COMMENT '地址',
  `department` varchar(255) DEFAULT NULL COMMENT '系',
  `add_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '數據進入中間表時間',
  `data_status` varchar(10) NOT NULL DEFAULT 'I' COMMENT 'I:第三方系統入庫;D:處理中;F:處理完成;E:發生錯誤或異常',
  `deal_time` datetime DEFAULT NULL COMMENT '處理時間',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of student
-- ----------------------------
BEGIN;
INSERT INTO `student` VALUES (1, '張三', '男', '上海', '英語系', '2020-10-07 22:19:25', 'F', '2020-10-07 22:19:26');
INSERT INTO `student` VALUES (2, '李四', '女', '北京', '中文系', '2020-10-07 22:19:25', 'F', '2020-10-07 22:19:26');
INSERT INTO `student` VALUES (3, '王五', '男', '天津', '計算機系', '2020-10-07 22:19:25', 'F', '2020-10-07 22:19:26');
COMMIT;

SET FOREIGN_KEY_CHECKS = 1;
/*
 Navicat Premium Data Transfer

 Source Server         : localhost
 Source Server Type    : MySQL
 Source Server Version : 50728
 Source Host           : localhost:3306
 Source Schema         : test

 Target Server Type    : MySQL
 Target Server Version : 50728
 File Encoding         : 65001

 Date: 07/10/2020 22:45:03
*/

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for student
-- ----------------------------
-- Each "--" comment sits on its own line so that it cannot comment out the
-- statements that follow it.
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
  `name` varchar(255) DEFAULT NULL COMMENT '姓名',
  `sex` varchar(255) DEFAULT NULL COMMENT '性別',
  `department` varchar(255) DEFAULT NULL COMMENT '系',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

SET FOREIGN_KEY_CHECKS = 1;
連接: https://pan.baidu.com/s/1C7q7_QRUhRoCZIVZ_Bp3KQ 密碼: 7hbf
mainClass 指定啓動包下含有 main 方法的啓動類:
<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <!-- This project does not inherit from the Spring Boot parent POM, so
                 the plugin version is pinned here rather than left for Maven to
                 resolve on its own. Adjust the version as needed. -->
            <version>2.3.4.RELEASE</version>
            <configuration>
                <!-- Default start-up program: mainClass names the class that
                     contains the main method, including its package. -->
                <mainClass>com.cyb.start.Main</mainClass>
                <layout>JAR</layout>
                <addResources>true</addResources>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <goal>repackage</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>