042 SparkSQL: how the read and write programming patterns work, and an HBase implementation

  My understanding of this topic is not yet thorough, so this write-up is also not thorough for now.

  Although Spark does not support HBase out of the box today and you have to write the integration yourself, I believe later Spark releases will add support. Treat this document as an exercise in self-improvement.

1. Overview

  For saving and reading DataFrame data, the most important methods are load and save.

  Reading the source code, you can see that the two share a lot of structure, so here we take load as the example and walk through the source.
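  Both methods are reached through the DataFrameReader/DataFrameWriter returned by read and write. A minimal usage sketch, previewing the demo program at the end of this post, just to show where load and save sit in user code:

import org.apache.spark.sql.SaveMode

// Read path: sqlContext.read returns a DataFrameReader; format() records the provider
// string and load() turns the options into a DataFrame.
val df = sqlContext.read.format("com.ibeifeng.spark.sql.hbase")
  .option("sparksql_table_schema", "(rowkey string, ca string, cb string, cc string)")
  .option("hbase_table_name", "test")
  .option("hbase_table_schema", "(:key,info:a,info:b,info:c)")
  .load()

// Write path: df.write returns a DataFrameWriter; save() resolves the same provider string.
df.toDF("row_key", "v1", "v2", "v3").write.format("com.ibeifeng.spark.sql.hbase")
  .mode(SaveMode.Append)
  .option("hbase_table_name", "test2")
  .option("hbase_table_family", "info")
  .save()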

 

Part One: load

1. load

  You can see that the main body of load is only two lines.

  So that is where the focus lies.
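  A rough paraphrase of those two statements, based on the Spark 1.5.x DataFrameReader (exact parameters differ between 1.x versions; this is a sketch, not a verbatim excerpt):

// DataFrameReader.load(), paraphrased: build a ResolvedDataSource from the provider
// string ("source", set by format()) plus the options, then wrap the resulting
// BaseRelation into a DataFrame via LogicalRelation.
def load(): DataFrame = {
  val resolved = ResolvedDataSource(
    sqlContext,
    userSpecifiedSchema = userSpecifiedSchema,
    partitionColumns = Array.empty[String],
    provider = source,
    options = extraOptions.toMap)
  DataFrame(sqlContext, LogicalRelation(resolved.relation))
}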

  

 

2. Entering ResolvedDataSource

  What gets called is its apply method.

  Opening up ResolvedDataSource.apply, these are all the parameters it needs. The one worth explaining is provider: it comes from source, and source in turn comes from the argument passed to format(), i.e. the data source format.
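  Its signature on the ResolvedDataSource companion object looks roughly like this in Spark 1.x (a paraphrase; the body is sketched in subsection 3 below):

def apply(
    sqlContext: SQLContext,
    userSpecifiedSchema: Option[StructType],
    partitionColumns: Array[String],
    provider: String,              // the string handed to format(), e.g. "com.ibeifeng.spark.sql.hbase"
    options: Map[String, String]   // the accumulated option(...) key/value pairs
  ): ResolvedDataSource = {
  ??? // body sketched in subsection 3
}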

  

 

3. The overall structure of the apply method
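  In outline, the body of apply does the following in Spark 1.x (a paraphrased sketch; error handling and the HadoopFsRelation cases are omitted):

// 1) resolve the provider string to a Class[_]
val clazz: Class[_] = lookupDataSource(provider)

// 2) instantiate it and dispatch on the traits it implements to build a BaseRelation
val relation = userSpecifiedSchema match {
  case Some(schema) =>
    clazz.newInstance() match {
      case ds: SchemaRelationProvider => ds.createRelation(sqlContext, options, schema)
      case _: RelationProvider => sys.error(s"$provider does not allow user-specified schemas.")
      // HadoopFsRelationProvider and other cases omitted
    }
  case None =>
    clazz.newInstance() match {
      case ds: RelationProvider => ds.createRelation(sqlContext, options)
      case _: SchemaRelationProvider => sys.error(s"A schema needs to be specified when using $provider.")
      // HadoopFsRelationProvider and other cases omitted
    }
}

// 3) wrap the relation together with the provider class
new ResolvedDataSource(clazz, relation)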

  

 

4. First, how clazz is produced
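  A simplified paraphrase of how clazz is looked up (the real method also consults a ServiceLoader for registered short names in later 1.x versions):

// Try the provider string as a fully-qualified class name first; if that fails,
// append ".DefaultSource". This is why format("com.ibeifeng.spark.sql.hbase")
// ends up loading the class com.ibeifeng.spark.sql.hbase.DefaultSource written in Part Two.
def lookupDataSource(provider: String): Class[_] = {
  val loader = Thread.currentThread().getContextClassLoader
  try {
    loader.loadClass(provider)
  } catch {
    case _: ClassNotFoundException => loader.loadClass(provider + ".DefaultSource")
  }
}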

  

  

 

5. The type of dataSource
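  Zooming in on the branch our HBase reader takes (no schema is passed via .schema(), so userSpecifiedSchema is None), again paraphrased:

// With no user-specified schema, the instantiated data source must be a RelationProvider;
// its createRelation(sqlContext, options) returns the BaseRelation that backs the DataFrame.
clazz.newInstance() match {
  case dataSource: RelationProvider =>
    dataSource.createRelation(sqlContext, options)
  case _: SchemaRelationProvider =>
    sys.error("A schema needs to be specified when using this data source.")
  case _ =>
    sys.error(s"${clazz.getCanonicalName} is not a valid Spark SQL data source.")
}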

  

 

6. What this tells us

  It shows that the dataSource class needs to extend RelationProvider.

  But that is not the whole story: we also need to look at the dataSource used in the save path. After thinking it over, save does need to be examined as well; the two traits involved are shown below.
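  For reference, the two traits are declared in org.apache.spark.sql.sources essentially as follows in Spark 1.x:

trait RelationProvider {
  // read path: build a BaseRelation from the options alone
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}

trait CreatableRelationProvider {
  // write path: build (and populate) a relation from an existing DataFrame
  def createRelation(sqlContext: SQLContext, mode: SaveMode,
                     parameters: Map[String, String], data: DataFrame): BaseRelation
}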

 

7. The apply used by save
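  The write-path overload of ResolvedDataSource.apply, again a paraphrase: save() resolves the same provider class, but dispatches on CreatableRelationProvider and passes along the SaveMode and the DataFrame to be written.

val clazz: Class[_] = lookupDataSource(provider)
val relation = clazz.newInstance() match {
  case dataSource: CreatableRelationProvider =>
    dataSource.createRelation(sqlContext, mode, options, data)
  case _ =>
    sys.error(s"${clazz.getCanonicalName} does not allow create table as select.")
}
new ResolvedDataSource(clazz, relation)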

  

 

8. Summary

  DefaultSource extends RelationProvider with CreatableRelationProvider

  In other words, a data source that supports both load and save must implement the read-path createRelation (from RelationProvider) and the write-path createRelation (from CreatableRelationProvider).

 

Part Two: Implementing read and write for HBase

  The code in this part was not written by me; it is copied from someone else's implementation.

1. DefaultSource

package com.ibeifeng.spark.sql.hbase

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{HBaseAdmin, Put}
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.{MRJobConfig, OutputFormat}
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._

import scala.collection.mutable.ArrayBuffer

/**
 * Created by ibf on 02/16.
 */
class DefaultSource
  extends RelationProvider with CreatableRelationProvider {

  /**
   * Creates the Relation used when reading data
   *
   * @param sqlContext
   * @param parameters
   * @return
   */
  override def createRelation(
                               sqlContext: SQLContext,
                               parameters: Map[String, String]
                             ): BaseRelation = {
    HBaseRelation(parameters)(sqlContext)
  }

  /**
   * Creates the Relation used when saving data
   *
   * @param sqlContext
   * @param mode
   * @param parameters
   * @param data
   * @return
   */
  override def createRelation(
                               sqlContext: SQLContext,
                               mode: SaveMode,
                               parameters: Map[String, String],
                               data: DataFrame): BaseRelation = {
    HBaseConfiguration.merge(sqlContext.sparkContext.hadoopConfiguration, HBaseConfiguration.create(sqlContext.sparkContext.hadoopConfiguration))
    val hbaseConf = sqlContext.sparkContext.hadoopConfiguration
    val hbaseAdmin = new HBaseAdmin(hbaseConf)
    try {
      val hbaseTableName = parameters.getOrElse("hbase_table_name", sys.error("not valid schema"))
      val family = parameters.getOrElse("hbase_table_family", sys.error("not valid schema"))
      // Get the schema; a "row_key" column is mandatory
      val familyAndSchema = if (data.schema.map(_.name.toLowerCase).contains("row_key")) {
        sqlContext.sparkContext.broadcast((Bytes.toBytes(family), data.schema))
      } else {
        sys.error("no row_key column!!!")
      }
      // Validate the schema: only the types below are supported
      familyAndSchema.value._2.foreach {
        case StructField(_, dataType, _, _) => {
          dataType match {
            case dt: StringType =>
            case dt: DoubleType =>
            case dt: FloatType =>
            case dt: IntegerType =>
            case dt: LongType =>
            case dt: ShortType =>
            case _ => sys.error(s"Can't support those data type of ${dataType}")
          }
        }
      }

      // Decide whether to write, based on the SaveMode and whether the table exists
      val doSave = if (hbaseAdmin.tableExists(hbaseTableName)) {
        mode match {
          case SaveMode.Append => true
          case SaveMode.ErrorIfExists => sys.error(s"hbase table '$hbaseTableName' already exists.")
          case SaveMode.Ignore => false
          case SaveMode.Overwrite => {
            try {
              // Disable the table first
              if (hbaseAdmin.isTableEnabled(hbaseTableName)) {
                hbaseAdmin.disableTable(hbaseTableName)
              }
            } catch {
              case _: Exception => // do nothing
            }
            // Drop the table
            hbaseAdmin.deleteTable(hbaseTableName)
            true
          }
        }
      } else {
        true
      }
      if (doSave) {
        // Save the data through TableOutputFormat
        hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName)
        hbaseConf.setClass(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, classOf[TableOutputFormat[Any]], classOf[OutputFormat[Any, Any]])

        // Create the table if it does not exist
        if (!hbaseAdmin.tableExists(hbaseTableName)) {
          val desc = new HTableDescriptor(TableName.valueOf(hbaseTableName))
          desc.addFamily(new HColumnDescriptor(family))
          hbaseAdmin.createTable(desc)
        }

        data.rdd.map(row => {
          val family = familyAndSchema.value._1
          val schema = familyAndSchema.value._2

          // Serialize every column of the row to bytes
          val buffers = schema.map {
            case StructField(name, dataType, nullable, _) => {
              val isRowKey = "row_key".equalsIgnoreCase(name)
              val value = dataType match {
                case dt: StringType => Bytes.toBytes(row.getAs[String](name))
                case dt: DoubleType => Bytes.toBytes(row.getAs[Double](name))
                case dt: FloatType => Bytes.toBytes(row.getAs[Float](name))
                case dt: IntegerType => Bytes.toBytes(row.getAs[Integer](name))
                case dt: LongType => Bytes.toBytes(row.getAs[Long](name))
                case dt: ShortType => Bytes.toBytes(row.getAs[Short](name))
                case _ => sys.error(s"can't support those data type of ${dataType}")
              }
              // Return (column name, serialized value)
              (if (isRowKey) "row_key" else name, value)
            }
          }

          // Build the Put object: row_key becomes the row key, everything else becomes a cell
          val rowKey = buffers.toMap.getOrElse("row_key", sys.error(""))
          val put = buffers.filter(!_._1.equals("row_key")).foldLeft(new Put(rowKey))((put, b) => {
            put.add(family, Bytes.toBytes(b._1), b._2)
            put
          })
          // Return the (key, Put) pair expected by TableOutputFormat
          (null, put)
        }).saveAsNewAPIHadoopDataset(hbaseConf)
      }
    } finally {
      if (hbaseAdmin != null) hbaseAdmin.close()
    }

    // Return the relation
    HBaseRelation(parameters)(sqlContext)
  }
}

 

2. HBaseRelation

package com.ibeifeng.spark.sql.hbase

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, InsertableRelation, TableScan}
import org.apache.spark.sql.types._

import scala.collection.mutable.ArrayBuffer

/**
 * Created by ibf on 02/16.
 */
case class HBaseRelation(@transient val parameters: Map[String, String])
                        (@transient val sqlContext: SQLContext)
  extends BaseRelation with Serializable with TableScan {

  private case class SchemaType(dataType: DataType, nullable: Boolean)

  private lazy val (hbaseTableName, hbaseTableFields, fieldsRelations, queryColumns, startRowKey, endRowKey) =
    try {
      val hbaseTableName = parameters.getOrElse("hbase_table_name", sys.error("not valid schema"))
      val hbaseTableSchema = parameters.getOrElse("hbase_table_schema", sys.error("not valid schema"))
      val registerTableSchema = parameters.getOrElse("sparksql_table_schema", sys.error("not valid schema"))
      val rowRange = parameters.getOrElse("row_range", "->")
      // Get the start row and end row
      val range = rowRange.split("->", -1)
      val startRowKey = range(0).trim
      val endRowKey = range(1).trim

      val tempHBaseFields = extractHBaseSchema(hbaseTableSchema) // do not use this, a temp field
      val registerTableFields = extractRegisterSchema(registerTableSchema)
      val tempFieldRelation = tableSchemaFieldMapping(tempHBaseFields, registerTableFields)

      val hbaseTableFields = feedTypes(tempFieldRelation)
      val fieldsRelations = tableSchemaFieldMapping(hbaseTableFields, registerTableFields)
      val queryColumns = getQueryTargetCloumns(hbaseTableFields)

      (hbaseTableName, hbaseTableFields, fieldsRelations, queryColumns, startRowKey, endRowKey)
    } catch {
      case e: RuntimeException => throw e
      case e: Exception => sys.error(s"Failed to initialize HBaseRelation: ${e.getMessage}")
    }

  def feedTypes(mapping: Map[HBaseSchemaField, RegisteredSchemaField]): Array[HBaseSchemaField] = {
    val hbaseFields = mapping.map {
      case (k, v) =>
        val field = k.copy(fieldType = v.fieldType)
        field
    }
    hbaseFields.toArray
  }

  def isRowKey(field: HBaseSchemaField): Boolean = {
    val cfColArray = field.fieldName.split(":", -1)
    val cfName = cfColArray(0)
    val colName = cfColArray(1)
    if (cfName == "" && colName == "key") true else false
  }

  def getQueryTargetCloumns(hbaseTableFields: Array[HBaseSchemaField]): String = {
    var str = ArrayBuffer[String]()
    hbaseTableFields.foreach { field =>
      if (!isRowKey(field)) {
        str += field.fieldName
      }
    }
    str.mkString(" ")
  }

  def tableSchemaFieldMapping(externalHBaseTable: Array[HBaseSchemaField], registerTable: Array[RegisteredSchemaField]): Map[HBaseSchemaField, RegisteredSchemaField] = {
    if (externalHBaseTable.length != registerTable.length) sys.error("columns size not match in definition!")
    val rs = externalHBaseTable.zip(registerTable)
    rs.toMap
  }

  def extractRegisterSchema(registerTableSchema: String): Array[RegisteredSchemaField] = {
    val fieldsStr = registerTableSchema.trim.drop(1).dropRight(1)
    val fieldsArray = fieldsStr.split(",").map(_.trim)
    fieldsArray.map { fildString =>
      val splitedField = fildString.split("\\s+", -1)
      RegisteredSchemaField(splitedField(0), splitedField(1))
    }
  }

  def extractHBaseSchema(externalTableSchema: String): Array[HBaseSchemaField] = {
    val fieldsStr = externalTableSchema.trim.drop(1).dropRight(1)
    val fieldsArray = fieldsStr.split(",").map(_.trim)
    fieldsArray.map(fildString => HBaseSchemaField(fildString, ""))
  }

  override lazy val schema: StructType = {
    val fields = hbaseTableFields.map { field =>
      val name = fieldsRelations.getOrElse(field, sys.error("table schema is not match the definition.")).fieldName
      val relatedType = field.fieldType.toLowerCase match {
        case "string" =>
          SchemaType(StringType, nullable = false)
        case "int" =>
          SchemaType(IntegerType, nullable = false)
        case "long" =>
          SchemaType(LongType, nullable = false)
        case e =>
          sys.error(s"Can't support those field type of ${e}")
      }
      StructField(name, relatedType.dataType, relatedType.nullable)
    }
    StructType(fields)
  }

  override lazy val buildScan: RDD[Row] = {
    // Scan the HBase table through TableInputFormat and map each Result to a Row
    val hbaseConf = HBaseConfiguration.create(sqlContext.sparkContext.hadoopConfiguration)
    hbaseConf.set(TableInputFormat.INPUT_TABLE, hbaseTableName)
    hbaseConf.set(TableInputFormat.SCAN_COLUMNS, queryColumns)
    hbaseConf.set(TableInputFormat.SCAN_ROW_START, startRowKey)
    hbaseConf.set(TableInputFormat.SCAN_ROW_STOP, endRowKey)
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop-senior01:2181")

    val hbaseRdd = sqlContext.sparkContext.newAPIHadoopRDD(
      hbaseConf,
      classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result]
    )

    val rs = hbaseRdd.map(tuple => tuple._2).map(result => {
      var values = new ArrayBuffer[Any]()
      hbaseTableFields.foreach { field =>
        values += Resolver.resolve(field, result)
      }
      Row.fromSeq(values.toSeq)
    })

    rs
  }
}

object Resolver extends Serializable {

  def resolve(hbaseField: HBaseSchemaField, result: Result): Any = {
    val cfColArray = hbaseField.fieldName.split(":", -1)
    val cfName = cfColArray(0)
    val colName = cfColArray(1)
    var fieldRs: Any = null
    // Resolve the row key, otherwise resolve a column
    if (cfName == "" && colName == "key") {
      fieldRs = resolveRowKey(result, hbaseField.fieldType)
    } else {
      fieldRs = resolveColumn(result, cfName, colName, hbaseField.fieldType)
    }
    fieldRs
  }

  private def resolveBytes(bytes: Array[Byte], isFloatType: Boolean = false): Any = {
    val length = bytes.length
    try {
      length match {
        case 2 => Bytes.toShort(bytes)
        case 4 => {
          if (isFloatType) Bytes.toFloat(bytes)
          else Bytes.toInt(bytes).toString
        }
        case 8 => {
          if (isFloatType) Bytes.toDouble(bytes)
          else Bytes.toLong(bytes)
        }
        case _ => Bytes.toString(bytes)
      }
    } catch {
      case _: Exception => Bytes.toString(bytes)
    }
  }

  private def resolveRowKey(result: Result, resultType: String): Any = {
    val rowkey = resultType match {
      case "string" =>
        resolveBytes(result.getRow)
      case "int" =>
        resolveBytes(result.getRow)
      case "long" =>
        resolveBytes(result.getRow)
      case "float" =>
        resolveBytes(result.getRow, true)
      case "double" =>
        resolveBytes(result.getRow, true)
    }
    println(s"rowkey->${rowkey}")
    rowkey
  }

  private def resolveColumn(result: Result, columnFamily: String, columnName: String, resultType: String): Any = {
    val column = resultType match {
      case "string" =>
        resolveBytes(result.getValue(columnFamily.getBytes, columnName.getBytes))
      case "int" =>
        resolveBytes(result.getValue(columnFamily.getBytes, columnName.getBytes))
      case "long" =>
        resolveBytes(result.getValue(columnFamily.getBytes, columnName.getBytes))
      case "float" =>
        resolveBytes(result.getValue(columnFamily.getBytes, columnName.getBytes), true)
      case "double" =>
        resolveBytes(result.getValue(columnFamily.getBytes, columnName.getBytes), true)
    }
    column
  }
}

 

3. The package object

  Mainly constants (the parameter names), the schema field case classes, and an implicit helper on SQLContext.

package com.ibeifeng.spark.sql

import org.apache.spark.sql.SQLContext

import scala.collection.immutable.HashMap

/**
 * Created by ibf on 02/16.
 */
package object hbase {

  abstract class SchemaField extends Serializable

  case class RegisteredSchemaField(fieldName: String, fieldType: String) extends SchemaField with Serializable

  case class HBaseSchemaField(fieldName: String, fieldType: String) extends SchemaField with Serializable

  case class Parameter(name: String)

  protected val SPARK_SQL_TABLE_SCHEMA = Parameter("sparksql_table_schema")
  protected val HBASE_TABLE_NAME = Parameter("hbase_table_name")
  protected val HBASE_TABLE_SCHEMA = Parameter("hbase_table_schema")
  protected val ROW_RANGE = Parameter("row_range")

  implicit class HBaseContext(sqlContext: SQLContext) {
    def hbaseTable(sparksqlTableSchema: String, hbaseTableName: String, hbaseTableSchema: String, rowRange: String = "->") = {
      var params = new HashMap[String, String]
      params += (SPARK_SQL_TABLE_SCHEMA.name -> sparksqlTableSchema)
      params += (HBASE_TABLE_NAME.name -> hbaseTableName)
      params += (HBASE_TABLE_SCHEMA.name -> hbaseTableSchema)
      // Start row and end row
      params += (ROW_RANGE.name -> rowRange)
      sqlContext.baseRelationToDataFrame(HBaseRelation(params)(sqlContext))
    }
  }

}
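  With this implicit class in scope, the read side can also be wired up through the helper instead of spelling out each option by hand. A usage sketch, assuming the table and schemas from the demo below:

import com.ibeifeng.spark.sql.hbase._

// Builds the same HBaseRelation as read.format(...).option(...).load()
val df = sqlContext.hbaseTable(
  "(rowkey string, ca string, cb string, cc string)",   // sparksql_table_schema
  "test",                                                // hbase_table_name
  "(:key,info:a,info:b,info:c)")                         // hbase_table_schema
df.registerTempTable("t_abc")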

 

4. Running the test program

package com.ibeifeng.senior.spark_sql_hbase

import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * SparkSQL demo: reading from and writing to HBase
 * Created by ibf on 02/16.
 */
object ReaderAndWriterHbaseSparkSQLDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("hbase-reader")
      .set("spark.hadoop.hbase.zookeeper.quorum", "hadoop-senior01:2181")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = new SQLContext(sc)

    // ==========================================
    sqlContext
      .read
      .format("com.ibeifeng.spark.sql.hbase")
      .option("sparksql_table_schema", "(rowkey string, ca string, cb string, cc string)")
      .option("hbase_table_name", "test")
      .option("hbase_table_schema", "(:key,info:a,info:b,info:c)")
      .load()
      .registerTempTable("t_abc")

    sqlContext.sql("select * from t_abc").show()

    sqlContext.sql("select * from t_abc").toDF("row_key", "v1", "v2", "v3")
      .write
      .mode(SaveMode.Overwrite)
      .format("com.ibeifeng.spark.sql.hbase")
      .option("hbase_table_name", "test2")
      .option("hbase_table_family", "info")
      .save() // requires the data source to implement CreatableRelationProvider

    sqlContext.createDataFrame(Array(
      (1, "gerry", "13166291750", 12345L),
      (2, "Tom", "132521412", 12121L),
      (3, "張三", "1232512542", 125215L),
      (4, "李四", "1235215421", 12351L)
    )).toDF("row_key", "name", "phone", "salary")
      .write
      .mode(SaveMode.Append)
      .format("com.ibeifeng.spark.sql.hbase")
      .option("hbase_table_name", "test3")
      .option("hbase_table_family", "info")
      .save()

    sqlContext
      .read
      .format("com.ibeifeng.spark.sql.hbase")
      .option("sparksql_table_schema", "(id string, name string, phone string, salary long)")
      .option("hbase_table_name", "test3")
      .option("hbase_table_schema", "(:key,info:name,info:phone,info:salary)")
      .load()
      .show()
  }
}