I am hitting a "ValueError: too many values to unpack" error when running the code below, whose purpose is to build a histogram of values for each key:
%pyspark
import datetime
from pyspark.sql import SQLContext, Row

def featVecSms( x ):
    # derive time-of-day features from the start timestamp
    sttm = datetime.datetime.strptime( x[1], '%Y%m%dT%H%M%S.%f' )
    hourOfDay = int( sttm.strftime( '%H' ) )
    dayOfWeek = int( sttm.strftime( '%w' ) )
    dayOfMonth = int( sttm.strftime( '%d' ) )
    duration = datetime.datetime.strptime( x[2], '%Y%m%dT%H%M%S.%f' ) - sttm
    duration = duration.total_seconds()
    service = x[3]
    resultCode = int( x[4] )
    msc = x[5]
    actionMap = {
        "0": 'fsm',
        "1": 'fsm',
        "2000": 'sri',
        "2001": 'sri',
        "2100": 'sri',
        "2101": 'sri',
        "2102": 'fsm',
        "2200": 'sri',
        "2201": 'sri',
        "2202": 'fsm',
        "2203": 'fsm',
        "2204": 'fsm',
        "2205": 'fsm',
        "2206": 'fsm',
        "2207": 'sri',
        "2208": 'sri',
        "2209": 'sri',
        "2210": 'fsm',
        "2211": 'fsm',
        "2212": 'fsm',
        "2213": 'fsm',
        "2214": 'fsm',
        "2215": 'sri',
        "2216": 'fsm'
    }
    action = actionMap.get( x[4] )
    return ( x[0], hourOfDay, dayOfWeek, dayOfMonth, duration, service, resultCode, msc, action )

textFile = sc.textFile("/export/sampleMsesAll.txt")
enTuples = textFile.map(lambda x: x.split("', u'"))
msRec = enTuples.map( featVecSms )

def countByCrit( accVal, currVal, idx ):
    accVal[ int( currVal[ idx ] ) ] = accVal[ int( currVal[ idx ] ) ] + 1
    return accVal

def countByTod( accVal, currVal ):
    return countByCrit( accVal, currVal, 1 )

todmap = [ 0 ] * 24
msTodSuccess = msRec.filter( lambda x: x[2] >= 0 ).foldByKey( todmap, countByTod )
#.map( lambda x: ( x[0], reduce( lambda x,y: x + str(y), x[2], "" ) ) )
msTodSuccess.collect()
Running it raises the following error:
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 52.0 failed 1 times, most recent failure: Lost task 1.0 in stage 52.0 (TID 115, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/export/edrsSmartRetry/code/spark-1.5.2/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/export/edrsSmartRetry/code/spark-1.5.2/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/export/edrsSmartRetry/code/spark-1.5.2/python/pyspark/rdd.py", line 2355, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/export/edrsSmartRetry/code/spark-1.5.2/python/pyspark/rdd.py", line 2355, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/export/edrsSmartRetry/code/spark-1.5.2/python/pyspark/rdd.py", line 317, in func
    return f(iterator)
  File "/export/edrsSmartRetry/code/spark-1.5.2/python/pyspark/rdd.py", line 1780, in combineLocally
    merger.mergeValues(iterator)
  File "/export/edrsSmartRetry/code/spark-1.5.2/python/lib/pyspark.zip/pyspark/shuffle.py", line 266, in mergeValues
    for k, v in iterator:
ValueError: too many values to unpack
The data looks like this:
$ head -15 /export/sampleMses10M.txt/part-00000
(u'263775998314', u'20151119T180719.000349', u'20151120T074928.837095', u'GoodMorning', u'2210', u'263775998314')
(u'263779563529', u'20151119T181318.000201', u'20151120T122346.432229', u'GoodMorning', u'2204', u'undefined')
(u'263783104169', u'20151120T092503.000629', u'20151120T111833.430649', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T092316.000331', u'20151120T125251.794699', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T092621.000557', u'20151120T125514.904726', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T092621.000557', u'20151120T135521.395529', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T092503.000629', u'20151120T145418.069707', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T092621.000557', u'20151120T145526.133207', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T154208.000410', u'20151120T154345.379585', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T154319.000636', u'20151120T154647.354102', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T154406.000245', u'20151120T154904.993095', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T154319.000636', u'20151120T164653.173588', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T154406.000245', u'20151120T164909.888433', u'Strangenet', u'2215', u'263770010027')
(u'263774918225', u'20151120T090505.000269', u'20151120T102248.630188', u'StrangeCash', u'0', u'263770010027')
(u'263782099158', u'20151119T182038.000537', u'20151120T064040.240860', u'GoodMorning', u'0', u'263770010500')
This sample has only 123k records, but in production the application will have to handle tens of millions.
Best answer
The problem with your code is a type error, and there are actually two issues.

First, the *byKey methods operate on pairwise RDDs. In Python this means an RDD whose elements are tuples of length 2, or other structures (call one a pair) that can be unpacked like this:

k, v = pair

msRec contains tuples of length 9, so this unpacking obviously cannot work here; it is exactly the for k, v in iterator line in your traceback that fails.
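To make the RDD pairwise you first have to decide what the key is. A minimal sketch, assuming you want to key the records by the subscriber id x[0] (the name msPairs is mine, not from the original answer):

# Hypothetical sketch: re-key each 9-field feature tuple as (key, value)
# so the *byKey transformations can unpack the elements as k, v = pair.
msPairs = msRec.map( lambda x: ( x[0], x[1:] ) )  # key = x[0], value = remaining 8 fields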
The second problem is that you are using the wrong transformation. Look at the signature of foldByKey in Scala:

def foldByKey(zeroValue: V)(func: (V, V) ⇒ V): RDD[(K, V)]

where V is the type of the values of an RDD[(K, V)]. As you can see, both zeroValue and the return type of the function must be the same type as the values, which is clearly not the case here: todmap is a list of 24 counters, not one of the RDD's values.
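For contrast, here is a hypothetical foldByKey call that does type-check, because the values, the zero value, and the merge result are all plain ints:

# Hypothetical sketch: foldByKey works when zeroValue and the merge result
# have the same type V as the values (here, int).
pairs = sc.parallelize( [ ("a", 1), ("a", 2), ("b", 5) ] )
sums = pairs.foldByKey( 0, lambda a, b: a + b )
sums.collect()  # e.g. [('a', 3), ('b', 5)]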
If the result type differs from the input type, you should use combineByKey or aggregateByKey instead.
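Putting both fixes together, a minimal sketch of the per-key hour-of-day histogram built with aggregateByKey. This is my illustration rather than code from the answer; it assumes the msPairs RDD from the earlier sketch, where hourOfDay sits at value index 0 and resultCode at value index 5:

def seqOp( hist, val ):
    # fold one record into a per-key list of 24 hour counters;
    # PySpark deep-copies the zero value per key, so in-place mutation is safe
    hist[ val[0] ] += 1
    return hist

def combOp( h1, h2 ):
    # merge two partial histograms element-wise
    return [ a + b for a, b in zip( h1, h2 ) ]

msTodSuccess = ( msPairs
    .filter( lambda kv: kv[1][5] >= 0 )        # keep successful records (resultCode >= 0)
    .aggregateByKey( [0] * 24, seqOp, combOp ) )
msTodSuccess.collect()

Unlike foldByKey, aggregateByKey lets the accumulator type (a 24-element list) differ from the value type (a feature tuple), which is exactly what this histogram needs.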
About "python - ValueError: too many values to unpack (while reducing with foldByKey)", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/34546404/