This is what I want to do:

    A = LOAD '...' USING PigStorage(',') AS (col1:int, col2:chararray);
    B = ORDER A BY col2;
    C = CUSTOM_UDF(A);

CUSTOM_UDF iterates over tuples that need to be in sorted order. The UDF emits one aggregate tuple for every few input tuples; that is, I am not returning tuples 1:1. In essence:

    public class CustomUdf extends EvalFunc<Tuple> {
        public Tuple exec(Tuple input) throws IOException {
            Aggregate aggregatedOutput = null;
            DataBag values = (DataBag) input.get(0);
            for (Iterator<Tuple> iterator = values.iterator(); iterator.hasNext();) {
                Tuple tuple = iterator.next();
                ....
                if (some condition regarding current input tuple) {
                    // do something to aggregatedOutput with information from the input tuple
                } else {
                    // Because the input tuple does not apply to the current aggregatedOutput,
                    // return the current aggregatedOutput and apply the input tuple
                    // to a new aggregatedOutput
                    Tuple returnTuple = aggregatedOutput.getTuple();
                    aggregatedOutput = new Aggregate(tuple);
                    return returnTuple;
                }
            }
        }

        // Establish the output Schema as a tuple
        public Schema outputSchema(Schema input) {
            Schema tupleSchema = new Schema();
            ...
            return new Schema(
                new FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
                                tupleSchema, DataType.TUPLE));
        }

        /** This inner class is simply a wrapper for the output tuple **/
        class Aggregate {
            // member variables
            public Aggregate(Tuple input) {
                // set member variables to the values of input's fields
            }
            public Tuple getTuple() {
                Tuple output = TupleFactory.getInstance().newTuple(5);
                // set the tuple's fields to the values of the member variables
                return output;
            }
        }
    }

I have been able to do something like this:

    A = LOAD '...' USING PigStorage(',') AS (col1:int, col2:chararray);
    B = ORDER A BY col2;
    C = GROUP B BY col1;
    D = FOREACH C {
        GENERATE CUSTOM_UDF(B);
    }

However, this does not seem to preserve the ORDER BY, and I can't figure out how to sort within D, because I keep getting invalid field projection errors. Also, I don't need the grouping (it just happens to work for this use case); I only want to send the B alias to CUSTOM_UDF as a bag of tuples. How can I do that?

Best answer

I think the problem is in how CustomUdf is written. From your description, it sounds like it should be an EvalFunc<DataBag> rather than an EvalFunc<Tuple> (and outputSchema should then describe a bag rather than a tuple). Then, in the implementation, as you iterate over all the tuples in the input bag, append each accumulated tuple to a DataBag that you return at the end of the method. Your Pig code would then look something like the code below.

I don't think an ORDER BY in a separate statement, as you have it, keeps its order once the data is grouped. It does, however, preserve order inside a nested FOREACH, as shown below. GROUP A ALL puts every record into a single bag, so no real grouping key is needed; note that this also sends all records through a single reducer.

    A = LOAD '...' USING PigStorage(',') AS (col1:int, col2:chararray);
    B = FOREACH (GROUP A ALL) {
        A_ordered = ORDER A BY col2;
        GENERATE FLATTEN(CUSTOM_UDF(A_ordered));
    }

The exec method would look like the modified version below. Note the changes I made; the null checks and the final add after the loop make sure the first tuple does not cause a NullPointerException and the last aggregate is not lost.

    public DataBag exec(Tuple input) throws IOException {   // different return type
        Aggregate aggregatedOutput = null;
        DataBag result = BagFactory.getInstance().newDefaultBag();   // change here
        DataBag values = (DataBag) input.get(0);
        for (Iterator<Tuple> iterator = values.iterator(); iterator.hasNext();) {
            Tuple tuple = iterator.next();
            ....
            if (aggregatedOutput != null && some condition regarding current input tuple) {
                // do something to aggregatedOutput with information from the input tuple
            } else {
                // Because the input tuple does not apply to the current aggregatedOutput,
                // add the current aggregatedOutput to the result bag and apply the
                // input tuple to a new aggregatedOutput
                if (aggregatedOutput != null) {
                    result.add(aggregatedOutput.getTuple());   // change here
                }
                aggregatedOutput = new Aggregate(tuple);
            }
        }
        if (aggregatedOutput != null) {
            result.add(aggregatedOutput.getTuple());   // flush the last aggregate
        }
        return result;   // change here
    }

Source question on Stack Overflow: hadoop - Pig: How to send all Tuples to a UDF to be Processed without Grouping them? (or: how to convert tuples to a bag without grouping?): https://stackoverflow.com/questions/21445730/
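To check the accumulate-and-flush logic of the answer's exec method outside Pig, here is a minimal plain-Java sketch of the same loop. It is a stand-in under stated assumptions, not the answer's UDF: the RunAggregator, Row, and Aggregate classes and the "same col1 as the current aggregate" condition are hypothetical substitutes for the input Tuple, the question's Aggregate wrapper, and "some condition regarding current input tuple", and a java.util.List plays the role of the output DataBag.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java sketch of the accumulate-and-flush loop (no Pig dependencies).
 * Hypothetical stand-ins: Row ~ input Tuple, Aggregate ~ the question's wrapper,
 * "same col1 as the current run" ~ "some condition regarding current input tuple".
 */
public class RunAggregator {

    /** Hypothetical input row: (col1:int, col2:chararray). */
    public static final class Row {
        final int col1;
        final String col2;

        public Row(int col1, String col2) {
            this.col1 = col1;
            this.col2 = col2;
        }
    }

    /** Hypothetical aggregate: one run of consecutive rows sharing col1. */
    public static final class Aggregate {
        final int col1;
        int count;
        final String firstCol2;
        String lastCol2;

        Aggregate(Row start) {
            this.col1 = start.col1;
            this.count = 1;
            this.firstCol2 = start.col2;
            this.lastCol2 = start.col2;
        }

        /** Fold another row into this run. */
        void add(Row row) {
            count++;
            lastCol2 = row.col2;
        }

        @Override
        public String toString() {
            return "(" + col1 + "," + count + "," + firstCol2 + "," + lastCol2 + ")";
        }
    }

    /** Walks rows in their given (already sorted) order and returns every aggregate. */
    public static List<Aggregate> aggregate(List<Row> orderedRows) {
        List<Aggregate> result = new ArrayList<>(); // stands in for the output DataBag
        Aggregate current = null;
        for (Row row : orderedRows) {
            if (current != null && current.col1 == row.col1) {
                current.add(row);                   // row extends the current run
            } else {
                if (current != null) {
                    result.add(current);            // close the finished run
                }
                current = new Aggregate(row);       // start a new run with this row
            }
        }
        if (current != null) {
            result.add(current);                    // flush the last run after the loop
        }
        return result;
    }

    public static void main(String[] args) {
        List<Row> rows = new ArrayList<>();
        rows.add(new Row(1, "a"));
        rows.add(new Row(1, "b"));
        rows.add(new Row(2, "c"));
        rows.add(new Row(1, "d"));
        System.out.println(aggregate(rows)); // prints [(1,2,a,b), (2,1,c,c), (1,1,d,d)]
    }
}
```

Running main turns four rows into three aggregates, including the final run that only the flush after the loop emits. Because runs are detected by comparing each row with the previous one, the result depends entirely on input order, which is why the Pig side sorts inside the nested FOREACH before calling the UDF.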
10-13 01:20