前言
Apache Hudi bootstrap源码简要走读,不了解Hudi bootstrap的可以参考:利用Hudi Bootstrap转化现有Hive表的parquet/orc文件为Hudi表
版本
Hudi 0.12.0
Spark 2.4.4
入口
// Entry point of the walkthrough: write an empty DataFrame in "hudi" format with OPERATION
// set to BOOTSTRAP_OPERATION_OPT_VAL, so that save(...) performs a bootstrap.
// "......" below elides the remaining options of the original example.
val bootstrapDF = spark.emptyDataFrame
bootstrapDF.write.
format("hudi").
options(extraOpts).
option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL).
......
save(basePath)
根据文章《Hudi Spark源码学习总结-df.write.format("hudi").save》可知,save方法会走到DefaultSource.createRelation
/**
 * Write entry point reached by `df.write.format("hudi").save(...)`.
 *
 * Drops the Hudi meta columns from `df`. It then dispatches to HoodieSparkSqlWriter.bootstrap
 * when the OPERATION option equals BOOTSTRAP_OPERATION_OPT_VAL, and to
 * HoodieSparkSqlWriter.write otherwise.
 *
 * @return a HoodieEmptyRelation carrying the schema without meta columns
 */
override def createRelation(sqlContext: SQLContext,
mode: SaveMode,
optParams: Map[String, String],
df: DataFrame): BaseRelation = {
val dfWithoutMetaCols = df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala:_*)
// Option.contains: true only when OPERATION is set AND equals the bootstrap operation value
if (optParams.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL)) {
HoodieSparkSqlWriter.bootstrap(sqlContext, mode, optParams, dfWithoutMetaCols)
} else {
HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols)
}
new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
}
它会判断OPERATION是否为BOOTSTRAP_OPERATION_OPT_VAL,这里为true,所以会调用HoodieSparkSqlWriter.bootstrap
HoodieSparkSqlWriter.bootstrap
这里首先获取一些参数,然后判断表是否存在;如果不存在,说明是第一次写,需要设置一些配置参数,然后通过HoodieTableMetaClient.initTable进行初始化,接着调用writeClient.bootstrap
/**
 * Bootstraps an existing dataset into a Hudi table located at the "path" option.
 *
 * Steps:
 *   1. Validate the table config.
 *   2. Build the writer schema: the Avro schema of `df`, or the null schema when `df` has no columns.
 *   3. Honor SaveMode.Ignore.
 *   4. Initialize the table metadata when the table does not exist yet.
 *   5. Run writeClient.bootstrap.
 *   6. Run meta sync.
 *
 * @param hoodieTableConfigOpt optional pre-loaded table config
 * @param hoodieWriteClient    optional write client to reuse; otherwise one is created from the options
 * @return false when mode is SaveMode.Ignore and the table already exists (nothing is written);
 *         otherwise the result of metaSync
 */
def bootstrap(sqlContext: SQLContext,
mode: SaveMode,
optParams: Map[String, String],
df: DataFrame,
hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty): Boolean = {
assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
val path = optParams("path")
val basePath = new Path(path)
val sparkContext = sqlContext.sparkContext
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
// `tableExists` is not declared in this excerpt (presumably a field of the enclosing object).
// The table counts as existing when the meta folder exists under basePath.
tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
val tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)
val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
val tableName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE)
// NOTE(review): the concatenated error message ends in "operation'". That trailing single
// quote is unbalanced; the operation name already has its own pair of quotes.
val bootstrapBasePath = hoodieConfig.getStringOrThrow(BASE_PATH,
s"'${BASE_PATH.key}' is required for '${BOOTSTRAP_OPERATION_OPT_VAL}'" +
" operation'")
val bootstrapIndexClass = hoodieConfig.getStringOrDefault(INDEX_CLASS_NAME)
// Writer schema: Avro schema derived from df when it has columns, otherwise the null schema
var schema: String = null
if (df.schema.nonEmpty) {
val (structName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, namespace).toString
} else {
schema = HoodieAvroUtils.getNullSchema.toString
}
// SaveMode.Ignore on an existing table: close any caller-supplied client and skip the write
if (mode == SaveMode.Ignore && tableExists) {
log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
if (!hoodieWriteClient.isEmpty) {
hoodieWriteClient.get.close()
}
false
} else {
// Handle various save modes
handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tableName, WriteOperationType.BOOTSTRAP, fs)
if (!tableExists) { // table does not exist yet: this is the first write, so initialize the table
val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters)
val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
val keyGenProp = hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
val populateMetaFields = java.lang.Boolean.parseBoolean(parameters.getOrElse(
HoodieTableConfig.POPULATE_META_FIELDS.key(),
String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())
))
val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
val useBaseFormatMetaFile = java.lang.Boolean.parseBoolean(parameters.getOrElse(
HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
String.valueOf(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue())
))
// Apply the collected table properties, then initialize the Hudi table at `path`
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HoodieTableType.valueOf(tableType))
.setTableName(tableName)
.setRecordKeyFields(recordKeyFields)
.setArchiveLogFolder(archiveLogFolder)
.setPayloadClassName(hoodieConfig.getStringOrDefault(PAYLOAD_CLASS_NAME))
.setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD, null))
.setBootstrapIndexClass(bootstrapIndexClass)
.setBaseFileFormat(baseFileFormat)
.setBootstrapBasePath(bootstrapBasePath)
.setPartitionFields(partitionColumns)
.setPopulateMetaFields(populateMetaFields)
.setKeyGeneratorClassProp(keyGenProp)
.setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
.setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
.setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
.setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile)
.initTable(sparkContext.hadoopConfiguration, path)
}
val jsc = new JavaSparkContext(sqlContext.sparkContext)
// Reuse the caller's client when given, otherwise create one from the merged parameters
val writeClient = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
schema, path, tableName, mapAsJavaMap(parameters)))
try {
writeClient.bootstrap(org.apache.hudi.common.util.Option.empty())
} finally {
// Always release the client, even if bootstrap throws
writeClient.close()
}
val metaSyncSuccess = metaSync(sqlContext.sparkSession, hoodieConfig, basePath, df.schema)
metaSyncSuccess
}
}
writeClient.bootstrap
这里的writeClient为SparkRDDWriteClient,它会调用HoodieTable的bootstrap;我们这里使用的表类型为COW,所以HoodieTable为HoodieSparkCopyOnWriteTable
// Excerpt from SparkRDDWriteClient.bootstrap. It obtains the HoodieTable via initTable
// (operation type UPSERT, instant METADATA_BOOTSTRAP_INSTANT_TS) and calls the table's bootstrap.
initTable(WriteOperationType.UPSERT, Option.ofNullable(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS)).bootstrap(context, extraMetadata);
/**
 * Creates the Spark HoodieTable implementation matching the table type in metaClient.
 *
 * @return HoodieSparkCopyOnWriteTable for COPY_ON_WRITE, HoodieSparkMergeOnReadTable for MERGE_ON_READ
 * @throws HoodieException for any other table type
 */
