I am running a SparkSession in a Jupyter notebook.
Sometimes I get an error on a DataFrame initialized by spark.read.parquet(some_path) when the files under that path have changed, even though I cached the DataFrame.
For example, the code that reads the data is:

sp = spark.read.parquet(TB.STORE_PRODUCT)
sp.cache()
Sometimes sp can no longer be accessed and complains:
Py4JJavaError: An error occurred while calling o3274.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 326.0 failed 4 times, most recent failure: Lost task 10.3 in stage 326.0 (TID 111818, dc38, executor 7): java.io.FileNotFoundException: File does not exist: hdfs://xxxx/data/dm/sales/store_product/part-00000-169428df-a9ee-431e-918b-75477c073d71-c000.snappy.parquet
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
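
For reference, the two remedies the error message suggests look roughly like this in PySpark. This is only an illustrative sketch: some_table_name is a placeholder, and the path is the (partially elided) one from the error above. As the points below explain, the REFRESH TABLE route does not apply here because the data is not registered as a table.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()   # the notebook's existing session

# Remedy 1: refresh a registered (metastore) table. Not applicable here,
# since the data is read from a bare HDFS path, not a table.
spark.sql("REFRESH TABLE some_table_name")   # placeholder table name

# Remedy 2: recreate the DataFrame from the path (TB.STORE_PRODUCT in this question).
sp = spark.read.parquet("hdfs://xxxx/data/dm/sales/store_product")
sp.cache()
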
Problem

  • I don't have a Hive table; it is just an HDFS path.


  • I don't want to restart the SparkSession; that wastes a lot of time.


  • Running sp = spark.read.parquet(TB.STORE_PRODUCT) again doesn't work, and I can understand why: Spark should scan the path again, or there must be an option/setting to force a re-scan. Keeping the whole path listing in memory is not a wise choice.

  • Signature: spark.read.parquet(*paths)
    Docstring:
    Loads Parquet files, returning the result as a :class:`DataFrame`.
    
    You can set the following Parquet-specific option(s) for reading Parquet files:
        * ``mergeSchema``: sets whether we should merge schemas collected from all
          Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``.
          The default value is specified in ``spark.sql.parquet.mergeSchema``.
    
    >>> df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
    >>> df.dtypes
    [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
    
    .. versionadded:: 1.4
    Source:
        @since(1.4)
        def parquet(self, *paths):
            """Loads Parquet files, returning the result as a :class:`DataFrame`.
    
            You can set the following Parquet-specific option(s) for reading Parquet files:
                * ``mergeSchema``: sets whether we should merge schemas collected from all \
                    Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \
                    The default value is specified in ``spark.sql.parquet.mergeSchema``.
    
            >>> df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
            >>> df.dtypes
            [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
            """
            return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
    File:      /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/sql/readwriter.py
    Type:      method
    
Is there a proper way to solve my problem?

Best Answer

The problem is caused by DataFrame.cache.
I need to clear the cache first and then read the path again; that solves the problem.
Code:

try:
    # Drop any cached blocks for the stale DataFrame; ignore the error
    # if sp was never cached or no longer exists.
    sp.unpersist()
except Exception:
    pass
sp = spark.read.parquet(TB.STORE_PRODUCT)
sp.cache()
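
For reuse in the notebook, the unpersist-and-reload steps above can be wrapped in a small helper. The sketch below only packages the accepted answer's approach: the name reload_parquet is made up, and spark and TB.STORE_PRODUCT are assumed to exist as in the question. The commented-out spark.catalog.refreshByPath call is a standard catalog API for invalidating cached data under a path, noted only as an alternative; it is not part of the original answer.

from pyspark.sql import SparkSession, DataFrame

def reload_parquet(spark: SparkSession, old_df: DataFrame, path: str) -> DataFrame:
    """Unpersist the stale DataFrame, re-read the path, and cache the fresh result."""
    if old_df is not None:
        try:
            old_df.unpersist()  # harmless if old_df was never cached
        except Exception:
            pass
    # Alternative (not used in the accepted answer): invalidate cached data and
    # metadata for every DataFrame built from this path.
    # spark.catalog.refreshByPath(path)
    fresh = spark.read.parquet(path)  # re-scans the path and picks up new part files
    fresh.cache()
    return fresh

# Usage with the names from the question:
# sp = reload_parquet(spark, sp, TB.STORE_PRODUCT)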
    

Regarding apache-spark - How to refresh an HDFS path?, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/62669047/
