转载请注明出处:http://blog.csdn.net/lonelytrooper/article/details/12434915

第八章 事务性Topologies

在Storm中,正如本书前边提到的,你可以通过使用ack和fail策略来确保消息处理。但是如果元组被重放了会发生什么?你怎样确保你不会计数过多?

事务性Topologies是包含在Storm0.7.0版本中的新特性,它激活消息语义来确保你以一种安全的方式重放元组并且它们只会被处理一次。没有事务性topologies的支持,你不可能以一种完全精确、可扩展和容错的方式计数。

getting start with storm 翻译 第八章 part-1-LMLPHP事务性Topologies是建立标准Storm spout和bolts之上的一个抽象。

设计

在事务性topology中,Storm使用并行和顺序元组处理的混合模式。Spout产生的批量的元组被bolts并行的处理。这些bolts中的一部分被认为是提交者,它们以某种严格排序的方式提交处理过的批量元组。这意味着如果你有两个批量,每个批量包含五个元组,两边的元组会被bolts以并行的方式处理,但是提交者bolts直到第一个元组被提交成功后才会提交第二个元组。

getting start with storm 翻译 第八章 part-1-LMLPHP当处理事务性Topology时,可以从源重放批量的元组,甚至有时候是多次重放是非常重要的。所以确保你的数据源--你的spout将要连接的那个--具备这个能力。

这可以被描述成两个不同的步骤,或者阶段:

处理阶段

完全并行的阶段,许多批量被同时执行。

提交阶段

严格排序的阶段,第二个批量直到第一个批量被提交成功后才提交。

把这两个阶段称为Storm事务。

getting start with storm 翻译 第八章 part-1-LMLPHPstorm使用zookeeper来保存事务元数据。缺省的情况下就使用为topology服务的那个zookeeper来保存元数据。你可以通过覆盖配置键transactional.zookeeper.

servers 和 transactional.zookeeper.port.来更改。

事务实战

为看清事务怎样工作,你将建立一个Twitter分析工具。你会读取存储在Redis中的tweets,通过一系列bolts处理他们,然后存储--在另一个Redis数据库中--所有标签和它们在tweets中的频率的列表,所有用户和他们出现在tweets中的总计的列表和一个用户及他们标签和频率的列表。

你即将创建的这个工具的topology由图8-1描述.

getting start with storm 翻译 第八章 part-1-LMLPHP

图8-1 Topology视图

正如你看到的,TweetsTransactionalSpout会连接到你的tweets数据库并且在topology中发射批量的元组。两个不同的bolts,UserSplitterBolt 和 HashtagSplitterBolt,将从spout接收元组。UserSplitterBolt会分析tweet并且查找用户--@后边的单词--并发送这些词一个叫做users的自定义流。HashatagSplitterBolt也分析tweet,查找#以前的单词,并且发送这些单词到一个叫做hashtags的自定义流。第三个bolt,UserHashtagJoinBolt,会接收两个流并且计算在一个命名用户的tweet中一个hashtag出现了多少次。为了计数和发送结果,该bolt会是BaseBatchBolt(后边介绍更多)。

最终,最后一个叫做RedisCommitterBolt 的bolt,接收这三个--由UserSplitterBolt, HashtagSplitterBolt和 UserHashtagJoinBolt产生的流。在同一个事物中,它会完成所有计数,并且一旦完成了元组批次的处理就会发送到Redis。该bolt被认为是一种特殊的bolt,叫做提交者bolt,后续的章节中会解释它。

为了构建这个topology,使用TransactionalTopologyBuilder,类似下边的代码块:

TransactionalTopologyBuilder builder =

newTransactionalTopologyBuilder("test","spout",newTweetsTransactionalSpout());

).shuffleGrouping("spout");

builder.setBolt("hashtag-splitter",

).shuffleGrouping("spout");

)

.fieldsGrouping("users-splitter","users", newFields("tweet_id"))

.fieldsGrouping("hashtag-splitter", "hashtags", newFields("tweet_id"));

builder.setBolt("redis-committer", newRedisCommiterCommiterBolt())

.globalGrouping("users-splitter","users")

.globalGrouping("hashtag-splitter", "hashtags")

.globalGrouping("user-hashtag-merger");

我们看一下怎么在事务topology中实现spout.

The Spout

事务topology中的spout与标准的spout完全不同。

public class TweetsTransactionalSpoutextends

BaseTransactionalSpout<TransactionMetadata> {

正如你在类定义中看到的,TweetsTransactionalSpout继承自BaseTransactionalSpout并带一个泛型类型。你设置在这里的类型被认为是事务元数据。它将在后边从源发送批量的元组时被使用。

在这个例子中,TransactionMetadata被定义为:

public class TransactionMetadataimplementsSerializable{

private static final longserialVersionUID=1L;

long from;

int quantity;

public TransactionMetadata(longfrom,intquantity) {

this.from=from;

this.quantity=quantity;

}

}

这里你存储了from和quantity,它们会告诉你具体怎样产生批量的元组。

为完成spout的实现,你需要实现如下方法:

@Override

publicITransactionalSpout.Coordinator<TransactionMetadata>getCoordinator(

Map conf,TopologyContextcontext) {

returnnew TweetsTransactionalSpoutCoordinator();

}

@Override

publicbacktype.storm.transactional.ITransactionalSpout.Emitter<TransactionMetadata>

getEmitter(

Map conf,TopologyContextcontext) {

returnnew TweetsTransactionalSpoutEmitter();

}

@Override

publicvoiddeclareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(newFields("txid","tweet_id","tweet"));

}