public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config,
HoodieSparkEngineContext context,
HoodieTableMetaClient metaClient) {
HoodieSparkTable<T> hoodieSparkTable;
switch (metaClient.getTableType()) {
case COPY_ON_WRITE:
hoodieSparkTable = new HoodieSparkCopyOnWriteTable<>(config, context, metaClient);
break;
case MERGE_ON_READ:
hoodieSparkTable = new HoodieSparkMergeOnReadTable<>(config, context, metaClient);
break;
default:
throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
}
return hoodieSparkTable;
}
HoodieSparkCopyOnWriteTable.bootstrap
/**
 * Copy-on-write bootstrap: delegates all work to a new SparkBootstrapCommitActionExecutor.
 *
 * @param extraMetadata optional extra metadata passed through to the executor
 * @return the executor's bootstrap write metadata
 */
public HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>> bootstrap(HoodieEngineContext context, Option<Map<String, String>> extraMetadata) {
return new SparkBootstrapCommitActionExecutor((HoodieSparkEngineContext) context, config, this, extraMetadata).execute();
}
SparkBootstrapCommitActionExecutor.execute
这里首先通过listAndProcessSourcePartitions返回mode和对应的分区,其中mode有两种:METADATA_ONLY和FULL_RECORD;然后对METADATA_ONLY对应的分区路径执行metadataBootstrap,对FULL_RECORD对应的分区路径执行fullBootstrap。从这里可以看出两点:1、通过listAndProcessSourcePartitions返回的mode值判断是进行METADATA_ONLY还是FULL_RECORD;2、具体的逻辑分别在metadataBootstrap和fullBootstrap中。下面我们分别来看一下,首先看一下listAndProcessSourcePartitions是如何划分mode的
/**
 * Runs the bootstrap.
 *
 * Requires that the active timeline has no completed commit. It then groups the source
 * partitions by BootstrapMode and runs the METADATA_ONLY part first, then the FULL_RECORD
 * part. Finally it deletes the marker directory for the instant.
 *
 * @return metadata of both phases; each phase result is an Option and may be empty
 * @throws HoodieIOException wrapping any IOException
 */
@Override
public HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>> execute() {
validate();
try {
HoodieTableMetaClient metaClient = table.getMetaClient();
Option<HoodieInstant> completedInstant =
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
ValidationUtils.checkArgument(!completedInstant.isPresent(),
"Active Timeline is expected to be empty for bootstrap to be performed. "
+ "If you want to re-bootstrap, please rollback bootstrap first !!");
// Returns each mode with its partitions; mode is either METADATA_ONLY or FULL_RECORD
Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> partitionSelections = listAndProcessSourcePartitions();
// First run metadata bootstrap which will auto commit
// metadataBootstrap does the METADATA_ONLY work if partitionSelections has such partitions; otherwise it does nothing and returns
Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> metadataResult = metadataBootstrap(partitionSelections.get(BootstrapMode.METADATA_ONLY));
// if there are full bootstrap to be performed, perform that too
// fullBootstrap does the FULL_RECORD work if partitionSelections has such partitions; otherwise it does nothing and returns
Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> fullBootstrapResult = fullBootstrap(partitionSelections.get(BootstrapMode.FULL_RECORD));
// Delete the marker directory for the instant
WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
return new HoodieBootstrapWriteMetadata(metadataResult, fullBootstrapResult);
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
}
listAndProcessSourcePartitions
这里的主要实现是selector.select,其中selector是通过MODE_SELECTOR_CLASS_NAME(hoodie.bootstrap.mode.selector)配置的,默认值为MetadataOnlyBootstrapModeSelector;我们的例子中(FULL_RECORD模式)配置的是FullRecordBootstrapModeSelector。下面分别看一下它们的具体实现
/**
 * Assigns a BootstrapMode to every partition of the bootstrap source.
 *
 * Lists all leaf folders (with their files) under the bootstrap source base path. It then
 * resolves and stores the bootstrap schema. Finally it asks the configured
 * BootstrapModeSelector to assign a mode to each partition.
 *
 * @return BootstrapMode -> list of (partition path, files in that partition)
 * @throws IOException as declared by this method
 */
private Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> listAndProcessSourcePartitions() throws IOException {
List<Pair<String, List<HoodieFileStatus>>> folders = BootstrapUtils.getAllLeafFoldersWithFiles(
table.getMetaClient(), bootstrapSourceFileSystem, config.getBootstrapSourceBasePath(), context);
LOG.info("Fetching Bootstrap Schema !!");
HoodieBootstrapSchemaProvider sourceSchemaProvider = new HoodieSparkBootstrapSchemaProvider(config);
bootstrapSchema = sourceSchemaProvider.getBootstrapSchema(context, folders).toString();
LOG.info("Bootstrap Schema :" + bootstrapSchema);
// The selector class comes from config and is instantiated reflectively with the write config
BootstrapModeSelector selector =
(BootstrapModeSelector) ReflectionUtils.loadClass(config.getBootstrapModeSelectorClass(), config);
Map<BootstrapMode, List<String>> result = selector.select(folders);
Map<String, List<HoodieFileStatus>> partitionToFiles = folders.stream().collect(
Collectors.toMap(Pair::getKey, Pair::getValue));
// Ensure all partitions are accounted for
// (the union of all selected partitions must equal the set of listed partitions)
ValidationUtils.checkArgument(partitionToFiles.keySet().equals(
result.values().stream().flatMap(Collection::stream).collect(Collectors.toSet())));
// Re-attach each selected partition's file list
return result.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue().stream()
.map(p -> Pair.of(p, partitionToFiles.get(p))).collect(Collectors.toList())))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}
selector.select
MetadataOnlyBootstrapModeSelector和FullRecordBootstrapModeSelector都是UniformBootstrapModeSelector的子类,区别是bootstrapMode不一样,它们的select方法是在父类UniformBootstrapModeSelector实现的
/**
 * Selector that assigns BootstrapMode.METADATA_ONLY to every partition.
 */
