This article describes how to handle the Spark SQL error TypeError("StructType can not accept object in type %s" % type(obj)), and should be a useful reference if you run into the same problem.
Problem description
I am currently pulling data from SQL Server using PyODBC and trying to insert it into a table in Hive in a near real time (NRT) manner.
I fetch a single row from the source, convert it to a list of strings, and build the schema programmatically, but when creating the DataFrame, Spark throws a StructType error.
>>> cnxn = pyodbc.connect(con_string)
>>> aj = cnxn.cursor()
>>>
>>> aj.execute("select * from tjob")
<pyodbc.Cursor object at 0x257b2d0>
>>> row = aj.fetchone()
>>> row
(1127, u'', u'8196660', u'', u'', 0, u'', u'', None, 35, None, 0, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, u'', 0, None, None)
>>> rowstr = map(str,row)
>>> rowstr
['1127', '', '8196660', '', '', '0', '', '', 'None', '35', 'None', '0', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', '', '0', 'None', 'None']
>>> schemaString = " ".join([row.column_name for row in aj.columns(table='tjob')])
>>> schemaString
u'ID ExternalID Name Description Notes Type Lot SubLot ParentJobID ProductID PlannedStartDateTime PlannedDurationSeconds Capture01 Capture02 Capture03 Capture04 Capture05 Capture06 Capture07 Capture08 Capture09 Capture10 Capture11 Capture12 Capture13 Capture14 Capture15 Capture16 Capture17 Capture18 Capture19 Capture20 User UserState ModifiedDateTime UploadedDateTime'
>>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
>>> schema = StructType(fields)
>>> [f.dataType for f in schema.fields]
[StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType, StringType]
>>> myrdd = sc.parallelize(rowstr)
>>> myrdd.collect()
['1127', '', '8196660', '', '', '0', '', '', 'None', '35', 'None', '0', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', 'None', '', '0', 'None', 'None']
>>> schemaPeople = sqlContext.createDataFrame(myrdd, schema)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/apps/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/sql/context.py", line 404, in createDataFrame
rdd, schema = self._createFromRDD(data, schema, samplingRatio)
File "/apps/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/sql/context.py", line 298, in _createFromRDD
_verify_type(row, schema)
File "/apps/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark/python/pyspark/sql/types.py", line 1132, in _verify_type
raise TypeError("StructType can not accept object in type %s" % type(obj))
TypeError: StructType can not accept object in type <type 'str'>
Solution
Here is the reason for the error message:
>>> rowstr
['1127', '', '8196660', '', '', '0', '', '', 'None' ... ]
#rowstr is a list of str
>>> myrdd = sc.parallelize(rowstr)
#myrdd is an RDD of str
>>> schema = StructType(fields)
#schema is StructType([StringType, StringType, ....])
>>> schemaPeople = sqlContext.createDataFrame(myrdd, schema)
#myrdd should have been an RDD of rows (each a list of strings matching the schema) but is an RDD of str
To fix that, build the RDD with the proper structure, so that each element is a whole row rather than a single field:
>>> myrdd = sc.parallelize([rowstr])
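For completeness, here is a minimal, self-contained sketch of the fix. The pyodbc fetch is replaced by a hard-coded sample row and a shortened column list (both hypothetical stand-ins for the tjob data above), and it assumes the Spark 1.x-style SQLContext shown in the traceback; on Spark 2.x+ the same call works through a SparkSession.

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType

sc = SparkContext(appName="structtype-fix")   # or reuse an existing SparkContext
sqlContext = SQLContext(sc)

# Hypothetical stand-in for the row fetched via pyodbc; only a few columns are shown.
row = (1127, u'', u'8196660', None, 35)
column_names = ['ID', 'ExternalID', 'Name', 'ParentJobID', 'ProductID']

rowstr = [str(v) for v in row]   # one string per column

schema = StructType([StructField(name, StringType(), True) for name in column_names])

# Wrong: each RDD element is a single str, not a whole row, so schema
# verification raises "StructType can not accept object in type <type 'str'>".
# bad_rdd = sc.parallelize(rowstr)

# Right: wrap the row in a list so the RDD has exactly one element, the full row.
good_rdd = sc.parallelize([rowstr])
df = sqlContext.createDataFrame(good_rdd, schema)
df.show()

The difference is that sc.parallelize(rowstr) yields an RDD with one element per column (each a bare str), while sc.parallelize([rowstr]) yields an RDD with a single element, the whole row, which is what createDataFrame verifies against the StructType.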
This concludes the article on Spark SQL: TypeError("StructType can not accept object in type %s" % type(obj)); hopefully the answer above helps you solve the same problem.