Storm Programming Introduction API Series: Controlling the Number of Workers of a Storm Topology

Previous post in this series:

Storm Programming Introduction API Series: A Storm Topology's Default Number of Workers, Executors, and Tasks

  Continuing from that post, we now write the following topology:

StormTopologyMoreWorker.java

package zhouls.bigdata.stormDemo;

import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class StormTopologyMoreWorker {

    public static class MySpout extends BaseRichSpout {
        private Map conf;
        private TopologyContext context;
        private SpoutOutputCollector collector;

        public void open(Map conf, TopologyContext context,
                SpoutOutputCollector collector) {
            this.conf = conf;
            this.collector = collector;
            this.context = context;
        }

        int num = 0;

        public void nextTuple() {
            // Emit an increasing number, roughly one tuple per second.
            num++;
            System.out.println("spout:" + num);
            this.collector.emit(new Values(num));
            Utils.sleep(1000); // the literal is missing in the original listing; 1000 ms is assumed
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("num"));
        }
    }

    public static class MyBolt extends BaseRichBolt {
        private Map stormConf;
        private TopologyContext context;
        private OutputCollector collector;

        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.stormConf = stormConf;
            this.context = context;
            this.collector = collector;
        }

        int sum = 0;

        public void execute(Tuple input) {
            // Keep a running sum of everything the spout has emitted.
            Integer num = input.getIntegerByField("num");
            sum += num;
            System.out.println("sum=" + sum);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    }

    public static void main(String[] args) {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        String spout_id = MySpout.class.getSimpleName();
        String bolt_id = MyBolt.class.getSimpleName();

        topologyBuilder.setSpout(spout_id, new MySpout());
        topologyBuilder.setBolt(bolt_id, new MyBolt()).shuffleGrouping(spout_id);

        Config config = new Config();
        // Ask for more than one worker process for this topology.
        // The literal is missing in the original listing; 2 workers is assumed here.
        config.setNumWorkers(2);

        String topology_name = StormTopologyMoreWorker.class.getSimpleName();
        if (args.length == 0) {
            // Run in local mode
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology(topology_name, config, topologyBuilder.createTopology());
        } else {
            // Run on the cluster
            try {
                StormSubmitter.submitTopology(topology_name, config, topologyBuilder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            } catch (AuthorizationException e) {
                e.printStackTrace();
            }
        }
    }
}
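  Setting config.setNumWorkers only controls how many worker processes (JVMs) the topology is given; the spout and bolt executors are then distributed across those workers. As a minimal sketch of how the two settings interact (the parallelism hints of 4 below are assumptions for illustration, not part of the code above):

// Illustrative variant only: explicit parallelism hints plus multiple workers.
// With 2 workers and 4 executors each for the spout and the bolt, Storm spreads the
// 8 executors (plus the system acker executors) roughly evenly across the 2 workers.
TopologyBuilder builder = new TopologyBuilder();
String spout_id = MySpout.class.getSimpleName();
String bolt_id = MyBolt.class.getSimpleName();
builder.setSpout(spout_id, new MySpout(), 4);
builder.setBolt(bolt_id, new MyBolt(), 4).shuffleGrouping(spout_id);
Config config = new Config();
config.setNumWorkers(2); // two worker processes for this topology

  The topology page of the Storm UI lists the resulting number of workers, executors, and tasks, which is what we check after submitting below.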

  Build the jar package

[hadoop@master jar]$ pwd
/home/hadoop/app/apache-storm-1.0./jar
[hadoop@master jar]$ ll
total
-rw-r--r-- hadoop hadoop Jul : StormTopology.jar
[hadoop@master jar]$ rz
[hadoop@master jar]$ ll
total
-rw-r--r-- hadoop hadoop Jul : StormTopology.jar
-rw-r--r-- hadoop hadoop Jul : StormTopologyMoreWorker.jar

Before submitting the job

[hadoop@master apache-storm-1.0.]$ pwd
/home/hadoop/app/apache-storm-1.0.
[hadoop@master apache-storm-1.0.]$ ll
total
drwxrwxr-x hadoop hadoop May : bin
-rw-r--r-- hadoop hadoop Jul CHANGELOG.md
drwxrwxr-x hadoop hadoop Jul : conf
drwxrwxr-x hadoop hadoop Jul examples
drwxrwxr-x hadoop hadoop May : external
drwxrwxr-x hadoop hadoop Jul extlib
drwxrwxr-x hadoop hadoop Jul extlib-daemon
drwxrwxr-x hadoop hadoop Jul : jar
drwxrwxr-x hadoop hadoop May : lib
-rw-r--r-- hadoop hadoop Jul LICENSE
drwxrwxr-x hadoop hadoop May : log4j2
drwxrwxr-x hadoop hadoop May : logs
-rw-r--r-- hadoop hadoop Jul NOTICE
drwxrwxr-x hadoop hadoop May : public
-rw-r--r-- hadoop hadoop Jul README.markdown
-rw-r--r-- hadoop hadoop Jul RELEASE
-rw-r--r-- hadoop hadoop Jul SECURITY.md
[hadoop@master apache-storm-1.0.]$ bin/storm jar jar/StormTopologyMoreWorker.jar zhouls.bigdata.stormDemo.StormTopologyMoreWorker aaa
Running: /home/hadoop/app/jdk/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/home/hadoop/app/apache-storm-1.0. -Dstorm.log.dir=/home/hadoop/app/apache-storm-1.0./logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /home/hadoop/app/apache-storm-1.0./lib/log4j-api-2.1.jar:/home/hadoop/app/apache-storm-1.0./lib/kryo-3.0..jar:/home/hadoop/app/apache-storm-1.0./lib/storm-rename-hack-1.0..jar:/home/hadoop/app/apache-storm-1.0./lib/log4j-core-2.1.jar:/home/hadoop/app/apache-storm-1.0./lib/slf4j-api-1.7..jar:/home/hadoop/app/apache-storm-1.0./lib/minlog-1.3..jar:/home/hadoop/app/apache-storm-1.0./lib/objenesis-2.1.jar:/home/hadoop/app/apache-storm-1.0./lib/clojure-1.7..jar:/home/hadoop/app/apache-storm-1.0./lib/servlet-api-2.5.jar:/home/hadoop/app/apache-storm-1.0./lib/log4j-slf4j-impl-2.1.jar:/home/hadoop/app/apache-storm-1.0./lib/log4j-over-slf4j-1.6..jar:/home/hadoop/app/apache-storm-1.0./lib/storm-core-1.0..jar:/home/hadoop/app/apache-storm-1.0./lib/disruptor-3.3..jar:/home/hadoop/app/apache-storm-1.0./lib/asm-5.0..jar:/home/hadoop/app/apache-storm-1.0./lib/reflectasm-1.10..jar:jar/StormTopologyMoreWorker.jar:/home/hadoop/app/apache-storm-1.0./conf:/home/hadoop/app/apache-storm-1.0./bin -Dstorm.jar=jar/StormTopologyMoreWorker.jar zhouls.bigdata.stormDemo.StormTopologyMoreWorker aaa
[main] INFO o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -:-
[main] INFO o.a.s.s.a.AuthUtils - Got AutoCreds []
[main] INFO o.a.s.StormSubmitter - Uploading topology jar jar/StormTopologyMoreWorker.jar to assigned location: /home/hadoop/data/storm/nimbus/inbox/stormjar-ea1d6383-ca31--b784-e06856000894.jar
[main] INFO o.a.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /home/hadoop/data/storm/nimbus/inbox/stormjar-ea1d6383-ca31--b784-e06856000894.jar
[main] INFO o.a.s.StormSubmitter - Submitting topology StormTopologyMoreWorker in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-5319119555480935017:-9144362215090188990","topology.workers":}
[main] INFO o.a.s.StormSubmitter - Finished submitting topology: StormTopologyMoreWorker
[hadoop@master apache-storm-1.0.]$
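  A note on the trailing "aaa" argument in the command above: main() only checks args.length, so passing any extra argument makes the code take the StormSubmitter branch and submit to the cluster, while running the same command with no extra argument would keep args.length at 0 and start a LocalCluster inside the client JVM instead. A minimal sketch of the two invocations (same jar and class as above):

bin/storm jar jar/StormTopologyMoreWorker.jar zhouls.bigdata.stormDemo.StormTopologyMoreWorker aaa   # any extra argument: submit to the cluster
bin/storm jar jar/StormTopologyMoreWorker.jar zhouls.bigdata.stormDemo.StormTopologyMoreWorker       # no argument: run in a LocalCluster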

  After submitting

  Why does the Storm UI show the worker and executor numbers it does? If you want to learn this well, you have to dig in and understand the reason.

  Because the StormTopology I ran earlier has not been stopped: its worker is still occupying a slot and its executors are still counted, so the numbers include both topologies.
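  As a rough tally of the arithmetic (assuming the setNumWorkers(2) used above and the single default worker of the earlier StormTopology):

used worker slots = workers(StormTopology) + workers(StormTopologyMoreWorker)
                  = 1 + 2
                  = 3

  Only when the old topology is killed do its slot and executors disappear from the totals.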

  Now I stop the previously running StormTopology and then look again. This is very important.

  In other words, Storm prompts us that the topology will be killed after a 30-second wait. If you cannot wait that long, you can set a shorter wait time.
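  If you would rather stop the topology from a terminal than from the UI, storm kill does the same thing; its optional -w flag sets the wait time in seconds. A minimal sketch (the 10 seconds below is just an example value):

bin/storm kill StormTopology -w 10   # kill the old topology after a 10-second wait instead of the default 30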

  Again, why does the UI now show these numbers? If you want to learn this well, dig in and understand what changed once the old topology was gone.

  Success!
