電通総研 テックブログ

電通総研が運営する技術ブログ

Databricksで実現するデータ名寄せ【確率的マッチング編】

こんにちは。エンタープライズ第三本部 マーケティングIT部の熊倉です。

このブログでは、高速に動作する分散処理エンジン「Apache Spark」オープンテーブルフォーマット「Delta Lake」を基盤としたレイクハウス環境を構築できるDatabricks上で管理しているデータセットに対して、名寄せ処理を行うアプローチについて紹介します。

実際のノートブックの処理についても紹介しようと思っていますが、想定よりも内容が多くなってしまったので、名寄せの概要を紹介する「概要編」、ソースコードなど具体的な名寄せ処理の具体的な内容を紹介する「決定論的マッチング編」「確率的マッチング編」の三部作にしようと思います。

  1. Databricksで実現するデータ名寄せ【概要編】
  2. Databricksで実現するデータ名寄せ【決定論的マッチング編】
  3. Databricksで実現するデータ名寄せ【確率的マッチング編】(本記事)

本記事は確率的マッチング編となります。名寄せ処理の概要について説明したブログ記事も公開しておりますので、名寄せ処理にまだ馴染みのない方は、ぜひそちらもご覧いただけると幸いです。

本記事ではZinggというライブラリを利用し、確率的マッチングを行う具体的な処理内容や流れについて説明します。Zinggの概要については「Databricksで実現するデータ名寄せ【概要編】」で紹介しておりますので、こちらも合わせてご覧ください。

また、本記事の内容はZinggが公開しているExampleとDatabricksが公開しているソリューションアクセラレータを参考にしております(Example + ソリューションアクセラレーター + 自分の方でDatabricksの設計に合わせて加筆変更を加えたような形)
適宜こちらのページも参考にしていただきながら本記事の方も読んでいただけると良いかと思います。

では、早速データの名寄せを開始していきましょう!

Zinggを利用した名寄せ処理の概要

Zinggを利用した名寄せは次のステップで行います。

  1. まず最初に名寄せ対象データセットの項目の属性(テキストや数値項目なのか、どのようなマッチングをしたいかなど)をそれぞれ定義します
  2. 定義した内容を元にデータセットから一致している可能性の高いペアをサンプリングします
  3. サンプリングされたペア候補を元に、ユーザがペアの一致、不一致、不明を判断し、ラベリングを行います
    • モデルのトレーニングに十分な数が集まるまでサンプリングとラベリングを繰り返します
  4. 教師データを元にモデルを作成し、クラスター化を行います
    • クラスター化されたデータセットではペアの一致確率が確認できます
      • 一致確率が確認できるので、「XX%以上は一致、XX ~ YY%はユーザで改めて手動マッチング、YY%以下は別人とする」といったようなオペレーションが可能です

では、次から実際にDatabricksでZinggの名寄せ処理を行う方法について、具体的に紹介していきます。

1. ライブラリをインストールする

Zinggをノートブックで利用するためには、次の2ステップが必要になります。

  1. (ノートブックにアタッチする)クラスターにライブラリをインストールする
  2. ノートブックでzinggのPythonパッケージをインストールする

手順を順番に紹介します。

1. クラスターにライブラリをインストールする

1. ZinggのJARファイルをGitHubからダウンロードします。

以下のGitHubリポジトリから、最新のZingg JARファイルをダウンロードします。(25/5月現在0.4.0が最新)
https://github.com/zinggAI/zingg/releases

2. 適当なVolumeにJARファイルを配置する

ダウンロードしたJARファイルを、任意のVolumeにアップロードします。

クラスターのライブラリはワークスペース、Volume、S3などのクラウドストレージ等に配置されてある必要があるので、Volume以外の場所が良い場合は適宜配置先を変更してください。

https://docs.databricks.com/aws/en/libraries/cluster-libraries

3. クラスターの設定からライブラリをインストールする

クラスター設定ページを開き、「ライブラリ」タブから「新規をインストール」を選択し、ステップ2でVolumeに配置したJARファイルを指定してインストールします。

2. ノートブックでZinggのPythonパッケージをインストールする

1.でインストールしたクラスターをアタッチしたノートブック上で、以下のセルを実行し、ZinggのPythonパッケージをインストールします。

%pip install zingg
dbutils.library.restartPython()

