我试图将原始流文件作为输入发送到下一个处理器,但最终收到错误消息,我对Nifi还是陌生的,并且对Java有一定的经验。
public void onTrigger(ProessorContext context, ProcessSessionsession) throws ProceException{
FlowFile flowfile = session.get();
if(flowfile == null){
return;
}
ArrayList<String> headData = new ArrayList<String>();
try{
session.read(flowfile, new InputStreamCallback(){
final DBCPService= context.getProperty(CONNECTION_POOL).asContollerService(DBCPService.class);
String query = " CREATE TABLE MODEL (";
@SuppressWarnings("deprecation")
public void process(InputStream inputStream) throws IOException {
try{
OPCPackage pkg = OPCPackage.open(inputStream);
XSSFWorkbook workbook = new XSSFWorkbook(pkg);
workbook.getAllNames();
String dateheader = "date"
XSSFSheet sheetName = workbook.getSheet(0);
Row row = sheetName.getRow(0);
for(Cell cell: row) {
switch (cell.getCellType()){
case NUMERIC:
if(HSSDataUtil.isCellDateFromated(cell)){
DataFormatter dataFromatter = new DataFormatter();
headData.add(dataFromatter.formatCellValue(cell);
query +=dataFromatter.formatCellValue(cell)+" " + "INT" ;
}else{
headData.add(String.valueOf(cell.getNumericCellValue()));
}
break;
case STRING:
headData.add(cell.getStringCellValue());
if(cell.getStringCellValue().toLowerCase().contains(dateheader))
query += cell.getStringCellValue() + " " + "TIMESTAMP,";
else
query +=cell.getStringCellValue() + " + "VARCHAR(50),";
break;
case BOOLEAN:
headData.add(String.valueOf(cell.getBooleanCellValue());
break;
default:
headData.add("");
break;
}
}
query = query.substring(0, query.length() -1);
query += ")";
workbook.close();
final Connection con = DBCPService.getConnection();
try{
java.sql.PreparedStatement = con.prepareStatement(query);
PreparedStatement.execute();
con.commit();
session.transfer(flowfile, REL_SUCCESS);
}catch (SQL Exception e){
e.printStackTrace();
session.transfer(flowfile, REL_FAILURE);
}
}catch(InvalidFromatException ife){
getLogger().error(" only .xlsx excel files are supprted", ife);
thrownew UnsupportedOperationException("Only .xlsx OOXML files are substring", ife);
}
}
});
{catch (RuntimeException ex) {
getLogger().error("Failed to process incoming Excel document. " + ex.getMessage(), ex);
FlowFile failedFlowFile = session.putAttribute(flowfile, testxlsqlProcessor.class.getMessage());
}
final StringBuilder stringBuilder = new StringBuilder();
flowfile = session.write(flowfile, new StreamCallback(){
public void process(InputStream in, OutputStream out) throws IOException{
stringBuilder.append(IOUtils.copy(in,out));
}
});
}
}
如果不添加输出流,则会得到未指定的异常转移关系。
最佳答案
您将不希望从InputStreamCallback内部传输流文件,这应该在您完成从流文件的读取之后发生。如果您不更改传出流文件的内容,那么也不需要最后的StreamCallback和IOUtils.copy()东西,您只需传输原始流文件即可。对于失败情况,您可以引发IOException,将实际异常包装在InputStreamCallback中,将其捕获到外部,然后将原始流文件传输到失败。如果没有异常,则可以将原始流文件传输成功。
关于java - Nifi自定义处理器显示错误“无法分配本地变量流文件”,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/61232174/