Using Spark 1.1

My job goes as follows:

  • Read the list of folders under a given root directory and parallelize the list
  • For each folder, read the files under it - these are gzipped files
  • For each file, extract the content - these are lines, each line representing a single event, with fields separated by tabs (TSV)
  • Create one RDD of all the lines
  • Convert the TSV to JSON

  • (Now these lines represent a certain type of event. There are 4 types: session, request, recommendation, user event)
  • Take only the session events. Based on some user id field, sample only 1 out of every 100 of them. Convert them to pairs, with a key that represents some output structure (like: event type/date/the events), then write them to the FS. (One possible way to do this sampling is sketched right after this list.)
  • Do the same for requests and user events

  • (For recommendations, the sampling cannot be based on a user id (since there is no user id there), but we know there is a 1:1 relationship between requests and recommendations based on a mutual request id field. So:)
  • Create a list of the distinct request ids. Join this list with the list of recommendations, keyed by request id, and thereby achieve the filtering we want. Then write the reduced list to the FS.
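
    For illustration, one way such 1-in-N sampling by user id could be done is something like the following (just a rough sketch, not my actual convert() code, which I omitted):

    // Hypothetical sketch: keep roughly 1 in every filterFactor users by hashing the
    // user id, so that all events of a sampled user are kept together.
    private static boolean sampleUser(String userId, int filterFactor) {
        // mask the sign bit so the modulo result is never negative
        return ((userId.hashCode() & Integer.MAX_VALUE) % filterFactor) == 0;
    }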

  • Now, here is my problem. The code I use to do all of this works on a small scale. But when I run it on relatively large input, using a cluster of 80 machines with 8 cores and 50GB of memory each, I can see that many of the machines are not being used, meaning only one core is occupied (and only at about 20%), and only 16GB out of the 40GB of memory configured for the job is in use.

    I think my transformations do not parallelize well, but I am not sure where and why. Here is most of my code (I omitted some of the helper functions that I believe are irrelevant to the problem):
     public static void main(String[] args) {
    
        BasicConfigurator.configure();
    
        conf[0] = new Conf("local[4]");
        conf[1] = new Conf("spark://hadoop-m:7077");
        Conf configuration = conf[1];
    
        if (args.length != 4) {
            log.error("Error in parameters. Syntax: <input path> <output_path> <filter_factor> <locality>\nfilter_factor is what fraction of sessions to process. For example, to process 1/100 of sessions, use 100\nlocality should be set to \"local\" in case running on local environment, and to \"remote\" otherwise.");
            System.exit(-1);
        }
    
        final String inputPath = args[0];
        final String outputPath = args[1];
        final Integer filterFactor;
    
        if (args[3].equals("local")) {
            configuration = conf[0];
        }
    
        log.setLevel(Level.DEBUG);
        Logger.getRootLogger().removeAppender("console");
        final SparkConf conf = new SparkConf().setAppName("phase0").setMaster(configuration.getMaster());
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.set("spark.kryo.registrator", "com.doit.customer.dataconverter.MyRegistrator");
        final JavaSparkContext sc = new JavaSparkContext(conf);
        if (configuration.getMaster().contains("spark:")) {
            sc.addJar("/home/hadoop/hadoop-install/phase0-1.0-SNAPSHOT-jar-with-dependencies.jar");
        }
        try {
            filterFactor = Integer.parseInt(args[2]);
            // read all folders from root
            Path inputPathObj = new Path(inputPath);
            FileSystem fs = FileSystem.get(inputPathObj.toUri(), new Configuration(true));
            FileStatus[] statusArr = fs.globStatus(inputPathObj);
            List<FileStatus> statusList = Arrays.asList(statusArr);
    
            List<String> pathsStr = convertFileStatusToPath(statusList);
    
            JavaRDD<String> paths = sc.parallelize(pathsStr);
    
            // read all files from each folder
            JavaRDD<String> filePaths = paths.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
                @Override
                public Iterable<String> call(Iterator<String> pathsIterator) throws Exception {
                    List<String> filesPath = new ArrayList<String>();
                    if (pathsIterator != null) {
                        while (pathsIterator.hasNext()) {
                            String currFolder = pathsIterator.next();
                            Path currPath = new Path(currFolder);
                            FileSystem fs = FileSystem.get(currPath.toUri(), new Configuration(true));
                            FileStatus[] files = fs.listStatus(currPath);
                            List<FileStatus> filesList = Arrays.asList(files);
                            List<String> filesPathsStr = convertFileStatusToPath(filesList);
                            filesPath.addAll(filesPathsStr);
                        }
                    }
                    return filesPath;
                }
            });
    
    
            // Transform list of files to list of all files' content in lines
            JavaRDD<String> typedData = filePaths.map(new Function<String, List<String>>() {
                @Override
                public List<String> call(String filePath) throws Exception {
                    Tuple2<String, List<String>> tuple = null;
                    try {
                        String fileType = null;
                        List<String> linesList = new ArrayList<String>();
                        Configuration conf = new Configuration();
                        CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf);
                        Path path = new Path(filePath);
                        fileType = getType(path.getName());
    
                        // filter non-trc files
                        if (!path.getName().startsWith("1")) {
                            return linesList;
                        }
    
                        CompressionCodec codec = compressionCodecs.getCodec(path);
                        FileSystem fs = path.getFileSystem(conf);
                        InputStream in = fs.open(path);
                        if (codec != null) {
                            in = codec.createInputStream(in);
                        } else {
                            throw new IOException();
                        }
    
                        BufferedReader r = new BufferedReader(new InputStreamReader(in, "UTF-8"), BUFFER_SIZE);
    
                        // This line is intentionally not added to the list,
                        // which is what we want - filter out the header row
                        String line = r.readLine();
    
                        // Read all lines
                        while ((line = r.readLine()) != null) {
                            try {
                                String sliceKey = getSliceKey(line, fileType);
                                // Adding event type and output slice key as additional fields
                                linesList.add(fileType + "\t" + sliceKey + "\t" + line);
                            } catch(ParseException e) {
                            }
                        }
    
                        return linesList;
                    } catch (Exception e) { // Filtering of files whose reading went wrong
                        log.error("Reading of the file " + filePath + " went wrong: " + e.getMessage());
                        return new ArrayList();
                    }
                }
                // flatten to one big list with all the lines
            }).flatMap(new FlatMapFunction<List<String>, String>() {
                @Override
                public Iterable<String> call(List<String> strings) throws Exception {
                    return strings;
                }
            });
    
            // convert tsv to json
    
            JavaRDD<ObjectNode> jsons = typedData.mapPartitions(new FlatMapFunction<Iterator<String>, ObjectNode>() {
                @Override
                public Iterable<ObjectNode> call(Iterator<String> stringIterator) throws Exception {
                    List<ObjectNode> res = new ArrayList<>();
                    while(stringIterator.hasNext()) {
                        String currLine = stringIterator.next();
                        Iterator<String> i = Splitter.on("\t").split(currLine).iterator();
                        if (i.hasNext()) {
                            String type = i.next();
                            ObjectNode json = convert(currLine, type, filterFactor);
                            if(json != null) {
                                res.add(json);
                            }
                        }
                    }
                    return res;
                }
            }).cache();
    
    
            createOutputType(jsons, "Session", outputPath, null);
            createOutputType(jsons, "UserEvent", outputPath, null);
            JavaRDD<ObjectNode> requests = createOutputType(jsons, "Request", outputPath, null);
    
    
            // Now leave only the set of request ids - to inner join with the recommendations
            JavaPairRDD<String,String> requestsIds = requests.mapToPair(new PairFunction<ObjectNode, String, String>() {
                @Override
                public Tuple2<String, String> call(ObjectNode jsonNodes) throws Exception {
                    String id = jsonNodes.get("id").asText();
                    return new Tuple2<String, String>(id,id);
                }
            }).distinct();
    
            createOutputType(jsons,"RecommendationList", outputPath, requestsIds);
    
        } catch (IOException e) {
            log.error(e);
            System.exit(1);
        } catch (NumberFormatException e) {
            log.error("filter factor is not a valid number!!");
            System.exit(-1);
        }
    
        sc.stop();
    
    }
    
    private static JavaRDD<ObjectNode> createOutputType(JavaRDD jsonsList, final String type, String outputPath,JavaPairRDD<String,String> joinKeys) {
    
        outputPath = outputPath + "/" + type;
    
        JavaRDD events = jsonsList.filter(new Function<ObjectNode, Boolean>() {
            @Override
            public Boolean call(ObjectNode jsonNodes) throws Exception {
                return jsonNodes.get("type").asText().equals(type);
            }
        });
    
    
        // This is in case we need to narrow the list to match some other list of ids... Recommendation List, for example... :)
        if(joinKeys != null) {
            JavaPairRDD<String,ObjectNode> keyedEvents = events.mapToPair(new PairFunction<ObjectNode, String, ObjectNode>() {
                @Override
                public Tuple2<String, ObjectNode> call(ObjectNode jsonNodes) throws Exception {
                    return new Tuple2<String, ObjectNode>(jsonNodes.get("requestId").asText(),jsonNodes);
                }
            });
    
            JavaRDD<ObjectNode> joinedEvents = joinKeys.join(keyedEvents).values().map(new Function<Tuple2<String, ObjectNode>, ObjectNode>() {
               @Override
               public ObjectNode call(Tuple2<String, ObjectNode> stringObjectNodeTuple2) throws Exception {
                   return stringObjectNodeTuple2._2;
               }
            });
            events = joinedEvents;
        }
    
    
        JavaPairRDD<String,Iterable<ObjectNode>> groupedEvents = events.mapToPair(new PairFunction<ObjectNode, String, ObjectNode>() {
            @Override
            public Tuple2<String, ObjectNode> call(ObjectNode jsonNodes) throws Exception {
                return new Tuple2<String, ObjectNode>(jsonNodes.get("sliceKey").asText(),jsonNodes);
            }
        }).groupByKey();
        // Convert the jsons to strings and add "\n" at the end of each
    
        JavaPairRDD<String, String> groupedStrings = groupedEvents.mapToPair(new PairFunction<Tuple2<String, Iterable<ObjectNode>>, String, String>() {
            @Override
            public Tuple2<String, String> call(Tuple2<String, Iterable<ObjectNode>> content) throws Exception {
                String string = jsonsToString(content._2);
                log.error(string);
                return new Tuple2<>(content._1, string);
            }
        });
        groupedStrings.saveAsHadoopFile(outputPath, String.class, String.class, KeyBasedMultipleTextOutputFormat.class);
        return events;
    }
    
    // Notice the special case of if(joinKeys != null) in which I join the recommendations with request ids.
    

    Finally, the command I use to launch the Spark job is:
    spark-submit --class com.doit.customer.dataconverter.Phase0 --driver-cores 8 --total-executor-cores 632 --driver-memory 40g --executor-memory 40G --deploy-mode cluster /home/hadoop/hadoop-install/phase0-1.0-SNAPSHOT-jar-with-dependencies.jar gs://input/2014_07_31* gs://output/2014_07_31 100 remote
    

    Best Answer

    Your initial partitions are based on the set of folders in your root directory (sc.parallelize(pathsStr)). There are two steps in your flow that can significantly unbalance your partitions: 1) reading the list of files within each folder, if some folders have many more files than others; 2) reading the TSV lines from each file, if some files have many more lines than others.

    If your files are roughly the same size, but some folders contain many more of them than others, you can rebalance the partitions after collecting the file names. After setting the initial value of filePaths, try adding this line:

    filePaths = filePaths.repartition(sc.defaultParallelism());
    

    That will shuffle the collected file names into balanced partitions.

    If the imbalance is instead due to some files being much larger than others, you can try rebalancing your typedData RDD by similarly calling repartition on it, although this will be much more expensive, since it will shuffle all of your TSV data.

    Alternatively, if you rebalance filePaths and still end up with some partition imbalance because a few somewhat larger files land in a handful of partitions, you might get better performance by passing a larger number to repartition, for example multiplying by four so that you get four times as many partitions as cores. This will increase the communication cost slightly, but may be a win if it results in better balancing of the partition sizes of typedData.
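
    For example, the heavier rebalance might look roughly like this (just a sketch; the factor of 4 is only illustrative and would need tuning):

    // Shuffles all of the parsed TSV lines - much more expensive than repartitioning
    // the file-name RDD, but it balances the data itself:
    typedData = typedData.repartition(sc.defaultParallelism());

    // Or, for more and smaller partitions (here 4x the number of cores), at a slightly
    // higher communication cost:
    typedData = typedData.repartition(sc.defaultParallelism() * 4);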
