Problem description
I have a CSV that looks like this:
Jc,TXF,timer,alpha,beta
15,44,55,12,33
18,87,33,111
9,87,61,29,77
Alpha and beta combined make up a city code. I want to add the name of the city to the CSV as a new column:
Jc,TXF,timer,alpha,beta,city
15,44,55,12,33,York
18,87,33,111,London
9,87,61,29,77,Sydney
I have another CSV with only the columns alpha, beta and city, which looks like this:
alpha,beta,city
12,33,York
33,111,London
29,77,Sydney
How can I achieve this using Apache NiFi? Please suggest the processors and the workflow needed to achieve this.
Recommended answer
I see two ways of solving this.
First, by using CsvLookupService. However, the CsvLookupService only supports a single key, and you have two: alpha and beta. So to use this solution you have to concatenate both keys into a single key, like 12_33.
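As a rough illustration of that concatenation, the following sketch runs outside NiFi and adds a combined key column to the mapping file. The function name add_composite_key and the column name city_code are made up for this example, and the incoming data would need the same combined key before a single-key lookup could match it.

```python
import csv
import io


def add_composite_key(mapping_csv_text, key_name="city_code", sep="_"):
    """Return the mapping CSV with an extra first column joining alpha and beta.

    key_name and sep are illustrative choices, not NiFi settings.
    """
    reader = csv.DictReader(io.StringIO(mapping_csv_text))
    out = io.StringIO()
    fieldnames = [key_name] + reader.fieldnames
    writer = csv.DictWriter(out, fieldnames=fieldnames, lineterminator="\n")
    writer.writeheader()
    for row in reader:
        # e.g. alpha=12, beta=33 becomes the single key 12_33
        row[key_name] = row["alpha"] + sep + row["beta"]
        writer.writerow(row)
    return out.getvalue()


mapping = "alpha,beta,city\n12,33,York\n33,111,London\n29,77,Sydney\n"
keyed = add_composite_key(mapping)
print(keyed)
```

The drawback mentioned above still applies: both the mapping file and the source data have to carry the concatenated key.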
Second, by using the ExecuteScript processor. This one is better, because you don't have to modify your source data. Strategy:
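- Split the CSV text into individual lines
- Enrich each line with the city column by looking up its alpha and beta keys in the mapping file
- Merge the individual lines back into a single CSV file.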
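The three steps can be sketched in plain Python, independent of NiFi, to show the logic the flow is meant to implement. The function name enrich_with_city is hypothetical, the sample uses only rows that have all five columns, and leaving the city empty for an unmatched row is an assumption of this sketch.

```python
import csv
import io


def enrich_with_city(data_csv_text, mapping_csv_text):
    # Build a lookup keyed on the (alpha, beta) pair
    mapping_rows = csv.DictReader(io.StringIO(mapping_csv_text))
    city_by_code = {(m["alpha"], m["beta"]): m["city"] for m in mapping_rows}

    # Step 1: split the CSV text into a header and individual lines
    lines = data_csv_text.strip().split("\n")
    header, rows = lines[0], lines[1:]
    columns = header.split(",")
    alpha_idx = columns.index("alpha")
    beta_idx = columns.index("beta")

    # Step 2: enrich each line with its city (empty if no match, by assumption)
    enriched = []
    for row in rows:
        cells = row.split(",")
        city = city_by_code.get((cells[alpha_idx], cells[beta_idx]), "")
        enriched.append(row + "," + city)

    # Step 3: merge the lines back into a single CSV text
    return "\n".join([header + ",city"] + enriched) + "\n"


data = "Jc,TXF,timer,alpha,beta\n15,44,55,12,33\n9,87,61,29,77\n"
mapping = "alpha,beta,city\n12,33,York\n33,111,London\n29,77,Sydney\n"
enriched_csv = enrich_with_city(data, mapping)
print(enriched_csv)
```

In the NiFi flow described here, each of these steps is handled by a separate processor rather than by one function.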
Overall flow:
GenerateFlowFile:
SplitText:
Set header line count to 1 to include the header line in the split content. For the ExecuteScript processor, set python as the scripting engine and provide the following script body:
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import csv


# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # fetch the mapping CSV file
        with open('/home/nifi/mapping.csv', 'r') as mapping:
            # read the mapping file
            mappingContent = csv.reader(mapping, delimiter=',')
            # flowfile content is CSV text with two lines, header and actual content
            # split by newline to get access to each individual line
            lines = IOUtils.toString(inputStream, StandardCharsets.UTF_8).split('\n')
            # the result will contain the header line
            # with the additional city column
            result = lines[0] + ',city\n'
            # take the second line and split it
            # to get access to the alpha and beta values
            lineSplit = lines[1].split(',')
            # Go through the mapping file
            # item[0] -> alpha
            # item[1] -> beta
            # item[2] -> city
            # See if you find alpha and beta on the line content
            matched = False
            for item in mappingContent:
                if item[0] == lineSplit[3] and item[1] == lineSplit[4]:
                    result += lines[1] + ',' + item[2]
                    matched = True
                    break
            # result always holds the header, so track the match explicitly
            if not matched:
                raise Exception('No matching found.')
            outputStream.write(bytearray(result.encode('utf-8')))
# end class


flowFile = session.get()
if flowFile is not None:
    try:
        flowFile = session.write(flowFile, PyStreamCallback())
        session.transfer(flowFile, REL_SUCCESS)
    except Exception as e:
        # no mapping entry matched, or the line was malformed
        session.transfer(flowFile, REL_FAILURE)
See the comments for a detailed description of the script. /home/nifi/mapping.csv has to be available on your NiFi instance. If you want to learn more about the ExecuteScript processor, refer to the ExecuteScript Cookbook. Finally, you merge all the lines into a single CSV file:
Set the CSV reader and writer and leave their default properties. Adjust the MergeContent properties to control how many lines should be in each resulting CSV file. Result:
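As a rough illustration of what the merge step is expected to produce, the sketch below combines two-line fragments (header plus one enriched row) into one CSV with a single header. This is plain Python and not how NiFi implements merging; merge_fragments is a hypothetical helper name.

```python
def merge_fragments(fragments):
    """Merge header-plus-row CSV fragments into one CSV with a single header."""
    header = None
    rows = []
    for fragment in fragments:
        lines = [line for line in fragment.splitlines() if line.strip()]
        if not lines:
            continue
        if header is None:
            # the first fragment decides the header
            header = lines[0]
        elif lines[0] != header:
            raise ValueError("fragment header does not match: " + lines[0])
        rows.extend(lines[1:])
    if header is None:
        return ""
    return "\n".join([header] + rows) + "\n"


fragments = [
    "Jc,TXF,timer,alpha,beta,city\n15,44,55,12,33,York",
    "Jc,TXF,timer,alpha,beta,city\n9,87,61,29,77,Sydney",
]
merged = merge_fragments(fragments)
print(merged)
```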