我是NiFi的新手,我正在开发定制处理器以从psql数据库视图中提取最新数据。初始化自定义处理器后,可以使用以下代码检索数据库视图。
private void GetData(){
Connection connection = DriverManager.getConnection("jdbc:postgresql://example:5432/example", "user", "pass");
Statement statement = connection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
ResultSet rs = statement.executeQuery("SELECT * FROM Example_Table");
while(rs.next()){
//Get data from database
}
connection.close();
}
但是,我正在努力从数据库视图中获取最新更新。主要问题是何时将新条目添加到数据库中。由于初始化处理器时会查询数据库,因此自定义处理器将没有新条目。
我试图在public void onTrigger()函数中实现查询;但是,这将导致管道备份,因为它将查询每个流文件上的数据库(如果每秒有成千上万个流文件进入,则不理想)。
在处理器启动时是否有查询数据库的方法?无需在每个flowfile上查询数据库?或者,是否可以检测数据库是否已被修改并在修改时提取数据?甚至设置计时器以将数据库拉入定制处理器中?
非常感谢您的协助,在此先感谢您。
最佳答案
我认为,如果您可以对高级用例进行更多说明,那么它可能会帮助您找到解决方案,因为这似乎是一种不常见的方法。通常,每个处理器都有一个责任,因此某些处理器与数据库交互,然后输出必要的信息供其他用户使用。
有些LookupService
可能是一个很好的示例,例如MongoDBLookupService
。
如果您的用例实际上是“我有一个定制处理器,该处理器提取包含任意数据的流文件,并且需要使用此数据库表中的最新数据对其执行某些操作”,那么您可以选择以下几种方法:
使用上面的方法执行数据库查询,并在onEnabled()
期间调用该方法一次,以从表中获取大部分数据,然后使用线程以固定的时间间隔调用它,以保持更新并将结果本地存储在领域。当onTrigger()
方法运行时,请使用本地缓存结果,而不是进行数据库调用。这将减少延迟,并为您提供近乎实时的数据。确保通过带有@OnStopped
批注的方法清理线程运行器和本地状态。
在流文件处理(即onTrigger()
)中内联执行数据库查询。这可能导致高延迟并阻止吞吐量。如果能够使用List<FlowFile> flowfiles = session.get(1000);
进行批量处理,则可以潜在地增加每个执行周期中处理的流文件的数量(该数量是可配置的)。
如果没有upserts / in-place修改(即,对数据库表的任何更改都将导致新的行),则可以使用哨兵查询(SELECT COUNT(*) FROM table;
)返回行数,并将其与行数进行比较以前返回的数据,并且如果这些数字不同,则仅执行“昂贵”查询以检索所有数据。在这种情况下,您可以通过记录先前获得的行的最大ID或时间戳来仅检索增量行。如果可以进行更新,则SELECT MAX(lastModified) AS mostRecentTimeModified FROM table;
之类的方法可能会有所帮助。
关于java - NiFi自定义处理器-读取数据库 View ,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/56502241/