Problem description
I'm using AWS Athena to query raw data from S3. Since Athena writes the query output to an S3 output bucket, I used to do:
df = pd.read_csv(OutputLocation)
But this seems like an expensive way. Recently I noticed the get_query_results method of boto3, which returns a complex dictionary of the results.
client = boto3.client('athena')
response = client.get_query_results(
    QueryExecutionId=res['QueryExecutionId']
)
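For reference, the response is shaped roughly like this (a sketch of the documented structure with illustrative values; NULL columns come back as empty dicts):

response = {
    'ResultSet': {
        'Rows': [
            {'Data': [{'VarCharValue': 'col_a'}, {'VarCharValue': 'col_b'}]},  # header row
            {'Data': [{'VarCharValue': 'foo'}, {'VarCharValue': '1'}]},        # data row
        ],
        'ResultSetMetadata': {'ColumnInfo': []},  # column names and types
    },
    'NextToken': 'opaque-pagination-token',  # only present when more rows remain
}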
I'm facing two main issues:
- How can I format the results of get_query_results into a pandas data frame?
- get_query_results only returns 1000 rows. How can I use it to get two million rows?
Answer
get_query_results only returns 1000 rows. How can I use it to get two million rows into a Pandas dataframe?
If you try adding:
client.get_query_results(QueryExecutionId=res['QueryExecutionId'], MaxResults=2000)
You will get an error, because the GetQueryResults API caps MaxResults at 1000 and rejects anything larger with an InvalidRequestException.
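The API does paginate, though: each response carries a NextToken for the next page of up to 1000 rows. A minimal sketch of that route (my addition, using boto3's built-in paginator for get_query_results; the helper name is illustrative):

import boto3
import pandas as pd

def results_to_df(query_execution_id, region_name='us-east-1'):
    client = boto3.client('athena', region_name=region_name)
    # The paginator handles the NextToken bookkeeping for us.
    paginator = client.get_paginator('get_query_results')
    columns, rows = None, []
    for page in paginator.paginate(QueryExecutionId=query_execution_id):
        for row in page['ResultSet']['Rows']:
            values = [datum.get('VarCharValue') for datum in row['Data']]
            if columns is None:
                columns = values  # the first row of a SELECT result holds the column names
            else:
                rows.append(values)
    return pd.DataFrame(rows, columns=columns)

For two million rows this means two thousand API calls, so it is slow; reading the result file straight from S3, as described next, is much faster.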
You can obtain millions of rows if you read the file directly from your S3 bucket (in the next example, into a Pandas DataFrame):
def obtain_data_from_s3(self):
    self.resource = boto3.resource(
        's3',
        region_name=self.region_name,
        aws_access_key_id=self.aws_access_key_id,
        aws_secret_access_key=self.aws_secret_access_key,
    )
    response = self.resource \
        .Bucket(self.bucket) \
        .Object(key=self.folder + self.filename + '.csv') \
        .get()
    return pd.read_csv(io.BytesIO(response['Body'].read()), encoding='utf8')
The self.filename can be:
self.filename = response['QueryExecutionId'] + ".csv"
This is because Athena names the output files after the QueryExecutionId.
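An alternative sketch (my addition, assuming the s3fs package is installed so pandas can read s3:// URLs directly): rather than rebuilding the key by hand, you can ask Athena where it wrote the file via get_query_execution:

execution = client.get_query_execution(QueryExecutionId=res['QueryExecutionId'])
output_location = execution['QueryExecution']['ResultConfiguration']['OutputLocation']
# e.g. 's3://my_bucket/my_folder/<QueryExecutionId>.csv'
df = pd.read_csv(output_location)

Below is all my code; it takes a query and returns a dataframe with all the rows and columns.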
import time
import boto3
import pandas as pd
import io

class QueryAthena:

    def __init__(self, query, database):
        self.database = database
        self.folder = 'my_folder/'
        self.bucket = 'my_bucket'
        self.s3_input = 's3://' + self.bucket + '/my_folder_input'
        self.s3_output = 's3://' + self.bucket + '/' + self.folder
        self.region_name = 'us-east-1'
        self.aws_access_key_id = "my_aws_access_key_id"
        self.aws_secret_access_key = "my_aws_secret_access_key"
        self.query = query

    def load_conf(self, q):
        # Start the query; Athena writes the result CSV to self.s3_output,
        # named after the QueryExecutionId.
        try:
            self.client = boto3.client(
                'athena',
                region_name=self.region_name,
                aws_access_key_id=self.aws_access_key_id,
                aws_secret_access_key=self.aws_secret_access_key,
            )
            response = self.client.start_query_execution(
                QueryString=q,
                QueryExecutionContext={
                    'Database': self.database
                },
                ResultConfiguration={
                    'OutputLocation': self.s3_output,
                }
            )
            self.filename = response['QueryExecutionId']
            print('Execution ID: ' + response['QueryExecutionId'])
        except Exception as e:
            print(e)
        return response

    def run_query(self):
        queries = [self.query]
        for q in queries:
            res = self.load_conf(q)
        try:
            # Poll until the query leaves the QUEUED/RUNNING states.
            query_status = None
            while query_status == 'QUEUED' or query_status == 'RUNNING' or query_status is None:
                query_status = self.client.get_query_execution(QueryExecutionId=res["QueryExecutionId"])['QueryExecution']['Status']['State']
                print(query_status)
                if query_status == 'FAILED' or query_status == 'CANCELLED':
                    raise Exception('Athena query with the string "{}" failed or was cancelled'.format(self.query))
                time.sleep(10)
            print('Query "{}" finished.'.format(self.query))

            df = self.obtain_data()
            return df
        except Exception as e:
            print(e)

    def obtain_data(self):
        # Read the result CSV directly from S3 instead of paging
        # through get_query_results.
        try:
            self.resource = boto3.resource(
                's3',
                region_name=self.region_name,
                aws_access_key_id=self.aws_access_key_id,
                aws_secret_access_key=self.aws_secret_access_key,
            )
            response = self.resource \
                .Bucket(self.bucket) \
                .Object(key=self.folder + self.filename + '.csv') \
                .get()
            return pd.read_csv(io.BytesIO(response['Body'].read()), encoding='utf8')
        except Exception as e:
            print(e)

if __name__ == "__main__":
    query = "SELECT * FROM bucket.folder"
    qa = QueryAthena(query=query, database='myAthenaDb')
    dataframe = qa.run_query()