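Recently the business side came to us with a requirement: import more than 1 million rows into the system database in one go. Your first reaction may be: with this much data, why write a program for the import at all instead of loading it straight into the database with SQL?
For exporting large reports, see: Java: Importing and Exporting Large Volumes of Data (over 1 million rows), Part 2: Export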
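1、Why it has to be done in code
Here is why the data cannot simply be imported into the database with SQL and has to go through a program:
1. First, this import feature was originally offered as an upload on a web page, and at the start the business side guaranteed that a single import would contain fewer than 30,000 rows;
2. Second, the imported content must be validated, for example whether the store codes and product codes exist in the system, which needs program logic;
3. Finally, the business side imports codes only, but the database must also store the corresponding names to make later queries easier, which a plain SQL import cannot do either.
Because of these three points, the data cannot be imported directly with SQL statements, so the only option is to do the work properly in a program.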
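Points 2 and 3 are exactly the kind of per-row logic a plain SQL import cannot express. Below is a minimal sketch of what it might look like in Java, assuming each parsed row starts with a store code and a product code, and that the code-to-name lookups have already been loaded into maps. `RowEnricher` and this column layout are hypothetical, not taken from the author's system.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical helper: validates imported codes and adds the matching names. */
final class RowEnricher {
    private final Map<String, String> storeNames;   // store code -> store name
    private final Map<String, String> productNames; // product code -> product name

    RowEnricher(Map<String, String> storeNames, Map<String, String> productNames) {
        this.storeNames = storeNames;
        this.productNames = productNames;
    }

    /**
     * Takes one parsed row [storeCode, productCode, ...] and returns
     * [storeCode, storeName, productCode, productName].
     * Throws IllegalArgumentException if the row is too short or a code is unknown.
     */
    List<String> enrich(List<String> row) {
        if (row.size() < 2) {
            throw new IllegalArgumentException("Row has fewer than 2 columns: " + row);
        }
        String storeCode = row.get(0);
        String productCode = row.get(1);
        String storeName = storeNames.get(storeCode);
        if (storeName == null) {
            throw new IllegalArgumentException("Unknown store code: " + storeCode);
        }
        String productName = productNames.get(productCode);
        if (productName == null) {
            throw new IllegalArgumentException("Unknown product code: " + productCode);
        }
        return List.of(storeCode, storeName, productCode, productName);
    }
}
```

In a real import the maps would typically be filled once from the store and product master tables, so each batch is validated in memory rather than with one query per row.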
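2、Technical difficulties of a program-based implementation
1. Reading this much data at once will almost certainly make the server run out of memory;
2. Sending all of it in one call to the save interface puts heavy pressure on the network;
3. Inserting everything with a single batch SQL statement puts considerable load on the database, and if the business is operating on the same table at the same time, deadlocks can easily occur.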
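3、Solution
Given the difficulties listed above, my approach is:
1. Since reading the whole import file at once is the problem, first stream-upload the file to the server's disk, then read it back from disk in batches (multi-threaded reading is supported), which avoids running out of memory;
2. The database-insert interface is likewise called once per batch that is read;
3. Insert the data into the database in batches.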
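The three steps can be sketched end to end with the JDK alone. This is a simplified stand-in, not the author's implementation shown later: it streams the file line by line with `BufferedReader` (so only one batch is held in memory), skips the header line and blank lines, and hands every `batchSize` lines to a callback that would call the insert interface. `StreamingBatchImporter` and `importInBatches` are hypothetical names.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class StreamingBatchImporter {
    /**
     * Reads the file line by line, skips the header and blank lines, and passes
     * every `batchSize` lines (plus the final partial batch) to `sink`.
     * Returns the number of data lines read.
     */
    static long importInBatches(Path file, Charset charset, int batchSize,
                                Consumer<List<String>> sink) throws IOException {
        long count = 0;
        List<String> batch = new ArrayList<>(batchSize);
        try (BufferedReader reader = Files.newBufferedReader(file, charset)) {
            reader.readLine(); // skip the header line
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue;
                }
                batch.add(line.trim());
                count++;
                if (batch.size() == batchSize) {
                    sink.accept(batch); // e.g. call the insert interface for this batch
                    batch = new ArrayList<>(batchSize);
                }
            }
        }
        if (!batch.isEmpty()) {
            sink.accept(batch); // last, partially filled batch
        }
        return count;
    }
}
```

The trade-off versus the author's FileChannel approach is that a single `BufferedReader` cannot be split across several threads, but it needs far less code.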
4、Implementation
1. Stream the uploaded file to the server's disk
Omitted: an ordinary Java file upload does the job, so I won't paste it here.
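For completeness, a minimal sketch of this step, assuming the web layer exposes the uploaded file as an `InputStream` (for example via Spring's `MultipartFile.getInputStream()`). `Files.copy` copies the stream to disk in small chunks, so the file is never held in memory as a whole. `UploadToDisk`, `saveToDisk` and the temp-file prefix are hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class UploadToDisk {
    /**
     * Streams the upload into a new temporary file under `dir` and returns its path.
     * Memory use does not grow with the size of the file.
     */
    static Path saveToDisk(InputStream upload, Path dir) throws IOException {
        Path target = Files.createTempFile(dir, "import-", ".csv");
        try (InputStream in = upload) {
            // REPLACE_EXISTING is required because createTempFile already created the empty file
            Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
        }
        return target;
    }
}
```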
2. Read from disk in batches with multiple threads
Reading the file in batches:
```java
import java.io.File;
import java.io.FileNotFoundException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

/**
 * Reads a file in batches.
 *
 * @author WangXueXing create at 19-3-14 6:47 PM
 * @version 1.0.0
 */
public class BatchReadFile {
    /** Charset UTF-8 */
    public static final String CHARSET_UTF8 = "UTF-8";
    /** Charset GBK */
    public static final String CHARSET_GBK = "GBK";
    /** Charset gb2312 */
    public static final String CHARSET_GB2312 = "gb2312";
    /** Field separator: comma */
    public static final String SEPARATOR_COMMA = ",";

    private int bufSize = 1024;
    // Line break; byte 0x0A never occurs inside a multi-byte GB2312/GBK or UTF-8 character
    private final byte key = (byte) '\n';
    // Current line number
    private long lineNum = 0;
    // File encoding, gb2312 by default
    private String encode = CHARSET_GB2312;
    // Listener implementing the business logic
    private ReaderFileListener<?> readerListener;

    public void setEncode(String encode) {
        this.encode = encode;
    }

    public void setReaderListener(ReaderFileListener<?> readerListener) {
        this.readerListener = readerListener;
    }

    /**
     * Finds the exact start position: the absolute position of the first line break
     * at or after {@code position}. readFileByLine skips up to and including it.
     * Returns {@code position} if no line break follows.
     */
    public long getStartNum(File file, long position) throws Exception {
        try (RandomAccessFile raf = new RandomAccessFile(file, "r");
             FileChannel fcin = raf.getChannel()) {
            fcin.position(position);
            ByteBuffer rBuffer = ByteBuffer.allocate(1024);
            // Absolute file position of the first byte currently in the buffer
            long offset = position;
            int rSize;
            while ((rSize = fcin.read(rBuffer)) != -1) {
                // Only the rSize bytes just read are valid
                for (int k = 0; k < rSize; k++) {
                    if (rBuffer.get(k) == key) {
                        return offset + k;
                    }
                }
                offset += rSize;
                rBuffer.clear();
            }
        }
        return position;
    }

    /**
     * Reads the file from {@code start} up to, but not including, {@code end}.
     * If end is zero or negative, reads to the end of the file.
     * The first line of the range is skipped: for the first segment this is the header row,
     * for later segments it is the empty piece ending at the line break returned by getStartNum.
     */
    public void readFileByLine(String fullPath, long start, long end) throws Exception {
        File fin = new File(fullPath);
        if (!fin.exists()) {
            throw new FileNotFoundException("File not found: " + fullPath);
        }
        try (RandomAccessFile raf = new RandomAccessFile(fin, "r");
             FileChannel fileChannel = raf.getChannel()) {
            fileChannel.position(start);
            ByteBuffer rBuffer = ByteBuffer.allocate(bufSize);
            // Bytes of an unfinished line carried over from the previous read
            byte[] tempBs = new byte[0];
            // Bytes this segment may still consume
            long remaining = end > 0 ? end - start : Long.MAX_VALUE;
            boolean skipFirstLine = true;
            int rSize;
            while (remaining > 0 && (rSize = fileChannel.read(rBuffer)) != -1) {
                rBuffer.flip();
                // Never read past the end of the segment
                int take = (int) Math.min(rSize, remaining);
                remaining -= take;
                // Carried-over bytes followed by the bytes actually read (not the stale tail of the buffer)
                byte[] newStrByte = new byte[tempBs.length + take];
                System.arraycopy(tempBs, 0, newStrByte, 0, tempBs.length);
                rBuffer.get(newStrByte, tempBs.length, take);
                rBuffer.clear();

                int fromIndex = 0;
                int endIndex;
                // Emit one line per line break (key)
                while ((endIndex = indexOf(newStrByte, fromIndex)) != -1) {
                    if (skipFirstLine) {
                        skipFirstLine = false;
                    } else {
                        String line = new String(newStrByte, fromIndex, endIndex - fromIndex, encode);
                        lineNum++;
                        // The caller decides how each line is processed
                        readerListener.outLine(line.trim(), lineNum, false);
                    }
                    fromIndex = endIndex + 1;
                }
                // Keep the unfinished line for the next read
                tempBs = substring(newStrByte, fromIndex, newStrByte.length);
            }
            // Whatever is left is the last line; flag it as the end of the segment
            String lineStr = skipFirstLine ? "" : new String(tempBs, encode);
            readerListener.outLine(lineStr.trim(), lineNum, true);
        } finally {
            // Delete the uploaded file when the JVM exits
            fin.deleteOnExit();
        }
    }

    /** Returns the index of the first line break in src at or after fromIndex, or -1. */
    private int indexOf(byte[] src, int fromIndex) {
        for (int i = fromIndex; i < src.length; i++) {
            if (src[i] == key) {
                return i;
            }
        }
        return -1;
    }

    /** Copies src[fromIndex, endIndex) into a new array. */
    private byte[] substring(byte[] src, int fromIndex, int endIndex) {
        int size = endIndex - fromIndex;
        byte[] ret = new byte[size];
        System.arraycopy(src, fromIndex, ret, 0, size);
        return ret;
    }
}
```
That is the key code: it uses FileChannel and ByteBuffer to read the data from disk in batches.
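The idea behind getStartNum can be shown in isolation: cut the file into roughly equal byte ranges, then push every cut forward to the next line break so that no line is split across two segments. This sketch works on an in-memory `byte[]` instead of a FileChannel to stay self-contained, and, unlike getStartNum, it returns the position just after the line break, so no "skip the first line" step is needed. `SegmentSplitter` and `alignedCuts` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper illustrating how segment boundaries are aligned to line breaks. */
final class SegmentSplitter {
    /**
     * Returns segments + 1 cut points; segment k covers [cuts[k], cuts[k+1]).
     * Each inner cut starts at the rough position (data.length / segments) * k
     * and is moved forward to just after the next '\n'.
     */
    static List<Integer> alignedCuts(byte[] data, int segments) {
        List<Integer> cuts = new ArrayList<>();
        cuts.add(0);
        int rough = data.length / segments;
        for (int k = 1; k < segments; k++) {
            // Never go back before the previous cut (possible with very long lines)
            int pos = Math.max(rough * k, cuts.get(cuts.size() - 1));
            while (pos < data.length && data[pos] != '\n') {
                pos++;
            }
            // Step past the line break, or stop at the end of the data
            cuts.add(Math.min(pos + 1, data.length));
        }
        cuts.add(data.length);
        return cuts;
    }
}
```

For example, the 11 bytes `h\naa\nbb\ncc\n` split into 3 segments give the cuts 0, 5, 8, 11, that is the segments `h\naa\n`, `bb\n` and `cc\n`.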
Invoking the batch reader from multiple threads:
```java
/**
 * Thread that reads one segment of the file and passes its lines to the listener.
 *
 * @author WangXueXing create at 19-3-14 6:51 PM
 * @version 1.0.0
 */
public class ReadFileThread extends Thread {
    private ReaderFileListener<?> processDataListeners;
    private String filePath;
    private long start;
    private long end;
    private Thread preThread;

    public ReadFileThread(ReaderFileListener<?> processDataListeners,
                          long start, long end,
                          String file) {
        this(processDataListeners, start, end, file, null);
    }

    public ReadFileThread(ReaderFileListener<?> processDataListeners,
                          long start, long end,
                          String file,
                          Thread preThread) {
        this.setName(this.getName() + "-ReadFileThread");
        this.start = start;
        this.end = end;
        this.filePath = file;
        this.processDataListeners = processDataListeners;
        this.preThread = preThread;
    }

    @Override
    public void run() {
        BatchReadFile readFile = new BatchReadFile();
        readFile.setReaderListener(processDataListeners);
        readFile.setEncode(processDataListeners.getEncode());
        try {
            // end is the position of the segment's last line break; end + 1 includes it.
            // For the last segment end is -2, so end + 1 = -1 means "read to end of file".
            readFile.readFileByLine(filePath, start, end + 1);
            // Wait for the previous segment's thread; join() returns immediately
            // if that thread was never started (e.g. when run() is called directly)
            if (this.preThread != null) {
                this.preThread.join();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```
The read listener:
```java
import java.util.ArrayList;
import java.util.List;

/**
 * Base class for file-reading listeners: collects lines and hands them over in batches.
 *
 * @author WangXueXing create at 19-3-14 6:52 PM
 * @version 1.0.0
 */
public abstract class ReaderFileListener<T> {
    // Number of lines per batch, 1000 by default
    private int readColNum = 1000;

    /** File encoding */
    private String encode;

    /** Lines collected for the current batch (not thread-safe) */
    private List<String> rowList = new ArrayList<>();

    /** Other parameters */
    private T otherParams;

    /**
     * Adds each line read to the current batch; the batch is flushed every
     * readColNum lines and once more when reading is finished.
     * @param lineStr the line read
     * @param lineNum line number
     * @param over whether reading is finished
     * @throws Exception
     */
    public void outLine(String lineStr, long lineNum, boolean over) throws Exception {
        if (null != lineStr && !lineStr.trim().isEmpty()) {
            rowList.add(lineStr);
        }
        boolean batchFull = !over && lineNum % readColNum == 0;
        // Skip empty batches so the import interface is never called without data
        if ((batchFull || over) && !rowList.isEmpty()) {
            output(rowList);
            rowList = new ArrayList<>();
        }
    }

    /**
     * Outputs one batch.
     *
     * @param stringList lines of the batch
     * @throws Exception
     */
    public abstract void output(List<String> stringList) throws Exception;

    /**
     * Sets the number of lines per batch.
     * @param readColNum
     */
    protected void setReadColNum(int readColNum) {
        this.readColNum = readColNum;
    }

    public String getEncode() {
        return encode;
    }

    public void setEncode(String encode) {
        this.encode = encode;
    }

    public T getOtherParams() {
        return otherParams;
    }

    public void setOtherParams(T otherParams) {
        this.otherParams = otherParams;
    }

    public List<String> getRowList() {
        return rowList;
    }

    public void setRowList(List<String> rowList) {
        this.rowList = rowList;
    }
}
```
Implementing the listener and calling the insert interface once per batch:
```java
import com.today.api.finance.ImportServiceClient;
import com.today.api.finance.request.ImportRequest;
import com.today.api.finance.response.ImportResponse;
import com.today.api.finance.service.ImportService;
import com.today.common.Constants;
import com.today.domain.StaffSimpInfo;
import com.today.util.EmailUtil;
import com.today.util.UserSessionHelper;
import com.today.util.readfile.BatchReadFile;
import com.today.util.readfile.ReadFileThread;
import com.today.util.readfile.ReaderFileListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.FutureTask;
import java.util.stream.Collectors;

/**
 * Report import service implementation.
 * This singleton bean keeps per-import state (rowList, otherParams) in ReaderFileListener,
 * so two imports running at the same time would interfere with each other.
 *
 * @author WangXueXing create at 19-3-19 1:43 PM
 * @version 1.0.0
 */
@Service
public class ImportReportServiceImpl extends ReaderFileListener<ImportRequest> {
    private final Logger LOGGER = LoggerFactory.getLogger(ImportReportServiceImpl.class);
    @Value("${READ_COL_NUM_ONCE}")
    private String readColNum;
    @Value("${REPORT_IMPORT_RECEIVER}")
    private String reportImportReceiver;
    /**
     * Financial report import interface
     */
    private ImportService service = new ImportServiceClient();

    /**
     * Reads the file segment by segment and imports it batch by batch.
     * @param file the uploaded file on disk
     * @param importRequest request carrying the report type and other parameters
     */
    public void readTxt(File file, ImportRequest importRequest) throws Exception {
        this.setOtherParams(importRequest);
        BatchReadFile readFile = new BatchReadFile();
        // file.length() is a long; FileInputStream.available() returns an int and is only an estimate
        long fileSize = file.length();
        long maxThreadNum = 3L;
        // Rough segment size; getStartNum moves each cut to the next line break
        long i = fileSize / maxThreadNum;

        this.setRowList(new ArrayList<>());
        StaffSimpInfo staffSimpInfo = (StaffSimpInfo) UserSessionHelper.getCurrentUserInfo().getData();
        String finalReportReceiver = getEmail(staffSimpInfo.getEmail(), reportImportReceiver);
        this.setReadColNum(Integer.parseInt(readColNum));
        this.setEncode(BatchReadFile.CHARSET_GB2312);
        // A separate thread manages all segments when maxThreadNum > 1, so the request returns immediately
        new Thread(() -> {
            Thread preThread = null;
            FutureTask<Object> futureTask = null;
            try {
                for (long j = 0; j < maxThreadNum; j++) {
                    // Exact start and end positions, aligned to line breaks
                    long startNum = j == 0 ? 0 : readFile.getStartNum(file, i * j);
                    long endNum = j + 1 < maxThreadNum ? readFile.getStartNum(file, i * (j + 1)) : -2L;

                    // Listener-specific reading of this segment
                    preThread = new ReadFileThread(this, startNum, endNum, file.getPath(), preThread);
                    futureTask = new FutureTask<>(preThread, new Object());
                    // run() executes the segment on this thread, so segments are read one after another;
                    // get() rethrows a failure of this segment instead of silently moving on
                    futureTask.run();
                    futureTask.get();
                }
                EmailUtil.sendEmail(EmailUtil.REPORT_IMPORT_EMAIL_PREFIX, finalReportReceiver,
                        "Report import succeeded", "Report import succeeded"); // TODO: final wording pending
            } catch (Exception e) {
                try {
                    EmailUtil.sendEmail(EmailUtil.REPORT_IMPORT_EMAIL_PREFIX, finalReportReceiver,
                            "Report import failed", e.getMessage());
                } catch (Exception e1) {
                    LOGGER.error("Failed to send email", e1);
                }
                LOGGER.error("Import of report type {} failed", importRequest.getReportType(), e);
            } finally {
                if (futureTask != null) {
                    futureTask.cancel(true);
                }
            }
        }).start();
    }

    private String getEmail(String infoEmail, String reportImportReceiver) {
        if (StringUtils.isEmpty(infoEmail)) {
            return reportImportReceiver;
        }
        return infoEmail;
    }

    /**
     * Calls the import interface once per batch.
     * @param stringList lines of the current batch
     * @throws Exception if the import interface reports an error
     */
    @Override
    public void output(List<String> stringList) throws Exception {
        ImportRequest importRequest = this.getOtherParams();
        // Split each line into trimmed columns; limit -1 keeps trailing empty columns
        List<List<String>> dataList = stringList.stream()
                .map(x -> Arrays.stream(x.split(BatchReadFile.SEPARATOR_COMMA, -1))
                        .map(String::trim)
                        .collect(Collectors.toList()))
                .collect(Collectors.toList());
        LOGGER.info("Uploading data: {}", dataList);
        importRequest.setDataList(dataList);
        ImportResponse importResponse = service.batchImport(importRequest);
        LOGGER.info("Import response code: {}", importResponse.getCode());
        // On an import error, log the details and abort the whole import
        if (!Constants.SUCESS_CODE.equals(importResponse.getCode())) {
            String msg = "Import of report type " + importRequest.getReportType() + " failed, code: "
                    + importResponse.getCode() + ", message: " + importResponse.getMessage();
            LOGGER.error(msg);
            throw new RuntimeException(msg);
        }
        importRequest.setData(importResponse.data);
    }
}
```
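One detail in output(): `String.split(regex)` drops trailing empty strings, so a line such as `S001, 12.5 ,,` yields two columns instead of four, and the row reaches the import interface with fewer columns than the header. Passing a negative limit keeps them. A minimal, self-contained illustration (`CsvLineSplitter` is a hypothetical name):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class CsvLineSplitter {
    /** Splits on commas and trims each field; limit -1 keeps trailing empty fields. */
    static List<String> split(String line) {
        return Arrays.stream(line.split(",", -1))
                .map(String::trim)
                .collect(Collectors.toList());
    }

    /** Same, but with split(","), which discards trailing empty fields. */
    static List<String> splitDroppingTrailingEmpty(String line) {
        return Arrays.stream(line.split(","))
                .map(String::trim)
                .collect(Collectors.toList());
    }
}
```

Like the original, this plain comma split does not handle quoted fields that themselves contain commas; that would need a real CSV parser.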
Note, in readTxt:
long maxThreadNum = 3L;
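This line sets the number of threads used to read the file on disk in batches. I set it to 3; don't set it much higher, otherwise the data that many threads load into memory can again exhaust the server's memory.
If every batch is read and passed to the insert interface successfully, a success email is sent to the person who ran the import; if any batch fails, a failure email is sent immediately.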
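In the service above, each ReadFileThread is executed through `FutureTask.run()`, which runs it on the calling thread, so the segments are actually processed one after another. If truly parallel segments are wanted, one common alternative is a fixed-size thread pool whose size plays the role of maxThreadNum, one task per segment, and `Future.get()` on every task: a failed segment then surfaces as an `ExecutionException`, and success is reported only after all segments finished. A minimal sketch with hypothetical names; each task must keep its own batch state, because ReaderFileListener's rowList is not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ParallelSegments {
    /**
     * Runs one task per segment on `maxThreads` threads and returns the total
     * number of rows the tasks report. If a segment fails, its exception is
     * rethrown wrapped in ExecutionException, so the caller can send the
     * failure notification instead of the success one.
     */
    static long runAll(List<Callable<Long>> segmentTasks, int maxThreads)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (Callable<Long> task : segmentTasks) {
                futures.add(pool.submit(task));
            }
            long total = 0;
            for (Future<Long> f : futures) {
                total += f.get(); // throws ExecutionException if this segment failed
            }
            return total;
        } finally {
            // Interrupts segments still running (only possible after a failure)
            pool.shutdownNow();
        }
    }
}
```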
Inserting into the database in batches:
```scala
/**
 * Batch-insert imported non-online third-party payment bills
 * @param dataList rows to insert
 */
def insertNonOnlinePayment(dataList: List[NonOnlineSourceData]): Unit = {
  if (dataList.nonEmpty) {
    CheckAccountDataSource.mysqlData.withConnection { conn =>
      val sql =
        """ INSERT INTO t_pay_source_data
          (store_code,
           store_name,
           source_date,
           order_type,
           trade_type,
           third_party_payment_no,
           business_type,
           business_amount,
           trade_time,
           created_at,
           updated_at)
          VALUES (?,?,?,?,?,?,?,?,?,NOW(),NOW())"""

      conn.setAutoCommit(false)
      val stmt = conn.prepareStatement(sql)
      try {
        dataList.zipWithIndex.foreach { case (x, i) =>
          stmt.setString(1, x.storeCode)
          stmt.setString(2, x.storeName)
          stmt.setString(3, x.sourceDate)
          stmt.setInt(4, x.orderType)
          stmt.setInt(5, x.tradeType)
          stmt.setString(6, x.tradeNo)
          stmt.setInt(7, x.businessType)
          stmt.setBigDecimal(8, x.businessAmount.underlying())
          stmt.setString(9, x.tradeTime.orNull)
          stmt.addBatch()
          // Send and commit every 5000 rows; the same statement is reused for the next batch
          if ((i + 1) % 5000 == 0) {
            stmt.executeBatch()
            conn.commit()
          }
        }
        // Send and commit the remaining rows
        stmt.executeBatch()
        conn.commit()
      } finally {
        stmt.close()
      }
    }
  }
}
```
The code above commits the batch insert every 5,000 rows, so no single commit puts too much pressure on the database.
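The commit-every-5,000-rows pattern amounts to splitting the rows into consecutive chunks. As a worked example, 1,000,000 rows in chunks of 5,000 means 1,000,000 / 5,000 = 200 executeBatch()/commit() round trips, each transaction covering at most 5,000 rows. The Java sketch below needs no database; `ChunkedCommit` and `forEachChunk` are hypothetical names, and `flush` stands in for the executeBatch()/commit() pair.

```java
import java.util.List;
import java.util.function.Consumer;

final class ChunkedCommit {
    /**
     * Calls `flush` once per consecutive chunk of at most `chunkSize` rows
     * and returns the number of chunks, i.e. how many batches would be sent.
     */
    static <T> int forEachChunk(List<T> rows, int chunkSize, Consumer<List<T>> flush) {
        int chunks = 0;
        for (int from = 0; from < rows.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, rows.size());
            flush.accept(rows.subList(from, to)); // a view of the rows, no copying
            chunks++;
        }
        return chunks;
    }
}
```

For instance, 12,001 rows produce three chunks of 5,000, 5,000 and 2,001 rows.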
That's all. If you have a better approach, please leave a comment.