我做了以下管道:
任务管理器-> SQS->刮板工(我的应用程序)-> AWS Firehose-> S3文件-> Spark->(?)Redshift。

我正在尝试解决/改善的一些事情,我很乐意提供指导:

  • 刮板可能会获取重复的数据,然后再次将它们冲洗到firehose,这将导致 Spark 堆积。在开始计算之前,我是否应该使用Distinct函数在 Spark 中解决此问题?
  • 我没有删除S3处理的文件,因此数据越来越大。这是一个好习惯吗? (将s3作为输入数据库)还是我应该处理每个文件并在完成Spark之后将其删除?目前,我正在执行sc.textFile("s3n://...../*/*/*")-它将收集我的所有存储桶文件并对其进行计算。
  • 要将结果放置在Redshift(或s3)中->如何进行此操作?也就是说,如果s3变得越来越大,则redshift将具有重复的数据...我是否应该总是先刷新它?如何?
  • 最佳答案

    尽管没有在一个单一的管道中,但我之前曾遇到过这些问题。这是我所做的。

  • 删除重复项

    一个。我使用BloomFilter删除了本地重复项。请注意,文档相对不完整,但是您可以轻松保存/加载/联合/相交布隆过滤器对象。您甚至可以在过滤器上执行reduce

    b。如果将数据直接从Spark保存到RedShift,则可能需要花费一些时间和精力来更新当前批次的BloomFilter,进行广播,然后进行过滤以确保全局没有重复。在我在RDS中使用UNIQUE约束并忽略该错误之前,不幸的是RedShift does not honour the constraint
  • 和3.数据越来越大

  • 我使用EMR集群运行s3-dist-cp command来移动和合并数据(因为通常会有很多小的日志文件,这会影响Spark的性能)。如果您恰巧使用EMR来托管Spark集群,只需在分析之前添加一个步骤即可将数据从一个存储桶移至另一个存储桶。该步骤将command-runner.jar作为“自定义” jar,命令看起来像
    s3-dist-cp --src=s3://INPUT_BUCKET/ --dest=s3://OUTPUT_BUCKET_AND_PATH/ --groupBy=".*\.2016-08-(..)T.*" --srcPattern=".*\.2016-08.*" --appendToLastFile --deleteOnSuccess
    

    请注意,原始distcp不支持合并文件。

    通常,您应避免在同一存储桶(或至少路径)中同时处理和处理未处理的数据。

    09-30 15:33
    查看更多