SparkSQL UDF Usage and Internals Explained

UDFs are a common feature in SQL, but Spark 1.6 and earlier could only create temporary UDFs; persistent UDFs were not supported without modifying the Spark source code. Starting with Spark 2.0, SparkSQL finally supports persistent UDFs. This article is based on the latest release at the time of writing, Spark 2.0.2, and explains both how to use UDFs in SparkSQL and how they are implemented underneath.

Reprints should credit the original article: http://www.cnblogs.com/shenh062326/p/6189672.html

1. Temporary UDFs

How to create and use one:

create temporary function tmp_trans_array as 'com.test.spark.udf.TransArray' using jar 'spark-test-udf-1.0.0.jar';

select tmp_trans_array(1, '\\|', id, position) as (id0, position0) from test_udf limit 10;
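
The UDF class itself is not shown in this article. As an illustration only (the class name and logic below are made up, not the real TransArray), a Hive-style UDF that can be registered this way looks like the following; a function such as tmp_trans_array, which emits multiple output columns, would extend org.apache.hadoop.hive.ql.exec.GenericUDTF instead, but the registration path is the same.

import org.apache.hadoop.hive.ql.exec.UDF

// Hypothetical one-in-one-out Hive UDF, for illustration only.
// Hive locates the evaluate method by reflection.
class SplitFirstUDF extends UDF {
  // Returns the first field of `s` split by the regex `sep`, or null.
  def evaluate(s: String, sep: String): String =
    if (s == null || sep == null) null else s.split(sep).headOption.orNull
}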

  Implementation: in the run method of the org.apache.spark.sql.execution.command.CreateFunctionCommand class, Spark checks whether the function being created is temporary; if it is, a temporary function is created. As the code below shows, temporary functions are registered directly into functionRegistry (whose implementation class is SimpleFunctionRegistry), i.e. they live only in memory.

def createTempFunction(
    name: String,
    info: ExpressionInfo,
    funcDefinition: FunctionBuilder,
    ignoreIfExists: Boolean): Unit = {
  if (functionRegistry.lookupFunctionBuilder(name).isDefined && !ignoreIfExists) {
    throw new TempFunctionAlreadyExistsException(name)
  }
  functionRegistry.registerFunction(name, info, funcDefinition)
}
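
For comparison, the programmatic registration API lands in this same in-memory registry; a minimal sketch (the session setup and table name are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("udf-demo").getOrCreate()

// Registers a *temporary* function in the session's FunctionRegistry;
// like CREATE TEMPORARY FUNCTION, it disappears when the session ends.
spark.udf.register("plus_one", (x: Int) => x + 1)
spark.sql("select plus_one(id) from test_udf limit 10").show()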

Below is the actual registration code; every UDF that is needed gets loaded into a StringKeyHashMap.

protected val functionBuilders =
  StringKeyHashMap[(ExpressionInfo, FunctionBuilder)](caseSensitive = false)

override def registerFunction(
    name: String,
    info: ExpressionInfo,
    builder: FunctionBuilder): Unit = synchronized {
  functionBuilders.put(name, (info, builder))
}
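
Note the caseSensitive = false: function names are matched case-insensitively. A minimal sketch of the idea behind StringKeyHashMap (simplified, not Spark's exact code):

import scala.collection.mutable

// Simplified stand-in for Spark's StringKeyHashMap: when case-insensitive,
// keys are normalized to lower case on both put and lookup.
class SimpleStringKeyMap[T](caseSensitive: Boolean) {
  private val base = mutable.Map.empty[String, T]
  private def normalize(key: String): String =
    if (caseSensitive) key else key.toLowerCase
  def put(key: String, value: T): Option[T] = base.put(normalize(key), value)
  def get(key: String): Option[T] = base.get(normalize(key))
}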

2. Persistent UDFs

Usage is shown below. Note that the jar is best placed on HDFS, so it can also be used from other machines.

create function trans_array as 'com.test.spark.udf.TransArray' using jar 'hdfs://namenodeIP:9000/libs/spark-test-udf-1.0.0.jar';

select trans_array(1, '\\|', id, position) as (id0, position0) from test_spark limit 10;
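
Because the definition and its jar URI are persisted in the metastore, the function outlives the session that created it; any later session backed by the same metastore can call it without re-registering. A sketch (the session setup is illustrative):

import org.apache.spark.sql.SparkSession

// A brand-new session: no registration step is needed; the function
// definition is fetched from the Hive metastore on first use.
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
spark.sql("select trans_array(1, '\\\\|', id, position) as (id0, position0) from test_spark limit 10").show()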

Implementation

(1) When a permanent function is created, org.apache.spark.sql.execution.command.CreateFunctionCommand calls SessionCatalog's createFunction, which ultimately executes HiveExternalCatalog's createFunction. In other words, creating a permanent function writes a corresponding entry into the Hive metastore. Querying the metastore database, we can see records like the following, confirming that the function has been persisted there.

mysql> select *  from FUNCS;
| FUNC_ID    | CLASS_NAME                    | CREATE_TIME | DB_ID | FUNC_NAME     | FUNC_TYPE | OWNER_NAME | OWNER_TYPE |
| 96         | com.test.spark.udf.TransArray |  1481459766 | 1     | trans_array   | 1         | NULL       | USER       |

mysql> select *  from FUNC_RU;
| FUNC_ID | RESOURCE_TYPE | RESOURCE_URI                                         | INTEGER_IDX |  
|  96     | 1             | hdfs://namenodeIP:9000/libs/spark-test-udf-1.0.0.jar |  0          |
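
In code terms, what gets persisted is a CatalogFunction carrying the class name and its jar resources. Roughly (a simplified sketch of the data handed down, not Spark's exact call sites; the precise createFunction signature varies by version):

import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource, JarResource}

// What CreateFunctionCommand builds and SessionCatalog.createFunction
// forwards to HiveExternalCatalog, and hence into the metastore tables above.
val func = CatalogFunction(
  FunctionIdentifier("trans_array", Some("default")),
  "com.test.spark.udf.TransArray",
  Seq(FunctionResource(JarResource,
    "hdfs://namenodeIP:9000/libs/spark-test-udf-1.0.0.jar")))
// sessionCatalog.createFunction(func, ...)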

(2) When a permanent function is used, resolving the UDF in a SQL statement calls SessionCatalog's lookupFunction method. This method first checks whether the function already exists in memory; if not, it loads the UDF, adding its RESOURCE_URI to the ClassLoader's search path, and finally registers the UDF into the in-memory functionRegistry. The main code lives in SessionCatalog, as follows:

def lookupFunction(
    name: FunctionIdentifier,
    children: Seq[Expression]): Expression = synchronized {
  // Note: the implementation of this function is a little bit convoluted.
  // We probably shouldn't use a single FunctionRegistry to register all three kinds of functions
  // (built-in, temp, and external).
  if (name.database.isEmpty && functionRegistry.functionExists(name.funcName)) {
    // This function has been already loaded into the function registry.
    return functionRegistry.lookupFunction(name.funcName, children)
  }

  // If the name itself is not qualified, add the current database to it.
  val database = name.database.orElse(Some(currentDb)).map(formatDatabaseName)
  val qualifiedName = name.copy(database = database)

  if (functionRegistry.functionExists(qualifiedName.unquotedString)) {
    // This function has been already loaded into the function registry.
    // Unlike the above block, we find this function by using the qualified name.
    return functionRegistry.lookupFunction(qualifiedName.unquotedString, children)
  }

  // The function has not been loaded to the function registry, which means
  // that the function is a permanent function (if it actually has been registered
  // in the metastore). We need to first put the function in the FunctionRegistry.
  // TODO: why not just check whether the function exists first?
  val catalogFunction = try {
    externalCatalog.getFunction(currentDb, name.funcName)
  } catch {
    case e: AnalysisException => failFunctionLookup(name.funcName)
    case e: NoSuchPermanentFunctionException => failFunctionLookup(name.funcName)
  }
  loadFunctionResources(catalogFunction.resources)
  // Please note that qualifiedName is provided by the user. However,
  // catalogFunction.identifier.unquotedString is returned by the underlying
  // catalog. So, it is possible that qualifiedName is not exactly the same as
  // catalogFunction.identifier.unquotedString (difference is on case-sensitivity).
  // At here, we preserve the input from the user.
  val info = new ExpressionInfo(catalogFunction.className, qualifiedName.unquotedString)
  val builder = makeFunctionBuilder(qualifiedName.unquotedString, catalogFunction.className)
  createTempFunction(qualifiedName.unquotedString, info, builder, ignoreIfExists = false)
  // Now, we need to create the Expression.
  functionRegistry.lookupFunction(qualifiedName.unquotedString, children)
}
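
The key step above is loadFunctionResources(catalogFunction.resources): the jar is fetched and appended to the session's classloader so that makeFunctionBuilder can instantiate the UDF class by name. A minimal sketch of that idea (simplified; Spark actually downloads remote jars to a local path first and uses its own MutableURLClassLoader):

import java.io.File
import java.net.{URL, URLClassLoader}

// URLClassLoader.addURL is protected; exposing it lets jars be appended
// to a live classloader, which is essentially what Spark's
// MutableURLClassLoader does.
class AddableClassLoader(parent: ClassLoader)
    extends URLClassLoader(Array.empty[URL], parent) {
  def addJar(path: String): Unit = super.addURL(new File(path).toURI.toURL)
}

val loader = new AddableClassLoader(getClass.getClassLoader)
loader.addJar("/tmp/spark-test-udf-1.0.0.jar")  // local copy of the HDFS jar
val udfClass = loader.loadClass("com.test.spark.udf.TransArray")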