Java實現大批量數據導入導出(100W以上) -(一)導入

最近業務方有一個需求,須要一次導入超過100萬數據到系統數據庫。可能你們首先會想,這麼大的數據,幹嗎經過程序去實現導入,爲何不直接經過SQL導入到數據庫。html

大數據量報表導出請參考:Java實現大批量數據導入導出(100W以上) -(二)導出java

1、爲何必定要在代碼實現mysql

說說爲何不能經過SQL直接導入到數據庫,而是經過程序實現:spring

1. 首先,這個導入功能開始提供頁面導入,只是開始業務方保證的一次只有<3W的數據導入;sql

2. 其次,業務方導入的內容須要作校驗,好比門店號,商品號等是否系統存在,須要程序校驗;數據庫

3. 最後,業務方導入的都是編碼,數據庫中還要存入對應名稱,方便後期查詢,SQL導入也是沒法實現的。api

基於以上上三點,就沒法直接經過SQL語句導入數據庫。那就只能老老實實的想辦法經過程序實現。緩存

2、程序實現有如下技術難點服務器

1. 一次讀取這麼大的數據量,確定會致使服務器內存溢出;網絡

2. 調用接口保存一次傳輸數據量太大,網絡傳輸壓力會很大;

3. 最終經過SQL一次批量插入,對數據庫壓力也比較大,若是業務同時操做這個表數據,很容易形成死鎖。

3、解決思路

根據列舉的技術難點個人解決思路是:

1. 既然一次讀取整個導入文件,那就先將文件流上傳到服務器磁盤,而後分批從磁盤讀取(支持多線程讀取),這樣就防止內存溢出;

2. 調用插入數據庫接口也是根據分批讀取的內容進行調用;

3. 分批插入數據到數據庫。

4、具體實現代碼

1. 流式上傳文件到服務器磁盤

 略,通常Java上傳就能夠實現,這裏就不貼出。

2. 多線程分批從磁盤讀取

