从Spark结构化流媒体文档中:
“此检查点位置必须是与HDFS兼容的文件系统中的路径,并且可以在开始查询时在DataStreamWriter中设置为选项。”

确实,将检查点设置为s3路径将引发:

17/01/31 21:23:56 ERROR ApplicationMaster: User class threw exception: java.lang.IllegalArgumentException: Wrong FS: s3://xxxx/fact_checkpoints/metadata, expected: hdfs://xxxx:8020
java.lang.IllegalArgumentException: Wrong FS: s3://xxxx/fact_checkpoints/metadata, expected: hdfs://xxxx:8020
        at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:652)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:194)
        at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:106)
        at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
        at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1301)
        at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1430)
        at org.apache.spark.sql.execution.streaming.StreamMetadata$.read(StreamMetadata.scala:51)
        at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:100)
        at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
        at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269)
        at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262)
        at com.roku.dea.spark.streaming.FactDeviceLogsProcessor$.main(FactDeviceLogsProcessor.scala:133)
        at com.roku.dea.spark.streaming.FactDeviceLogsProcessor.main(FactDeviceLogsProcessor.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)
17/01/31 21:23:56 INFO SparkContext: Invoking stop() from shutdown hook

这里有几个问题:
  • 为什么不将s3作为检查点目录(常规Spark流支持此功能)?是什么使文件系统“符合HDFS”?
  • 我暂时使用HDFS(因为群集可以一直上升或下降),并使用s3作为持久存储所有数据的位置-在这种设置中为结构化流数据存储检查点数据的建议是什么?
  • 最佳答案

    是什么使FS HDFS“兼容”?它是一个文件系统,具有Hadoop FS specification中指定的行为。此处涵盖了对象存储库和FS之间的区别,关键点是“最终一致的对象存储库没有附加或O(1)原子重命名不兼容”

    特别是对于S3

  • 不一致:创建新的Blob后,列表命令通常不会显示它。删除内容相同。
  • 当Blob被覆盖或删除时,可能需要一段时间才能消失
  • named()通过复制实现,然后删除

  • 通过将所有内容保存到某个位置,然后将其重命名为checkpoint目录来触发流式检查点。这使得到达检查点的时间与在S3中进行数据复制的时间成正比,约为6-10 MB/s。

    当前的流代码不适合s3

    目前,执行以下一项操作

  • 检查点复制到HDFS,然后将结果复制到
  • 检查点到分配并附加到集群
  • 的一些EBS
  • 检查点连接到S3,但检查点之间的间隔较长,因此到达检查点的时间不会降低您的流式应用程序的运行速度。

  • 如果您使用的是EMR,则可以为由dynamo DB支持的一致S3支付更高的费用,从而获得更好的一致性。但是复制时间仍然相同,因此检查点将同样缓慢

    关于apache-spark - Apache Spark(结构化流): S3 Checkpoint support,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/42006664/

    10-13 06:32
    查看更多