Problems when using dask map_partitions with a string matching algorithm

This article describes how to deal with a problem encountered when using dask map_partitions with a string matching algorithm. It should be a useful reference for anyone running into the same issue.

Problem Description

I'm having some problems applying a text search algorithm on top of dask's parallel infrastructure.

I'm trying to find, for each of the 40,000 strings in a Series object, the best match from a list of 4,000 strings.

I could have done it with pandas.apply, but that is too time-consuming, so I decided to try parallelizing the work with map_partitions in dask.

I'm using this text search library together with python-Levenshtein: https://marcobonzanini.com/2015/02/25/fuzzy-string-matching-in-python

As you can see, it works fine in this example on a pandas dataset:

process.extractOne(df['endereco2'][1], choices=choices,
                   scorer=fuzz.token_set_ratio, score_cutoff=60)

Output: ('R ALVARO DUARTE DE ALMEIDA PROFESSOR', 85)
 

but it does not work when using dask:

from dask import dataframe as dd

sd = dd.from_pandas(r13_2["endereco2"],npartitions=3).map_partitions(lambda df : df.apply(process.extractOne,choices=choices,scorer=fuzz.token_set_ratio,score_cutoff=60)).compute(scheduler='processes')

Output:
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-69-f39ab0d086b5> in <module>
      1 from dask import dataframe as dd
----> 2 sd = dd.from_pandas(r13_2["endereco2"],npartitions=3).map_partitions(lambda df : df.apply(process.extractOne,choices=choices,scorer=fuzz.token_set_ratio,score_cutoff=60)).compute(scheduler='processes')

~\Anaconda3\envs\mono\lib\site-packages\dask\base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158

~\Anaconda3\envs\mono\lib\site-packages\dask\base.py in compute(*args, **kwargs)
    396     keys = [x.__dask_keys__() for x in collections]
    397     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398     results = schedule(dsk, keys, **kwargs)
    399     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    400

~\Anaconda3\envs\mono\lib\site-packages\dask\multiprocessing.py in get(dsk, keys, num_workers, func_loads, func_dumps, optimize_graph, pool, **kwargs)
    190                            get_id=_process_get_id, dumps=dumps, loads=loads,
    191                            pack_exception=pack_exception,
--> 192                            raise_exception=reraise, **kwargs)
    193     finally:
    194         if cleanup:

~\Anaconda3\envs\mono\lib\site-packages\dask\local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    460                         _execute_task(task, data)  # Re-execute locally
    461                     else:
--> 462                         raise_exception(exc, tb)
    463                 res, worker_id = loads(res_info)
    464                 state['cache'][key] = res

~\Anaconda3\envs\mono\lib\site-packages\dask\compatibility.py in reraise(exc, tb)
    109     def reraise(exc, tb=None):
    110         if exc.__traceback__ is not tb:
--> 111             raise exc.with_traceback(tb)
    112         raise exc
    113

~\Anaconda3\envs\mono\lib\site-packages\dask\local.py in execute_task()
    228     try:
    229         task, data = loads(task_info)
--> 230         result = _execute_task(task, data)
    231         id = get_id()
    232         result = dumps((result, id))

~\Anaconda3\envs\mono\lib\site-packages\dask\core.py in _execute_task()
    117         func, args = arg[0], arg[1:]
    118         args2 = [_execute_task(a, cache) for a in args]
--> 119         return func(*args2)
    120     elif not ishashable(arg):
    121         return arg

~\Anaconda3\envs\mono\lib\site-packages\dask\optimization.py in __call__()
    940                              % (len(self.inkeys), len(args)))
    941         return core.get(self.dsk, self.outkey,
--> 942                         dict(zip(self.inkeys, args)))
    943
    944     def __reduce__(self):

~\Anaconda3\envs\mono\lib\site-packages\dask\core.py in get()
    147     for key in toposort(dsk):
    148         task = dsk[key]
--> 149         result = _execute_task(task, cache)
    150         cache[key] = result
    151     result = _execute_task(out, cache)

~\Anaconda3\envs\mono\lib\site-packages\dask\core.py in _execute_task()
    117         func, args = arg[0], arg[1:]
    118         args2 = [_execute_task(a, cache) for a in args]
--> 119         return func(*args2)
    120     elif not ishashable(arg):
    121         return arg

~\Anaconda3\envs\mono\lib\site-packages\dask\compatibility.py in apply()
     91     def apply(func, args, kwargs=None):
     92         if kwargs:
---> 93             return func(*args, **kwargs)
     94         else:
     95             return func(*args)

~\Anaconda3\envs\mono\lib\site-packages\dask\dataframe\core.py in apply_and_enforce()
   3877     func = kwargs.pop('_func')
   3878     meta = kwargs.pop('_meta')
-> 3879     df = func(*args, **kwargs)
   3880     if is_dataframe_like(df) or is_series_like(df) or is_index_like(df):
   3881         if not len(df):

<ipython-input-69-f39ab0d086b5> in <lambda>()
      1 from dask import dataframe as dd
----> 2 sd = dd.from_pandas(r13_2["endereco2"],npartitions=3).map_partitions(lambda df : df.apply(process.extractOne,choices=choices,scorer=fuzz.token_set_ratio,score_cutoff=60)).compute(scheduler='processes')

~\Anaconda3\envs\mono\lib\site-packages\pandas\core\series.py in apply()
   3589             else:
   3590                 values = self.astype(object).values
-> 3591                 mapped = lib.map_infer(values, f, convert=convert_dtype)
   3592
   3593         if len(mapped) and isinstance(mapped[0], Series):

pandas/_libs/lib.pyx in pandas._libs.lib.map_infer()

~\Anaconda3\envs\mono\lib\site-packages\pandas\core\series.py in f()
   3576         if kwds or args and not isinstance(func, np.ufunc):
   3577             def f(x):
-> 3578                 return func(x, *args, **kwds)
   3579         else:
   3580             f = func

~\Anaconda3\envs\mono\lib\site-packages\fuzzywuzzy\process.py in extractOne()
    218     best_list = extractWithoutOrder(query, choices, processor, scorer, score_cutoff)
    219     try:
--> 220         return max(best_list, key=lambda i: i[1])
    221     except ValueError:
    222         return None

~\Anaconda3\envs\mono\lib\site-packages\fuzzywuzzy\process.py in extractWithoutOrder()
     76
     77     # Run the processor on the input query.
---> 78     processed_query = processor(query)
     79
     80     if len(processed_query) == 0:

~\Anaconda3\envs\mono\lib\site-packages\fuzzywuzzy\utils.py in full_process()
     93         s = asciidammit(s)
     94     # Keep only Letters and Numbers (see Unicode docs).
---> 95     string_out = StringProcessor.replace_non_letters_non_numbers_with_whitespace(s)
     96     # Force into lowercase.
     97     string_out = StringProcessor.to_lower_case(string_out)

~\Anaconda3\envs\mono\lib\site-packages\fuzzywuzzy\string_processing.py in replace_non_letters_non_numbers_with_whitespace()
     24         numbers with a single white space.
     25         """
---> 26         return cls.regex.sub(" ", a_string)
     27
     28     strip = staticmethod(string.strip)

TypeError: expected string or bytes-like object

What's happening?

Note: I solved my problem using pool.apply from the multiprocessing library, but I would still like to know what went wrong with Dask. A sketch of that workaround is shown below.
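For reference, that workaround could look roughly like the following. This is a minimal sketch, not the author's actual code: it uses pool.map rather than pool.apply, the helper name match_one and the worker count are assumptions, and choices and r13_2 are taken from the question.

from multiprocessing import Pool

from fuzzywuzzy import fuzz, process

def match_one(query):
    # Best match for one string against the global `choices` list;
    # returns None when no choice scores at least 60.
    return process.extractOne(query, choices=choices,
                              scorer=fuzz.token_set_ratio,
                              score_cutoff=60)

if __name__ == '__main__':
    # match_one must live at module top level so the worker processes
    # (which re-import this module on Windows) can find it.
    with Pool(processes=4) as pool:
        results = pool.map(match_one, r13_2['endereco2'].tolist())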

Solution

While putting together an MCVE, I realized it was a simple syntax problem: I can't use map_partitions on a dask dataframe without specifying the column I'm using, even when there is only one column. So I should have used sd[0].map_partitions instead of sd.map_partitions.
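Applied to the code from the question, that fix could look roughly like this. A minimal sketch, assuming r13_2 and choices are defined as in the question; the column is selected here by name rather than by the positional index sd[0] the author mentions, and the meta argument is not in the author's code (it is added so dask does not have to infer the output type by evaluating the function on a dummy partition).

from dask import dataframe as dd
from fuzzywuzzy import fuzz, process

# Build the dask dataframe first, then select the single column
# before calling map_partitions.
sd = dd.from_pandas(r13_2, npartitions=3)

result = sd['endereco2'].map_partitions(
    lambda s: s.apply(process.extractOne, choices=choices,
                      scorer=fuzz.token_set_ratio, score_cutoff=60),
    meta=('endereco2', 'object'),  # name and dtype of the returned Series
).compute(scheduler='processes')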


This concludes this article on problems encountered when using dask map_partitions with a string matching algorithm. We hope the answer above is helpful.
