1.本地调试
a.步骤:生成Topology——实现Spout接口——实现Bolt接口——编译运行
b.加入依赖
<!-- JStorm -->
<dependency>
<groupId>com.alibaba.jstorm</groupId>
<artifactId>jstorm-core</artifactId>
<version>2.1.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
</exclusion>
</exclusions>
</dependency>
注意:jstorm依赖包类含有slf4j包,可能与log4j依赖包中的slf4j包冲突,所有要<exclusion>掉冲突的依赖包
c.新建Topology类
//创建topology的生成器
TopologyBuilder builder = new TopologyBuilder();
//创建spout,第一个参数为名字,注意不要含有空格,第二个参数为spout实体
builder.setSpout("wordReader",new WordReader());
//创建bolt,第一个参数为名字,第二个参数为bolt实体,第三个参数为bolt的并发数
//shuffleGrouping表示以随机的方式接受"wordReader"传来的数据
builder.setBolt("wordNormalizer", new WordNormalizer()).shuffleGrouping("wordReader");
//fieldsGrouping表示以字段分组的方式接受"wordNormalizer"传来额数据
builder.setBolt("wordCounter", new WordCounter(), 2).fieldsGrouping("wordNormalizer", new Fields("word"));
//配置
Config conf = new Config();
conf.put("wordsFile", "E:/test.txt");
conf.setDebug(false);
//设置并行数
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
//创建一个本地模式cluster
LocalCluster cluster = new LocalCluster();
//提交拓扑
cluster.submitTopology("SequenceTest", conf, builder.createTopology());
try {
//等待30秒后会停止拓扑和集群, 视调试情况可增大该数值
Thread.sleep(20000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//结束拓扑
cluster.killTopology("SequenceTest");
cluster.shutdown();
d.实现Spout接口
public class WordReader implements IRichSpout { private static final long serialVersionUID = 1L;
private SpoutOutputCollector collector;
private FileReader fileReader;
private boolean completed = false; @Override
public void ack(Object taskId) {
System.out.println("success:" + taskId);
} @Override
public void activate() {
// TODO Auto-generated method stub } @Override
public void close() {
// TODO Auto-generated method stub } @Override
public void deactivate() {
// TODO Auto-generated method stub } @Override
public void fail(Object taskId) {
System.out.println("fail:" + taskId);
} @Override
public void nextTuple() {
if (completed) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Do nothing
}
return;
}
String str;
// Open the reader
BufferedReader reader = new BufferedReader(fileReader);
try {
// Read all lines
while ((str = reader.readLine()) != null) {
/**
* 发射每一行,Values是一个ArrayList的实现
*/
this.collector.emit(new Values(str), str);
}
} catch (Exception e) {
throw new RuntimeException("Error reading tuple", e);
} finally {
completed = true;
} } @Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
try {
//获取创建Topology时指定的要读取的文件路径
this.fileReader = new FileReader(conf.get("wordsFile").toString());
} catch (FileNotFoundException e) {
throw new RuntimeException("Error reading file ["
+ conf.get("wordFile") + "]");
}
//初始化发射器
this.collector = collector; } @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line"));
} @Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
e.实现Bolt接口
public class WordNormalizer implements IRichBolt {
private OutputCollector collector;
private static final long serialVersionUID = 1L; @Override
public void cleanup() {
// TODO Auto-generated method stub } @Override
public void execute(Tuple input) {
String sentence = input.getString(0);
String[] words = sentence.split(" ");
for (String word : words) {
word = word.trim();
if (!word.isEmpty()) {
word = word.toLowerCase();
// Emit the word
List<Tuple> list = new ArrayList<Tuple>();
list.add(input);
collector.emit(list, new Values(word));
}
}
//确认成功处理一个tuple
collector.ack(input);
} @Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
} @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
} @Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
}
public class WordCounter implements IRichBolt {
Integer id;
String name;
Map<String, Integer> counters;
private OutputCollector collector;
private static final long serialVersionUID = 1L; @Override
public void cleanup() {
System.out.println("-- Word Counter [" + name + "-" + id + "] --");
for (Map.Entry<String, Integer> entry : counters.entrySet()) {
System.out.println(entry.getKey() + ": " + entry.getValue());
}
counters.clear();
} @Override
public void execute(Tuple input) {
String str = input.getString(0);
if (!counters.containsKey(str)) {
counters.put(str, 1);
} else {
Integer index = counters.get(str) + 1;
counters.put(str, index);
}
// 确认成功处理一个tuple
collector.ack(input);
} @Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.counters = new HashMap<String, Integer>();
this.collector = collector;
this.name = context.getThisComponentId();
this.id = context.getThisTaskId();
} @Override
public void declareOutputFields(OutputFieldsDeclarer arg0) {
// TODO Auto-generated method stub } @Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
}