我编写了一个Spark应用程序,该应用程序稍后会通过LoadIncrementalHFiles
命令生成用于批量加载的HFile。由于源数据池非常大,因此将输入文件拆分为多个迭代,然后逐个进行处理。每次迭代都会创建自己的HFile
目录,因此我的HDFS结构如下所示:
/user/myuser/map_data/hfiles_0
... /hfiles_1
... /hfiles_2
... /hfiles_3
...
该
map_data
目录中大约有500个文件,因此,我正在寻找一种自动调用LoadIncrementalHFiles
函数的方法,以便稍后在迭代中也处理这些子目录。相应的命令是这样的:
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles -Dcreate.table=no /user/myuser/map_data/hfiles_0 mytable
我需要将其更改为迭代命令,因为该命令不适用于子目录(当我使用
/user/myuser/map_data
目录调用它时)!我尝试使用Java
Process
实例自动执行上面的命令,但这没有执行任何操作(没有输出到控制台,也没有HBase表中的更多行)。在我的代码中使用
org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
Java类也不起作用,它也没有响应!有没有人对我有用?还是有一个参数可以在父目录上运行上述
hbase
命令?我正在Hortonworks Data Platform 2.5集群中使用HBase 1.1.2。编辑我试图从Hadoop客户端Java应用程序运行
LoadIncrementalHFiles
命令,但是我遇到了与快速压缩有关的异常,请参阅Run LoadIncrementalHFiles from Java client 最佳答案
解决方案是将hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles -Dcreate.table=no /user/myuser/map_data/hfiles_0 mytable
命令分成许多部分(每个命令部分一个),请参见以下Java代码段:
TreeSet<String> subDirs = getHFileDirectories(new Path(HDFS_PATH), hadoopConf);
for(String hFileDir : subDirs) {
try {
String pathToReadFrom = HDFS_OUTPUT_PATH + "/" + hFileDir;
==> String[] execCode = {"hbase", "org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles", "-Dcreate.table=no", pathToReadFrom, hbaseTableName};
ProcessBuilder pb = new ProcessBuilder(execCode);
pb.redirectErrorStream(true);
final Process p = pb.start();
// Write the output of the Process to the console
new Thread(new Runnable() {
public void run() {
BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
String line = null;
try {
while ((line = input.readLine()) != null)
System.out.println(line);
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
// Wait for the end of the execution
p.waitFor();
...
}
关于hadoop - 使用LoadIncrementalHFiles和子目录进行批量加载,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/46404238/