I get a "ValueError: too many values to unpack" error when running the code below, whose purpose is to build a histogram of the values for each key:

%pyspark

import datetime
from pyspark.sql import SQLContext, Row

def featVecSms( x ):
    sttm = datetime.datetime.strptime( x[1], '%Y%m%dT%H%M%S.%f' )
    hourOfDay = int( sttm.strftime( '%H' ) )
    dayOfWeek = int( sttm.strftime( '%w' ) )
    dayOfMonth = int( sttm.strftime( '%d' ) )
    duration = datetime.datetime.strptime( x[2], '%Y%m%dT%H%M%S.%f' ) - sttm
    duration = duration.total_seconds()
    service = x[3]
    resultCode = int( x[4] )
    msc = x[5]
    actionMap = {
        "0":'fsm',
        "1":'fsm',
        "2000":'sri',
        "2001":'sri',
        "2100":'sri',
        "2101":'sri',
        "2102":'fsm',
        "2200":'sri',
        "2201":'sri',
        "2202":'fsm',
        "2203":'fsm',
        "2204":'fsm',
        "2205":'fsm',
        "2206":'fsm',
        "2207":'sri',
        "2208":'sri',
        "2209":'sri',
        "2210":'fsm',
        "2211":'fsm',
        "2212":'fsm',
        "2213":'fsm',
        "2214":'fsm',
        "2215":'sri',
        "2216":'fsm'
    }
    action = actionMap.get( x[4] )
    return ( x[0], hourOfDay, dayOfWeek, dayOfMonth, duration, service, resultCode, msc, action )

textFile = sc.textFile("/export/sampleMsesAll.txt")
enTuples = textFile.map(lambda x: x.split("', u'"))
msRec = enTuples.map( featVecSms )

def countByCrit( accVal, currVal, idx ):
    accVal[ int( currVal[ idx ] ) ] = accVal[ int( currVal[ idx ] ) ] + 1
    return accVal

def countByTod( accVal, currVal ):
    return countByCrit( accVal, currVal, 1 )

todmap = [ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ]
msTodSuccess = msRec.filter( lambda x: x[2] >= 0 ).foldByKey( todmap, countByTod )
#.map( lambda x: ( x[0], reduce( lambda x,y: x + str(y), x[2], "" ) ) )

msTodSuccess.collect()


This raises the following error:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 52.0 failed 1 times, most recent failure: Lost task 1.0 in stage 52.0 (TID 115, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/export/edrsSmartRetry/code/spark-1.5.2/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/export/edrsSmartRetry/code/spark-1.5.2/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/export/edrsSmartRetry/code/spark-1.5.2/python/pyspark/rdd.py", line 2355, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/export/edrsSmartRetry/code/spark-1.5.2/python/pyspark/rdd.py", line 2355, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/export/edrsSmartRetry/code/spark-1.5.2/python/pyspark/rdd.py", line 317, in func
    return f(iterator)
  File "/export/edrsSmartRetry/code/spark-1.5.2/python/pyspark/rdd.py", line 1780, in combineLocally
    merger.mergeValues(iterator)
  File "/export/edrsSmartRetry/code/spark-1.5.2/python/lib/pyspark.zip/pyspark/shuffle.py", line 266, in mergeValues
    for k, v in iterator:
ValueError: too many values to unpack


The data looks like this:

$ head -15 /export/sampleMses10M.txt/part-00000
(u'263775998314', u'20151119T180719.000349', u'20151120T074928.837095', u'GoodMorning', u'2210', u'263775998314')
(u'263779563529', u'20151119T181318.000201', u'20151120T122346.432229', u'GoodMorning', u'2204', u'undefined')
(u'263783104169', u'20151120T092503.000629', u'20151120T111833.430649', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T092316.000331', u'20151120T125251.794699', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T092621.000557', u'20151120T125514.904726', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T092621.000557', u'20151120T135521.395529', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T092503.000629', u'20151120T145418.069707', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T092621.000557', u'20151120T145526.133207', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T154208.000410', u'20151120T154345.379585', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T154319.000636', u'20151120T154647.354102', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T154406.000245', u'20151120T154904.993095', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T154319.000636', u'20151120T164653.173588', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T154406.000245', u'20151120T164909.888433', u'Strangenet', u'2215', u'263770010027')
(u'263774918225', u'20151120T090505.000269', u'20151120T102248.630188', u'StrangeCash', u'0', u'263770010027')
(u'263782099158', u'20151119T182038.000537', u'20151120T064040.240860', u'GoodMorning', u'0', u'263770010500')


This sample holds only 123k records, but in the real application there should be tens of millions.

Best Answer

The problem with this code is a type mismatch.

First, the *byKey methods operate on pairwise RDDs. In Python this means an RDD of tuples of length two (or other structures we can treat as pairs) that can be unpacked like this:

k, v = pair


msRec contains elements of length 9, so this obviously cannot work here.
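For example, keying each record by its first field and keeping the remaining fields as the value would produce a pairwise RDD. A minimal sketch; the name pairs is mine, and grouping by x[0] (the subscriber number) is an assumption about the intended key:

# Hypothetical: turn each 9-field record into a (key, value) pair
pairs = msRec.map( lambda rec: ( rec[0], rec[1:] ) )  # value is an 8-field tuple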

The second problem is that you are using the wrong transformation. Let's look at the signature of foldByKey in Scala:

def foldByKey(zeroValue: V)(func: (V, V) ⇒ V): RDD[(K, V)]


where V is the type of the values (RDD[(K, V)]). As you can see, both zeroValue and the function's return type must be the same as the value type, which is clearly not the case here.
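To illustrate, foldByKey does work when the zero value, the inputs, and the result all share one type. A hedged sketch that sums the duration field (index 4 in the tuples returned by featVecSms) per key; the names durations and totalDuration are mine:

# Value type is float throughout: zero value, inputs, and result all match
durations = msRec.map( lambda rec: ( rec[0], rec[4] ) )
totalDuration = durations.foldByKey( 0.0, lambda a, b: a + b )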

If the result type differs from the value type, you should use combineByKey or aggregateByKey instead.
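A minimal sketch of the per-key hour-of-day histogram with aggregateByKey, assuming the field indices from featVecSms above (0 is the key, 1 is hourOfDay); the helper names addHour, mergeHists and todHist are mine. PySpark deep-copies the zero value for each key, so mutating the list in place is safe:

def addHour( hist, hour ):
    # fold one hourOfDay value into a 24-bucket histogram
    hist[ hour ] += 1
    return hist

def mergeHists( a, b ):
    # combine two partial histograms element-wise
    return [ x + y for x, y in zip( a, b ) ]

todHist = msRec.map( lambda rec: ( rec[0], rec[1] ) ) \
               .aggregateByKey( [0] * 24, addHour, mergeHists )
todHist.collect()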

Regarding "python - ValueError: too many values to unpack (while reducing with foldByKey)", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/34546404/
