After running the topology in a local cluster, I created a remote Storm cluster (using Nathan's storm-deploy). Before creating the runnable jar with packaged dependencies, I removed the Storm jar from Eclipse's build path. My topology uses storm-kafka-0.9.0-wip16a-scala292.jar, which I have tried both leaving in the build path and removing from it before creating the runnable jar (in an attempt to fix this issue). When I use the following command:
./storm jar /home/ubuntu/Virtual/stormTopologia4.jar org.vicomtech.main.StormTopologia
it always responds with:
Exception in thread "main" java.lang.NoClassDefFoundError: OpaqueTridentKafkaSpout
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2451)
at java.lang.Class.getMethod0(Class.java:2694)
at java.lang.Class.getMethod(Class.java:1622)
at sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486)
Caused by: java.lang.ClassNotFoundException: OpaqueTridentKafkaSpout
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
Since this topology runs fine as a runnable jar on a single AWS instance, I can't figure out what is missing...
Here is the code in my main method:
Config conf = new Config();
try {
    OpaqueTridentKafkaSpout tridentSpout = crearSpout(kafkadir, "test");
    OpaqueTridentKafkaSpout logUpvSpout = crearSpout(kafkadir, "logsUpv");
    OpaqueTridentKafkaSpout logSnortSpout = crearSpout(kafkadir, "logsSnort");
    try {
        StormSubmitter.submitTopology(
                "hackaton",
                conf,
                buildTopology(tridentSpout, logUpvSpout, logSnortSpout));
    } catch (AlreadyAliveException | InvalidTopologyException e) {
        e.printStackTrace();
    }
} catch (IOException e) {
    e.printStackTrace();
} catch (TwitterException e) {
    e.printStackTrace();
}
}
private static OpaqueTridentKafkaSpout crearSpout(String testKafkaBrokerHost, String topic) {
    KafkaConfig.ZkHosts hosts = new ZkHosts(testKafkaBrokerHost, "/brokers");
    TridentKafkaConfig config = new TridentKafkaConfig(hosts, topic);
    config.forceStartOffsetTime(-2); // -2 = start from the earliest offset available
    config.scheme = new SchemeAsMultiScheme(new StringScheme());
    return new OpaqueTridentKafkaSpout(config);
}
public static StormTopology buildTopology(OpaqueTridentKafkaSpout tridentSpout,
        OpaqueTridentKafkaSpout logUpvSpout,
        OpaqueTridentKafkaSpout logSnortSpout) throws IOException, TwitterException {
    TridentTopology topology = new TridentTopology();
    topology.newStream("tweets2", tridentSpout)
            .each(new Fields("str"), new OnlyEnglishSpanish())
            .each(new Fields("str"), new WholeTweetToMongo())
            .each(new Fields("str"), new TextLangExtracter(),
                    new Fields("text", "lang")).parallelismHint(6)
            .project(new Fields("text", "lang"))
            .partitionBy(new Fields("lang"))
            .each(new Fields("text", "lang"), new Analisis(),
                    new Fields("result")).parallelismHint(6)
            .each(new Fields("result"), new ResultToMongo());
    return topology.build();
}
Is there any way to make OpaqueTridentKafkaSpout available?
Thanks in advance.
I hope this isn't a silly question; I'm new to this field.
Best Answer
When you generate a jar with dependencies, you can keep the Storm jar in the build path; you just have to tell Maven not to bundle it, like this (note the "provided" scope, which means the jar is supplied by the runtime environment and therefore does not need to be bundled):
<dependency>
    <groupId>storm</groupId>
    <artifactId>storm</artifactId>
    <version>0.9.0-rc2</version>
    <scope>provided</scope>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
However, the Kafka spout must be included in the jar-with-dependencies, so its Maven declaration looks like this:
<dependency>
    <groupId>storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.9.0-wip16a-scala292</version>
</dependency>
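For reference, the jar-with-dependencies itself is typically produced by the maven-assembly-plugin. A minimal sketch of that plugin section follows; the plugin version shown is an assumption, and the main class is taken from the command in the question:

```xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>2.4</version>
    <configuration>
        <descriptorRefs>
            <!-- predefined descriptor that bundles all compile-scope deps -->
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
            <manifest>
                <mainClass>org.vicomtech.main.StormTopologia</mainClass>
            </manifest>
        </archive>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>
```

Running mvn package then produces target/<artifact>-jar-with-dependencies.jar, which is the file to pass to ./storm jar; provided-scope dependencies such as Storm itself are left out, while storm-kafka (compile scope) is bundled.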
To verify the contents, you can always unzip the resulting jar and manually check whether the required classes are present before deploying to Storm.
Regarding "java - storm-deploy submit topology java.lang.NoClassDefFoundError", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/22351348/