批量讀取文件:

 1 import org.slf4j.Logger;  2 import org.slf4j.LoggerFactory;  3 
 4 import java.io.File;  5 import java.io.FileNotFoundException;  6 import java.io.RandomAccessFile;  7 import java.nio.ByteBuffer;  8 import java.nio.channels.FileChannel;  9 
 10 /**
 11  * 類功能描述:批量讀取文件  12  *  13  * @author WangXueXing create at 19-3-14 下午6:47  14  * @version 1.0.0  15  */
 16 public class BatchReadFile {  17     private final Logger LOGGER = LoggerFactory.getLogger(BatchReadFile.class);  18     /**
 19  * 字符集UTF-8  20      */
 21     public static final String CHARSET_UTF8 = "UTF-8";  22     /**
 23  * 字符集GBK  24      */
 25     public static final String CHARSET_GBK = "GBK";  26     /**
 27  * 字符集gb2312  28      */
 29     public static final String CHARSET_GB2312 = "gb2312";  30     /**
 31  * 文件內容分割符-逗號  32      */
 33     public static final String SEPARATOR_COMMA = ",";  34 
 35     private int bufSize = 1024;  36     // 換行符
 37     private byte key = "\n".getBytes()[0];  38     // 當前行數
 39     private long lineNum = 0;  40     // 文件編碼,默認爲gb2312
 41     private String encode = CHARSET_GB2312;  42     // 具體業務邏輯監聽器
 43     private ReaderFileListener readerListener;  44 
 45     public void setEncode(String encode) {  46         this.encode = encode;  47  }  48 
 49     public void setReaderListener(ReaderFileListener readerListener) {  50         this.readerListener = readerListener;  51  }  52 
 53     /**
 54  * 獲取準確開始位置  55  * @param file  56  * @param position  57  * @return
 58  * @throws Exception  59      */
 60     public long getStartNum(File file, long position) throws Exception {  61         long startNum = position;  62         FileChannel fcin = new RandomAccessFile(file, "r").getChannel();  63  fcin.position(position);  64         try {  65             int cache = 1024;  66             ByteBuffer rBuffer = ByteBuffer.allocate(cache);  67             // 每次讀取的內容
 68             byte[] bs = new byte[cache];  69             // 緩存
 70             byte[] tempBs = new byte[0];  71             while (fcin.read(rBuffer) != -1) {  72                 int rSize = rBuffer.position();  73  rBuffer.rewind();  74  rBuffer.get(bs);  75  rBuffer.clear();  76                 byte[] newStrByte = bs;  77                 // 若是發現有上次未讀完的緩存,則將它加到當前讀取的內容前面
 78                 if (null != tempBs) {  79                     int tL = tempBs.length;  80                     newStrByte = new byte[rSize + tL];  81                     System.arraycopy(tempBs, 0, newStrByte, 0, tL);  82                     System.arraycopy(bs, 0, newStrByte, tL, rSize);  83  }  84                 // 獲取開始位置以後的第一個換行符
 85                 int endIndex = indexOf(newStrByte, 0);  86                 if (endIndex != -1) {  87                     return startNum + endIndex;  88  }  89                 tempBs = substring(newStrByte, 0, newStrByte.length);  90                 startNum += 1024;  91  }  92         } finally {  93  fcin.close();  94  }  95         return position;  96  }  97 
 98     /**
 99  * 從設置的開始位置讀取文件,一直到結束爲止。若是 end設置爲負數,剛讀取到文件末尾 100  * @param fullPath 101  * @param start 102  * @param end 103  * @throws Exception 104      */
105     public void readFileByLine(String fullPath, long start, long end) throws Exception { 106         File fin = new File(fullPath); 107         if (!fin.exists()) { 108             throw new FileNotFoundException("沒有找到文件:" + fullPath); 109  } 110         FileChannel fileChannel = new RandomAccessFile(fin, "r").getChannel(); 111  fileChannel.position(start); 112         try { 113             ByteBuffer rBuffer = ByteBuffer.allocate(bufSize); 114             // 每次讀取的內容
115             byte[] bs = new byte[bufSize]; 116             // 緩存
117             byte[] tempBs = new byte[0]; 118  String line; 119             // 當前讀取文件位置
120             long nowCur = start; 121             while (fileChannel.read(rBuffer) != -1) { 122                 int rSize = rBuffer.position(); 123  rBuffer.rewind(); 124  rBuffer.get(bs); 125  rBuffer.clear(); 126                 byte[] newStrByte; 127                 //去掉表頭
128                 if(nowCur == start){ 129                     int firstLineIndex = indexOf(bs, 0); 130                     int newByteLenth = bs.length-firstLineIndex-1; 131                     newStrByte = new byte[newByteLenth]; 132                     System.arraycopy(bs, firstLineIndex+1, newStrByte, 0, newByteLenth); 133                 } else { 134                     newStrByte = bs; 135  } 136 
137                 // 若是發現有上次未讀完的緩存,則將它加到當前讀取的內容前面
138                 if (null != tempBs && tempBs.length != 0) { 139                     int tL = tempBs.length; 140                     newStrByte = new byte[rSize + tL]; 141                     System.arraycopy(tempBs, 0, newStrByte, 0, tL); 142                     System.arraycopy(bs, 0, newStrByte, tL, rSize); 143  } 144                 // 是否已經讀到最後一位
145                 boolean isEnd = false; 146                 nowCur += bufSize; 147                 // 若是當前讀取的位數已經比設置的結束位置大的時候,將讀取的內容截取到設置的結束位置
148                 if (end > 0 && nowCur > end) { 149                     // 緩存長度 - 當前已經讀取位數 - 最後位數
150                     int l = newStrByte.length - (int) (nowCur - end); 151                     newStrByte = substring(newStrByte, 0, l); 152                     isEnd = true; 153  } 154                 int fromIndex = 0; 155                 int endIndex = 0; 156                 // 每次讀一行內容,以 key(默認爲\n) 做爲結束符
157                 while ((endIndex = indexOf(newStrByte, fromIndex)) != -1) { 158                     byte[] bLine = substring(newStrByte, fromIndex, endIndex); 159                     line = new String(bLine, 0, bLine.length, encode); 160                     lineNum++; 161                     // 輸出一行內容,處理方式由調用方提供
162                     readerListener.outLine(line.trim(), lineNum, false); 163                     fromIndex = endIndex + 1; 164  } 165                 // 將未讀取完成的內容放到緩存中
166                 tempBs = substring(newStrByte, fromIndex, newStrByte.length); 167                 if (isEnd) { 168                     break; 169  } 170  } 171             // 將剩下的最後內容做爲一行,輸出,並指明這是最後一行
172             String lineStr = new String(tempBs, 0, tempBs.length, encode); 173             readerListener.outLine(lineStr.trim(), lineNum, true); 174         } finally { 175  fileChannel.close(); 176  fin.deleteOnExit(); 177  } 178  } 179 
180     /**
181  * 查找一個byte[]從指定位置以後的一個換行符位置 182  * 183  * @param src 184  * @param fromIndex 185  * @return
186  * @throws Exception 187      */
188     private int indexOf(byte[] src, int fromIndex) throws Exception { 189         for (int i = fromIndex; i < src.length; i++) { 190             if (src[i] == key) { 191                 return i; 192  } 193  } 194         return -1; 195  } 196 
197     /**
198  * 從指定開始位置讀取一個byte[]直到指定結束位置爲止生成一個全新的byte[] 199  * 200  * @param src 201  * @param fromIndex 202  * @param endIndex 203  * @return
204  * @throws Exception 205      */
206     private byte[] substring(byte[] src, int fromIndex, int endIndex) throws Exception { 207         int size = endIndex - fromIndex; 208         byte[] ret = new byte[size]; 209         System.arraycopy(src, fromIndex, ret, 0, size); 210         return ret; 211  } 212 }

 

