This article walks through a problem with the Kafka connector in Apache Flink's Python streaming API, where job submission fails with "Cannot load user class", and how to resolve it.

Problem Description

I am trying out Flink's new Python streaming API and attempting to run my script with ./flink-1.6.1/bin/pyflink-stream.sh examples/read_from_kafka.py. The Python script is fairly straightforward: I am just trying to consume from an existing topic and send everything to stdout (or to the *.out file in the log directory, where the output method emits data by default).

import glob
import os
import sys
from java.util import Properties
from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.streaming.api.collector.selector import OutputSelector
from org.apache.flink.api.common.serialization import SimpleStringSchema

directories=['/home/user/flink/flink-1.6.1/lib']
for directory in directories:
    for jar in glob.glob(os.path.join(directory, '*.jar')):
        sys.path.append(jar)

from org.apache.flink.streaming.connectors.kafka import FlinkKafkaConsumer09

props = Properties()
config = {"bootstrap_servers": "localhost:9092",
          "group_id": "flink_test",
          "topics": ["TopicCategory-TopicName"]}
props.setProperty("bootstrap.servers", config['bootstrap_servers'])
props.setProperty("group.id", config['group_id'])  # Kafka expects the key "group.id", not "group_id"
props.setProperty("zookeeper.connect", "localhost:2181")

def main(factory):
    consumer = FlinkKafkaConsumer09(config["topics"], SimpleStringSchema(), props)  # config["topics"] is already a list

    env = factory.get_execution_environment()
    env.add_java_source(consumer) \
        .output()
    env.execute()

I grabbed a handful of jar files from the Maven repos, namely flink-connector-kafka-0.9_2.11-1.6.1.jar, flink-connector-kafka-base_2.11-1.6.1.jar and kafka-clients-0.9.0.1.jar, and copied them into Flink's lib directory. Unless I misunderstood the documentation, this should suffice for Flink to load the Kafka connector. Indeed, if I remove any of these jars the import fails, but this doesn't seem to be enough to actually invoke the plan. Adding a for loop to dynamically add these to sys.path didn't work either. Here's what gets printed in the console:
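As a quick sanity check before submitting the job, one can verify that the expected connector jars are actually present in the lib directory. This is a plain-Python sketch; the jar names are the ones mentioned above, and both names and path should be adjusted to your install:

```python
import glob
import os

# Jars the Kafka 0.9 connector needs on Flink's classpath
# (names taken from the setup described above).
REQUIRED_JARS = [
    "flink-connector-kafka-0.9_2.11-1.6.1.jar",
    "flink-connector-kafka-base_2.11-1.6.1.jar",
    "kafka-clients-0.9.0.1.jar",
]

def missing_jars(lib_dir, required=REQUIRED_JARS):
    """Return the required jar file names that are not present in lib_dir."""
    present = {os.path.basename(p)
               for p in glob.glob(os.path.join(lib_dir, "*.jar"))}
    return [jar for jar in required if jar not in present]
```

Calling missing_jars("/home/user/flink/flink-1.6.1/lib") and getting back an empty list at least rules out a missing jar, leaving version mismatches as the remaining suspect.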

Starting execution of program
Failed to run plan: null
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/tmp/flink_streaming_plan_9cfed4d9-0288-429c-99ac-df02c86922ec/read_from_kafka.py", line 32, in main
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
    at org.apache.flink.streaming.python.api.environment.PythonStreamExecutionEnvironment.execute(PythonStreamExecutionEnvironment.java:245)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)

org.apache.flink.client.program.ProgramInvocationException: org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: bbcc0cb2c4fe6e3012d228b06b270eba)

The program didn't contain a Flink job. Perhaps you forgot to call execute() on the execution environment.

This is what I see in the logs:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class:    org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
ClassLoader info: URL ClassLoader:
    file: '/tmp/blobStore-9f6930fa-f1cf-4851-a0bf-2e620391596f/job_ca486746e7feb42d2d162026b74e9935/blob_p-9321896d165fec27a617d44ad50e3ef09c3211d9-405ccc9b490fa1e1348f0a76b1a48887' (valid JAR)
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:104)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)

Is there a way to fix this and make the connector available to Python? I suspect this is a classloader issue with Jython, but I don't know how to investigate further (especially given that I have no Java experience). Many thanks.

Recommended Answer

You are using the wrong Kafka consumer here. In your code it is FlinkKafkaConsumer09, but the library you are actually loading is flink-connector-kafka-0.11_2.11-1.6.1.jar, which provides FlinkKafkaConsumer011. Either replace FlinkKafkaConsumer09 with FlinkKafkaConsumer011, or use the library file flink-connector-kafka-0.9_2.11-1.6.1.jar instead of the current one.
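The version pairing behind the answer can be made explicit with a small plain-Python sketch (the jar names follow the Flink 1.6.x connector naming convention; treat the exact versions as assumptions for your install):

```python
# Each FlinkKafkaConsumerXX class lives in the connector jar built for the
# same Kafka version; mixing a consumer class from one version with a jar
# from another is what produces "Cannot load user class" at submission time.
CONNECTOR_JAR_FOR_CLASS = {
    "FlinkKafkaConsumer09": "flink-connector-kafka-0.9_2.11-1.6.1.jar",
    "FlinkKafkaConsumer010": "flink-connector-kafka-0.10_2.11-1.6.1.jar",
    "FlinkKafkaConsumer011": "flink-connector-kafka-0.11_2.11-1.6.1.jar",
}

def versions_match(consumer_class, jar_name):
    """True when the consumer class and the connector jar agree on the Kafka version."""
    return CONNECTOR_JAR_FOR_CLASS.get(consumer_class) == jar_name
```

Under this mapping, the combination from the question, FlinkKafkaConsumer09 with the 0.11 connector jar, does not match, while either fix proposed in the answer restores a matching pair.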
