I have a Flink cluster on VirtualBox with three nodes: 1 master and 2 slaves. I customized the WordCount example and built a fat jar to run it against the remote Flink cluster on VirtualBox, but I got an error.
Note: I imported the dependencies into the project manually (using IntelliJ IDEA) and did not use Maven as the dependency provider. I tested my code on my local machine and it worked fine.
Here is my Java code:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;

public class WordCount {

    // *************************************************************************
    //     PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final int port;
        final String ip;
        DataSet<String> text;
        try {
            ip = params.get("ip");
            port = params.getInt("port");
            // connect to the remote JobManager with parallelism 2
            final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(ip, port, 2);
            text = env.readTextFile(params.get("input"));
        } catch (Exception e) {
            System.err.println("No port or input or ip specified. "
                    + "Please run 'WordCount --ip <ip> --port <port> --input <input>'");
            return;
        }

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1);

        System.out.println("Printing result to stdout. Use --output to specify output path.");
        counts.print();
    }

    // *************************************************************************
    //     USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into
     * multiple pairs in the form of "(word,1)" ({@code Tuple2<String, Integer>}).
     */
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");
            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}
I created the ExecutionEnvironment object with:
final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(ip, port, 2);
I ran the code with the following command on the host machine (which is connected to the cluster nodes and is the machine VirtualBox runs on):
java -cp FlinkWordCountClusetr.jar WordCount --ip --port 6123 --input /usr/local/flink/LICENSE
but I got the following error (abridged):
Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: Could not start the ActorSystem needed to talk to the JobManager.
Caused by: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.remote.log-received-messages'
After trying many settings, the cause turned out to be that the Maven dependencies did not match the Flink build installed on the remote cluster. The Maven dependencies were Flink 1.3.2 built on Scala 2.10, while the Flink installed on the remote cluster was 1.3.2 built on Scala 2.11. Just a minor difference, but an important one!
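A mismatch like this can be spotted before submitting a job by comparing the Scala suffix in the jar file names on each side. The following is only a minimal sketch, assuming that the Scala binary version appears in the file name as `_2.10` or `_2.11` (as it does for Scala-dependent Flink 1.3.x artifacts). The class `ScalaSuffixCheck`, its methods, and the two file names in `main` are hypothetical and chosen to mirror the versions described above; they are not part of Flink.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink: compares Scala suffixes in jar names.
class ScalaSuffixCheck {

    // Matches the Scala binary version in names such as "flink-clients_2.10-1.3.2.jar".
    private static final Pattern SCALA_SUFFIX = Pattern.compile("_(\\d+\\.\\d+)-");

    /** Returns the Scala binary version encoded in a jar file name, if there is one. */
    static Optional<String> scalaSuffix(String jarName) {
        Matcher m = SCALA_SUFFIX.matcher(jarName);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    /** True only when both names carry a Scala suffix and the suffixes are equal. */
    static boolean sameScalaVersion(String clientJar, String clusterJar) {
        Optional<String> client = scalaSuffix(clientJar);
        Optional<String> cluster = scalaSuffix(clusterJar);
        return client.isPresent() && client.equals(cluster);
    }

    public static void main(String[] args) {
        // Assumed example names: a client-side dependency and the cluster's dist jar.
        String clientJar = "flink-clients_2.10-1.3.2.jar";
        String clusterJar = "flink-dist_2.11-1.3.2.jar";

        System.out.println("client Scala:  " + scalaSuffix(clientJar).orElse("unknown"));
        System.out.println("cluster Scala: " + scalaSuffix(clusterJar).orElse("unknown"));
        System.out.println("match: " + sameScalaVersion(clientJar, clusterJar));
    }
}
```

In practice the client-side names would come from the jars added to the project and the cluster-side name from the jars in the `lib` directory of the Flink installation. Artifacts without a Scala suffix in their name yield an empty result and are treated as "not comparable" rather than as a match.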