将架构更改为所需类型

将架构更改为所需类型

本文介绍了Pig 将架构更改为所需类型的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是 Pig 的新用户.

I'm a new Pig user.

我有一个要修改的现有架构.我的源数据如下,有6列:

I have an existing schema which I want to modify. My source data is as follows with 6 columns:

Name        Type    Date        Region    Op    Value
-----------------------------------------------------
john        ab      20130106    D         X     20
john        ab      20130106    D         C     19
jphn        ab      20130106    D         T     8
jphn        ab      20130106    E         C     854
jphn        ab      20130106    E         T     67
jphn        ab      20130106    E         X     98

等等.每个 Op 值总是 CTX.

and so on. Each Op value is always C, T or X.

我基本上想通过以下方式将我的数据分成 7 列:

I basically want to split my data in the following way into 7 columns:

Name        Type    Date        Region    OpX    OpC   OpT
----------------------------------------------------------
john        ab      20130106    D         20     19    8
john        ab      20130106    E         98     854   67

基本上将 Op 列分成 3 列:每一列对应一个 Op 值.这些列中的每一列都应包含来自 Value 列的适当值.

Basically split the Op column into 3 columns: each for one Op value. Each of these columns should contain appropriate value from column Value.

如何在 Pig 中执行此操作?

How can I do this in Pig?

推荐答案

达到预期结果的一种方法:

One way to achieve the desired result:

IN = load 'data.txt' using PigStorage(',') as (name:chararray, type:chararray,
       date:int, region:chararray, op:chararray, value:int);
A = order IN by op asc;
B = group A by (name, type, date, region);
C = foreach B {
  bs = STRSPLIT(BagToString(A.value, ','),',',3);
  generate flatten(group) as (name, type, date, region),
    bs.$2 as OpX:chararray, bs.$0 as OpC:chararray, bs.$1 as OpT:chararray;
}

describe C;
C: {name: chararray,type: chararray,date: int,region: chararray,OpX:
    chararray,OpC: chararray,OpT: chararray}

dump C;
(john,ab,20130106,D,20,19,8)
(john,ab,20130106,E,98,854,67)

更新:

如果你想跳过 order by 这会在计算中增加一个额外的 reduce 阶段,你可以在 tuple v.然后使用 自定义 UDF 对元组字段进行排序具有所需的 OpX、OpC、OpT 顺序:

If you want to skip order by which adds an additional reduce phase to the computation, you can prefix each value with its corresponding op in tuple v. Then sort the tuple fields by using a custom UDF to have the desired OpX, OpC, OpT order:

register 'myjar.jar';
A = load 'data.txt' using PigStorage(',') as (name:chararray, type:chararray,
      date:int, region:chararray, op:chararray, value:int);
B = group A by (name, type, date, region);
C = foreach B {
  v = foreach A generate CONCAT(op, (chararray)value);
  bs = STRSPLIT(BagToString(v, ','),',',3);
  generate flatten(group) as (name, type, date, region),
    flatten(TupleArrange(bs)) as (OpX:chararray, OpC:chararray, OpT:chararray);
}

其中 mjar.jar 中的 TupleArrange 是这样的:

where TupleArrange in mjar.jar is something like this:

..
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class TupleArrange extends EvalFunc<Tuple> {

    private static final TupleFactory tupleFactory = TupleFactory.getInstance();

    @Override
    public Tuple exec(Tuple input) throws IOException {
        try {
            Tuple result = tupleFactory.newTuple(3);
            Tuple inputTuple = (Tuple) input.get(0);
            String[] tupleArr = new String[] {
                    (String) inputTuple.get(0),
                    (String) inputTuple.get(1),
                    (String) inputTuple.get(2)
            };
            Arrays.sort(tupleArr); //ascending
            result.set(0, tupleArr[2].substring(1));
            result.set(1, tupleArr[0].substring(1));
            result.set(2, tupleArr[1].substring(1));
            return result;
        }
        catch (Exception e) {
            throw new RuntimeException("TupleArrange error", e);
        }
    }

    @Override
    public Schema outputSchema(Schema input) {
        return input;
    }
}

这篇关于Pig 将架构更改为所需类型的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-05 23:25