
Spark create function logic

Problem origin

A function registers successfully, but invoking it raises an error saying the corresponding class cannot be loaded.

Conclusion

  • When Spark creates a function, it does not directly load the corresponding resources/classes; they are only loaded on first use, so a successful CREATE does not mean the function can actually be used
  • When Hive creates a function, it does load the corresponding class, so a successful CREATE means the function is usable

spark-sql test

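A minimal Scala sketch of such a test (the class name, jar path, and session setup are illustrative assumptions, not the original experiment): creating a permanent function backed by a missing class succeeds, while the first query that uses it fails.

import org.apache.spark.sql.SparkSession

object CreateFunctionDemo extends App {
  // Local session with Hive support, so that permanent functions go
  // through the external catalog.
  val spark = SparkSession.builder()
    .master("local[*]")
    .enableHiveSupport()
    .getOrCreate()

  // CREATE FUNCTION only writes metadata into the catalog, so this succeeds
  // even though neither the jar nor the class exists.
  spark.sql(
    "CREATE FUNCTION test_add AS 'com.test.udf.MyAddUDF' " +
      "USING JAR 'hdfs://localhost:9000/udf/MyTestUDF-1.0.jar'")

  // The first query that uses the function triggers resource/class loading
  // and fails with a "cannot load class"-style error.
  spark.sql("SELECT test_add(1, 2)").show()
}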

Differences between create temp func and create func

CREATE [ OR REPLACE ] [ TEMPORARY ] FUNCTION [ IF NOT EXISTS ]
    function_name AS class_name [ resource_locations ]
  • Syntactically, both statements create a user-defined function; the difference is that the former registers a temporary function, valid only in the current session, while the latter registers a permanent function that remains in effect.

See the corresponding code: org.apache.spark.sql.execution.command.CreateFunctionCommand#run

https://github.com/apache/spark/blob/807e0a484d1de767d1f02bd8a622da6450bdf940/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala#L75-L95

  • From the code we can see that create temp func loads the corresponding class via loadFunctionResources and then registers it directly into the current session's functionRegistry, whereas create func only creates the function in the catalog and does not load any resources.
override def run(sparkSession: SparkSession): Seq[Row] = {
  val catalog = sparkSession.sessionState.catalog
  val func = CatalogFunction(FunctionIdentifier(functionName, databaseName), className, resources)
  if (isTemp) {
    // We first load resources and then put the builder in the function registry.
    catalog.loadFunctionResources(resources)
    catalog.registerFunction(func, overrideIfExists = replace)
  } else {
    // Handles `CREATE OR REPLACE FUNCTION AS ... USING ...`
    if (replace && catalog.functionExists(func.identifier)) {
      // alter the function in the metastore
      catalog.alterFunction(func)
    } else {
      // For a permanent, we will store the metadata into underlying external catalog.
      // This function will be loaded into the FunctionRegistry when a query uses it.
      // We do not load it into FunctionRegistry right now.
      catalog.createFunction(func, ignoreIfExists)
    }
  }
  Seq.empty[Row]
}
}
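To see the two branches from the outside, a hedged sketch reusing the spark session from the sketch above (com.missing.Udf is a deliberately nonexistent class; the exact error shape varies by Spark version):

// Temporary function: loadFunctionResources/registerFunction run at CREATE
// time, so a class that cannot be loaded fails immediately.
spark.sql("CREATE TEMPORARY FUNCTION tmp_add AS 'com.missing.Udf'")
// => error raised by the CREATE statement itself

// Permanent function: only catalog.createFunction runs at CREATE time.
spark.sql("CREATE FUNCTION perm_add AS 'com.missing.Udf'")
// => succeeds; the failure is deferred to the first query that uses perm_add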
  • Combining the Conversation threads of the issues below (see References) with the latest Spark code, we can see that the idea of "loading the corresponding class when a user-defined function is created" is sound in itself: when the class does not exist, creating the function in Hive fails with either statement. The reason Spark is lenient with create func is multi-cluster / multi-tenant deployments sharing one catalog: other SparkSessions may not have permission to access all of the resources or the UDF class.

https://github.com/apache/spark/pull/29502#issuecomment-689615806


So what is the difference between functions created in these two ways when it comes to loading resources?

And why do both Hive tests fail?

loadResource timing

  • As shown above, for create temp func the resources are already loaded at creation time; for create func the loading happens at execution time.
  • That execution-time loading and registration goes through the common external-catalog UDF loading path (the resource-loading step is sketched after this list):
  • SQL Text → QueryPlan → ResolveFunction → HiveCatalog → SessionCatalog → functionRegistry.lookupFunction → ExternalCatalog → loadFunctionResources → functionRegistry.registerFunction
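As a rough idea of what the loadFunctionResources step does, a conceptual Scala sketch loosely paraphrasing Spark's SessionResourceLoader (not the verbatim source; archive handling is simplified, and sessionState/resourceLoader are @Unstable internal APIs):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, FileResource, FunctionResource, JarResource}

def loadResources(session: SparkSession, resources: Seq[FunctionResource]): Unit =
  resources.foreach { r =>
    r.resourceType match {
      // Jars are appended to the session classloader (and shipped to executors).
      case JarResource => session.sessionState.resourceLoader.addJar(r.uri)
      // Plain files are distributed through the SparkContext.
      case FileResource => session.sparkContext.addFile(r.uri)
      // The real implementation rejects archives here as well.
      case ArchiveResource => sys.error(s"archive resource not supported: ${r.uri}")
    }
  }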

https://github.com/apache/spark/blob/807e0a484d1de767d1f02bd8a622da6450bdf940/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala#L1270-L1310

def lookupFunction(
    name: FunctionIdentifier,
    children: Seq[Expression]): Expression = synchronized {
  // Note: the implementation of this function is a little bit convoluted.
  // We probably shouldn't use a single FunctionRegistry to register all three kinds of functions
  // (built-in, temp, and external).
  if (name.database.isEmpty && functionRegistry.functionExists(name)) {
    // This function has been already loaded into the function registry.
    return functionRegistry.lookupFunction(name, children)
  }

  // If the name itself is not qualified, add the current database to it.
  val database = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
  val qualifiedName = name.copy(database = Some(database))

  if (functionRegistry.functionExists(qualifiedName)) {
    // This function has been already loaded into the function registry.
    // Unlike the above block, we find this function by using the qualified name.
    return functionRegistry.lookupFunction(qualifiedName, children)
  }

  // The function has not been loaded to the function registry, which means
  // that the function is a permanent function (if it actually has been registered
  // in the metastore). We need to first put the function in the FunctionRegistry.
  // TODO: why not just check whether the function exists first?
  val catalogFunction = try {
    externalCatalog.getFunction(database, name.funcName)
  } catch {
    case _: AnalysisException => failFunctionLookup(name)
    case _: NoSuchPermanentFunctionException => failFunctionLookup(name)
  }
  loadFunctionResources(catalogFunction.resources)
  // Please note that qualifiedName is provided by the user. However,
  // catalogFunction.identifier.unquotedString is returned by the underlying
  // catalog. So, it is possible that qualifiedName is not exactly the same as
  // catalogFunction.identifier.unquotedString (difference is on case-sensitivity).
  // At here, we preserve the input from the user.
  registerFunction(catalogFunction.copy(identifier = qualifiedName), overrideIfExists = false)
  // Now, we need to create the Expression.
  functionRegistry.lookupFunction(qualifiedName, children)
}
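Assuming the same hypothetical function as in the earlier sketch, but with a jar and class that actually resolve, this lazy registration can be observed from the session's FunctionRegistry (sessionState is internal, @Unstable API, but has been public since Spark 2.2):

import org.apache.spark.sql.catalyst.FunctionIdentifier

val id = FunctionIdentifier("test_add", Some("default"))
val registry = spark.sessionState.functionRegistry

// Right after CREATE FUNCTION only the catalog metadata exists.
assert(!registry.functionExists(id))

// The first use goes through lookupFunction above: it loads the resources
// and registers the builder into the FunctionRegistry.
spark.sql("SELECT test_add(1, 2)").show()
assert(registry.functionExists(id))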

hive create func logic

CREATE FUNCTION [db_name.]function_name AS class_name
  [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];
  • Flow: SQL parsing (AST) → FunctionTask → FunctionRegistry.registerPermanentFunction → Registry.registerPermanentFunction → Registry.registerToSessionRegistry → FunctionTask.addFunctionResources → SessionState.getRegistryForWrite().registerFunction

Example

-- MyAddUDF implements a simple addition UDF
create function testadd as 'com.test.udf.MyAddUDF'
using
jar 'hdfs://localhost:9000/udf/MyTestUDF-1.0.jar',
jar 'hdfs://localhost:9000/udf/UDFDep-1.0.jar'; -- dependency of MyTestUDF

SQL parsing (AST)

TOK_CREATEFUNCTION
   testadd
   'com.test.udf.MyAddUDF'
   TOK_RESOURCE_LIST
      TOK_RESOURCE_URI
         TOK_JAR
         'hdfs://localhost:9000/udf/MyTestUDF-1.0.jar'
      TOK_RESOURCE_URI
         TOK_JAR
         'hdfs://localhost:9000/udf/UDFDep-1.0.jar'

FunctionRegistry

Note checkLocalFunctionResources here: a resource URI may use the local file system only when the metastore runs in local mode; otherwise it must use a file system such as HDFS. Next come two important steps:

  1. Register the UDF: FunctionRegistry.registerPermanentFunction. There are two key points here:
       • the UDF jars are downloaded locally according to their resource URLs, and the local jar URLs are added to the classpath of the current session's UDFClassLoader;
       • the UDF info is registered in FunctionRegistry's static Registry constant system.
  2. The UDF info is written into the metastore.

FunctionRegistry calls registerPermanentFunction on the Registry constant system; system is a static constant that holds the information of all UDFs.

// from FunctionRegistry
public static FunctionInfo registerPermanentFunction(String functionName,
    String className, boolean registerToSession, FunctionResource[] resources) {
  return system.registerPermanentFunction(functionName, className, registerToSession, resources);
}

Registry

Registry's registerPermanentFunction first executes registerToSessionRegistry.

// from Registry
public FunctionInfo registerPermanentFunction(String functionName,
    String className, boolean registerToSession, FunctionResource... resources) {
  FunctionInfo function = new FunctionInfo(functionName, className, resources);
  // register to session first for backward compatibility
  if (registerToSession) {
    String qualifiedName = FunctionUtils.qualifyFunctionName(
        functionName, SessionState.get().getCurrentDatabase().toLowerCase());
    if (registerToSessionRegistry(qualifiedName, function) != null) {
      addFunction(functionName, function);
      return function;
    }
  } else {
    addFunction(functionName, function);
  }
  return null;
}

registerToSessionRegistry does two main things:

  1. It executes FunctionTask.addFunctionResources(resources), which downloads the jars referenced by resources into a local directory and adds the local jar paths to the ClassLoader's classpath; addFunctionResources mainly calls SessionState's add_resources.
  2. It executes SessionState.getRegistryForWrite().registerFunction, which adds the UDF info to the current session's Registry system (note: distinct from FunctionRegistry's static Registry system).
// from Registry

// should be called after session registry is checked
private FunctionInfo registerToSessionRegistry(String qualifiedName, FunctionInfo function) {
  FunctionInfo ret = null;
  // get the current session's ClassLoader
  ClassLoader prev = Utilities.getSessionSpecifiedClassLoader();
  try {
    // Found UDF in metastore - now add it to the function registry
    // At this point we should add any relevant jars that would be needed for the UDf.
    FunctionResource[] resources = function.getResources();
    try {
      FunctionTask.addFunctionResources(resources);
    } catch (Exception e) {
      LOG.error("Unable to load resources for " + qualifiedName + ":" + e, e);
      return null;
    }
    ClassLoader loader = Utilities.getSessionSpecifiedClassLoader();
    Class<?> udfClass = Class.forName(function.getClassName(), true, loader);

    // Make sure the FunctionInfo is listed as PERSISTENT (rather than TEMPORARY)
    // when it is registered to the system registry.
    ret = SessionState.getRegistryForWrite().registerFunction(
        qualifiedName, FunctionType.PERSISTENT, udfClass, resources);
    if (ret == null) {
      LOG.error(function.getClassName() + " is not a valid UDF class and was not registered.");
    }
    if (SessionState.get().isHiveServerQuery()) {
      SessionState.getRegistryForWrite().addToUDFLoaders(loader);
    }
  } catch (ClassNotFoundException e) {
    // Lookup of UDf class failed
    LOG.error("Unable to load UDF class: " + e);
    Utilities.restoreSessionSpecifiedClassLoader(prev);
  }
  function.shareStateWith(ret);
  return ret;
}

SessionState

Two key points in SessionState's add_resources:

  1. resolveAndDownload downloads the UDF jars to a local directory specified by hive.downloaded.resources.dir, which defaults to /tmp/${session_id}_resources.
  2. The local UDF jar paths are added to the ClassLoader's classpath; this step is easy to miss because it is done through the call to t.preHook.
// From SessionState

public List<String> add_resources(ResourceType t, Collection<String> values)
    throws RuntimeException {
  // By default don't convert to unix
  return add_resources(t, values, false);
}

public List<String> add_resources(ResourceType t, Collection<String> values, boolean convertToUnix)
    throws RuntimeException {
  Set<String> resourceSet = resourceMaps.getResourceSet(t);
  Map<String, Set<String>> resourcePathMap = resourceMaps.getResourcePathMap(t);
  Map<String, Set<String>> reverseResourcePathMap = resourceMaps.getReverseResourcePathMap(t);
  List<String> localized = new ArrayList<String>();
  try {
    for (String value : values) {
      String key;

      // get the local paths of the downloaded jars
      List<URI> downloadedURLs = resolveAndDownload(value, convertToUnix);
      ....
      Set<String> downloadedValues = new HashSet<String>();

      for (URI uri : downloadedURLs) {
        String resourceValue = uri.toString();
        ...
      }
      resourcePathMap.put(key, downloadedValues);
    }
    // !!! The important, easy-to-miss part: preHook adds the local UDF jar
    // paths to the ClassLoader.
    t.preHook(resourceSet, localized);
    ....
}

References

  • https://github.com/apache/spark/pull/29502
  • https://github.com/apache/spark/pull/29713
  • https://github.com/apache/spark
  • https://zhuanlan.zhihu.com/p/405189154
  • https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-Create/Drop/ReloadFunction

Title: Spark create function logic

Author: IITII

Published: 2023-10-24 11:10

Last updated: 2024-01-24 11:01

Original link: https://iitii.github.io/2023/10/24/1/

License: CC BY-NC-ND 4.0 International. Please keep the original link and author attribution when reposting.