SparkSQL從2.0開始已經再也不支持ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment], ...)
這種語法了(下文簡稱add columns語法)。若是你的Spark項目中用到了SparkSQL+Hive這種模式,從Spark1.x升級到2.x頗有可能遇到這個問題。html
爲了解決這個問題,咱們通常有3種方案能夠選擇:git
OK,接下來,咱們進入主題。github
本文基於最新版的Spark 2.1.0,源碼地址:https://github.com/apache/spark/tree/branch-2.1sql
Spark2.1開始使用ANTLR來解析SQL語法,它的語法定義文件借鑑的Presto項目,咱們在Spark源碼中找到這個文件sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
,作以下改動:apache
@@ -127,6 +127,8 @@ statement ('(' key=tablePropertyKey ')')? #showTblProperties | SHOW COLUMNS (FROM | IN) tableIdentifier ((FROM | IN) db=identifier)? #showColumns + | ALTER TABLE tableIdentifier ADD COLUMNS + ('(' columns=colTypeList ')')? #addColumns | SHOW PARTITIONS tableIdentifier partitionSpec? #showPartitions | SHOW identifier? FUNCTIONS (LIKE? (qualifiedName | pattern=STRING))? #showFunctions @@ -191,7 +193,6 @@ unsupportedHiveNativeCommands | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=COMPACT | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=CONCATENATE | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=SET kw4=FILEFORMAT - | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=ADD kw4=COLUMNS | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=CHANGE kw4=COLUMN? | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=REPLACE kw4=COLUMNS | kw1=START kw2=TRANSACTION
194行的kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=ADD kw4=COLUMNS
是在unsupportedHiveNativeCommands
列表中,咱們首先把它去掉。session
爲了讓Spark能解析ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment], ...)
,咱們還須要在129行處新增| ALTER TABLE tableIdentifier ADD COLUMNS ('(' columns=colTypeList ')')? #addColumns
最後的#addColumns
是爲了讓ANTLR插件(這個插件定義在sql/catalyst/pom.xml中)爲咱們自動生成addColumns相關方法,便於咱們作語法解析處理。這個語法中有2個參數須要咱們處理table_name和columns。ide
SparkSqlAstBuilder
的做用是將ANTLR的語法樹翻譯爲LogicalPlan/Expression/TableIdentifier
oop
要修改的文件爲:sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
,咱們在178行處,新增以下方法:ui
override def visitAddColumns(ctx: AddColumnsContext): LogicalPlan = withOrigin(ctx) { val tableName = visitTableIdentifier(ctx.tableIdentifier()) val dataCols = Option(ctx.columns).map(visitColTypeList).getOrElse(Nil) AlterTableAddColumnsCommand(tableName, dataCols) }
visitAddColumns方法是ANTLR插件自動爲咱們生成的方法,定義在SparkSqlAstBuilder的父類AstBuilder中(AST,Abstract Syntax Tree ,抽象語法樹),這個方法用來處理咱們在SqlBase.g4中定義的| ALTER TABLE tableIdentifier ADD COLUMNS ('(' columns=colTypeList ')')? #addColumns
,咱們這裏重載了visitAddColumns方法用來提取表名及新增的字段列表,並返回一個LogicalPlan:AlterTableAddColumnsCommand,這個類咱們接下來會說明。spa
修改sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
,在120行處,新增AlterTableAddColumnsCommand類:
case class AlterTableAddColumnsCommand( tableName: TableIdentifier, newColumns: Seq[StructField]) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) DDLUtils.verifyAlterTableType(catalog, table, isView = false) val newSchema = StructType(table.schema.fields ++ newColumns) val newTable = table.copy(schema = newSchema) catalog.alterTable(newTable) Seq.empty[Row] } }
RunnableCommand類繼承自LogicalPlan,run方法用於執行addColumns語法對應的執行邏輯。這個類的處理邏輯比較簡單,就不詳細介紹了。
咱們在第3步的AlterTableAddColumnsCommand中,雖然調用了catalog.alterTable(newTable)
來修改表信息,但實際上並不能將新的字段添加到表中,由於Spark代碼寫死了,不能改Hive表的schema,咱們還須要修改HiveExternalCatalog類(sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala),改動以下:
@@ -588,7 +588,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val newTableProps = oldDataSourceProps ++ withStatsProps.properties + partitionProviderProp val newDef = withStatsProps.copy( storage = newStorage, - schema = oldTableDef.schema, + // allow `alter table xxx add columns(xx)` + schema = tableDefinition.schema, partitionColumnNames = oldTableDef.partitionColumnNames, bucketSpec = oldTableDef.bucketSpec, properties = newTableProps)
咱們將591行的schema = oldTableDef.schema
替換爲schema = tableDefinition.schema
便可。
至此,咱們完成了整個代碼的調整。
最後參考Spark的編譯文檔:http://spark.apache.org/docs/latest/building-spark.html#building-a-runnable-distribution,將Spark編譯打包便可。
Spark 2.x會將編譯後的assembly放到jars目錄下,咱們此次的改動會影響到如下幾個jar包:
若是Spark已經部署過了,能夠直接將以上3個jar替換掉。
更新Spark後,咱們就可使用alter table xxx add columns(xx)
了。