WordReader.java (the spout, in package spouts):
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;

public class WordReader extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;

    public void ack(Object msgId) {
        System.out.println("OK:" + msgId);
    }

    public void close() {}

    public void fail(Object msgId) {
        System.out.println("FAIL:" + msgId);
    }

    /**
     * The only thing this method does is emit one tuple per line of the file.
     */
    public void nextTuple() {
        /**
         * nextTuple() is called in a loop forever, so once the file has been
         * read completely we just sleep for a while and return.
         */
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Do nothing
            }
            return;
        }
        String str;
        // Open the reader
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            // Read all lines
            while ((str = reader.readLine()) != null) {
                // For each line, emit a tuple with the line as its value.
                // The line itself also serves as the message id.
                this.collector.emit(new Values(str), str);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    /**
     * Open the words file and store the collector object.
     */
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        try {
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file [" + conf.get("wordsFile") + "]");
        }
        this.collector = collector;
    }

    /**
     * Declare the output field "line".
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}
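One detail worth noting: the spout passes the line itself as the second argument to emit(), which is the message id. That anchors the tuple, so Storm tracks it and calls this spout's ack() or fail() once downstream processing succeeds or fails. A minimal sketch of the two emit overloads on SpoutOutputCollector (both exist; only the anchored form is used in the code above):

// Anchored: Storm tracks this tuple and later calls ack(str) or fail(str).
collector.emit(new Values(str), str);

// Unanchored: the tuple is not tracked, so ack()/fail() are never invoked for it.
collector.emit(new Values(str));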
WordNormalizer.java (a bolt, in package bolts):
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordNormalizer extends BaseBasicBolt {
    public void cleanup() {}

    /**
     * The bolt receives a line from the words file and normalizes it:
     * the line is split into words and each word is lower-cased.
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for (String word : words) {
            word = word.trim();
            if (!word.isEmpty()) {
                word = word.toLowerCase();
                System.out.println("-------word:" + word);
                collector.emit(new Values(word));
            }
        }
    }

    /**
     * The bolt only emits the field "word".
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
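To see what execute() emits for a given input, the split/trim/lowercase logic can be exercised as plain Java outside Storm. A minimal sketch (the class name and the input sentence are made up for illustration):

public class NormalizeDemo {
    public static void main(String[] args) {
        String sentence = "My Storm  is Great";  // hypothetical input line
        for (String word : sentence.split(" ")) {
            word = word.trim();
            if (!word.isEmpty()) {
                System.out.println(word.toLowerCase());
            }
        }
        // Prints: my, storm, is, great. The empty token produced by the
        // double space is dropped by the isEmpty() check.
    }
}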
WordCounter.java (a bolt, in package bolts):
import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class WordCounter extends BaseBasicBolt {
    Integer id;
    String name;
    Map<String, Integer> counters;

    /**
     * At the end of the topology (when the cluster is shut down),
     * print the word counters.
     */
    @Override
    public void cleanup() {
        System.out.println("-- Word Counter [" + name + "-" + id + "] --");
        for (Map.Entry<String, Integer> entry : counters.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }
    }

    /**
     * On create
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counters = new HashMap<String, Integer>();
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String str = input.getString(0);
        /**
         * If the word doesn't exist in the map yet, create an entry
         * for it; otherwise increment its count by 1.
         */
        if (!counters.containsKey(str)) {
            counters.put(str, 1);
        } else {
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }
    }
}
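Keeping the counts in a plain in-process HashMap is safe here only because the topology below uses fieldsGrouping on "word": every occurrence of a given word is routed to the same WordCounter task, so each task's map holds the complete counts for its subset of words. As a side note, the containsKey/put pair can be collapsed into a single lookup; an equivalent sketch:

// Equivalent counting idiom with one map lookup instead of two.
Integer count = counters.get(str);
counters.put(str, count == null ? 1 : count + 1);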
TopologyMain.java (wires the topology together and runs it locally):
import spouts.WordReader;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import bolts.WordCounter;
import bolts.WordNormalizer;

public class TopologyMain {
    public static void main(String[] args) throws InterruptedException {

        // Topology definition
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader", new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer())
                .shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter(), 2)
                .fieldsGrouping("word-normalizer", new Fields("word"));

        // Configuration
        Config conf = new Config();
        //String fileName = args[0];
        String fileName = "D:\\words.txt";
        conf.put("wordsFile", fileName);
        conf.setDebug(false);

        // Topology run
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Getting-Started-Topology", conf, builder.createTopology());
        Thread.sleep(50000);
        cluster.shutdown();
    }
}
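LocalCluster runs the whole topology in-process, which is why main() simply sleeps for 50 seconds and then calls shutdown(); it is the shutdown that triggers cleanup() on the two WordCounter tasks and prints the final counts. The Config.TOPOLOGY_MAX_SPOUT_PENDING setting of 1 tells Storm to stop calling nextTuple() while un-acked spout tuples are outstanding, which only takes effect because the spout emits anchored tuples. To avoid the hard-coded Windows path, the commented-out args[0] line can be restored with a small guard; a hypothetical variant:

// Hypothetical variant: take the words file from the command line,
// falling back to the hard-coded path when no argument is given.
String fileName = (args.length > 0) ? args[0] : "D:\\words.txt";
conf.put("wordsFile", fileName);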