CDC+ETL實現數據集成方案java
實戰kudu集成impalamysql
對於impala而言,開發人員是能夠經過JDBC鏈接impala的,有了JDBC,開發人員能夠經過impala來間接操做 kudu;sql
<!--impala的jdbc操做--> <dependency> <groupId>com.cloudera</groupId> <artifactId>ImpalaJDBC41</artifactId> <version>2.5.42</version> </dependency>
使用JDBC鏈接impala操做kudu,與JDBC鏈接mysql作更重增刪改查基本同樣,建立實體類代碼以下:數據庫
package cn.itcast.impala.impala; public class Person { private int companyId; private int workId; private String name; private String gender; private String photo; public Person(int companyId, int workId, String name, String gender, String photo) { this.companyId = companyId; this.workId = workId; this.name = name; this.gender = gender; this.photo = photo; } public int getCompanyId() { return companyId; } public void setCompanyId(int companyId) { this.companyId = companyId; } public int getWorkId() { return workId; } public void setWorkId(int workId) { this.workId = workId; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getGender() { return gender; } public void setGender(String gender) { this.gender = gender; } public String getPhoto() { return photo; } public void setPhoto(String photo) { this.photo = photo; } }
package cn.itcast.impala.impala; import java.sql.*; public class Contants { private static String JDBC_DRIVER="com.cloudera.impala.jdbc41.Driver"; private static String CONNECTION_URL="jdbc:impala://node1:21050/default;auth=noSasl"; //定義數據庫鏈接 static Connection conn=null; //定義PreparedStatement對象 static PreparedStatement ps=null; //定義查詢的結果集 static ResultSet rs= null; //數據庫鏈接 public static Connection getConn(){ try { Class.forName(JDBC_DRIVER); conn=DriverManager.getConnection(CONNECTION_URL); } catch (Exception e) { e.printStackTrace(); } return conn; } //建立一個表 public static void createTable(){ conn=getConn(); String sql="CREATE TABLE impala_kudu_test" + "(" + "companyId BIGINT," + "workId BIGINT," + "name STRING," + "gender STRING," + "photo STRING," + "PRIMARY KEY(companyId)" + ")" + "PARTITION BY HASH PARTITIONS 16 " + "STORED AS KUDU " + "TBLPROPERTIES (" + "'kudu.master_addresses' = 'node1:7051,node2:7051,node3:7051'," + "'kudu.table_name' = 'impala_kudu_test'" + ");"; try { ps = conn.prepareStatement(sql); ps.execute(); } catch (SQLException e) { e.printStackTrace(); } } //查詢數據 public static ResultSet queryRows(){ try { //定義執行的sql語句 String sql="select * from impala_kudu_test"; ps = getConn().prepareStatement(sql); rs= ps.executeQuery(); } catch (SQLException e) { e.printStackTrace(); } return rs; } //打印結果 public static void printRows(ResultSet rs){ /** private int companyId; private int workId; private String name; private String gender; private String photo; */ try { while (rs.next()){ //獲取表的每一行字段信息 int companyId = rs.getInt("companyId"); int workId = rs.getInt("workId"); String name = rs.getString("name"); String gender = rs.getString("gender"); String photo = rs.getString("photo"); System.out.print("companyId:"+companyId+" "); System.out.print("workId:"+workId+" "); System.out.print("name:"+name+" "); System.out.print("gender:"+gender+" "); System.out.println("photo:"+photo); } } catch (SQLException e) { e.printStackTrace(); }finally { if(ps!=null){ try { ps.close(); } catch (SQLException e) { e.printStackTrace(); } } if(conn !=null){ try { conn.close(); } catch (SQLException e) { e.printStackTrace(); } } } } //插入數據 public static void insertRows(Person person){ conn=getConn(); String sql="insert into table impala_kudu_test(companyId,workId,name,gender,photo) values(?,?,?,?,?)"; try { ps=conn.prepareStatement(sql); //給佔位符?賦值 ps.setInt(1,person.getCompanyId()); ps.setInt(2,person.getWorkId()); ps.setString(3,person.getName()); ps.setString(4,person.getGender()); ps.setString(5,person.getPhoto()); ps.execute(); } catch (SQLException e) { e.printStackTrace(); }finally { if(ps !=null){ try { //關閉 ps.close(); } catch (SQLException e) { e.printStackTrace(); } } if(conn !=null){ try { //關閉 conn.close(); } catch (SQLException e) { e.printStackTrace(); } } } } //更新數據 public static void updateRows(Person person){ //定義執行的sql語句 String sql="update impala_kudu_test set workId="+person.getWorkId()+ ",name='"+person.getName()+"' ,"+"gender='"+person.getGender()+"' ,"+ "photo='"+person.getPhoto()+"' where companyId="+person.getCompanyId(); try { ps= getConn().prepareStatement(sql); ps.execute(); } catch (SQLException e) { e.printStackTrace(); }finally { if(ps !=null){ try { //關閉 ps.close(); } catch (SQLException e) { e.printStackTrace(); } } if(conn !=null){ try { //關閉 conn.close(); } catch (SQLException e) { e.printStackTrace(); } } } } //刪除數據 public static void deleteRows(int companyId){ //定義sql語句 String sql="delete from impala_kudu_test where companyId="+companyId; try { ps =getConn().prepareStatement(sql); ps.execute(); } catch (SQLException e) { e.printStackTrace(); } } //刪除表 public static void dropTable() { String sql="drop table if exists impala_kudu_test"; try { ps =getConn().prepareStatement(sql); ps.execute(); } catch (SQLException e) { e.printStackTrace(); } } }
package cn.itcast.impala.impala; import java.sql.Connection; public class ImpalaJdbcClient { public static void main(String[] args) { Connection conn = Contants.getConn(); //建立一個表 Contants.createTable(); //插入數據 Contants.insertRows(new Person(1,100,"lisi","male","lisi-photo")); //查詢表的數據 ResultSet rs = Contants.queryRows(); Contants.printRows(rs); //更新數據 Contants.updateRows(new Person(1,200,"zhangsan","male","zhangsan-photo")); //刪除數據 Contants.deleteRows(1); //刪除表 Contants.dropTable(); } }