Problem description
I am trying to create a timer-triggered Azure Function that takes data from blob storage, aggregates it, and puts the aggregates into Cosmos DB. I previously tried using the bindings in Azure Functions to use blob as input, which I was informed was incorrect (see this thread: Azure functions python no value for named parameter).
I am now using the SDK and am running into the following problem:
import sys, os.path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), 'myenv/Lib/site-packages')))
import json
import pandas as pd
from azure.storage.blob import BlockBlobService
data = BlockBlobService(account_name='accountname', account_key='accountkey')
container_name = ('container')
generator = data.list_blobs(container_name)
for blob in generator:
    print("{}".format(blob.name))
    json = json.loads(data.get_blob_to_text('container', open(blob.name)))
    df = pd.io.json.json_normalize(json)
    print(df)
This results in the error:
IOError: [Errno 2] No such file or directory: 'test.json'
I realize this might be an absolute path issue, but I'm not sure how that works with Azure Storage. Any ideas on how to circumvent this?
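For reference, with this legacy SDK get_blob_to_text expects the container name and the blob name as strings (not an open file handle) and returns a Blob object whose content attribute holds the text; a minimal sketch using the same placeholder account and container names as above:

from azure.storage.blob import BlockBlobService

data = BlockBlobService(account_name='accountname', account_key='accountkey')
for blob in data.list_blobs('container'):
    # pass the blob name itself, not a local file opened with open()
    text = data.get_blob_to_text('container', blob.name).content
    print(text[:100])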
Made it "work" by doing the following:
for blob in generator:
    loader = data.get_blob_to_text('kvaedevdystreamanablob', blob.name, if_modified_since=delta)
    json = json.loads(loader.content)
This works for ONE JSON file, i.e. I only had one in storage, but when more are added I get this error:
ValueError: Expecting object: line 1 column 21907 (char 21906)
This happens even if I add if_modified_since so as to only take in one blob. Will update if I figure something out. Help is always welcome.
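As an aside, the delta passed to if_modified_since in the snippet above is never defined there; a minimal sketch, assuming it is simply meant to be a UTC cutoff such as "blobs modified within the last hour":

from datetime import datetime, timedelta

# hypothetical cutoff: only fetch blobs modified in the last hour
delta = datetime.utcnow() - timedelta(hours=1)

loader = data.get_blob_to_text('kvaedevdystreamanablob', blob.name, if_modified_since=delta)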
Another update: my data comes in through Stream Analytics and then down to the blob. I had selected that the data should come in as arrays, which is why the error occurs: when the stream is terminated, the closing ] is not immediately appended to the end of the JSON, so the JSON file is not valid. I will now try line-separated output in Stream Analytics instead of arrays.
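For the array output format, one possible workaround (purely a sketch, and not what the accepted answer below ends up doing) is to try parsing the blob as-is and, if that fails because the closing bracket has not been written yet, repair it and retry; blob_text here stands in for loader.content:

import json

def parse_possibly_truncated_array(blob_text):
    # Parse a JSON array, repairing a missing closing ] if needed.
    try:
        return json.loads(blob_text)
    except ValueError:
        # Stream Analytics may not have appended the closing ] yet
        repaired = blob_text.rstrip().rstrip(',') + ']'
        return json.loads(repaired)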
Recommended answer
Figured it out. In the end it was a quite simple fix:
I had to make sure each JSON entry in the blob was less than 1024 characters, otherwise it would be wrapped onto a new line, which made reading line by line problematic.
The code that iterates through each blob file, reads it and adds it to a list is as follows:
import json
from azure.storage.blob import BlockBlobService

data = BlockBlobService(account_name='accname', account_key='key')
generator = data.list_blobs('collection')

dataloaded = []
for blob in generator:
    # download each blob as text; every line holds one JSON object
    loader = data.get_blob_to_text('collection', blob.name)
    trackerstatusobjects = loader.content.split('\n')
    for trackerstatusobject in trackerstatusobjects:
        if trackerstatusobject.strip():  # skip blank trailing lines
            dataloaded.append(json.loads(trackerstatusobject))
From this you can add the list to a dataframe and do whatever you want :) Hope this helps if someone stumbles upon a similar problem.
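For example, a minimal sketch of loading the collected list into a dataframe (json_normalize flattens nested objects; the resulting columns depend on your own JSON schema):

import pandas as pd

# dataloaded is the list of parsed JSON objects built above
df = pd.io.json.json_normalize(dataloaded)
print(df.head())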