Problem description
I'm a new Pig user.
I have an existing schema that I want to modify. My source data has 6 columns:
Name  Type  Date      Region  Op  Value
-----------------------------------------
john  ab    20130106  D       X   20
john  ab    20130106  D       C   19
john  ab    20130106  D       T   8
john  ab    20130106  E       C   854
john  ab    20130106  E       T   67
john  ab    20130106  E       X   98
and so on. Each Op value is always C, T, or X.
I want to reshape the data into the following 7 columns:
Name  Type  Date      Region  OpX  OpC  OpT
---------------------------------------------
john  ab    20130106  D       20   19   8
john  ab    20130106  E       98   854  67
In other words, split the Op column into 3 columns, one per Op value. Each of these columns should contain the corresponding value from the Value column.
How can I do this in Pig?
Recommended answer
One way to achieve the desired result:
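-- alias renamed from IN to avoid clashing with Pig's IN operator keyword
-- assumes data.txt is comma-delimited
raw = load 'data.txt' using PigStorage(',') as (name:chararray, type:chararray,
          date:int, region:chararray, op:chararray, value:int);
-- sort by op so the values in each group arrive in C, T, X order
A = order raw by op asc;
B = group A by (name, type, date, region);
C = foreach B {
    -- join the group's values into 'c,t,x', then split them back into a 3-field tuple
    bs = STRSPLIT(BagToString(A.value, ','), ',', 3);
    generate flatten(group) as (name, type, date, region),
             bs.$2 as OpX:chararray, bs.$0 as OpC:chararray, bs.$1 as OpT:chararray;
}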
describe C;
C: {name: chararray,type: chararray,date: int,region: chararray,OpX:
chararray,OpC: chararray,OpT: chararray}
dump C;
(john,ab,20130106,D,20,19,8)
(john,ab,20130106,E,98,854,67)
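Note that the positional mapping above (bs.$0, bs.$1, bs.$2) only lines up if every group contains exactly one row for each of C, T and X. As a rough illustration of the same pivot keyed by op instead of by position, here is a minimal plain-Java sketch. The class OpPivotSketch and its methods are hypothetical names made up for this example; they are not part of Pig, and the sketch assumes each row is already split into its six fields.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not a Pig API): pivots rows into key -> {OpX, OpC, OpT}. */
final class OpPivotSketch {

    /** Output column index for each op, in OpX, OpC, OpT order. */
    static int columnOf(String op) {
        switch (op) {
            case "X": return 0;
            case "C": return 1;
            case "T": return 2;
            default: throw new IllegalArgumentException("unexpected op: " + op);
        }
    }

    /**
     * Each row is {name, type, date, region, op, value}.
     * Returns a map from "name,type,date,region" to {OpX, OpC, OpT}.
     * An op that never occurs in a group leaves its slot as null.
     */
    static Map<String, String[]> pivot(String[][] rows) {
        Map<String, String[]> out = new LinkedHashMap<>();
        for (String[] r : rows) {
            String key = String.join(",", r[0], r[1], r[2], r[3]);
            String[] cols = out.computeIfAbsent(key, k -> new String[3]);
            cols[columnOf(r[4])] = r[5]; // place the value by op, not by arrival order
        }
        return out;
    }

    public static void main(String[] args) {
        String[][] rows = {
            {"john", "ab", "20130106", "D", "X", "20"},
            {"john", "ab", "20130106", "D", "C", "19"},
            {"john", "ab", "20130106", "D", "T", "8"},
            {"john", "ab", "20130106", "E", "C", "854"},
            {"john", "ab", "20130106", "E", "T", "67"},
            {"john", "ab", "20130106", "E", "X", "98"},
        };
        // Prints one line per group: the key followed by OpX, OpC, OpT.
        pivot(rows).forEach((k, v) -> System.out.println(k + "," + String.join(",", v)));
    }
}
```

Because values are placed by op rather than by position, the input order does not matter here, and a group that lacks one op gets a null in that slot instead of shifted columns.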
Update:
If you want to skip the order by, which adds an additional reduce phase to the computation, you can prefix each value with its corresponding op in the bag v. Then sort the resulting tuple fields with a custom UDF to get the desired OpX, OpC, OpT order:
register 'myjar.jar';
A = load 'data.txt' using PigStorage(',') as (name:chararray, type:chararray,
        date:int, region:chararray, op:chararray, value:int);
B = group A by (name, type, date, region);
C = foreach B {
    -- prefix each value with its op, e.g. X20, C19, T8
    v = foreach A generate CONCAT(op, (chararray)value);
    bs = STRSPLIT(BagToString(v, ','), ',', 3);
    -- the UDF sorts the three fields and strips the op prefix
    generate flatten(group) as (name, type, date, region),
             flatten(TupleArrange(bs)) as (OpX:chararray, OpC:chararray, OpT:chararray);
}
where TupleArrange in myjar.jar is something like this:
..
import java.io.IOException;
import java.util.Arrays;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class TupleArrange extends EvalFunc<Tuple> {

    private static final TupleFactory tupleFactory = TupleFactory.getInstance();

    @Override
    public Tuple exec(Tuple input) throws IOException {
        try {
            Tuple result = tupleFactory.newTuple(3);
            // The single argument is the 3-field tuple produced by STRSPLIT.
            Tuple inputTuple = (Tuple) input.get(0);
            String[] tupleArr = new String[] {
                (String) inputTuple.get(0),
                (String) inputTuple.get(1),
                (String) inputTuple.get(2)
            };
            // Ascending sort on the op prefix gives C..., T..., X...
            Arrays.sort(tupleArr);
            // Emit in OpX, OpC, OpT order, dropping the one-character op prefix.
            result.set(0, tupleArr[2].substring(1));
            result.set(1, tupleArr[0].substring(1));
            result.set(2, tupleArr[1].substring(1));
            return result;
        }
        catch (Exception e) {
            throw new RuntimeException("TupleArrange error", e);
        }
    }

    @Override
    public Schema outputSchema(Schema input) {
        // Passes the input schema through; the script's AS clause names the output fields.
        return input;
    }
}
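To check the reordering logic without a Pig runtime, the sort-and-strip step can be pulled out into plain Java. This is only a sketch: ArrangeSketch and arrange are hypothetical names for this example, and it assumes exactly three op-prefixed strings, one each for C, T and X, just as the UDF above does.

```java
import java.util.Arrays;

/** Hypothetical stand-in for the body of TupleArrange.exec, without Pig classes. */
final class ArrangeSketch {

    /**
     * Takes op-prefixed values such as {"T67", "X98", "C854"} in any order
     * and returns the bare values in OpX, OpC, OpT order.
     */
    static String[] arrange(String[] prefixed) {
        String[] sorted = prefixed.clone(); // leave the caller's array untouched
        Arrays.sort(sorted);                // ascending on the prefix: C..., T..., X...
        return new String[] {
            sorted[2].substring(1), // X value
            sorted[0].substring(1), // C value
            sorted[1].substring(1)  // T value
        };
    }

    public static void main(String[] args) {
        // prints [98, 854, 67]
        System.out.println(Arrays.toString(arrange(new String[] {"T67", "X98", "C854"})));
    }
}
```

The one-character prefix is what makes the sort independent of the order in which the bag delivers the values, which is why this variant does not need the order by.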