我正在研究使用BucketingSink类写入hdfs文件的POC。即使数据正在写入hdfs文件,但文件在hdfs上都带有“.pending”。
下面是我正在使用的代码。有人可以帮助我确定问题并帮助我解决此问题吗?
BucketingSink<String> HdfsSink = new BucketingSink<String>("hdfs://xxxx/xxxx/xxxx/Test/");
HdfsSink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
HdfsSink.setBatchSize(1024 * 1024 * 2); // this is 2 MB,
HdfsSink.setInactiveBucketCheckInterval(10000L);
HdfsSink.setInactiveBucketThreshold(10000L);
最佳答案
您好,您可以使用此功能。
嗨,未完成的存储分区具有.pending扩展名。关闭存储桶后(例如,用于时间存储,时间结束后),文件将被重命名。由于您使用的是NonRollingBucketer,因此这些文件将永远不会关闭。我建议您使用DateTimeBucketer。
附带说明:我建议您稍微增加检查点间隔。 123毫秒非常频繁,应用程序看起来对延迟的要求不高。像2000毫秒这样的值可能更合适。