I am trying to write a Pig script that lets me load JSON (pulled out of an Elasticsearch index and dumped onto HDFS).

I have been struggling with this for a few days now; maybe someone can give me some insight into the problem I am running into.

Here is a quick Pig script I wrote that reads from HBase, makes a trivial modification to the data, and stores it back into HBase (just to confirm that everything works):

REGISTER hbase-common-1.1.1.jar
REGISTER /tmp/udfs/json-simple-1.1.1.jar
REGISTER /tmp/udfs/elephant-bird-hadoop-compat-4.9.jar
REGISTER /tmp/udfs/elephant-bird-pig-4.9.jar
REGISTER /user/hdfs/share/libs/guava-11.0.jar
REGISTER /user/hdfs/share/libs/zookeeper-3.4.6.2.2.4.2-2.jar

set hbase.zookeeper.quorum 'list of servers';

-- Load 5 rows (with their row keys) from column esinfo:a
raw = LOAD 'hbase://esimporttest' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('esinfo:a', '-loadKey true -limit 5') AS (id:bytearray, a:chararray);
-- Make a trivial change so the round trip is observable
keys = FOREACH raw GENERATE id, CONCAT(a, '1');

keys = LIMIT keys 1;

STORE keys INTO 'hbase://esimporttest' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('esinfo:id esinfo:a');

Running this script works: the data is read from HBase and stored back into HBase perfectly.

I then tried modifying the script to load the data from a JSON file instead of from HBase:
REGISTER hbase-common-1.1.1.jar
REGISTER /tmp/udfs/json-simple-1.1.1.jar
REGISTER /tmp/udfs/elephant-bird-hadoop-compat-4.9.jar
REGISTER /tmp/udfs/elephant-bird-pig-4.9.jar
REGISTER /user/hdfs/share/libs/guava-11.0.jar
REGISTER /user/hdfs/share/libs/zookeeper-3.4.6.2.2.4.2-2.jar

set hbase.zookeeper.quorum 'list of servers';

-- -nestedLoad lets JsonLoader handle nested JSON documents
raw_data = LOAD '/user/hdfs/input/EsImports/2014-04-22.json' USING com.twitter.elephantbird.pig.load.JsonLoader('-nestedLoad') AS (json:map[]);
keys = FOREACH raw_data GENERATE
    json#'sid' AS id:bytearray,
    json#'userAgent' AS a:chararray;

limit_keys = LIMIT keys 1;

STORE limit_keys INTO 'hbase://esimporttest' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('esinfo:id esinfo:a');

This is the script that fails. I have a feeling it is related to the schema of the loaded data, but when I DESCRIBE and DUMP both relations, they appear to have exactly the same structure.
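For reference, the comparison was essentially this (a sketch; the commented schema is what I would expect both relations to report, not captured output):

-- Run against both the HBase-backed and the JSON-backed relation;
-- both should describe as something like: keys: {id: bytearray, a: chararray}
DESCRIBE keys;
DUMP keys;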

The script fails with java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/mapreduce/TableInputFormat.

Full error log:
Log Type: syslog
Log Upload Time: Mon Aug 24 13:28:43 +0200 2015
Log Length: 4121
2015-08-24 13:28:35,504 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Created MRAppMaster for application appattempt_1439978375936_0238_000001
2015-08-24 13:28:35,910 WARN [main] org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2015-08-24 13:28:35,921 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Executing with tokens:
2015-08-24 13:28:35,921 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Kind: YARN_AM_RM_TOKEN, Service: , Ident: (appAttemptId { application_id { id: 238 cluster_timestamp: 1439978375936 } attemptId: 1 } keyId: 176959833)
2015-08-24 13:28:36,056 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Kind: mapreduce.job, Service: job_1439978375936_0236, Ident: (org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier@331fef77)
2015-08-24 13:28:36,057 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Kind: RM_DELEGATION_TOKEN, Service: {ip removed}, Ident: (owner=darryn, renewer=mr token, realUser=hcat, issueDate=1440415651774, maxDate=1441020451774, sequenceNumber=176, masterKeyId=149)
2015-08-24 13:28:36,070 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Using mapred newApiCommitter.
2015-08-24 13:28:36,699 WARN [main] org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2015-08-24 13:28:36,804 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: OutputCommitter set in config null
2015-08-24 13:28:36,950 FATAL [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Error starting MRAppMaster
java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/mapreduce/TableInputFormat
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:270)
    at org.apache.pig.impl.PigContext.resolveClassName(PigContext.java:657)
    at org.apache.pig.impl.PigContext.instantiateFuncFromSpec(PigContext.java:726)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore.getStoreFunc(POStore.java:251)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputCommitter.getCommitters(PigOutputCommitter.java:88)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputCommitter.<init>(PigOutputCommitter.java:71)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat.getOutputCommitter(PigOutputFormat.java:289)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster$1.call(MRAppMaster.java:470)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster$1.call(MRAppMaster.java:452)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.callWithJobClassLoader(MRAppMaster.java:1541)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.createOutputCommitter(MRAppMaster.java:452)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.serviceInit(MRAppMaster.java:371)
    at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster$4.run(MRAppMaster.java:1499)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.initAndStartAppMaster(MRAppMaster.java:1496)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.main(MRAppMaster.java:1429)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.mapreduce.TableInputFormat
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 20 more
2015-08-24 13:28:36,954 INFO [main] org.apache.hadoop.util.ExitUtil: Exiting with status 1
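One likely cause of this particular error (an assumption, not verified against this cluster): HBaseStorage references TableInputFormat even when it is only used for storing, and in HBase 1.x that class lives in the hbase-server jar, which is not among the jars REGISTERed above. A hedged sketch of extra REGISTER lines that might put it on the job's classpath (the jar paths are placeholders):

-- Hypothetical paths; adjust to where these jars actually live on the cluster
REGISTER /tmp/udfs/hbase-client-1.1.1.jar
-- hbase-server contains org.apache.hadoop.hbase.mapreduce.TableInputFormat in HBase 1.x
REGISTER /tmp/udfs/hbase-server-1.1.1.jar
REGISTER /tmp/udfs/hbase-protocol-1.1.1.jar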

Edit:

So I have noticed some interesting behavior: if I store the data to a file with PigStorage, specifying the -schema option, and then in a separate script load that file back (still using PigStorage), I can insert it straight into HBase. This leads me to suspect the problem is related to how the schema is stored.

Best answer

So the solution I eventually went with is by no means optimal, but it works well.

After reading the data from the JSON file and generating the schema, all you have to do is store it back to a file using PigStorage and then load that file again:

fs -rm -r /tmp/estest2
STORE test INTO '/tmp/estest2' USING PigStorage('\t', '-schema');

processed_data = LOAD '/tmp/estest2' USING PigStorage('\t');

EXEC; -- used to sync the script and force everything up to this point to run

What I suspect is happening is that HBaseStorage misinterprets the Elephant Bird types produced by JsonLoader, but it does understand PigStorage's types, which is why the round trip through a file lets the data load into HBase.
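If that suspicion is right, a possibly simpler variant (just a sketch, untested, and it may run into the same problem) would be to cast the map values explicitly so the fields carry plain Pig types before the STORE, skipping the temp file:

-- Explicit casts from the elephant-bird map values to plain Pig types
keys = FOREACH raw_data GENERATE
    (chararray)json#'sid'       AS id,
    (chararray)json#'userAgent' AS a;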

While doing this I found a couple of other things. You need the 'id' field in the data alias, but you must not specify it in the column list passed to HBaseStorage: the first field of the tuple is used as the row key.

A simplified working script using this solution looks like this:
REGISTER hbase-common-1.1.1.jar
REGISTER /tmp/udfs/json-simple-1.1.1.jar
REGISTER /tmp/udfs/elephant-bird-hadoop-compat-4.9.jar
REGISTER /tmp/udfs/elephant-bird-pig-4.9.jar
REGISTER /user/hdfs/share/libs/guava-11.0.jar
REGISTER /user/hdfs/share/libs/zookeeper-3.4.6.2.2.4.2-2.jar

set hbase.zookeeper.quorum 'list of servers';

raw_data = LOAD '/user/hdfs/input/EsImports/2014-04-22.json' USING com.twitter.elephantbird.pig.load.JsonLoader('-nestedLoad') AS (json:map[]);
keys = FOREACH raw_data GENERATE
    json#'sid' AS id:bytearray,       -- the id field is not listed in the HBaseStorage call but is used as the row key
    json#'userAgent' AS a:chararray;

limit_keys = LIMIT keys 1;

-- This is super hacky, but it works
-- (the fs command fails if the directory does not exist)
fs -rm -r /tmp/estest2
STORE limit_keys INTO '/tmp/estest2' USING PigStorage('\t', '-schema');
processed_data = LOAD '/tmp/estest2' USING PigStorage('\t');

EXEC; -- used to sync the script and let everything up to this point finish before the HBase insert starts

STORE processed_data INTO 'hbase://esimporttest' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('esinfo:a');
