本文介绍了如何通过mapInPandas正确转换spark数据帧的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试通过最新的 spark 3.0.1 函数将 spark 数据帧转换为 10k 行 mapInPandas.

I'm trying to transform spark dataframe with 10k rows by latest spark 3.0.1 function mapInPandas.

预期输出:mapped pandas_function() 将一行转换为三行,因此输出transformed_df 应该有30k 行

Expected output: mapped pandas_function() transforms one row to three, so output transformed_df should have 30k rows

当前输出:我得到 3 行 1 核和 24 行 8 核.

Current output: I'm getting 3 rows with 1 core and 24 rows with 8 cores.

输入:respond_sdf 有 10k 行

INPUT: respond_sdf has 10k rows

+-----+-------------------------------------------------------------------+
|url  |content                                                            |
+-----+-------------------------------------------------------------------+
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }   |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }   |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }   |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }   |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }   |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }   |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }   |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }   |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }   |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }   |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
+-----+-------------------------------------------------------------------+
only showing top 20 rows
Input respond_sdf has 10000 rows

输出 A) 3 行 - 1 个核心 - .master('local [1]')

OUTPUT A) 3 rows - with 1 core - .master('local [1]')

{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }    (0 + 1) / 1]
+-----+---+---+
|  api|  A|  B|
+-----+---+---+
|api_1|  1|  4|
|api_1|  2|  5|
|api_1|  3|  6|
+-----+---+---+

{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
 Output transformed_df has 3 rows

OUTPUT B) 24 行 - 8 核 - .master('local[8]')

OUTPUT B) 24 rows - with 8 cores - .master('local[8]')

{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }    (0 + 1) / 1]
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
+-----+---+---+
|  api|  A|  B|
+-----+---+---+
|api_1|  1|  4|
|api_1|  2|  5|
|api_1|  3|  6|
|api_1|  1|  4|
|api_1|  2|  5|
|api_1|  3|  6|
|api_1|  1|  4|
|api_1|  2|  5|
|api_1|  3|  6|
|api_1|  1|  4|
|api_1|  2|  5|
|api_1|  3|  6|
|api_1|  1|  4|
|api_1|  2|  5|
|api_1|  3|  6|
|api_1|  1|  4|
|api_1|  2|  5|
|api_1|  3|  6|
|api_1|  1|  4|
|api_1|  2|  5|
+-----+---+---+
only showing top 20 rows

{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }    (3 + 5) / 8]
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
 Output transformed_df has 24 rows

示例代码:

#### IMPORT PYSPARK ###
import pandas as pd
import pyspark
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType,StringType
spark = pyspark.sql.SparkSession.builder.appName("test") \
    .master('local[1]') \
    .getOrCreate()
sc = spark.sparkContext

####### INPUT DATAFRAME WITH LIST OF JSONS ########################

# Create list with 10k nested tuples(url,content)
rdd_list = [('api_1',"{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }"),
            (' api_2', "{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }")]*5000

schema = StructType([
  StructField('url', StringType(), True),
  StructField('content', StringType(), True)
  ])

#Create input dataframe with 10k rows
jsons = sc.parallelize(rdd_list)
respond_sdf = spark.createDataFrame(jsons, schema)
respond_sdf.show(truncate=False)

print(f'Input respond_sdf has {respond_sdf.count()} rows')

####### TRANSFORMATION DATAFRAME ########################

# Pandas transformation function returning pandas dataframe
def pandas_function(iter):
    for df in iter:
        print(df['content'][0])
        yield pd.DataFrame(eval(df['content'][0]))

transformed_df = respond_sdf.mapInPandas(pandas_function, "api string, A int, B int")
transformed_df.show()
print(f' Output transformed_df has {transformed_df.count()} rows')
print(f'Expected output dataframe should has 30k rows')

相关讨论链接:如何生成熊猫数据帧行以触发数据帧

推荐答案

抱歉,在我对您上一个问题的回答中,使用 mapInPandas 的部分不正确.我认为下面的这个函数是编写 pandas 函数的正确方法.上次我犯了一个错误,因为我之前认为 iter 是一个可迭代的行,但它实际上是一个可迭代的数据帧.

Sorry that in my answer to your previous question, the part that uses mapInPandas was incorrect. I think this function below is the correct way to write the pandas function. I made a mistake last time because I previously thought iter was an iterable of rows, but it's actually an iterable of dataframes.

def pandas_function(iter):
    for df in iter:
        yield pd.concat(pd.DataFrame(x) for x in df['content'].map(eval))

(PS 感谢来自此处的回答.)

(PS Thanks to answer from here.)

这篇关于如何通过mapInPandas正确转换spark数据帧的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

07-29 13:16
查看更多