本身寫的數據交換工具——從Oracle到Elasticsearch

先說說需求的背景,因爲業務數據都在Oracle數據庫中,想要對它進行數據的分析會很是很是慢,用傳統的數據倉庫-->數據集市這種方式,集市層表會很是大,查詢的時候若是再作一些group的操做,一個訪問須要一分鐘甚至更久才能響應。html

爲了解決這個問題,就想把業務庫的數據遷移到Elasticsearch中,而後針對es再去作聚合查詢。java

問題來了,數據庫中的數據量很大,如何導入到ES中呢?sql

Logstash JDBC

Logstash提供了一款JDBC的插件,能夠在裏面寫sql語句,自動查詢而後導入到ES中。這種方式比較簡單,須要注意的就是須要用戶本身下載jdbc的驅動jar包。數據庫

input {
    jdbc {
        jdbc_driver_library => "ojdbc14-10.2.0.3.0.jar"
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        jdbc_connection_string => "jdbc:oracle:thin:@localhost:1521:test"
        jdbc_user => "test"
        jdbc_password => "test123"
        schedule => "* * * * *"
        statement => "select * from TARGET_TABLE"
        add_field => ["type","a"]
    }
}
output{
    elasticsearch {
        hosts =>["10.10.1.205:9200"]
        index => "product"
        document_type => "%{type}"
    }
}

不過,它的性能實在是太差了!我導了一天,才導了兩百多萬的數據。

所以,就考慮本身來導。json

本身的數據交換工具

思路:

最後使用發現,本身寫的導入程序,比Logstash jdbc快5-6倍~~~~~~ 嗨皮!!!!bash

遇到的問題

  • 1 JDBC須要採用分頁的方式讀取全量數據
  • 2 要模仿bulk文件進行存儲
  • 3 因爲bulk文件過大,致使curl內存溢出

程序開源

下面的代碼須要注意的就是oracle

public class JDBCUtil {
    private static Connection conn = null;
    private static PreparedStatement sta=null;
    static{
        try {
            Class.forName("oracle.jdbc.driver.OracleDriver");
            conn = DriverManager.getConnection("jdbc:oracle:thin:@localhost:1521:test", "test", "test123");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        }
        System.out.println("Database connection established");
    }
    /**
    * 把查到的數據格式化寫入到文件
    *
    * @param list 須要存儲的數據
    * @param index 索引的名稱
    * @param type 類型的名稱
    * @param path 文件存儲的路徑
    **/
    public static void writeTable(List<Map> list,String index,String type,String path) throws SQLException, IOException {
        System.out.println("開始寫文件");
        File file = new File(path);
        int count = 0;
        int size = list.size();
        for(Map map : list){
            FileUtils.write(file,  "{ \"index\" : { \"_index\" : \""+index+"\", \"_type\" : \""+type+"\" } }\n","UTF-8",true);
            FileUtils.write(file, JSON.toJSONString(map)+"\n","UTF-8",true);
//            System.out.println("寫入了" + ((count++)+1) + "[" + size + "]");
        }
        System.out.println("寫入完成");
    }

    /**
     * 讀取數據
     * @param sql
     * @return
     * @throws SQLException
     */
    public static List<Map> readTable(String tablename,int start,int end) throws SQLException {
        System.out.println("開始讀數據庫");
        //執行查詢
        sta = conn.prepareStatement("select * from(select rownum as rn,t.* from "+tablename+" t )where rn >="+start+" and rn <"+end);
        ResultSet rs = sta.executeQuery();

        //獲取數據列表
        List<Map> data = new ArrayList();
        List<String> columnLabels = getColumnLabels(rs);

        Map<String, Object> map = null;
        while(rs.next()){
            map = new HashMap<String, Object>();

            for (String columnLabel : columnLabels) {
                Object value = rs.getObject(columnLabel);
                map.put(columnLabel.toLowerCase(), value);
            }
            data.add(map);
        }
        sta.close();
        System.out.println("數據讀取完畢");
        return data;
    }
    /**
     * 得到列名
     * @param resultSet
     * @return
     * @throws SQLException
     */
    private static List<String> getColumnLabels(ResultSet resultSet)
            throws SQLException {
        List<String> labels = new ArrayList<String>();

        ResultSetMetaData rsmd = (ResultSetMetaData) resultSet.getMetaData();
        for (int i = 0; i < rsmd.getColumnCount(); i++) {
            labels.add(rsmd.getColumnLabel(i + 1));
        }

        return labels;
    }
    /**
    * 得到數據庫表的總數,方便進行分頁
    *
    * @param tablename 表名
    */
    public static int count(String tablename) throws SQLException {
        int count = 0;
        Statement stmt = conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE);
        ResultSet rs = stmt.executeQuery("select count(1) from "+tablename);
        while (rs.next()) {
            count = rs.getInt(1);
        }
        System.out.println("Total Size = " + count);
        rs.close();
        stmt.close();
        return count;
    }
    /**
     * 執行查詢,並持久化文件
     * 
     * @param tablename 導出的代表
     * @param page 分頁的大小
     * @param path 文件的路徑
     * @param index 索引的名稱
     * @param type 類型的名稱
     * @return
     * @throws SQLException
     */
    public static void readDataByPage(String tablename,int page,String path,String index,String type) throws SQLException, IOException {
        int count = count(tablename);
        int i =0;
        for(i =0;i<count;){
            List<Map> map = JDBCUtil.readTable(tablename,i,i+page);
            JDBCUtil.writeTable(map,index,type,path);
            i+=page;
        }
    }
}

在main方法中傳入必要的參數便可:curl

public class Main {
    public static void main(String[] args) {
        try {
            JDBCUtil.readDataByPage("TABLE_NAME",1000,"D://data.json","index","type");
        } catch (SQLException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

這樣獲得bulk的數據後,就能夠運行腳本分批導入了。elasticsearch

下面腳本的思路,就是每100000行左右的數據導入到一個目標文件,使用bulk命令導入到es中。注意一個細節就是不能隨意的切分文件,由於bulk的文件是兩行爲一條數據的。工具

#!/bin/bash

count=0
rm target.json
touch target.json


while read line;do

((count++))

{
        echo $line >> target.json

        if [ $count -gt 100000 ] && [ $((count%2)) -eq 0 ];then
                count=0
                curl -XPOST localhost:9200/_bulk --data-binary @target.json > /dev/null
                rm target.json
                touch target.json
        fi

}

done < $1
echo 'last submit'
curl -XPOST localhost:9200/_bulk --data-binary @target.json > /dev/null

最後執行腳本:

sh auto_bulk.sh data.json

本身測試最後要比logstasj jdbc快5-6倍。

相關文章
相關標籤/搜索