我试图将原始流文件作为输入发送到下一个处理器,但最终收到错误消息,我对Nifi还是陌生的,并且对Java有一定的经验。

public void onTrigger(ProessorContext context, ProcessSessionsession) throws ProceException{
   FlowFile flowfile = session.get();
   if(flowfile == null){
        return;
    }

    ArrayList<String> headData = new ArrayList<String>();
    try{
        session.read(flowfile, new InputStreamCallback(){

        final DBCPService= context.getProperty(CONNECTION_POOL).asContollerService(DBCPService.class);
        String query = " CREATE TABLE MODEL (";
        @SuppressWarnings("deprecation")
        public void process(InputStream inputStream) throws IOException {
        try{
            OPCPackage pkg = OPCPackage.open(inputStream);
            XSSFWorkbook workbook = new XSSFWorkbook(pkg);
            workbook.getAllNames();
            String dateheader = "date"
            XSSFSheet sheetName = workbook.getSheet(0);
            Row row = sheetName.getRow(0);
            for(Cell cell: row) {
                switch (cell.getCellType()){
                case NUMERIC:
                    if(HSSDataUtil.isCellDateFromated(cell)){
                        DataFormatter dataFromatter = new DataFormatter();
                        headData.add(dataFromatter.formatCellValue(cell);
                        query +=dataFromatter.formatCellValue(cell)+" " + "INT" ;
                    }else{
                        headData.add(String.valueOf(cell.getNumericCellValue()));
                    }
                    break;
                    case STRING:
                        headData.add(cell.getStringCellValue());
                        if(cell.getStringCellValue().toLowerCase().contains(dateheader))
                            query += cell.getStringCellValue() + " " + "TIMESTAMP,";
                        else
                            query +=cell.getStringCellValue() + " + "VARCHAR(50),";
                            break;
                    case BOOLEAN:
                        headData.add(String.valueOf(cell.getBooleanCellValue());
                        break;
                    default:
                        headData.add("");
                        break;
                        }
                    }
                    query = query.substring(0, query.length() -1);
                    query += ")";
                    workbook.close();
                    final Connection con = DBCPService.getConnection();
                    try{
                        java.sql.PreparedStatement = con.prepareStatement(query);
                        PreparedStatement.execute();
                        con.commit();
                        session.transfer(flowfile, REL_SUCCESS);
                    }catch (SQL Exception e){
                        e.printStackTrace();
                        session.transfer(flowfile, REL_FAILURE);
                    }
                }catch(InvalidFromatException ife){
                    getLogger().error(" only .xlsx excel files are supprted", ife);
                    thrownew UnsupportedOperationException("Only .xlsx OOXML files are substring", ife);
                }
            }

        });
        {catch (RuntimeException ex) {
            getLogger().error("Failed to process incoming Excel document. " + ex.getMessage(), ex);
            FlowFile failedFlowFile = session.putAttribute(flowfile, testxlsqlProcessor.class.getMessage());
        }
        final StringBuilder stringBuilder = new StringBuilder();
        flowfile = session.write(flowfile, new StreamCallback(){
        public void process(InputStream in, OutputStream out) throws IOException{
            stringBuilder.append(IOUtils.copy(in,out));
        }
    });

    }
}


如果不添加输出流,则会得到未指定的异常转移关系。

最佳答案

您将不希望从InputStreamCallback内部传输流文件,这应该在您完成从流文件的读取之后发生。如果您不更改传出流文件的内容,那么也不需要最后的StreamCallback和IOUtils.copy()东西,您只需传输原始流文件即可。对于失败情况,您可以引发IOException,将实际异常包装在InputStreamCallback中,将其捕获到外部,然后将原始流文件传输到失败。如果没有异常,则可以将原始流文件传输成功。

关于java - Nifi自定义处理器显示错误“无法分配本地变量流文件”,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/61232174/

10-12 18:07