我有一些交易数据,例如

txn_id,receiver_userid,sender_userid,金额

1,1,2,50

2,1,2,100

3,1,2,500
4,5,3,100
5,5,3,200
6,5,3,300
7,6,2,200

8,6,1,200

现在,我想查找所有从同一用户处接收到两次以上交易的接收方用户,我已经开始将PIG作业写为

txnrecord = LOAD './txndata' USING PigStorage(',') AS (txn_id:int, receiver_userid:int, sender_userid:int, amount:int);
grptxn1 = GROUP txnrecord BY (receiver_userid, sender_userid);
txncount = FOREACH grptxn1 GENERATE FLATTEN(group) as (receiver_userid, sender_userid), COUNT(txnrecord) as num_txns, SUM(txnrecord.amount) as total_sum;
txncount1 = FILTER txncount by num_txns > 2;
dump txncount1;

以上是给我正确的组汇总信息,但我的其他要求是

1)查找聚合的组记录及其关联的元组集(单个txns),例如-如果我的组聚合说userid 1收到了来自userid 2的3笔交易,则我需要将所有三个元组存储在另一个数据文件中。

2)不匹配的组聚合> 2交易条件应被忽略(此处最后两个记录应被忽略)

3)我想为我的组聚合分配序列,并且相同的序列应用作其关联的事务元组中的链接键(以标识这三个事务记录与特定的组聚合相关联)。

我正在尝试使用各种功能,但到目前为止还没有运气。

感谢任何帮助指针,Thx。

最佳答案

您可以携带GROUP BY创建的BAG,它们包含所有原始列,以进行检查

DESCRIBE grptxn1;

回答要求1和2:
txnrecord = LOAD './txndata' USING PigStorage(',') AS (txn_id:int, receiver_userid:int, sender_userid:int, amount:int);
grptxn1 = GROUP txnrecord BY (receiver_userid, sender_userid);
txncount = FOREACH grptxn1 GENERATE FLATTEN(group) as (receiver_userid, sender_userid),
txnrecord, -- carry bags through the filter
COUNT(txnrecord) as num_txns, SUM(txnrecord.amount) as total_sum ;
txncount1 = FILTER txncount by num_txns > 2;
tran_dump = FOREACH  txncount1 GENERATE FLATTEN(txnrecord);
STORE tran_dump INTO 'another data file';

txncount2 = FOREACH txncount1 GENERATE (receiver_userid, sender_userid, num_txns, total_sum);
dump txncount2;

要使MapReduce真正变慢或使用某些联网的ID代理,要实现其需求3并不容易。可能是您不需要它,因为FLATTEN(txnrecord)将转储输入文件中存在的所有列。

关于hadoop - 从Pig Latin中的组操作中获取过滤的元组,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/13489392/

10-13 04:39