本身動手爲Spark 2.x添加ALTER TABLE ADD COLUMNS語法支持

SparkSQL從2.0開始已經再也不支持ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment], ...)這種語法了(下文簡稱add columns語法)。若是你的Spark項目中用到了SparkSQL+Hive這種模式,從Spark1.x升級到2.x頗有可能遇到這個問題。html

爲了解決這個問題,咱們通常有3種方案能夠選擇:git

  1. 啓動一個hiveserver2服務,經過jdbc直接調用hive,讓hive執行add columns語句。這種應該是改起來最爲方便的一種方式了,缺點就是,咱們還須要在啓動一個hiveserver服務,多一個服務依賴,會增長整個系統的維護成本。
  2. SparkSQL+Hive這種模式,要求咱們啓動一個HiveMetastore服務,給SparkSQL用,咱們也能夠在代碼中直接直接鏈接HiveMetastore去執行add columns語句。這種方式的好處是不須要額外依賴其餘服務,缺點就是咱們要本身調用HiveMetastore相關接口,本身管理SessionState,用起來比較麻煩。
  3. 最後一種方式就是直接修改Spark,讓他支持add columns語法。這種方式最大的好處就是咱們原有的業務邏輯代碼不用動,問題就在於,要求對Spark源碼有必定的瞭解,不然改起來仍是挺費勁的。這也是我寫這篇文章的目的:讓你們可以參考本文自行爲Spark添加add columns語法支持。

OK,接下來,咱們進入主題。github

爲Spark添加add columns語法支持

本文基於最新版的Spark 2.1.0,源碼地址:https://github.com/apache/spark/tree/branch-2.1sql

1. 改進語法定義

Spark2.1開始使用ANTLR來解析SQL語法,它的語法定義文件借鑑的Presto項目,咱們在Spark源碼中找到這個文件sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4,作以下改動:apache

@@ -127,6 +127,8 @@ statement
         ('(' key=tablePropertyKey ')')?                                #showTblProperties
     | SHOW COLUMNS (FROM | IN) tableIdentifier
         ((FROM | IN) db=identifier)?                                   #showColumns
+    | ALTER TABLE tableIdentifier ADD COLUMNS
+        ('(' columns=colTypeList ')')?                                 #addColumns
     | SHOW PARTITIONS tableIdentifier partitionSpec?                   #showPartitions
     | SHOW identifier? FUNCTIONS
         (LIKE? (qualifiedName | pattern=STRING))?                      #showFunctions
@@ -191,7 +193,6 @@ unsupportedHiveNativeCommands
     | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=COMPACT
     | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=CONCATENATE
     | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=SET kw4=FILEFORMAT
-    | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=ADD kw4=COLUMNS
     | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=CHANGE kw4=COLUMN?
     | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=REPLACE kw4=COLUMNS
     | kw1=START kw2=TRANSACTION

194行的kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=ADD kw4=COLUMNS是在unsupportedHiveNativeCommands列表中,咱們首先把它去掉。session

爲了讓Spark能解析ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment], ...),咱們還須要在129行處新增| ALTER TABLE tableIdentifier ADD COLUMNS ('(' columns=colTypeList ')')? #addColumns最後的#addColumns是爲了讓ANTLR插件(這個插件定義在sql/catalyst/pom.xml中)爲咱們自動生成addColumns相關方法,便於咱們作語法解析處理。這個語法中有2個參數須要咱們處理table_name和columns。ide

2. 改進SparkSqlAstBuilder,使其能處理addColumns

SparkSqlAstBuilder的做用是將ANTLR的語法樹翻譯爲LogicalPlan/Expression/TableIdentifieroop

要修改的文件爲:sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala,咱們在178行處,新增以下方法:ui

override def visitAddColumns(ctx: AddColumnsContext): LogicalPlan = withOrigin(ctx) {
  val tableName = visitTableIdentifier(ctx.tableIdentifier())
  val dataCols = Option(ctx.columns).map(visitColTypeList).getOrElse(Nil)
  
  AlterTableAddColumnsCommand(tableName, dataCols)
}

visitAddColumns方法是ANTLR插件自動爲咱們生成的方法,定義在SparkSqlAstBuilder的父類AstBuilder中(AST,Abstract Syntax Tree ,抽象語法樹),這個方法用來處理咱們在SqlBase.g4中定義的| ALTER TABLE tableIdentifier ADD COLUMNS ('(' columns=colTypeList ')')? #addColumns,咱們這裏重載了visitAddColumns方法用來提取表名及新增的字段列表,並返回一個LogicalPlan:AlterTableAddColumnsCommand,這個類咱們接下來會說明。spa

3. 新增一個爲表添加字段的命令

修改sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala,在120行處,新增AlterTableAddColumnsCommand類:

case class AlterTableAddColumnsCommand(
    tableName: TableIdentifier,
    newColumns: Seq[StructField]) extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    val table = catalog.getTableMetadata(tableName)

    DDLUtils.verifyAlterTableType(catalog, table, isView = false)

    val newSchema = StructType(table.schema.fields ++ newColumns)
    val newTable = table.copy(schema = newSchema)
    catalog.alterTable(newTable)
    Seq.empty[Row]
  }
}

RunnableCommand類繼承自LogicalPlan,run方法用於執行addColumns語法對應的執行邏輯。這個類的處理邏輯比較簡單,就不詳細介紹了。

4. 修復HiveExternalCatalog沒法修改表schema的問題

咱們在第3步的AlterTableAddColumnsCommand中,雖然調用了catalog.alterTable(newTable)來修改表信息,但實際上並不能將新的字段添加到表中,由於Spark代碼寫死了,不能改Hive表的schema,咱們還須要修改HiveExternalCatalog類(sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala),改動以下:

@@ -588,7 +588,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       val newTableProps = oldDataSourceProps ++ withStatsProps.properties + partitionProviderProp
       val newDef = withStatsProps.copy(
         storage = newStorage,
-        schema = oldTableDef.schema,
+        // allow `alter table xxx add columns(xx)`
+        schema = tableDefinition.schema,
         partitionColumnNames = oldTableDef.partitionColumnNames,
         bucketSpec = oldTableDef.bucketSpec,
         properties = newTableProps)

咱們將591行的schema = oldTableDef.schema替換爲schema = tableDefinition.schema便可。

至此,咱們完成了整個代碼的調整。

最後參考Spark的編譯文檔:http://spark.apache.org/docs/latest/building-spark.html#building-a-runnable-distribution,將Spark編譯打包便可。

Spark 2.x會將編譯後的assembly放到jars目錄下,咱們此次的改動會影響到如下幾個jar包:

  • spark-catalyst_2.11-2.1.0.jar
  • spark-sql_2.11-2.1.0.jar
  • spark-hive_2.11-2.1.0.jar

若是Spark已經部署過了,能夠直接將以上3個jar替換掉。

更新Spark後,咱們就可使用alter table xxx add columns(xx)了。

相關文章
相關標籤/搜索