Iceberg: The Execution Flow of MERGE INTO in COW Mode

The MERGE INTO command

MERGE INTO target_table t
USING source_table s
ON s.id = t.id                -- the JOIN condition between source and target
WHEN MATCHED AND s.opType = 'delete' THEN DELETE -- each WHEN clause is a matching condition that tags the current row with an action
WHEN MATCHED AND s.opType = 'update' THEN UPDATE SET id = s.id, name = s.name
WHEN NOT MATCHED AND s.opType = 'insert' THEN INSERT (key, value) VALUES (s.key, s.value)

The statement above is a MERGE INTO command. When the Spark Analyzer recognizes it as such, the SparkTable instance resolved for target_table is wrapped into a RowLevelOperationTable instance, which is bound to a SparkCopyOnWriteOperation instance; the latter implements the methods that create the ScanBuilder and the WriteBuilder.

ScanBuilder and WriteBuilder are interfaces defined by Spark, used to build the data reader (Scan) and the data writer (BatchWrite) respectively.
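To make the division of responsibilities concrete before diving into the Iceberg classes, here is a minimal, simplified sketch of these DataSource V2 contracts. It is not the literal Spark API (method sets are trimmed and Spark types are reduced to String/Object); it only illustrates how ScanBuilder, Scan, Batch and WriteBuilder connect.

// Simplified sketch of the DataSource V2 contracts discussed in this article.
// These are NOT the literal Spark interfaces; they only show how the pieces fit together.
interface DsV2Sketch {

  interface ScanBuilder {
    Scan build();                          // build a logical description of one read
  }

  interface Scan {
    String readSchema();                   // schema the scan produces (StructType in Spark)
    Batch toBatch();                       // turn the logical scan into an executable batch read
  }

  interface Batch {
    Object[] planInputPartitions();        // split the read into input partitions
    Object createReaderFactory();          // factory that creates one reader per partition
  }

  interface WriteBuilder {
    Object build();                        // build the write (BatchWrite / Write in Spark)
  }
}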

On top of the external catalog and the read/write interfaces provided by Spark 3.x, Iceberg implements reading and writing data in the Iceberg table format.

Below we follow SparkCopyOnWriteOperation to analyze how Spark is used to write data out in the Iceberg table format.

Iceberg's row-level update operations currently support three statements: UPDATE, DELETE and MERGE INTO.

Background

The definition of SparkTable

public class SparkTable
    implements org.apache.spark.sql.connector.catalog.Table, // Spark's Table interface
        SupportsRead,
        SupportsWrite,
        SupportsDelete,              // supports deletes
        SupportsRowLevelOperations,  // supports row-level updates
        SupportsMetadataColumns {

  private final Table icebergTable;
  private final Long snapshotId;
  private final boolean refreshEagerly;
  private final Set<TableCapability> capabilities;
  private String branch;
  private StructType lazyTableSchema = null;
  private SparkSession lazySpark = null;

  public SparkTable(Table icebergTable, Long snapshotId, boolean refreshEagerly) {
    this.icebergTable = icebergTable;
    this.snapshotId = snapshotId;
    this.refreshEagerly = refreshEagerly;

    boolean acceptAnySchema =
        PropertyUtil.propertyAsBoolean(
            icebergTable.properties(),
            TableProperties.SPARK_WRITE_ACCEPT_ANY_SCHEMA,
            TableProperties.SPARK_WRITE_ACCEPT_ANY_SCHEMA_DEFAULT);
    this.capabilities = acceptAnySchema ? CAPABILITIES_WITH_ACCEPT_ANY_SCHEMA : CAPABILITIES;
  }

  /**
   * The table is readable, so this method returns a ScanBuilder instance.
   */
  @Override
  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
    if (options.containsKey(SparkReadOptions.FILE_SCAN_TASK_SET_ID)) {
      // skip planning the job and fetch already staged file scan tasks
      // If this option is set, the Iceberg ScanTasks produced by a scan are kept in the
      // process-local ScanTaskSetManager, so that when the same file set (or set of file
      // scan tasks) is planned again, the staged tasks can be reused instead of being rebuilt —
      // effectively acting as a cache.
      return new SparkFilesScanBuilder(sparkSession(), icebergTable, options);
    }

    if (options.containsKey(SparkReadOptions.SCAN_TASK_SET_ID)) {
      // same purpose as above
      return new SparkStagedScanBuilder(sparkSession(), icebergTable, options);
    }

    if (refreshEagerly) {
      icebergTable.refresh();
    }

    // A SparkTable can be created for a branch or for a specific snapshot ID.
    // When a snapshot ID is used, the branch the snapshot belongs to must be resolved explicitly.
    CaseInsensitiveStringMap scanOptions =
        branch != null ? options : addSnapshotId(options, snapshotId);
    return new SparkScanBuilder(
        sparkSession(), icebergTable, branch, snapshotSchema(), scanOptions);
  }

  /**
   * The table is writable, so this method returns a WriteBuilder instance.
   */
  @Override
  public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
    Preconditions.checkArgument(
        snapshotId == null, "Cannot write to table at a specific snapshot: %s", snapshotId);
    return new SparkWriteBuilder(sparkSession(), icebergTable, branch, info);
  }
}

How the Spark Analyzer resolves the table

Suppose we have the configuration below, which defines a new user-defined catalog named iceberg. spark.sql.catalog.iceberg points to the catalog implementation class org.apache.iceberg.spark.SparkCatalog; its type is hive (meaning a HiveCatalog is used on the Iceberg side to resolve databases and tables), and the metastore address is thrift://metastore-host:port.

spark.sql.catalog.iceberg = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg.type = hive
spark.sql.catalog.iceberg.uri = thrift://metastore-host:port
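As an illustration, the hypothetical snippet below (CatalogOptionsSketch and its contents are made up for illustration, not Spark source) shows the options map that ends up being passed to the catalog plugin's initialize method for the catalog named iceberg: the spark.sql.catalog.iceberg. prefix is stripped and the remaining key/value pairs are handed over as options.

import java.util.HashMap;
import java.util.Map;

public class CatalogOptionsSketch {
  public static void main(String[] args) {
    Map<String, String> options = new HashMap<>();
    options.put("type", "hive");                        // from spark.sql.catalog.iceberg.type
    options.put("uri", "thrift://metastore-host:port"); // from spark.sql.catalog.iceberg.uri

    // Conceptually: new org.apache.iceberg.spark.SparkCatalog().initialize("iceberg", options);
    System.out.println("catalog name = iceberg, options = " + options);
  }
}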

When we execute SELECT * FROM iceberg.test_tbl, SQL analysis resolves the catalog name and the table name through the process below and creates the corresponding Catalog and Table instances, which in Iceberg are the implementation classes SparkCatalog and SparkTable.

  /**
   * If the current plan is an unresolved table/view or an INSERT INTO statement, apply this rule:
   * check whether the referenced name is a SQL-level temporary view or a session-level global
   * temp view; if so, return a new SubqueryAlias instance.
   */
  object ResolveTempViews extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
      case u @ UnresolvedRelation(ident) =>
        lookupTempView(ident).getOrElse(u)
      case i @ InsertIntoStatement(UnresolvedRelation(ident), _, _, _, _) =>
        lookupTempView(ident).map(view => i.copy(table = view)).getOrElse(i)
      case u @ UnresolvedTable(ident) =>
        lookupTempView(ident).foreach { _ =>
          u.failAnalysis(s"${ident.quoted} is a temp view not table.")
        }
        u
      case u @ UnresolvedTableOrView(ident) =>
        lookupTempView(ident)
          .map(_ => ResolvedView(ident.asIdentifier))
          .getOrElse(u)
    }

    def lookupTempView(identifier: Seq[String]): Option[LogicalPlan] = {
      // Permanent View can't refer to temp views, no need to lookup at all.
      if (isResolvingView) return None

      identifier match {
        case Seq(part1) => v1SessionCatalog.lookupTempView(part1)
        case Seq(part1, part2) => v1SessionCatalog.lookupGlobalTempView(part1, part2)
        case _ => None
      }
    }
  }

  /**
   * Resolve table relations with concrete relations from v2 catalog.
   *
   * [[ResolveRelations]] still resolves v1 tables.
   */
  object ResolveTables extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp {
      case u: UnresolvedRelation =>
        lookupV2Relation(u.multipartIdentifier)
          .map { rel =>
            val ident = rel.identifier.get
            SubqueryAlias(rel.catalog.get.name +: ident.namespace :+ ident.name, rel)
          }.getOrElse(u)

      case u @ UnresolvedTable(NonSessionCatalogAndIdentifier(catalog, ident)) =>
        // The unapply method of NonSessionCatalogAndIdentifier tries to resolve the catalog name
        // and builds the Catalog instance via Spark's CatalogManager::catalog(name).
        // Here that instance is Iceberg's SparkCatalog implementation; the utility method then
        // loads the table and creates a SparkTable instance.
        CatalogV2Util.loadTable(catalog, ident)
          .map(ResolvedTable(catalog.asTableCatalog, ident, _))
          .getOrElse(u)

      case u @ UnresolvedTableOrView(NonSessionCatalogAndIdentifier(catalog, ident)) =>
        CatalogV2Util.loadTable(catalog, ident)
          .map(ResolvedTable(catalog.asTableCatalog, ident, _))
          .getOrElse(u)

      case i @ InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) if i.query.resolved =>
        lookupV2Relation(u.multipartIdentifier)
          .map(v2Relation => i.copy(table = v2Relation))
          .getOrElse(i)

      case alter @ AlterTable(_, _, u: UnresolvedV2Relation, _) =>
        CatalogV2Util.loadRelation(u.catalog, u.tableName)
          .map(rel => alter.copy(table = rel))
          .getOrElse(alter)

      case u: UnresolvedV2Relation =>
        CatalogV2Util.loadRelation(u.catalog, u.tableName).getOrElse(u)
    }

    /**
     * Performs the lookup of DataSourceV2 Tables from v2 catalog.
     */
    private def lookupV2Relation(identifier: Seq[String]): Option[DataSourceV2Relation] =
      expandRelationName(identifier) match {
        case NonSessionCatalogAndIdentifier(catalog, ident) =>
          CatalogV2Util.loadTable(catalog, ident) match {
            case Some(table) =>
              Some(DataSourceV2Relation.create(table, Some(catalog), Some(ident)))
            case None => None
          }
        case _ => None
      }
  }

SparkCatalog routes table loading to HiveCatalog

Since SparkCatalog implements TableCatalog, it has the method Table loadTable(Identifier ident) throws NoSuchTableException. During Spark's SQL analysis this method can be invoked to produce the user-defined Table instance.

SparkCatalog is positioned as the bridge between Spark and Iceberg: the work of resolving a catalog and creating tables is ultimately routed to an Iceberg catalog implementation such as HiveCatalog.

/**
 * Implements the following Spark interface:
 *   public interface TableCatalog extends CatalogPlugin
 * During SQL analysis, when the ResolveTables rule is applied, a table can be created
 * from a Catalog plus an Identifier.
 */
public class SparkCatalog extends BaseCatalog {

  /**
   * Inherited from the CatalogPlugin interface, so initialize(...) must be overridden
   * to initialize the SparkCatalog instance.
   */
  @Override
  public final void initialize(String name, CaseInsensitiveStringMap options) {
    this.cacheEnabled =
        PropertyUtil.propertyAsBoolean(
            options, CatalogProperties.CACHE_ENABLED, CatalogProperties.CACHE_ENABLED_DEFAULT);

    long cacheExpirationIntervalMs =
        PropertyUtil.propertyAsLong(
            options,
            CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS,
            CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT);

    // An expiration interval of 0ms effectively disables caching.
    // Do not wrap with CachingCatalog.
    if (cacheExpirationIntervalMs == 0) {
      this.cacheEnabled = false;
    }

    // Build the Iceberg catalog instance. The supported types are:
    //   public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
    //   public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
    //   public static final String ICEBERG_CATALOG_TYPE_REST = "rest";
    //   public static final String ICEBERG_CATALOG_HADOOP = "org.apache.iceberg.hadoop.HadoopCatalog";
    //   public static final String ICEBERG_CATALOG_HIVE = "org.apache.iceberg.hive.HiveCatalog";
    //   public static final String ICEBERG_CATALOG_REST = "org.apache.iceberg.rest.RESTCatalog";
    // By default a HiveCatalog is created; later calls to loadTable(...) create SparkTable
    // instances via HiveCatalog::loadTable(name).
    Catalog catalog = buildIcebergCatalog(name, options);

    this.catalogName = name;
    SparkSession sparkSession = SparkSession.active();
    this.useTimestampsWithoutZone =
        SparkUtil.useTimestampWithoutZoneInNewTables(sparkSession.conf());
    this.tables =
        new HadoopTables(SparkUtil.hadoopConfCatalogOverrides(SparkSession.active(), name));
    this.icebergCatalog =
        cacheEnabled ? CachingCatalog.wrap(catalog, cacheExpirationIntervalMs) : catalog;

    // A default namespace can be specified through the options; the default value is "default".
    if (catalog instanceof SupportsNamespaces) {
      this.asNamespaceCatalog = (SupportsNamespaces) catalog;
      if (options.containsKey("default-namespace")) {
        this.defaultNamespace =
            Splitter.on('.').splitToList(options.get("default-namespace")).toArray(new String[0]);
      }
    }

    EnvironmentContext.put(EnvironmentContext.ENGINE_NAME, "spark");
    EnvironmentContext.put(EnvironmentContext.ENGINE_VERSION, sparkSession.sparkContext().version());
    EnvironmentContext.put(CatalogProperties.APP_ID, sparkSession.sparkContext().applicationId());
  }

  @Override
  public Table loadTable(Identifier ident) throws NoSuchTableException {
    // Create a SparkTable instance from the identifier.
    try {
      return load(ident);
    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
      throw new NoSuchTableException(ident);
    }
  }

  @Override
  public Table loadTable(Identifier ident, String version) throws NoSuchTableException {
    Table table = loadTable(ident);
    // ...
  }

  @Override
  public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException {
    Table table = loadTable(ident);
    // ...
  }
}
SparkCatalog::buildIcebergCatalog
  public static Catalog buildIcebergCatalog(String name, Map<String, String> options, Object conf) {
    String catalogImpl = options.get(CatalogProperties.CATALOG_IMPL);
    if (catalogImpl == null) {
      String catalogType =
          PropertyUtil.propertyAsString(options, ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HIVE);
      switch (catalogType.toLowerCase(Locale.ENGLISH)) {
        case ICEBERG_CATALOG_TYPE_HIVE:
          catalogImpl = ICEBERG_CATALOG_HIVE;
          break;
        case ICEBERG_CATALOG_TYPE_HADOOP:
          catalogImpl = ICEBERG_CATALOG_HADOOP;
          break;
        case ICEBERG_CATALOG_TYPE_REST:
          catalogImpl = ICEBERG_CATALOG_REST;
          break;
        default:
          throw new UnsupportedOperationException("Unknown catalog type: " + catalogType);
      }
    } else {
      String catalogType = options.get(ICEBERG_CATALOG_TYPE);
      Preconditions.checkArgument(
          catalogType == null,
          "Cannot create catalog %s, both type and catalog-impl are set: type=%s, catalog-impl=%s",
          name,
          catalogType,
          catalogImpl);
    }
    return CatalogUtil.loadCatalog(catalogImpl, name, options, conf);
  }
CatalogUtil::loadCatalog

Taking Hive as an example, here is how a custom catalog is loaded.

  public static Catalog loadCatalog(
      String impl, String catalogName, Map<String, String> properties, Object hadoopConf) {
    // impl = ICEBERG_CATALOG_HIVE
    // catalogName = hive
    // properties = spark.sql.catalog.[catalogName].x
    // "properties" holds the options configured for this catalog name, parsed from Spark's SQLConf.
    Preconditions.checkNotNull(impl, "Cannot initialize custom Catalog, impl class name is null");
    DynConstructors.Ctor<Catalog> ctor;
    try {
      // Use reflection to look up the no-arg constructor of the impl class and build an instance of it.
      ctor = DynConstructors.builder(Catalog.class).impl(impl).buildChecked();
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException(
          String.format("Cannot initialize Catalog implementation %s: %s", impl, e.getMessage()),
          e);
    }
    Catalog catalog;
    try {
      catalog = ctor.newInstance();
    } catch (ClassCastException e) {
      throw new IllegalArgumentException(
          String.format("Cannot initialize Catalog, %s does not implement Catalog.", impl), e);
    }
    configureHadoopConf(catalog, hadoopConf);
    // Initialize the catalog instance with the resolved properties.
    catalog.initialize(catalogName, properties);
    return catalog;
  }
HiveCatalog::initialize
  @Override
  public void initialize(String inputName, Map<String, String> properties) {
    this.catalogProperties = ImmutableMap.copyOf(properties);
    this.name = inputName;
    if (conf == null) {
      LOG.warn("No Hadoop Configuration was set, using the default environment Configuration");
      this.conf = new Configuration();
    }

    // Resolve the configured Hive metastore URI.
    if (properties.containsKey(CatalogProperties.URI)) {
      this.conf.set(HiveConf.ConfVars.METASTOREURIS.varname, properties.get(CatalogProperties.URI));
    }

    // Resolve the configured warehouse location of the metastore.
    if (properties.containsKey(CatalogProperties.WAREHOUSE_LOCATION)) {
      this.conf.set(
          HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
          LocationUtil.stripTrailingSlash(properties.get(CatalogProperties.WAREHOUSE_LOCATION)));
    }

    this.listAllTables =
        Boolean.parseBoolean(properties.getOrDefault(LIST_ALL_TABLES, LIST_ALL_TABLES_DEFAULT));

    // Resolve the FileIO implementation used to read and write files.
    // If none is specified, HadoopFileIO is used by default; otherwise the user-defined
    // implementation class is loaded.
    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
    this.fileIO =
        fileIOImpl == null
            ? new HadoopFileIO(conf)
            : CatalogUtil.loadFileIO(fileIOImpl, properties, conf);

    // Initialize the client used to talk to the metastore; a pooled Hive client is used by default.
    this.clients = new CachedClientPool(conf, properties);
  }

HiveCatalog loads the table behind SparkTable

When the SparkCatalog is created, the configuration spark.sql.catalog.iceberg.type tells it that the table to create is of type hive on the Iceberg side, so the table must be loaded through HiveCatalog.

HiveCatalog::loadTable

  @Override
  public Table loadTable(TableIdentifier identifier) {
    Table result;
    if (isValidIdentifier(identifier)) {
      // For HiveCatalog every identifier is valid, so the call below produces the matching
      // TableOperations instance.
      // In the Iceberg world there is no physical "table"; the table concept is an abstraction
      // over TableMetadata. Every Iceberg Table is therefore bound to a TableOperations instance
      // that reads the TableMetadata — here a HiveTableOperations is created.
      TableOperations ops = newTableOps(identifier);
      if (ops.current() == null) {
        // the identifier may be valid for both tables and metadata tables
        if (isValidMetadataIdentifier(identifier)) {
          result = loadMetadataTable(identifier);
        } else {
          throw new NoSuchTableException("Table does not exist: %s", identifier);
        }
      } else {
        result = new BaseTable(ops, fullTableName(name(), identifier), metricsReporter());
      }
    } else if (isValidMetadataIdentifier(identifier)) {
      result = loadMetadataTable(identifier);
    } else {
      throw new NoSuchTableException("Invalid table identifier: %s", identifier);
    }

    LOG.info("Table loaded by catalog: {}", result);
    return result;
  }

Scan: definition, construction and execution flow

When the MERGE INTO plan is rewritten, a DataSourceV2Relation logical plan node is generated for the target table to read its relevant data, so a COW-mode Scan instance is built during filter-pushdown optimization.
Scan is the data-reading interface defined in Spark.

object V2ScanRelationPushDown extends Rule[LogicalPlan] {
  import DataSourceV2Implicits._

  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
    case ScanOperation(project, filters, relation: DataSourceV2Relation) =>
      // Here relation.table is a RowLevelOperationTable instance bound to a
      // SparkCopyOnWriteOperation, so the call below ends up in
      // SparkCopyOnWriteOperation::newScanBuilder.
      val scanBuilder = relation.table.asReadable.newScanBuilder(relation.options)

      val normalizedFilters = DataSourceStrategy.normalizeExprs(filters, relation.output)
      val (normalizedFiltersWithSubquery, normalizedFiltersWithoutSubquery) =
        normalizedFilters.partition(SubqueryExpression.hasSubquery)

      // `pushedFilters` will be pushed down and evaluated in the underlying data sources.
      // `postScanFilters` need to be evaluated after the scan.
      // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter.
      val (pushedFilters, postScanFiltersWithoutSubquery) = PushDownUtils.pushFilters(
        scanBuilder, normalizedFiltersWithoutSubquery)
      val postScanFilters = postScanFiltersWithoutSubquery ++ normalizedFiltersWithSubquery

      val normalizedProjects = DataSourceStrategy
        .normalizeExprs(project, relation.output)
        .asInstanceOf[Seq[NamedExpression]]
      // Column pruning; scanBuilder.build() is called here to create the Scan instance
      // that reads the concrete data source.
      val (scan, output) = PushDownUtils.pruneColumns(
        scanBuilder, relation, normalizedProjects, postScanFilters)

      logInfo(
        s"""
           |Pushing operators to ${relation.name}
           |Pushed Filters: ${pushedFilters.mkString(", ")}
           |Post-Scan Filters: ${postScanFilters.mkString(",")}
           |Output: ${output.mkString(", ")}
         """.stripMargin)

      val wrappedScan = scan match {
        case v1: V1Scan =>
          val translated = filters.flatMap(DataSourceStrategy.translateFilter(_, true))
          V1ScanWrapper(v1, translated, pushedFilters)
        case _ => scan
      }

      // The logical plan node that reads the data.
      val scanRelation = DataSourceV2ScanRelation(relation.table, wrappedScan, output)

      val projectionOverSchema = ProjectionOverSchema(output.toStructType)
      val projectionFunc = (expr: Expression) => expr transformDown {
        case projectionOverSchema(newExpr) => newExpr
      }

      val filterCondition = postScanFilters.reduceLeftOption(And)
      val newFilterCondition = filterCondition.map(projectionFunc)
      val withFilter = newFilterCondition.map(Filter(_, scanRelation)).getOrElse(scanRelation)

      val withProjection = if (withFilter.output != project) {
        val newProjects = normalizedProjects
          .map(projectionFunc)
          .asInstanceOf[Seq[NamedExpression]]
        Project(newProjects, withFilter)
      } else {
        withFilter
      }

      // Return the final logical plan.
      withProjection
  }
}

The Scan implementation in Iceberg

As shown above, during Spark's filter-pushdown optimization Table::newScanBuilder is called to build a concrete data reader (Scan), which ultimately ends up in the following Iceberg method:

class SparkCopyOnWriteOperation implements RowLevelOperation {

  @Override
  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
    if (lazyScanBuilder == null) {
      lazyScanBuilder =
          new SparkScanBuilder(spark, table, branch, options) {
            @Override
            public Scan build() {
              // Build the COW-mode Scan instance.
              Scan scan = super.buildCopyOnWriteScan();
              SparkCopyOnWriteOperation.this.configuredScan = scan;
              return scan;
            }
          };
    }

    return lazyScanBuilder;
  }
}

The full definition of SparkScanBuilder::buildCopyOnWriteScan is as follows:

  public Scan buildCopyOnWriteScan() {
    // "table" is a BaseTable instance, because once the flow moves from Spark into Iceberg
    // only Iceberg's own classes are used.
    // Find the latest snapshot of the table (for the configured branch).
    Snapshot snapshot = SnapshotUtil.latestSnapshot(table, readConf.branch());

    if (snapshot == null) {
      return new SparkCopyOnWriteScan(
          spark, table, readConf, schemaWithMetadataColumns(), filterExpressions);
    }

    Schema expectedSchema = schemaWithMetadataColumns();

    // A snapshot exists, so there is data and a scan instance must be built.
    //   default BatchScan newBatchScan() {
    //     return new BatchScanAdapter(newScan());
    //   }
    // Since the table is a BaseTable, newScan() creates a DataTableScan instance,
    // and BatchScan acts as its adapter/proxy.
    BatchScan scan =
        table
            .newBatchScan()
            .useSnapshot(snapshot.snapshotId())
            .ignoreResiduals()
            .caseSensitive(caseSensitive)
            .filter(filterExpression())
            .project(expectedSchema);

    scan = configureSplitPlanning(scan);

    // Return an instance that implements Spark's Scan interface.
    return new SparkCopyOnWriteScan(
        spark, table, scan, snapshot, readConf, expectedSchema, filterExpressions);
  }

SparkCopyOnWriteScan produces the Spark Batch

SparkCopyOnWriteScan implements Spark's Scan interface. A Scan is a logical data reader, much like a logical plan, so its Scan::toBatch method must still be called to create a directly executable object.

class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask> {

  @Override
  public Batch toBatch() {
    // Return a Batch instance that Spark can drive; it is responsible for splitting the data
    // to read into batches/partitions.
    // Note the call to taskGroups() when the SparkBatch is created: it goes through Iceberg's
    // planning APIs to find all the data this scan needs to read.
    return new SparkBatch(
        sparkContext, table, readConf, groupingKeyType(), taskGroups(), expectedSchema, hashCode());
  }
}
SparkCopyOnWriteScan::taskGroups is implemented on top of SnapshotScan::planFiles:
public abstract class SnapshotScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>> {

  @Override
  public CloseableIterable<T> planFiles() {
    // Resolve the snapshot to read.
    Snapshot snapshot = snapshot();
    if (snapshot == null) {
      LOG.info("Scanning empty table {}", table());
      return CloseableIterable.empty();
    }

    LOG.info(
        "Scanning table {} snapshot {} created at {} with filter {}",
        table(),
        snapshot.snapshotId(),
        DateTimeUtil.formatTimestampMillis(snapshot.timestampMillis()),
        ExpressionUtil.toSanitizedString(filter()));

    Listeners.notifyAll(new ScanEvent(table().name(), snapshot.snapshotId(), filter(), schema()));
    List<Integer> projectedFieldIds = Lists.newArrayList(TypeUtil.getProjectedIds(schema()));
    List<String> projectedFieldNames =
        projectedFieldIds.stream().map(schema()::findColumnName).collect(Collectors.toList());

    Timer.Timed planningDuration = scanMetrics().totalPlanningDuration().start();

    return CloseableIterable.whenComplete(
        doPlanFiles(), // doPlanFiles() walks Iceberg's metadata to find all data files and delete files to read
        () -> {
          planningDuration.stop();
          Map<String, String> metadata = Maps.newHashMap(context().options());
          metadata.putAll(EnvironmentContext.get());
          ScanReport scanReport =
              ImmutableScanReport.builder()
                  .schemaId(schema().schemaId())
                  .projectedFieldIds(projectedFieldIds)
                  .projectedFieldNames(projectedFieldNames)
                  .tableName(table().name())
                  .snapshotId(snapshot.snapshotId())
                  .filter(ExpressionUtil.sanitize(filter()))
                  .scanMetrics(ScanMetricsResult.fromScanMetrics(scanMetrics()))
                  .metadata(metadata)
                  .build();
          context().metricsReporter().report(scanReport);
        });
  }
}

SparkBatch produces the partitions and the partition reader factory

SparkBatch implements Spark's Batch interface.

class SparkBatch implements Batch {
  private final JavaSparkContext sparkContext;
  private final Table table;
  private final String branch;
  private final SparkReadConf readConf;
  private final Types.StructType groupingKeyType;
  // Holds all of the Iceberg-managed data files and delete files collected by
  // SparkCopyOnWriteScan::taskGroups(). The files are grouped by their partition data,
  // and the files of one partition may be split across multiple groups.
  private final List<? extends ScanTaskGroup<?>> taskGroups;
  private final Schema expectedSchema;
  private final boolean caseSensitive;
  private final boolean localityEnabled;
  private final int scanHashCode;

  @Override
  public InputPartition[] planInputPartitions() {
    // Split the data to read into input partitions.
    // broadcast the table metadata as input partitions will be sent to executors
    Broadcast<Table> tableBroadcast =
        sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
    String expectedSchemaString = SchemaParser.toJson(expectedSchema);

    // One task group corresponds to one Spark partition.
    InputPartition[] partitions = new InputPartition[taskGroups.size()];

    Tasks.range(partitions.length)
        .stopOnFailure()
        .executeWith(localityEnabled ? ThreadPools.getWorkerPool() : null)
        .run(
            index ->
                partitions[index] =
                    new SparkInputPartition(
                        groupingKeyType, // all files in one task group share the same grouping key
                        taskGroups.get(index),
                        tableBroadcast,
                        branch,
                        expectedSchemaString,
                        caseSensitive,
                        localityEnabled));

    return partitions;
  }

  @Override
  public PartitionReaderFactory createReaderFactory() {
    // Create the factory for data readers; both columnar (vectorized) and row-based reads are supported.
    if (useParquetBatchReads()) {
      int batchSize = readConf.parquetBatchSize();
      return new SparkColumnarReaderFactory(batchSize);

    } else if (useOrcBatchReads()) {
      int batchSize = readConf.orcBatchSize();
      return new SparkColumnarReaderFactory(batchSize);

    } else {
      return new SparkRowReaderFactory();
    }
  }
}

Building the physical plan for reading data from SparkBatch

The Spark read/write interfaces mentioned above are all defined by DataSource V2, so the logical plan for reading data (DataSourceV2ScanRelation) is first converted into the physical plan BatchScanExec.

case class BatchScanExec(
    output: Seq[AttributeReference],
    @transient scan: Scan) extends DataSourceV2ScanExecBase {

  // scan corresponds to Iceberg's SparkCopyOnWriteScan,
  // so the batch variable is a SparkBatch instance.
  @transient lazy val batch = scan.toBatch

  // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
  override def equals(other: Any): Boolean = other match {
    case other: BatchScanExec => this.batch == other.batch
    case _ => false
  }

  override def hashCode(): Int = batch.hashCode()

  // Calls SparkBatch::planInputPartitions to produce the partition information.
  @transient override lazy val partitions: Seq[InputPartition] = batch.planInputPartitions()

  // Calls SparkBatch::createReaderFactory to produce the reader factory.
  override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory()

  override lazy val inputRDD: RDD[InternalRow] = {
    // At execution time the current physical plan is turned into an RDD, which is handed
    // the partitions and the reader factory.
    new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar)
  }

  override def doCanonicalize(): BatchScanExec = {
    this.copy(output = output.map(QueryPlan.normalizeExpressions(_, output)))
  }
}

DataSourceRDD instantiates the reader and performs the read at compute time

The method to focus on here is compute. In Spark, each partition corresponds to one task, and that task is what ultimately calls compute to trigger the computation on its partition.

// columnar scan.
class DataSourceRDD(
    sc: SparkContext,
    @transient private val inputPartitions: Seq[InputPartition],
    partitionReaderFactory: PartitionReaderFactory,
    columnarReads: Boolean)
  extends RDD[InternalRow](sc, Nil) {

  override protected def getPartitions: Array[Partition] = {
    inputPartitions.zipWithIndex.map {
      case (inputPartition, index) => new DataSourceRDDPartition(index, inputPartition)
    }.toArray
  }

  private def castPartition(split: Partition): DataSourceRDDPartition = split match {
    case p: DataSourceRDDPartition => p
    case _ => throw new SparkException(s"[BUG] Not a DataSourceRDDPartition: $split")
  }

  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    // The partition corresponds to one Iceberg task group, and the data files of a task group
    // share the same partition data.
    val inputPartition = castPartition(split).inputPartition
    val (iter, reader) = if (columnarReads) {
      // Columnar read: batchReader is effectively a BatchDataReader instance.
      val batchReader = partitionReaderFactory.createColumnarReader(inputPartition)
      val iter = new MetricsBatchIterator(
        new PartitionIterator[ColumnarBatch](batchReader))
      (iter, batchReader)
    } else {
      // Row-based read.
      val rowReader = partitionReaderFactory.createReader(inputPartition)
      val iter = new MetricsRowIterator(
        new PartitionIterator[InternalRow](rowReader))
      (iter, rowReader)
    }
    context.addTaskCompletionListener[Unit](_ => reader.close())
    // TODO: SPARK-25083 remove the type erasure hack in data source scan
    new InterruptibleIterator(context, iter.asInstanceOf[Iterator[InternalRow]])
  }

  override def getPreferredLocations(split: Partition): Seq[String] = {
    castPartition(split).inputPartition.preferredLocations()
  }
}

BatchDataReader reads the data

BatchDataReader implements Spark's PartitionReader<ColumnarBatch> interface.

class BatchDataReader extends BaseBatchReader<FileScanTask>
    implements PartitionReader<ColumnarBatch> {

  // Returns an iterator over the FileScanTask's data file, with its delete files taken into account.
  @Override
  protected CloseableIterator<ColumnarBatch> open(FileScanTask task) {
    String filePath = task.file().path().toString();
    LOG.debug("Opening data file {}", filePath);

    // update the current file for Spark's filename() function
    InputFileBlockHolder.set(filePath, task.start(), task.length());

    Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());

    InputFile inputFile = getInputFile(filePath);
    Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with FileScanTask");

    // Create a SparkDeleteFilter, which collects the equality delete files and position delete
    // files and builds an index of deleted records. While iterating over a data file, this index
    // is used to decide whether the current record is still live.
    SparkDeleteFilter deleteFilter =
        task.deletes().isEmpty()
            ? null
            : new SparkDeleteFilter(filePath, task.deletes(), counter());

    // newBatchIterable() creates the file reader matching the input file's format,
    // e.g. a VectorizedParquetReader for Parquet.
    return newBatchIterable(
            inputFile,
            task.file().format(),
            task.start(),
            task.length(),
            task.residual(),
            idToConstant,
            deleteFilter)
        .iterator();
  }
}

Converting the data read into Spark's ColumnarBatch

Whether the underlying files are Parquet or ORC, the bottom of the vectorized path relies on ColumnarBatchReader to perform the actual data reading and filtering.

public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
  private final boolean hasIsDeletedColumn;
  private DeleteFilter<InternalRow> deletes = null;
  private long rowStartPosInBatch = 0;

  public ColumnarBatchReader(List<VectorizedReader<?>> readers) {
    super(readers);
    this.hasIsDeletedColumn =
        readers.stream().anyMatch(reader -> reader instanceof DeletedVectorReader);
  }

  @Override
  public void setRowGroupInfo(
      PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData, long rowPosition) {
    super.setRowGroupInfo(pageStore, metaData, rowPosition);
    this.rowStartPosInBatch = rowPosition;
  }

  public void setDeleteFilter(DeleteFilter<InternalRow> deleteFilter) {
    this.deletes = deleteFilter;
  }

  @Override
  public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {
    if (reuse == null) {
      closeVectors();
    }

    // Reading and result conversion are delegated to the inner class ColumnBatchLoader.
    ColumnarBatch columnarBatch = new ColumnBatchLoader(numRowsToRead).loadDataToColumnBatch();
    rowStartPosInBatch += numRowsToRead;
    return columnarBatch;
  }

  private class ColumnBatchLoader {
    private final int numRowsToRead;
    // the rowId mapping to skip deleted rows for all column vectors inside a batch,
    // it is null when there are no deletes
    private int[] rowIdMapping;
    // the array to indicate if a row is deleted or not, it is null when there is
    // no "_deleted" metadata column
    private boolean[] isDeleted;

    /**
     * Build a row id mapping inside a batch, which skips deleted rows. Here is an example of how
     * we delete 2 rows in a batch with 8 rows in total:
     *   [0,1,2,3,4,5,6,7] -- Original status of the row id mapping array
     *   [F,F,F,F,F,F,F,F] -- Original status of the isDeleted array
     * Position delete 2, 6:
     *   [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6]
     *   [F,F,T,F,F,F,T,F] -- After applying position deletes
     *
     * @param deletedRowPositions a set of deleted row positions
     * @return the mapping array and the new num of rows in a batch, null if no row is deleted
     */
    Pair<int[], Integer> buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPositions) {
      if (deletedRowPositions == null) {
        return null;
      }

      int[] posDelRowIdMapping = new int[numRowsToRead];
      int originalRowId = 0;
      int currentRowId = 0;
      while (originalRowId < numRowsToRead) {
        if (!deletedRowPositions.isDeleted(originalRowId + rowStartPosInBatch)) {
          posDelRowIdMapping[currentRowId] = originalRowId;
          currentRowId++;
        } else {
          if (hasIsDeletedColumn) {
            isDeleted[originalRowId] = true;
          }
          deletes.incrementDeleteCount();
        }
        originalRowId++;
      }

      if (currentRowId == numRowsToRead) {
        // there is no delete in this batch
        return null;
      } else {
        return Pair.of(posDelRowIdMapping, currentRowId);
      }
    }

    int[] initEqDeleteRowIdMapping() {
      int[] eqDeleteRowIdMapping = null;
      if (hasEqDeletes()) {
        eqDeleteRowIdMapping = new int[numRowsToRead];
        for (int i = 0; i < numRowsToRead; i++) {
          eqDeleteRowIdMapping[i] = i;
        }
      }
      return eqDeleteRowIdMapping;
    }

    /**
     * Filter out the equality deleted rows. Here is an example:
     *   [0,1,2,3,4,5,6,7] -- Original status of the row id mapping array
     *   [F,F,F,F,F,F,F,F] -- Original status of the isDeleted array
     * Position delete 2, 6:
     *   [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6]
     *   [F,F,T,F,F,F,T,F] -- After applying position deletes
     * Equality delete 1 <= x <= 3:
     *   [0,4,5,7,-,-,-,-] -- After applying equality deletes [Set Num records to 4]
     *   [F,T,T,T,F,F,T,F] -- After applying equality deletes
     *
     * @param columnarBatch the {@link ColumnarBatch} to apply the equality delete
     */
    void applyEqDelete(ColumnarBatch columnarBatch) {
      Iterator<InternalRow> it = columnarBatch.rowIterator();
      int rowId = 0;
      int currentRowId = 0;
      while (it.hasNext()) {
        InternalRow row = it.next();
        if (deletes.eqDeletedRowFilter().test(row)) {
          // the row is NOT deleted
          // skip deleted rows by pointing to the next undeleted row Id
          rowIdMapping[currentRowId] = rowIdMapping[rowId];
          currentRowId++;
        } else {
          if (hasIsDeletedColumn) {
            isDeleted[rowIdMapping[rowId]] = true;
          }
          deletes.incrementDeleteCount();
        }
        rowId++;
      }

      columnarBatch.setNumRows(currentRowId);
    }
  }
}

The Write execution flow

As the previous sections show, data files and delete files are collected together while the Scan is being built, so when the reader reads the data files of each task group it also applies the DeleteFilter to drop the records that have already been deleted.

This process is effectively a merge-on-read.
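To summarize that idea in isolation, here is a hypothetical, simplified sketch (not Iceberg source; liveRows, deletedPositions and notEqDeleted are invented names) of what the delete filter conceptually does while scanning one data file: a row survives only if its file position is not in the position-delete index and it does not match any equality-delete predicate.

import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class DeleteFilterSketch {
  // rows: records of one data file in file order; deletedPositions: position-delete index;
  // notEqDeleted: predicate that is true for rows NOT matched by any equality delete.
  static <R> List<R> liveRows(List<R> rows, Set<Long> deletedPositions, Predicate<R> notEqDeleted) {
    return IntStream.range(0, rows.size())
        .filter(pos -> !deletedPositions.contains((long) pos)) // apply position deletes
        .mapToObj(rows::get)
        .filter(notEqDeleted)                                  // apply equality deletes
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> rows = List.of("r0", "r1", "r2", "r3");
    // Position 1 is position-deleted; rows equal to "r3" are equality-deleted.
    System.out.println(liveRows(rows, Set.of(1L), r -> !r.equals("r3"))); // prints [r0, r2]
  }
}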

The write side of MERGE INTO was analyzed in an earlier article. The general idea: the dataset scanned from target_table, with deleted records already filtered out, is joined with source_table, producing a result set in which each row carries a change marker (INSERT/UPDATE/DELETE). When the data is written out to files, the action for each row is decided by its marker; in the end only data files are produced, which keeps the table's files cleaner.
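As a rough illustration of that write-side idea (a hypothetical sketch, not the actual Iceberg/Spark writer; Op, MarkedRow and rowsToWrite are invented names), the snippet below shows how rows carrying change markers are reduced to the rows that end up in the rewritten data files under COW: deleted rows are dropped, while updated, carried-over and inserted rows are written out.

import java.util.List;
import java.util.stream.Collectors;

public class CowMergeWriteSketch {
  enum Op { INSERT, UPDATE, DELETE, CARRY_OVER } // CARRY_OVER = untouched rows of rewritten files

  record MarkedRow(Op op, String payload) {}

  // COW MERGE rewrites whole data files: deleted rows are simply omitted,
  // everything else (including unchanged rows of the affected files) is written again.
  static List<String> rowsToWrite(List<MarkedRow> joined) {
    return joined.stream()
        .filter(r -> r.op() != Op.DELETE)
        .map(MarkedRow::payload)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<MarkedRow> joined = List.of(
        new MarkedRow(Op.CARRY_OVER, "id=1,name=a"),
        new MarkedRow(Op.UPDATE,     "id=2,name=b2"),
        new MarkedRow(Op.DELETE,     "id=3,name=c"),
        new MarkedRow(Op.INSERT,     "id=4,name=d"));
    System.out.println(rowsToWrite(joined)); // prints [id=1,name=a, id=2,name=b2, id=4,name=d]
  }
}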
