Posting multiple files to an API with NiFi InvokeHTTP

Problem description

I have 3 files that I need to POST to an API endpoint. I am fetching the 3 files with FetchHDFS processors and want to pass them to the API. How can I pass them?

Input:

3 files in HDFS
Content-Type: multipart/form-data

Error:

invokehttp.response.body
{ "message": "Multipart: Boundary not found (user: 'undefined')", "level": "error", "timestamp": "2019-12-11T09:59:05.464Z" }

Flow attempted:

InputPort --> 3 FetchHDFS processors (one per file) --> InvokeHTTP

Example curl command:

curl -X POST "https://xxxxxx/xxxxx" -H "accept: application/json" -H "Content-Type: multipart/form-data" -F "[email protected];type=application/vnd.ms-excel" -F "[email protected];type=text/plain" -F "[email protected];type=application/vnd.ms-excel" -F "format=flat"

Recommended answer

The idea: before building a multipart body from several flow files, you need to merge them into one.

For this, use the MergeContent processor with Merge Format = TAR.

Then use ExecuteGroovyScript to convert the TAR into multipart.

@Grab(group='org.apache.httpcomponents', module='httpmime', version='4.5.9')
@Grab(group='org.apache.commons', module='commons-compress', version='1.19')
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.http.entity.mime.MultipartEntityBuilder
import org.apache.http.entity.ContentType

def ff = session.get()
if(!ff)return

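//InputStream wrapper whose `delegate` can be assigned later, once the real flow file stream is open.
//HttpClient closes each part's stream after writing it, so close() is overridden to
//advance to the next TAR entry; the underlying stream is closed only after the last entry.
@groovy.transform.CompileStatic
class TarContentInputStream extends InputStream{
    @Delegate TarArchiveInputStream delegate
    @Override void close(){
        if(!delegate.getNextTarEntry())delegate.close()
    }
}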

def multipart = MultipartEntityBuilder.create()
def tarContent = new TarContentInputStream()

//iterate through TAR entries and build multipart
def tarInput=new TarArchiveInputStream(ff.read())
def tarEntry = tarInput.getNextTarEntry()
while (tarEntry != null) {
    //reference tarContent to be used as body
    multipart.addBinaryBody( tarEntry.getName(), tarContent, ContentType.DEFAULT_BINARY, tarEntry.getName() )
    tarEntry = tarInput.getNextTarEntry()
}
tarInput.close()
//write multipart content
ff.write{streamIn, streamOut->
    //set real input stream to be used as tar content
    tarContent.delegate = new TarArchiveInputStream(streamIn)
    assert tarContent.delegate.getNextTarEntry() //move to first entry
    multipart = multipart.build()
    multipart.writeTo(streamOut)
}

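//store the generated Content-Type (it includes the boundary) in mime.type;
//InvokeHTTP's Content-Type property defaults to ${mime.type}
ff."mime.type" = multipart.getContentType().getValue()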
ff.filename = ff.filename+".multipart"

REL_SUCCESS << ff

Note:

For 3 test files combined into a TAR, the code above produces something like:

--boundary
Content-Disposition: form-data; name="file1.txt"; filename="file1.txt"
Content-Type: application/octet-stream
Content-Transfer-Encoding: binary

file1 content
--boundary
Content-Disposition: form-data; name="file2.txt"; filename="file2.txt"
Content-Type: application/octet-stream
Content-Transfer-Encoding: binary

file2 content
--boundary
Content-Disposition: form-data; name="file3.txt"; filename="file3.txt"
Content-Type: application/octet-stream
Content-Transfer-Encoding: binary

file3 content
--boundary--
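
The parts in the output above are separated by a boundary string, and the receiving server can only split them if the Content-Type header names that same string in a boundary= parameter. A bare "Content-Type: multipart/form-data" header, as in the question, is the likely cause of the "Boundary not found" error. The sketch below builds a multipart body by hand to make that relationship visible. It is an illustration only: buildMultipart is a hypothetical helper, not part of NiFi or HttpClient, and it does not escape special characters in file names.

```groovy
// Hypothetical helper (not a NiFi or HttpClient API): assembles a
// multipart/form-data body by hand so the role of the boundary is visible.
byte[] buildMultipart(Map<String, byte[]> files, String boundary) {
    def out = new ByteArrayOutputStream()
    files.each { name, content ->
        // every part starts with "--" + boundary, followed by its own headers
        String header = '--' + boundary + '\r\n' +
            'Content-Disposition: form-data; name="' + name + '"; filename="' + name + '"\r\n' +
            'Content-Type: application/octet-stream\r\n\r\n'
        out.write(header.getBytes('UTF-8'))
        out.write(content)
        out.write('\r\n'.getBytes('UTF-8'))
    }
    // the closing delimiter carries a trailing "--"
    out.write(('--' + boundary + '--\r\n').getBytes('UTF-8'))
    return out.toByteArray()
}

String boundary = 'boundary-' + UUID.randomUUID()
byte[] body = buildMultipart(['file1.txt': 'file1 content'.getBytes('UTF-8')], boundary)

// The header must repeat the exact boundary used in the body.
String contentType = 'multipart/form-data; boundary=' + boundary
println contentType
println new String(body, 'UTF-8')
```

In the answer's script, MultipartEntityBuilder generates the boundary, and multipart.getContentType().getValue() returns the header value that already includes it.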

Currently the code scans the input stream twice: first to enumerate the TAR entries, then to build the content.

I think the code could be rewritten to convert the TAR to multipart in a single pass.
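
One possible shape for such a single-pass conversion is sketched below. It is not from the original answer and has not been run inside NiFi: tarToMultipart and its fields parameter are hypothetical names, the multipart framing is written by hand instead of through MultipartEntityBuilder, and file names are assumed to contain no quotes or line breaks. The fields map is there because the curl command in the question also sends a plain form field (format=flat).

```groovy
@Grab(group='org.apache.commons', module='commons-compress', version='1.19')
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream

// Hypothetical helper: streams a TAR archive to multipart/form-data,
// reading the input only once.
void tarToMultipart(InputStream tarStream, OutputStream out, String boundary,
                    Map<String, String> fields = [:]) {
    def write = { String s -> out.write(s.getBytes('UTF-8')) }

    // optional plain form fields, e.g. [format: 'flat']
    fields.each { name, value ->
        write('--' + boundary + '\r\n' +
              'Content-Disposition: form-data; name="' + name + '"\r\n\r\n' +
              value + '\r\n')
    }

    def tar = new TarArchiveInputStream(tarStream)
    byte[] buffer = new byte[8192]
    def entry
    while ((entry = tar.getNextTarEntry()) != null) {
        if (entry.isDirectory()) continue
        String name = entry.getName()
        write('--' + boundary + '\r\n' +
              'Content-Disposition: form-data; name="' + name + '"; filename="' + name + '"\r\n' +
              'Content-Type: application/octet-stream\r\n\r\n')
        // read() returns -1 at the end of the current entry, not of the archive
        int n
        while ((n = tar.read(buffer)) != -1) {
            out.write(buffer, 0, n)
        }
        write('\r\n')
    }
    write('--' + boundary + '--\r\n')
}
```

Inside ExecuteGroovyScript this could presumably be called from the ff.write{streamIn, streamOut-> ...} closure used in the script above, with the mime.type attribute then set to 'multipart/form-data; boundary=' followed by the same boundary string. Because the boundary is chosen before the content is read, it should be random enough not to occur in any of the files.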

