This article describes how to use SQL in an AWS Glue PySpark script. It should be a useful reference for anyone facing the same problem; interested readers can follow along below.
Problem Description
I want to use AWS Glue to convert some csv data to orc.
The ETL job I created generated the following PySpark script:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "tests", table_name = "test_glue_csv", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("id", "int", "id", "int"), ("val", "string", "val", "string")], transformation_ctx = "applymapping1")
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://glue/output"}, format = "orc", transformation_ctx = "datasink4")
job.commit()
It takes the csv data (from the location that the Athena table tests.test_glue_csv points to) and writes the output to s3://glue/output/.
How can I insert some SQL manipulations in this script?
Thanks
Recommended Answer
You should first create a temp view/table from your dynamic frame
dyf.toDF().createOrReplaceTempView("view_dyf")
Here, dyf is your dynamic frame.
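In the generated script above, the frame you would most likely register is dropnullfields3 (the last DynamicFrame before the sink); the view name view_dyf is only an example:

# Register the cleaned DynamicFrame from the generated script as a temp view
# (using dropnullfields3 and the view name "view_dyf" is just an example choice)
dropnullfields3.toDF().createOrReplaceTempView("view_dyf")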
Then, use your spark object to run SQL queries on it:
sqlDF = spark.sql("select * from view_dyf")
sqlDF.show()
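If you want the SQL result to feed the existing ORC sink, one possible approach (a sketch, not tested against your job; the WHERE clause is only illustrative) is to convert the resulting DataFrame back into a DynamicFrame with DynamicFrame.fromDF and pass it to write_dynamic_frame in place of dropnullfields3:

from awsglue.dynamicframe import DynamicFrame

# Run SQL against the registered view; id and val come from the mapping above
sqlDF = spark.sql("select id, val from view_dyf where val is not null")

# Convert the DataFrame back into a DynamicFrame so the Glue sink can write it
result_dyf = DynamicFrame.fromDF(sqlDF, glueContext, "result_dyf")

datasink4 = glueContext.write_dynamic_frame.from_options(
    frame = result_dyf,
    connection_type = "s3",
    connection_options = {"path": "s3://glue/output"},
    format = "orc",
    transformation_ctx = "datasink4")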
This concludes the article on using SQL in an AWS Glue PySpark script. I hope the recommended answer above is helpful, and thank you for your continued support!