Storm 开箱
标签(空格分隔): 开箱即用 入门 Storm
1. 什么是 Storm
Storm 是一个分布式的,可靠的,容错的数据流处理系统。
Storm 应用是由 Spout (上图水龙头) 和 Bolt (上图水滴) 构建成的Topology在Storm环境中运行,其中 Spout 负责接收或获取源数据并发送给 Bolt 进行业务处理, Bolt 处理完数据后,发送给下个 Bolt 或结束任务。
Storm 支持两种运行模式:
本地模式 -此模式用于开发,测试和调试,因为它是查看所有拓扑组件协同工作的最简单方法。在这种模式下,我们可以调整参数,使我们能够看到我们的拓扑如何在不同的 Storm 配置环境中运行。在本地模式下, storm 拓扑在本地机器上在单个 JVM 中运行。
远程模式/生产模式 -在这种模式下,我们将拓扑提交到工作 Storm 集群,该集群由许多进程组成,通常运行在不同的机器上。如在 Storm 的工作流中所讨论的,工作集群将无限地运行,直到它被关闭。
若无特殊说明,本文所有的场景和代码都是以本地模式运行
2. Hello World(WordCountTopology)
2.1 简介
需求,统计句子中,任意单词出现的数量。
单词计数器例子流程。
1.SentenceSpout 发射出句子元组(tuple)。
2.SplitSentenceBolt 接收到句子,将句子拆分成单词,并将单词发射出去。
3.WordCountBolt 接收到单词,分别记录单词出现的数量,将统计结果发射出去。
4.ReportBolt 接收并存储计结果,待程序结束运行后打印统计结果。
流程:获取句子 -> 拆分成单词 -> 统计单词 -> 输出报表
st=>start: Start
op1=>operation: SentenceSpout(获取并发射句子)
op2=>operation: SplitSentenceBolt(拆分并发送单词)
op3=>operation: WordCountBolt(统计单词并发送结果)
op4=>operation: ReportBolt (输出结果)
e=>end
st->op1->op2->op3->op4->e
2.2 Getting-Started
在任意IDE中创建一个新的 Maven 项目,并在 pom.xml 添加 Apache Storm 依赖
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.2.1</version>
</dependency>
2.3 编写 SentenceSpout.java
package spouts;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
/**
* 向后端发射tuple数据流
* @author soul
*
*/
public class SentenceSpout extends BaseRichSpout {
//BaseRichSpout是ISpout接口和IComponent接口的简单实现,接口对用不到的方法提供了默认的实现
/**
*
*/
private static final long serialVersionUID = 6768845742899211592L;
private SpoutOutputCollector collector;
private String[] sentences = {
"my name is soul",
"im a boy",
"i have a dog",
"my dog has fleas",
"my girl friend is beautiful"
};
private int index=0;
/**
* open()方法中是ISpout接口中定义,在Spout组件初始化时被调用。
* open()接受三个参数:一个包含Storm配置的Map,一个TopologyContext对象,提供了topology中组件的信息,SpoutOutputCollector对象提供发射tuple的方法。
* 在这个例子中,我们不需要执行初始化,只是简单的存储在一个SpoutOutputCollector实例变量。
*/
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
/**
* nextTuple()方法是任何Spout实现的核心。
* Storm调用这个方法,向输出的collector发出tuple。
* 在这里,我们只是发出当前索引的句子,并增加该索引准备发射下一个句子。
*/
@Override
public void nextTuple() {
this.collector.emit(new Values(sentences[index]));
index++;
if (index>=sentences.length) {
index=0;
}
Utils.sleep(1);
}
/**
* declareOutputFields是在IComponent接口中定义的,所有Storm的组件(spout和bolt)都必须实现这个接口
* 用于告诉Storm流组件将会发出那些数据流,每个流的tuple将包含的字段
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));//告诉组件发出数据流包含sentence字段
}
}
2.4 编写 SplitSentenceBolt.java
package bolts;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
/**
* 订阅sentence spout发射的tuple流,实现分割单词
*
*/
public class SplitSentenceBolt extends BaseRichBolt {
//BaseRichBolt是IComponent和IBolt接口的实现
//继承这个类,就不用去实现本例不关心的方法
private OutputCollector collector;
/**
* prepare()方法类似于ISpout 的open()方法。
* 这个方法在blot初始化时调用,可以用来准备bolt用到的资源,比如数据库连接。
* 本例子和SentenceSpout类一样,SplitSentenceBolt类不需要太多额外的初始化,
* 所以prepare()方法只保存OutputCollector对象的引用。
*/
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
// TODO Auto-generated method stub
this.collector=collector;
}
/**
* SplitSentenceBolt核心功能是在类IBolt定义execute()方法,这个方法是IBolt接口中定义。
* 每次Bolt从流接收一个订阅的tuple,都会调用这个方法。
* 本例中,收到的元组中查找“sentence”的值,
* 并将该值拆分成单个的词,然后按单词发出新的tuple。
*/
public void execute(Tuple input) {
// TODO Auto-generated method stub
String sentence = input.getStringByField("sentence");
String[] words = sentence.split(" ");
for (String word : words) {
this.collector.emit(new Values(word));//向下一个bolt发射数据
}
}
/**
* plitSentenceBolt类定义一个元组流,每个包含一个字段(“word”)。
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
declarer.declare(new Fields("word"));
}
}
2.5 编写 WordCountBolt.java
package bolts;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
/**
* 订阅 split sentence bolt的输出流,实现单词计数,并发送当前计数给下一个bolt
* @author soul
*
*/
public class WordCountBolt extends BaseRichBolt {
private OutputCollector collector;
//存储单词和对应的计数
private HashMap<String, Long> counts = null;//注:不可序列化对象需在prepare中实例化
/**
* 大部分实例变量通常是在prepare()中进行实例化,这个设计模式是由topology的部署方式决定的
* 因为在部署拓扑时,组件spout和bolt是在网络上发送的序列化的实例变量。
* 如果spout或bolt有任何non-serializable实例变量在序列化之前被实例化(例如,在构造函数中创建)
* 会抛出NotSerializableException并且拓扑将无法发布。
* 本例中因为HashMap 是可序列化的,所以可以安全地在构造函数中实例化。
* 但是,通常情况下最好是在构造函数中对基本数据类型和可序列化的对象进行复制和实例化
* 而在prepare()方法中对不可序列化的对象进行实例化。
*/
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
// TODO Auto-generated method stub
this.collector = collector;
this.counts = new HashMap<String, Long>();
}
/**
* 在execute()方法中,我们查找的收到的单词的计数(如果不存在,初始化为0)
* 然后增加计数并存储,发出一个新的词和当前计数组成的二元组。
* 发射计数作为流允许拓扑的其他bolt订阅和执行额外的处理。
*/
public void execute(Tuple input) {
// TODO Auto-generated method stub
String word = input.getStringByField("word");
Long count = this.counts.get(word);
if (count == null) {
count = 0L;//如果不存在,初始化为0
}
count++;//增加计数
this.counts.put(word, count);//存储计数
this.collector.emit(new Values(word,count));
}
/**
*
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
//声明一个输出流,其中tuple包括了单词和对应的计数,向后发射
//其他bolt可以订阅这个数据流进一步处理
declarer.declare(new Fields("word","count"));
}
}
2.6 编写 ReportBolt.java
package bolts;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
/**
* 生成一份报告
* @author soul
*
*/
public class ReportBolt extends BaseRichBolt {
private HashMap<String, Long> counts = null;//保存单词和对应的计数
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
// TODO Auto-generated method stub
this.counts = new HashMap<String, Long>();
}
public void execute(Tuple input) {
// TODO Auto-generated method stub
String word = input.getStringByField("word");
Long count = input.getLongByField("count");
this.counts.put(word, count);
//实时输出
System.out.println("结果:"+this.counts);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
//这里是末端bolt,不需要发射数据流,这里无需定义
}
/**
* cleanup是IBolt接口中定义
* Storm在终止一个bolt之前会调用这个方法
* 本例我们利用cleanup()方法在topology关闭时输出最终的计数结果
* 通常情况下,cleanup()方法用来释放bolt占用的资源,如打开的文件句柄或数据库连接
* 但是当Storm拓扑在一个集群上运行,IBolt.cleanup()方法不能保证执行(这里是开发模式,生产环境不要这样做)。
*/
public void cleanup(){
System.out.println("---------- FINAL COUNTS -----------");
ArrayList<String> keys = new ArrayList<String>();
keys.addAll(this.counts.keySet());
Collections.sort(keys);
for(String key : keys){
System.out.println(key + " : " + this.counts.get(key));
}
System.out.println("----------------------------");
}
}
2.7 编写 App.java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
import bolts.ReportBolt;
import bolts.SplitSentenceBolt;
import bolts.WordCountBolt;
import spouts.SentenceSpout;
/**
* 实现单词计数topology
*
*/
public class App
{
private static final String SENTENCE_SPOUT_ID = "sentence-spout";
private static final String SPLIT_BOLT_ID = "split-bolt";
private static final String COUNT_BOLT_ID = "count-bolt";
private static final String REPORT_BOLT_ID = "report-bolt";
private static final String TOPOLOGY_NAME = "word-count-topology";
public static void main( String[] args ) //throws Exception
{
//System.out.println( "Hello World!" );
//实例化spout和bolt
SentenceSpout spout = new SentenceSpout();
SplitSentenceBolt splitBolt = new SplitSentenceBolt();
WordCountBolt countBolt = new WordCountBolt();
ReportBolt reportBolt = new ReportBolt();
TopologyBuilder builder = new TopologyBuilder();//创建了一个TopologyBuilder实例
//TopologyBuilder提供流式风格的API来定义topology组件之间的数据流
//builder.setSpout(SENTENCE_SPOUT_ID, spout);//注册一个sentence spout
//设置两个Executeor(线程),默认一个
builder.setSpout(SENTENCE_SPOUT_ID, spout,2);
// SentenceSpout --> SplitSentenceBolt
//注册一个bolt并订阅sentence发射出的数据流,shuffleGrouping方法告诉Storm要将SentenceSpout发射的tuple随机均匀的分发给SplitSentenceBolt的实例
//builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
//SplitSentenceBolt单词分割器设置4个Task,2个Executeor(线程)
builder.setBolt(SPLIT_BOLT_ID, splitBolt,2).setNumTasks(4).shuffleGrouping(SENTENCE_SPOUT_ID);
// SplitSentenceBolt --> WordCountBolt
//fieldsGrouping将含有特定数据的tuple路由到特殊的bolt实例中
//这里fieldsGrouping()方法保证所有“word”字段相同的tuuple会被路由到同一个WordCountBolt实例中
//builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping( SPLIT_BOLT_ID, new Fields("word"));
//WordCountBolt单词计数器设置4个Executeor(线程)
builder.setBolt(COUNT_BOLT_ID, countBolt,4).fieldsGrouping( SPLIT_BOLT_ID, new Fields("word"));
// WordCountBolt --> ReportBolt
//globalGrouping是把WordCountBolt发射的所有tuple路由到唯一的ReportBolt
builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);
Config config = new Config();//Config类是一个HashMap<String,Object>的子类,用来配置topology运行时的行为
//设置worker数量
//config.setNumWorkers(2);
LocalCluster cluster = new LocalCluster();
//本地提交
cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology(TOPOLOGY_NAME);
Utils.sleep(1000);
cluster.shutdown();
}
}
该项目Git地址:[email protected]:tclm/storm-book-examples-ch02-getting_started-8e42636.git
3. 常用API
3.1 Spout
通常情况下,继承 org.apache.storm.topology.base.BaseRichSpout 重写以下三个方法即可实现一个Spout。
/**
* open()方法中是ISpout接口中定义,在Spout组件初始化时被调用。
* open()接受三个参数:一个包含Storm配置的Map;
* 一个TopologyContext对象;
* 提供了topology中组件的信息,SpoutOutputCollector对象提供发射tuple的方法。
*/
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
/**
* nextTuple()方法是任何Spout实现的核心。
* Storm调用这个方法,向输出的collector发出tuple。
* 在这里,我们只是发出当前索引的句子,并增加该索引准备发射下一个句子。
*/
nextTuple()
/**
* declareOutputFields是在IComponent接口中定义的,所有Storm的组件(spout和bolt)都必须实现这个接口
* 用于告诉Storm流组件将会发出那些数据流,每个流的tuple将包含的字段
*/
declareOutputFields(OutputFieldsDeclarer declarer)
3.2 Bolt
通常情况下,继承 org.apache.storm.topology.base.BaseRichBolt 重写以下三个方法即可实现一个Spout。
/**
* prepare()方法类似于ISpout 的open()方法。
* 这个方法在blot初始化时调用,可以用来准备bolt用到的资源,比如数据库连接。
*/
prepare(Map stormConf, TopologyContext context, OutputCollector collector)
/**
* 每一个Bolt核心功能是在接口IBolt定义的execute()方法
* 每次Bolt从流接收一个订阅的tuple,都会调用这个方法
*/
execute(Tuple input)
/**
* declareOutputFields是在IComponent接口中定义的,所有Storm的组件(spout和bolt)都必须实现这个接口
* 用于告诉Storm流组件将会发出那些数据流,每个流的tuple将包含的字段
*/
declareOutputFields(OutputFieldsDeclarer declarer)
3.3 TopologyBuilder
TopologyBuilder用于创建一个Topology(拓扑结构),Topology 实际就是 spout 和 bolt之间的关系定义.
/**
* 在Topology定义一个新的Spout
* id 用来在需要消费其产生的Tuple的Bolt订阅使用
* parallelism_hint 定义executor的数量
*/
SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint)
/**
* 在Topology定义一个新的Bolt
* id 用来在需要消费其产生的Tuple的Bolt订阅使用
* parallelism_hint 定义executor的数量
*/
BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint)
/**
* 所有的xxxGrouping 都是用来订阅Tuple的消息
* 不同的 Grouping 执行方式不同,Storm默认提供七种不同的流分组策略,具体差异见 5. 流分组策略
*/
BoltDeclarer.shuffleGrouping(String componentId)
4. 基本概念
熟悉了 Storm Hello World 和 常用的 API,现在了解一下 Storm 的一些基本概念。
4.1 spout
数据源(Spout)是拓扑中数据流的来源。一般 Spout 会从一个外部的数据源读取元组然后将他们发送到拓扑中。根据需求的不同,Spout 既可以定义为可靠的数据源,也可以定义为不可靠的数据源。一个可靠的 Spout 能够在它发送的元组处理失败时重新发送该元组,以确保所有的元组都能得到正确的处理;相对应的,不可靠的 Spout 就不会在元组发送之后对元组进行任何其他的处理。
一个 Spout 可以发送多个数据流。为了实现这个功能,可以先通过 OutputFieldsDeclarer 的 declareStream 方法来声明定义不同的数据流,然后在发送数据时在 SpoutOutputCollector 的 emit 方法中将数据流 id 作为参数来实现数据发送的功能。
Spout 中的关键方法是 nextTuple。顾名思义,nextTuple 要么会向拓扑中发送一个新的元组,要么会在没有可发送的元组时直接返回。需要特别注意的是,由于 Storm 是在同一个线程中调用所有的 Spout 方法,nextTuple 不能被 Spout 的任何其他功能方法所阻塞,否则会直接导致数据流的中断(关于这一点,阿里的 JStorm 修改了 Spout 的模型,使用不同的线程来处理消息的发送,这种做法有利有弊,好处在于可以更加灵活地实现 Spout,坏处在于系统的调度模型更加复杂,如何取舍还是要看具体的需求场景吧——译者注)。
Spout 中另外两个关键方法是 ack 和 fail,他们分别用于在 Storm 检测到一个发送过的元组已经被成功处理或处理失败后的进一步处理。注意,ack 和 fail 方法仅仅对上述“可靠的” Spout 有效。
4.2 bolt
拓扑中所有的数据处理均是由 Bolt 完成的。通过数据过滤(filtering)、函数处理(functions)、聚合(aggregations)、联结(joins)、数据库交互等功能,Bolt 几乎能够完成任何一种数据处理需求。
一个 Bolt 可以实现简单的数据流转换,而更复杂的数据流变换通常需要使用多个 Bolt 并通过多个步骤完成。例如,将一个微博数据流转换成一个趋势图像的数据流至少包含两个步骤:其中一个 Bolt 用于对每个图片的微博转发进行滚动计数,另一个或多个 Bolt 将数据流输出为“转发最多的图片”结果(相对于使用2个Bolt,如果使用3个 Bolt 你可以让这种转换具有更好的可扩展性)。
与 Spout 相同,Bolt 也可以输出多个数据流。为了实现这个功能,可以先通过 OutputFieldsDeclarer 的 declareStream 方法来声明定义不同的数据流,然后在发送数据时在 OutputCollector 的 emit 方法中将数据流 id 作为参数来实现数据发送的功能。
在定义 Bolt 的输入数据流时,你需要从其他的 Storm 组件中订阅指定的数据流。如果你需要从其他所有的组件中订阅数据流,你就必须要在定义 Bolt 时分别注册每一个组件。对于声明为默认 id(即上文中提到的“default”——译者注)的数据流,InputDeclarer支持订阅此类数据流的语法糖。也就是说,如果需要订阅来自组件“1”的数据流,declarer.shuffleGrouping("1") 与 declarer.shuffleGrouping("1", DEFAULT_STREAM_ID) 两种声明方式是等价的。
Bolt 的关键方法是 execute 方法。execute 方法负责接收一个元组作为输入,并且使用 OutputCollector 对象发送新的元组。如果有消息可靠性保障的需求,Bolt 必须为它所处理的每个元组调用 OutputCollector 的 ack 方法,以便 Storm 能够了解元组是否处理完成(并且最终决定是否可以响应最初的 Spout 输出元组树)。一般情况下,对于每个输入元组,在处理之后可以根据需要选择不发送还是发送多个新元组,然后再响应(ack)输入元组。
4.3 tuple
元组是Storm中消息传输的基本单元,是一个命名的值列表(List)。
元组支持所有基本类型、字符串、字节数组作为字段的值,只要实现类型的序列化接口就可以使用该类型的对象。
元组本来应该是一个Key-value的Map,但是由于组件之间传递的元组的字段名称已经事先定义好,所以只需要按照顺序,将值填入List即可。
4.4 Stream
数据流(Streams)是 Storm 中最核心的抽象概念。一个数据流指的是在分布式环境中并行创建、处理的一组元组(tuple)的无界序列。数据流可以由一种能够表述数据流中元组的域(fields)的模式来定义。在默认情况下,元组(tuple)包含有整型(Integer)数字、长整型(Long)数字、短整型(Short)数字、字节(Byte)、双精度浮点数(Double)、单精度浮点数(Float)、布尔值以及字节数组等基本类型对象。当然,你也可以通过定义可序列化的对象来实现自定义的元组类型。
在声明数据流的时候需要给数据流定义一个有效的 id。不过,由于在实际应用中使用最多的还是单一数据流的 Spout 与 Bolt,这种场景下不需要使用 id 来区分数据流,因此可以直接使用 OutputFieldsDeclarer来定义“无 id”的数据流。实际上,系统默认会给这种数据流定义一个名为“default”的 id。
4.5 topology
拓扑可以理解成由一系列通过数据流(Stream Grouping)相互关联的 Spout 和 Bolt 组成的的拓扑结构。Spout 和 Bolt 称为拓扑的组件(Component)。
Storm 的拓扑是对实时计算应用逻辑的封装,它的作用与 MapReduce 的任务(Job)很相似,区别在于 MapReduce 的一个 Job 在得到结果之后总会结束,而拓扑会一直在集群中运行,直到你手动去终止它。
4.6 worker
拓扑是在一个或多个工作进程(worker processes)中运行的。每个工作进程都是一个实际的 JVM 进程,并且执行拓扑的一个子集。
4.7 executor
一台服务器可以部署多个worker进程;每个worker进程中可以有多个Executor(对应一个线程);每个Executor中执行一个或者多个task。
Executor的个数,可以通过setSpout或者SetBolt中的并行度参数进行调整。
4.8 task
worker中的每一个Spout或者Bolt线程(相当于Topology中的一个节点),称为一个任务。
Storm集群中,每个Spout或者Bolt执行很多任务,可以通过设置拓扑中每个Spout或Bolt的任务数(setNumTasks()),调整他们执行的任务数。
5. 流分组策略
根据选择的分组策略不同,task的执行方式有差异,Storm默认提供下面8种分组策略,也支持自定分组策略(customGrouping),关于自定义分组,在此不做讨论。
// 1.随机分组、轮询、平均分配,随机派发stream里面的tuple,保证每个bolt task接收到的tuple数目大致相同。
topologyBuilder.setBolt("MYBOLTS1", myBolts,4).shuffleGrouping("MYSPOUT");
// 2.按字段分组,比如,按"user-id"这个字段来分组,那么具有同样"user-id"的 tuple 会被分到相同的Bolt里的一个task。
topologyBuilder.setBolt("MYBOLTS2", myBolts,2).fieldsGrouping("MYSPOUT",new Fields("number")).setNumTasks(4);
// 3.广播发送,对于每一个tuple,所有的bolts都会收到
topologyBuilder.setBolt("MYBOLTS3", myBolts,1).allGrouping("MYSPOUT");
// 4.Stream中的所有的tuple都会发送给同一个bolt任务处理,所有的tuple将会发送给拥有最小task_id的bolt任务处理
topologyBuilder.setBolt("MYBOLTS4", myBolts,4).globalGrouping("MYSPOUT");
// 5.不分组,这个分组的意思是说stream不关心到底怎样分组。目前这种分组和Shuffle grouping是一样的效果。 有一点不同的是storm会把使用none grouping的这个bolt放到这个bolt的订阅者同一个线程里面去执行(未来Storm如果可能的话会这样设计)。
topologyBuilder.setBolt("MYBOLTS5", myBolts,4).noneGrouping("MYSPOUT");
// 6.指向型分组, 这是一种比较特别的分组方法,用这种分组意味着消息(tuple)的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为 Direct Stream 的消息流可以声明这种分组方法。而且这种消息tuple必须使用 emitDirect 方法来发射。消息处理者可以通过 TopologyContext 来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)
//topologyBuilder.setBolt("MYBOLTS6", myBolts,4).directGrouping("MYSPOUT");
// 7.本地或随机分组。如果目标bolt有一个或者多个task与源bolt的task在同一个工作进程中,tuple将会被随机发送给这些同进程中的tasks。否则,和普通的Shuffle Grouping行为一致
topologyBuilder.setBolt("MYBOLTS7", myBolts,4).localOrShuffleGrouping("MYSPOUT");
// 8.这种方式与按字段分组很相似,根据定义的域来对数据流进行分组,不同的是,这种方式会考虑下游 Bolt 数据处理的均衡性问题,在输入数据源关键字不平衡时会有更好的性能
topologyBuilder.setBolt("MYBOLTS8", myBolts,4).partialKeyGrouping(("MYSPOUT"), new Fields("number"));
有关流分组策略的执行效果,可参考下面这个项目
[email protected]:tclm/HelloStorm.git
Run.java
6. 并行度
并行度可以理解的同时执行的进程、线程、任务数量
在 Worker 中运行的是拓扑的一个子集。一个 worker 进程是从属于某一个特定的拓扑的,在 worker 进程中会运行一个或者多个与拓扑中的组件相关联的 executor。一个运行中的拓扑就是由这些运行于 Storm 集群中的很多机器上的进程组成的。
一个 executor 是由 worker 进程生成的一个线程。在 executor 中可能会有一个或者多个 task,这些 task 都是为同一个组件(spout 或者 bolt)服务的。
task 是实际执行数据处理的最小工作单元(注意,task 并不是线程) —— 在你的代码中实现的每个 spout 或者 bolt 都会在集群中运行很多个 task。在拓扑的整个生命周期中每个组件的 task 数量都是保持不变的,不过每个组件的 executor 数量却是有可能会随着时间变化。在默认情况下 task 的数量是和 executor 的数量一样的,也就是说,默认情况下 Storm 会在每个线程上运行一个 task。
并行度的设置
其中进程(Worker)数通过 Config.setNumWorkers(Number)设置。
线程(Executors)数量通过下面两个方法的parallelism_hint参数设置。
TopologyBuilder.setSpout()
TopologyBuilder.setBolt()
任务(task)数量通过下面两个方法设置
SpoutDeclarer.setNumTasks(Number)
BoltDeclarer.setNumTasks(Number)
效果上 Worker > Executors > Task
7. Acker机制
Storm通过Acker机制保证了可靠性.
要实现Ack机制,需要进行一下操作。
1.spout 需重写 ack(Object msgId) 和 fail(Object msgId)两个方法
2.spout发射tuple的时候指定messageId.
3.spout需对自己发射tuple进行缓存
4.spout根据messageId对于ack的tuple则从缓存队列中删除,对于fail的tuple可以选择重发。
5.通过Config.setNumAckers(conf, ackerParal) 设置 acker数至少大于0;
有关ACK机制的执行效果,可参考下面这个项目
[email protected]:tclm/HelloStorm.git
RunAck.java
8. 参考资料
[1].Apache Storm 官方文档中文版
[2].最详细的Storm入门教程
[3].Storm的ack机制在项目应用中的坑