2. 名寄せ処理に必要なモジュールやライブラリをインストールする

Zinggの他に、ノートブックで使用するその他のライブラリやモジュール、変数をあらかじめインストールまたは準備しておきます。

ライブラリのインストール

データを表形式で整形して表示するためのPythonパッケージ tabulateをインストールします

!pip install tabulate
dbutils.library.restartPython()

Zinggで利用するワークスペースを整理

Zinggで名寄せ対象とするデータセットの格納場所や、処理結果の出力先となるパスを指定します。

ただし、ここに関して少し注意が必要で、ZinggはApache Sparkで動作する名寄せライブラリですが、Databricksとネイティブに統合されているわけではないため、実装する際に気をつけなければいけないポイントがあります。

具体的には次のような点に注意が必要です。

  • Zinggで名寄せしたいデータセットはDatabricks上では外部テーブルとして登録されている必要がある
  • Zinggで作成されるデータセットや、トレーニングデータ、モデルなどはUnity Catalogのテーブルとして登録することができないため、DBFSやボリュームなどに作成される

ZinggとDatabricks、クラウドストレージの関係をまとめると次のような図になります。

そのため、事前に「インプットしたいデータセットを外部テーブルとして作成」し、「Zinggの出力先となるVolumeを作成」しておく必要があります。

本記事では作成方法については割愛させていただきますが、以下ページを参考に作成していただけると良いかと思います。

ノートブックでは次のように変数にパスを格納しておきます。

# Zinggで作成されるデータセットや、トレーニングデータ、モデルが格納されるVolume
# catalog, schema, volumeはそれぞれ適当な値を指定してください。
zinggDir = "/Volumes/catalog/schema/volume"

# モデルのID。名寄せの処理ごとに編集してください。
modelId = "hogehoge-model"

# 後述の名寄せ処理でラベル付けした情報が格納されるフォルダ
MARKED_DIR = zinggDir + "/" + modelId + "/trainingData/marked/"
UNMARKED_DIR = zinggDir + "/" + modelId + "/trainingData/unmarked/"

名寄せ処理で利用する関数の定義、モジュールのimport

後続の処理で利用する関数を定義したり、モジュールを先にimportします。

import pandas as pd
import numpy as np

import time
import uuid

from tabulate import tabulate
from ipywidgets import widgets, interact, GridspecLayout
import base64

import pyspark.sql.functions as fn

##this code sets up the Zingg Python interface
from zingg.client import *
from zingg.pipes import *

def cleanModel(isFull=False):
    if(isFull):
        dbutils.fs.rm(zinggDir + "/" + modelId, recurse=True)
    else:
        dbutils.fs.rm(MARKED_DIR, recurse=True)
        # drop unmarked data
        dbutils.fs.rm(UNMARKED_DIR, recurse=True)
    return

# assign label to candidate pair
def assign_label(candidate_pairs_pd, z_cluster, label):
  '''
  The purpose of this function is to assign a label to a candidate pair
  identified by its z_cluster value.  Valid labels include:
     0 - not matched
     1 - matched
     2 - uncertain
  '''

  # assign label
  candidate_pairs_pd.loc[ candidate_pairs_pd['z_cluster']==z_cluster, 'z_isMatch'] = label

  return

def count_labeled_pairs(marked_pd):
  '''
  The purpose of this function is to count the labeled pairs in the marked folder.
  '''

  n_total = len(np.unique(marked_pd['z_cluster']))
  n_positive = len(np.unique(marked_pd[marked_pd['z_isMatch']==1]['z_cluster']))
  n_negative = len(np.unique(marked_pd[marked_pd['z_isMatch']==0]['z_cluster']))

  return n_positive, n_negative, n_total

# setup widget
available_labels = {
    'No Match':0,
    'Match':1,
    'Uncertain':2
    }

# dbutils.widgets.dropdown('label', 'Uncertain', available_labels.keys(), 'Is this pair a match?')

3. Zinggの設定(Config)を定義

Zinggではモデルを作成する際、必要な設定情報はすべてConfig(実体はJSONオブジェクト)を介して渡すことになります。

#Zinggを動作させるの必要なConfigの作成をサポートするクラス
args = Arguments()
# モデル名やZinggの作業ワークスペースをセット
args.setModelId(modelId)
args.setZinggDir(zinggDir)