public class MetadataOnlyBootstrapModeSelector extends UniformBootstrapModeSelector {
public MetadataOnlyBootstrapModeSelector(HoodieWriteConfig bootstrapConfig) {
super(bootstrapConfig, BootstrapMode.METADATA_ONLY);
}
}
/**
 * Selector that assigns BootstrapMode.FULL_RECORD to every partition.
 */
public class FullRecordBootstrapModeSelector extends UniformBootstrapModeSelector {
public FullRecordBootstrapModeSelector(HoodieWriteConfig bootstrapConfig) {
super(bootstrapConfig, BootstrapMode.FULL_RECORD);
}
}
UniformBootstrapModeSelector.select
从下面的select方法可以很明显地看出,返回的mode和bootstrapMode是一一对应的。所以当MODE_SELECTOR_CLASS_NAME为MetadataOnlyBootstrapModeSelector或FullRecordBootstrapModeSelector时,所有分区的mode值都是唯一的,要么执行metadata的逻辑,要么执行full的逻辑。那么有没有两种模式同时运行的情况呢?答案是有的,见下面的BootstrapRegexModeSelector。
/**
 * Assigns the single configured bootstrapMode to every partition.
 *
 * @param partitions (partition path, files) pairs
 * @return a map with one entry, bootstrapMode -> all partition paths; an empty map when
 *         partitions is empty
 */
public Map<BootstrapMode, List<String>> select(List<Pair<String, List<HoodieFileStatus>>> partitions) {
return partitions.stream().map(p -> Pair.of(bootstrapMode, p))
.collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(x -> x.getValue().getKey(),
Collectors.toList())));
}
BootstrapRegexModeSelector
BootstrapRegexModeSelector我们在之前的文章中讲过。它有两个配置:hoodie.bootstrap.mode.selector.regex.mode,默认值为METADATA_ONLY;hoodie.bootstrap.mode.selector.regex,默认值为.*。
如果regex不是默认值,比如2020/08/2[0-9],假设我们有分区2020/08/10、2020/08/10/11、2020/08/20、2020/08/21,那么匹配成功的2020/08/20和2020/08/21对应的类型为METADATA_ONLY,匹配不成功的2020/08/10和2020/08/10/11则为FULL_RECORD。至于我的例子为什么全部是FULL_RECORD,原因是regex设置错误:我设置的是2022/10/0[0-9],但实际的分区值为2022-10-08和2022-10-09(分隔符不一样),因此全部匹配失败。而如果使用默认的.*,则全部能匹配上,也就都是METADATA_ONLY(默认情况)
/**
 * Builds a regex-based selector from the write config.
 *
 * The regex and the mode used on a match both come from config. The mode used on no match
 * (defaultMode) is always the other of the two modes.
 */
public BootstrapRegexModeSelector(HoodieWriteConfig writeConfig) {
super(writeConfig);
this.pattern = Pattern.compile(writeConfig.getBootstrapModeSelectorRegex());
this.bootstrapModeOnMatch = writeConfig.getBootstrapModeForRegexMatch();
// defaultMode is the opposite of bootstrapModeOnMatch
this.defaultMode = BootstrapMode.FULL_RECORD.equals(bootstrapModeOnMatch)
? BootstrapMode.METADATA_ONLY : BootstrapMode.FULL_RECORD;
LOG.info("Default Mode :" + defaultMode + ", on Match Mode :" + bootstrapModeOnMatch);
}
/**
 * Assigns each partition a mode based on whether its path matches the configured regex.
 *
 * Uses Matcher.matches(), so the WHOLE partition path must match the regex; a prefix match is
 * not enough.
 *
 * @return mode -> partition paths assigned that mode
 */
