有了基本的概念之后,我们用jstorm来做一点小事情吧
做一个很无聊的事情:给定一个时间戳,输出对应的问候语
规则是:时间戳的十位对应的数字对应不同的时间段,0-2代表早上,3代表中午,4-6代表下午,7-9代表晚上,分别输出早上,中午,下午和晚上。
我们用spout来发送时间戳,bolt来处理时间戳并输出对应的问候语,并且统计每一时间段的问候数目,判断时间戳的生成是否随机。
代码如下:
public class TimeStampSpout implements IRichSpout{
private static final long serialVersionUID = 1L;
private SpoutOutputCollector collector; @Override
。。。。一切没有用到的函数不展示 @Override
public void nextTuple() {
long now = System.currentTimeMillis();
Values tuple = new Values(now);
System.out.println("spout:"+tuple);
this.collector.emit(tuple);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} @Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
} @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("timestamp"));
} }
public class GreetingBolt implements IRichBolt{
private static final long serialVersionUID = 1L;
private static Map<String, Integer> count = new TreeMap<String, Integer>();
private final String morning = "morning";
private final String noon = "noon";
private final String afternoon = "afternoon";
private final String evening = "evening";
private static Integer total = 0;
@Override
public void cleanup() {
} @Override
public void execute(Tuple input) {
long timestamp= input.getLong(0);
// System.out.println("bolt:"+timestamp);
total+=1;
// 获取十位数
long second = (timestamp/10)%10;
if(second<3){
System.out.println("bolt:"+morning);
count.put(morning, (count.get(morning)==null)?1:count.get(morning)+1);
}
else if(second==3){
System.out.println("bolt:"+noon);
count.put(noon, (count.get(noon)==null)?1:count.get(noon)+1);
}
else if(second<8){
System.out.println("bolt:"+afternoon);
count.put(afternoon, (count.get(afternoon)==null)?1:count.get(afternoon)+1);
}
else{
System.out.println("bolt:"+evening);
count.put(evening, (count.get(evening)==null?1:count.get(evening)+1));
}
if(total%10==0){
System.out.println("distribution show as followed:");
System.out.println(morning+":"+1.0*((count.get(morning)==null)?0:count.get(morning))/total);
System.out.println(noon+":"+1.0*((count.get(noon)==null?0:count.get(noon)))/total);
System.out.println(afternoon+":"+1.0*((count.get(afternoon)==null?0:count.get(afternoon)))/total);
System.out.println(evening+":"+1.0*((count.get(evening)==null?0:count.get(evening)))/total);
}
} }
public class TestTopology {
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("TimeStampSpout", new TimeStampSpout());
builder.setBolt("GreetingBolt",new GreetingBolt()).shuffleGrouping("TimeStampSpout");
LocalCluster cluster = new LocalCluster();
Config config = new Config();
cluster.submitTopology("test", config, builder.createTopology());
}
}
p