4. Zinggのインプット、アウトプット先を設定する

インプット(名寄せ対象データセットの格納先)とアウトプット(処理結果の出力先)を設定します

# 名寄せ対象のデータセット
# 前述しましたが外部テーブルとして登録されてある必要があります。
input_path = spark.sql("DESCRIBE DETAIL catalog.schema.table").select('location').collect()[0]['location']

# Pipeというクラスより、インプットのデータセットを指定する
inputPipe = Pipe(name='input', format='delta')
inputPipe.addProperty('path', input_path )

# Argumentsに追加
args.setData(inputPipe)
# アウトプットとなるデータセットやモデルが格納されるDBFS上のパス
output_path =  zinggDir + "/" + modelId + "/outputData"

# Pipeというクラスより、アウトプットのデータセットを指定する
outputPipe = Pipe("resultOutput", format="delta")
outputPipe.addProperty("path", output_path)

# Argumentsに追加
args.setOutput(outputPipe)

5. フィールドの属性を指定する

次に、Zinggがマッチング処理の中でフィールドの取り扱いについて判断するために、使用するフィールド毎に属性を指定します。

マッチのタイプは以下の中から指定できます。詳細はZinggのドキュメントを参照してください。

マッチタイプ 説明 適用対象
FUZZY タイポ、略語、その他のバリエーションを含む大まかなマッチ。 string, integer, long, double, date
EXACT バリエーションを許容しない。国コード、PINコード、およびバリエーションが想定されないその他のカテゴリ変数に適しています。 string, integer, long, date, boolean
DONT_USE 出力には表示されますが、計算は行われません。出力に必要なIDなどのフィールドに役立ちます。showConciseがtrueに設定されている場合、DONT_USEフィールドはラベル付け中にユーザーに表示されません。 any
EMAIL @文字の前のメールのID部分のみをマッチします。 any
PINCODE xxxxx-xxxxのようなPINコードをマッチします(xxxxx付き)。 string
NULL_OR_BLANK デフォルトでは、Zinggはnullをマッチとして扱いますが、FUZZYのような別のマッチタイプを持つフィールドにこれ(NULL_OR_BLANK)を追加すると、Zinggはnull値の機能を構築して学習します。 string, integer, long, date, boolean
TEXT 2つの文字列間の単語の重複を比較します。タイプミスがあまりない記述的なフィールドに適しています。 string
NUMERIC 文字列から数字を抽出し、両方の文字列間で同じものがいくつあるかを比較します(例:アパート番号)。 string
NUMERIC_WITH_UNITS 製品コードまたは単位付きの数字を抽出します(例:16gbの文字列)。両方の文字列間で同じものがいくつあるかを比較します。 string
ONLY_ALPHABETS_EXACT アルファベット文字のみを見て、それらが完全に同じであるかどうかを比較します。文字列内の数字が問題ではない場合(例:建物を調べているが、フラット番号を無視したい場合)。 string
ONLY_ALPHABETS_FUZZY 文字列内の数字を無視し、あいまいな比較を実行します。タイプミスのある住所などのフィールドに役立ちます。NUMERICを使用して番地を個別に調べたい場合。 string
MAPPING_(FILENAME) タイプミス、略語、および指定された値に対するその他のバリエーションとのマッチング。ニックネーム、性別、および会社略語などの場合に使用できます。たとえば、IBM、Int. Biz. MachineをInternational Business Machineに、M、0、MrをMaleに(つまり、複数のエンティティを単一のエンティティに)マッピングする場合。 string

ノートブックでは次のように設定します。

# フィールド毎にマッチタイプを指定します。
# 名寄せ処理をしたいデータセット毎に実際は設定してください
rec_id = FieldDefinition("rec_id", "string", MatchType.DONT_USE)
name = FieldDefinition("name", "string", MatchType.FUZZY)
## Match Typeは複数指定可能です。
email = FieldDefinition("email", "string", [MatchType.FUZZY,MatchType.EMAIL])
dob = FieldDefinition("dob", "string", [MatchType.FUZZY,MatchType.NUMERIC])
address = FieldDefinition("address", "string", MatchType.FUZZY)
phone = FieldDefinition("phone", "string", [MatchType.FUZZY,MatchType.NUMERIC])