@Override
public Map<BootstrapMode, List<String>> select(List<Pair<String, List<HoodieFileStatus>>> partitions) {
return partitions.stream()
// On a match use bootstrapModeOnMatch (METADATA_ONLY by default), otherwise defaultMode, i.e. the other mode (`FULL_RECORD`)
// bootstrapModeOnMatch and defaultMode are always opposite
.map(p -> Pair.of(pattern.matcher(p.getKey()).matches() ? bootstrapModeOnMatch : defaultMode, p.getKey()))
.collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toList())));
}
关于BootstrapModeSelector的实现一共只有上面讲的这三种,下面让我们来看一下metadataBootstrap,fullBootstrap
metadataBootstrap
这里首先创建keyGenerator,然后获取bootstrapPaths,核心逻辑在于后面的getMetadataHandler(config, table, partitionFsPair.getRight().getRight()).runMetadataBootstrap,其中getMetadataHandler我们在之前的文章中讲过了,根据文件类型返回ParquetBootstrapMetadataHandler或者OrcBootstrapMetadataHandler,我们这里返回ParquetBootstrapMetadataHandler
/**
 * METADATA_ONLY bootstrap over the given partitions.
 *
 * Steps:
 *   1. Build a key generator and a partition path translator from the write config.
 *   2. Flatten the partitions into (source partition, (translated partition, file)) entries.
 *   3. Run the per-file metadata handler in parallel, with parallelism
 *      config.getBootstrapParallelism().
 *
 * @param partitions (source partition path, files) pairs; may be null
 * @return one BootstrapWriteStatus per source file; empty data when partitions is null or empty
 * @throws HoodieKeyGeneratorException if the key generator cannot be created
 */
private HoodieData<BootstrapWriteStatus> runMetadataBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitions) {
if (null == partitions || partitions.isEmpty()) {
return context.emptyHoodieData();
}
TypedProperties properties = new TypedProperties();
properties.putAll(config.getProps());
KeyGeneratorInterface keyGenerator;
try {
keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(properties);
} catch (IOException e) {
throw new HoodieKeyGeneratorException("Init keyGenerator failed ", e);
}
BootstrapPartitionPathTranslator translator = (BootstrapPartitionPathTranslator) ReflectionUtils.loadClass(
config.getBootstrapPartitionPathTranslatorClass(), properties);
// One entry per source file: (source partition, (translated partition, file status))
List<Pair<String, Pair<String, HoodieFileStatus>>> bootstrapPaths = partitions.stream()
.flatMap(p -> {
String translatedPartitionPath = translator.getBootstrapTranslatedPath(p.getKey());
return p.getValue().stream().map(f -> Pair.of(p.getKey(), Pair.of(translatedPartitionPath, f)));
})
.collect(Collectors.toList());
context.setJobStatus(this.getClass().getSimpleName(), "Bootstrap metadata table: " + config.getTableName());
// The handler is chosen per file (getMetadataHandler receives the file status)
return context.parallelize(bootstrapPaths, config.getBootstrapParallelism())
.map(partitionFsPair -> getMetadataHandler(config, table, partitionFsPair.getRight().getRight()).runMetadataBootstrap(partitionFsPair.getLeft(),
partitionFsPair.getRight().getLeft(), keyGenerator));
}
ParquetBootstrapMetadataHandler.runMetadataBootstrap
ParquetBootstrapMetadataHandler的runMetadataBootstrap是在其父类BaseBootstrapMetadataHandler中实现的,这里的核心逻辑在executeBootstrap
/**
 * Bootstraps one source file (srcFileStatus, a field not shown in this excerpt) by writing a
 * Hudi file built from the source records' keys.
 *
 * Steps:
 *   1. Project the source Avro schema down to the root-level record key columns.
 *   2. Set that projection as the Avro read schema and requested projection on the table's
 *      Hadoop conf.
 *   3. Delegate to executeBootstrap.
 *   4. Link the resulting write status back to the source file via a BootstrapFileMapping.
 *
 * @param srcPartitionPath partition path in the bootstrap source
 * @param partitionPath    translated partition path in the Hudi table
 * @param keyGenerator     extracts record keys from source records
 * @return the write status of the new file, carrying the source-file mapping
 * @throws HoodieException wrapping any failure while reading or writing
 */
public BootstrapWriteStatus runMetadataBootstrap(String srcPartitionPath, String partitionPath, KeyGeneratorInterface keyGenerator) {
Path sourceFilePath = FileStatusUtils.toPath(srcFileStatus.getPath());
HoodieBootstrapHandle<?, ?, ?, ?> bootstrapHandle = new HoodieBootstrapHandle(config, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
table, partitionPath, FSUtils.createNewFileIdPfx(), table.getTaskContextSupplier());
try {
Schema avroSchema = getAvroSchema(sourceFilePath);
List<String> recordKeyColumns = keyGenerator.getRecordKeyFieldNames().stream()
.map(HoodieAvroUtils::getRootLevelFieldName)
.collect(Collectors.toList());
// Read only the columns needed to compute record keys
Schema recordKeySchema = HoodieAvroUtils.generateProjectionSchema(avroSchema, recordKeyColumns);
LOG.info("Schema to be used for reading record Keys :" + recordKeySchema);
AvroReadSupport.setAvroReadSchema(table.getHadoopConf(), recordKeySchema);
AvroReadSupport.setRequestedProjection(table.getHadoopConf(), recordKeySchema);
executeBootstrap(bootstrapHandle, sourceFilePath, keyGenerator, partitionPath, avroSchema);
} catch (Exception e) {
throw new HoodieException(e.getMessage(), e);
}
BootstrapWriteStatus writeStatus = (BootstrapWriteStatus) bootstrapHandle.writeStatuses().get(0);
BootstrapFileMapping bootstrapFileMapping = new BootstrapFileMapping(
config.getBootstrapSourceBasePath(), srcPartitionPath, partitionPath,
srcFileStatus, writeStatus.getFileId());
writeStatus.setBootstrapSourceFileMapping(bootstrapFileMapping);
return writeStatus;
}
executeBootstrap
executeBootstrap在ParquetBootstrapMetadataHandler中实现:首先创建一个ParquetReader,然后将reader封装成ParquetReaderIterator,作为BoundedInMemoryExecutor的参数构造wrapper,最后执行wrapper.execute()
/**
 * Streams the source parquet file through a producer/consumer BoundedInMemoryExecutor.
 *
 * - Producer: reads records via ParquetReaderIterator and transforms each one, in the lambda
 *   below, into a HoodieRecord whose payload holds only the record key field.
 * - Consumer: a BootstrapRecordConsumer writes those records with bootstrapHandle.
 *
 * @throws Exception as declared; failures during execution are wrapped in HoodieException
 */
