使用Python处理大型数据集的模糊逻辑

使用Python处理大型数据集的模糊逻辑

本文介绍了使用Python处理大型数据集的模糊逻辑的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的团队一直坚持在两个大型数据集上运行模糊逻辑算法.第一个(子集)大约有18万行,其中包含我们需要在第二个(超集)中匹配的人员的姓名,地址和电子邮件.超集包含250万条记录.两者具有相同的结构,并且数据已经被清除,例如,解析的地址,名称已规范化等.

My team has been stuck with running a fuzzy logic algorithm on a two large datasets.The first (subset) is about 180K rows contains names, addresses, and emails for the people that we need to match in the second (superset). The superset contains 2.5M records. Both have the same structure and the data has been cleaned already, i.e. addresses parsed, names normalized, etc.

目标是将子集中的行中的值与超集中的对应值进行匹配,因此输出将合并子集和超集以及每个字段(令牌)的对应相似百分比.

The goal is to match values in a row of subset to the corresponding values in superset, so the output would combine the subset and superset and the corresponding similarity percentages for each field (token).

为了首先简化和测试代码,我们将字符串串联在一起,我们知道代码可以在非常小的超集上运行;但是,一旦我们增加了记录数量,它就会卡住.我们尝试了不同的算法,如Levenshtein,FuzzyWuzzy等,均无济于事.在我看来,问题在于Python是逐行执行的.但是,我不确定.我们甚至尝试过使用流在Hadoop集群上运行它.但是,它没有产生任何积极的结果.

To simplify and test the code first, we've concatenated strings and we know that the code works on very small superset; however, once we increase the number of records, it gets stuck. We've tried different algorithms, Levenshtein, FuzzyWuzzy, etc. to no avail. The problem, in my opinion, is that Python does it row by row; however, I'm not sure. We've even tried running it on our Hadoop cluster using streaming; however, it has not yielded any positive results.

#!/usr/bin/env python
import sys
from fuzzywuzzy import fuzz
import datetime
import time
import Levenshtein

#init for comparison
with open('normalized_set_record_set.csv') as normalized_records_ALL_file:
# with open('delete_this/xab') as normalized_records_ALL_file:
    normalized_records_ALL_dict = {}
    for line in normalized_records_ALL_file:
        key, value = line.strip('\n').split(':', 1)
        normalized_records_ALL_dict[key] = value
        # normalized_records_ALL_dict[contact_id] = concat_record

def score_it_bag(target_contact_id, target_str, ALL_records_dict):
    '''
    INPUT target_str, ALL_records_dict
    OUTPUT sorted list by highest fuzzy match
    '''
    return sorted([(value_str, contact_id_index_str, fuzz.ratio(target_str, value_str))
        for contact_id_index_str, value_str in ALL_records_dict.iteritems()], key=lambda x:x[2])[::-1]

def score_it_closest_match_pandas(target_contact_id, target_str, place_holder_delete):
    '''
    INPUT target_str, ALL_records_dict
    OUTPUT closest match
    '''
    # simply drop this index target_contact_id
    df_score = df_ALL.concat_record.apply(lambda x: fuzz.ratio(target_str, x))

    return df_ALL.concat_record[df_score.idxmax()], df_score.max(), df_score.idxmax()

def score_it_closest_match_L(target_contact_id, target_str, ALL_records_dict_input):
    '''
    INPUT target_str, ALL_records_dict
    OUTPUT closest match tuple (best matching str, score, contact_id of best match str)
    '''
    best_score = 100

    #score it
    for comparison_contactid, comparison_record_str in ALL_records_dict_input.iteritems():
        if target_contact_id != comparison_contactid:
            current_score = Levenshtein.distance(target_str, comparison_record_str)


            if current_score < best_score:
                best_score = current_score
                best_match_id = comparison_contactid
                best_match_str = comparison_record_str

    return (best_match_str, best_score, best_match_id)



