在KUDU以前,大數據主要以兩種方式存儲; java
(1)靜態數據: node
以 HDFS 引擎做爲存儲引擎,適用於高吞吐量的離線大數據分析場景。這類存儲的侷限性是數據沒法進行隨機的讀寫。 mysql
(2)動態數據: 算法
以 HBase、Cassandra 做爲存儲引擎,適用於大數據隨機讀寫場景。這類存儲的侷限性是批量讀取吞吐量遠不如 HDFS,不適用於批量數據分析的場景。 sql
從上面分析可知,這兩種數據在存儲方式上徹底不一樣,進而致使使用場景徹底不一樣,但在真實的場景中,邊界可能沒有那麼清晰,面對既須要隨機讀寫,又須要批量分析的大數據場景,該如何選擇呢?這個場景中,單種存儲引擎沒法知足業務需求,咱們須要經過多種大數據工具組合來知足這一需求。 shell
如上圖所示,數據實時寫入 HBase,實時的數據更新也在 HBase 完成,爲了應對 OLAP 需求,咱們定時(一般是 T+1 或者 T+H)將 HBase 數據寫成靜態的文件(如:Parquet)導入到 OLAP 引擎(如:HDFS)。這一架構能知足既須要隨機讀寫,又能夠支持 OLAP 分析的場景,但他有以下缺點: 數據庫
(1)架構複雜。從架構上看,數據在HBase、消息隊列、HDFS 間流轉,涉及環節太多,運維成本很高。而且每一個環節須要保證高可用,都須要維護多個副本,存儲空間也有必定的浪費。最後數據在多個系統上,對數據安全策略、監控等都提出了挑戰。 apache
(2)時效性低。數據從HBase導出成靜態文件是週期性的,通常這個週期是一天(或一小時),在時效性上不是很高。 api
(3)難以應對後續的更新。真實場景中,總會有數據是延遲到達的。若是這些數據以前已經從HBase導出到HDFS,新到的變動數據就難以處理了,一個方案是把原有數據應用上新的變動後重寫一遍,但這代價又很高。 瀏覽器
爲了解決上述架構的這些問題,KUDU應運而生。KUDU的定位是Fast Analytics on Fast Data,是一個既支持隨機讀寫、又支持 OLAP 分析的大數據存儲引擎。
從上圖能夠看出,KUDU 是一個折中的產品,在 HDFS 和 HBase 這兩個偏科生中平衡了隨機讀寫和批量分析的性能。從 KUDU 的誕生能夠說明一個觀點:底層的技術發展不少時候都是上層的業務推進的,脫離業務的技術極可能是空中樓閣。
Apache Kudu是由Cloudera開源的存儲引擎,能夠同時提供低延遲的隨機讀寫和高效的數據分析能力。它是一個融合HDFS和HBase的功能的新組件,具有介於二者之間的新存儲組件。
Kudu支持水平擴展,而且與Cloudera Impala和Apache Spark等當前流行的大數據查詢和分析工具結合緊密。
Strong performance for both scan and random access to help customers simplify complex hybrid architectures(適用於那些既有隨機訪問,也有批量數據掃描的複合場景)
High CPU efficiency in order to maximize the return on investment that our customers are making in modern processors(高計算量的場景)
High IO efficiency in order to leverage modern persistent storage(使用了高性能的存儲設備,包括使用更多的內存)
The ability to update data in place, to avoid extraneous processing and data movement(支持數據更新,避免數據反覆遷移)
The ability to support active-active replicated clusters that span multiple data centers in geographically distant locations(支持跨地域的實時數據備份和查詢)
國內使用的kudu一些案例能夠查看《構建近實時分析系統.pdf》文檔。
與HDFS和HBase類似,Kudu使用單個的Master節點,用來管理集羣的元數據,而且使用任意數量的Tablet Server(可對比理解HBase中的RegionServer角色)節點用來存儲實際數據。能夠部署多個Master節點來提升容錯性。一個table表的數據,被分割成1個或多個Tablet,Tablet被部署在Tablet Server來提供數據讀寫服務。
下面是一些基本概念:
Master:集羣中的老大,負責集羣管理、元數據管理等功能
Tablet Server:集羣中的小弟,負責數據存儲,並提供數據讀寫服務
一個 tablet server 存儲了table表的tablet 和爲 tablet 向 client 提供服務。對於給定的 tablet,一個tablet server 充當 leader,其餘 tablet server 充當該 tablet 的 follower 副本。
只有 leader服務寫請求,然而 leader 或 followers 爲每一個服務提供讀請求 。一個 tablet server 能夠服務多個 tablets ,而且一個 tablet 能夠被多個 tablet servers 服務着。
Table(表)
一張talbe是數據存儲在Kudu的tablet server中。表具備 schema 和全局有序的primary key(主鍵)。table 被分紅稱爲 tablets 的 segments。
Tablet
一個 tablet 是一張 table連續的segment,tablet是kudu表的水平分區,相似於google Bigtable的tablet,或者HBase的region。每一個tablet存儲着必定連續range的數據(key),且tablet兩兩間的range不會重疊。一張表的全部tablet包含了這張表的全部key空間。與其它數據存儲引擎或關係型數據庫中的 partition(分區)類似。給定的tablet 冗餘到多個 tablet 服務器上,而且在任何給定的時間點,其中一個副本被認爲是leader tablet。任何副本均可以對讀取進行服務,而且寫入時須要在爲 tablet 服務的一組 tablet server之間達成一致性。
<dependencies> <dependency> <groupId>org.apache.kudu</groupId> <artifactId>kudu-client</artifactId> <version>1.6.0</version> </dependency>
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> </dependencies> |
|
public class TestKudu { //定義KuduClient客戶端對象 private static KuduClient kuduClient; //定義表名 private static String tableName="person";
/** * 初始化方法 */ @Before public void init(){ //指定master地址 String masterAddress="node1,node2,node3"; //建立kudu的數據庫鏈接 kuduClient = new KuduClient.KuduClientBuilder(masterAddress).defaultSocketReadTimeoutMs(6000).build();
}
//構建表schema的字段信息 //字段名稱 數據類型 是否爲主鍵 public ColumnSchema newColumn(String name, Type type, boolean isKey){ ColumnSchema.ColumnSchemaBuilder column = new ColumnSchema.ColumnSchemaBuilder(name, type); column.key(isKey); return column.build(); } } |
/** 使用junit進行測試 * * 建立表 * @throws KuduException */ @Test public void createTable() throws KuduException { //設置表的schema List<ColumnSchema> columns = new LinkedList<ColumnSchema>(); columns.add(newColumn("CompanyId", Type.INT32, true)); columns.add(newColumn("WorkId", Type.INT32, false)); columns.add(newColumn("Name", Type.STRING, false)); columns.add(newColumn("Gender", Type.STRING, false)); columns.add(newColumn("Photo", Type.STRING, false));
Schema schema = new Schema(columns);
//建立表時提供的全部選項 CreateTableOptions tableOptions = new CreateTableOptions(); //設置表的副本和分區規則 LinkedList<String> list = new LinkedList<String>(); list.add("CompanyId"); //設置表副本數 tableOptions.setNumReplicas(1); //設置range分區 //tableOptions.setRangePartitionColumns(list); //設置hash分區和分區的數量 tableOptions.addHashPartitions(list,3);
try { kuduClient.createTable("person",schema,tableOptions); } catch (Exception e) { e.printStackTrace(); }
kuduClient.close();
} |
/** * 向表中加載數據 * @throws KuduException */ @Test public void loadData() throws KuduException { //打開表 KuduTable kuduTable = kuduClient.openTable(tableName);
//建立KuduSession對象 kudu必須經過KuduSession寫入數據 KuduSession kuduSession = kuduClient.newSession();
//採用flush方式 手動刷新 kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH); kuduSession.setMutationBufferSpace(3000);
//準備數據 for(int i=1; i<=10; i++){ Insert insert = kuduTable.newInsert(); //設置字段的內容 insert.getRow().addInt("CompanyId",i); insert.getRow().addInt("WorkId",i); insert.getRow().addString("Name","lisi"+i); insert.getRow().addString("Gender","male"); insert.getRow().addString("Photo","person"+i);
kuduSession.flush(); kuduSession.apply(insert); }
kuduSession.close(); kuduClient.close();
} |
/** * 查詢表數據 * @throws KuduException */ @Test public void queryData() throws KuduException { //打開表 KuduTable kuduTable = kuduClient.openTable(tableName); //獲取scanner掃描器 KuduScanner.KuduScannerBuilder scannerBuilder = kuduClient.newScannerBuilder(kuduTable); KuduScanner scanner = scannerBuilder.build();
//遍歷 while(scanner.hasMoreRows()){ RowResultIterator rowResults = scanner.nextRows(); while (rowResults.hasNext()){ RowResult result = rowResults.next(); int companyId = result.getInt("CompanyId"); int workId = result.getInt("WorkId"); String name = result.getString("Name"); String gender = result.getString("Gender"); String photo = result.getString("Photo"); System.out.print("companyId:"+companyId+" "); System.out.print("workId:"+workId+" "); System.out.print("name:"+name+" "); System.out.print("gender:"+gender+" "); System.out.println("photo:"+photo); } }
//關閉 scanner.close(); kuduClient.close();
} |
/** * 修改數據 * @throws KuduException */ @Test public void updateData() throws KuduException { //打開表 KuduTable kuduTable = kuduClient.openTable(tableName);
//構建kuduSession對象 KuduSession kuduSession = kuduClient.newSession(); //設置刷新數據模式,自動提交 kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
//更新數據須要獲取Update對象 Update update = kuduTable.newUpdate(); //獲取row對象 PartialRow row = update.getRow(); //設置要更新的數據信息 row.addInt("CompanyId",1); row.addString("Name","kobe"); //操做這個update對象 kuduSession.apply(update);
kuduSession.close();
} |
/** * 刪除表中的數據 */ @Test public void deleteData() throws KuduException { //打開表 KuduTable kuduTable = kuduClient.openTable(tableName); KuduSession kuduSession = kuduClient.newSession(); //獲取Delete對象 Delete delete = kuduTable.newDelete(); //構建要刪除的行對象 PartialRow row = delete.getRow(); //設置刪除數據的條件 row.addInt("CompanyId",2); kuduSession.flush(); kuduSession.apply(delete); kuduSession.close(); kuduClient.close(); } |
/** * 刪除表 */ @Test public void dropTable() throws KuduException { //刪除表 DeleteTableResponse response = kuduClient.deleteTable(tableName); //關閉客戶端鏈接 kuduClient.close(); } |
爲了提供可擴展性,Kudu 表被劃分爲稱爲 tablets 的單元,並分佈在許多 tablet servers 上。行老是屬於單個tablet 。將行分配給 tablet 的方法由在表建立期間設置的表的分區決定。 kudu提供了3種分區方式。
範圍分區能夠根據存入數據的數據量,均衡的存儲到各個機器上,防止機器出現負載不均衡現象.
/** * 測試分區: * RangePartition */ @Test public void testRangePartition() throws KuduException { //設置表的schema LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>(); columnSchemas.add(newColumn("CompanyId", Type.INT32,true)); columnSchemas.add(newColumn("WorkId", Type.INT32,false)); columnSchemas.add(newColumn("Name", Type.STRING,false)); columnSchemas.add(newColumn("Gender", Type.STRING,false)); columnSchemas.add(newColumn("Photo", Type.STRING,false));
//建立schema Schema schema = new Schema(columnSchemas);
//建立表時提供的全部選項 CreateTableOptions tableOptions = new CreateTableOptions(); //設置副本數 tableOptions.setNumReplicas(1); //設置範圍分區的規則 LinkedList<String> parcols = new LinkedList<String>(); parcols.add("CompanyId"); //設置按照那個字段進行range分區 tableOptions.setRangePartitionColumns(parcols);
/** * range * 0 < value < 10 * 10 <= value < 20 * 20 <= value < 30 * ........ * 80 <= value < 90 * */ int count=0; for(int i =0;i<10;i++){ //範圍開始 PartialRow lower = schema.newPartialRow(); lower.addInt("CompanyId",count);
//範圍結束 PartialRow upper = schema.newPartialRow(); count +=10; upper.addInt("CompanyId",count);
//設置每個分區的範圍 tableOptions.addRangePartition(lower,upper); }
try { kuduClient.createTable("student",schema,tableOptions); } catch (KuduException e) { e.printStackTrace(); } kuduClient.close();
} |
哈希分區經過哈希值將行分配到許多 buckets ( 存儲桶 )之一; 哈希分區是一種有效的策略,當不須要對錶進行有序訪問時。哈希分區對於在 tablet 之間隨機散佈這些功能是有效的,這有助於減輕熱點和 tablet 大小不均勻。
/** * 測試分區: * hash分區 */ @Test public void testHashPartition() throws KuduException { //設置表的schema LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>(); columnSchemas.add(newColumn("CompanyId", Type.INT32,true)); columnSchemas.add(newColumn("WorkId", Type.INT32,false)); columnSchemas.add(newColumn("Name", Type.STRING,false)); columnSchemas.add(newColumn("Gender", Type.STRING,false)); columnSchemas.add(newColumn("Photo", Type.STRING,false));
//建立schema Schema schema = new Schema(columnSchemas);
//建立表時提供的全部選項 CreateTableOptions tableOptions = new CreateTableOptions(); //設置副本數 tableOptions.setNumReplicas(1); //設置範圍分區的規則 LinkedList<String> parcols = new LinkedList<String>(); parcols.add("CompanyId"); //設置按照那個字段進行range分區 tableOptions.addHashPartitions(parcols,6); try { kuduClient.createTable("dog",schema,tableOptions); } catch (KuduException e) { e.printStackTrace(); }
kuduClient.close(); } |
Kudu 容許一個表在單個表上組合多級分區。 當正確使用時,多級分區能夠保留各個分區類型的優勢,同時減小每一個分區的缺點 需求.
/** * 測試分區: * 多級分區 * Multilevel Partition * 混合使用hash分區和range分區 * * 哈希分區有利於提升寫入數據的吞吐量,而範圍分區能夠避免tablet無限增加問題, * hash分區和range分區結合,能夠極大的提高kudu的性能 */ @Test public void testMultilevelPartition() throws KuduException { //設置表的schema LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>(); columnSchemas.add(newColumn("CompanyId", Type.INT32,true)); columnSchemas.add(newColumn("WorkId", Type.INT32,false)); columnSchemas.add(newColumn("Name", Type.STRING,false)); columnSchemas.add(newColumn("Gender", Type.STRING,false)); columnSchemas.add(newColumn("Photo", Type.STRING,false));
//建立schema Schema schema = new Schema(columnSchemas); //建立表時提供的全部選項 CreateTableOptions tableOptions = new CreateTableOptions(); //設置副本數 tableOptions.setNumReplicas(1); //設置範圍分區的規則 LinkedList<String> parcols = new LinkedList<String>(); parcols.add("CompanyId");
//hash分區 tableOptions.addHashPartitions(parcols,5);
//range分區 int count=0; for(int i=0;i<10;i++){ PartialRow lower = schema.newPartialRow(); lower.addInt("CompanyId",count); count+=10;
PartialRow upper = schema.newPartialRow(); upper.addInt("CompanyId",count); tableOptions.addRangePartition(lower,upper); }
try { kuduClient.createTable("cat",schema,tableOptions); } catch (KuduException e) { e.printStackTrace(); } kuduClient.close();
} |
Spark與KUDU集成支持:
到目前爲止,咱們已經據說過幾個上下文,例如SparkContext,SQLContext,HiveContext, SparkSession,如今,咱們將使用Kudu引入一個KuduContext。這是可在Spark應用程序中廣播的主要可序列化對象。此類表明在Spark執行程序中與Kudu Java客戶端進行交互。 KuduContext提供執行DDL操做所需的方法,與本機Kudu RDD的接口,對數據執行更新/插入/刪除,將數據類型從Kudu轉換爲Spark等。
<repositories> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> </repositories>
<dependencies> <dependency> <groupId>org.apache.kudu</groupId> <artifactId>kudu-client-tools</artifactId> <version>1.6.0-cdh5.14.0</version> </dependency>
<dependency> <groupId>org.apache.kudu</groupId> <artifactId>kudu-client</artifactId> <version>1.6.0-cdh5.14.0</version> </dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2 --> <dependency> <groupId>org.apache.kudu</groupId> <artifactId>kudu-spark2_2.11</artifactId> <version>1.6.0-cdh5.14.0</version> </dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.1.0</version> </dependency> </dependencies> |
1:提供表名
2:提供schema
3:提供主鍵
4:定義重要選項;例如:定義分區的schema
5:調用create Table api
object SparkKuduTest { def main(args: Array[String]): Unit = { //構建sparkConf對象 val sparkConf: SparkConf = new SparkConf().setAppName("SparkKuduTest").setMaster("local[2]")
//構建SparkSession對象 val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
//獲取sparkContext對象 val sc: SparkContext = sparkSession.sparkContext sc.setLogLevel("warn")
//構建KuduContext對象 val kuduContext = new KuduContext("node1:7051,node2:7051,node3:7051",sc)
//1.建立表操做 createTable(kuduContext)
/** * 建立表 * @param kuduContext * @return */ private def createTable(kuduContext: KuduContext) = {
//1.1定義表名 val tableName = "spark_kudu"
//1.2 定義表的schema val schema = StructType( StructField("userId", StringType, false) :: StructField("name", StringType, false) :: StructField("age", IntegerType, false) :: StructField("sex", StringType, false) :: Nil)
//1.3 定義表的主鍵 val primaryKey = Seq("userId")
//1.4 定義分區的schema val options = new CreateTableOptions //設置分區 options.setRangePartitionColumns(List("userId").asJava) //設置副本 options.setNumReplicas(1)
//1.5 建立表 if(!kuduContext.tableExists(tableName)){ kuduContext.createTable(tableName, schema, primaryKey, options) }
}
} |
定義表時要注意的是Kudu表選項值。你會注意到在指定組成範圍分區列的列名列表時咱們調用"asJava"方 法。這是由於在這裏,咱們調用了Kudu Java客戶端自己,它須要Java對象(即java.util.List)而不是Scala的List對 象;(要使"asJava"方法可用,請記住導入JavaConverters庫。) 建立表後,經過將瀏覽器指向http// master主機名:8051/tables
來查看Kudu主UI能夠找到建立的表,經過單擊表ID,可以看到表模式和分區信息。
點擊Table id 能夠觀察到表的schema等信息:
Kudu支持許多DML類型的操做,其中一些操做包含在Spark on Kudu集成. 包括:
case class People(id:Int,name:String,age:Int) |
/** * 刪除表的數據 * @param sparkSession * @param sc * @param kuduMaster * @param kuduContext * @param tableName */ private def deleteData(sparkSession: SparkSession, sc: SparkContext, kuduMaster: String, kuduContext: KuduContext, tableName: String): Unit = { //定義一個map集合,封裝kudu的相關信息 val options = Map( "kudu.master" -> kuduMaster, "kudu.table" -> tableName )
import sparkSession.implicits._ val data = List(People(1, "zhangsan", 20), People(2, "lisi", 30), People(3, "wangwu", 40)) val dataFrame: DataFrame = sc.parallelize(data).toDF dataFrame.createTempView("temp") //獲取年齡大於30的全部用戶id val result: DataFrame = sparkSession.sql("select id from temp where age >30") //刪除對應的數據,這裏必需要是主鍵字段 kuduContext.deleteRows(result, tableName)
} |
/** * 更新數據--添加數據 * * @param sc * @param kuduMaster * @param kuduContext * @param tableName */ private def UpsertData(sparkSession: SparkSession,sc: SparkContext, kuduMaster: String, kuduContext: KuduContext, tableName: String): Unit = { //更新表中的數據 //定義一個map集合,封裝kudu的相關信息 val options = Map( "kudu.master" -> kuduMaster, "kudu.table" -> tableName )
import sparkSession.implicits._ val data = List(People(1, "zhangsan", 50), People(5, "tom", 30)) val dataFrame: DataFrame = sc.parallelize(data).toDF //若是存在就是更新,不然就是插入 kuduContext.upsertRows(dataFrame, tableName)
} |
/** * 更新數據 * @param sparkSession * @param sc * @param kuduMaster * @param kuduContext * @param tableName */ private def updateData(sparkSession: SparkSession,sc: SparkContext, kuduMaster: String, kuduContext: KuduContext, tableName: String): Unit = { //定義一個map集合,封裝kudu的相關信息 val options = Map( "kudu.master" -> kuduMaster, "kudu.table" -> tableName )
import sparkSession.implicits._ val data = List(People(1, "zhangsan", 60), People(6, "tom", 30)) val dataFrame: DataFrame = sc.parallelize(data).toDF //若是存在就是更新,不然就是報錯 kuduContext.updateRows(dataFrame, tableName)
} |
雖然咱們能夠經過上面顯示的KuduContext執行大量操做,但咱們還能夠直接從默認數據源自己調用讀/寫API。要設置讀取,咱們須要爲Kudu表指定選項,命名咱們要讀取的表以及爲表提供服務的Kudu集羣的Kudu主服務器列表。
/** * 使用DataFrameApi讀取kudu表中的數據 * @param sparkSession * @param kuduMaster * @param tableName */ private def getTableData(sparkSession: SparkSession, kuduMaster: String, tableName: String): Unit = { //定義map集合,封裝kudu的master地址和要讀取的表名 val options = Map( "kudu.master" -> kuduMaster, "kudu.table" -> tableName ) sparkSession.read.options(options).kudu.show()
} |
在經過DataFrame API編寫時,目前只支持一種模式"append"。還沒有實現的"覆蓋"模式。
/** * DataFrame api 寫數據到kudu表 * @param sparkSession * @param sc * @param kuduMaster * @param tableName */ private def dataFrame2kudu(sparkSession: SparkSession, sc: SparkContext, kuduMaster: String, tableName: String): Unit = { //定義map集合,封裝kudu的master地址和要讀取的表名 val options = Map( "kudu.master" -> kuduMaster, "kudu.table" -> tableName ) val data = List(People(7, "jim", 30), People(8, "xiaoming", 40)) import sparkSession.implicits._ val dataFrame: DataFrame = sc.parallelize(data).toDF //把dataFrame結果寫入到kudu表中 ,目前只支持append追加 dataFrame.write.options(options).mode("append").kudu
//查看結果 //導包 import org.apache.kudu.spark.kudu._ //加載表的數據,導包調用kudu方法,轉換爲dataFrame,最後在使用show方法顯示結果 sparkSession.read.options(options).kudu.show() } |
能夠選擇使用Spark SQL直接使用INSERT語句寫入Kudu表;與'append'相似,INSERT語句實際上將默認使用 UPSERT語義處理;
/** * 使用sparksql操做kudu表 * @param sparkSession * @param sc * @param kuduMaster * @param tableName */ private def SparkSql2Kudu(sparkSession: SparkSession, sc: SparkContext, kuduMaster: String, tableName: String): Unit = { //定義map集合,封裝kudu的master地址和表名 val options = Map( "kudu.master" -> kuduMaster, "kudu.table" -> tableName ) val data = List(People(10, "小張", 30), People(11, "小王", 40)) import sparkSession.implicits._ val dataFrame: DataFrame = sc.parallelize(data).toDF //把dataFrame註冊成一張表 dataFrame.createTempView("temp1")
//獲取kudu表中的數據,而後註冊成一張表 sparkSession.read.options(options).kudu.createTempView("temp2") //使用sparkSQL的insert操做插入數據 sparkSession.sql("insert into table temp2 select * from temp1") sparkSession.sql("select * from temp2 where age >30").show()
} |
Spark與Kudu的集成同時提供了kudu RDD.
//使用kuduContext對象調用kuduRDD方法,須要sparkContext對象,表名,想要的字段名稱 val kuduRDD: RDD[Row] = kuduContext.kuduRDD(sc,tableName,Seq("name","age")) //操做該rdd 打印輸出 val result: RDD[(String, Int)] = kuduRDD.map { case Row(name: String, age: Int) => (name, age) } result.foreach(println) |
五、kudu集成impala
impala是cloudera提供的一款高效率的sql查詢工具,提供實時的查詢效果,官方測試性能比hive快10到100倍,其sql查詢比sparkSQL還要更加快速,號稱是當前大數據領域最快的查詢sql工具,
impala是參照谷歌的新三篇論文(Caffeine--網絡搜索引擎、Pregel--分佈式圖計算、Dremel--交互式分析工具)當中的Dremel實現而來,其中舊三篇論文分別是(BigTable,GFS,MapReduce)分別對應咱們即將學的HBase和已經學過的HDFS以及MapReduce。
impala是基於hive並使用內存進行計算,兼顧數據倉庫,具備實時,批處理,多併發等優勢
Kudu與Apache Impala (孵化)緊密集成,impala自然就支持兼容kudu,容許開發人員使用Impala的SQL語法從Kudu的tablets 插入,查詢,更新和刪除數據;
一、須要先啓動hdfs、hive、kudu、impala
二、使用impala的shell控制檯
(1):使用該impala-shell命令啓動Impala Shell 。默認狀況下,impala-shell 嘗試鏈接到localhost端口21000 上的Impala守護程序。要鏈接到其餘主機,請使用該-i <host:port>選項。要自動鏈接到特定的Impala數據庫,請使用該-d <database>選項。例如,若是您的全部Kudu表都位於數據庫中的Impala中impala_kudu,則-d impala_kudu可使用此數據庫。
(2):要退出Impala Shell,請使用如下命令: quit;
使用Impala建立新的Kudu表時,能夠將該表建立爲內部表或外部表。
內部表由Impala管理,當您從Impala中刪除時,數據和表確實被刪除。當您使用Impala建立新表時,它一般是內部表。
CREATE TABLE my_first_table |
此時建立的表是內部表,從impala刪除表的時候,在底層存儲的kudu也會刪除表。
drop table if exists my_first_table; |
外部表(建立者CREATE EXTERNAL TABLE)不受Impala管理,而且刪除此表不會將表從其源位置(此處爲Kudu)丟棄。相反,它只會去除Impala和Kudu之間的映射。這是Kudu提供的用於將現有表映射到Impala的語法。
使用java建立一個kudu表:
public class CreateTable { private static ColumnSchema newColumn(String name, Type type, boolean iskey) { ColumnSchema.ColumnSchemaBuilder column = new ColumnSchema.ColumnSchemaBuilder(name, type); column.key(iskey); return column.build(); } public static void main(String[] args) throws KuduException { // master地址 final String masteraddr = "node1,node2,node3"; // 建立kudu的數據庫連接 KuduClient client = new KuduClient.KuduClientBuilder(masteraddr).defaultSocketReadTimeoutMs(6000).build();
// 設置表的schema List<ColumnSchema> columns = new LinkedList<ColumnSchema>(); columns.add(newColumn("CompanyId", Type.INT32, true)); columns.add(newColumn("WorkId", Type.INT32, false)); columns.add(newColumn("Name", Type.STRING, false)); columns.add(newColumn("Gender", Type.STRING, false)); columns.add(newColumn("Photo", Type.STRING, false)); Schema schema = new Schema(columns); //建立表時提供的全部選項 CreateTableOptions options = new CreateTableOptions();
// 設置表的replica備份和分區規則 List<String> parcols = new LinkedList<String>();
parcols.add("CompanyId"); //設置表的備份數 options.setNumReplicas(1); //設置range分區 options.setRangePartitionColumns(parcols);
//設置hash分區和數量 options.addHashPartitions(parcols, 3); try { client.createTable("person", schema, options); } catch (KuduException e) { e.printStackTrace(); } client.close(); } } |
在kudu的頁面上能夠觀察到以下信息:
在impala的命令行查看錶:
當前在impala中並無person這個表
使用impala建立外部表 , 將kudu的表映射到impala上:
在impala-shell執行
CREATE EXTERNAL TABLE `person` STORED AS KUDU |
impala 容許使用標準 SQL 語句將數據插入 Kudu 。
建立表
CREATE TABLE my_first_table |
此示例插入單個行
INSERT INTO my_first_table VALUES (50, "zhangsan"); |
查看數據
select * from my_first_table |
使用單個語句插入三行
INSERT INTO my_first_table VALUES (1, "john"), (2, "jane"), (3, "jim"); |
6.2.1.2 批量插入Batch Insert
示例
INSERT INTO my_first_table |
示例
UPDATE my_first_table SET name="xiaowang" where id =1 ;
示例
delete from my_first_table where id =2;
開發人員能夠經過更改表的屬性來更改 Impala 與給定 Kudu 表相關的元數據。這些屬性包括表名, Kudu 主地址列表,以及表是否由 Impala (內部)或外部管理。
ALTER TABLE PERSON RENAME TO person_temp;
建立內部表:
CREATE TABLE kudu_student |
若是表是內部表,則能夠經過更改 kudu.table_name 屬性重命名底層的 Kudu 表
ALTER TABLE kudu_student SET TBLPROPERTIES('kudu.table_name' = 'new_student'); |
效果圖
若是用戶在使用過程當中發現其餘應用程序從新命名了kudu表,那麼此時的外部表須要從新映射到kudu上
建立一個外部表:
CREATE EXTERNAL TABLE external_table |
從新映射外部表,指向不一樣的kudu表:
ALTER TABLE external_table |
上面的操做是:將external_table映射的PERSON表從新指向hashTable表
ALTER TABLE my_table SET TBLPROPERTIES('kudu.master_addresses' = 'kudu-new-master.example.com:7051'); |
ALTER TABLE my_table SET TBLPROPERTIES('EXTERNAL' = 'TRUE'); |
對於impala而言,開發人員是能夠經過JDBC鏈接impala的,有了JDBC,開發人員能夠經過impala來間接操做 kudu;
<!--impala的jdbc操做--> <dependency> <groupId>com.cloudera</groupId> <artifactId>ImpalaJDBC41</artifactId> <version>2.5.42</version> </dependency>
<!--Caused by : ClassNotFound : thrift.protocol.TPro--> <dependency> <groupId>org.apache.thrift</groupId> <artifactId>libfb303</artifactId> <version>0.9.3</version> <type>pom</type> </dependency>
<!--Caused by : ClassNotFound : thrift.protocol.TPro--> <dependency> <groupId>org.apache.thrift</groupId> <artifactId>libthrift</artifactId> <version>0.9.3</version> <type>pom</type> </dependency>
<dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <exclusions> <exclusion> <groupId>org.apache.hive</groupId> <artifactId>hive-service-rpc</artifactId> </exclusion> <exclusion> <groupId>org.apache.hive</groupId> <artifactId>hive-service</artifactId> </exclusion> </exclusions> <version>1.1.0</version> </dependency>
<!--導入hive--> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-service</artifactId> <version>1.1.0</version> </dependency> |
|
使用JDBC鏈接impala操做kudu,與JDBC鏈接mysql作更重增刪改查基本同樣
package cn.itcast.impala.impala;
public class Person { private int companyId; private int workId; private String name; private String gender; private String photo;
public Person(int companyId, int workId, String name, String gender, String photo) { this.companyId = companyId; this.workId = workId; this.name = name; this.gender = gender; this.photo = photo; }
public int getCompanyId() { return companyId; }
public void setCompanyId(int companyId) { this.companyId = companyId; }
public int getWorkId() { return workId; }
public void setWorkId(int workId) { this.workId = workId; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public String getGender() { return gender; }
public void setGender(String gender) { this.gender = gender; }
public String getPhoto() { return photo; }
public void setPhoto(String photo) { this.photo = photo; }
} |
package cn.itcast.impala.impala;
import java.sql.*;
public class Contants { private static String JDBC_DRIVER="com.cloudera.impala.jdbc41.Driver"; private static String CONNECTION_URL="jdbc:impala://node1:21050/default;auth=noSasl"; //定義數據庫鏈接 static Connection conn=null; //定義PreparedStatement對象 static PreparedStatement ps=null; //定義查詢的結果集 static ResultSet rs= null;
//數據庫鏈接 public static Connection getConn(){ try { Class.forName(JDBC_DRIVER); conn=DriverManager.getConnection(CONNECTION_URL); } catch (Exception e) { e.printStackTrace(); }
return conn; }
//建立一個表 public static void createTable(){ conn=getConn(); String sql="CREATE TABLE impala_kudu_test" + "(" + "companyId BIGINT," + "workId BIGINT," + "name STRING," + "gender STRING," + "photo STRING," + "PRIMARY KEY(companyId)" + ")" + "PARTITION BY HASH PARTITIONS 16 " + "STORED AS KUDU " + "TBLPROPERTIES (" + "'kudu.master_addresses' = 'node1:7051,node2:7051,node3:7051'," + "'kudu.table_name' = 'impala_kudu_test'" + ");";
try { ps = conn.prepareStatement(sql); ps.execute(); } catch (SQLException e) { e.printStackTrace(); } }
//查詢數據 public static ResultSet queryRows(){ try { //定義執行的sql語句 String sql="select * from impala_kudu_test"; ps = getConn().prepareStatement(sql); rs= ps.executeQuery(); } catch (SQLException e) { e.printStackTrace(); }
return rs; }
//打印結果 public static void printRows(ResultSet rs){ /** private int companyId; private int workId; private String name; private String gender; private String photo; */
try { while (rs.next()){ //獲取表的每一行字段信息 int companyId = rs.getInt("companyId"); int workId = rs.getInt("workId"); String name = rs.getString("name"); String gender = rs.getString("gender"); String photo = rs.getString("photo"); System.out.print("companyId:"+companyId+" "); System.out.print("workId:"+workId+" "); System.out.print("name:"+name+" "); System.out.print("gender:"+gender+" "); System.out.println("photo:"+photo);
} } catch (SQLException e) { e.printStackTrace(); }finally { if(ps!=null){ try { ps.close(); } catch (SQLException e) { e.printStackTrace(); } }
if(conn !=null){ try { conn.close(); } catch (SQLException e) { e.printStackTrace(); } } } }
//插入數據 public static void insertRows(Person person){ conn=getConn(); String sql="insert into table impala_kudu_test(companyId,workId,name,gender,photo) values(?,?,?,?,?)";
try { ps=conn.prepareStatement(sql); //給佔位符?賦值 ps.setInt(1,person.getCompanyId()); ps.setInt(2,person.getWorkId()); ps.setString(3,person.getName()); ps.setString(4,person.getGender()); ps.setString(5,person.getPhoto()); ps.execute();
} catch (SQLException e) { e.printStackTrace(); }finally { if(ps !=null){ try { //關閉 ps.close(); } catch (SQLException e) { e.printStackTrace(); } }
if(conn !=null){ try { //關閉 conn.close(); } catch (SQLException e) { e.printStackTrace(); } } }
}
//更新數據 public static void updateRows(Person person){ //定義執行的sql語句 String sql="update impala_kudu_test set workId="+person.getWorkId()+ ",name='"+person.getName()+"' ,"+"gender='"+person.getGender()+"' ,"+ "photo='"+person.getPhoto()+"' where companyId="+person.getCompanyId();
try { ps= getConn().prepareStatement(sql); ps.execute(); } catch (SQLException e) { e.printStackTrace(); }finally { if(ps !=null){ try { //關閉 ps.close(); } catch (SQLException e) { e.printStackTrace(); } }
if(conn !=null){ try { //關閉 conn.close(); } catch (SQLException e) { e.printStackTrace(); } } } }
//刪除數據 public static void deleteRows(int companyId){
//定義sql語句 String sql="delete from impala_kudu_test where companyId="+companyId; try { ps =getConn().prepareStatement(sql); ps.execute(); } catch (SQLException e) { e.printStackTrace();
}
}
//刪除表 public static void dropTable() { String sql="drop table if exists impala_kudu_test"; try { ps =getConn().prepareStatement(sql); ps.execute(); } catch (SQLException e) { e.printStackTrace(); } }
} |
package cn.itcast.impala.impala;
import java.sql.Connection;
public class ImpalaJdbcClient { public static void main(String[] args) { Connection conn = Contants.getConn();
//建立一個表 Contants.createTable();
//插入數據 Contants.insertRows(new Person(1,100,"lisi","male","lisi-photo"));
//查詢表的數據 ResultSet rs = Contants.queryRows(); Contants.printRows(rs);
//更新數據 Contants.updateRows(new Person(1,200,"zhangsan","male","zhangsan-photo"));
//刪除數據 Contants.deleteRows(1);
//刪除表 Contants.dropTable();
}
} |
Kudu設計是面向結構化存儲的,所以,Kudu的表須要用戶在建表時定義它的Schema信息,這些Schema信息包含:列定義(含類型),Primary Key定義(用戶指定的若干個列的有序組合)。數據的惟一性,依賴於用戶所提供的Primary Key中的Column組合的值的惟一性。 Kudu提供了Alter命令來增刪列,但位於Primary Key中的列是不容許刪除的。 Kudu當前並不支持二級索引。 從用戶角度來看,Kudu是一種存儲結構化數據表的存儲系統。在一個Kudu集羣中能夠定義任意數量的table,每一個table都須要預先定義好schema。每一個table的列數是肯定的,每一列都須要有名字和類型,每一個表中能夠把其中一列或多列定義爲主鍵。這麼看來,Kudu更像關係型數據庫,而不是像HBase、Cassandra和MongoDB這些NoSQL數據庫。不過Kudu目前還不能像關係型數據同樣支持二級索引。
Kudu使用肯定的列類型,而不是相似於NoSQL的"everything is byte"。這能夠帶來兩點好處: 肯定的列類型使Kudu能夠進行類型特有的編碼。 能夠提供 SQL-like 元數據給其餘上層查詢工具,好比BI工具。
Kudu的底層數據文件的存儲,未採用HDFS這樣的較高抽象層次的分佈式文件系統,而是自行開發了一套可基於
Table/Tablet/Replica視圖級別的底層存儲系統。
這套實現基於以下的幾個設計目標:
• 可提供快速的列式查詢
• 可支持快速的隨機更新
• 可提供更爲穩定的查詢性能保障
一張表會分紅若干個tablet,每一個tablet包括MetaData元信息及若干個RowSet,RowSet包含一個MemRowSet及若干個DiskRowSet,DiskRowSet中包含一個BloomFile、Ad_hoc Index、BaseData、DeltaMem及若干個RedoFile和UndoFile(UndoFile通常狀況下只有一個)。
MemRowSet:用於新數據insert及已在MemRowSet中的數據的更新,一個MemRowSet寫滿後會將數據刷到磁盤造成若干個DiskRowSet。每次到達32M生成一個DiskRowSet。
DiskRowSet:用於老數據的變動(mutation),後臺按期對DiskRowSet作compaction,以刪除沒用的數據及合併歷史數據,減小查詢過程當中的IO開銷。
BloomFile:根據一個DiskRowSet中的key生成一個bloom filter,用於快速模糊定位某個key是否在DiskRowSet中存在。
Ad_hocIndex:是主鍵的索引,用於定位key在DiskRowSet中的具體哪一個偏移位置。
BaseData是MemRowSet flush下來的數據,按列存儲,按主鍵有序。
UndoFile是基於BaseData以前時間的歷史數據,經過在BaseData上apply UndoFile中的記錄,能夠得到歷史數據。
RedoFile是基於BaseData以後時間的變動(mutation)記錄,經過在BaseData上apply RedoFile中的記錄,可得到較新的數據。
DeltaMem用於DiskRowSet中數據的變動mutation,先寫到內存中,寫滿後flush到磁盤造成RedoFile。
MemRowSets能夠對比理解成HBase中的MemStore, 而DiskRowSets可理解成HBase中的HFile。MemRowSets中的數據按照行試圖進行存儲,數據結構爲B-Tree。
MemRowSets中的數據被Flush到磁盤以後,造成DiskRowSets。
DisRowSets中的數據,按照32MB大小爲單位,按序劃分爲一個個的DiskRowSet。 DiskRowSet中的數據按照Column進行組織,與Parquet相似。
這是Kudu可支持一些分析性查詢的基礎。每個Column的數據被存儲在一個相鄰的數據區域,而這個數據區域進一步被細分紅一個個的小的Page單元,與HBase File中的Block相似,對每個Column Page可採用一些Encoding算法,以及一些通用的Compression算法。 既然可對Column Page可採用Encoding以及Compression算法,那麼,對單條記錄的更改就會比較困難了。
前面提到了Kudu可支持單條記錄級別的更新/刪除,是如何作到的?
與HBase相似,也是經過增長一條新的記錄來描述此次更新/刪除操做的。DiskRowSet是不可修改了,那麼 KUDU 要如何應對數據的更新呢?在KUDU中,把DiskRowSet分爲了兩部分:base data、delta stores。base data 負責存儲基礎數據,delta stores負責存儲 base data 中的變動數據.
如上圖所示,數據從 MemRowSet 刷到磁盤後就造成了一份 DiskRowSet(只包含 base data),每份 DiskRowSet 在內存中都會有一個對應的DeltaMemStore,負責記錄此 DiskRowSet 後續的數據變動(更新、刪除)。DeltaMemStore 內部維護一個 B-樹索引,映射到每一個 row_offset 對應的數據變動。DeltaMemStore 數據增加到必定程度後轉化成二進制文件存儲到磁盤,造成一個 DeltaFile,隨着 base data 對應數據的不斷變動,DeltaFile 逐漸增加。
當建立Kudu客戶端時,其會從主服務器上獲取tablet位置信息,而後直接與服務於該tablet的服務器進行交談。
爲了優化讀取和寫入路徑,客戶端將保留該信息的本地緩存,以防止他們在每一個請求時須要查詢主機的tablet位置信息。
隨着時間的推移,客戶端的緩存可能會變得過期,而且當寫入被髮送到再也不是tablet領導者的tablet服務器時,則將被拒絕。而後客戶端將經過查詢主服務器發現新領導者的位置來更新其緩存。
如上圖,當 Client 請求寫數據時,先根據主鍵從Master Server中獲取要訪問的目標 Tablets,而後到依次對應的Tablet獲取數據。
由於KUDU表存在主鍵約束,因此須要進行主鍵是否已經存在的判斷,這裏就涉及到以前說的索引結構對讀寫的優化了。一個Tablet中存在不少個RowSets,爲了提高性能,咱們要儘量地減小要掃描的RowSets數量。
首先,咱們先經過每一個 RowSet 中記錄的主鍵的(最大最小)範圍,過濾掉一批不存在目標主鍵的RowSets,而後在根據RowSet中的布隆過濾器,過濾掉肯定不存在目標主鍵的 RowSets,最後再經過RowSets中的 B-樹索引,精肯定位目標主鍵是否存在。
若是主鍵已經存在,則報錯(主鍵重複),不然就進行寫數據(寫 MemRowSet)。
如上圖,數據讀取過程大體以下:先根據要掃描數據的主鍵範圍,定位到目標的Tablets,而後讀取Tablets 中的RowSets。
在讀取每一個RowSet時,先根據主鍵過濾要scan範圍,而後加載範圍內的base data,再找到對應的delta stores,應用全部變動,最後union上MemRowSet中的內容,返回數據給Client。
數據更新的核心是定位到待更新數據的位置,這塊與寫入的時候相似,就不展開了,等定位到具體位置後,而後將變動寫到對應的delta store 中。