GravitinoSparkConnector实现原理
- 互联网
- 2025-09-11 22:42:01

Gravitino SparkConnector 实现原理
本文参考了官网介绍,想看官方解析请参考 官网地址 本文仅仅介绍原理
文章目录 Gravitino SparkConnector 实现原理背景知识-Spark Plugin 介绍(1) **插件加载**(2) **DriverPlugin 初始化**(3) **ExecutorPlugin 初始化**(4) **插件执行**(5) **插件销毁** 背景知识-Driver Plugin 介绍(1) **`init` 方法**(2) **`registerMetrics` 方法**(3) **`onTaskStart` 方法**(4) **`onTaskSucceeded` 方法**(5) **`onTaskFailed` 方法**(6) **`close` 方法** SparkConnector使用方式加载spark.sql.catalog.xxx 具体执行的配置 背景知识-Spark Plugin 介绍spark在[spark-29399]pr提交更新了SparkPlugin插件
SparkPlugin插件执行生命周期
SparkPlugin 的生命周期与 Spark 应用程序的生命周期一致,具体如下:
(1) 插件加载 当 Spark 应用程序启动时,Spark 会扫描类路径下的 SparkPlugin 实现类。如果插件被正确配置(例如通过 spark.plugins 配置项),Spark 会实例化该类。 (2) DriverPlugin 初始化 Spark 调用 driverPlugin() 方法,获取 DriverPlugin 实例。DriverPlugin 的生命周期开始,其方法(如 init、registerMetrics 等)会被调用。 (3) ExecutorPlugin 初始化 Spark 调用 executorPlugin() 方法,获取 ExecutorPlugin 实例。ExecutorPlugin 的生命周期开始,其方法(如 init、shutdown 等)会被调用。 (4) 插件执行 DriverPlugin 在 Driver 端执行自定义逻辑,例如注册指标、拦截 SQL 解析、修改 Catalog 等。ExecutorPlugin 在 Executor 端执行自定义逻辑,例如监控 Task 执行、收集指标等。 (5) 插件销毁 当 Spark 应用程序结束时,DriverPlugin 和 ExecutorPlugin 的生命周期结束,其 close() 方法会被调用以释放资源。 背景知识-Driver Plugin 介绍DriverPlugin 是用于在 Driver 端执行自定义逻辑的插件,其生命周期方法包括:
(1) init 方法 在 Driver 插件初始化时调用。可以在此方法中执行初始化逻辑,例如注册自定义 Catalog、拦截 SQL 解析器等。 (2) registerMetrics 方法 在 Driver 插件初始化时调用。可以在此方法中注册自定义指标(Metrics)。 (3) onTaskStart 方法 在 Task 启动时调用。可以在此方法中执行与 Task 相关的逻辑。 (4) onTaskSucceeded 方法 在 Task 成功完成时调用。可以在此方法中执行与 Task 成功相关的逻辑。 (5) onTaskFailed 方法 在 Task 失败时调用。可以在此方法中执行与 Task 失败相关的逻辑。 (6) close 方法 在 Driver 插件销毁时调用。可以在此方法中释放资源,例如关闭连接、清理缓存等。 SparkConnector使用方式 ./bin/spark-sql -v \ --conf spark.plugins="org.apache.gravitino.spark.connector.plugin.GravitinoSparkPlugin" \ --conf spark.sql.gravitino.uri=http://127.0.0.1:8090 \ --conf spark.sql.gravitino.metalake=test \ --conf spark.sql.gravitino.enableIcebergSupport=true \ --conf spark.sql.warehouse.dir=hdfs://127.0.0.1:9000/user/hive/warehouse-hive可以看出SparkConnector指定了加载的插件是GravitinoSparkPlugin
public class GravitinoSparkPlugin implements SparkPlugin { @Override public DriverPlugin driverPlugin() { return new GravitinoDriverPlugin(); } @Override public ExecutorPlugin executorPlugin() { return null; } }可以看出实现方式很简单,仅仅使用了一个GravitinoDriverPlugin,也就是在Spark应用程序启动的时候扫描SparkPlugin扫描到了这个GravitinoSparkPlugin然后立马就去执行GravitinoDriverPlugin初始化程序。在DriverPlugin初始化过程中 插件仅仅覆写了两个函数,init() 和shutdown()。 说明这个插件仅仅做了一些初始化和资源销毁操作。
在Driver端进行初始化
配置检查检查gravitino_uri和gravitino_metalake是否配置
如果开启了iceberg则将gravitinoDriverExtensions放入到数组中方便配置
初始化Gravtino客户端和GravitinoCatalogManager,并且将relational类型的表加载到缓存中
将缓存中的catalog进行如果是非iceberg类型(当前仅仅只有Hive)进行注册,这里定义的注册的实际操作配置Spark的配置项(spark.sql.catalog.catalogName)这里的catalogName对应的是缓存中的catalogName,配置的值为根据Gravitino自己的Catalog使用的Provider进行适配比如可以是(org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark33或者org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark33)具体情况由适配器进行处理。
然后注册SqlExtensions其实就是将第2步骤的数组配置到SPARK_SESSION_EXTENSIONS这个SparkConf配置里面
稍微贴一下注册Catalog代码,比较重要
//初始化的时候调用注册逻辑,将Gravitino中的Catalog加载到缓存 //然后将缓存中的数据作为第二个参数gravitinoCatalogs传递进来 private void registerGravitinoCatalogs( SparkConf sparkConf, Map<String, Catalog> gravitinoCatalogs) { gravitinoCatalogs .entrySet() .forEach( entry -> { String catalogName = entry.getKey(); Catalog gravitinoCatalog = entry.getValue(); String provider = gravitinoCatalog.provider(); if ("lakehouse-iceberg".equals(provider.toLowerCase(Locale.ROOT)) && enableIcebergSupport == false) { return; } try { registerCatalog(sparkConf, catalogName, provider); } catch (Exception e) { LOG.warn("Register catalog {} failed.", catalogName, e); } }); } //这里根据适配器去配置spark.sql.catalog.xxx 的具体执行CatalogClass private void registerCatalog(SparkConf sparkConf, String catalogName, String provider) { if (StringUtils.isBlank(provider)) { LOG.warn("Skip registering {} because catalog provider is empty.", catalogName); return; } String catalogClassName = CatalogNameAdaptor.getCatalogName(provider); if (StringUtils.isBlank(catalogClassName)) { LOG.warn("Skip registering {} because {} is not supported yet.", catalogName, provider); return; } String sparkCatalogConfigName = "spark.sql.catalog." + catalogName; Preconditions.checkArgument( !sparkConf.contains(sparkCatalogConfigName), catalogName + " is already registered to SparkCatalogManager"); sparkConf.set(sparkCatalogConfigName, catalogClassName); LOG.info("Register {} catalog to Spark catalog manager.", catalogName); }到这里GravitinoConnector的代码机制已经说完了,下面聊聊Spark机制
加载spark.sql.catalog.xxx 具体执行的配置经过上面GravitinoDriverPlugin的初始化之后,已经将具体的catalog名称和对应的处理类映射起来,这里以GravitinoHiveCatalogSpark33为例。
GravitinoHiveCatalogSpark33这个类继承关系是继承了BaseCatalog 而BaseCatalog是Spark中定义的CatalogPlugin的一个实现类。
Spark在解析SQL的时候会查找catalog对应的Catalog,可以看到调用了CatalogManager.catalog()方法
private object CatalogAndMultipartIdentifier { def unapply(parts: Seq[String]): Some[(Option[CatalogPlugin], Seq[String])] = parts match { case Seq(_) => Some((None, parts)) case Seq(catalogName, tail @ _*) => try { Some((Some(catalogManager.catalog(catalogName)), tail)) } catch { case _: CatalogNotFoundException => Some((None, parts)) } } }这个catalog方法调用了Catalogs.load()方法
def catalog(name: String): CatalogPlugin = synchronized { if (name.equalsIgnoreCase(SESSION_CATALOG_NAME)) { v2SessionCatalog } else { catalogs.getOrElseUpdate(name, Catalogs.load(name, conf)) } }这个方法才是真正的加载方法,他真正根据conf配置将GravitinoHiveCatalogSpark33名称根据定义的反射构造函数实例化到内存中
def load(name: String, conf: SQLConf): CatalogPlugin = { val pluginClassName = try { val _pluginClassName = conf.getConfString(s"spark.sql.catalog.$name") // SPARK-39079 do configuration check first, otherwise some path-based table like // `org.apache.spark.sql.json`.`/path/json_file` may fail on analyze phase if (name.contains(".")) { throw QueryExecutionErrors.invalidCatalogNameError(name) } _pluginClassName } catch { case _: NoSuchElementException => throw QueryExecutionErrors.catalogPluginClassNotFoundError(name) } val loader = Utils.getContextOrSparkClassLoader try { val pluginClass = loader.loadClass(pluginClassName) if (!classOf[CatalogPlugin].isAssignableFrom(pluginClass)) { throw QueryExecutionErrors.catalogPluginClassNotImplementedError(name, pluginClassName) } val plugin = pluginClass.getDeclaredConstructor().newInstance().asInstanceOf[CatalogPlugin] plugin.initialize(name, catalogOptions(name, conf)) plugin } catch { // 省略 } }到这里流程就分析结束了
GravitinoSparkConnector实现原理由讯客互联互联网栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“GravitinoSparkConnector实现原理”
 
               
               
               
               
               
               
               
               
   
   
   
   
   
   
   
   
   
   
   
  