
Deduplication of CluedIn data sets in Python

September 17, 2023

cluedin, data, python, deduplication, levenshtein, metaphone, jupyter-notebook

The cluedin library is a Python package for working with CluedIn – a data management platform. It provides a convenient way to use CluedIn's REST and GraphQL APIs.

CluedIn is an outstanding data management platform: it can import data from various sources, transform it, and export it to other systems. It also provides a convenient GraphQL API, which we can use to query the data and perform analysis with Python and Jupyter Notebooks, just like with any other data source.

Let us first install the required libraries:

%pip install cluedin
%pip install levenshtein
%pip install Metaphone
%pip install pandas

We create a JSON file with the credentials for our instance (assuming the instance is at foobar.244.117.198.49.sslip.io):

{ "domain": "244.117.198.49.sslip.io", "org_name": "foobar", "user_email": "admin@foobar.com", "user_password": "yourStrong(!)Password" }

And we can store it in a convenient hidden place and then save the path in an environment variable:

export CLUEDIN_CONTEXT=~/.cluedin/cluedin.json

Then we load data from CluedIn and store it in a JSON file for further processing:

import json
import os

import pandas as pd

import cluedin


def flatten_properties(d):
    for k, v in d['properties'].items():
        if k.startswith('property-'):
            d[k[9:]] = v
    del d['properties']
    return d


def get_entities_from_cluedin(entity_type):
    ctx = cluedin.Context.from_json_file(os.environ['CLUEDIN_CONTEXT'])
    ctx.get_token()

    query = """
      query searchEntities($cursor: PagingCursor, $query: String, $pageSize: Int) {
        search(
          query: $query
          sort: FIELDS
          cursor: $cursor
          pageSize: $pageSize
          sortFields: {field: "id", direction: ASCENDING}
        ) {
          totalResults
          cursor
          entries {
            id
            properties(propertyNames: ["imdb.name.primaryName"])
          }
        }
      }
    """

    variables = {
        'query': f'entityType:{entity_type}',
        'pageSize': 10_000
    }

    entities = list(
        map(flatten_properties, cluedin.gql.entries(ctx, query, variables)))

    with open('data.json', 'w', encoding='utf-8') as file:
        json.dump(entities, file, ensure_ascii=False, indent=2, sort_keys=False)


if not os.path.exists('data.json'):
    get_entities_from_cluedin('/IMDb/Person')

data = pd.read_json('data.json')
data.set_index('id', inplace=True)
data

The result is a Pandas DataFrame with all the entities of type /IMDb/Person.


The number of iterations in the worst case

Our goal is to calculate possible duplicates of the imdb.name.primaryName property.

Naïvely, we can iterate over the dataset and, for each element, iterate over the same dataset in a nested loop. That would be a Cartesian product (or rather a Cartesian power) of our set, so for a million elements we get 1,000,000,000,000 (one trillion) pairs.

Luckily, the order of elements does not matter: for any two elements A and B, if A is a possible duplicate of B, then B is a possible duplicate of A. That saves us just over half of the iterations, because instead of n^2 iterations we only need n * (n - 1) / 2. For a million elements that is 1,000,000 * 999,999 / 2 = 499,999,500,000 pairs.

Let's check if this calculation is accurate:

dataset_size = 1_000
temp = range(dataset_size)

# the result of n * (n - 1) is always even, because either n or n - 1 is even
expected_iterations = int(dataset_size * (dataset_size - 1) / 2)

actual_iterations = 0
for i, a in enumerate(temp):
    for b in temp[i + 1:]:
        actual_iterations += 1

difference = expected_iterations - actual_iterations

print(
    f'Expected iterations: {expected_iterations}, actual iterations: {actual_iterations}, difference: {difference}')

The result is:

Expected iterations: 499500, actual iterations: 499500, difference: 0

Or we can build the pairs with itertools – here using combinations_with_replacement and filtering out the self-pairs:

from itertools import combinations_with_replacement

dataset_size = 1_000

df1 = pd.DataFrame({"name": [f"Person {i}" for i in range(dataset_size)]})

new_df = pd.DataFrame(combinations_with_replacement(
    df1.index, 2), columns=['a', 'b'])

new_df[new_df['a'] != new_df['b']]

The result is 499500 rows, exactly as expected.
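
For comparison, plain itertools.combinations enumerates each unordered pair exactly once, so there are no self-pairs to filter out. A minimal sketch:

from itertools import combinations

dataset_size = 1_000

# combinations(..., 2) yields each unordered pair exactly once, no self-pairs
pairs = list(combinations(range(dataset_size), 2))
len(pairs)  # 499500 == 1000 * 999 / 2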

Exact matching

However, there are ways to avoid such a vast number of iterations. For example, we can use the Pandas duplicated function to mark entities that have exact duplicates:

data['exact_duplicate'] = data.duplicated(
    subset=['imdb.name.primaryName'], keep=False)
data

This code runs in less than 0.1 seconds and marks all exact duplicates in our 1,000,000-entity dataset.

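To see what keep=False does, here is a tiny toy example with made-up names; it shows that every member of a duplicate group is marked, not only the second and later occurrences:

import pandas as pd

# made-up names, purely for illustration
toy = pd.DataFrame(
    {'imdb.name.primaryName': ['John Smith', 'Jane Doe', 'John Smith']})
toy['exact_duplicate'] = toy.duplicated(
    subset=['imdb.name.primaryName'], keep=False)
toy  # rows 0 and 2 are marked True, row 1 is False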

Double Metaphone

For Double Metaphone, we calculate the Double Metaphone hash of each name and then compare the hashes the same way we compared the exact matches:

import metaphone

data['metaphone'] = data['imdb.name.primaryName'].apply(
    lambda x: metaphone.doublemetaphone(x)[0])

# drop names that produce an empty Metaphone code
data = data[data['metaphone'] != '']

# select a single column before transform so the result is a Series
data['metaphone_duplicates_count'] = \
    data[data['metaphone'] != ''] \
    .groupby('metaphone')['metaphone'] \
    .transform('count')

data.sort_values(['metaphone_duplicates_count', 'metaphone'],
                 ascending=[False, True], inplace=True)
data

This calculation takes a bit less than ten seconds on my three-year-old laptop. We can then count how many distinct Metaphone codes are shared by more than one entity:

data[data['metaphone_duplicates_count'] > 1].nunique()['metaphone']  # 126320
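
To build some intuition for why phonetically similar spellings end up in the same group, we can print the Double Metaphone codes for a few sample names (made-up examples; the exact codes come from the Metaphone package):

import metaphone

# made-up spellings of a similar-sounding name: such spellings tend to share
# the primary code, which is what the grouping above relies on
for name in ['Jon Smith', 'John Smith', 'Jon Smyth']:
    print(name, metaphone.doublemetaphone(name))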

Levenshtein distance

For the Levenshtein distance, we need to calculate the distance between each pair of names. This can be a very long-running process: a naive implementation would take about 285 years on my laptop. But we can optimize it by reducing the number of comparisons.

First, we can skip exact duplicates and operate only on the unique names.

Second, we can skip pairs of names whose lengths differ by more than one character.

Third, we will use multiprocessing to speed up the calculation.

We normalize the Levenshtein distance by dividing it by the length of the longer string, so the longer the strings are, the more characters they can differ by and still stay under the threshold.
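
As a quick illustration of the normalized distance (a made-up pair of names, using the Levenshtein package installed above):

from Levenshtein import distance

a, b = 'Robert De Niro', 'Robert DeNiro'  # made-up pair for illustration
normalized = distance(a, b) / max(len(a), len(b))
print(normalized)  # 1 / 14 ≈ 0.07, well below the 0.2 threshold used below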

We will also use a map-reduce approach: calculate the Levenshtein distances and store the results for each entity in a separate file (map), then combine the per-entity results into a single file (reduce).

# pylint: disable=missing-module-docstring, missing-function-docstring, missing-class-docstring, line-too-long, invalid-name
# deduplication.py
import json
import os
import time
from datetime import timedelta
from hashlib import md5
from multiprocessing import Pool, cpu_count

from Levenshtein import distance

os.makedirs('levenshtein', exist_ok=True)


def find_duplicates(strings):
    a = strings[0]
    md5_hash = md5(a.encode('utf-8')).hexdigest()
    file_path = f'levenshtein/{md5_hash}.json'

    if os.path.exists(file_path):
        return

    length_distance_threshold = 1
    levenshtein_distance_threshold = 0.2

    result = {
        'name': a,
        'duplicates': []
    }

    for b in strings[1:]:
        if abs(len(a) - len(b)) > length_distance_threshold:
            continue
        levenshtein_distance = distance(a, b) / max(len(a), len(b))
        if levenshtein_distance < levenshtein_distance_threshold:
            result['duplicates'].append({
                'name': b,
                'distance': levenshtein_distance
            })

    if len(result['duplicates']) > 0:
        with open(file_path, 'w', encoding='utf-8') as file:
            json.dump(result, file, indent=2)


def main():
    process_start = int(time.time())

    with open('data.json', 'r', encoding='utf-8') as file:  # load the dataset
        data = list({x['imdb.name.primaryName'] for x in json.load(file)})

    print(
        f'{timedelta(seconds=time.time() - process_start)} Loaded {len(data)} unique strings from data.json.')

    with Pool(processes=cpu_count()) as pool:
        print(f'{timedelta(seconds=time.time() - process_start)} Starting {cpu_count()} processes to find duplicates...')
        total = len(data)
        thousand_start = time.time()
        for i, _ in enumerate(pool.imap(find_duplicates, [data[i:] for i, _ in enumerate(data)])):
            if i % 1_000 == 0:
                print(
                    f'{timedelta(seconds=time.time() - process_start)} {timedelta(seconds=time.time() - thousand_start)} strings: {i:_} of {total:_} ({i / total * 100:.0f}%)')
                thousand_start = time.time()
        print(f'{timedelta(seconds=time.time() - process_start)} Done!')

    # Reduce
    files = os.listdir('levenshtein')
    duplicates = {}

    for i, file in enumerate(files):
        if i % 1_000 == 0:
            print(
                f'{timedelta(seconds=time.time() - process_start)} {i:_} of {len(files):_} ({i / len(files) * 100:.0f}%)')
        with open(f'levenshtein/{file}', 'r', encoding='utf-8') as f:
            data = json.load(f)
        if data['name'] not in duplicates:
            duplicates[data['name']] = {
                'count': 0,
                'duplicates': []
            }
        duplicates[data['name']]['count'] += len(data['duplicates'])
        duplicates[data['name']]['duplicates'].extend(data['duplicates'])

    duplicates = {k: v for k, v in sorted(
        duplicates.items(), key=lambda item: item[1]['count'], reverse=True)}

    for _, v in duplicates.items():
        v['duplicates'] = sorted(
            v['duplicates'], key=lambda item: item['distance'], reverse=False)

    with open('duplicates.json', 'w', encoding='utf-8') as f:  # save the result
        json.dump(duplicates, f, indent=2)

    print(len(files))


if __name__ == '__main__':
    main()

Well, it takes several hours for a million entities, but it is still better than 285 years.
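
Once duplicates.json is written, we can load it back and skim the names with the most candidates. A minimal sketch based on the structure the script above produces:

import json

import pandas as pd

with open('duplicates.json', 'r', encoding='utf-8') as f:
    duplicates = json.load(f)

# the script sorts the names by candidate count, so the first rows are the
# names with the most potential duplicates
summary = pd.DataFrame(
    [{'name': name, 'candidates': v['count']} for name, v in duplicates.items()])
summary.head(10)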

We can dig further and use BK-trees to reduce the number of comparisons even more, but it is a topic for another article.

In real life, it takes a few iterations to explore the data and narrow down the conditions on a small sample before running the final calculation on the whole dataset.