void executeBootstrap(HoodieBootstrapHandle<?, ?, ?, ?> bootstrapHandle,
Path sourceFilePath, KeyGeneratorInterface keyGenerator, String partitionPath, Schema avroSchema) throws Exception {
BoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void> wrapper = null;
ParquetReader<IndexedRecord> reader =
AvroParquetReader.<IndexedRecord>builder(sourceFilePath).withConf(table.getHadoopConf()).build();
try {
wrapper = new BoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void>(config.getWriteBufferLimitBytes(),
new ParquetReaderIterator(reader), new BootstrapRecordConsumer(bootstrapHandle), inp -> {
// transformFunction: keep only the record key (plus the HoodieKey's partition path)
String recKey = keyGenerator.getKey(inp).getRecordKey();
GenericRecord gr = new GenericData.Record(HoodieAvroUtils.RECORD_KEY_SCHEMA);
gr.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recKey);
BootstrapRecordPayload payload = new BootstrapRecordPayload(gr);
HoodieRecord rec = new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload);
return rec;
}, table.getPreExecuteRunnable());
wrapper.execute();
} catch (Exception e) {
throw new HoodieException(e);
} finally {
// NOTE(review): if reader.close() throws, the wrapper shutdown and bootstrapHandle.close()
// below are skipped.
reader.close();
if (null != wrapper) {
wrapper.shutdownNow();
wrapper.awaitTermination();
}
bootstrapHandle.close();
}
}
wrapper.execute()
这里是一个生产者-消费者模型,可以参考生产者-消费者模型在Hudi中的应用
/**
 * Starts the producers and the consumer, then blocks until the consumer's future completes.
 *
 * @return the consumer's result
 * @throws HoodieException wrapping any failure. On InterruptedException the executor is shut
 *         down and the thread's interrupt flag is restored before throwing.
 */
public E execute() {
try {
startProducers();
Future<E> future = startConsumer();
// Wait for consumer to be done
return future.get();
} catch (InterruptedException ie) {
shutdownNow();
Thread.currentThread().interrupt();
throw new HoodieException(ie);
} catch (Exception e) {
throw new HoodieException(e);
}
}
startProducers
我们主要看一下producer.produce,这里的producer为Collections.singletonList(new IteratorBasedQueueProducer<>(new ParquetReaderIterator(reader))),所以这里的produce方法为IteratorBasedQueueProducer.produce
/**
 * Submits one task per producer to producerExecutorService.
 *
 * Each task runs preExecuteRunnable and then producer.produce(queue). On failure it marks the
 * queue as failed and rethrows. The last producer task to finish closes the queue so that the
 * consumer can exit.
 *
 * @return the completion service the producer tasks were submitted to
 */
public ExecutorCompletionService<Boolean> startProducers() {
// Latch to control when and which producer thread will close the queue
final CountDownLatch latch = new CountDownLatch(producers.size());
final ExecutorCompletionService<Boolean> completionService =
new ExecutorCompletionService<Boolean>(producerExecutorService);
// map() is used only to submit the tasks; the collected list of futures is discarded
producers.stream().map(producer -> {
return completionService.submit(() -> {
try {
preExecuteRunnable.run();
producer.produce(queue);
} catch (Throwable e) {
LOG.error("error producing records", e);
queue.markAsFailed(e);
throw e;
} finally {
synchronized (latch) {
latch.countDown();
if (latch.getCount() == 0) {
// Mark production as done so that consumer will be able to exit
queue.close();
}
}
}
return true;
});
}).collect(Collectors.toList());
return completionService;
}
IteratorBasedQueueProducer.produce
其中queue.insertRecord的逻辑是先执行transformFunction.apply返回payload,然后将payload放到队列里。关于transformFunction我们放到后面分析,先看inputIterator.hasNext()和inputIterator.next(),这里的inputIterator为ParquetReaderIterator
/**
 * Drains inputIterator into the queue, one record at a time, until the iterator is exhausted.
 *
 * @throws Exception propagated from queue.insertRecord
 */
public void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception {
LOG.info("starting to buffer records");
while (inputIterator.hasNext()) {
queue.insertRecord(inputIterator.next());
}
LOG.info("finished buffering records");
}
// BoundedInMemoryQueue.insertRecord
/**
 * Transforms one input record with transformFunction and enqueues the result.
 *
 * This is called from the producer (produce), so the transform runs on the producer thread
 * before the record is queued.
 *
 * @throws IllegalStateException if the queue has already been closed for enqueueing
 * @throws Exception as declared; throwExceptionIfFailed presumably rethrows a failure recorded
 *         by the reader side (see the comment below)
 */
