我想使用Google Cloud Storage从我的流作业中使用 DataStream
写入(接收)StreamingFileSink
的元素。
为此,我将Google Cloud Storage connector用于Hadoop作为org.apache.hadoop.fs.FileSystem
的实现,并使用 HadoopFileSystem
as an implementation of org.apache.flink.core.fs.FileSystem
包装了Flink的hadoop FileSystem类。
我在gradle文件中包括以下依赖项:
compile(
"com.google.cloud.bigdataoss:gcs-connector:1.9.4-hadoop2"
)
compile(
"org.apache.flink:flink-connector-filesystem_2.11:1.6.0"
)
provided(
"org.apache.flink:flink-shaded-hadoop2:1.6.0"
)
现在,根据我了解的源[1] [2] [3]的理解,Flink在运行时动态加载
FileSystemFactory
的实现(通过java.util.ServiceLoader
),还在运行时加载 HadoopFsFactory
(通过反射(如果在类路径中找到Hadoop则通过反射)),然后将其用于创建FileSystem
的实例。我面临的问题是,Hadoop兼容性软件包的默认
RecoverableWriter
仅支持hdfs
文件方案(我使用gs
),因此也支持throws an error at runtime。因此,我对
extended
进行HadoopFileSystem
(我称为GCSFileSystem
),然后对 @overrided
进行FileSystem#createRecoverableWriter()
的操作,以返回自定义的 RecoverableWriter
实现,然后处理恢复的详细信息,等等。此外,我还创建了一个对应的FileSystemFactory
类(该类用@AutoService
装饰,因此应该可通过ServiceLoader
发现)。该设置在本地和本地Docker集群上均能很好地工作(实际上GCS连接器由于缺少授权而引发错误,但这很好,因为这意味着
FileSystem
已加载并正在运行),但是当我将其部署到Docker集群时会失败在Google Compute Engine上运行。在GCE上,默认的
HadoopFileSystem
被加载并抛出异常,因为该方案是gs
而不是hdfs
,但是我的假设是它应该已经加载了工厂的实现,因此应该不会出现此错误。我正在Flink v1.6.0 上并以long running session cluster on Docker using docker-flink身份运行
最佳答案
答案在OP的最后一行!!
我运行的 session 集群很长,在我执行job.jar
时,已经完成了FileSystem
初始化,并且工厂已经加载!因此,添加我的Job时没有进行初始化调用。
解决方案?有几种方法取决于您部署工作的方式:
FileSystem
实现的jar添加到lib/
目录manual
):将包含FileSystem
实现的jar添加到lib/
或图像或其他内容的zip
目录中。 docker
)(long-living
):创建一个自定义容器镜像,并将jar添加到该镜像的lib/
目录中。 docker
)(per-job-session
):创建一个自定义容器镜像,并将所有jar(包含FileSystem
和您的工作等)添加到lib/
目录,read more about per-job session here.