My understanding of this topic is still not thorough, so for now the write-up is not thorough either.
Although Spark does not support HBase out of the box today and you have to write the integration yourself, I believe Spark will add support for it as it evolves. Treat this document as a way of improving my own understanding.
1. Overview
For a DataFrame, the most important pieces of saving and reading data are the save and load methods.
From the source code we can see that the two have a lot in common, so here we take load as the example and analyze its source.
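As a reminder of the public API that this analysis starts from, here is a minimal usage sketch of the Spark 1.x read/write entry points; the format name and paths are placeholders, not part of the original post.

import org.apache.spark.sql.SaveMode

// read path: format() records the data source name, load() resolves it into a DataFrame
val df = sqlContext.read.format("json").load("/path/to/input")

// write path: save() resolves the data source the same way and hands it the DataFrame
df.write.format("json").mode(SaveMode.Overwrite).save("/path/to/output")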
I: load
1. load
Looking at the source, we can see that the body of load is only two lines, so that is where the focus is.
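For reference, DataFrameReader.load in Spark 1.5 looks roughly like the following. This is a paraphrase of the 1.x source, so treat it as a sketch rather than verbatim code; source, userSpecifiedSchema and extraOptions are fields of DataFrameReader.

def load(): DataFrame = {
  // line 1: resolve the data source named by format() and build its relation
  val resolved = ResolvedDataSource(
    sqlContext,
    userSpecifiedSchema = userSpecifiedSchema,
    partitionColumns = Array.empty[String],
    provider = source,
    options = extraOptions.toMap)
  // line 2: wrap the relation into a DataFrame
  DataFrame(sqlContext, LogicalRelation(resolved.relation))
}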
2. Entering the ResolvedDataSource method
What we actually enter is its apply method.
Opening ResolvedDataSource.apply, these are all the parameters it needs. The one worth explaining is provider: it is filled in from source, and source in turn comes from the argument passed to format, i.e. the format name.
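To see why source is just the format name, here is what DataFrameReader.format does in Spark 1.x, paraphrased: it simply stores the string that load later passes on as provider.

def format(source: String): DataFrameReader = {
  // remember the data source name; load() will pass it to ResolvedDataSource as `provider`
  this.source = source
  this
}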
3. The main structure of the apply method (a paraphrased sketch of the whole method follows under point 5)
4. First, how clazz is produced
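In Spark 1.x this is done by ResolvedDataSource.lookupDataSource. The sketch below is a simplified paraphrase (newer 1.x releases also consult a registry of built-in sources): it tries to load the provider string as a class name and falls back to provider + ".DefaultSource", which is why a custom data source is usually a class named DefaultSource inside the package passed to format.

def lookupDataSource(provider: String): Class[_] = {
  val loader = Utils.getContextOrSparkClassLoader
  try {
    // first try the provider string itself as a fully qualified class name
    loader.loadClass(provider)
  } catch {
    case _: ClassNotFoundException =>
      // otherwise treat the provider as a package and look for its DefaultSource class
      loader.loadClass(provider + ".DefaultSource")
  }
}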
5. The type of dataSource
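Putting the pieces together, the read-path apply looks roughly like this in Spark 1.x. The branches for user-specified schemas, partition columns and HadoopFs-based sources are omitted, so this is a simplified paraphrase rather than the exact source (the real code also wraps options in a case-insensitive map).

def apply(
    sqlContext: SQLContext,
    userSpecifiedSchema: Option[StructType],
    provider: String,
    options: Map[String, String]): ResolvedDataSource = {
  // step 1: resolve the provider string to a class (see point 4)
  val clazz: Class[_] = lookupDataSource(provider)
  // step 2: instantiate it and ask it for a BaseRelation
  val relation = clazz.newInstance() match {
    case dataSource: RelationProvider =>
      dataSource.createRelation(sqlContext, options)
    case dataSource: SchemaRelationProvider =>
      sys.error(s"A schema needs to be specified when using ${clazz.getCanonicalName}.")
  }
  new ResolvedDataSource(clazz, relation)
}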
6. What this tells us
It tells us that the dataSource needs to extend RelationProvider.
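For reference, the RelationProvider trait in org.apache.spark.sql.sources (Spark 1.x) is essentially:

trait RelationProvider {
  // returns the BaseRelation used for reading, given the options passed via option()/options()
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}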
However, this is not the whole story; we also need to look at the dataSource in the save method. After thinking about it, the save path does need to be examined as well.
7. apply in the save path
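The write path has its own apply overload, which additionally receives the SaveMode and the DataFrame to be written, and it requires the data source to implement CreatableRelationProvider. A simplified paraphrase of the Spark 1.x version:

def apply(
    sqlContext: SQLContext,
    provider: String,
    mode: SaveMode,
    options: Map[String, String],
    data: DataFrame): ResolvedDataSource = {
  val clazz: Class[_] = lookupDataSource(provider)
  val relation = clazz.newInstance() match {
    case dataSource: CreatableRelationProvider =>
      // the data source itself decides how to persist `data` according to `mode`
      dataSource.createRelation(sqlContext, mode, options, data)
    case _ =>
      sys.error(s"${clazz.getCanonicalName} does not allow create table as select.")
  }
  new ResolvedDataSource(clazz, relation)
}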
8. Summary
A custom data source therefore needs:
DefaultSource extends RelationProvider with CreatableRelationProvider
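The write side of that contract, CreatableRelationProvider (also in org.apache.spark.sql.sources, Spark 1.x), is essentially:

trait CreatableRelationProvider {
  // called by the save path; must persist `data` according to `mode` and return a relation
  def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation
}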
II: Implementing read and write for HBase
The code in this part was not written by me; it is copied from someone else.
1. DefaultSource
package com.ibeifeng.spark.sql.hbase

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{HBaseAdmin, Put}
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.{MRJobConfig, OutputFormat}
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._

import scala.collection.mutable.ArrayBuffer

/**
 * Created by ibf on 02/16.
 */
class DefaultSource
  extends RelationProvider with CreatableRelationProvider {

  /**
   * Creates the Relation used when reading data
   *
   * @param sqlContext
   * @param parameters
   * @return
   */
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]
  ): BaseRelation = {
    HBaseRelation(parameters)(sqlContext)
  }

  /**
   * Creates the Relation used when saving data
   *
   * @param sqlContext
   * @param mode
   * @param parameters
   * @param data
   * @return
   */
  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    HBaseConfiguration.merge(sqlContext.sparkContext.hadoopConfiguration, HBaseConfiguration.create(sqlContext.sparkContext.hadoopConfiguration))
    val hbaseConf = sqlContext.sparkContext.hadoopConfiguration
    val hbaseAdmin = new HBaseAdmin(hbaseConf)
    try {
      val hbaseTableName = parameters.getOrElse("hbase_table_name", sys.error("not valid schema"))
      val family = parameters.getOrElse("hbase_table_family", sys.error("not valid schema"))
      // Get the schema; the DataFrame must contain a row_key column
      val familyAndSchema = if (data.schema.map(_.name.toLowerCase).contains("row_key")) {
        sqlContext.sparkContext.broadcast((Bytes.toBytes(family), data.schema))
      } else {
        sys.error("no row_key column!!!")
      }
      // Validate the schema: only these data types are supported
      familyAndSchema.value._2.foreach {
        case StructField(_, dataType, _, _) => {
          dataType match {
            case dt: StringType =>
            case dt: DoubleType =>
            case dt: FloatType =>
            case dt: IntegerType =>
            case dt: LongType =>
            case dt: ShortType =>
            case _ => sys.error(s"Can't support those data type of ${dataType}")
          }
        }
      }

      val doSave = if (hbaseAdmin.tableExists(hbaseTableName)) {
        mode match {
          case SaveMode.Append => true
          case SaveMode.ErrorIfExists => sys.error(s"hbase table '$hbaseTableName' already exists.")
          case SaveMode.Ignore => false
          case SaveMode.Overwrite => {
            try
              // Disable the table first
              if (hbaseAdmin.isTableEnabled(hbaseTableName)) {
                hbaseAdmin.disableTable(hbaseTableName)
              }
            catch {
              case _: Exception => // nothing
            }
            // Drop the existing table
            hbaseAdmin.deleteTable(hbaseTableName)
            true
          }
        }
      } else {
        true
      }
      if (doSave) {
        // Save the data
        hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName)
        hbaseConf.setClass(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, classOf[TableOutputFormat[Any]], classOf[OutputFormat[Any, Any]])

        // Create the table if it does not exist
        if (!hbaseAdmin.tableExists(hbaseTableName)) {
          val desc = new HTableDescriptor(TableName.valueOf(hbaseTableName))
          desc.addFamily(new HColumnDescriptor(family))
          hbaseAdmin.createTable(desc)
        }

        data.rdd.map(row => {
          val family = familyAndSchema.value._1
          val schema = familyAndSchema.value._2

          val buffers = schema.map {
            case StructField(name, dataType, nullable, _) => {
              val isRowKey = "row_key".equalsIgnoreCase(name)
              val value = dataType match {
                case dt: StringType => Bytes.toBytes(row.getAs[String](name))
                case dt: DoubleType => Bytes.toBytes(row.getAs[Double](name))
                case dt: FloatType => Bytes.toBytes(row.getAs[Float](name))
                case dt: IntegerType => Bytes.toBytes(row.getAs[Integer](name))
                case dt: LongType => Bytes.toBytes(row.getAs[Long](name))
                case dt: ShortType => Bytes.toBytes(row.getAs[Short](name))
                case _ => sys.error(s"can't support those data type of ${dataType}")
              }
              // Return (column name, serialized value); the row key always gets the fixed name "row_key"
              (if (isRowKey) "row_key" else name, value)
            }
          }

          // Build the Put object
          val rowKey = buffers.toMap.getOrElse("row_key", sys.error(""))
          val put = buffers.filter(!_._1.equals("row_key")).foldLeft(new Put(rowKey))((put, b) => {
            put.add(family, Bytes.toBytes(b._1), b._2)
            put
          })
          // Return the (key, Put) pair expected by TableOutputFormat
          (null, put)
        }).saveAsNewAPIHadoopDataset(hbaseConf)
      }
    } finally {
      if (hbaseAdmin != null) hbaseAdmin.close()
    }

    // Return the relation as the result
    HBaseRelation(parameters)(sqlContext)
  }
}
2. HBaseRelation
package com.ibeifeng.spark.sql.hbase

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, InsertableRelation, TableScan}
import org.apache.spark.sql.types._

import scala.collection.mutable.ArrayBuffer

/**
 * Created by ibf on 02/16.
 */
case class HBaseRelation(@transient val parameters: Map[String, String])
                        (@transient val sqlContext: SQLContext)
  extends BaseRelation with Serializable with TableScan {

  private case class SchemaType(dataType: DataType, nullable: Boolean)

  private lazy val (hbaseTableName, hbaseTableFields, fieldsRelations, queryColumns, startRowKey, endRowKey) =
    try {
      val hbaseTableName = parameters.getOrElse("hbase_table_name", sys.error("not valid schema"))
      val hbaseTableSchema = parameters.getOrElse("hbase_table_schema", sys.error("not valid schema"))
      val registerTableSchema = parameters.getOrElse("sparksql_table_schema", sys.error("not valid schema"))
      val rowRange = parameters.getOrElse("row_range", "->")
      // get the start row and end row
      val range = rowRange.split("->", -1)
      val startRowKey = range(0).trim
      val endRowKey = range(1).trim

      val tempHBaseFields = extractHBaseSchema(hbaseTableSchema) // do not use this, a temp field
      val registerTableFields = extractRegisterSchema(registerTableSchema)
      val tempFieldRelation = tableSchemaFieldMapping(tempHBaseFields, registerTableFields)

      val hbaseTableFields = feedTypes(tempFieldRelation)
      val fieldsRelations = tableSchemaFieldMapping(hbaseTableFields, registerTableFields)
      val queryColumns = getQueryTargetCloumns(hbaseTableFields)

      (hbaseTableName, hbaseTableFields, fieldsRelations, queryColumns, startRowKey, endRowKey)
    } catch {
      case e: RuntimeException => throw e
      case e: Exception => sys.error(s"Exception while initializing HBaseRelation: ${e.getMessage}")
    }

  def feedTypes(mapping: Map[HBaseSchemaField, RegisteredSchemaField]): Array[HBaseSchemaField] = {
    val hbaseFields = mapping.map {
      case (k, v) =>
        val field = k.copy(fieldType = v.fieldType)
        field
    }
    hbaseFields.toArray
  }

  def isRowKey(field: HBaseSchemaField): Boolean = {
    val cfColArray = field.fieldName.split(":", -1)
    val cfName = cfColArray(0)
    val colName = cfColArray(1)
    if (cfName == "" && colName == "key") true else false
  }

  def getQueryTargetCloumns(hbaseTableFields: Array[HBaseSchemaField]): String = {
    var str = ArrayBuffer[String]()
    hbaseTableFields.foreach { field =>
      if (!isRowKey(field)) {
        str += field.fieldName
      }
    }
    str.mkString(" ")
  }

  def tableSchemaFieldMapping(externalHBaseTable: Array[HBaseSchemaField], registerTable: Array[RegisteredSchemaField]): Map[HBaseSchemaField, RegisteredSchemaField] = {
    if (externalHBaseTable.length != registerTable.length) sys.error("columns size not match in definition!")
    val rs = externalHBaseTable.zip(registerTable)
    rs.toMap
  }

  def extractRegisterSchema(registerTableSchema: String): Array[RegisteredSchemaField] = {
    val fieldsStr = registerTableSchema.trim.drop(1).dropRight(1)
    val fieldsArray = fieldsStr.split(",").map(_.trim)
    fieldsArray.map { fildString =>
      val splitedField = fildString.split("\\s+", -1)
      RegisteredSchemaField(splitedField(0), splitedField(1))
    }
  }

  def extractHBaseSchema(externalTableSchema: String): Array[HBaseSchemaField] = {
    val fieldsStr = externalTableSchema.trim.drop(1).dropRight(1)
    val fieldsArray = fieldsStr.split(",").map(_.trim)
    fieldsArray.map(fildString => HBaseSchemaField(fildString, ""))
  }

  override lazy val schema: StructType = {
    val fields = hbaseTableFields.map { field =>
      val name = fieldsRelations.getOrElse(field, sys.error("table schema is not match the definition.")).fieldName
      val relatedType = field.fieldType.toLowerCase match {
        case "string" =>
          SchemaType(StringType, nullable = false)
        case "int" =>
          SchemaType(IntegerType, nullable = false)
        case "long" =>
          SchemaType(LongType, nullable = false)
        case e =>
          sys.error(s"Can't support those field type of ${e}")
      }
      StructField(name, relatedType.dataType, relatedType.nullable)
    }
    StructType(fields)
  }

  override lazy val buildScan: RDD[Row] = {
    val hbaseConf = HBaseConfiguration.create(sqlContext.sparkContext.hadoopConfiguration)
    hbaseConf.set(TableInputFormat.INPUT_TABLE, hbaseTableName)
    hbaseConf.set(TableInputFormat.SCAN_COLUMNS, queryColumns)
    hbaseConf.set(TableInputFormat.SCAN_ROW_START, startRowKey)
    hbaseConf.set(TableInputFormat.SCAN_ROW_STOP, endRowKey)
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop-senior01:2181")

    val hbaseRdd = sqlContext.sparkContext.newAPIHadoopRDD(
      hbaseConf,
      classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result]
    )

    val rs = hbaseRdd.map(tuple => tuple._2).map(result => {
      var values = new ArrayBuffer[Any]()
      hbaseTableFields.foreach { field =>
        values += Resolver.resolve(field, result)
      }
      Row.fromSeq(values.toSeq)
    })

    rs
  }
}

object Resolver extends Serializable {

  def resolve(hbaseField: HBaseSchemaField, result: Result): Any = {
    val cfColArray = hbaseField.fieldName.split(":", -1)
    val cfName = cfColArray(0)
    val colName = cfColArray(1)
    var fieldRs: Any = null
    // resolve the row key, otherwise resolve a column
    if (cfName == "" && colName == "key") {
      fieldRs = resolveRowKey(result, hbaseField.fieldType)
    } else {
      fieldRs = resolveColumn(result, cfName, colName, hbaseField.fieldType)
    }
    fieldRs
  }

  private def resolveBytes(bytes: Array[Byte], isFloatType: Boolean = false): Any = {
    val length = bytes.length
    try {
      length match {
        case 2 => Bytes.toShort(bytes)
        case 4 => {
          if (isFloatType) Bytes.toFloat(bytes)
          else Bytes.toInt(bytes).toString
        }
        case 8 => {
          if (isFloatType) Bytes.toDouble(bytes)
          else Bytes.toLong(bytes)
        }
        case _ => Bytes.toString(bytes)
      }
    } catch {
      case _: Exception => Bytes.toString(bytes)
    }
  }

  private def resolveRowKey(result: Result, resultType: String): Any = {
    val rowkey = resultType match {
      case "string" =>
        resolveBytes(result.getRow)
      case "int" =>
        resolveBytes(result.getRow)
      case "long" =>
        resolveBytes(result.getRow)
      case "float" =>
        resolveBytes(result.getRow, true)
      case "double" =>
        resolveBytes(result.getRow, true)
    }
    println(s"rowkey->${rowkey}")
    rowkey
  }

  private def resolveColumn(result: Result, columnFamily: String, columnName: String, resultType: String): Any = {
    val column = resultType match {
      case "string" =>
        resolveBytes(result.getValue(columnFamily.getBytes, columnName.getBytes))
      case "int" =>
        resolveBytes(result.getValue(columnFamily.getBytes, columnName.getBytes))
      case "long" =>
        resolveBytes(result.getValue(columnFamily.getBytes, columnName.getBytes))
      case "float" =>
        resolveBytes(result.getValue(columnFamily.getBytes, columnName.getBytes), true)
      case "double" =>
        resolveBytes(result.getValue(columnFamily.getBytes, columnName.getBytes), true)
    }
    column
  }
}
3. The package object
Mainly constants, plus a small implicit helper.
package com.ibeifeng.spark.sql

import org.apache.spark.sql.SQLContext

import scala.collection.immutable.HashMap

/**
 * Created by ibf on 02/16.
 */
package object hbase {

  abstract class SchemaField extends Serializable

  case class RegisteredSchemaField(fieldName: String, fieldType: String) extends SchemaField with Serializable

  case class HBaseSchemaField(fieldName: String, fieldType: String) extends SchemaField with Serializable

  case class Parameter(name: String)

  protected val SPARK_SQL_TABLE_SCHEMA = Parameter("sparksql_table_schema")
  protected val HBASE_TABLE_NAME = Parameter("hbase_table_name")
  protected val HBASE_TABLE_SCHEMA = Parameter("hbase_table_schema")
  protected val ROW_RANGE = Parameter("row_range")

  implicit class HBaseContext(sqlContext: SQLContext) {
    def hbaseTable(sparksqlTableSchema: String, hbaseTableName: String, hbaseTableSchema: String, rowRange: String = "->") = {
      var params = new HashMap[String, String]
      params += (SPARK_SQL_TABLE_SCHEMA.name -> sparksqlTableSchema)
      params += (HBASE_TABLE_NAME.name -> hbaseTableName)
      params += (HBASE_TABLE_SCHEMA.name -> hbaseTableSchema)
      // get the start row and end row
      params += (ROW_RANGE.name -> rowRange)
      sqlContext.baseRelationToDataFrame(HBaseRelation(params)(sqlContext))
    }
  }

}
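Thanks to the implicit HBaseContext class above, the relation can also be obtained without going through read.format by importing the package object. A small usage sketch, assuming an existing SQLContext and the same table and schema strings used in the demo below:

import com.ibeifeng.spark.sql.hbase._

val df = sqlContext.hbaseTable(
  "(rowkey string, ca string, cb string, cc string)", // Spark SQL schema
  "test",                                             // HBase table name
  "(:key,info:a,info:b,info:c)")                      // HBase column mapping
df.show()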
4. Running the test program
package com.ibeifeng.senior.spark_sql_hbase

import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Spark SQL demo: reading from and writing to HBase
 * Created by ibf on 02/16.
 */
object ReaderAndWriterHbaseSparkSQLDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("hbase-reader")
      .set("spark.hadoop.hbase.zookeeper.quorum", "hadoop-senior01:2181")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = new SQLContext(sc)

    // ==========================================
    sqlContext
      .read
      .format("com.ibeifeng.spark.sql.hbase")
      .option("sparksql_table_schema", "(rowkey string, ca string, cb string, cc string)")
      .option("hbase_table_name", "test")
      .option("hbase_table_schema", "(:key,info:a,info:b,info:c)")
      .load()
      .registerTempTable("t_abc")

    sqlContext.sql("select * from t_abc").show()

    sqlContext.sql("select * from t_abc").toDF("row_key", "v1", "v2", "v3")
      .write
      .mode(SaveMode.Overwrite)
      .format("com.ibeifeng.spark.sql.hbase")
      .option("hbase_table_name", "test2")
      .option("hbase_table_family", "info")
      .save() // requires the data source to implement the CreatableRelationProvider interface

    sqlContext.createDataFrame(Array(
      (1, "gerry", "13166291750", 12345L),
      (2, "Tom", "132521412", 12121L),
      (3, "張三", "1232512542", 125215L),
      (4, "李四", "1235215421", 12351L)
    )).toDF("row_key", "name", "phone", "salary")
      .write
      .mode(SaveMode.Append)
      .format("com.ibeifeng.spark.sql.hbase")
      .option("hbase_table_name", "test3")
      .option("hbase_table_family", "info")
      .save()

    sqlContext
      .read
      .format("com.ibeifeng.spark.sql.hbase")
      .option("sparksql_table_schema", "(id string, name string, phone string, salary long)")
      .option("hbase_table_name", "test3")
      .option("hbase_table_schema", "(:key,info:name,info:phone,info:salary)")
      .load()
      .show()
  }
}