具有多个加密密钥提供者的EMR

具有多个加密密钥提供者的EMR

本文介绍了具有多个加密密钥提供者的EMR的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在运行启用了 s3客户端加密.但是现在我需要使用不同的加密模式将数据写入多个s3目标:

I'm running EMR cluster with enabled s3 client-side encryption using custom key provider. But now I need to write data to multiple s3 destinations using different encryption schemas:

  1. CSE自定义密钥提供者
  2. CSE-KMS

是否可以通过在s3存储桶和加密类型之间定义某种映射来将EMR配置为同时使用两种加密类型?

Is it possible to configure EMR to use both encryption types by defining some kind of mapping between s3 bucket and encryption type?

或者,由于我使用火花结构化流来处理数据并将数据写入s3,所以我想知道是否有可能在EMRFS上禁用加密,然后分别为每个流启用CSE吗?

Alternatively since I use spark structured streaming to process and write data to s3 I'm wondering if it's possible to disable encryption on EMRFS but then enable CSE for each stream separately?

推荐答案

该想法是支持任何文件系统方案并分别进行配置.例如:

The idea is to support any file systems scheme and configure it individually. For example:

# custom encryption key provider
fs.s3x.cse.enabled = true
fs.s3x.cse.materialsDescription.enabled = true
fs.s3x.cse.encryptionMaterialsProvider = my.company.fs.encryption.CustomKeyProvider

#no encryption
fs.s3u.cse.enabled = false

#AWS KMS
fs.s3k.cse.enabled = true
fs.s3k.cse.encryptionMaterialsProvider = com.amazon.ws.emr.hadoop.fs.cse.KMSEncryptionMaterialsProvider
fs.s3k.cse.kms.keyId = some-kms-id

然后像这样在火花中使用它:

And then to use it in spark like this:

StreamingQuery writeStream = session
        .readStream()
        .schema(RecordSchema.fromClass(TestRecord.class))
        .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
        .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
        .csv("s3x://aws-s3-bucket/input")
        .as(Encoders.bean(TestRecord.class))
        .writeStream()
        .outputMode(OutputMode.Append())
        .format("parquet")
        .option("path", "s3k://aws-s3-bucket/output")
        .option("checkpointLocation", "s3u://aws-s3-bucket/checkpointing")
        .start();

为此,我实现了一个自定义Hadoop文件系统(扩展了org.apache.hadoop.fs.FileSystem),该文件系统将调用委派给实际文件系统,但配置有所修改.

Ta handle this I’ve implemented a custom Hadoop file system (extends org.apache.hadoop.fs.FileSystem) that delegates calls to real file system but with modified configurations.

// Create delegate FS
this.config.set("fs.s3n.impl", "com.amazon.ws.emr.hadoop.fs.EmrFileSystem");
this.config.set("fs.s3n.impl.disable.cache", Boolean.toString(true));
this.delegatingFs = FileSystem.get(s3nURI(originalUri, SCHEME_S3N), substituteS3Config(conf));

传递给委派文件系统的配置应采用所有原始设置,并用fs.s3n.替换任何出现的fs.s3*..

Configuration that passes to delegating file system should take all original settings and replace any occurrences of fs.s3*. with fs.s3n..

private Configuration substituteS3Config(final Configuration conf) {
    if (conf == null) return null;

    final String fsSchemaPrefix = "fs." + getScheme() + ".";
    final String fsS3SchemaPrefix = "fs.s3.";
    final String fsSchemaImpl = "fs." + getScheme() + ".impl";
    Configuration substitutedConfig = new Configuration(conf);
    for (Map.Entry<String, String> configEntry : conf) {
        String propName = configEntry.getKey();
        if (!fsSchemaImpl.equals(propName)
            && propName.startsWith(fsSchemaPrefix)) {
            final String newPropName = propName.replace(fsSchemaPrefix, fsS3SchemaPrefix);
            LOG.info("Substituting property '{}' with '{}'", propName, newPropName);
            substitutedConfig.set(newPropName, configEntry.getValue());
        }
    }

    return substitutedConfig;
}

此外,还要确保委派fs接收具有支持方案的uri和路径,并返回具有自定义方案的路径

Besides that make sure that delegating fs receives uris and paths with supporting scheme and returns paths with custom scheme

@Override
public FileStatus getFileStatus(final Path f) throws IOException {
    FileStatus status = this.delegatingFs.getFileStatus(s3Path(f));
    if (status != null) {
        status.setPath(customS3Path(status.getPath()));
    }
    return status;
}

private Path s3Path(final Path p) {
    if (p.toUri() != null && getScheme().equals(p.toUri().getScheme())) {
        return new Path(s3nURI(p.toUri(), SCHEME_S3N));
    }
    return p;
}

private Path customS3Path(final Path p) {
    if (p.toUri() != null && !getScheme().equals(p.toUri().getScheme())) {
        return new Path(s3nURI(p.toUri(), getScheme()));
    }
    return p;
}

private URI s3nURI(final URI originalUri, final String newScheme) {
     try {
         return new URI(
             newScheme,
             originalUri.getUserInfo(),
             originalUri.getHost(),
             originalUri.getPort(),
             originalUri.getPath(),
             originalUri.getQuery(),
             originalUri.getFragment());
     } catch (URISyntaxException e) {
         LOG.warn("Unable to convert URI {} to {} scheme", originalUri, newScheme);
     }

     return originalUri;
}

最后一步是向Hadoop(spark-defaults分类)注册自定义文件系统

The final step is to register custom file system with Hadoop (spark-defaults classification)

spark.hadoop.fs.s3x.impl = my.company.fs.DynamicS3FileSystem
spark.hadoop.fs.s3u.impl = my.company.fs.DynamicS3FileSystem
spark.hadoop.fs.s3k.impl = my.company.fs.DynamicS3FileSystem

这篇关于具有多个加密密钥提供者的EMR的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-23 14:42