def score_it_closest_match_fuzz(target_contact_id, target_str, ALL_records_dict_input):
    '''
    INPUT target_str, ALL_records_dict
    OUTPUT closest match tuple (best matching str, score, contact_id of best match str)
    '''
    best_score = 0

    #score it
    for comparison_contactid, comparison_record_str in ALL_records_dict_input.iteritems():
        if target_contact_id != comparison_contactid:
            current_score = fuzz.ratio(target_str, comparison_record_str)

            if current_score > best_score:
                best_score = current_score
                best_match_id = comparison_contactid
                best_match_str = comparison_record_str

    return (best_match_str, best_score, best_match_id)

def score_it_threshold_match(target_contact_id, target_str, ALL_records_dict_input):
    '''
    INPUT target_str, ALL_records_dict
    OUTPUT closest match tuple (best matching str, score, contact_id of best match str)
    '''
    score_threshold = 95

    #score it
    for comparison_contactid, comparison_record_str in ALL_records_dict_input.iteritems():
        if target_contact_id != comparison_contactid:
            current_score = fuzz.ratio(target_str, comparison_record_str)

            if current_score > score_threshold:
                return (comparison_record_str, current_score, comparison_contactid)

    return (None, None, None)


def score_it_closest_match_threshold_bag(target_contact_id, target_str, ALL_records_dict):
    '''
    INPUT target_str, ALL_records_dict
    OUTPUT closest match
    '''
    threshold_score = 80
    top_matches_list = []
    #score it
    #iterate through dictionary
    for comparison_contactid, comparison_record_str in ALL_records_dict.iteritems():
        if target_contact_id != comparison_contactid:
            current_score = fuzz.ratio(target_str, comparison_record_str)

            if current_score > threshold_score:
                top_matches_list.append((comparison_record_str, current_score, comparison_contactid))


    if len(top_matches_list) > 0:  return top_matches_list

def score_it_closest_match_threshold_bag_print(target_contact_id, target_str, ALL_records_dict):
    '''
    INPUT target_str, ALL_records_dict
    OUTPUT closest match
    '''
    threshold_score = 80


    #iterate through dictionary
    for comparison_contactid, comparison_record_str in ALL_records_dict.iteritems():
        if target_contact_id != comparison_contactid:

            #score it
            current_score = fuzz.ratio(target_str, comparison_record_str)
            if current_score > threshold_score:
                print target_contact_id + ':' + str((target_str,comparison_record_str, current_score, comparison_contactid))


    pass


#stream in all contacts ie large set
for line in sys.stdin:
    # ERROR DIAG TOOL
    ts = time.time()
    st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
    print >> sys.stderr, line, st

    contact_id, target_str = line.strip().split(':', 1)

    score_it_closest_match_threshold_bag_print(contact_id, target_str, normalized_records_ALL_dict)
    # output = (target_str, score_it_closest_match_fuzz(contact_id, target_str, normalized_records_ALL_dict))
    # output = (target_str, score_it_closest_match_threshold_bag(contact_id, target_str, normalized_records_ALL_dict))
    # print contact_id + ':' + str(output)

推荐答案

您的方法需要进行180,000 * 2,500,000 = 450,000,000,000个比较.

Your approach requires you to make 180,000 * 2,500,000 = 450,000,000,000 comparisons.

4500亿很多.

要减少比较次数,您可以首先对具有某些共同特征的记录进行分组,例如地址字段的前五个字符或共同标记.然后,仅比较共享功能的记录.这个想法称为阻止",通常会减少您必须对可管理的事物进行总比较的次数.

To reduce the number of comparisons, you can first group records that have some features in common, like the first five characters of an address field, or a common token. Then, only compare records that share a feature. This idea is called "blocking" and will usually reduce the number of total comparisons you have to make to something manageable.

您要解决的一般问题称为"记录链接."由于您使用的是python,因此您可能需要查看 dedupe库,它提供了一种全面的方法(是该库的作者).

The general problem you are trying to solve is called "record linkage." Since you are using python, you might want to look at the dedupe library which provides a comprehensive approach (I am an author of this library).

这篇关于使用Python处理大型数据集的模糊逻辑的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-29 05:26