Hadoop has many components, and complex functionality is usually built from a hybrid architecture of several of them.
Kudu is actually somewhat similar to HBase:
1: Supports primary keys (similar to a relational database)
2: Supports atomic single-row operations, so data can be inserted, deleted, updated, and queried
3: Supports a variety of data types
4: Supports ALTER TABLE; non-primary-key columns can be dropped
5: Supports INSERT, UPDATE, DELETE, UPSERT
6: Supports hash and range partitioning (see the sketch right after this list)
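As a concrete illustration of the primary-key and hash-partitioning support, here is a minimal sketch that creates a table through the Kudu Java client; the master address "master2" and the table name "demo_table" are placeholders:

import scala.collection.JavaConverters._
import org.apache.kudu.{Schema, Type}
import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
import org.apache.kudu.client.{CreateTableOptions, KuduClient}

object CreateKuduTable {
  def main(args: Array[String]): Unit = {
    // "master2" is a placeholder Kudu master address.
    val client = new KuduClient.KuduClientBuilder("master2").build()
    // Primary-key columns come first and are marked with key(true).
    val columns = List(
      new ColumnSchemaBuilder("id", Type.STRING).key(true).build(),
      new ColumnSchemaBuilder("name", Type.STRING).build()
    ).asJava
    val schema = new Schema(columns)
    // Spread rows over 16 tablets by hashing the primary-key column.
    val options = new CreateTableOptions().addHashPartitions(List("id").asJava, 16)
    client.createTable("demo_table", schema, options)
    client.shutdown()
  }
}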
Start the Impala shell: impala-shell -i node1ip
For the full CRUD syntax, consult the official documentation; I will not list it all here:
http://kudu.apache.org/docs/kudu_impala_integration.html
Create a table:
CREATE TABLE kudu_table (
  id STRING,
  name STRING,
  age INT,
  PRIMARY KEY (id, name)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;
Insert data:
INSERT INTO kudu_table
SELECT * FROM impala_table;
Note:
All of the SQL statements above are executed inside Impala. Like HBase, Kudu is queried NoSQL-style: Kudu itself only provides an API, and Impala integrates with Kudu to provide the SQL layer.
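Because these statements go through Impala, they can also be issued from application code over JDBC, which is why the pom.xml below pulls in hive-jdbc (Impala speaks the HiveServer2 protocol). A minimal sketch, assuming an unsecured cluster with an Impala daemon on a hypothetical host node1 at the default port 21050:

import java.sql.DriverManager

object ImpalaJdbcDemo {
  def main(args: Array[String]): Unit = {
    // The Hive JDBC driver works against Impala's HiveServer2-compatible endpoint.
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    // node1:21050 is a placeholder; auth=noSasl assumes no Kerberos/LDAP.
    val conn = DriverManager.getConnection("jdbc:hive2://node1:21050/;auth=noSasl")
    val stmt = conn.createStatement()
    val rs = stmt.executeQuery("SELECT id, name FROM kudu_table LIMIT 10")
    while (rs.next()) {
      println(rs.getString("id") + "," + rs.getString("name"))
    }
    rs.close(); stmt.close(); conn.close()
  }
}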
Here is my Git repo:
https://github.com/LinMingQiang/spark-util/tree/spark-kudu
pom.xml:
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-metastore</artifactId>
  <version>1.1.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-jdbc</artifactId>
  <version>1.1.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-service</artifactId>
  <version>1.1.0</version>
  <exclusions>
    <exclusion>
      <artifactId>servlet-api</artifactId>
      <groupId>javax.servlet</groupId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.kudu</groupId>
  <artifactId>kudu-client</artifactId>
  <version>1.3.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.10</artifactId>
  <version>1.6.0</version>
</dependency>
<dependency>
  <groupId>org.kududb</groupId>
  <artifactId>kudu-spark_2.10</artifactId>
  <version>1.3.1</version>
</dependency>
<dependency>
  <groupId>org.apache.kudu</groupId>
  <artifactId>kudu-mapreduce</artifactId>
  <version>1.3.1</version>
  <exclusions>
    <exclusion>
      <artifactId>jsp-api</artifactId>
      <groupId>javax.servlet.jsp</groupId>
    </exclusion>
    <exclusion>
      <artifactId>servlet-api</artifactId>
      <groupId>javax.servlet</groupId>
    </exclusion>
  </exclusions>
</dependency>
Scan a Kudu table with the native client, pushing a predicate down to Kudu:

import org.apache.kudu.client.KuduClient.KuduClientBuilder
import org.apache.kudu.client.{KuduPredicate, KuduScanToken}

// Connect to the Kudu master; tables created through Impala are named
// "impala::<database>.<table>".
val client = new KuduClientBuilder("master2").build()
val table = client.openTable("impala::default.kudu_pc_log")
// Print every table on the cluster.
client.getTablesList.getTablesList.foreach { println }
val schema = table.getSchema()
// Predicate pushed down to Kudu: id = "1".
val kp = KuduPredicate.newComparisonPredicate(schema.getColumn("id"),
  KuduPredicate.ComparisonOp.EQUAL, "1")
// Build scan tokens (one per tablet) and rehydrate the first into a scanner.
val tokens = client.newScanTokenBuilder(table)
  .addPredicate(kp)
  .limit(100)
  .build()
val token = tokens.get(0)
val scan = KuduScanToken.deserializeIntoScanner(token.serialize(), client)
while (scan.hasMoreRows()) {
  val results = scan.nextRows()
  while (results.hasNext()) {
    val rowresult = results.next()
    println(rowresult.getString("id"))
  }
}
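The native client can also write without going through Impala. A minimal sketch of a single-row insert, reusing the client and table opened above (it assumes "id" is the only column that must be set; a real table may have more required columns):

// Sessions default to AUTO_FLUSH_SYNC, so apply() sends the operation immediately.
val session = client.newSession()
val insert = table.newInsert()
val row = insert.getRow
row.addString("id", "100") // primary-key column
session.apply(insert)
session.close()
client.shutdown()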
Read and write Kudu from Spark with KuduContext:

import org.apache.kudu.Type
import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
import org.apache.kudu.client.KuduPredicate
import org.apache.kudu.spark.kudu.KuduContext // package name of the Apache-released connector
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("Test"))
val sparksql = new SQLContext(sc)
import sparksql.implicits._
val kuduMaster = "master2" // placeholder Kudu master address
val a = new KuduContext(kuduMaster, sc)

def getKuduRDD() {
  val tableName = "impala::default.kudu_pc_log"
  val columnProjection = Seq("id", "name")
  // Predicate pushed down to Kudu: id = "q".
  val kp = KuduPredicate.newComparisonPredicate(
    new ColumnSchemaBuilder("id", Type.STRING).build(),
    KuduPredicate.ComparisonOp.EQUAL, "q")
  val rdd = a.kuduRDD(sc, tableName, columnProjection, Array(kp))
  rdd.foreach { x => println(x.mkString(",")) }
}

def writetoKudu() {
  val tableName = "impala::default.student"
  val rdd = sc.parallelize(Array("k", "b", "a")).map { n => STU(n.hashCode, n) }
  val data = rdd.toDF()
  a.insertRows(data, tableName)
}

case class STU(id: Int, name: String)
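kudu-spark also registers Kudu as a Spark SQL data source, so a table can be loaded as a DataFrame instead of an RDD. A minimal sketch, assuming the org.apache.kudu.spark.kudu format name used by the Apache-released connector and reusing sparksql and kuduMaster from above:

// Load a Kudu table as a DataFrame via the data source API.
val df = sparksql.read
  .format("org.apache.kudu.spark.kudu")
  .options(Map("kudu.master" -> kuduMaster, "kudu.table" -> "impala::default.student"))
  .load()
df.show()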