こんにちは。エンタープライズ第三本部 マーケティングIT部の熊倉です。
このブログでは、高速に動作する分散処理エンジン「Apache Spark」とオープンテーブルフォーマット「Delta Lake」を基盤としたレイクハウス環境を構築できるDatabricks上で管理しているデータセットに対して、名寄せ処理を行うアプローチについて紹介します。
実際のノートブックの処理についても紹介しようと思っていますが、想定よりも内容が多くなってしまったので、名寄せの概要を紹介する「概要編」、ソースコードなど具体的な名寄せ処理の具体的な内容を紹介する「決定論的マッチング編」「確率的マッチング編」の三部作にしようと思います。
本記事は決定論的マッチング編となります。名寄せ処理の概要について説明したブログ記事も公開しておりますので、名寄せ処理にまだ馴染みのない方は、ぜひそちらもご覧いただけると幸いです。
本記事ではSplinkというライブラリを利用し、決定論的マッチングを行う具体的な処理内容や流れについて説明していきますが、Pysparkの文法など基本的な内容については割愛させていただきます。(Splinkの概要については前編の概要編で紹介してますのでこちらも合わせてご覧ください)
では、早速データの名寄せを開始していきましょう!
1. ライブラリをインストールする
決定論的マッチング処理における最終的なゴールは、SQLのWHERE句に相当する条件の組み合わせを作成することです。
SQLを記述することでも、もちろん決定論的マッチングは実現可能ですが、この記事では再利用性や可読性を考慮し、PythonのOSS名寄せライブラリであるSplinkを用いた手法を紹介します。
まずは最初にライブラリをインストールします
%pip install splink
2. 名寄せを行いたいデータセットを取り込む
df = spark.read.table("catalog.schema.table")
(本記事ではUnity Catalogに登録されているテーブルから読み込んでいますが、もちろんファイルから直接データを読み込むことも可能です)
3. ルールを定義する
次にマッチングに使用するルールを定義します。
決定論的マッチングは「ルールベースマッチング」とも呼ばれるように、このルール定義のステップが処理の根幹であり、最も重要なステップです。
ルールの定義は大きく分けて2つあります。
ルールの定義①: SQLで定義する
ルールの定義方法の1つ目は、SQLで定義する方法になります。
具体的には次のような形でルールを定義します。
rules = []
rules.append("l.name = r.name AND l.email = r.email")
SQL文中のlおよびrというテーブルエイリアスは、同一テーブルをクロス結合(自己結合)する際の左テーブル (l) と右テーブル (r) をそれぞれ指します。
SQL文の中でANDやORを使用して複雑な条件も設定できますが、OR条件はパフォーマンスが低下する可能性があるため、Splinkでは直接の使用は非推奨とされています。
OR条件のように複数の条件のいずれかに合致する場合を扱いたい場合は、各条件を個別のルールとしてrulesリスト(配列)に追加することで、同等の処理を実現できます。
具体的には、
rules.append("l.name = r.name OR l.email = r.email")
というルールを設定したい場合、
rules.append("l.name = r.name") rules.append("l.email = r.email")
のようにリストに追加します。(ルールはリストに格納された順に評価されます)
補足すると、OR条件を含む結合は非等価結合となり、テーブルの直積に近い処理が必要になるため、一般的にパフォーマンスが低下します。Splinkでは、複数のルールをリストに格納することで、内部的にUNIONとNOTを組み合わせた処理に変換し、OR条件と同等の結果をより効率的に得られるように工夫されています。(詳細はSplinkの公式ドキュメントをご参照ください)
詳細については、以下Splink公式ドキュメントに紹介されているので、こちらも合わせてご確認ください。
https://moj-analytical-services.github.io/splink/topic_guides/blocking/performance.html
ルールの定義②: block_onを利用する
ルールの定義方法の2つ目は、Splinkで用意されているblock_on関数の利用になります
from splink import block_on rules.append(block_on("dob", "address"))
Splinkの関数block_onは、SQLのl.dob = r.dob AND l.address = r.addressと変換されます。
注意が必要なルール: 等価結合以外の条件
単純な列の一致比較(等価結合)以外、例えばLevenstein距離(編集距離)が特定の閾値以上であるといった条件(非等価結合)を用いる場合、多くのSQLエンジンでは、全レコードの総当たりに近いペア計算が発生し、計算量が膨大になる傾向があります。このような非等価結合を含むルールを設定する際は、パフォーマンスへの影響に十分な注意が必要です。
要件として、マッチングルールとして設定する必要がある場合もあるかと思いますが、注意が必要な条件になります。
4. マッチ数を確認する
マッチング処理を行いレコードを突合する前に、3. で設定したルールで何件のレコードがマッチされるか確認できます。
from splink import SparkAPI from splink.blocking_analysis import cumulative_comparisons_to_be_scored_from_blocking_rules_data db_api = SparkAPI(spark_session=spark) # ruleの配列には以下を格納 # "l.name = r.name AND l.email = r.email" # block_on("dob", "address") count = cumulative_comparisons_to_be_scored_from_blocking_rules_data( table_or_tables=df, blocking_rules=rules, db_api=db_api, link_type="dedupe_only", unique_id_column_name="id" ) display(count)
出力結果は以下のようになります。

