Super CSV: Concurrent Processing of Large Data Batches with a Thread Pool

Super CSV is an open-source Java project for processing CSV files. It is designed from the ground up around object-oriented principles, so you can leverage your own object-oriented code to make CSV handling much simpler. It supports input/output type conversion and data integrity validation, and it can read and write data from anywhere, in any encoding, as long as you supply the appropriate Reader or Writer object. The delimiter, quote character, line-ending symbols, and so on are all configurable.
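For example, a custom CsvPreference for semicolon-delimited files might look like this (a minimal sketch; the quote character, delimiter, and line ending shown are illustrative values, not requirements):

CsvPreference semicolonPreference = new CsvPreference.Builder('"', ';', "\r\n").build();
ICsvBeanReader reader = new CsvBeanReader(new FileReader("foo.csv"), semicolonPreference);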

1. First, let's look at simple data processing
Add the dependency:

<dependency>
    <groupId>net.sf.supercsv</groupId>
    <artifactId>super-csv</artifactId>
    <version>2.4.0</version>
</dependency>

Let's walk through the code examples from the official documentation.

  1. Reading a CSV file by its header
    Each record in the file is read and converted into a Java object. Suppose you have a UserBean class like this:
import java.util.Date;

public class UserBean {
    int id;
    Date date;
    String username, password, town;
    int zip;

    public Date getDate() {return date;}
    public void setDate(Date date) {this.date = date;}
    public int getId() { return id;}
    public String getPassword() { return password; }
    public String getTown() { return town; }
    public String getUsername() { return username; }
    public int getZip() { return zip; }
    public void setId(int id) { this.id = id; }
    public void setPassword(String password) { this.password = password; }
    public void setTown(String town) { this.town = town; }
    public void setUsername(String username) { this.username = username; }
    public void setZip(int zip) { this.zip = zip; }
}

And suppose you have a CSV file with a header row, containing the following:
id,username,password,date,zip,town
1,Klaus,qwexyKiks,17/1/2007,1111,New York
2,Oufud,bobilop213,10/10/2007,4555,New York
3,Oufud1,bobilop213,10/10/2007,4555,New York
4,Oufud2,bobilop213,10/10/2007,4555,New York
5,Oufud3,bobilop213,10/10/2007,4555,New York
6,Oufud4,bobilop213,10/10/2007,4555,New York
7,Oufud5,bobilop213,10/10/2007,4555,New York
8,Oufud6,bobilop213,10/10/2007,4555,New York
9,Oufud7,bobilop213,10/10/2007,4555,New York
10,Oufud8,bobilop213,10/10/2007,4555,New York
11,Oufud9,bobilop213,10/10/2007,4555,New York
12,Oufud10,bobilop213,10/10/2007,4555,New York
13,Oufud11,bobilop213,10/10/2007,4555,New York
14,Oufud12,bobilop213,10/10/2007,4555,New York
15,Oufud13,bobilop213,10/10/2007,4555,New York

Then you can use the following code to create UserBean instances and print out their property values:
class ReadingObjects {
    public static void main(String[] args) throws Exception {
        ICsvBeanReader inFile = new CsvBeanReader(new FileReader("foo.csv"), CsvPreference.STANDARD_PREFERENCE);
        try {
            final String[] header = inFile.getHeader(true);
            UserBean user;
            while ((user = inFile.read(UserBean.class, header, processors)) != null) {
                System.out.println(user.getZip());
            }
        } finally {
            inFile.close();
        }
    }
}

We still haven't defined processors. As the name suggests, these process each column's data; you can also pass null for a column, meaning that column needs no special handling. Processors can be nested inside one another: new Unique(new StrMinMax(5, 20)) means the column's value must be unique and between 5 and 20 characters long. Leaving the processing details aside for now, here is how the processors we need are defined:

final CellProcessor[] processors = new CellProcessor[] {  
        new Unique(new ParseInt()),
        new Unique(new StrMinMax(5, 20)),  
        new StrMinMax(8, 35),  
        new ParseDate("dd/MM/yyyy"),  
        new Optional(new ParseInt()),  
        null  
    };

What the code above means:
Column 1 is an integer (the id), and its value must be unique
Column 2 is a string, unique, 5 to 20 characters long
Column 3 is a string, 8 to 35 characters long
Column 4 is a date in day/month/year (dd/MM/yyyy) format
Column 5 is an integer, but ParseInt only processes it when the column has a value (i.e. the column may be empty)
Column 6 is a string (the default); no processor is used

If your CSV file has no header, you can define an array to use instead:

final String[] header = new String[] { "id", "username", "password", "date", "zip", "town" };

If you want to ignore a column, it works just like defining processors: simply put null at that position in the header array, as in the sketch below.
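For example, to skip the password column entirely (a minimal sketch; with a null name mapping, the reader simply discards that column):

final String[] header = new String[] { "id", "username", null, "date", "zip", "town" };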

The complete code is as follows:

import java.io.FileReader;  
import java.io.IOException;  
import org.supercsv.cellprocessor.Optional;  
import org.supercsv.cellprocessor.ParseDate;  
import org.supercsv.cellprocessor.ParseInt;  
import org.supercsv.cellprocessor.constraint.StrMinMax;  
import org.supercsv.cellprocessor.constraint.Unique;  
import org.supercsv.cellprocessor.ift.CellProcessor;  
import org.supercsv.io.CsvBeanReader;  
import org.supercsv.io.ICsvBeanReader;  
import org.supercsv.prefs.CsvPreference;  

class ReadingObjects {  

    static final CellProcessor[] userProcessors = new CellProcessor[] {  
        new Unique(new ParseInt()),  
        new Unique(new StrMinMax(5, 20)),  
        new StrMinMax(8, 35),  
        new ParseDate("dd/MM/yyyy"),  
        new Optional(new ParseInt()),  
        null  
    };  

    public static void main(String[] args) throws Exception {  
        ICsvBeanReader inFile = new CsvBeanReader(new FileReader("D:\\foo.csv"), CsvPreference.STANDARD_PREFERENCE);  
        try {  
          final String[] header = inFile.getHeader(true);  
          UserBean user;  
          while( (user = inFile.read(UserBean.class, header, userProcessors)) != null) {  
            System.out.println(user.getZip());  
          }  
        } finally {  
          inFile.close();  
        }  
   }  
}  

import java.util.Date;

public class UserBean {  
    int id;  
    String username, password, town;  
    Date date;  
    int zip;  

    public int getId() {  
        return id;  
    }  

    public void setId(final int id) {  
        this.id = id;  
    }  

    public Date getDate() {  
        return date;  
    }  

    public String getPassword() {  
        return password;  
    }  

    public String getTown() {  
        return town;  
    }  

    public String getUsername() {  
        return username;  
    }  

    public int getZip() {  
        return zip;  
    }  

    public void setDate(final Date date) {  
        this.date = date;  
    }  

    public void setPassword(final String password) {  
        this.password = password;  
    }  

    public void setTown(final String town) {  
        this.town = town;  
    }  

    public void setUsername(final String username) {  
        this.username = username;  
    }  

    public void setZip(final int zip) {  
        this.zip = zip;  
    }  

}

If you don't know the file's exact format before reading it, you can use CsvListReader.read() instead, which returns each row's data in a List.
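A minimal sketch of that approach (reusing foo.csv from above; each row comes back as a plain List of Strings):

ICsvListReader listReader = new CsvListReader(new FileReader("D:\\foo.csv"), CsvPreference.STANDARD_PREFERENCE);
try {
    listReader.getHeader(true); // skip the header row
    List<String> row;
    while ((row = listReader.read()) != null) {
        System.out.println(row);
    }
} finally {
    listReader.close();
}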

We've seen the reading code; now let's look at writing, which is just as simple.

import java.io.FileWriter;  
import java.util.HashMap;  
import org.supercsv.io.*;  
import org.supercsv.prefs.CsvPreference;  

class WritingMaps {  
  public static void main(String[] args) throws Exception {  
    ICsvMapWriter writer = new CsvMapWriter(new FileWriter(...), CsvPreference.STANDARD_PREFERENCE);  
    try {  
      final String[] header = new String[] { "name", "city", "zip" };  
      // set up some data to write  
      final HashMap<String, ? super Object> data1 = new HashMap<String, Object>();  
      data1.put(header[0], "Karl");  
      data1.put(header[1], "Tent city");  
      data1.put(header[2], 5565);  
      final HashMap<String, ? super Object> data2 = new HashMap<String, Object>();  
      data2.put(header[0], "Banjo");  
      data2.put(header[1], "River side");  
      data2.put(header[2], 5551);  
      // the actual writing  
      writer.writeHeader(header);  
      writer.write(data1, header);  
      writer.write(data2, header);  
    } finally {  
      writer.close();  
    }  
  }  
}
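Writing JavaBeans instead of Maps works the same way with CsvBeanWriter. Here is a minimal sketch (the output path and sample values are illustrative; UserBean is the class defined earlier):

import java.io.FileWriter;
import java.util.Date;
import org.supercsv.io.CsvBeanWriter;
import org.supercsv.io.ICsvBeanWriter;
import org.supercsv.prefs.CsvPreference;

class WritingBeans {
    public static void main(String[] args) throws Exception {
        ICsvBeanWriter writer = new CsvBeanWriter(new FileWriter("D:\\out.csv"), CsvPreference.STANDARD_PREFERENCE);
        try {
            final String[] header = new String[] { "id", "username", "password", "date", "zip", "town" };
            writer.writeHeader(header);

            UserBean user = new UserBean();
            user.setId(1);
            user.setUsername("Klaus");
            user.setPassword("qwexyKiks");
            user.setDate(new Date());
            user.setZip(1111);
            user.setTown("New York");

            // each property is looked up via its getter, in header order
            writer.write(user, header);
        } finally {
            writer.close();
        }
    }
}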

Parsing with the MapReader approach:
The CSV file:

customerNo,firstName,lastName,birthDate,mailingAddress,married,numberOfKids,favouriteQuote,email,loyaltyPoints
1,John,Dunbar,13/06/1945,"1600 Amphitheatre Parkway Mountain View, CA 94043 United States",,,"""May the Force be with you."" - Star Wars",jdunbar@gmail.com,0
2,Bob,Down,25/02/1919,"1601 Willow Rd. Menlo Park, CA 94025 United States",Y,0,"""Frankly, my dear, I don't give a damn."" - Gone With The Wind",bobdown@hotmail.com,123456
3,Alice,Wunderland,08/08/1985,"One Microsoft Way Redmond, WA 98052-6399 United States",Y,0,"""Play it, Sam. Play ""As Time Goes By."""" - Casablanca",throughthelookingglass@yahoo.com,2255887799
4,Bill,Jobs,10/07/1973,"2701 San Tomas Expressway Santa Clara, CA 95050 United States",Y,3,"""You've got to ask yourself one question: ""Do I feel lucky?"" Well, do ya, punk?"" - Dirty Harry",billy34@hotmail.com,36

Example code:

import org.supercsv.cellprocessor.Optional;
import org.supercsv.cellprocessor.ParseBool;
import org.supercsv.cellprocessor.ParseDate;
import org.supercsv.cellprocessor.ParseInt;
import org.supercsv.cellprocessor.constraint.LMinMax;
import org.supercsv.cellprocessor.constraint.NotNull;
import org.supercsv.cellprocessor.constraint.StrRegEx;
import org.supercsv.cellprocessor.constraint.UniqueHashCode;
import org.supercsv.cellprocessor.ift.CellProcessor;
import org.supercsv.io.CsvMapReader;
import org.supercsv.io.ICsvMapReader;
import org.supercsv.prefs.CsvPreference;

import java.io.FileReader;
import java.util.Map;

public class ReadingObjects {
    public static void main(String[] args) throws Exception{

        ICsvMapReader mapReader = null;
        try {
            mapReader = new CsvMapReader(new FileReader("D:\\foo.csv"), CsvPreference.STANDARD_PREFERENCE);

            // the header columns are used as the keys to the Map
            final String[] header = mapReader.getHeader(true);
            final CellProcessor[] processors = getProcessors();

            Map<String,Object> customerMap;
            while( (customerMap = mapReader.read(header, processors)) != null ) {
                System.out.println(String.format("lineNo=%s, rowNo=%s, customerMap=%s", mapReader.getLineNumber(),
                        mapReader.getRowNumber(), customerMap));
            }

        }
        finally {
            if( mapReader != null ) {
                mapReader.close();
            }
        }
    }

    private static CellProcessor[] getProcessors() {

        final String emailRegex = "[a-z0-9._]+@[a-z0-9.]+"; // just an example, not very robust!
        StrRegEx.registerMessage(emailRegex, "must be a valid email address");

        final CellProcessor[] processors = new CellProcessor[] {
                new UniqueHashCode(), // customerNo (must be unique)
                new NotNull(), // firstName
                new NotNull(), // lastName
                new ParseDate("dd/MM/yyyy"), // birthDate
                new NotNull(), // mailingAddress
                new Optional(new ParseBool()), // married
                new Optional(new ParseInt()), // numberOfKids
                new NotNull(), // favouriteQuote
                new StrRegEx(emailRegex), // email
                new LMinMax(0L, LMinMax.MAX_LONG) // loyaltyPoints
        };

        return processors;
    }
}

2. Concurrent batched processing of large-volume data updates
The code is as follows:

import org.supercsv.cellprocessor.Optional;
import org.supercsv.cellprocessor.ParseDate;
import org.supercsv.cellprocessor.ParseInt;
import org.supercsv.cellprocessor.constraint.StrMinMax;
import org.supercsv.cellprocessor.constraint.Unique;
import org.supercsv.cellprocessor.ift.CellProcessor;
import org.supercsv.io.CsvBeanReader;
import org.supercsv.io.ICsvBeanReader;
import org.supercsv.prefs.CsvPreference;

import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ThreadReadingObjects {

    static final CellProcessor[] userProcessors = new CellProcessor[] {
            new Unique(new ParseInt()), // unique int id
            new Unique(new StrMinMax(5, 20)), // unique, 5 to 20 characters
            new StrMinMax(8, 35), // 8 to 35 characters
            new ParseDate("dd/MM/yyyy"), // day/month/year format
            new Optional(new ParseInt()), // integer, parsed by ParseInt only when the column has a value (may be empty)
            null // no processor
    };

    public static void main(String[] args) throws Exception {
        // InputStreamReader freader = new InputStreamReader(inputStream,"UTF-8");
        // ICsvBeanReader inFile = new CsvBeanReader(freader, CsvPreference.STANDARD_PREFERENCE);

        ICsvBeanReader inFile = new CsvBeanReader(new FileReader("D:\\foo.csv"), CsvPreference.STANDARD_PREFERENCE);

        ExecutorService executorService = null;
        try {
            // If your CSV file has no header, you can define an array instead:
            // final String[] header = new String[] { "id","username", "password", "date", "zip", "town"};
            final String[] header = inFile.getHeader(true);

            // create the thread pool
            // Note: don't use too many threads; JDBC work holds connections, and too many threads will exceed the database connection limit
            List<Future<String>> futureList = new ArrayList<Future<String>>();
            executorService = Executors.newFixedThreadPool(5);

            // read the data page by page and hand each page to the thread pool
            while (getPageUserList(executorService,futureList,inFile, header)) {}

            // collect the results from the worker threads
            for (Future<String> future : futureList) {
                while (true) {
                    if (future.isDone() && !future.isCancelled()) {
                        System.out.println("future result: "+future.get());
                        break;
                    } else {
                        Thread.sleep(1000);
                    }
                }
            }

        } finally {
            inFile.close();
            if (executorService != null) {
                executorService.shutdown();
            }
        }
    }

    private static boolean getPageUserList(ExecutorService executorService, List<Future<String>> futureList, ICsvBeanReader inFile, String[] header) throws IOException {
        int index = 0;
        boolean status = false;
        List<UserBean> userBeans = new ArrayList<UserBean>();
        UserBean user;
        while ((user = inFile.read(UserBean.class, header, userProcessors)) != null) { // reads from the first data row onwards
            userBeans.add(user);
            index++;
            // rows per page, i.e. records handled per worker thread; adjust to your workload
            if (index == 10) {
                status = true;
                break;
            }
        }
        // submit the page to the pool and keep its Future
        if(!userBeans.isEmpty()){
            Future<String> future = executorService.submit(getUpdateDbJob(futureList.size(),userBeans));
            futureList.add(future);
        }

        return status;
    }

    private static Callable<String> getUpdateDbJob(int threadNo,List<UserBean> userBeans) {
        return new Callable<String>() {
            @Override
            public String call() throws Exception {
                int count = userBeans.size();
                // Option 1: batch by slicing the List with subList
                batchPageInsertDataOne(threadNo,userBeans);

                // Option 2: batch using a modulo counter
//                batchPageInsertDataTwo(threadNo,userBeans);

                return String.valueOf(count);
            }
        };
    }

    private static void batchPageInsertDataOne(int threadNo, List<UserBean> userBeans) {
        int perCount = 4, index = 0;
        int times = userBeans.size() / perCount;
        long stime = System.currentTimeMillis();

        try {
            do {
                // sleep 50 ms between batches
                Thread.sleep(50);

                // take at most perCount records as this batch's slice
                List<UserBean> listTemp = userBeans.subList(0, Math.min(perCount, userBeans.size()));
                System.out.println("Thread " + threadNo + " updating users: " + listTemp.size());

                // keep each transaction unit small — do as little work per transaction as possible
                // Note: don't commit too many rows per batch; oversized batches cause row-lock contention
                jdbcPerBatchInsert(listTemp);

                userBeans.removeAll(listTemp);

                index++;

            } while (index <= times);

            // measure elapsed time
            long etime = System.currentTimeMillis();
            System.out.println("Thread " + threadNo + " batch insert total time: " + (etime - stime) + "ms!");

        } catch (Exception e) {
            e.printStackTrace();

            System.out.println("JDBC batch insert failed, records left: >>" + userBeans.size());

            throw new RuntimeException(e);
        }
    }

    private static void batchPageInsertDataTwo(int threadNo, List<UserBean> userBeans) {
        long stime = System.currentTimeMillis();

        try {
            // write to the database in sub-batches
            int perCount = 4;
            List<UserBean> userList = new ArrayList<UserBean>();
            for (int i = 0; i < userBeans.size(); i++) {
                userList.add(userBeans.get(i));
                // for large volumes, commit in sub-batches of perCount records
                // adjust the modulo batch size to your situation
                if (i > 0 && i % perCount == 0) {
                    System.out.println("Thread " + threadNo + " updated users: " + userList.size() + " successfully");
                    // batch-write to the database via jdbcTemplate
                    jdbcPerBatchInsert(userList);

                    userList.clear();
                } else if (i == userBeans.size() - 1) {
                    // commit the final batch
                    System.out.println("Thread " + threadNo + " updated users: " + userList.size() + " successfully");

                    // batch-write to the database via jdbcTemplate
                    jdbcPerBatchInsert(userList);
                    userList.clear();
                }
            }

            // measure elapsed time
            long etime = System.currentTimeMillis();
            System.out.println("Thread " + threadNo + " batch insert total time: " + (etime - stime) + "ms!");
        } catch (Exception e) {
            e.printStackTrace();

            System.out.println("JDBC batch insert failed, records left: >>" + userBeans.size());

            throw new RuntimeException(e);
        }

    }
    /**
     * Batch-write the given slice to the database using jdbcTemplate.
     * @param listTemp the batch of users to persist
     */
    private static void jdbcPerBatchInsert(List<UserBean> listTemp) {
        // intentionally left empty here; see the JdbcTemplate sketch below
    }
}
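The jdbcPerBatchInsert stub is left empty above. A minimal sketch of what it might look like with Spring's JdbcTemplate.batchUpdate (the users table, its columns, and the injected jdbcTemplate are assumptions for illustration, not part of the original code):

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;

class UserBatchDao {
    private final JdbcTemplate jdbcTemplate; // assumed to be configured elsewhere

    UserBatchDao(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    void jdbcPerBatchInsert(final List<UserBean> listTemp) {
        // hypothetical table and columns; the whole slice goes out in one JDBC batch
        final String sql = "INSERT INTO users (id, username, password, date, zip, town) VALUES (?, ?, ?, ?, ?, ?)";
        jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps, int i) throws SQLException {
                UserBean u = listTemp.get(i);
                ps.setInt(1, u.getId());
                ps.setString(2, u.getUsername());
                ps.setString(3, u.getPassword());
                ps.setTimestamp(4, new java.sql.Timestamp(u.getDate().getTime()));
                ps.setInt(5, u.getZip());
                ps.setString(6, u.getTown());
            }

            @Override
            public int getBatchSize() {
                return listTemp.size();
            }
        });
    }
}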

The output after running:

Thread 0 updating users: 4
Thread 1 updating users: 4
Thread 0 updating users: 4
Thread 1 updating users: 1
Thread 1 batch insert total time: 100ms!
Thread 0 updating users: 2
Thread 0 batch insert total time: 151ms!
future result: 10
future result: 5

Here are some pitfalls I hit in real-world use, processing several million records — sharing them here.
Note:
Problem 1:
org.springframework.jdbc.CannotGetJdbcConnectionException: Could not get JDBC Connection; nested exception is com.alibaba.druid.pool.GetConnectionTimeoutException: wait millis 100000, active 8, maxActive 8, runningSqlCount 7 :
Too many threads exhaust the database connection pool; raise the maxActive (maximum connections) setting, as sketched below.
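With Druid, for instance, the cap can be raised when building the data source (a sketch; 20 is an illustrative value and should at least cover the worker thread count):

DruidDataSource dataSource = new DruidDataSource();
dataSource.setMaxActive(20); // maximum pooled connections
dataSource.setMaxWait(100000); // ms to wait for a free connection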

Problem 2:
Caused by: java.sql.BatchUpdateException: Deadlock found when trying to get lock; try restarting transaction
Committing too many rows per JDBC transaction causes lock contention and deadlocks; keep each transaction as small as possible.

That's it — if you found this helpful, give it a like! O(∩_∩)O