public void insertRecord(I t) throws Exception {
// If already closed, throw exception
if (isWriteDone.get()) {
throw new IllegalStateException("Queue closed for enqueueing new entries");
}
// We need to stop queueing if queue-reader has failed and exited.
throwExceptionIfFailed();
rateLimiter.acquire();
// We are retrieving insert value in the record queueing thread to offload computation
// around schema validation
// and record creation to it.
final O payload = transformFunction.apply(t);
adjustBufferSizeIfNeeded(payload);
queue.put(Option.of(payload));
}
ParquetReaderIterator.next
可以看到,hasNext在next为null时会先调用parquetReader.read()预读一条记录,再根据next是否为null判断是否还有数据;next则返回当前预读的记录,并调用parquetReader.read()预读下一条。parquetReader.read()在类ParquetReader中实现,这已经是parquet的源码了,我们就不往下分析了。总之,生产者的逻辑是读取parquet的内容放到队列里供消费者使用,接下来看一下消费者
/**
 * Reports whether another record is available.
 *
 * If nothing is buffered in `next`, it reads one record ahead from parquetReader. Safe to call
 * repeatedly.
 *
 * @throws HoodieException on read failure; the reader is closed quietly first
 */
public boolean hasNext() {
try {
// To handle when hasNext() is called multiple times for idempotency and/or the first time
if (this.next == null) {
this.next = parquetReader.read();
}
return this.next != null;
} catch (Exception e) {
FileIOUtils.closeQuietly(parquetReader);
throw new HoodieException("unable to read next record from parquet file ", e);
}
}
// Here T is IndexedRecord
/**
 * Returns the buffered record and reads the following one ahead into `next`.
 *
 * @throws HoodieException if no records remain, or on read failure; the reader is closed quietly.
 *         NOTE(review): the "No more records" HoodieException is thrown inside the try, so the
 *         catch below presumably re-wraps it as "unable to read next record ...".
 */
@Override
public T next() {
try {
// To handle case when next() is called before hasNext()
if (this.next == null) {
if (!hasNext()) {
throw new HoodieException("No more records left to read from parquet file");
}
}
T retVal = this.next;
this.next = parquetReader.read();
return retVal;
} catch (Exception e) {
FileIOUtils.closeQuietly(parquetReader);
throw new HoodieException("unable to read next record from parquet file ", e);
}
}
startConsumer
这里主要是consumer.consume,这里的consumer为new BootstrapRecordConsumer(bootstrapHandle)
/**
 * Submits the consumer, if present, to consumerExecutorService.
 *
 * The task runs preExecuteRunnable and then consumer.consume(queue). On failure it marks the
 * queue as failed, which lets producers stop, and rethrows.
 *
 * @return the consumer's future, or an already-completed future of null when there is no consumer
 */
private Future<E> startConsumer() {
return consumer.map(consumer -> {
return consumerExecutorService.submit(() -> {
LOG.info("starting consumer thread");
preExecuteRunnable.run();
try {
E result = consumer.consume(queue);
LOG.info("Queue Consumption is done; notifying producer threads");
return result;
} catch (Exception e) {
LOG.error("error consuming records", e);
queue.markAsFailed(e);
throw e;
}
});
}).orElse(CompletableFuture.completedFuture(null));
}
BootstrapRecordConsumer.consume
它的consume方法是在父类BoundedInMemoryQueueConsumer中实现的:通过queue.iterator()逐条从队列queue中获取数据,并调用consumeOneRecord处理;全部处理完后调用finish(),最后返回getResult()。
/**
 * Consumes every record from the queue via consumeOneRecord, then calls finish().
 *
 * @return getResult() once the queue iterator is exhausted
 * @throws Exception as declared
 */
public O consume(BoundedInMemoryQueue<?, I> queue) throws Exception {
Iterator<I> iterator = queue.iterator();
while (iterator.hasNext()) {
consumeOneRecord(iterator.next());
}
// Notifies done
finish();
return getResult();
}
BootstrapRecordConsumer.consumeOneRecord
这里的record和Payload是啥呢?这就要看我们上面提到的transformFunction了,他是在上面的方法executeBootstrap中定义的
/**
 * Writes one record with bootstrapHandle.
 *
 * The value written is the payload's insert value, evaluated against the handle's writer
 * schema with meta fields.
 *
 * @throws HoodieIOException wrapping an IOException from the payload or the handle
 */
