我想使用Google Cloud Storage从我的流作业中使用 DataStream 写入(接收)StreamingFileSink的元素。

为此,我将Google Cloud Storage connector用于Hadoop作为org.apache.hadoop.fs.FileSystem的实现,并使用 HadoopFileSystem as an implementation of org.apache.flink.core.fs.FileSystem 包装了Flink的hadoop FileSystem类。

我在gradle文件中包括以下依赖项:

  • compile( "com.google.cloud.bigdataoss:gcs-connector:1.9.4-hadoop2" )
  • compile( "org.apache.flink:flink-connector-filesystem_2.11:1.6.0" )
  • provided( "org.apache.flink:flink-shaded-hadoop2:1.6.0" )

  • 现在,根据我了解的源[1] [2] [3]的理解,Flink在运行时动态加载 FileSystemFactory 的实现(通过java.util.ServiceLoader),还在运行时加载 HadoopFsFactory (通过反射(如果在类路径中找到Hadoop则通过反射)),然后将其用于创建FileSystem的实例。

    我面临的问题是,Hadoop兼容性软件包的默认 RecoverableWriter 仅支持hdfs文件方案(我使用gs),因此也支持throws an error at runtime

    因此,我对extended进行HadoopFileSystem(我称为GCSFileSystem),然后对 @overrided 进行FileSystem#createRecoverableWriter()的操作,以返回自定义的 RecoverableWriter 实现,然后处理恢复的详细信息,等等。此外,我还创建了一个对应的FileSystemFactory类(该类用@AutoService装饰,因此应该可通过ServiceLoader发现)。

    该设置在本地和本地Docker集群上均能很好地工作(实际上GCS连接器由于缺少授权而引发错误,但这很好,因为这意味着FileSystem已加载并正在运行),但是当我将其部署到Docker集群时会失败在Google Compute Engine上运行。

    在GCE上,默认的HadoopFileSystem被加载并抛出异常,因为该方案是gs而不是hdfs,但是我的假设是它应该已经加载了工厂的实现,因此应该不会出现此错误。

    我正在Flink v1.6.0 上并以long running session cluster on Docker using docker-flink身份运行

    最佳答案

    答案在OP的最后一行!!

    我运行的 session 集群很长,在我执行job.jar时,已经完成了FileSystem初始化,并且工厂已经加载!因此,添加我的Job时没有进行初始化调用。

    解决方案?有几种方法取决于您部署工作的方式:

  • 独立:将包含FileSystem实现的jar添加到lib/目录
  • 群集(manual):将包含FileSystem实现的jar添加到lib/或图像或其他内容的zip目录中。
  • 群集(docker)(long-living):创建一个自定义容器镜像,并将jar添加到该镜像的lib/目录中。
  • 群集(docker)(per-job-session):创建一个自定义容器镜像,并将所有jar(包含FileSystem和您的工作等)添加到lib/目录,read more about per-job session here.
  • 07-25 23:21
    查看更多