先說說需求的背景,因爲業務數據都在Oracle數據庫中,想要對它進行數據的分析會很是很是慢,用傳統的數據倉庫-->數據集市這種方式,集市層表會很是大,查詢的時候若是再作一些group的操做,一個訪問須要一分鐘甚至更久才能響應。html
爲了解決這個問題,就想把業務庫的數據遷移到Elasticsearch中,而後針對es再去作聚合查詢。java
問題來了,數據庫中的數據量很大,如何導入到ES中呢?sql
Logstash提供了一款JDBC的插件,能夠在裏面寫sql語句,自動查詢而後導入到ES中。這種方式比較簡單,須要注意的就是須要用戶本身下載jdbc的驅動jar包。數據庫
input { jdbc { jdbc_driver_library => "ojdbc14-10.2.0.3.0.jar" jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver" jdbc_connection_string => "jdbc:oracle:thin:@localhost:1521:test" jdbc_user => "test" jdbc_password => "test123" schedule => "* * * * *" statement => "select * from TARGET_TABLE" add_field => ["type","a"] } } output{ elasticsearch { hosts =>["10.10.1.205:9200"] index => "product" document_type => "%{type}" } }
所以,就考慮本身來導。json
最後使用發現,本身寫的導入程序,比Logstash jdbc快5-6倍~~~~~~ 嗨皮!!!!bash
下面的代碼須要注意的就是oracle
public class JDBCUtil { private static Connection conn = null; private static PreparedStatement sta=null; static{ try { Class.forName("oracle.jdbc.driver.OracleDriver"); conn = DriverManager.getConnection("jdbc:oracle:thin:@localhost:1521:test", "test", "test123"); } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (SQLException e) { e.printStackTrace(); } System.out.println("Database connection established"); } /** * 把查到的數據格式化寫入到文件 * * @param list 須要存儲的數據 * @param index 索引的名稱 * @param type 類型的名稱 * @param path 文件存儲的路徑 **/ public static void writeTable(List<Map> list,String index,String type,String path) throws SQLException, IOException { System.out.println("開始寫文件"); File file = new File(path); int count = 0; int size = list.size(); for(Map map : list){ FileUtils.write(file, "{ \"index\" : { \"_index\" : \""+index+"\", \"_type\" : \""+type+"\" } }\n","UTF-8",true); FileUtils.write(file, JSON.toJSONString(map)+"\n","UTF-8",true); // System.out.println("寫入了" + ((count++)+1) + "[" + size + "]"); } System.out.println("寫入完成"); } /** * 讀取數據 * @param sql * @return * @throws SQLException */ public static List<Map> readTable(String tablename,int start,int end) throws SQLException { System.out.println("開始讀數據庫"); //執行查詢 sta = conn.prepareStatement("select * from(select rownum as rn,t.* from "+tablename+" t )where rn >="+start+" and rn <"+end); ResultSet rs = sta.executeQuery(); //獲取數據列表 List<Map> data = new ArrayList(); List<String> columnLabels = getColumnLabels(rs); Map<String, Object> map = null; while(rs.next()){ map = new HashMap<String, Object>(); for (String columnLabel : columnLabels) { Object value = rs.getObject(columnLabel); map.put(columnLabel.toLowerCase(), value); } data.add(map); } sta.close(); System.out.println("數據讀取完畢"); return data; } /** * 得到列名 * @param resultSet * @return * @throws SQLException */ private static List<String> getColumnLabels(ResultSet resultSet) throws SQLException { List<String> labels = new ArrayList<String>(); ResultSetMetaData rsmd = (ResultSetMetaData) resultSet.getMetaData(); for (int i = 0; i < rsmd.getColumnCount(); i++) { labels.add(rsmd.getColumnLabel(i + 1)); } return labels; } /** * 得到數據庫表的總數,方便進行分頁 * * @param tablename 表名 */ public static int count(String tablename) throws SQLException { int count = 0; Statement stmt = conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE); ResultSet rs = stmt.executeQuery("select count(1) from "+tablename); while (rs.next()) { count = rs.getInt(1); } System.out.println("Total Size = " + count); rs.close(); stmt.close(); return count; } /** * 執行查詢,並持久化文件 * * @param tablename 導出的代表 * @param page 分頁的大小 * @param path 文件的路徑 * @param index 索引的名稱 * @param type 類型的名稱 * @return * @throws SQLException */ public static void readDataByPage(String tablename,int page,String path,String index,String type) throws SQLException, IOException { int count = count(tablename); int i =0; for(i =0;i<count;){ List<Map> map = JDBCUtil.readTable(tablename,i,i+page); JDBCUtil.writeTable(map,index,type,path); i+=page; } } }
在main方法中傳入必要的參數便可:curl
public class Main { public static void main(String[] args) { try { JDBCUtil.readDataByPage("TABLE_NAME",1000,"D://data.json","index","type"); } catch (SQLException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
這樣獲得bulk的數據後,就能夠運行腳本分批導入了。elasticsearch
下面腳本的思路,就是每100000行左右的數據導入到一個目標文件,使用bulk命令導入到es中。注意一個細節就是不能隨意的切分文件,由於bulk的文件是兩行爲一條數據的。工具
#!/bin/bash count=0 rm target.json touch target.json while read line;do ((count++)) { echo $line >> target.json if [ $count -gt 100000 ] && [ $((count%2)) -eq 0 ];then count=0 curl -XPOST localhost:9200/_bulk --data-binary @target.json > /dev/null rm target.json touch target.json fi } done < $1 echo 'last submit' curl -XPOST localhost:9200/_bulk --data-binary @target.json > /dev/null
最後執行腳本:
sh auto_bulk.sh data.json
本身測試最後要比logstasj jdbc快5-6倍。