protected void consumeOneRecord(HoodieRecord record) {
try {
bootstrapHandle.write(record, ((HoodieRecordPayload) record.getData())
.getInsertValue(bootstrapHandle.getWriterSchemaWithMetaFields()));
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
}
// Payload insert value: returns the stored record as-is (empty when it is null).
// The schema argument is not used.
public Option<IndexedRecord> getInsertValue(Schema schema) {
return Option.ofNullable(record);
}
transformFunction
这里的inp是生产者中inputIterator.next()(即ParquetReaderIterator)返回的IndexedRecord。transformFunction是在queue.insertRecord中、入队之前被调用的,所以消费者通过queue.iterator().next()拿到的已经是转换后的HoodieRecord。转换逻辑如下:首先从IndexedRecord中获取recKey,然后将recKey作为hudi的主键元数据字段放到GenericRecord中;再将gr作为record构造BootstrapRecordPayload;最后返回new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload)。所以上面consumeOneRecord中的record即为下面的rec;而上面的HoodieRecordPayload是通过record.getData()获取的,也就是HoodieAvroRecord中的data,即下面的BootstrapRecordPayload
// transformFunction (defined in executeBootstrap): turns a source record into a HoodieRecord
// keyed by (recKey, partitionPath). Its payload is a RECORD_KEY_SCHEMA GenericRecord in which
// only the record key field is set.
inp -> {
String recKey = keyGenerator.getKey(inp).getRecordKey();
GenericRecord gr = new GenericData.Record(HoodieAvroUtils.RECORD_KEY_SCHEMA);
gr.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recKey);
BootstrapRecordPayload payload = new BootstrapRecordPayload(gr);
HoodieRecord rec = new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload);
return rec;
}
BootstrapRecordPayload.getInsertValue
这里的逻辑也很简单, 直接返回record,在上面的transformFunction中可知,record为GenericRecord,只有一个字段RECORD_KEY_METADATA_FIELD,这样我们知道了getInsertValue返回值,就可以继续看一下后面的write方法了
// BootstrapRecordPayload.getInsertValue: returns the stored record as-is (empty when it is null).
// The schema argument is not used.
public Option<IndexedRecord> getInsertValue(Schema schema) {
return Option.ofNullable(record);
}
HoodieBootstrapHandle.write
HoodieBootstrapHandle的write方法是在父类HoodieCreateHandle中实现的,核心逻辑是通过fileWriter将avroRecord写到对应的parquet文件中,具体实现也在parquet源码中。我们知道这里的Record只有主键信息和分区信息,这就是为什么METADATA_ONLY模式只会生成包含主键和页脚的基本框架文件、而不会重写全部数据的原因。关于METADATA_ONLY我们就分析到这里,接着看一下FULL_RECORD
/**
 * Writes one record to this handle's new file. Per the article, this is HoodieCreateHandle#write,
 * inherited by HoodieBootstrapHandle.
 *
 * Outcomes:
 * - Delete operation, or empty avroRecord: the record is counted as deleted.
 * - avroRecord equal to IGNORE_RECORD: skipped without marking success.
 * - Otherwise: written via rewriteRecordWithMetadata when preserveMetadata is set, else via
 *   writeAvroWithMetadata; the record's new location is then set.
 *
 * Failures are recorded in writeStatus rather than thrown.
 *
 * @param record     the Hudi record (key, payload, operation)
 * @param avroRecord the Avro value to write; empty means delete
 */
public void write(HoodieRecord record, Option<IndexedRecord> avroRecord) {
Option recordMetadata = ((HoodieRecordPayload) record.getData()).getMetadata();
if (HoodieOperation.isDelete(record.getOperation())) {
avroRecord = Option.empty();
}
try {
if (avroRecord.isPresent()) {
if (avroRecord.get().equals(IGNORE_RECORD)) {
return;
}
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
if (preserveMetadata) {
fileWriter.writeAvro(record.getRecordKey(),
rewriteRecordWithMetadata((GenericRecord) avroRecord.get(), path.getName()));
} else {
fileWriter.writeAvroWithMetadata(record.getKey(), rewriteRecord((GenericRecord) avroRecord.get()));
}
// update the new location of record, so we know where to find it next
record.unseal();
record.setNewLocation(new HoodieRecordLocation(instantTime, writeStatus.getFileId()));
record.seal();
recordsWritten++;
insertRecordsWritten++;
} else {
recordsDeleted++;
}
writeStatus.markSuccess(record, recordMetadata);
// deflate record payload after recording success. This will help users access payload as a
// part of marking
// record successful.
record.deflate();
} catch (Throwable t) {
// Not throwing exception from here, since we don't want to fail the entire job
// for a single record
writeStatus.markFailure(record, t, recordMetadata);
LOG.error("Error writing record " + record, t);
}
}
SparkBootstrapCommitActionExecutor.fullBootstrap
首先通过反射构造inputProvider,它是通过FULL_BOOTSTRAP_INPUT_PROVIDER_CLASS_NAME(hoodie.bootstrap.full.input.provider)配置的,默认值为SparkParquetBootstrapDataProvider。然后由inputProvider.generateInputRecords读取源表的parquet文件,返回inputRecordsRDD,这里返回的是全部的内容。最后将inputRecordsRDD作为参数,交给getBulkInsertActionExecutor(HoodieJavaRDD.of(inputRecordsRDD)).execute()执行,由SparkBulkInsertHelper.bulkInsert按照普通的写Hudi的逻辑写到目标表中,也就是以Hudi表的形式对数据做一次完整的复制/重写。关于bulkInsert的源码逻辑,由于比较多,限于篇幅、精力以及能力的原因,本文就不深入介绍了,有机会的话我会再单独分享一篇。
/**
 * FULL_RECORD bootstrap.
 *
 * Steps:
 *   1. Read the source files of the given partitions into HoodieRecords via the configured
 *      FullRecordBootstrapDataProvider.
 *   2. Create a REQUESTED instant at FULL_BOOTSTRAP_INSTANT_TS.
 *   3. Bulk-insert the records into the Hudi table.
 *
 * @param partitionFilesList (partition path, files) pairs; may be null
 * @return the bulk-insert write metadata, or empty when partitionFilesList is null or empty
 */
