Problem description
I have 3 files that I need to post to an API endpoint. I am fetching the 3 files with FetchHDFS processors and I want to pass them to the API. How can I pass them?
Input:
3 files in HDFS
Content-Type: multipart/form-data
Error:
invokehttp.response.body
{ "message": "Multipart: Boundary not found (user: 'undefined')", "level": "error", "timestamp": "2019-12-11T09:59:05.464Z" }
Flow tried:
inputPort --> 3 FetchHDFS processors to fetch 3 different files --> InvokeHTTP
Example curl command:
curl -X POST "https://xxxxxx/xxxxx" -H "accept: application/json" -H "Content-Type: multipart/form-data" -F "[email protected];type=application/vnd.ms-excel" -F "[email protected];type=text/plain" -F "[email protected];type=application/vnd.ms-excel" -F "format=flat"
Recommended answer
The idea: before building a multipart body from several flow files, you need to merge them into one.
For this, use the MergeContent processor with Merge Format = TAR.
Then use ExecuteGroovyScript to convert the TAR to multipart.
@Grab(group='org.apache.httpcomponents', module='httpmime', version='4.5.9')
@Grab(group='org.apache.commons', module='commons-compress', version='1.19')
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.http.entity.mime.MultipartEntityBuilder
import org.apache.http.entity.ContentType

def ff = session.get()
if (!ff) return

// delegating input stream class, so that the real `delegate` can be set later
@groovy.transform.CompileStatic
class TarContentInputStream extends InputStream {
    @Delegate TarArchiveInputStream delegate

    // closing one part's body advances to the next tar entry;
    // the underlying stream is closed only after the last entry
    @Override void close() {
        println "--------- try to close"
        if (!delegate.getNextTarEntry()) delegate.close()
    }
}

def multipart = MultipartEntityBuilder.create()
def tarContent = new TarContentInputStream()

// first pass: iterate through TAR entries and register one multipart part per entry
def tarInput = new TarArchiveInputStream(ff.read())
def tarEntry = tarInput.getNextTarEntry()
while (tarEntry != null) {
    // reference tarContent to be used as body
    multipart.addBinaryBody(tarEntry.getName(), tarContent, ContentType.DEFAULT_BINARY, tarEntry.getName())
    tarEntry = tarInput.getNextTarEntry()
}
tarInput.close()

// second pass: write multipart content
ff.write { streamIn, streamOut ->
    // set real input stream to be used as tar content
    tarContent.delegate = new TarArchiveInputStream(streamIn)
    assert tarContent.delegate.getNextTarEntry() // move to first entry
    multipart = multipart.build()
    multipart.writeTo(streamOut)
}

// the content type produced by the builder includes the boundary parameter
ff."mime.type" = multipart.getContentType().getValue()
ff.filename = ff.filename + ".multipart"
REL_SUCCESS << ff
Note:
For 3 test files combined into a TAR, the code above produces something like:
--boundary
Content-Disposition: form-data; name="file1.txt"; filename="file1.txt"
Content-Type: application/octet-stream
Content-Transfer-Encoding: binary

file1 content
--boundary
Content-Disposition: form-data; name="file2.txt"; filename="file2.txt"
Content-Type: application/octet-stream
Content-Transfer-Encoding: binary

file2 content
--boundary
Content-Disposition: form-data; name="file3.txt"; filename="file3.txt"
Content-Type: application/octet-stream
Content-Transfer-Encoding: binary

file3 content
--boundary--
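The `--boundary` delimiters in that output only make sense to the server if the request's Content-Type header carries the same value as a `boundary=` parameter. The error quoted in the question is consistent with a header of plain `multipart/form-data` with no boundary, although that is an inference and not something the source confirms. The following is a minimal sketch using only JDK classes; `buildMultipart` and the sample file names are hypothetical, and the optional `Content-Transfer-Encoding` line is left out for brevity.

```groovy
// Hypothetical helper: builds a multipart/form-data body by hand so the
// relationship between the header boundary and the body delimiters is visible.
def buildMultipart(Map<String, byte[]> files, String boundary) {
    def out = new ByteArrayOutputStream()
    files.each { name, content ->
        // each part starts with "--" + boundary, followed by its own headers
        String header = "--${boundary}\r\n" +
            "Content-Disposition: form-data; name=\"${name}\"; filename=\"${name}\"\r\n" +
            "Content-Type: application/octet-stream\r\n" +
            "\r\n"                      // blank line separates part headers from part body
        out.write(header.getBytes('UTF-8'))
        out.write(content)
        out.write('\r\n'.getBytes('UTF-8'))
    }
    // the closing delimiter has an extra trailing "--"
    String closing = "--${boundary}--\r\n"
    out.write(closing.getBytes('UTF-8'))
    return out.toByteArray()
}

// Assumption: any token that does not occur inside the file contents will do.
String boundary = 'example-' + UUID.randomUUID()
byte[] body = buildMultipart(
    ['file1.txt': 'file1 content'.bytes, 'file2.txt': 'file2 content'.bytes],
    boundary)

// The header value must repeat the boundary used in the body.
String contentType = 'multipart/form-data; boundary=' + boundary
println contentType
println new String(body, 'UTF-8')
```

In the answer's script this pairing is handled by `multipart.getContentType().getValue()`, which is stored in the `mime.type` attribute of the flow file.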
Currently the code scans the input stream twice: first to enumerate the tar entries, then to build the content.
I think it's possible to rewrite the code to convert the tar to multipart in a single pass.
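One possible shape for such a single-pass version is sketched below. It is not the answer author's code: it drops `MultipartEntityBuilder` and writes the part delimiters by hand, copying each tar entry to the output as it is read. `tarToMultipart` is a hypothetical helper name, the boundary is assumed not to occur in any file's content, and the NiFi wiring in the trailing comment reuses the calls from the script above but is untested.

```groovy
@Grab(group='org.apache.commons', module='commons-compress', version='1.19')
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream

// Hypothetical helper: reads the TAR once and emits one multipart part per entry.
def tarToMultipart(InputStream tarStream, OutputStream out, String boundary) {
    def tar = new TarArchiveInputStream(tarStream)
    byte[] buf = new byte[8192]
    def entry = tar.getNextTarEntry()
    while (entry != null) {
        if (!entry.isDirectory()) {
            String name = entry.getName()
            String header = "--${boundary}\r\n" +
                "Content-Disposition: form-data; name=\"${name}\"; filename=\"${name}\"\r\n" +
                "Content-Type: application/octet-stream\r\n" +
                "\r\n"
            out.write(header.getBytes('UTF-8'))
            // read() returns -1 at the end of the current entry, not of the archive
            int n
            while ((n = tar.read(buf)) != -1) {
                out.write(buf, 0, n)
            }
            out.write('\r\n'.getBytes('UTF-8'))
        }
        entry = tar.getNextTarEntry()
    }
    String closing = "--${boundary}--\r\n"
    out.write(closing.getBytes('UTF-8'))
    out.flush()
}

// Possible wiring inside ExecuteGroovyScript (untested sketch):
//   def ff = session.get()
//   if (!ff) return
//   String boundary = 'nifi-' + UUID.randomUUID()
//   ff.write { streamIn, streamOut -> tarToMultipart(streamIn, streamOut, boundary) }
//   ff."mime.type" = 'multipart/form-data; boundary=' + boundary
//   REL_SUCCESS << ff
```

Choosing the boundary up front is what makes the single pass possible here, because the content type no longer depends on a builder that has to know all parts in advance.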