Моя команда застряла в запуске алгоритма нечеткой логики на двух больших наборах данных. Первое (подмножество) составляет около 180 тыс. Строк, содержащих имена, адреса и электронные письма людей, которых нам нужно сопоставить во втором (надмножество). Расширенный набор содержит 2,5 млн записей. Оба имеют одинаковую структуру, и данные уже очищены, т. Е. Адреса проанализированы, имена нормализованы и т. Д.
- ContactID int,
- FullName varchar (150),
- Адрес varchar (100),
- Электронная почта varchar (100)
Цель состоит в том, чтобы сопоставить значения в строке подмножества с соответствующими значениями в надмножестве, чтобы выходные данные сочетали подмножество и надмножество и соответствующие проценты сходства для каждого поля (токена).
- ContactID,
- LookupContactID,
- Полное имя,
- LookupFullName,
- FullName_Similarity,
- Адрес,
- LookupAddress,
- Address_Similarity,
- Эл. адрес,
- LookupEmail,
- Email_Similarity
Чтобы сначала упростить и протестировать код, мы объединили строки и знаем, что код работает с очень маленьким надмножеством; однако, как только мы увеличиваем количество записей, он застревает. Мы пробовали разные алгоритмы, Левенштейна, FuzzyWuzzy и т. Д., Но безрезультатно. На мой взгляд, проблема в том, что Python делает это построчно; однако я не уверен. Мы даже пытались запустить его в нашем кластере Hadoop с помощью потоковой передачи; Однако никаких положительных результатов это не дало.
#!/usr/bin/env python
import sys
from fuzzywuzzy import fuzz
import datetime
import time
import Levenshtein
#init for comparison
with open('normalized_set_record_set.csv') as normalized_records_ALL_file:
# with open('delete_this/xab') as normalized_records_ALL_file:
normalized_records_ALL_dict = {}
for line in normalized_records_ALL_file:
key, value = line.strip('\n').split(':', 1)
normalized_records_ALL_dict[key] = value
# normalized_records_ALL_dict[contact_id] = concat_record
def score_it_bag(target_contact_id, target_str, ALL_records_dict):
'''
INPUT target_str, ALL_records_dict
OUTPUT sorted list by highest fuzzy match
'''
return sorted([(value_str, contact_id_index_str, fuzz.ratio(target_str, value_str))
for contact_id_index_str, value_str in ALL_records_dict.iteritems()], key=lambda x:x[2])[::-1]
def score_it_closest_match_pandas(target_contact_id, target_str, place_holder_delete):
'''
INPUT target_str, ALL_records_dict
OUTPUT closest match
'''
# simply drop this index target_contact_id
df_score = df_ALL.concat_record.apply(lambda x: fuzz.ratio(target_str, x))
return df_ALL.concat_record[df_score.idxmax()], df_score.max(), df_score.idxmax()
def score_it_closest_match_L(target_contact_id, target_str, ALL_records_dict_input):
'''
INPUT target_str, ALL_records_dict
OUTPUT closest match tuple (best matching str, score, contact_id of best match str)
'''
best_score = 100
#score it
for comparison_contactid, comparison_record_str in ALL_records_dict_input.iteritems():
if target_contact_id != comparison_contactid:
current_score = Levenshtein.distance(target_str, comparison_record_str)
if current_score < best_score:
best_score = current_score
best_match_id = comparison_contactid
best_match_str = comparison_record_str
return (best_match_str, best_score, best_match_id)
def score_it_closest_match_fuzz(target_contact_id, target_str, ALL_records_dict_input):
'''
INPUT target_str, ALL_records_dict
OUTPUT closest match tuple (best matching str, score, contact_id of best match str)
'''
best_score = 0
#score it
for comparison_contactid, comparison_record_str in ALL_records_dict_input.iteritems():
if target_contact_id != comparison_contactid:
current_score = fuzz.ratio(target_str, comparison_record_str)
if current_score > best_score:
best_score = current_score
best_match_id = comparison_contactid
best_match_str = comparison_record_str
return (best_match_str, best_score, best_match_id)
def score_it_threshold_match(target_contact_id, target_str, ALL_records_dict_input):
'''
INPUT target_str, ALL_records_dict
OUTPUT closest match tuple (best matching str, score, contact_id of best match str)
'''
score_threshold = 95
#score it
for comparison_contactid, comparison_record_str in ALL_records_dict_input.iteritems():
if target_contact_id != comparison_contactid:
current_score = fuzz.ratio(target_str, comparison_record_str)
if current_score > score_threshold:
return (comparison_record_str, current_score, comparison_contactid)
return (None, None, None)
def score_it_closest_match_threshold_bag(target_contact_id, target_str, ALL_records_dict):
'''
INPUT target_str, ALL_records_dict
OUTPUT closest match
'''
threshold_score = 80
top_matches_list = []
#score it
#iterate through dictionary
for comparison_contactid, comparison_record_str in ALL_records_dict.iteritems():
if target_contact_id != comparison_contactid:
current_score = fuzz.ratio(target_str, comparison_record_str)
if current_score > threshold_score:
top_matches_list.append((comparison_record_str, current_score, comparison_contactid))
if len(top_matches_list) > 0: return top_matches_list
def score_it_closest_match_threshold_bag_print(target_contact_id, target_str, ALL_records_dict):
'''
INPUT target_str, ALL_records_dict
OUTPUT closest match
'''
threshold_score = 80
#iterate through dictionary
for comparison_contactid, comparison_record_str in ALL_records_dict.iteritems():
if target_contact_id != comparison_contactid:
#score it
current_score = fuzz.ratio(target_str, comparison_record_str)
if current_score > threshold_score:
print target_contact_id + ':' + str((target_str,comparison_record_str, current_score, comparison_contactid))
pass
#stream in all contacts ie large set
for line in sys.stdin:
# ERROR DIAG TOOL
ts = time.time()
st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
print >> sys.stderr, line, st
contact_id, target_str = line.strip().split(':', 1)
score_it_closest_match_threshold_bag_print(contact_id, target_str, normalized_records_ALL_dict)
# output = (target_str, score_it_closest_match_fuzz(contact_id, target_str, normalized_records_ALL_dict))
# output = (target_str, score_it_closest_match_threshold_bag(contact_id, target_str, normalized_records_ALL_dict))
# print contact_id + ':' + str(output)