I haven't had any success getting the RANK operator to work in Pig 0.12.1. I thought I would post here before filing a bug report.

Here is my very simple script:

InputData = LOAD '$in' USING PigStorage('\u0001') AS (a1:chararray, a2:chararray, score:float);
Ranked = RANK InputData BY score DESC DENSE;
OutputData = FOREACH Ranked GENERATE
    rank_InputData AS rank,
    a1 AS a1,
    score AS score;
STORE OutputData INTO '$out' using PigStorage('\u0001');

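I have run it with two different versions of the input:
  • $in contains 4,700 input paths
  • $in contains 60 input paths

Both inputs contain the same data (280 million rows, 25 GB); the second simply has the data aggregated into fewer files (prompted by this thread related to a counters-per-mapper issue).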

The result is the same for both inputs (a "Java heap space" error):
    Backend error message
    ---------------------
    Error: Java heap space
    
    Pig Stack Trace
    ---------------
    ERROR 2244: Job failed, hadoop does not return any error message
    
    org.apache.pig.backend.executionengine.ExecException: ERROR 2244: Job failed, hadoop does not return any error message
            at org.apache.pig.tools.grunt.GruntParser.executeBatch(GruntParser.java:148)
            at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:202)
            at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:173)
            at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:84)
            at org.apache.pig.Main.run(Main.java:478)
            at org.apache.pig.Main.main(Main.java:156)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:606)
            at org.apache.hadoop.util.RunJar.main(RunJar.java:208)
    

I tried another version of the script that uses ORDER ... BY to do part of the work:
    InputData = LOAD '$in' USING PigStorage('\u0001') AS (a1:chararray, a2:chararray, score:float);
    InputDataOrdered = ORDER InputData BY score DESC PARALLEL 60;
    Ranked = RANK InputDataOrdered;
    OutputData = FOREACH Ranked GENERATE
        rank_InputDataOrdered AS rank,
        a1 AS a1,
        score AS score;
    STORE OutputData INTO '$out' using PigStorage('\u0001');
    

Both inputs give the same error, this time "Too many counters":
    Pig Stack Trace
    ---------------
    ERROR 2043: Unexpected error during execution.
    ...
    Caused by: java.io.IOException: Error reading responses
            at org.apache.hadoop.ipc.Client$Connection.run(Client.java:843)
    Caused by: org.apache.hadoop.mapreduce.counters.LimitExceededException: Too many counters: 121 max=120
            at org.apache.hadoop.mapreduce.counters.Limits.checkCounters(Limits.java:61)
            at org.apache.hadoop.mapreduce.counters.Limits.incrCounters(Limits.java:68)
            at org.apache.hadoop.mapreduce.counters.AbstractCounterGroup.readFields(AbstractCounterGroup.java:174)
            at org.apache.hadoop.mapred.Counters$Group.readFields(Counters.java:278)
            at org.apache.hadoop.mapreduce.counters.AbstractCounters.readFields(AbstractCounters.java:303)
            at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:280)
            at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:75)
            at org.apache.hadoop.ipc.Client$Connection.receiveResponse(Client.java:952)
            at org.apache.hadoop.ipc.Client$Connection.run(Client.java:836)
    

Any ideas?

Best answer

I suggest using the Enumerate UDF from the DataFu library when you need to rank by a field.

    register 'datafu-1.2.0.jar';
    -- Enumerate appends an index to each tuple in a bag, starting at 1 here
    define Enumerate datafu.pig.bags.Enumerate('1');
    
    A = load 'test.dat' using PigStorage() AS ( f1:int, f2:int);
    
    -- collect all rows into a single bag
    A = group A ALL;
    
    -- sort that bag inside a nested foreach
    A = foreach A {
            sorted = order A by f1 DESC, f2 ASC;
            generate group, sorted;
    }
    
    B = foreach A generate FLATTEN( Enumerate( sorted ));
    
    dump  B;  -- ( f1, f2, rank)
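
As a rough sketch, here is how the same approach might look with the schema from the question. It is untested, and the relation names Grouped, Sorted and Numbered and the field name row_num are made up for illustration.

```pig
REGISTER 'datafu-1.2.0.jar';
DEFINE Enumerate datafu.pig.bags.Enumerate('1');

InputData = LOAD '$in' USING PigStorage('\u0001')
    AS (a1:chararray, a2:chararray, score:float);

-- GROUP ... ALL collects every row into one bag (a single group)
Grouped = GROUP InputData ALL;

-- Sort the bag by score, highest first
Sorted = FOREACH Grouped {
    ordered = ORDER InputData BY score DESC;
    GENERATE ordered;
};

-- Enumerate appends a 1-based index to each tuple; row_num is a hypothetical name
Numbered = FOREACH Sorted GENERATE
    FLATTEN(Enumerate(ordered)) AS (a1, a2, score, row_num);

OutputData = FOREACH Numbered GENERATE row_num, a1, score;
STORE OutputData INTO '$out' USING PigStorage('\u0001');
```

Two caveats. Enumerate numbers rows consecutively, so rows with equal scores get different numbers, unlike RANK ... DENSE. And because GROUP ... ALL routes every row through a single reducer, it may run into the same memory limits on an input of 280 million rows.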
    

Reference: http://datafu.incubator.apache.org/docs/datafu/1.1.0/datafu/pig/bags/Enumerate.html
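
On the "Too many counters: 121 max=120" failure in the ORDER ... BY variant: that limit comes from Hadoop, not from Pig. One possible workaround, offered here as an untested assumption, is to raise it. On Hadoop 2.x the property is mapreduce.job.counters.max (default 120); some older releases use a different property name, and on some clusters the value only takes effect when it is set cluster-side in mapred-site.xml and not per script. The value 500 below is arbitrary.

```pig
-- Assumption: Hadoop 2.x property name; 500 is an arbitrary illustrative value.
-- A per-script setting may be ignored if the cluster enforces its own limit.
SET mapreduce.job.counters.max 500;
```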