以上是關鍵代碼:利用FileChannel與ByteBuffer從磁盤中分批讀取數據

 

多線程調用批量讀取:

 1 /**
 2  * 類功能描述: 線程讀取文件  3  *  4  * @author WangXueXing create at 19-3-14 下午6:51  5  * @version 1.0.0  6  */
 7 public class ReadFileThread extends Thread {  8     private ReaderFileListener processDataListeners;  9     private String filePath; 10     private long start; 11     private long end; 12     private Thread preThread; 13 
14     public ReadFileThread(ReaderFileListener processDataListeners, 15                           long start,long end, 16  String file) { 17         this(processDataListeners, start, end, file, null); 18  } 19 
20     public ReadFileThread(ReaderFileListener processDataListeners, 21                           long start,long end, 22  String file, 23  Thread preThread) { 24         this.setName(this.getName()+"-ReadFileThread"); 25         this.start = start; 26         this.end = end; 27         this.filePath = file; 28         this.processDataListeners = processDataListeners; 29         this.preThread = preThread; 30  } 31 
32  @Override 33     public void run() { 34         BatchReadFile readFile = new BatchReadFile(); 35  readFile.setReaderListener(processDataListeners); 36  readFile.setEncode(processDataListeners.getEncode()); 37         try { 38             readFile.readFileByLine(filePath, start, end + 1); 39             if(this.preThread != null){ 40                 this.preThread.join(); 41  } 42         } catch (Exception e) { 43            throw new RuntimeException(e); 44  } 45  } 46 }

 

監聽讀取:

 1 import java.util.ArrayList;  2 import java.util.List;  3 
 4 /**
 5  * 類功能描述:讀文件監聽父類  6  *  7  * @author WangXueXing create at 19-3-14 下午6:52  8  * @version 1.0.0  9  */
10 public abstract class ReaderFileListener<T> { 11     // 一次讀取行數,默認爲1000
12     private int readColNum = 1000; 13 
14     /**
15  * 文件編碼 16      */
17     private String encode; 18 
19     /**
20  * 分批讀取行列表 21      */
22     private List<String> rowList = new ArrayList<>(); 23 
24     /**
25  *其餘參數 26      */
27     private T otherParams; 28 
29     /**
30  * 每讀取到一行數據,添加到緩存中 31  * @param lineStr 讀取到的數據 32  * @param lineNum 行號 33  * @param over 是否讀取完成 34  * @throws Exception 35      */
36     public void outLine(String lineStr, long lineNum, boolean over) throws Exception { 37         if(null != lineStr && !lineStr.trim().equals("")){ 38  rowList.add(lineStr); 39  } 40 
41         if (!over && (lineNum % readColNum == 0)) { 42  output(rowList); 43             rowList = new ArrayList<>(); 44         } else if (over) { 45  output(rowList); 46             rowList = new ArrayList<>(); 47  } 48  } 49 
50     /**
51  * 批量輸出 52  * 53  * @param stringList 54  * @throws Exception 55      */
56     public abstract void output(List<String> stringList) throws Exception; 57 
58     /**
59  * 設置一次讀取行數 60  * @param readColNum 61      */
62     protected void setReadColNum(int readColNum) { 63         this.readColNum = readColNum; 64  } 65 
66     public String getEncode() { 67         return encode; 68  } 69 
70     public void setEncode(String encode) { 71         this.encode = encode; 72  } 73 
74     public T getOtherParams() { 75         return otherParams; 76  } 77 
78     public void setOtherParams(T otherParams) { 79         this.otherParams = otherParams; 80  } 81 
82     public List<String> getRowList() { 83         return rowList; 84  } 85 
86     public void setRowList(List<String> rowList) { 87         this.rowList = rowList; 88  } 89 }

 