ルールはリスト(rules)に入っている順に評価されるので、上記のソースコードの例では
- "l.name = r.name AND l.email = r.email"
- block_on("dob", "address")
の順に評価されます。
出力結果より、"l.name = r.name AND l.email = r.email"のルールでは4345件がマッチと判断され、block_on("dob", "address")のルールでは348件マッチと判断され、合計で4693件のペアがマッチと判断されたことが分かります。
注 : 「block_on("dob", "address")のルールでは348件マッチ」とありますが、これはblock_on("dob", "address")のルール単体でヒットしたペアが348件という訳ではなく、"l.name = r.name AND l.email = r.email"のルールでヒットしなかったペアのうち、348件が2. のルールにヒットした、ということになります。
ちなみに、リストに格納されるルールの順を逆にすると以下のような出力結果になります。(ダミーデータの都合上、マッチするペアがほぼ一緒だったりしますが...)

cartesianについて
cartesianカラムに78,131,250という数値が入っていますが、データセット内で作成可能な全てのユニークなペアの総数になります。これは(レコードの総数)* (レコードの総数 -1 )/ 2 で求められます。
(今回使用したダミーデータのレコード総数は12,501件であるため、作成可能なペアの総数は 12,501 × 12,500 / 2 = 78,131,250 となります)
match_keyについて
出力結果にはmatch_keyカラムがありますが、これは各ルールに割り当てられた識別キー(インデックス)です。
後述のマッチング結果を確認する中で改めて登場します。
ネクストアクション
マッチ数を確認し必要に応じて3. で設定したルールの見直しを行います。
マッチ候補となるペア数は、後続のマッチング処理における計算リソースや処理時間に直接影響するため、「マッチ数が過大になっているルールはないか(処理負荷が高すぎないか)」、逆に「マッチ数が極端に少なく、効果の薄いルールはないか」といった観点で確認します。
基本的には、ルールをより厳しくする方向(例:リストに格納するルール数を調整する、ANDで条件を追加するなど)で再定義を検討しますが、実際にはプロジェクトごとの要件(精度、再現率、処理時間など)を総合的に加味して調整していくことになります。
5. マッチングの実施
ルールの定義が完了したら、実際にマッチングを行い突合したレコードを確認します。
settings = SettingsCreator(
link_type="dedupe_only",
blocking_rules_to_generate_predictions=rules,
# ユニークな列を指定します。
unique_id_column_name="id",
)
linker = Linker(df, settings, db_api=db_api)
# マッチングの実施
df_predict = linker.inference.deterministic_link()
df_predict_spark = df_predict.as_spark_dataframe()
df_predict_sparkはマッチングで突合されたレコードのペアが格納されたデータフレームになります。
display(df_predict_spark.count())
display(df_predict_spark.head(15))

ペアの情報のほか、出力結果にはmatch_keyカラムが追加されています。
こちらは「4. マッチ数を確認する」で確認できるmatch_keyに対応しており、どのルールによって検出されたペアなのかを確認することができます。
(図の例だと、52行目のペアはmatch_keyが0、つまり"l.name=r.name AND l.email = r.email"というルールによってヒットしたペアで、53行目のペアはmatch_keyが1、つまり block_on("dob", "address") というルールによってヒットしたことが分かります)

6. クラスターを作成する
同一ペアの検出は5. のステップで完了しましたが、ステップ6ではさらに進んで、同一と判定されたエンティティ(レコード群)に対して新しいユニークIDを付与します(この処理をクラスタリング、またはクラスター化と呼びます)。
from pyspark.sql.functions import col # クラスター化の実行 clusters = linker.clustering.cluster_pairwise_predictions_at_threshold(df_predict) df_cluster_spark = clusters.as_spark_dataframe() # 結果の確認 display(df_cluster_spark.orderBy(col("cluster_id")).head(15))

cluster_idカラムが追加され、同一エンティティと判定されたレコード群ごとにユニークなIDが付与されていることが確認できます。
名寄せ処理後のデータセットでは、この付与されたcluster_idを新たなユニークIDとして扱うことになります。
以上で名寄せの一連のステップが全て終了しました!
まとめ
本ブログではSplinkを使用した決定論的マッチングの具体的な手法について簡単に紹介しました。
処理自体は比較的シンプルかなと思いますが(冒頭で書いたようにSQLのWHERE句を作るのが最終的なゴールなので)、実際のプロジェクトでは、このルール定義部分で難航することが多いのかなと思います。
以上、Databricks上で実現するデータ名寄せ【決定論的マッチング】編でした。
次回はDatabricks上で実現するデータ名寄せ【確率的マッチング】編になります!
執筆:@kumakura.koki、レビュー:@akutsu.masahiro
(Shodoで執筆されました)



