Efficiently batch a Spark dataframe to call an API

Problem Description

I am fairly new to Spark and I'm trying to call the Spotify API using Spotipy. I have a list of artist ids that can be used to fetch artist info. The Spotify API allows batch calls of up to 50 ids at once. I load the artist ids from a MySQL database and store them in a dataframe.

My problem now is that I do not know how to efficiently batch that dataframe into pieces of 50 or fewer rows.

In the example below I'm turning the dataframe into a regular Python list from which I can call the API on batches of 50.

Any ideas how I could do this without going back to a Python list?

import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
from pyspark.sql import SparkSession
import os

spark = SparkSession\
        .builder\
        .appName("GetArtists")\
        .getOrCreate()

df = spark.read.format('jdbc') \
    .option("url", "jdbc:mysql://"+os.getenv("DB_SERVER")+":"+os.getenv("DB_PORT")+"/spotify_metadata")\
    .option("user", os.getenv("DB_USER"))\
    .option("password", os.getenv("DB_PW"))\
    .option("query", "SELECT artist_id FROM artists")\
    .load()

sp = spotipy.Spotify(client_credentials_manager=SpotifyClientCredentials())

ids = [row['artist_id'] for row in df.collect()]

batch_size = 50
for i in range(0,len(ids), batch_size):
    artists = sp.artists( ids[i:i+batch_size] )

    # process the JSON response

I thought about using foreach and calling the API for each id, but this results in unnecessary requests. The results also get stored back in the database, which means I would be writing many single rows to the database.
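
A rough sketch of what batching inside each partition with mapPartitions might look like instead (illustrative only: the process_partition helper is made up for the example, and it assumes the Spotipy credentials are available on the executors as environment variables, just like in the driver code above):

import spotipy
from spotipy.oauth2 import SpotifyClientCredentials

def process_partition(rows):
    # Build a Spotipy client on the executor (assumes the client credentials
    # are present there as environment variables)
    sp = spotipy.Spotify(client_credentials_manager=SpotifyClientCredentials())
    ids = [row['artist_id'] for row in rows]
    # Batch within the partition so each request still carries up to 50 ids
    for i in range(0, len(ids), 50):
        yield sp.artists(ids[i:i + 50])  # process the JSON response as needed

results = df.rdd.mapPartitions(process_partition).collect()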

Recommended Answer

If you want to divide the dataframe based on the row number, then you can do it like this:

from pyspark.sql import functions as f
from pyspark.sql import Window

# Assign a sequential row number to every row; Window.orderBy(f.lit(1))
# imposes an arbitrary ordering just so row_number() can be applied
df = df.withColumn('row_num', f.row_number().over(Window.orderBy(f.lit(1))))
total = df.count()

for i in range(0, total, 50):
    # row_number() starts at 1, so each batch covers rows i+1 .. i+50
    batch_df = df.filter((f.col('row_num') > i) & (f.col('row_num') <= i + 50))
    # api logic goes here

But if you can pass the df to the API directly, then pass each filtered df, or collect each filtered df, which will hold at most 50 values each time.
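
For example, a minimal sketch that combines the filter above with the sp.artists call from the question, collecting only one 50-row slice at a time (it reuses the row_num column and the total count from the snippet above):

for i in range(0, total, 50):
    batch_df = df.filter((f.col('row_num') > i) & (f.col('row_num') <= i + 50))

    # Collect at most 50 ids at a time and send them in a single API call
    batch_ids = [row['artist_id'] for row in batch_df.collect()]
    artists = sp.artists(batch_ids)

    # process the JSON response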