實現監聽讀取並分批調用插入數據接口:

 1 import com.today.api.finance.ImportServiceClient;  2 import com.today.api.finance.request.ImportRequest;  3 import com.today.api.finance.response.ImportResponse;  4 import com.today.api.finance.service.ImportService;  5 import com.today.common.Constants;  6 import com.today.domain.StaffSimpInfo;  7 import com.today.util.EmailUtil;  8 import com.today.util.UserSessionHelper;  9 import com.today.util.readfile.ReadFile;  10 import com.today.util.readfile.ReadFileThread;  11 import com.today.util.readfile.ReaderFileListener;  12 import org.slf4j.Logger;  13 import org.slf4j.LoggerFactory;  14 import org.springframework.beans.factory.annotation.Value;  15 import org.springframework.stereotype.Service;  16 import org.springframework.util.StringUtils;  17 
 18 import java.io.File;  19 import java.io.FileInputStream;  20 import java.util.ArrayList;  21 import java.util.Arrays;  22 import java.util.List;  23 import java.util.concurrent.FutureTask;  24 import java.util.stream.Collectors;  25 
 26 /**
 27  * 類功能描述:報表導入服務實現  28  *  29  * @author WangXueXing create at 19-3-19 下午1:43  30  * @version 1.0.0  31  */
 32 @Service  33 public class ImportReportServiceImpl extends ReaderFileListener<ImportRequest> {  34     private final Logger LOGGER = LoggerFactory.getLogger(ImportReportServiceImpl.class);  35     @Value("${READ_COL_NUM_ONCE}")  36     private String readColNum;  37     @Value("${REPORT_IMPORT_RECEIVER}")  38     private String reportImportReceiver;  39     /**
 40  * 財務報表導入接口  41      */
 42     private ImportService service = new ImportServiceClient();  43 
 44     /**
 45  * 讀取文件內容  46  * @param file  47      */
 48     public void readTxt(File file, ImportRequest importRequest) throws Exception {  49         this.setOtherParams(importRequest);  50         ReadFile readFile = new ReadFile();  51         try(FileInputStream fis = new FileInputStream(file)){  52             int available = fis.available();  53             long maxThreadNum = 3L;  54             // 線程粗略開始位置
 55             long i = available / maxThreadNum;  56 
 57             this.setRowList(new ArrayList<>());  58             StaffSimpInfo staffSimpInfo = ((StaffSimpInfo)UserSessionHelper.getCurrentUserInfo().getData());  59             String finalReportReceiver = getEmail(staffSimpInfo.getEmail(), reportImportReceiver);  60             this.setReadColNum(Integer.parseInt(readColNum));  61             this.setEncode(ReadFile.CHARSET_GB2312);  62             //這裏單獨使用一個線程是爲了當maxThreadNum大於1的時候,統一管理這些線程
 63             new Thread(()->{  64                 Thread preThread = null;  65                 FutureTask futureTask = null ;  66                 try {  67                     for (long j = 0; j < maxThreadNum; j++) {  68                         //計算精確開始位置
 69                         long startNum = j == 0 ? 0 : readFile.getStartNum(file, i * j);  70                         long endNum = j + 1 < maxThreadNum ? readFile.getStartNum(file, i * (j + 1)) : -2L;  71 
 72                         //具體監聽實現
 73                         preThread = new ReadFileThread(this, startNum, endNum, file.getPath(), preThread);  74                         futureTask = new FutureTask(preThread, new Object());  75  futureTask.run();  76  }  77                     if(futureTask.get() != null) {  78                         EmailUtil.sendEmail(EmailUtil.REPORT_IMPORT_EMAIL_PREFIX, finalReportReceiver, "導入報表成功", "導入報表成功" );  //todo 等文案
 79  }  80                 } catch (Exception e){  81                     futureTask.cancel(true);  82                     try {  83                         EmailUtil.sendEmail(EmailUtil.REPORT_IMPORT_EMAIL_PREFIX, finalReportReceiver, "導入報表失敗", e.getMessage());  84                     } catch (Exception e1){  85                         //ignore
 86                         LOGGER.error("發送郵件失敗", e1);  87  }  88                     LOGGER.error("導入報表類型:"+importRequest.getReportType()+"失敗", e);  89                 } finally {  90                     futureTask.cancel(true);  91  }  92  }).start();  93  }  94  }  95 
 96     private String getEmail(String infoEmail, String reportImportReceiver){  97         if(StringUtils.isEmpty(infoEmail)){  98             return reportImportReceiver;  99  } 100         return infoEmail; 101  } 102 
103     /**
104  * 每批次調用導入接口 105  * @param stringList 106  * @throws Exception 107      */
108  @Override 109     public void output(List<String> stringList) throws Exception { 110         ImportRequest importRequest = this.getOtherParams(); 111         List<List<String>> dataList = stringList.stream() 112                                                 .map(x->Arrays.asList(x.split(ReadFile.SEPARATOR_COMMA)).stream().map(String::trim).collect(Collectors.toList())) 113  .collect(Collectors.toList()); 114         LOGGER.info("上傳數據:{}", dataList); 115  importRequest.setDataList(dataList); 116 // LOGGER.info("request對象:{}",importRequest, "request增長請求字段:{}", importRequest.data);
117         ImportResponse importResponse = service.batchImport(importRequest); 118         LOGGER.info("===========SUCESS_CODE======="+importResponse.getCode()); 119         //導入錯誤,輸出錯誤信息
120         if(!Constants.SUCESS_CODE.equals(importResponse.getCode())){ 121             LOGGER.error("導入報表類型:"+importRequest.getReportType()+"失敗","返回碼爲:", importResponse.getCode() ,"返回信息:",importResponse.getMessage()); 122             throw new RuntimeException("導入報表類型:"+importRequest.getReportType()+"失敗"+"返回碼爲:"+ importResponse.getCode() +"返回信息:"+importResponse.getMessage()); 123  } 124 // if(importResponse.data != null && importResponse.data.get().get("batchImportFlag")!=null) { 125 // LOGGER.info("eywa-service請求batchImportFlag不爲空"); 126 // }
127  importRequest.setData(importResponse.data); 128 
129  } 130 }

 

