Adding custom dependencies for plugins in a Flink cluster


Problem description

I have a Flink session cluster (Job Manager + Task Manager), version 1.11.1, with log4j-console.properties configured to include a Kafka appender. In addition, on both the Job Manager and the Task Manager I have enabled the built-in flink-s3-fs-hadoop plugin.
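For context, a Kafka appender section in log4j-console.properties would look roughly like the following minimal sketch (hypothetical; the topic name and broker address are placeholders, not taken from the question):

# hypothetical excerpt from log4j-console.properties
appender.kafka.type = Kafka
appender.kafka.name = Kafka
appender.kafka.topic = flink-logs
appender.kafka.layout.type = PatternLayout
appender.kafka.layout.pattern = [%d{ISO8601}][%-5p] %c - %m%n
appender.kafka.property.type = Property
appender.kafka.property.name = bootstrap.servers
appender.kafka.property.value = kafka:9092
rootLogger.appenderRef.kafka.ref = Kafka

It is this appender that constructs the KafkaProducer seen in the stack trace below.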

I added the kafka-clients jar to the flink/lib directory, which is necessary for the container to run. But I'm still getting the class-loading error below when the S3 plugin is instantiated (and its logger is initialized).
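For reference, the layout on disk is then roughly as follows (jar names taken from the stack trace; listing abbreviated):

flink/
  lib/
    flink-dist_2.11-1.11.1.jar
    log4j-core-2.12.1.jar
    kafka-clients-2.5.0.jar        <- added for the Kafka appender
  plugins/
    s3-fs-hadoop/
      flink-s3-fs-hadoop-1.11.1.jar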

Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.ByteArraySerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.ByteArraySerializer could not be found.

(full stack trace at the bottom)

As I understand it, there is dedicated dynamic class loading for plugins, separate from the system class loading. Therefore, I added the following configuration to the flink-conf.yaml file:

classloader.parent-first-patterns.additional: org.apache.kafka
classloader.resolve-order: parent-first

But the error still occurs.

While debugging, I don't see the additional pattern being added to the "allowedFlinkPackages" of the plugin class loader.

What am I missing here?

java.util.ServiceConfigurationError: org.apache.flink.core.fs.FileSystemFactory: Provider org.apache.flink.fs.s3hadoop.S3FileSystemFactory could not be instantiated
        at java.util.ServiceLoader.fail(Unknown Source) ~[?:?]
        at java.util.ServiceLoader$ProviderImpl.newInstance(Unknown Source) ~[?:?]
        at java.util.ServiceLoader$ProviderImpl.get(Unknown Source) ~[?:?]
        at java.util.ServiceLoader$3.next(Unknown Source) ~[?:?]
        at org.apache.flink.core.plugin.PluginLoader$ContextClassLoaderSettingIterator.next(PluginLoader.java:103) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.shaded.guava18.com.google.common.collect.Iterators$5.next(Iterators.java:558) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.shaded.guava18.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:48) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.core.fs.FileSystem.addAllFactoriesToList(FileSystem.java:1068) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.core.fs.FileSystem.loadFileSystemFactories(FileSystem.java:1050) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.core.fs.FileSystem.initialize(FileSystem.java:325) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerSecurely(TaskManagerRunner.java:315) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:297) [flink-dist_2.11-1.11.1.jar:1.11.1]
Caused by: java.lang.ExceptionInInitializerError
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
        at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
        at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
        ... 11 more
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.ByteArraySerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
        at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:728) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:481) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.logging.log4j.core.appender.mom.kafka.DefaultKafkaProducerFactory.newKafkaProducer(DefaultKafkaProducerFactory.java:40) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.appender.mom.kafka.KafkaManager.startup(KafkaManager.java:136) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.appender.mom.kafka.KafkaAppender.start(KafkaAppender.java:164) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.config.AbstractConfiguration.start(AbstractConfiguration.java:304) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.LoggerContext.setConfiguration(LoggerContext.java:579) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:651) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:668) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.LoggerContext.start(LoggerContext.java:253) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:153) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:45) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.LogManager.getContext(LogManager.java:194) ~[log4j-api-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.spi.AbstractLoggerAdapter.getContext(AbstractLoggerAdapter.java:138) ~[log4j-api-2.12.1.jar:2.12.1]
        at org.apache.logging.slf4j.Log4jLoggerFactory.getContext(Log4jLoggerFactory.java:45) ~[log4j-slf4j-impl-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.spi.AbstractLoggerAdapter.getLogger(AbstractLoggerAdapter.java:48) ~[log4j-api-2.12.1.jar:2.12.1]
        at org.apache.logging.slf4j.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:30) ~[log4j-slf4j-impl-2.12.1.jar:2.12.1]
        at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:329) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:349) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.<clinit>(AbstractS3FileSystemFactory.java:88) ~[?:?]
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
        at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
        at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
        ... 11 more
[2020-12-06T09:15:45,892][Error] {} [o.a.f.c.f.FileSystem]: Failed to load a file system via services
java.util.ServiceConfigurationError: org.apache.flink.core.fs.FileSystemFactory: Provider org.apache.flink.fs.s3hadoop.S3AFileSystemFactory could not be instantiated
        at java.util.ServiceLoader.fail(Unknown Source) ~[?:?]
        at java.util.ServiceLoader$ProviderImpl.newInstance(Unknown Source) ~[?:?]
        at java.util.ServiceLoader$ProviderImpl.get(Unknown Source) ~[?:?]
        at java.util.ServiceLoader$3.next(Unknown Source) ~[?:?]
        at org.apache.flink.core.plugin.PluginLoader$ContextClassLoaderSettingIterator.next(PluginLoader.java:103) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.shaded.guava18.com.google.common.collect.Iterators$5.next(Iterators.java:558) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.shaded.guava18.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:48) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.core.fs.FileSystem.addAllFactoriesToList(FileSystem.java:1068) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.core.fs.FileSystem.loadFileSystemFactories(FileSystem.java:1050) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.core.fs.FileSystem.initialize(FileSystem.java:325) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerSecurely(TaskManagerRunner.java:315) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:297) [flink-dist_2.11-1.11.1.jar:1.11.1]
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.fs.s3hadoop.S3FileSystemFactory
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
        at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
        at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
        ... 11 more

Answer

As you stated, Flink plugins are loaded through their own class loader and are completely isolated from any other plugin.

If we delve into the source code, there is another key that is used while the cluster boots up (unfortunately, it is not documented anywhere):

plugin.classloader.parent-first-patterns.additional

This lets you add external jars to the classpath used by the PluginClassLoader. Note that the classloader.parent-first-patterns.* keys you tried only affect the user-code class loader, which is why you never saw the pattern added to the plugin class loader's "allowedFlinkPackages".

Declaration + usage: https://github.com/apache/flink/blob/53a4b4407816c2780fed2f8995affbebc1f58c3c/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java#L156-L174
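Paraphrasing that declaration (a sketch; check the link for the exact values), the plugin class loader ships with its own parent-first list, so out of the box it roughly amounts to:

plugin.classloader.parent-first-patterns.default: java.;org.apache.flink.;javax.annotation.;org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback
plugin.classloader.parent-first-patterns.additional: (empty)

Note that org.apache.kafka is not in that default list, so the PluginClassLoader never delegates Kafka classes to the parent classpath where the kafka-clients jar lives.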

Add the following to flink-conf.yaml:

plugin.classloader.parent-first-patterns.additional: org.apache.kafka

That should solve the problem.
