Preface

A quick walkthrough of the Apache Hudi bootstrap source code. If you are not familiar with Hudi bootstrap, see: Using Hudi Bootstrap to Convert an Existing Hive Table's parquet/orc Files into a Hudi Table.

Versions

Hudi 0.12.0
Spark 2.4.4

Entry Point

val bootstrapDF = spark.emptyDataFrame
    bootstrapDF.write.
      format("hudi").
      options(extraOpts).
      option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL).
      ......
      save(basePath)
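
For reference, here is a minimal sketch of what extraOpts might contain for a bootstrap run; the option keys are the configs discussed throughout this post, while the table name, paths and field names are purely hypothetical.

  import org.apache.hudi.DataSourceWriteOptions
  import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}

  // Hypothetical values; adjust to your own table
  val extraOpts = Map(
    HoodieWriteConfig.TBL_NAME.key -> "bootstrap_target",
    DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
    DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt",
    DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
    // Existing parquet/orc table to bootstrap from (hoodie.bootstrap.base.path)
    HoodieBootstrapConfig.BASE_PATH.key -> "/warehouse/hive/source_table",
    // hoodie.bootstrap.mode.selector, MetadataOnlyBootstrapModeSelector by default
    HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key ->
      "org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector"
  )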

As explained in the article "Hudi Spark Source Code Study Notes: df.write.format("hudi").save", the save method ends up in DefaultSource.createRelation.

  override def createRelation(sqlContext: SQLContext,
                              mode: SaveMode,
                              optParams: Map[String, String],
                              df: DataFrame): BaseRelation = {
    val dfWithoutMetaCols = df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala:_*)

    if (optParams.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL)) {
      HoodieSparkSqlWriter.bootstrap(sqlContext, mode, optParams, dfWithoutMetaCols)
    } else {
      HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols)
    }
    new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
  }

It checks whether OPERATION is BOOTSTRAP_OPERATION_OPT_VAL; here it is true, so HoodieSparkSqlWriter.bootstrap is called.

HoodieSparkSqlWriter.bootstrap

This method first reads some parameters and then checks whether the table exists. If it does not, this is the first write, so a few write configurations are set and the table is initialized via HoodieTableMetaClient.initTable; after that, writeClient.bootstrap is called.

  def bootstrap(sqlContext: SQLContext,
                mode: SaveMode,
                optParams: Map[String, String],
                df: DataFrame,
                hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
                hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty): Boolean = {

    assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
    val path = optParams("path")
    val basePath = new Path(path)
    val sparkContext = sqlContext.sparkContext
    val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
    tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
    val tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)

    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
    val tableName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
    val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE)
    val bootstrapBasePath = hoodieConfig.getStringOrThrow(BASE_PATH,
      s"'${BASE_PATH.key}' is required for '${BOOTSTRAP_OPERATION_OPT_VAL}'" +
        " operation'")
    val bootstrapIndexClass = hoodieConfig.getStringOrDefault(INDEX_CLASS_NAME)

    var schema: String = null
    if (df.schema.nonEmpty) {
      val (structName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
      schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, namespace).toString
    } else {
      schema = HoodieAvroUtils.getNullSchema.toString
    }

    if (mode == SaveMode.Ignore && tableExists) {
      log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
      if (!hoodieWriteClient.isEmpty) {
        hoodieWriteClient.get.close()
      }
      false
    } else {
      // Handle various save modes
      handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tableName, WriteOperationType.BOOTSTRAP, fs)

      if (!tableExists) { // table does not exist yet
        val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
        val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters)
        val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
        val keyGenProp = hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
        val populateMetaFields = java.lang.Boolean.parseBoolean(parameters.getOrElse(
          HoodieTableConfig.POPULATE_META_FIELDS.key(),
          String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())
        ))
        val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
        val useBaseFormatMetaFile = java.lang.Boolean.parseBoolean(parameters.getOrElse(
          HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
          String.valueOf(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue())
        ))

        // After setting some configuration, initialize the Hudi table
        HoodieTableMetaClient.withPropertyBuilder()
          .setTableType(HoodieTableType.valueOf(tableType))
          .setTableName(tableName)
          .setRecordKeyFields(recordKeyFields)
          .setArchiveLogFolder(archiveLogFolder)
          .setPayloadClassName(hoodieConfig.getStringOrDefault(PAYLOAD_CLASS_NAME))
          .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD, null))
          .setBootstrapIndexClass(bootstrapIndexClass)
          .setBaseFileFormat(baseFileFormat)
          .setBootstrapBasePath(bootstrapBasePath)
          .setPartitionFields(partitionColumns)
          .setPopulateMetaFields(populateMetaFields)
          .setKeyGeneratorClassProp(keyGenProp)
          .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
          .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
          .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
          .setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile)
          .initTable(sparkContext.hadoopConfiguration, path)
      }

      val jsc = new JavaSparkContext(sqlContext.sparkContext)
      val writeClient = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
        schema, path, tableName, mapAsJavaMap(parameters)))
      try {
        writeClient.bootstrap(org.apache.hudi.common.util.Option.empty())
      } finally {
        writeClient.close()
      }
      val metaSyncSuccess = metaSync(sqlContext.sparkSession, hoodieConfig, basePath, df.schema)
      metaSyncSuccess
    }
  }

writeClient.bootstrap

The writeClient here is a SparkRDDWriteClient, whose bootstrap method (first snippet below) calls bootstrap on the HoodieTable obtained from initTable. Our table type is COW, so HoodieSparkTable.create (second snippet) returns a HoodieSparkCopyOnWriteTable.

  // SparkRDDWriteClient.bootstrap
  initTable(WriteOperationType.UPSERT, Option.ofNullable(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS)).bootstrap(context, extraMetadata);

  // HoodieSparkTable.create, called while initializing the table
  public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config,
                                                                           HoodieSparkEngineContext context,
                                                                           HoodieTableMetaClient metaClient) {
    HoodieSparkTable<T> hoodieSparkTable;
    switch (metaClient.getTableType()) {
      case COPY_ON_WRITE:
        hoodieSparkTable = new HoodieSparkCopyOnWriteTable<>(config, context, metaClient);
        break;
      case MERGE_ON_READ:
        hoodieSparkTable = new HoodieSparkMergeOnReadTable<>(config, context, metaClient);
        break;
      default:
        throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
    }
    return hoodieSparkTable;
  }

HoodieSparkCopyOnWriteTable.bootstrap

  public HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>> bootstrap(HoodieEngineContext context, Option<Map<String, String>> extraMetadata) {
    return new SparkBootstrapCommitActionExecutor((HoodieSparkEngineContext) context, config, this, extraMetadata).execute();
  }

SparkBootstrapCommitActionExecutor.execute

This first calls listAndProcessSourcePartitions, which returns each mode together with its partitions; there are two modes, METADATA_ONLY and FULL_RECORD. metadataBootstrap is then executed for the partition paths mapped to METADATA_ONLY, and fullBootstrap for those mapped to FULL_RECORD. Two things follow from this: 1. the mode values returned by listAndProcessSourcePartitions decide whether METADATA_ONLY or FULL_RECORD is performed; 2. the concrete logic lives in metadataBootstrap and fullBootstrap respectively. Let's look at each in turn, starting with how listAndProcessSourcePartitions determines the mode.

  @Override
  public HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>> execute() {
    validate();
    try {
      HoodieTableMetaClient metaClient = table.getMetaClient();
      Option<HoodieInstant> completedInstant =
          metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
      ValidationUtils.checkArgument(!completedInstant.isPresent(),
          "Active Timeline is expected to be empty for bootstrap to be performed. "
              + "If you want to re-bootstrap, please rollback bootstrap first !!");
      // Returns the modes and their corresponding partitions; mode is either METADATA_ONLY or FULL_RECORD
      Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> partitionSelections = listAndProcessSourcePartitions();

      // First run metadata bootstrap which will auto commit
      // First run metadataBootstrap; if partitionSelections contains METADATA_ONLY its logic is executed, otherwise nothing happens and it returns immediately
      Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> metadataResult = metadataBootstrap(partitionSelections.get(BootstrapMode.METADATA_ONLY));
      // if there are full bootstrap to be performed, perform that too
      // Then run fullBootstrap; if partitionSelections contains FULL_RECORD its logic is executed, otherwise nothing happens and it returns immediately
      Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> fullBootstrapResult = fullBootstrap(partitionSelections.get(BootstrapMode.FULL_RECORD));
      // Delete the marker directory for the instant
      WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
          .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
      return new HoodieBootstrapWriteMetadata(metadataResult, fullBootstrapResult);
    } catch (IOException ioe) {
      throw new HoodieIOException(ioe.getMessage(), ioe);
    }
  }

listAndProcessSourcePartitions

The main work here is selector.select. The selector class is configured via MODE_SELECTOR_CLASS_NAME (hoodie.bootstrap.mode.selector), whose default is MetadataOnlyBootstrapModeSelector; in our FULL_RECORD example it was set to FullRecordBootstrapModeSelector. Let's look at their concrete implementations.

  private Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> listAndProcessSourcePartitions() throws IOException {
    List<Pair<String, List<HoodieFileStatus>>> folders = BootstrapUtils.getAllLeafFoldersWithFiles(
            table.getMetaClient(), bootstrapSourceFileSystem, config.getBootstrapSourceBasePath(), context);

    LOG.info("Fetching Bootstrap Schema !!");
    HoodieBootstrapSchemaProvider sourceSchemaProvider = new HoodieSparkBootstrapSchemaProvider(config);
    bootstrapSchema = sourceSchemaProvider.getBootstrapSchema(context, folders).toString();
    LOG.info("Bootstrap Schema :" + bootstrapSchema);

    BootstrapModeSelector selector =
        (BootstrapModeSelector) ReflectionUtils.loadClass(config.getBootstrapModeSelectorClass(), config);

    Map<BootstrapMode, List<String>> result = selector.select(folders);
    Map<String, List<HoodieFileStatus>> partitionToFiles = folders.stream().collect(
        Collectors.toMap(Pair::getKey, Pair::getValue));

    // Ensure all partitions are accounted for
    ValidationUtils.checkArgument(partitionToFiles.keySet().equals(
        result.values().stream().flatMap(Collection::stream).collect(Collectors.toSet())));

    return result.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue().stream()
        .map(p -> Pair.of(p, partitionToFiles.get(p))).collect(Collectors.toList())))
        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
  }

selector.select

MetadataOnlyBootstrapModeSelector and FullRecordBootstrapModeSelector are both subclasses of UniformBootstrapModeSelector and differ only in their bootstrapMode; their select method is implemented in the parent class UniformBootstrapModeSelector.

public class MetadataOnlyBootstrapModeSelector extends UniformBootstrapModeSelector {

  public MetadataOnlyBootstrapModeSelector(HoodieWriteConfig bootstrapConfig) {
    super(bootstrapConfig, BootstrapMode.METADATA_ONLY);
  }
}

public class FullRecordBootstrapModeSelector extends UniformBootstrapModeSelector {

  public FullRecordBootstrapModeSelector(HoodieWriteConfig bootstrapConfig) {
    super(bootstrapConfig, BootstrapMode.FULL_RECORD);
  }
}

UniformBootstrapModeSelector.select

Clearly the mode returned above corresponds directly to bootstrapMode, so when MODE_SELECTOR_CLASS_NAME is MetadataOnlyBootstrapModeSelector or FullRecordBootstrapModeSelector the mode takes a single value: either only the metadata logic runs or only the full-record logic runs. Can both modes run in the same bootstrap? Yes, they can, as BootstrapRegexModeSelector below shows.

  public Map<BootstrapMode, List<String>> select(List<Pair<String, List<HoodieFileStatus>>> partitions) {
    return partitions.stream().map(p -> Pair.of(bootstrapMode, p))
        .collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(x -> x.getValue().getKey(),
            Collectors.toList())));
  }

BootstrapRegexModeSelector

We covered BootstrapRegexModeSelector in an earlier article. There are two configs: hoodie.bootstrap.mode.selector.regex.mode (default METADATA_ONLY) and hoodie.bootstrap.mode.selector.regex (default .*).
With a non-default regex such as 2020/08/2[0-9], given partitions 2020/08/10, 2020/08/10/11, 2020/08/20 and 2020/08/21, the matching partitions 2020/08/20 and 2020/08/21 get METADATA_ONLY while the non-matching 2020/08/10 and 2020/08/10/11 get FULL_RECORD. As for why all of my partitions ended up as FULL_RECORD: the regex was wrong. I had set 2022/10/0[0-9], but the actual partition values were 2022-10-08 and 2022-10-09 (different separator). With the default .* everything matches, so everything becomes METADATA_ONLY (the default behavior).

  public BootstrapRegexModeSelector(HoodieWriteConfig writeConfig) {
    super(writeConfig);
    this.pattern = Pattern.compile(writeConfig.getBootstrapModeSelectorRegex());
    this.bootstrapModeOnMatch = writeConfig.getBootstrapModeForRegexMatch();
    // defaultMode is the opposite of bootstrapModeOnMatch
    this.defaultMode = BootstrapMode.FULL_RECORD.equals(bootstrapModeOnMatch)
        ? BootstrapMode.METADATA_ONLY : BootstrapMode.FULL_RECORD;
    LOG.info("Default Mode :" + defaultMode + ", on Match Mode :" + bootstrapModeOnMatch);
  }

  @Override
  public Map<BootstrapMode, List<String>> select(List<Pair<String, List<HoodieFileStatus>>> partitions) {
    return partitions.stream()
        // If the partition path matches the regex, the mode is bootstrapModeOnMatch (METADATA_ONLY by default); otherwise it is defaultMode, i.e. the other mode (FULL_RECORD)
        // bootstrapModeOnMatch and defaultMode are always opposites
        .map(p -> Pair.of(pattern.matcher(p.getKey()).matches() ? bootstrapModeOnMatch : defaultMode, p.getKey()))
        .collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toList())));
  }
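
As a hedged sketch, the regex selector could be configured like this so that matching partitions are bootstrapped as METADATA_ONLY and the rest as FULL_RECORD; the regex and partition layout are hypothetical, and the keys are the configs named above.

  val regexSelectorOpts = Map(
    "hoodie.bootstrap.mode.selector" ->
      "org.apache.hudi.client.bootstrap.selector.BootstrapRegexModeSelector",
    // Partitions matching the regex get this mode; all other partitions get the opposite mode
    "hoodie.bootstrap.mode.selector.regex" -> "2022-10-0[0-9]",
    "hoodie.bootstrap.mode.selector.regex.mode" -> "METADATA_ONLY"
  )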

These are the only three implementations of BootstrapModeSelector. Now let's look at metadataBootstrap and fullBootstrap.

metadataBootstrap

This first creates the keyGenerator and then builds bootstrapPaths; the core logic is the subsequent getMetadataHandler(config, table, partitionFsPair.getRight().getRight()).runMetadataBootstrap call. getMetadataHandler was covered in an earlier article: it returns a ParquetBootstrapMetadataHandler or an OrcBootstrapMetadataHandler depending on the file type, and here it returns a ParquetBootstrapMetadataHandler.

  private HoodieData<BootstrapWriteStatus> runMetadataBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitions) {
    if (null == partitions || partitions.isEmpty()) {
      return context.emptyHoodieData();
    }

    TypedProperties properties = new TypedProperties();
    properties.putAll(config.getProps());

    KeyGeneratorInterface keyGenerator;
    try {
      keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(properties);
    } catch (IOException e) {
      throw new HoodieKeyGeneratorException("Init keyGenerator failed ", e);
    }

    BootstrapPartitionPathTranslator translator = (BootstrapPartitionPathTranslator) ReflectionUtils.loadClass(
        config.getBootstrapPartitionPathTranslatorClass(), properties);

    List<Pair<String, Pair<String, HoodieFileStatus>>> bootstrapPaths = partitions.stream()
        .flatMap(p -> {
          String translatedPartitionPath = translator.getBootstrapTranslatedPath(p.getKey());
          return p.getValue().stream().map(f -> Pair.of(p.getKey(), Pair.of(translatedPartitionPath, f)));
        })
        .collect(Collectors.toList());

    context.setJobStatus(this.getClass().getSimpleName(), "Bootstrap metadata table: " + config.getTableName());
    return context.parallelize(bootstrapPaths, config.getBootstrapParallelism())
        .map(partitionFsPair -> getMetadataHandler(config, table, partitionFsPair.getRight().getRight()).runMetadataBootstrap(partitionFsPair.getLeft(),
                partitionFsPair.getRight().getLeft(), keyGenerator));
  }

ParquetBootstrapMetadataHandler.runMetadataBootstrap

ParquetBootstrapMetadataHandler's runMetadataBootstrap is implemented in its parent class BaseBootstrapMetadataHandler; the core logic is in executeBootstrap.

  public BootstrapWriteStatus runMetadataBootstrap(String srcPartitionPath, String partitionPath, KeyGeneratorInterface keyGenerator) {
    Path sourceFilePath = FileStatusUtils.toPath(srcFileStatus.getPath());
    HoodieBootstrapHandle<?, ?, ?, ?> bootstrapHandle = new HoodieBootstrapHandle(config, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
        table, partitionPath, FSUtils.createNewFileIdPfx(), table.getTaskContextSupplier());
    try {
      Schema avroSchema = getAvroSchema(sourceFilePath);
      List<String> recordKeyColumns = keyGenerator.getRecordKeyFieldNames().stream()
          .map(HoodieAvroUtils::getRootLevelFieldName)
          .collect(Collectors.toList());
      Schema recordKeySchema = HoodieAvroUtils.generateProjectionSchema(avroSchema, recordKeyColumns);
      LOG.info("Schema to be used for reading record Keys :" + recordKeySchema);
      AvroReadSupport.setAvroReadSchema(table.getHadoopConf(), recordKeySchema);
      AvroReadSupport.setRequestedProjection(table.getHadoopConf(), recordKeySchema);
      executeBootstrap(bootstrapHandle, sourceFilePath, keyGenerator, partitionPath, avroSchema);
    } catch (Exception e) {
      throw new HoodieException(e.getMessage(), e);
    }

    BootstrapWriteStatus writeStatus = (BootstrapWriteStatus) bootstrapHandle.writeStatuses().get(0);
    BootstrapFileMapping bootstrapFileMapping = new BootstrapFileMapping(
        config.getBootstrapSourceBasePath(), srcPartitionPath, partitionPath,
        srcFileStatus, writeStatus.getFileId());
    writeStatus.setBootstrapSourceFileMapping(bootstrapFileMapping);
    return writeStatus;
  }

executeBootstrap

executeBootstrap is in ParquetBootstrapMetadataHandler. It first creates a ParquetReader, wraps the reader in a ParquetReaderIterator, uses it to construct a BoundedInMemoryExecutor (wrapper), and then calls wrapper.execute().

  void executeBootstrap(HoodieBootstrapHandle<?, ?, ?, ?> bootstrapHandle,
                        Path sourceFilePath, KeyGeneratorInterface keyGenerator, String partitionPath, Schema avroSchema) throws Exception {
    BoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void> wrapper = null;
    ParquetReader<IndexedRecord> reader =
        AvroParquetReader.<IndexedRecord>builder(sourceFilePath).withConf(table.getHadoopConf()).build();
    try {
      wrapper = new BoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void>(config.getWriteBufferLimitBytes(),
          new ParquetReaderIterator(reader), new BootstrapRecordConsumer(bootstrapHandle), inp -> {
        String recKey = keyGenerator.getKey(inp).getRecordKey();
        GenericRecord gr = new GenericData.Record(HoodieAvroUtils.RECORD_KEY_SCHEMA);
        gr.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recKey);
        BootstrapRecordPayload payload = new BootstrapRecordPayload(gr);
        HoodieRecord rec = new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload);
        return rec;
      }, table.getPreExecuteRunnable());
      wrapper.execute();
    } catch (Exception e) {
      throw new HoodieException(e);
    } finally {
      reader.close();
      if (null != wrapper) {
        wrapper.shutdownNow();
        wrapper.awaitTermination();
      }
      bootstrapHandle.close();
    }
  }

wrapper.execute()

This is a producer-consumer model; see the article on applying the producer-consumer model in Hudi.

  public E execute() {
    try {
      startProducers();
      Future<E> future = startConsumer();
      // Wait for consumer to be done
      return future.get();
    } catch (InterruptedException ie) {
      shutdownNow();
      Thread.currentThread().interrupt();
      throw new HoodieException(ie);
    } catch (Exception e) {
      throw new HoodieException(e);
    }
  }

startProducers

Let's focus on producer.produce. The producers here are Collections.singletonList(new IteratorBasedQueueProducer<>(new ParquetReaderIterator(reader))), so the produce method is IteratorBasedQueueProducer.produce.

  public ExecutorCompletionService<Boolean> startProducers() {
    // Latch to control when and which producer thread will close the queue
    final CountDownLatch latch = new CountDownLatch(producers.size());
    final ExecutorCompletionService<Boolean> completionService =
        new ExecutorCompletionService<Boolean>(producerExecutorService);
    producers.stream().map(producer -> {
      return completionService.submit(() -> {
        try {
          preExecuteRunnable.run();
          producer.produce(queue);
        } catch (Throwable e) {
          LOG.error("error producing records", e);
          queue.markAsFailed(e);
          throw e;
        } finally {
          synchronized (latch) {
            latch.countDown();
            if (latch.getCount() == 0) {
              // Mark production as done so that consumer will be able to exit
              queue.close();
            }
          }
        }
        return true;
      });
    }).collect(Collectors.toList());
    return completionService;
  }

IteratorBasedQueueProducer.produce

queue.insertRecord first applies transformFunction to obtain the payload and then puts the payload on the queue; transformFunction is analyzed later. First let's look at inputIterator.hasNext() and inputIterator.next(), where inputIterator is the ParquetReaderIterator.

  public void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception {
    LOG.info("starting to buffer records");
    while (inputIterator.hasNext()) {
      queue.insertRecord(inputIterator.next());
    }
    LOG.info("finished buffering records");
  }

  // BoundedInMemoryQueue.insertRecord
  public void insertRecord(I t) throws Exception {
    // If already closed, throw exception
    if (isWriteDone.get()) {
      throw new IllegalStateException("Queue closed for enqueueing new entries");
    }

    // We need to stop queueing if queue-reader has failed and exited.
    throwExceptionIfFailed();

    rateLimiter.acquire();
    // We are retrieving insert value in the record queueing thread to offload computation
    // around schema validation
    // and record creation to it.
    final O payload = transformFunction.apply(t);
    adjustBufferSizeIfNeeded(payload);
    queue.put(Option.of(payload));
  }

ParquetReaderIterator.next

hasNext checks whether next is null (reading ahead if necessary), and next returns parquetReader.read(). parquetReader.read() is implemented in ParquetReader, which is already parquet source code, so we will not go further. In short, the producer reads the parquet content and puts it on the queue for the consumer. Now let's look at the consumer.

  public boolean hasNext() {
    try {
      // To handle when hasNext() is called multiple times for idempotency and/or the first time
      if (this.next == null) {
        this.next = parquetReader.read();
      }
      return this.next != null;
    } catch (Exception e) {
      FileIOUtils.closeQuietly(parquetReader);
      throw new HoodieException("unable to read next record from parquet file ", e);
    }
  }

 // T here is IndexedRecord
  @Override
  public T next() {
    try {
      // To handle case when next() is called before hasNext()
      if (this.next == null) {
        if (!hasNext()) {
          throw new HoodieException("No more records left to read from parquet file");
        }
      }
      T retVal = this.next;
      this.next = parquetReader.read();
      return retVal;
    } catch (Exception e) {
      FileIOUtils.closeQuietly(parquetReader);
      throw new HoodieException("unable to read next record from parquet file ", e);
    }
  }
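
As a side note, here is a hedged sketch of driving the same iterator directly (the file path is hypothetical); it mirrors how executeBootstrap builds the reader above.

  import org.apache.avro.generic.IndexedRecord
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import org.apache.hudi.common.util.ParquetReaderIterator
  import org.apache.parquet.avro.AvroParquetReader

  val reader = AvroParquetReader.builder[IndexedRecord](new Path("/tmp/source/part-00000.parquet"))
    .withConf(new Configuration())
    .build()
  val iter = new ParquetReaderIterator[IndexedRecord](reader)
  while (iter.hasNext) {       // hasNext reads ahead when the buffered record is null
    val record = iter.next()   // next() returns the buffered record and pre-fetches the following one
  }
  reader.close()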

startConsumer

The key call here is consumer.consume; the consumer is new BootstrapRecordConsumer(bootstrapHandle).

  private Future<E> startConsumer() {
    return consumer.map(consumer -> {
      return consumerExecutorService.submit(() -> {
        LOG.info("starting consumer thread");
        preExecuteRunnable.run();
        try {
          E result = consumer.consume(queue);
          LOG.info("Queue Consumption is done; notifying producer threads");
          return result;
        } catch (Exception e) {
          LOG.error("error consuming records", e);
          queue.markAsFailed(e);
          throw e;
        }
      });
    }).orElse(CompletableFuture.completedFuture(null));
  }

BootstrapRecordConsumer.consume

Its consume method is implemented in the parent class BoundedInMemoryQueueConsumer: it pulls records off the queue via queue.iterator().next() and calls consumeOneRecord for each one.

  public O consume(BoundedInMemoryQueue<?, I> queue) throws Exception {
    Iterator<I> iterator = queue.iterator();

    while (iterator.hasNext()) {
      consumeOneRecord(iterator.next());
    }

    // Notifies done
    finish();

    return getResult();
  }

BootstrapRecordConsumer.consumeOneRecord

So what are record and its payload here? That is determined by the transformFunction mentioned above, which is defined in the executeBootstrap method shown earlier.

  protected void consumeOneRecord(HoodieRecord record) {
    try {
      bootstrapHandle.write(record, ((HoodieRecordPayload) record.getData())
          .getInsertValue(bootstrapHandle.getWriterSchemaWithMetaFields()));
    } catch (IOException e) {
      throw new HoodieIOException(e.getMessage(), e);
    }
  }

  // BootstrapRecordPayload.getInsertValue, shown again below
  public Option<IndexedRecord> getInsertValue(Schema schema) {
    return Option.ofNullable(record);
  }

transformFunction

The inp here is the IndexedRecord returned by queue.iterator().next(). First the recKey is extracted from the IndexedRecord, then it is put into a GenericRecord as Hudi's record-key meta field; that GenericRecord becomes the record inside a BootstrapRecordPayload, and finally new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload) is returned. So the record above is the rec below, and the HoodieRecordPayload obtained via record.getData() is the data of the HoodieAvroRecord, i.e. the BootstrapRecordPayload.

inp -> {
        String recKey = keyGenerator.getKey(inp).getRecordKey();
        GenericRecord gr = new GenericData.Record(HoodieAvroUtils.RECORD_KEY_SCHEMA);
        gr.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recKey);
        BootstrapRecordPayload payload = new BootstrapRecordPayload(gr);
        HoodieRecord rec = new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload);
        return rec;
      }

BootstrapRecordPayload.getInsertValue

The logic here is simple: it returns record directly. From the transformFunction above we know record is a GenericRecord with a single field, RECORD_KEY_METADATA_FIELD. Knowing what getInsertValue returns, we can now look at the write method.

  public Option<IndexedRecord> getInsertValue(Schema schema) {
    return Option.ofNullable(record);
  }

HoodieBootstrapHandle.write

HoodieBootstrapHandle's write method is implemented in the parent class HoodieCreateHandle. The core is writing avroRecord to the corresponding parquet file via fileWriter; the actual writing again happens inside parquet. Since the record only carries key and partition information, this is why METADATA_ONLY only generates skeleton files containing the record keys and footer metadata instead of rewriting all the data. That concludes METADATA_ONLY; next, FULL_RECORD.

  public void write(HoodieRecord record, Option<IndexedRecord> avroRecord) {
    Option recordMetadata = ((HoodieRecordPayload) record.getData()).getMetadata();
    if (HoodieOperation.isDelete(record.getOperation())) {
      avroRecord = Option.empty();
    }
    try {
      if (avroRecord.isPresent()) {
        if (avroRecord.get().equals(IGNORE_RECORD)) {
          return;
        }
        // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
        if (preserveMetadata) {
          fileWriter.writeAvro(record.getRecordKey(),
              rewriteRecordWithMetadata((GenericRecord) avroRecord.get(), path.getName()));
        } else {
          fileWriter.writeAvroWithMetadata(record.getKey(), rewriteRecord((GenericRecord) avroRecord.get()));
        }
        // update the new location of record, so we know where to find it next
        record.unseal();
        record.setNewLocation(new HoodieRecordLocation(instantTime, writeStatus.getFileId()));
        record.seal();
        recordsWritten++;
        insertRecordsWritten++;
      } else {
        recordsDeleted++;
      }
      writeStatus.markSuccess(record, recordMetadata);
      // deflate record payload after recording success. This will help users access payload as a
      // part of marking
      // record successful.
      record.deflate();
    } catch (Throwable t) {
      // Not throwing exception from here, since we don't want to fail the entire job
      // for a single record
      writeStatus.markFailure(record, t, recordMetadata);
      LOG.error("Error writing record " + record, t);
    }
  }
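
To make the skeleton-file point concrete, here is a rough illustration (my own paraphrase, not Hudi source) of what write receives for a single source row during METADATA_ONLY bootstrap.

  // record.getData() is the BootstrapRecordPayload built in transformFunction, so getInsertValue
  // returns a GenericRecord that only carries the record key, roughly:
  //   {"_hoodie_record_key": "id:42"}   // hypothetical key value
  // fileWriter then fills in the remaining Hudi meta columns, producing a parquet "skeleton" file
  // with keys and footer metadata but none of the source data columns; the BootstrapFileMapping
  // recorded in runMetadataBootstrap links this skeleton back to the original source file.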

SparkBootstrapCommitActionExecutor.fullBootstrap

First an inputProvider is constructed via reflection; it is configured by FULL_BOOTSTRAP_INPUT_PROVIDER_CLASS_NAME (hoodie.bootstrap.full.input.provider), whose default is SparkParquetBootstrapDataProvider. inputProvider.generateInputRecords then reads the source table's parquet files and returns inputRecordsRDD, which contains the complete data. That RDD is passed to getBulkInsertActionExecutor(HoodieJavaRDD.of(inputRecordsRDD)).execute(), and finally SparkBulkInsertHelper.bulkInsert writes it into the target table following the normal Hudi write path; in other words, FULL_RECORD performs a full copy/rewrite of the data in Hudi table format. The bulkInsert code path is fairly large, so for reasons of space (and the author's time and energy) it is not covered here; I may write a separate post about it.

  protected Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> fullBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitionFilesList) {
    if (null == partitionFilesList || partitionFilesList.isEmpty()) {
      return Option.empty();
    }
    TypedProperties properties = new TypedProperties();
    properties.putAll(config.getProps());
    FullRecordBootstrapDataProvider inputProvider =
        (FullRecordBootstrapDataProvider) ReflectionUtils.loadClass(config.getFullBootstrapInputProvider(),
            properties, context);
    JavaRDD<HoodieRecord> inputRecordsRDD =
        (JavaRDD<HoodieRecord>) inputProvider.generateInputRecords("bootstrap_source", config.getBootstrapSourceBasePath(),
            partitionFilesList);
    // Start Full Bootstrap
    final HoodieInstant requested = new HoodieInstant(State.REQUESTED, table.getMetaClient().getCommitActionType(),
        HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS);
    table.getActiveTimeline().createNewInstant(requested);

    // Setup correct schema and run bulk insert.
    return Option.of(getBulkInsertActionExecutor(HoodieJavaRDD.of(inputRecordsRDD)).execute());
  }
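
If a different reader is needed for the source files, the provider can be swapped via the config mentioned above; a hedged sketch follows (the fully-qualified default class name is my assumption for 0.12.0).

  val fullBootstrapOpts = Map(
    // hoodie.bootstrap.full.input.provider, defaulting to SparkParquetBootstrapDataProvider
    "hoodie.bootstrap.full.input.provider" ->
      "org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider",
    // Force full-record bootstrap for every partition
    "hoodie.bootstrap.mode.selector" ->
      "org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector"
  )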

SparkFullBootstrapDataProviderBase.generateInputRecords (implemented in the parent class)

  @Override
  public JavaRDD<HoodieRecord> generateInputRecords(String tableName, String sourceBasePath,
                                                    List<Pair<String, List<HoodieFileStatus>>> partitionPathsWithFiles) {
    String[] filePaths = partitionPathsWithFiles.stream().map(Pair::getValue)
        .flatMap(f -> f.stream().map(fs -> FileStatusUtils.toPath(fs.getPath()).toString()))
        .toArray(String[]::new);

    Dataset inputDataset = sparkSession.read().format(getFormat()).load(filePaths);
    try {
      KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
      String structName = tableName + "_record";
      String namespace = "hoodie." + tableName;
      RDD<GenericRecord> genericRecords = HoodieSparkUtils.createRdd(inputDataset, structName, namespace, false,
          Option.empty());
      return genericRecords.toJavaRDD().map(gr -> {
        String orderingVal = HoodieAvroUtils.getNestedFieldValAsString(
            gr, props.getString("hoodie.datasource.write.precombine.field"), false, props.getBoolean(
                KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
                Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())));
        try {
          return DataSourceUtils.createHoodieRecord(gr, orderingVal, keyGenerator.getKey(gr),
              props.getString("hoodie.datasource.write.payload.class"));
        } catch (IOException ioe) {
          throw new HoodieIOException(ioe.getMessage(), ioe);
        }
      });
    } catch (IOException ioe) {
      throw new HoodieIOException(ioe.getMessage(), ioe);
    }
  }

getBulkInsertActionExecutor

Returns a SparkBulkInsertCommitActionExecutor.

  protected BaseSparkCommitActionExecutor<T> getBulkInsertActionExecutor(HoodieData<HoodieRecord> inputRecordsRDD) {
    return new SparkBulkInsertCommitActionExecutor((HoodieSparkEngineContext) context, new HoodieWriteConfig.Builder().withProps(config.getProps())
        .withSchema(bootstrapSchema).build(), table, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
        inputRecordsRDD, Option.empty(), extraMetadata);
  }

SparkBulkInsertCommitActionExecutor.execute

Executes SparkBulkInsertHelper.bulkInsert.

  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
    try {
      return SparkBulkInsertHelper.newInstance().bulkInsert(inputRecordsRDD, instantTime, table, config,
          this, true, bulkInsertPartitioner);
    } catch (HoodieInsertException ie) {
      throw ie;
    } catch (Throwable e) {
      throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e);
    }
  }

SparkBulkInsertHelper.bulkInsert

Finally, SparkBulkInsertHelper.bulkInsert writes the records to the target table following the normal Hudi write path.

  public HoodieWriteMetadata<HoodieData<WriteStatus>> bulkInsert(final HoodieData<HoodieRecord<T>> inputRecords,
                                                                 final String instantTime,
                                                                 final HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
                                                                 final HoodieWriteConfig config,
                                                                 final BaseCommitActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> executor,
                                                                 final boolean performDedupe,
                                                                 final Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
    HoodieWriteMetadata result = new HoodieWriteMetadata();

    //transition bulk_insert state to inflight
    table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(HoodieInstant.State.REQUESTED,
            executor.getCommitActionType(), instantTime), Option.empty(),
        config.shouldAllowMultiWriteOnSameInstant());

    BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.orElse(BulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()));

    // write new files
    HoodieData<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false,
        config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
    //update index
    ((BaseSparkCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result);
    return result;
  }

Summary

This post has briefly analyzed some of the key source code paths of Hudi bootstrap; I hope it is helpful. Due to limits of time and ability, some parts may not be deep enough or may contain mistakes; corrections are very welcome so that we can improve together.