protected Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> fullBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitionFilesList) {
if (null == partitionFilesList || partitionFilesList.isEmpty()) {
return Option.empty();
}
TypedProperties properties = new TypedProperties();
properties.putAll(config.getProps());
// Provider class comes from config and is instantiated reflectively with (properties, context)
FullRecordBootstrapDataProvider inputProvider =
(FullRecordBootstrapDataProvider) ReflectionUtils.loadClass(config.getFullBootstrapInputProvider(),
properties, context);
JavaRDD<HoodieRecord> inputRecordsRDD =
(JavaRDD<HoodieRecord>) inputProvider.generateInputRecords("bootstrap_source", config.getBootstrapSourceBasePath(),
partitionFilesList);
// Start Full Bootstrap
final HoodieInstant requested = new HoodieInstant(State.REQUESTED, table.getMetaClient().getCommitActionType(),
HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS);
table.getActiveTimeline().createNewInstant(requested);
// Setup correct schema and run bulk insert.
return Option.of(getBulkInsertActionExecutor(HoodieJavaRDD.of(inputRecordsRDD)).execute());
}
generateInputRecords在父类SparkFullBootstrapDataProviderBase中实现:
/**
 * Reads all given source files with Spark and converts every row into a HoodieRecord.
 *
 * - The read format comes from getFormat().
 * - Rows are converted to Avro GenericRecords with record name "<tableName>_record" and
 *   namespace "hoodie.<tableName>".
 * - Each record is built with the configured key generator, the precombine field value as the
 *   ordering value, and the configured payload class.
 *
 * @param tableName              used to derive the Avro record name and namespace
 * @param sourceBasePath         base path of the bootstrap source (not read directly here)
 * @param partitionPathsWithFiles (partition path, files) pairs whose files are all read
 * @throws HoodieIOException wrapping IOExceptions from key-generator or record creation
 */
@Override
public JavaRDD<HoodieRecord> generateInputRecords(String tableName, String sourceBasePath,
List<Pair<String, List<HoodieFileStatus>>> partitionPathsWithFiles) {
String[] filePaths = partitionPathsWithFiles.stream().map(Pair::getValue)
.flatMap(f -> f.stream().map(fs -> FileStatusUtils.toPath(fs.getPath()).toString()))
.toArray(String[]::new);
Dataset inputDataset = sparkSession.read().format(getFormat()).load(filePaths);
try {
KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
String structName = tableName + "_record";
String namespace = "hoodie." + tableName;
RDD<GenericRecord> genericRecords = HoodieSparkUtils.createRdd(inputDataset, structName, namespace, false,
Option.empty());
return genericRecords.toJavaRDD().map(gr -> {
// Ordering value = the precombine field's value rendered as a string
String orderingVal = HoodieAvroUtils.getNestedFieldValAsString(
gr, props.getString("hoodie.datasource.write.precombine.field"), false, props.getBoolean(
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())));
try {
return DataSourceUtils.createHoodieRecord(gr, orderingVal, keyGenerator.getKey(gr),
props.getString("hoodie.datasource.write.payload.class"));
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
});
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
}
getBulkInsertActionExecutor
返回SparkBulkInsertCommitActionExecutor
/**
 * Builds a SparkBulkInsertCommitActionExecutor for instant FULL_BOOTSTRAP_INSTANT_TS.
 *
 * Its write config is built from the current config's properties, with the schema set to
 * bootstrapSchema. No user-defined partitioner is supplied (Option.empty()).
 */
protected BaseSparkCommitActionExecutor<T> getBulkInsertActionExecutor(HoodieData<HoodieRecord> inputRecordsRDD) {
return new SparkBulkInsertCommitActionExecutor((HoodieSparkEngineContext) context, new HoodieWriteConfig.Builder().withProps(config.getProps())
.withSchema(bootstrapSchema).build(), table, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
inputRecordsRDD, Option.empty(), extraMetadata);
}
SparkBulkInsertCommitActionExecutor.execute
执行SparkBulkInsertHelper.bulkInsert
/**
 * Runs SparkBulkInsertHelper.bulkInsert for this instant with performDedupe = true.
 *
 * @return the bulk-insert write metadata
 * @throws HoodieInsertException on any failure; other throwables are wrapped in one
 */
public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
try {
return SparkBulkInsertHelper.newInstance().bulkInsert(inputRecordsRDD, instantTime, table, config,
this, true, bulkInsertPartitioner);
} catch (HoodieInsertException ie) {
throw ie;
} catch (Throwable e) {
throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e);
}
}
SparkBulkInsertHelper.bulkInsert
最后由SparkBulkInsertHelper.bulkInsert按照普通的写Hudi的逻辑写到目标表中
/**
 * Bulk-insert driver.
 *
 * Steps:
 *   1. Transition the instant from REQUESTED to INFLIGHT.
 *   2. Pick the user-defined partitioner, or one derived from the configured bulk-insert sort mode.
 *   3. Write new files via the overloaded bulkInsert.
 *   4. Let the executor update the index and commit if needed.
 *
 * @return write metadata populated by updateIndexAndCommitIfNeeded
 */
public HoodieWriteMetadata<HoodieData<WriteStatus>> bulkInsert(final HoodieData<HoodieRecord<T>> inputRecords,
final String instantTime,
final HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
final HoodieWriteConfig config,
final BaseCommitActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> executor,
final boolean performDedupe,
final Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
HoodieWriteMetadata result = new HoodieWriteMetadata();
//transition bulk_insert state to inflight
table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(HoodieInstant.State.REQUESTED,
executor.getCommitActionType(), instantTime), Option.empty(),
config.shouldAllowMultiWriteOnSameInstant());
BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.orElse(BulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()));
// write new files
HoodieData<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false,
config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
//update index
((BaseSparkCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result);
return result;
}
总结
本文简单的对Hudi bootstrap的一些关键的源码逻辑进行了分析,希望能对大家有所帮助。限于精力及能力的原因,有些地方可能不够深入,或者不对的地方,还请大家多多指正,让我们共同进步。