有人會問,爲啥要用這個叫啥Kudu的,Kudu是啥?html
就像官網所說,Kudu是一個針對Apache hadoop 平臺而開發的列式存儲管理器,在本菜鳥看來,它是一種介於hdfs與hbase的一種存儲。它的優點在於:sql
一、OLAP工做的快速處理,也就是針對於查詢,很快,很牛逼。數據庫
二、針對同時運行順序和隨機工做負載的狀況性能很好。編程
三、高可用,Table server和master使用Raft Consensus Algorithm節點來保證高可用,什麼是Raft Consunsus Algorith?參考:https://www.cnblogs.com/mindwind/p/5231986.html),只要有一半以上的副本可用,該tablet即可用於讀寫。api
四、結構化數據模型(能夠理解爲帶schema)。服務器
該圖顯示了一個具備三個 master 和多個 tablet server 的 Kudu 集羣,每一個服務器都支持多個 tablet。它說明了如何使用 Raft 共識來容許 master 和 tablet server 的 leader 和 f ollow。此外,tablet server 能夠成爲某些 tablet 的 leader,也能夠是其餘 tablet 的 follower。leader 以金色顯示,而 follower 則顯示爲藍色。oop
下面是一些基本概念:性能
Table(表)優化
一張 talbe 是數據存儲在 Kudu 的位置。表具備 schema 和全局有序的 primary key(主鍵)。table 被分紅稱爲 tablets 的 segments。ui
Tablet
一個 tablet 是一張 table 連續的 segment,與其它數據存儲引擎或關係型數據庫中的 partition(分區)類似。給定的 tablet 冗餘到多個 tablet 服務器上,而且在任何給定的時間點,其中一個副本被認爲是 leader tablet。任何副本均可以對讀取進行服務,而且寫入時須要在爲 tablet 服務的一組 tablet server之間達成一致性。
Tablet Server
一個 tablet server 存儲 tablet 和爲 tablet 向 client 提供服務。對於給定的 tablet,一個 tablet server 充當 leader,其餘 tablet server 充當該 tablet 的 follower 副本。只有 leader服務寫請求,然而 leader 或 followers 爲每一個服務提供讀請求。leader 使用 Raft Consunsus Algorithm來進行選舉 。一個 tablet server 能夠服務多個 tablets ,而且一個 tablet 能夠被多個 tablet servers 服務着。
具體我尚未那麼深刻,寫了些api調用玩了一把,下面慢慢講述,Kudu的API比較噁心的哈。。
kudu的sql語法與傳統的sql語法比較類似,但也不盡相同,直接解析時,具體sql語法請參考官網,下面以相似hive metastore表結構的方式封裝了下。如下列sql爲例:
create table combined_t6 (x int64, s string, s2 string, primary key (x, s))
partition by hash (x) partitions 10, range (x)
(
partition 0 <= values <= 49, partition 50 <= values <= 100
) REPLICAS 1
public Boolean create(Table table,String operator) { LOGGER.info("kudu Table properties:" + table.getKvInfos().toString()); List<ColumnSchema> columns = new ArrayList(table.getTableColumnList().size());
KuduTableGenerateUtil.generateKuduColumn(table.getTableColumnList(),columns); Schema schema = new Schema(columns); KuduPartitionSchema kuduPartitionSchema = KuduTableGenerateUtil.parserPartition(table); CreateTableOptions tableOptions = KuduTableGenerateUtil.generateKuduTableOptions(table,schema,kuduPartitionSchema); try { getKuduClient(table).createTable(table.getTableName(), schema,tableOptions); } catch (KuduException e) { throw new MetadataInvalidObjectException(e, " create kudu storage table error!!"); } return true; }
kudu的column屬性中,包含有primarfyKey、encoding、compression algorithm、null table 、default value 、block size等屬性,因此從上述代碼中須要先將kuduColumn進行封裝,構造ColumnSchema對象:
new ColumnSchema.ColumnSchemaBuilder(tableColumn.getColumnName(), getKuduColumnType(tableColumn.getDataType())) .key(checkBoolKey(columnCondition.get(MetadataConfigKey.COLUMN_KUDU_PRIMARY_KEY))) .nullable(checkBoolKey(columnCondition.get(MetadataConfigKey.COLUMN_KUDU_SCHEMA_IS_NULLTABLE))) .defaultValue(defaultValue) .desiredBlockSize(getDesiredBlockSize(columnCondition.get(MetadataConfigKey.COLUMN_KUDU_SCHEMA_DESIRED_BLOCKSIZE))) .encoding(getColumnEncoding(columnCondition.get(MetadataConfigKey.COLUMN_KUDU_SCHEMA_ENCODING))) .compressionAlgorithm(getCompressionType(columnCondition.get(MetadataConfigKey.COLUMN_KUDU_SCHEMA_COMPRESSION_ALGORITHM))) .build();
對於column的數據類型,有不少種,以下:
private static Type getKuduColumnType(String dataType) { switch (dataType.toUpperCase()) { case "INT8": return Type.INT8; case "INT16": return Type.INT16; case "INT32": return Type.INT32; case "INT64": return Type.INT64; case "BINARY": return Type.BINARY; case "STRING": return Type.STRING; case "BOOL": return Type.BOOL; case "FLOAT": return Type.FLOAT; case "DOUBLE": return Type.DOUBLE; case "UNIXTIME_MICROS": return Type.UNIXTIME_MICROS; default: return Type.STRING; } }
壓縮方式包括:
public static CompressionAlgorithm getCompressionType(String compressionType) { if (StringUtils.isNotBlank(compressionType)) { switch (compressionType.toUpperCase()) { case "UNKNOWN": return CompressionAlgorithm.UNKNOWN; case "DEFAULT_COMPRESSION": return CompressionAlgorithm.DEFAULT_COMPRESSION; case "NO_COMPRESSION": return CompressionAlgorithm.NO_COMPRESSION; case "SNAPPY": return CompressionAlgorithm.SNAPPY; case "LZ4": return CompressionAlgorithm.LZ4; case "ZLIB": return CompressionAlgorithm.UNKNOWN.ZLIB; default: return null; } } return null; }
隨之咱們要構造,Kudu Partition,Kudu Partition包含兩種類型,一種是hashPartition,一種是rangePartition,其實從字面意思應該也可以想到,一種是用於對某個字段進行hash散列,一種是進行分區區間的設置,從而在查詢時達到優化的效果,這裏經過將sql解析後的轉換的KuduPartitionSchema對象分別進行range與hash partition的組裝,也就是將sql中 Partition表達式 partition 0 <= values <= 49, partition 50 <= values <= 100 封裝:
public static void generateHashPartition(CreateTableOptions tableOptions, List<HashPartitionSchema> hashPartitionSchemas) { if (null != hashPartitionSchemas && hashPartitionSchemas.size() != 0) {
hashPartitionSchemas.forEach(hashPartitionSchema ->{
tableOptions.addHashPartitions(hashPartitionSchema.getColumns(), hashPartitionSchema.getBucket());
});
}
}
public static void generateRangePartition(Schema schema, CreateTableOptions tableOptions, RangePartitionSchema rangePartitionSchema) { tableOptions.setRangePartitionColumns(rangePartitionSchema.getColumns()); List<RangeSplit> ranges = rangePartitionSchema.getRanges(); ranges.forEach(range -> { tableOptions.addRangePartition( getPartialRow( range.getLower(), schema, rangePartitionSchema.getColumns()), getPartialRow( range.getUpper(), schema, rangePartitionSchema.getColumns()), getRangePartitionBound( range.getLowerBoundType()), getRangePartitionBound( range.getUpperBoundType()) ); }); }
public static RangePartitionBound getRangePartitionBound(String boundType) { if (StringUtils.isNotBlank(boundType)) { switch (boundType) { case "EXCLUSIVE_BOUND": return RangePartitionBound.EXCLUSIVE_BOUND; case "INCLUSIVE_BOUND": return RangePartitionBound.INCLUSIVE_BOUND; default: return null; } } return null; }
最後構造,CreateTableOptions對象:
public static CreateTableOptions generateKuduTableOptions(Table table, Schema schema, KuduPartitionSchema kuduPartitionSchema) { CreateTableOptions tableOptions = new CreateTableOptions(); String numReplicas = table.getKvInfos().get(MetadataConfigKey.TABLE_KUDU_REPLICAS); if (StringUtils.isNotBlank(numReplicas)) { tableOptions.setNumReplicas(Integer.valueOf(numReplicas)); } if (kuduPartitionSchema.getHashPartitionSchemaList() != null && kuduPartitionSchema.getHashPartitionSchemaList().size() != 0) { generateHashPartition(tableOptions, kuduPartitionSchema.getHashPartitionSchemaList()); } if (kuduPartitionSchema.getRangePartitionSchema() != null) { generateRangePartition(schema, tableOptions, kuduPartitionSchema.getRangePartitionSchema()); } return tableOptions; }
沒有hbase編程便捷。。不過對於kudu的鏈接而言,只須要配置kudu master的地址,即可建立鏈接。
public KuduClient getKuduClient(Table table){ if(null == kuduClient){ try{ String kuduMaster = table.getStorageClusterKvs().get(MetadataConfigKey.CLUSTER_KUDU_MASTER); kuduClient = new KuduClient.KuduClientBuilder(kuduMaster).build(); }catch(Exception e){ throw new MetadataRuntimeException(e, " create kuduClient error!!"); } } return kuduClient; }
活兒幹不完啊~改天再深刻完 哈哈~