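So far, the only examples I have found stream JSON into BQ, e.g. https://cloud.google.com/bigquery/streaming-data-into-bigquery
How do I stream CSV, or any other file type, into BQ? Below is the code block that does the streaming. The "problem" seems to be in insert_all_data, where 'row' is defined as json. Thanks.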
import uuid

# [START stream_row_to_bigquery]
def stream_row_to_bigquery(bigquery, project_id, dataset_id, table_name, row,
                           num_retries=5):
    insert_all_data = {
        'rows': [{
            'json': row,
            # Generate a unique id for each row so retries don't accidentally
            # duplicate insert
            'insertId': str(uuid.uuid4()),
        }]
    }
    return bigquery.tabledata().insertAll(
        projectId=project_id,
        datasetId=dataset_id,
        tableId=table_name,
        body=insert_all_data).execute(num_retries=num_retries)
# [END stream_row_to_bigquery]
Best answer
This is how I did it quite easily using the bigquery-python library.
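from bigquery import get_client  # from the bigquery-python package

# project_id, service_account and key are not defined in this snippet;
# they are assumed to be set elsewhere.
def insert_data(datasetname, table_name, DataObject):
    client = get_client(project_id, service_account=service_account,
                        private_key_file=key, readonly=False,
                        swallow_results=False)
    insertObject = DataObject
    try:
        result = client.push_rows(datasetname, table_name, insertObject)
    except Exception as err:
        print(err)
        raise
    return result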
Here insertObject is a list of dictionaries, where each dictionary holds one row.
For example:
[{field1:value1, field2:value2},{field1:value3, field2:value4}]
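If the CSV has a header row, the same list-of-dictionaries shape can be produced with the standard library alone. This is only a minimal sketch: the helper name csv_to_rows and the sample data are made up for illustration, and every value comes back as a string, so you would probably still need to convert types to match your table schema.

```python
import csv
import io


def csv_to_rows(csv_file):
    """Read a CSV file object with a header row into a list of dicts.

    csv_to_rows is a hypothetical helper, not part of any BigQuery library.
    Each dict maps column name -> cell value (always a string).
    """
    reader = csv.DictReader(csv_file)
    return [dict(record) for record in reader]


# In-memory sample standing in for a real file opened with open(path, newline='').
sample = io.StringIO("field1,field2\nvalue1,value2\nvalue3,value4\n")
rows = csv_to_rows(sample)
print(rows)
```

The resulting rows list has the structure described above, so it could be handed to insert_data as the DataObject argument.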
The CSV can be read as follows:
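import pandas as pd

# file_path, filename, C (the columns to parse as dates), schema and
# _sorted_list are not defined in this snippet; they are assumed to exist.
fileCsv = pd.read_csv(file_path + '/' + filename, parse_dates=C, infer_datetime_format=True)
data = []
for row_x in range(len(fileCsv.index)):
    i = 0
    row = {}
    # Build one dictionary per CSV row, keyed by the schema's column names.
    for col_y in schema:
        row[col_y['name']] = _sorted_list[i]['col_data'][row_x]
        i += 1
    data.append(row)
insert_data(datasetname, table_name, data)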
The data list can then be passed to insert_data.
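If you would rather stay with the raw API client from the question, note that the 'json' field in the insertAll body is just a plain dictionary per row, so the same list of dictionaries can be wrapped directly. The sketch below is an illustration under assumptions: build_insert_all_bodies and its batch_size parameter are hypothetical names, and the default batch size is only a guess at a sensible value, not a documented requirement.

```python
import uuid


def build_insert_all_bodies(rows, batch_size=500):
    """Yield insertAll request bodies, each holding at most batch_size rows.

    rows is a list of dicts (one dict per table row). Hypothetical helper;
    batch_size is an assumed tuning knob, not an API parameter.
    """
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        yield {
            'rows': [
                {
                    'json': row,
                    # Unique id per row so a retried request does not
                    # insert the same row twice.
                    'insertId': str(uuid.uuid4()),
                }
                for row in batch
            ]
        }


example_rows = [{'field1': 'a'}, {'field1': 'b'}, {'field1': 'c'}]
bodies = list(build_insert_all_bodies(example_rows, batch_size=2))
print(len(bodies))
```

Each yielded body could then be passed as the body argument of bigquery.tabledata().insertAll(...), the same call used in the question's stream_row_to_bigquery.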
This gets the job done, but there is still a limitation, which I have already raised with the bigquery-python project.