Question

I am retrieving user usage data from the Google Admin Report User Usage API via the Python SDK on Databricks. The data volume is around 100,000 records per day, which I fetch nightly via a batch process. The API returns a maximum page size of 1000, so I call it roughly 100 times to get the data I need for the day. This is working fine.

My ultimate aim is to store the data in its raw format in a data lake (Azure Gen2, but irrelevant to this question). Later on, I will transform the data using Databricks into an aggregated reporting model and put PowerBI on top of it to track Google App usage over time.

As a C# programmer, I am new to Python and Spark: my current approach is to request the first page of 1000 records from the api and then write it to the datalake directly as a JSON file, then get the next pageset and write that too. The folder structure would be something like "\raw\googleuser\YYYY\MM\DD\data1.json".
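The folder layout described above can be sketched as a small path helper. This is only an illustration: the function name `raw_page_path`, the root `/dbfs/mnt/mydatalake` and the example date are assumptions, and forward slashes are used because that is what DBFS paths expect.

```python
from datetime import date
import posixpath


def raw_page_path(root: str, day: date, page_number: int) -> str:
    """Build <root>/raw/googleuser/YYYY/MM/DD/data<N>.json for one page of results."""
    return posixpath.join(
        root, "raw", "googleuser",
        f"{day:%Y}", f"{day:%m}", f"{day:%d}",  # zero-padded year/month/day folders
        f"data{page_number}.json",
    )


# Hypothetical root and date, purely for illustration.
print(raw_page_path("/dbfs/mnt/mydatalake", date(2019, 8, 28), 1))
```

Partitioning by year, month and day in the path means a later Spark job can read a single day's folder without scanning the rest of the raw zone.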

I would like to keep the data in its rawest possible form in the raw zone and not apply too many transformations. A second process can extract the fields I need, tag them with metadata and write the result back as Parquet, ready for consumption by function. This is why I am thinking of writing it as JSON.

This means that the second process needs to read the JSON into a dataframe, where I can transform it and write it as Parquet (this part is also straightforward).

Because I am using the Google API SDK I am not working with JSON directly: it returns dict objects (with complex nesting). I can serialize a response to a JSON string using json.dumps(), but I cannot figure out how to write a string directly to my data lake. Once I get it into a dataframe I can easily write it in any format; however, converting it from JSON into a dataframe and then essentially back to JSON just to write it seems like unnecessary performance overhead.

Here are the things I have tried and the results:

  1. Build up a list of pyspark.sql.Row objects and, once all the paging is done (about 100k rows), use spark.createDataFrame(rows) to turn it into a dataframe. Once it is a dataframe I can save it as a JSON file. This works, but seems inefficient.
  2. Use json.dumps(response) to get a JSON string of 1000 records. I am able to write it to the Databricks File System using this code:

with open("/dbfs/tmp/googleuserusagejsonoutput-{0}.json".format(keyDateFilter), 'w') as f:
    f.write(json.dumps(response))

However, I then have to move it to my Azure data lake with:

dbutils.fs.cp("/tmp/test_dbfs1.txt", datalake_path + dbfs_path + "xyz.json")

Then I get the next 1000 records and keep doing this. I cannot seem to use open() directly against the data lake store (Azure abfss driver), or this would be a decent solution. It seems fragile and strange to dump the file locally first and then move it.

  3. Same as option 1, but dump the dataframe to the data lake every 1000 records and overwrite it (so that memory never holds more than 1000 records at a time).
  4. Ignore the rule of dumping raw JSON. Massage the data into the simplest format I want and get rid of all the extra data I don't need. This would result in a much smaller footprint, and then option 1 or 3 above would be followed. (This is the second question: the principle of saving all data from the API in its raw format, so that as requirements change over time I always have the historical data in the data lake and can just change the transformation routines to extract different metrics from it. Hence I am reluctant to drop any data at this stage.)
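The page-by-page variant of these options can be sketched without Spark at all, assuming the target directory is reachable through ordinary file I/O (for example a `/dbfs/...` path). In this sketch `fetch_page` is a hypothetical stand-in for the Google SDK call, and the `nextPageToken` and `usageReports` field names are assumptions about the response shape, not taken from the question.

```python
import json
import os
import tempfile
from typing import Callable, Optional


def dump_pages(fetch_page: Callable[[Optional[str]], dict], out_dir: str) -> int:
    """Write each API page to its own JSON file; return the number of pages written."""
    os.makedirs(out_dir, exist_ok=True)
    token = None
    page_number = 0
    while True:
        response = fetch_page(token)  # one page (at most 1000 records) as a dict
        page_number += 1
        path = os.path.join(out_dir, f"data{page_number}.json")
        with open(path, "w", encoding="utf-8") as f:
            json.dump(response, f, ensure_ascii=False)  # raw response, untransformed
        token = response.get("nextPageToken")  # assumed paging field
        if not token:
            return page_number


def fake_fetch_page(token: Optional[str]) -> dict:
    """Hypothetical stand-in for the SDK call, returning two small pages."""
    pages = {
        None: {"usageReports": [{"id": 1}], "nextPageToken": "p2"},
        "p2": {"usageReports": [{"id": 2}]},
    }
    return pages[token]


# A temporary directory stands in for the raw zone here.
out_dir = os.path.join(tempfile.mkdtemp(), "raw", "googleuser")
pages_written = dump_pages(fake_fetch_page, out_dir)
print(pages_written, sorted(os.listdir(out_dir)))
```

Only one page is held in memory at a time, and the raw response is written unchanged, which keeps the "store everything as received" principle intact.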

Any advice would be appreciated.

Recommended answer

Mount the lake to your Databricks environment so you can just save to the lake as if it were a normal folder:

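import json

# Assumes the lake is mounted at /mnt/mydatalake; keyDateFilter is the date key from the question.
with open('/dbfs/mnt/mydatalake/googleuserusagejsonoutput-{0}.json'.format(keyDateFilter), 'w', encoding='utf-8') as f:
    json.dump(data, f, sort_keys=True, indent=4, ensure_ascii=False)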

You only need to mount the lake once:

https://docs.databricks.com/spark/latest/data-sources/azure/azure-datalake-gen2.html#mount-the-azure-data-lake-storage-gen2-filesystem-with-dbfs

That being said:

Storing big data in JSON format is not optimal: for each and every value (cell) you are also storing the key (column name), so your data will be much larger than it needs to be. Also, you should probably have a de-duplication function to ensure both that (1) there are no gaps in the data, and (2) you aren't storing the same data in multiple files. Databricks Delta takes care of that.

https://docs.databricks.com/delta/delta-intro.html
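The point about keys being repeated for every value can be illustrated with a rough size comparison. This is not how Parquet or Delta store data; it only contrasts JSON records that repeat every key with a layout that lists the column names once. The field names and values are made up for the example.

```python
import json

# Hypothetical usage records; every record repeats the same three keys.
records = [
    {"user_email": f"user{i}@example.com", "report_date": "2019-08-28", "num_docs_edited": i}
    for i in range(1000)
]

# Row-oriented JSON: keys are serialized once per record.
row_oriented = json.dumps(records)

# Same values with the column names stored a single time.
columns = list(records[0])
header_once = json.dumps({
    "columns": columns,
    "rows": [[record[c] for c in columns] for record in records],
})

print(len(row_oriented), len(header_once))
```

Columnar formats go further than this with typed encoding and compression, so the gap in practice is usually larger than this simple comparison suggests.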

08-28 08:37