# フィールドの定義をArrayにする
fieldDefs = [rec_id, name, email, dob, address, phone]

# Argumentsに設定
args.setFieldDefinition(fieldDefs)

6. パラメータを設定する

次に、Spark上で分散処理を行う際のパラメータを設定します。具体的にはnumPartitionslabelDataSampleSizeを設定します。

  • numPartitions
  • labelDataSampleSize
    • labelDataSampleSizeでは、後述するfindTrainingData処理でサンプルする割合を指定できます。サンプリングする割合が大きいほうが、ラベリングする際にケースとして相応しいペアが出力できる可能性が高くなりますが、処理の時間が長くなりすぎる場合もあるので、1/10に減らすなど調整を行ってください。
# Sparkのパーティション数。ドキュメントより推奨はコア数 × 20~30とのこと
num_partitions = sc.defaultParallelism * 20
args.setNumPartitions( num_partitions )

# モデルの学習に使用するデータの割合。
# サンプルサイズを小さく保ち、十分なエッジケースを迅速に検出できるよう、0.0001~0.1の間で調整してください。
# サンプルサイズが大きいと、ジョブがfindTrainingDataサンプルを精査するのに時間がかかります。サイズが小さすぎると、Zinggが適切なエッジケースを見つけられない可能性があります。
args.setLabelDataSampleSize(0.05)

7. トレーニングデータ作成ジョブを実行する

ここまでの設定を用いて、トレーニングデータ(ラベリング候補ペア)を作成するジョブ(findTrainingDataフェーズ)を実行します。

options = ClientOptions([ClientOptions.PHASE,"findTrainingData"])

# トレーニングデータを特定するジョブを実行
zingg = ZinggWithSpark(args, options)
zingg.initAndExecute()

8. ラベリングに使用するデータセットを作成する

レーニングデータの作成が終わったら、ラベリングに使用するデータセットを作成します。

ちなみに、ステップ7と8の処理(トレーニングデータ作成とラベリング用データセット準備)は、findAndLabelという単一のフェーズ指定で連続実行も可能です。しかし、トレーニングデータの作成には時間がかかる場合があるため、ここでは途中から再開しやすいようにステップを分けて記述しています。

options = ClientOptions([ClientOptions.PHASE,"label"])

# ラベリングに使用するデータセットを作成するジョブを実施
zingg = ZinggWithSpark(args, options)
zingg.init()

次に、ラベリングの候補となるペアが何件あるか確認します。

# get candidate pairs
candidate_pairs_pd = getPandasDfFromDs(zingg.getUnmarkedRecords())

# if no candidate pairs, run job and wait
if candidate_pairs_pd.shape[0] == 0:
  print('No unlabeled candidate pairs found.  Run findTraining job ...')

else:
    # get list of pairs (as identified by z_cluster) to label
    z_clusters = list(np.unique(candidate_pairs_pd['z_cluster']))

    # identify last reviewed cluster
    last_z_cluster = '' # none yet

    # print candidate pair stats
    print('{0} candidate pairs found for labeling'.format(len(z_clusters)))

ラベリングの候補としていくつペアあるか出力されます。(画像ではラベリングの候補として20ペアあることがわかります)

9. ラベリングを行う

いよいよ、抽出された候補ペアに対して実際にラベリングを行います。

ノートブック上に候補ペアがHTML形式で表示され、各ペアに対して「Uncertain(不明)」「Match(一致)」「No Match(不一致)」のいずれかをボタンを押すことでラベリング作業を行います。
(Databricksがノートブック機能を提供している強みですね!)

# Label Training Set

# define variable to avoid duplicate saves
ready_for_save = False

# user-friendly labels and corresponding zingg numerical value
# (the order in the dictionary affects how displayed below)
LABELS = {
  'Uncertain':2,
  'Match':1,
  'No Match':0
  }

# GET CANDIDATE PAIRS
# ========================================================
#candidate_pairs_pd = get_candidate_pairs()
n_pairs = int(candidate_pairs_pd.shape[0]/2)
# ========================================================

# DEFINE IPYWIDGET DISPLAY
# ========================================================
display_pd = candidate_pairs_pd.drop(
  labels=[
    'z_zid', 'z_prediction', 'z_score', 'z_isMatch', 'z_zsource'
    ],
  axis=1)

