Problem description
I have Hortonworks HDP 2.6.3 running Spark2 (v2.2). My test case is very simple:
Create a Hive table with some random values. Hive at port 10000
Turn on Spark Thrift server at 10016
Run pyspark and query the Hive table via 10016
However, I was unable to get the data from Spark due to NumberFormatException.
Here is my test case:
- Create Hive table with sample rows:
beeline> !connect jdbc:hive2://localhost:10000/default hive hive
create table test1 (id int, desc varchar(40));
insert into table test1 values (1,"aa"),(2,"bb");
- Run Spark Thrift server:
su - spark -c '/usr/hdp/2.6.3.0-235/spark2/sbin/start-thriftserver.sh --master yarn-client --executor-memory 512m --hiveconf hive.server2.thrift.port=10016'
- Run pyspark as the spark user:
su - spark -c 'pyspark'
Type in the following code:
df = sqlContext.read.format("jdbc").options(driver="org.apache.hive.jdbc.HiveDriver", url="jdbc:hive2://localhost:10016/default", dbtable="test1",user="hive", password="hive").load()
df.select("*").show()
I got this error:
17/12/15 08:04:13 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
java.sql.SQLException: Cannot convert column 1 to integer: java.lang.NumberFormatException: For input string: "id"
    at org.apache.hive.jdbc.HiveBaseResultSet.getInt(HiveBaseResultSet.java:351)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$6.apply(JdbcUtils.scala:394)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:330)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    ...
Caused by: java.lang.NumberFormatException: For input string: "id"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.valueOf(Integer.java:766)
    at org.apache.hive.jdbc.HiveBaseResultSet.getInt(HiveBaseResultSet.java:346)
    ... 23 more
17/12/15 08:04:14 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job
py4j.protocol.Py4JJavaError: An error occurred while calling o75.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost, executor driver): java.sql.SQLException: Cannot convert column 1 to integer: java.lang.NumberFormatException: For input string: "id"
I suspected it has something to do with the id column, so I changed to this: df.select("desc").show()
Then I got this strange result:
+----+
|desc|
+----+
|desc|
|desc|
+----+
- If I go back to Hive to query, everything went fine via port 10016:
beeline> !connect jdbc:hive2://localhost:10016/default hive hive
select * from test1;
+-----+-------+--+
| id  | desc  |
+-----+-------+--+
| 1   | aa    |
| 2   | bb    |
+-----+-------+--+
- If I change to port 10000 in pyspark, the same problem persists.
Could you please help me understand why and how to get the rows via Spark?
UPDATE 1
I followed @Achyuth's advice below in both cases, and neither works.
Case 1
Beeline:
create table test4 (id String, desc String);
insert into table test4 values ("1","aa"),("2","bb");
select * from test4;
Pyspark:
>>> df = sqlContext.read.format("jdbc").options(driver="org.apache.hive.jdbc.HiveDriver", url="jdbc:hive2://localhost:10016/default", dbtable="test4",user="hive", password="hive").option("fetchsize", "10").load()
>>> df.select("*").show()
+---+----+
| id|desc|
+---+----+
| id|desc|
| id|desc|
+---+----+
For some reason, it returned the column names as the row values?!
Case 2
Beeline:
create table test5 (id int, desc varchar(40)) STORED AS ORC;
insert into table test5 values (1,"aa"),(2,"bb");
select * from test5;
Pyspark:
Still the same error: Caused by: java.lang.NumberFormatException: For input string: "id"
UPDATE 2
Create a table and insert values via Hive port 10000, then query it. This works fine via beeline:
beeline> !connect jdbc:hive2://localhost:10000/default hive hive
Connecting to jdbc:hive2://localhost:10000/default
Connected to: Apache Hive (version 1.2.1000.2.5.3.0-37)
Driver: Hive JDBC (version 1.2.1000.2.5.3.0-37)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://localhost:10000/default> create table test2 (id String, desc String) STORED AS ORC;
No rows affected (0.3 seconds)
0: jdbc:hive2://localhost:10000/default> insert into table test2 values ("1","aa"),("2","bb");
INFO : Session is already open
INFO : Dag name: insert into table tes..."1","aa"),("2","bb")(Stage-1)
INFO : Tez session was closed. Reopening...
INFO : Session re-established.
INFO :
INFO : Status: Running (Executing on YARN cluster with App id application_1514019042819_0006)
INFO : Map 1: -/-
INFO : Map 1: 0/1
INFO : Map 1: 0(+1)/1
INFO : Map 1: 1/1
INFO : Loading data to table default.test2 from webhdfs://demo.myapp.local:40070/apps/hive/warehouse/test2/.hive-staging_hive_2017-12-23_04-29-54_569_601147868480753216-3/-ext-10000
INFO : Table default.test2 stats: [numFiles=1, numRows=2, totalSize=317, rawDataSize=342]
No rows affected (15.414 seconds)
0: jdbc:hive2://localhost:10000/default> select * from table2;
Error: Error while compiling statement: FAILED: SemanticException [Error 10001]: Line 1:14 Table not found 'table2' (state=42S02,code=10001)
0: jdbc:hive2://localhost:10000/default> select * from test2;
+-----------+-------------+--+
| test2.id | test2.desc |
+-----------+-------------+--+
| 1 | aa |
| 2 | bb |
+-----------+-------------+--+
2 rows selected (0.364 seconds)
Also via beeline, I can use the Spark Thrift Server on port 10016 to do the same thing, and it works fine:
beeline> !connect jdbc:hive2://localhost:10016/default hive hive
Connecting to jdbc:hive2://localhost:10016/default
1: jdbc:hive2://localhost:10016/default> create table test3 (id String, desc String) STORED AS ORC;
+---------+--+
| Result |
+---------+--+
+---------+--+
No rows selected (1.234 seconds)
1: jdbc:hive2://localhost:10016/default> insert into table test3 values ("1","aa"),("2","bb");
+---------+--+
| Result |
+---------+--+
+---------+--+
No rows selected (9.111 seconds)
1: jdbc:hive2://localhost:10016/default> select * from test3;
+-----+-------+--+
| id | desc |
+-----+-------+--+
| 1 | aa |
| 2 | bb |
+-----+-------+--+
2 rows selected (3.387 seconds)
This means Spark and the Thrift Server work fine. But using pyspark
I get the same problem: the results come back empty:
>>> df = sqlContext.read.format("jdbc").options(driver="org.apache.hive.jdbc.HiveDriver", url="jdbc:hive2://localhost:10016/default", dbtable="test3",user="hive", password="hive").load()
>>> df.select("*").show()
+---+----+
| id|desc|
+---+----+
+---+----+
UPDATE 3
DESCRIBE EXTENDED test3;
# Detailed Table Information | CatalogTable(
Table: `default`.`test3`
Owner: hive
Created: Sat Dec 23 04:37:14 PST 2017
Last Access: Wed Dec 31 16:00:00 PST 1969
Type: MANAGED
Schema: [`id` string, `desc` string]
Properties: [totalSize=620, numFiles=2, transient_lastDdlTime=1514032656, STATS_GENERATED_VIA_STATS_TASK=true]
Storage(Location: webhdfs://demo.myapp.local:40070/apps/hive/warehouse/test3, InputFormat: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat, Serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde, Properties: [serialization.format=1]))
SHOW CREATE TABLE test3;
CREATE TABLE `test3`(`id` string, `desc` string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
)
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
TBLPROPERTIES (
'totalSize' = '620',
'numFiles' = '2',
'transient_lastDdlTime' = '1514032656',
'STATS_GENERATED_VIA_STATS_TASK' = 'true'
)
su - spark -c 'hdfs dfs -cat webhdfs://demo.myapp.local:40070/apps/hive/warehouse/test3/part-00000'
Answer (by @Achyuth)
Even though you are creating the Hive table with a specific data type, the underlying data you inserted is stored in string format.
So when Spark tries to read the data, it uses the metastore to find the data types. The column is present as int in the Hive metastore but as a string in the file, and that is what throws the cast exception.
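One quick way to see this from the pyspark side is to print the schema of the DataFrame from the question before calling show(). The JDBC reader builds its schema from the remote table metadata, so the declared types are what drive the per-row getInt() call. This is a minimal sketch; the expected result in the comment is an assumption based on the error message, not output shown in the question:
>>> df.printSchema()   # for test1 this should report id as an integer column, hence getInt() on every row and the NumberFormatException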
Solutions
Create the table with string columns and read it from Spark; it will work.
create table test1 (id String, desc String);
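A pyspark sketch of that read, with the id column cast back to an integer on the Spark side afterwards. It assumes test1 has been recreated with the string schema above and reuses the same connection options as in the question:
>>> from pyspark.sql.functions import col
>>> df = sqlContext.read.format("jdbc").options(driver="org.apache.hive.jdbc.HiveDriver", url="jdbc:hive2://localhost:10016/default", dbtable="test1", user="hive", password="hive").load()
>>> df.withColumn("id", col("id").cast("int")).show()   # cast done in Spark after reading the values as strings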
If you want the data types preserved, specify one of the file formats such as ORC or Parquet when creating the table, and then insert into it. You will be able to read it from Spark without exceptions.
create table test1 (id int, desc varchar(40)) STORED AS ORC;
Now the question is: why is Hive able to read it? Hive has good cast options available, while Spark doesn't.
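As an aside, if the goal is simply to get the rows into pyspark on the same cluster, going through Spark's own Hive catalog instead of the Hive JDBC driver avoids this code path entirely. A minimal sketch, assuming the pyspark shell's built-in spark session was created with Hive support (an assumption about the cluster configuration):
>>> spark.table("default.test1").show()                  # reads through the metastore, no Hive JDBC driver involved
>>> spark.sql("select * from default.test1").show()     # equivalent SQL form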