问题描述
我有两个处理器的流程:QueryDataBaseTable
->PutDatabaseRecord
.
通过AvroReader
从Oracle
到Postgresql
.
这是来自 Oracle
的查询结果:
nmcl_id|assortment_id|nname |aname |modify_date |load_date|-------+-------------+-----------------------------------------------------------------------------+------------------------------------------------------------------------------+-------------------+----------+1|7|ДЛЯ РОЗНИЦЫ: ПАКЕТ-МАЙКА МАЛЫЙ 30+16Х50 НА КАССЫ ПЛАТНЫЙ |без ассорт.|2021-06-25 10:54:10||3|7|ДЛЯ РОЗНИЦЫ:ПАКЕТ-МАЙКА БОЛЬШОЙ 46+22Х60 (ЛОГОТИП "СЕТЬ МАКСИото;)с |берзас|2021-06-25 10:54:10||78|7|КАКАО-НАПИТОКMIX FIX"375ГР ПЛАСТИК (6811) |без ассорт.|2021-06-25 10:54:10||
这是Postgres
端的DML
:
创建表 src.task_meta_data (序列号,nmcl_id int 非空,assortment_id int 非空,nname 字符变化(100)非空,名称字符变化(100)非空,modify_date 时间戳非空,load_date 时间戳 NULL);仅更改表 src.task_meta_data添加约束 pk_task_meta_data PRIMARY KEY (id);
这是源码的配置:
这是目标的配置:
这是一个错误:
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain |2021-06-30 21:50:12,937 错误 [Timer-Driven Process Thread-8] oanpstandard.PutDatabaseRecord PutDatabaseRecord[id=5de90010-017a-1000-94c8-eb00fa683473] 无法将标准数据库放入标准库 783473RecordFiles[Timer-Driven Process Thread-8]-464b-4162-ab52-49451ee94990,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1625074391585-2, container=default, section=2], offset=0, length=2951648,name=878ace-4162-ab52-49451ee94990,大小=13864].路由失败.:java.lang.NumberFormatException:对于输入字符串:КРАБОВЫЕ ПАЛОЧКИ (ВЕСОВЫЕ) !";nifi_ml_nifi.1.187vh8egkv5o@KoshDomain |java.lang.NumberFormatException: 对于输入字符串:КРАБОВЫЕ ПАЛОЧКИ (ВЕСОВЫЕ) !";nifi_ml_nifi.1.187vh8egkv5o@KoshDomain |在 java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)nifi_ml_nifi.1.187vh8egkv5o@KoshDomain |在 java.lang.Integer.parseInt(Integer.java:580)nifi_ml_nifi.1.187vh8egkv5o@KoshDomain |在 java.lang.Integer.parseInt(Integer.java:615)nifi_ml_nifi.1.187vh8egkv5o@KoshDomain |在 org.apache.nifi.serialization.record.util.DataTypeUtils.toInteger(DataTypeUtils.java:1594)nifi_ml_nifi.1.187vh8egkv5o@KoshDomain |在 org.apache.nifi.serialization.record.util.DataTypeUtils.convertType(DataTypeUtils.java:200)nifi_ml_nifi.1.187vh8egkv5o@KoshDomain |在 org.apache.nifi.serialization.record.util.DataTypeUtils.convertType(DataTypeUtils.java:153)nifi_ml_nifi.1.187vh8egkv5o@KoshDomain |在 org.apache.nifi.serialization.record.util.DataTypeUtils.convertType(DataTypeUtils.java:149)nifi_ml_nifi.1.187vh8egkv5o@KoshDomain |在 org.apache.nifi.processors.standard.PutDatabaseRecord.executeDML(PutDatabaseRecord.java:709)nifi_ml_nifi.1.187vh8egkv5o@KoshDomain |在 org.apache.nifi.processors.standard.PutDatabaseRecord.putToDatabase(PutDatabaseRecord.java:841)nifi_ml_nifi.1.187vh8egkv5o@KoshDomain |在 org.apache.nifi.processors.standard.PutDatabaseRecord.onTrigger(PutDatabaseRecord.java:487)nifi_ml_nifi.1.187vh8egkv5o@KoshDomain |在 org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)nifi_ml_nifi.1.187vh8egkv5o@KoshDomain |在 org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1173)nifi_ml_nifi.1.187vh8egkv5o@KoshDomain |在 org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:214)nifi_ml_nifi.1.187vh8egkv5o@KoshDomain |在 org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)nifi_ml_nifi.1.187vh8egkv5o@KoshDomain |在 org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)nifi_ml_nifi.1.187vh8egkv5o@KoshDomain |在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)nifi_ml_nifi.1.187vh8egkv5o@KoshDomain |在 java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)nifi_ml_nifi.1.187vh8egkv5o@KoshDomain |在 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)nifi_ml_nifi.1.187vh8egkv5o@KoshDomain |在 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)nifi_ml_nifi.1.187vh8egkv5o@KoshDomain |在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)nifi_ml_nifi.1.187vh8egkv5o@KoshDomain |在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)nifi_ml_nifi.1.187vh8egkv5o@KoshDomain |在 java.lang.Thread.run(Thread.java:748)
OK,所以看来 Nifi
中存在一个错误,因此它检索列类型以进行插入而不是按字段名在文件,但按列顺序.
所以它试图将 nname
列的值转换为 int
,因此它是 assortment_id
的类型,因为从 id 列出现了移位.
我的意思是,我们在 FlowFile 中有这样一个列顺序 id DB 和字段顺序
DB: id, nmcl_id, assortment_id, nname, aname, modify_date, load_dateFF: nmcl_id, assortment_id, nname, aname, modify_date, load_date
所以我删除了 id
列并将 nmcl_id
和 assortment_id
合并为 primary key
.>
I have flow with two processors: QueryDataBaseTable
-> PutDatabaseRecord
.
From Oracle
to Postgresql
via AvroReader
.
This is result of the query from Oracle
:
nmcl_id|assortment_id|nname |aname |modify_date |load_date|
-------+-------------+----------------------------------------------------------------------------+------------------------------------------------------------------------------+-------------------+---------+
1| 7|ДЛЯ РОЗНИЦЫ: ПАКЕТ-МАЙКА МАЛЫЙ 30+16Х50 НА КАССЫ ПЛАТНЫЙ |без ассорт. |2021-06-25 10:54:10| |
3| 7|ДЛЯ РОЗНИЦЫ:ПАКЕТ-МАЙКА БОЛЬШОЙ 46+22Х60 (ЛОГОТИП "СЕТЬ МАКСИ") |без ассорт. |2021-06-25 10:54:10| |
78| 7|КАКАО-НАПИТОК "MIX FIX" 375ГР ПЛАСТИК (6811) |без ассорт. |2021-06-25 10:54:10| |
This is DML
in Postgres
side:
CREATE TABLE src.task_meta_data (
id SERIAL,
nmcl_id int NOT NULL,
assortment_id int NOT NULL,
nname character varying(100) NOT NULL,
aname character varying(100) NOT NULL,
modify_date timestamp NOT NULL,
load_date timestamp NULL
);
ALTER TABLE ONLY src.task_meta_data
ADD CONSTRAINT pk_task_meta_data PRIMARY KEY (id);
This is config of source:
This is config of target:
And this is an error:
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | 2021-06-30 21:50:12,937 ERROR [Timer-Driven Process Thread-8] o.a.n.p.standard.PutDatabaseRecord PutDatabaseRecord[id=5de90010-017a-1000-94c8-eb00fa683473] Failed to put Records to database for StandardFlowFileRecord[uuid=778ace8f-464b-4162-ab52-49451ee94990,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1625074391585-2, container=default, section=2], offset=0, length=2951648],offset=0,name=778ace8f-464b-4162-ab52-49451ee94990,size=13864]. Routing to failure.: java.lang.NumberFormatException: For input string: "КРАБОВЫЕ ПАЛОЧКИ (ВЕСОВЫЕ) !"
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | java.lang.NumberFormatException: For input string: "КРАБОВЫЕ ПАЛОЧКИ (ВЕСОВЫЕ) !"
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at java.lang.Integer.parseInt(Integer.java:580)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at java.lang.Integer.parseInt(Integer.java:615)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at org.apache.nifi.serialization.record.util.DataTypeUtils.toInteger(DataTypeUtils.java:1594)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at org.apache.nifi.serialization.record.util.DataTypeUtils.convertType(DataTypeUtils.java:200)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at org.apache.nifi.serialization.record.util.DataTypeUtils.convertType(DataTypeUtils.java:153)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at org.apache.nifi.serialization.record.util.DataTypeUtils.convertType(DataTypeUtils.java:149)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at org.apache.nifi.processors.standard.PutDatabaseRecord.executeDML(PutDatabaseRecord.java:709)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at org.apache.nifi.processors.standard.PutDatabaseRecord.putToDatabase(PutDatabaseRecord.java:841)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at org.apache.nifi.processors.standard.PutDatabaseRecord.onTrigger(PutDatabaseRecord.java:487)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1173)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:214)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
nifi_ml_nifi.1.187vh8egkv5o@KoshDomain | at java.lang.Thread.run(Thread.java:748)
OK, so it seams that there is a bug in Nifi
, thus it retrieves column types for insert not by field names in the file, but by the column order.
So it tried to convert nname
column's value to int
, thus it was type of assortment_id
taken because of shift appeared from id column.
I mean, we have such a column order id DB and fields order in FlowFile
DB: id, nmcl_id, assortment_id, nname, aname, modify_date, load_date
FF: nmcl_id, assortment_id, nname, aname, modify_date, load_date
So I just removed id
column and united nmcl_id
and assortment_id
to be a primary key
.
这篇关于Nifi中unicode字符串的NumberFormatException的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!