Problem description
I have a pipeline in NiFi that pulls down some invalid JSON that I need to clean up. The best solution I've concocted is to run a Python script via ExecuteStreamCommand that cleans and splits the data in one fell swoop. However, even though I call sys.stdout.write() in my for loop, only the original JSON comes out in the output stream in NiFi.
Am I misusing sys.stdout.write(), or is this possible and I've just done something wrong? My end goal is for each line of the JSON to become a new flow file, i.e. file 1 is {"fruit":"apple",..., file 2 is {"fruit":"cherry",..., and so on.
Sample JSON
{"fruit":"apple", "vegetable":"celery", "location":{"country":"nor\\way", "city":"oslo", }, "color":"blue"}
{"fruit":"cherry", "vegetable":"kale", "location":{"country":"france", "city":"calais", }, "color":"green"}
{"fruit":"peach", "vegetable":"peas", "location":{"country":"united\\kingdom", "city":"london", }, "color":"yellow"}
Script
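import json
import re
import sys

flow_file = sys.stdin.read()
try:
    # Input is already valid JSON: pass it through unchanged.
    json.loads(flow_file)
    sys.stdout.write(flow_file)
except ValueError:
    # Strip "(", "\" and ")" characters, then write one record per line.
    flow_file_esc = re.sub(r"[(\\)]", "", flow_file)
    for f in flow_file_esc.splitlines():
        # Re-add the newline that splitlines() removed so records don't run together.
        sys.stdout.write(f + "\n")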
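ExecuteStreamCommand collects everything the command writes to stdout into a single flow file on its "output stream" relationship, so repeated sys.stdout.write() calls append to one flow file rather than creating several. A minimal sketch, assuming the only problems in the sample are stray backslashes and trailing commas before a closing brace, is to have the script emit clean newline-delimited JSON and split it downstream (for example with SplitText and Line Split Count = 1). The names clean_line, clean_stream and main are hypothetical, and stripping every backslash would also destroy legitimate escapes such as \" in other data.

```python
import json
import re

# Assumed cleanup rules for the sample input (not NiFi features):
# strip backslashes, as the original script does, and drop a trailing
# comma that sits right before a closing brace or bracket.
BACKSLASH = re.compile(r"\\")
TRAILING_COMMA = re.compile(r",\s*([}\]])")


def clean_line(line):
    """Return one record as compact JSON, or None for a blank line."""
    line = line.strip()
    if not line:
        return None
    line = BACKSLASH.sub("", line)
    line = TRAILING_COMMA.sub(r"\1", line)
    record = json.loads(line)  # raises ValueError if the line is still invalid
    return json.dumps(record, separators=(",", ":"))


def clean_stream(text):
    """Clean every line and return newline-delimited JSON, one record per line."""
    cleaned = (clean_line(line) for line in text.splitlines())
    return "".join(record + "\n" for record in cleaned if record is not None)


def main(instream, outstream):
    # Everything written here lands in ONE output flow file;
    # it still has to be split into per-record flow files downstream.
    outstream.write(clean_stream(instream.read()))
```

To run it from ExecuteStreamCommand, add `import sys` and a final `main(sys.stdin, sys.stdout)` call; the call is left out here so the functions can be imported and tested without blocking on stdin.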
Recommended answer
Can you clean the file first with ReplaceText and then split it with SplitJson, SplitRecord, or ForkRecord?
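For the ReplaceText route, the regular expressions can be prototyped locally before being pasted into ReplaceText's Search Value and Replacement Value properties (Replacement Strategy: Regex Replace). The two patterns below are assumptions about what makes the sample invalid; they use no capture groups, so they should behave the same under the Java regex engine that ReplaceText uses, but verify them in NiFi. The helper split_lines is hypothetical and only a rough local stand-in for a line-based split such as SplitText.

```python
import json
import re

# Hypothetical ReplaceText settings, prototyped with Python's re module.
# Each entry is (Search Value, Replacement Value), applied in order.
REPLACE_TEXT_PASSES = [
    (r"\\", ""),       # drop stray backslashes (mirrors the question's script)
    (r",\s*\}", "}"),  # drop a trailing comma before a closing brace
]


def apply_passes(text, passes=REPLACE_TEXT_PASSES):
    """Apply each regex replacement to the entire text, one pass after another."""
    for search, replacement in passes:
        text = re.sub(search, replacement, text)
    return text


def split_lines(text):
    """Parse each non-blank line as one JSON record (one future flow file)."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]
```

Chaining one ReplaceText processor per pass keeps each pattern simple and easy to check on its own.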
If you need to combine the two operations and want to script them, you could try ExecuteScript with Jython (it doesn't look like you're using any native CPython libraries). I have some simple examples in my cookbook and on my blog.