注意:
第53行代碼:

long maxThreadNum = 3L;
就是設置分批讀取磁盤文件的線程數,我設置爲3,你們不要設置太大,否則多個線程讀取到內存,也會形成服務器內存溢出。

以上全部批次的批量讀取並調用插入接口都成功發送郵件通知給導入人,任何一個批次失敗直接發送失敗郵件。

 

數據庫分批插入數據:

 1   /**
 2  * 批量插入非聯機第三方導入帳單  3  * @param dataList  4     */
 5   def insertNonOnlinePayment(dataList: List[NonOnlineSourceData]) : Unit = {  6     if (dataList.nonEmpty) {  7       CheckAccountDataSource.mysqlData.withConnection { conn =>
 8         val sql =
 9           s""" INSERT INTO t_pay_source_data
10  (store_code, 11  store_name, 12  source_date, 13  order_type, 14  trade_type, 15  third_party_payment_no, 16  business_type, 17  business_amount, 18  trade_time, 19  created_at, 20  updated_at) 21               VALUES (?,?,?,?,?,?,?,?,?,NOW(),NOW())""" 22 
23         conn.setAutoCommit(false) 24         var stmt = conn.prepareStatement(sql) 25         var i = 0
26         dataList.foreach { x =>
27           stmt.setString(1, x.storeCode) 28           stmt.setString(2, x.storeName) 29           stmt.setString(3, x.sourceDate) 30           stmt.setInt(4, x.orderType) 31           stmt.setInt(5, x.tradeType) 32           stmt.setString(6, x.tradeNo) 33           stmt.setInt(7, x.businessType) 34           stmt.setBigDecimal(8, x.businessAmount.underlying()) 35           stmt.setString(9, x.tradeTime.getOrElse(null)) 36  stmt.addBatch() 37           if ((i % 5000 == 0) && (i != 0)) { //分批提交
38  stmt.executeBatch 39  conn.commit 40             conn.setAutoCommit(false) 41             stmt = conn.prepareStatement(sql) 42 
43  } 44           i += 1
45  } 46  stmt.executeBatch() 47  conn.commit() 48  } 49  } 50   }

 

 以上代碼實現每5000 行提交一次批量插入,防止一次提較數據庫的壓力。

 以上,若是你們有更好方案,請留言。

相關文章
相關標籤/搜索