在getCoordinator方法中,你告诉Storm哪个类将协调批量元组的产生。在getEmitter中,你告诉Storm哪个类负责从源读取批量的元组并且发射它们到topology的一个流中。最后,如你之前所做的,你需要声明发射了哪些域。

The RQ class

为了使例子更简单些,我们决定封装所有的Redis相关的操作在一个类中。

public class RQ{

public static final String NEXT_READ="NEXT_READ";

public static final String NEXT_WRITE="NEXT_WRITE";

Jedis jedis;

public RQ() {

jedis =newJedis("localhost");

}

public longgetAvailableToRead(longcurrent) {

return getNextWrite() -current;

}

publiclonggetNextRead() {

String sNextRead =jedis.get(NEXT_READ);

if(sNextRead==null)

;

returnLong.valueOf(sNextRead);

}

publiclonggetNextWrite() {

returnLong.valueOf(jedis.get(NEXT_WRITE));

}

publicvoidclose() {

jedis.disconnect();

}

publicvoidsetNextRead(longnextRead) {

jedis.set(NEXT_READ,""+nextRead);

}

publicList<String>getMessages(longfrom,intquantity) {

String[]keys=newString[quantity];

;i<quantity;i++)

keys[i] =""+(i+from);

returnjedis.mget(keys);

}

}

仔细阅读每个方法的实现,确保你明白了他们在做什么。

协调器

我们看一下这个例子中协调器的实现。

public static class TweetsTransactionalSpoutCoordinatorimplements

ITransactionalSpout.Coordinator<TransactionMetadata> {

TransactionMetadata lastTransactionMetadata;

RQ rq =newRQ();

;

public TweetsTransactionalSpoutCoordinator() {

nextRead =rq.getNextRead();

}

@Override

public TransactionMetadatainitializeTransaction(BigInteger txid,

TransactionMetadata prevMetadata) {

long quantity=rq.getAvailableToRead(nextRead);

quantity =quantity>MAX_TRANSACTION_SIZE?MAX_TRANSACTION_SIZE:quantity;

TransactionMetadata ret =newTransactionMetadata(nextRead, (int)quantity);

nextRead +=quantity;

returnret;

}

@Override

publicbooleanisReady() {

;

}

@Override

publicvoidclose() {

rq.close();

}

}

很重要的需要提醒的一点是在整个topology中只有一个协调器实例。当协调器被初始化后,它从Redis检索一个时序,该时序告诉协调器下一条要读取的tweet是哪条。第一次时,该值为1,意味着接下来要读取的tweet是第一条。

第一个要被调用的方法是isReady。它在initializeTransaction之前总是会被调用来确保源已经准备好被读取数据。你要相应的返回true或者false。在这个例子中,检索tweets的数量并把他们和你读到的tweets数量做比较。它们之间的差异是可以读取的tweets数量。如果它大于0,说明你还有tweets可读。

最后,initializeTransaction被执行。正如你看到的,你用txid和prevMetadata作为参数。第一个参数是一个由Storm产生的唯一的事务ID,它代表产生的元组的批次。prevMetadata是前一个事务的协调器产生的元数据。

在这个例子中,首先确保有多少tweets可读取。一旦你整理好了,创建一个新的TransactionMetadata,指明哪个是第一个要读的tweet,要读取的量是多少。

你一返回元数据,Storm就把它和txid存到zookeeper中。这确保了一旦有错误,Storm有能力来使发射器重新发送元组。

发射器

创建事务spout的最后一步是实现发射器。

我们从下边的实现开始:

public static class TweetsTransactionalSpoutEmitterimplements

ITransactionalSpout.Emitter<TransactionMetadata> {

RQ rq =newRQ();

public TweetsTransactionalSpoutEmitter() {

}

@Override

publicvoidemitBatch(TransactionAttempt tx,

TransactionMetadata coordinatorMeta,BatchOutputCollector collector) {

rq.setNextRead(coordinatorMeta.from+coordinatorMeta.quantity);

List<String>messages=rq.getMessages(coordinatorMeta.from,

coordinatorMeta.quantity);

longtweetId=coordinatorMeta.from;

for(String message:messages) {

collector.emit(newValues(tx,""+tweetId,message));

tweetId++;

}

}

@Override

publicvoidcleanupBefore(BigInteger txid) {

}

@Override

publicvoidclose() {

rq.close();

}

}

发射器读取源并发射元组到一个流。对于相同的transaction id 和 transaction metadata,发射器总是可以发射相同批次的元组是非常重要的。这样,如果处理一个批次的过程中出错了,Storm可以通过发射器重放相同的transaction id 和 transaction metadata并且确保这个批次被重放了。Storm会增加TransactionAttempt中的attempt id。这样你就可以知道该批次被重放了。

这里emitBatch是一个重要的方法。在这个方法中,使用元数据来作为参数,从Redis中读取tweets。同时增加在Redis中的序列,该序列记录了你到目前为止已读取了多少tweets。当然,还要发射tweets到topology。

05-11 15:24