# define header to be used with each displayed pair
html_prefix = "<p><span style='font-family:Courier New,Courier,monospace'>"
html_suffix = "</p></span>"
header = widgets.HTML(value=f"{html_prefix}<b>" + "<br />".join([str(i)+"&nbsp;&nbsp;" for i in display_pd.columns.to_list()]) + f"</b>{html_suffix}")

# initialize display
vContainers = []
vContainers.append(widgets.HTML(value=f'<h2>Indicate if each of the {n_pairs} record pairs is a match or not</h2></p>'))

# for each set of pairs
for n in range(n_pairs):

  # get candidate records
  candidate_left = display_pd.loc[2*n].to_list()
  candidate_right = display_pd.loc[(2*n)+1].to_list()

  # define grid to hold values
  html = ''

  for i in range(display_pd.shape[1]):

    # get column name
    column_name = display_pd.columns[i]

    # if field is image
    if column_name == 'image_path':

      # define row header
      html += '<tr>'
      html += '<td><b>image</b></td>'

      # read left image to encoded string
      l_endcode = ''
      if candidate_left[i] != '':
        with open(candidate_left[i], "rb") as l_file:
          l_encode = base64.b64encode( l_file.read() ).decode()

      # read right image to encoded string
      r_encode = ''
      if candidate_right[i] != '':
        with open(candidate_right[i], "rb") as r_file:
          r_encode = base64.b64encode( r_file.read() ).decode()

      # present images
      html += f'<td><img src="data:image/png;base64,{l_encode}"></td>'
      html += f'<td><img src="data:image/png;base64,{r_encode}"></td>'
      html += '</tr>'

    elif column_name != 'image_path':  # display text values

      if column_name == 'z_cluster': z_cluster = candidate_left[i]

      html += '<tr>'
      html += f'<td style="width:10%"><b>{column_name}</b></td>'
      html += f'<td style="width:45%">{str(candidate_left[i])}</td>'
      html += f'<td style="width:45%">{str(candidate_right[i])}</td>'
      html += '</tr>'

  # insert data table
  table = widgets.HTML(value=f'<table data-title="{z_cluster}" style="width:100%;border-collapse:collapse" border="1">'+html+'</table>')
  z_cluster = None

  # assign label options to pair
  label = widgets.ToggleButtons(
    options=LABELS.keys(),
    button_style='info'
    )

  # define blank line between displayed pair and next
  blankLine=widgets.HTML(value='<br>')

  # append pair, label and blank line to widget structure
  vContainers.append(widgets.VBox(children=[table, label, blankLine]))

# present widget
display(widgets.VBox(children=vContainers))
# ========================================================

# mark flag to allow save
ready_for_save = True

ノートブックのアウトプットでは次のようなHTMLが表示されます。

実際にレコードが一致しているか、一致していないかを人の目で1つひとつチェックを行い、ボタンを押してラベリングを行っていきます。

ステップ8で作成されたペア分のチェックが終了したら、次のセルを実行し、内容を保存します。

if not ready_for_save:
  print('No labels have been assigned. Run the previous cell to create candidate pairs and assign labels to them before re-running this cell.')

else:

  # ASSIGN LABEL VALUE TO CANDIDATE PAIRS IN DATAFRAME
  # ========================================================
  # for each pair in displayed widget
  for pair in vContainers[1:]:

    # get pair and assigned label
    html_content = pair.children[1].get_interact_value() # the displayed pair as html
    user_assigned_label = pair.children[1].get_interact_value() # the assigned label

    # extract candidate pair id from html pair content
    start = pair.children[0].value.find('data-title="')
    if start > 0:
      start += len('data-title="')
      end = pair.children[0].value.find('"', start+2)
    pair_id = pair.children[0].value[start:end]

    # assign label to candidate pair entry in dataframe
    candidate_pairs_pd.loc[candidate_pairs_pd['z_cluster']==pair_id, 'z_isMatch'] = LABELS.get(user_assigned_label)
  # ========================================================

  # SAVE LABELED DATA TO ZINGG FOLDER
  # ========================================================
  # make target directory if needed
  dbutils.fs.mkdirs(MARKED_DIR)

  # save label assignments
  # save labels
  zingg.writeLabelledOutputFromPandas(candidate_pairs_pd,args)

  # count labels accumulated
  marked_pd_df = getPandasDfFromDs(zingg.getMarkedRecords())
  n_pos, n_neg, n_tot = count_labeled_pairs(marked_pd_df)
  print(f'You have accumulated {n_pos} pairs labeled as positive matches.')
  print(f'You have accumulated {n_neg} pairs labeled as not matches.')
  print("If you need more pairs to label, re-run the cell for 'findTrainingData'")
  # ========================================================

  # save completed
  ready_for_save = False

上記のセルを実行すると、これまでにラベリングしたペアの総数(一致、不一致ごとの件数)が出力され、進捗を確認できます。

Zinggのドキュメントでは、最低でもマッチと判断できるペアが30~40必要とあり、マッチと判断した数が足りない場合は、あらためて「7. トレーニングデータ作成ジョブ実行する」から繰り返し、十分な教師データを収集します。

また、候補となるペアの質が低い場合(明らかに別の人がリストアップされている場合など)、labelDataSampleSizeの割合が小さい可能性があるので、必要に応じてこちらも検討してみてください。

10. モデルのトレーニン

十分な数のラベリング済み教師データが収集できたら、それらを基にマッチングモデルを学習します(trainMatchフェーズ)

options = ClientOptions([ClientOptions.PHASE,"trainMatch"])

#トレーニングジョブを実施します(時間かかります)
zingg = ZinggWithSpark(args, options)
zingg.initAndExecute()

11. マッチング結果を確認する

モデル学習が完了したら、そのモデルを用いて名寄せ対象の全データに対して予測を実行し、どのレコードペアが同一エンティティとして判断されたかを確認します。

予測結果が出力されるデータセットは、前述のZingg設定(outputPipe)で指定したパスに格納されます。
この出力データセットには、入力データに加えて、Zinggが付与したクラスタIDや類似度スコアなどの情報が含まれます。

from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType

# テーブルの取得
outputDF = spark.read.load(zinggDir + "/" + modelId + "/outputData/")

# 内容の確認
display(outputDF.orderBy("z_cluster").head(25))

具体的には、以下の列が追加されています。

  • z_minScore
    • そのレコードが同じクラスタ内の別のレコードと持つ最小の類似性スコアを表します。
  • z_maxScore
    • そのレコードが同じクラスタ内の別のレコードと持つ最大の類似性スコアを表します。
  • z_cluster
    • Zinggによって割り当てられたクラスターのID。同一エンティティと判定されたレコード群には、共通のz_cluster IDが付与されます。

z_cluster をそのまま一意のエンティティを表すユニークIDとして使用できますが、z_minScorez_maxScoreを元に閾値をユーザ側で定義(XX%以上は一致とする。XX ~ YY%はユーザで改めて目検。YY%以下は別人とする等々...)し、任意のオペレーションをすることもできます。
(詳細については、以下のZingg公式ドキュメントもご参照ください)
https://www.zingg.ai/documentation-article/step-by-step-identity-resolution-with-python-and-zingg

以上で名寄せの一連のステップが全て終了しました!

まとめ

本ブログ記事では、Zinggライブラリを用いた確率的マッチングの具体的な手法を紹介しました。

今回取り上げたZinggのような教師あり機械学習を活用する名寄せライブラリも、Databricksのノートブック機能より、データ準備からラベリング、モデル学習、推論までを一貫したワークロードとして効率的に実行できます。

また、今回の記事では1つのデータセット内で名寄せする方法について取り上げましたが、他にもZinggを使用して、増分するデータセットに対して継続的に名寄せ処理を実施できたり (runIncrementalエンタープライズ版で利用可能)、複数のデータセット間で一致し得るレコードを抽出も実行可能です(link )ので、もし興味がある方は是非公式ドキュメントの方もご覧ください。

三記事にわたり名寄せの概要からDatabricks上でSplinkによる決定論的手法、Zinggによる確率的手法まで名寄せのアプローチについて紹介しましたが、いかがだったでしょうか?
今回の一連の記事が、皆様のデータ活用や名寄せ業務の一助となれば幸いです。

私たちは一緒に働いてくれる仲間を募集しています!

電通総研 キャリア採用サイト 電通総研 新卒採用サイト

執筆:@kumakura.koki、レビュー:@kinjo.ryuki
Shodoで執筆されました