電通総研 テックブログ

電通総研が運営する技術ブログ

データエンジニアリングのAI駆動開発: Data Contractを応用したデータパイプラインの生成

はじめに

こんにちは、XI本部エンジニアリングテクノロジーセンターの徳山広士です。

この記事では、データ基盤/データ分析基盤におけるデータパイプラインのAI駆動開発の手法を提案します。
ODCS (Open Data Contract Standard)」フォーマットの"Data Contract"を仕様書として生成AIへ与えてデータパイプラインのコードを生成させる手法です。
Data Contractは、データの仕様(テーブル構造、カラム定義、データ型など)と品質要件をYAML形式で定義する標準フォーマットです。

当記事の主な内容は、Data Contractやローカル開発環境などの基礎的な説明から当件のAI駆動開発方法、コード生成に成功した検証結果についてです。

  • 想定読者: データエンジニア、データマネジメント業務担当者
  • 当記事で得られる知見: データパイプラインのAI駆動開発の勘所
  • 前提知識: データパイプライン(ETL/ELT処理)の基礎知識

一般的に労力と時間のかかるデータマネジメント・サイクルをAI駆動開発によって加速できればと考えて、データエンジニアリングのAI駆動開発手法を探求し、この記事にまとめました。

データパイプラインのAI駆動開発実現の課題

生成AIに何を伝えるべきか

アプリケーションのAI駆動開発においては、ドメインユースケース、機能、制約などを生成AIに説明することでコード生成するアプローチを見聞きします。では、データエンジニアリングにおいては何を説明すれば良いのか?どのように指示を出せば包括的に開発してくれるのか?これが最初に直面した課題でした。

自然言語のプロンプトで目的のSQLクエリについて説明する方法も考えましたが、複雑なロジックの表現が困難であり、細かい要件が伝わらず、プロンプトの再現性・再利用性にも疑問がありました。

アプローチ: データアーキテクチャとデータモデルを伝える

この課題に対して、データエンジニアリングの観点から「データアーキテクチャ」と「データモデル」の2つの軸で情報を整理し、データ基盤/データ分析基盤の全体像と開発方法を生成AIに伝えるアプローチを採用しました。

データアーキテクチャの説明

生成AIが全体像を理解するための情報:

  • データレイヤー構造と各レイヤーの責務: raw, staging, core, martなど
  • データストアの構成: データレイク, データウェアハウス, データマートなど
  • 各データストアのデータフォルダ構成: "raw/システム名/YYYYMMDD"など
  • データ処理アプローチ: ELTなど
  • インフラストラクチャ構成: 各データストアの物理実装で利用のSaaSなど
  • データパイプライン構成技術: dagster, dbt coreなど

これらの内容をプロジェクトドキュメント(DESIGN.md等)として整備しました。

データモデルの説明

生成AIがデータパイプラインで実現すべき対象の情報:

  • データの構成: テーブル, カラム, リレーションシップなど
  • データ仕様: データ型, 主キー等の制約, 変換ロジックなど
  • データ品質仕様: 満たすべき基準, テストルールなど

これらをData Contract(ODCS形式)としてドキュメント整備しました。

この2つを組み合わせることで、生成AIはデータパイプラインの実行環境全体の設計意図を理解し、データ基盤/データ分析基盤に適した具体的な実装を行えるようになりました。

データパイプライン作成依頼

生成AIへの開発依頼は、データパイプラインの要件を ADR (Architecture Decision Record) に記述し、そのADRで定義されたデータパイプラインを関連するData Contractにもとづいて開発するように生成AIへ依頼する方法を採用しました。
ADRに少なくとも以下の内容を書くようにしました。

  • データパイプラインの作成対象のデータ名
  • データ提供の背景や目的, データ利用のユースケース

ADRの記載内容の参考イメージ

  • タイトル: P001 ECビジネスデータパイプライン構造
  • ステータス: Accepted
  • コンテキスト: Eコマースビジネスパイプラインのデータ統合対象のデータを決める必要がある。
  • 決定事項: Eコマースビジネスパイプラインは、web_salesとweb_returnsを対象とした構成にする。

検証内容の詳細

実際の検証環境や要件、作成対象のデータなどを説明します。

検証ではTPC-DS(*1)のデータを使用し、生成AIには Claude Code(Sonnet 4.5, Anthropic) を使用しました。
web_salesテーブルの更新用のソースデータを変換してweb_salesへ統合するデータパイプラインのSQLクエリを生成します。

※1 TPC-DS は Transaction Processing Performance Council (TPC) の商標です。
本記事はTPCベンチマーク結果の公表を目的としたものではありません。

ローカル開発環境

データパイプラインのデプロイ先はAWSやAzure、Google CloudなどのSaaSですが、AI駆動開発を効率化するためにデプロイ先を模したローカル完結型の開発環境を整備しました。

DuckDBで構成: SaaSへの外部通信を排除し、生成AIの生産性を最大化。SaaSで巨大になりがちなデータレイクやデータウェアハウスをコンパクトなDuckDBで代用しました。

コードベースのツール選定: 生成AIがコードを直接参照・実装できるよう、全てコードベースのツールを採用。GUIのETLツールは、生成AIとの連携で独自SDK/API経由が必要となり生産性の低下が懸念されるため使用しませんでした。

ツールによるインフラストラクチャの抽象化: デプロイ先のSaaSとは物理的な環境構成が異なりますが、dbt coreなどのソフトウェアが抽象化し環境差分を吸収してくれます。

データアーキテクチャ概要

データパイプラインのデプロイ先のデータ分析基盤のデータアーキテクチャを図にしたものです。論理構成の主要部分のみを抜粋し、簡潔な内容を掲載しております。

dataArchitecture

  • データレイヤー: "raw", "staging", "core", "mart" の4層
  • データストア: データレイクとデータウェアハウス
  • データ処理アプローチ: ELT
  • データパイプライン構成技術: Dagster, dbt core, Soda Core
  • インフラストラクチャ: AWSとAzure, Google Cloudの3大クラウドサービスでそれぞれ実施、詳細は割愛

データモデル

web sales ※一部のカラムは割愛
web_sales_schema.svg

ディメンショナルモデリングされたスタースキーマ構造となっており、複数のサロゲートキーを持っています。データ構造は把握しやすいもののETL/ELT処理の観点では10個以上のテーブル結合や中間テーブルを介したテーブル結合、一部カラムの演算処理が必要な複雑な構造になっています。

作成対象のデータパイプライン

rawレイヤー
生データのデータファイルが外部テーブル (External table) としてデータウェアハウスに既にテーブル化されており、そのテーブルを参照すれば良いため、処理の実装は不要。

stagingレイヤー
rawレイヤーのデータをもとにマスターデータとの結合によるデータ取得や演算処理などのビジネスロジックをdbtのSQLクエリモデルとして実装

  • SELECT対象のカラム数: 35個
  • テーブル結合数: 12個

coreレイヤー
stagingレイヤーのデータをもとに増分処理でデータ更新する処理をdbtのSQLクエリモデルとして実装

  • SELECT対象のカラム数: 38個
  • テーブル結合数: 0個

実装ルール

  • SELECT文でカラムの型キャストを明記
  • SELECT文でカラムの名称をAS文で指定
  • CTE (Common Table Expression) でソースデータ定義
  • coreレイヤーでは増分処理を実装

生成AIへの依頼

前述の「アプローチ: データアーキテクチャとデータモデルを伝える」に記載のドキュメントに加えて、開発ガイドラインも整備し開発依頼を行いました。

生成AIへ提供した情報

実際のプロンプト

以下のファイルを読み込んで、当プロジェクトについて理解してください。  
- ./docs/DESIGN.md  
- ./docs/DATAMODEL_GENERATION_GUIDELINES.md  
  
ADRで定義されたデータパイプラインを関連する`Data Contract`にもとづいて作成します。  
今回は、ADRの"./docs/adr/pipelines/P001-ecommerce-business-pipeline-structure.md"で決定したデータパイプラインを作成してください。  

実際のData Contractのサンプル
"ws_item_sk"という主要なカラムのData Contractを一部抜粋したものです。より詳しい内容と解説は詳細解説にて後述します。

- name: ws_item_sk
  businessName: ウェブ販売商品サロゲートキー
  logicalType: string
  physicalType: STRING
  primaryKey: true
  primaryKeyPosition: 1
  transformSourceObjects:
    - core_tpcds.item.i_item_sk
  transformLogic: SELECT i_item_sk FROM s_web_order JOIN s_web_order_lineitem ON (word_order_id = wlin_order_id) LEFT JOIN item ON (wlin_item_id = i_item_id AND i_rec_end_date IS NULL)
  relationships:
    - type: foreignKey
      to:
        - core_tpcds.item.i_item_sk
  quality:
    - type: library
      rule: duplicateCount
      name: "主キー重複チェック(ws_item_sk)"
      dimension: uniqueness
      mustBe: 0
      severity: error
      businessImpact: "重複レコードによりファクトテーブルのデータ整合性が崩れ、売上集計に誤差が生じる"

生成されたコード

stagingレイヤー

12テーブルを結合し、35カラムを生成するSQLクエリです。Data ContractのtransformLogicで定義した変換ロジックが正確に反映されています。

with s_web_order as (
    select * from {{ source('raw_tpcds', 's_web_order_1') }}
),

s_web_order_lineitem as (
    select * from {{ source('raw_tpcds', 's_web_order_lineitem_1') }}
)

SELECT
    d1.d_date_sk::STRING AS ws_sold_date_sk,
    t_time_sk::STRING AS ws_sold_time_sk,
    d2.d_date_sk::STRING AS ws_ship_date_sk,
    i_item_sk::STRING AS ws_item_sk,
    c1.c_customer_sk::STRING AS ws_bill_customer_sk,
    c1.c_current_cdemo_sk::STRING AS ws_bill_cdemo_sk,
    c1.c_current_hdemo_sk::STRING AS ws_bill_hdemo_sk,
    c1.c_current_addr_sk::STRING AS ws_bill_addr_sk,
    c2.c_customer_sk::STRING AS ws_ship_customer_sk,
    c2.c_current_cdemo_sk::STRING AS ws_ship_cdemo_sk,
    c2.c_current_hdemo_sk::STRING AS ws_ship_hdemo_sk,
    c2.c_current_addr_sk::STRING AS ws_ship_addr_sk,
    wp_web_page_sk::STRING AS ws_web_page_sk,
    web_site_sk::STRING AS ws_web_site_sk,
    sm_ship_mode_sk::STRING AS ws_ship_mode_sk,
    w_warehouse_sk::STRING AS ws_warehouse_sk,
    p_promo_sk::STRING AS ws_promo_sk,
    word_order_id::STRING AS ws_order_number,
    wlin_quantity::INTEGER AS ws_quantity,
    i_wholesale_cost::DECIMAL(7,2) AS ws_wholesale_cost,
    i_current_price::DECIMAL(7,2) AS ws_list_price,
    wlin_sales_price::DECIMAL(7,2) AS ws_sales_price,
    ((i_current_price::DECIMAL(7,2) - wlin_sales_price::DECIMAL(7,2)) * wlin_quantity::INTEGER)::DECIMAL(7,2) AS ws_ext_discount_amt,
    (wlin_sales_price::DECIMAL(7,2) * wlin_quantity::INTEGER)::DECIMAL(7,2) AS ws_ext_sales_price,
    (i_wholesale_cost::DECIMAL(7,2) * wlin_quantity::INTEGER)::DECIMAL(7,2) AS ws_ext_wholesale_cost,
    (i_current_price::DECIMAL(7,2) * wlin_quantity::INTEGER)::DECIMAL(7,2) AS ws_ext_list_price,
    (i_current_price::DECIMAL(7,2) * web_tax_percentage::DECIMAL(7,2))::DECIMAL(7,2) AS ws_ext_tax,
    wlin_coupon_amt::DECIMAL(7,2) AS ws_coupon_amt,
    (wlin_ship_cost::DECIMAL(7,2) * wlin_quantity::INTEGER)::DECIMAL(7,2) AS ws_ext_ship_cost,
    (wlin_sales_price::DECIMAL(7,2) * wlin_quantity::INTEGER - wlin_coupon_amt::DECIMAL(7,2))::DECIMAL(7,2) AS ws_net_paid,
    (((wlin_sales_price::DECIMAL(7,2) * wlin_quantity::INTEGER) - wlin_coupon_amt::DECIMAL(7,2)) * (1 + web_tax_percentage::DECIMAL(7,2)))::DECIMAL(7,2) AS ws_net_paid_inc_tax,
    (((wlin_sales_price::DECIMAL(7,2) * wlin_quantity::INTEGER) - wlin_coupon_amt::DECIMAL(7,2)) - (wlin_quantity::INTEGER * i_wholesale_cost::DECIMAL(7,2)))::DECIMAL(7,2) AS ws_net_paid_inc_ship,
    ((wlin_sales_price::DECIMAL(7,2) * wlin_quantity::INTEGER) - wlin_coupon_amt::DECIMAL(7,2) + (wlin_ship_cost::DECIMAL(7,2) * wlin_quantity::INTEGER) + i_current_price::DECIMAL(7,2) * web_tax_percentage::DECIMAL(7,2))::DECIMAL(7,2) AS ws_net_paid_inc_ship_tax,
    (((wlin_sales_price::DECIMAL(7,2) * wlin_quantity::INTEGER) - wlin_coupon_amt::DECIMAL(7,2)) - (i_wholesale_cost::DECIMAL(7,2) * wlin_quantity::INTEGER))::DECIMAL(7,2) AS ws_net_profit,
    s_web_order.processing_date::STRING AS processing_date,
    current_timestamp::TIMESTAMP AS ingested_at
FROM s_web_order
    LEFT JOIN {{ ref('date_dim') }} d1 ON (cast(word_order_date AS date) = d1.d_date)
    LEFT JOIN {{ ref('time_dim') }} ON (word_order_time = t_time)
    LEFT JOIN {{ ref('customer') }} c1 ON (word_bill_customer_id = c1.c_customer_id)
    LEFT JOIN {{ ref('customer') }} c2 ON (word_ship_customer_id = c2.c_customer_id)
    LEFT JOIN {{ ref('web_site') }} ON (word_web_site_id = web_site_id AND web_rec_end_date IS NULL)
    LEFT JOIN {{ ref('ship_mode') }} ON (word_ship_mode_id = sm_ship_mode_id)
    JOIN s_web_order_lineitem ON (word_order_id = wlin_order_id)
    LEFT JOIN {{ ref('date_dim') }} d2 ON (cast(wlin_ship_date AS date) = d2.d_date)
    LEFT JOIN {{ ref('item') }} ON (wlin_item_id = i_item_id AND i_rec_end_date IS NULL)
    LEFT JOIN {{ ref('web_page') }} ON (wlin_web_page_id = wp_web_page_id AND wp_rec_end_date IS NULL)
    LEFT JOIN {{ ref('warehouse') }} ON (wlin_warehouse_id = w_warehouse_id)
    LEFT JOIN {{ ref('promotion') }} ON (wlin_promotion_id = p_promo_id)

coreレイヤー

stagingレイヤーのデータを増分処理でcoreレイヤーに取り込むSQLクエリです。dbtのincrementalモデルを使用しています。

{{ config(
    materialized='incremental',
    unique_key=['ws_item_sk', 'ws_order_number'],
    on_schema_change='fail',
    incremental_strategy='merge' if target.type != 'duckdb' else 'delete+insert'
) }}

-- Incrementalモデル:staging層からデータを取得
select
    ws_sold_date_sk::STRING as ws_sold_date_sk,
    ws_sold_time_sk::STRING as ws_sold_time_sk,
    ws_ship_date_sk::STRING as ws_ship_date_sk,
    ws_item_sk::STRING as ws_item_sk,
    ws_bill_customer_sk::STRING as ws_bill_customer_sk,
    ws_bill_cdemo_sk::STRING as ws_bill_cdemo_sk,
    ws_bill_hdemo_sk::STRING as ws_bill_hdemo_sk,
    ws_bill_addr_sk::STRING as ws_bill_addr_sk,
    ws_ship_customer_sk::STRING as ws_ship_customer_sk,
    ws_ship_cdemo_sk::STRING as ws_ship_cdemo_sk,
    ws_ship_hdemo_sk::STRING as ws_ship_hdemo_sk,
    ws_ship_addr_sk::STRING as ws_ship_addr_sk,
    ws_web_page_sk::STRING as ws_web_page_sk,
    ws_web_site_sk::STRING as ws_web_site_sk,
    ws_ship_mode_sk::STRING as ws_ship_mode_sk,
    ws_warehouse_sk::STRING as ws_warehouse_sk,
    ws_promo_sk::STRING as ws_promo_sk,
    ws_order_number::STRING as ws_order_number,
    ws_quantity::INTEGER as ws_quantity,
    ws_wholesale_cost::DECIMAL(7, 2) as ws_wholesale_cost,
    ws_list_price::DECIMAL(7, 2) as ws_list_price,
    ws_sales_price::DECIMAL(7, 2) as ws_sales_price,
    ws_ext_discount_amt::DECIMAL(7, 2) as ws_ext_discount_amt,
    ws_ext_sales_price::DECIMAL(7, 2) as ws_ext_sales_price,
    ws_ext_wholesale_cost::DECIMAL(7, 2) as ws_ext_wholesale_cost,
    ws_ext_list_price::DECIMAL(7, 2) as ws_ext_list_price,
    ws_ext_tax::DECIMAL(7, 2) as ws_ext_tax,
    ws_coupon_amt::DECIMAL(7, 2) as ws_coupon_amt,
    ws_ext_ship_cost::DECIMAL(7, 2) as ws_ext_ship_cost,
    ws_net_paid::DECIMAL(7, 2) as ws_net_paid,
    ws_net_paid_inc_tax::DECIMAL(7, 2) as ws_net_paid_inc_tax,
    ws_net_paid_inc_ship::DECIMAL(7, 2) as ws_net_paid_inc_ship,
    ws_net_paid_inc_ship_tax::DECIMAL(7, 2) as ws_net_paid_inc_ship_tax,
    ws_net_profit::DECIMAL(7, 2) as ws_net_profit,
    processing_date::STRING as processing_date,
    ingested_at::TIMESTAMP as ingested_at,
    current_timestamp::TIMESTAMP as inserted_at,
    current_timestamp::TIMESTAMP as updated_at
from {{ ref('web_sales_view') }}

{% if is_incremental() %}
  -- incrementalの場合、processing_dateでフィルタリング
  where processing_date = '{{ var("processing_date", "20240101") }}'
{% endif %}

詳細解説

Data Contractとは?

Data Contractは、データの提供者と利用者の間でデータ仕様やデータ品質要件を合意するための標準フォーマットです。
フォーマットの既定の項目を使ってデータ仕様やデータ品質要件を定義することができます。
既定の項目がサポートしているものは、テーブルスキーマや各種の論理名、データの粒度などのメタデータに加えて、サンプルデータ、データ品質ルールなど多岐に渡ります。

当検証では「ODCS (Open Data Contract Standard)」というYAML形式のフォーマットを使用します。ODCSはLinux Foundation傘下のBitolプロジェクトがサポートする代表的な仕様です。

当記事では、Data Contractを生成AIへ与えるコンテキストとして活用します。Data Contractの作成自体は設計プロセスで生成AIに行わせることも可能です(別記事で説明予定)。

今回使用のData Contractの主要項目

主に使用した項目と各項目の簡単な説明を記載します。

  • name: カラム物理名
  • businessName: カラム論理名
  • logicalType: 論理データ型
  • physicalType: 物理データ型
  • transformSourceObjects: カラムのデータを作成する際に必要なソースデータに関する情報
  • transformLogic: カラムのデータを作成する際に必要なデータ変換ロジック
  • relationships: カラムのリレーションシップ情報
  • quality: カラムのデータ品質要件

実際のData Contract

以下、代表的な2つのカラムのData Contractを抜粋します。

主キー兼外部キーの例: ws_item_sk

こちらは前述のData Contractの詳述です。
複合主キーの一部であり外部キーでもあるカラムです。3テーブルの結合と、3種類のデータ品質テストが定義されています。

- name: ws_item_sk
  businessName: ウェブ販売商品サロゲートキー
  logicalType: string
  physicalType: STRING
  primaryKey: true
  primaryKeyPosition: 1
  transformSourceObjects:
    - core_tpcds.item.i_item_sk
  transformLogic: SELECT i_item_sk FROM s_web_order JOIN s_web_order_lineitem ON (word_order_id = wlin_order_id) LEFT JOIN item ON (wlin_item_id = i_item_id AND i_rec_end_date IS NULL)
  relationships:
    - type: foreignKey
      to:
        - core_tpcds.item.i_item_sk
      customProperties:
        - target_contract_id: "core_tpcds.item"
        - target_contract_path: "./core_tpcds/item.yaml"
        - target_property: "i_item_sk"
  quality:
    - type: library
      rule: duplicateCount
      name: "主キー重複チェック(ws_item_sk)"
      dimension: uniqueness
      mustBe: 0
      severity: error
      businessImpact: "重複レコードによりファクトテーブルのデータ整合性が崩れ、売上集計に誤差が生じる"
    - type: library
      rule: nullCount
      name: "主キーNULLチェック(ws_item_sk)"
      dimension: completeness
      mustBe: 0
      severity: error
      businessImpact: "NULL値が存在すると商品別売上分析が不可能になる"
    - type: sql
      name: "商品マスタ参照整合性チェック"
      dimension: consistency
      query: |
        SELECT COUNT(*)
        FROM ${object} ws
        LEFT JOIN item i ON ws.ws_item_sk = i.i_item_sk
        WHERE ws.ws_item_sk IS NOT NULL AND i.i_item_sk IS NULL
      mustBe: 0
      severity: error
      businessImpact: "参照整合性が崩れると商品マスタとの結合で欠損が発生し、商品情報を取得できない売上データが生じる"
複雑な計算ロジックの例: ws_net_profit

純利益は複数のソースから計算され、計算整合性と平均値の妥当性の両方がチェックされます。

- name: ws_net_profit
  businessName: ウェブ純利益
  logicalType: number
  physicalType: DECIMAL(7, 2)
  transformSourceObjects:
    - raw.s_web_order_lineitem.wlin_sales_price
    - raw.s_web_order_lineitem.wlin_quantity
    - raw.s_web_order_lineitem.wlin_coupon_amt
    - core_tpcds.item.i_wholesale_cost
  transformLogic: SELECT (((wlin_sales_price::DECIMAL(7,2) * wlin_quantity::INTEGER) - wlin_coupon_amt::DECIMAL(7,2)) - (i_wholesale_cost::DECIMAL(7,2) * wlin_quantity::INTEGER))::DECIMAL(7,2) FROM s_web_order JOIN s_web_order_lineitem ON (word_order_id = wlin_order_id) LEFT JOIN item ON (wlin_item_id = i_item_id AND i_rec_end_date IS NULL)
  quality:
    - type: sql
      name: "純利益計算整合性チェック"
      dimension: accuracy
      query: |
        SELECT COUNT(*)
        FROM ${object}
        WHERE ws_net_profit IS NOT NULL
          AND ws_net_paid IS NOT NULL
          AND ws_ext_wholesale_cost IS NOT NULL
          AND ABS(ws_net_profit - (ws_net_paid - ws_ext_wholesale_cost)) > 0.01
      mustBe: 0
      severity: error
      businessImpact: "計算式が不正確な場合、利益分析の信頼性を損なう"
    - type: sql
      name: "純利益平均値チェック"
      dimension: accuracy
      query: |
        SELECT AVG(ws_net_profit)
        FROM ${object}
        WHERE ws_net_profit IS NOT NULL
      mustBeBetween: [-100, 1000]
      severity: warning
      businessImpact: "平均利益が異常値の場合、価格設定や原価データに問題がある可能性"

transformSourceObjectsとtransformLogicの重要性

当検証で特に試行錯誤したのが、この2つのプロパティの記述でした。

当初の仮説: 各プロパティでデータ仕様を記述し、relationshipsプロパティでデータ間の関係性を記述すれば、生成AIがJOIN処理やカラム毎の計算ロジック、変換ロジックを類推できるのではないか

実際の課題: データ仕様やリレーションシップ(データモデル上の関係性)とETL/ELT処理(データ変換ロジック)の間には隔たりがある

例えば、ws_item_skカラムはitemテーブルと外部キー関係にありますが、実際のデータ取得にはs_web_orders_web_order_lineitemitemという3テーブルの結合が必要です。
これは生成AIに分かってもらえそうで分かってもらえませんでした。
s_web_orderitemの2つのテーブルを無理に結合しようとしてしまいますし、チャットで説明して改善しても後で忘れて問題が再発し、忘れないように記録してもらっても個別のケースの実装方法として記録されて他ケースへの応用が懸念されました。

他に、生成AIはサロゲートキーどうしでテーブル結合したり、主キーで結合するように指示して改善しても今度は肝心のサロゲートキーを取得してくれなかったりしました。
また、カラムの演算処理は間違った内容が実装されるか全く実装されないかでした。

そのため、以下の2つのプロパティを使うようにしました:

  • transformSourceObjects: データの出所(ソーステーブル・カラム)を明示
  • transformLogic: 具体的な変換ロジック(JOIN条件、フィルタ、計算式など)を記述

これにより、生成AIは「何を参照して、どう変換するか」を正確に理解し、意図通りのSQLを生成できるようになりました。

transformSourceObjectsとtransformLogicの作成方法

transformSourceObjectsプロパティとtransformLogicプロパティの作成自体をデータモデルの設計過程で生成AIと共に行います。
もし、データソース側のシステムで業務知識 (例. 純利益の計算式) が何かドキュメントなどに整理されてあれば、その内容をもとに生成AIが両プロパティを定義できる可能性があります。
また、例えばディメンショナルモデリングであれば、データ粒度 (Grain) の定義やConformance Matrixなどを機械判読可能なファイル形式で生成AIと共に作成し、それら成果物をソースとして生成AIが両プロパティを作成できる可能性があります。

relationshipsプロパティは ODCS v3.1.0 にリリース予定

ODCSの現在の最新公式バージョンはv3.0.2ですが、このバージョンにはrelationshipsプロパティが存在しません
ODCS v3.1.0でrelationshipsプロパティが実装される予定で、当検証ではODCSのv3.1.0を先行的に使用しています。

v3.1.0での追加予定: ODCS v3.1.0 RFC0013

その他のプロパティの作成方法

nameプロパティやphysicalTypeプロパティなどの基礎的なプロパティの作成は、データソースのDBのスキーマ情報や生データのデータファイルをソースとして生成AIに与えることで半自動生成もしくは自動生成が可能です。Excelなどの固有ソフトウェアのファイル形式の場合は、機械判読可能なオープンなファイル形式への変換が必要です。

さいごに

検証の振り返り

本記事では、Data Contract(ODCS形式)を活用したデータパイプラインのAI駆動開発手法を検証しました。

成果として得られたこと

  • 12テーブル結合、35カラムの複雑なSQLクエリを生成AIが正確に生成
  • Data Contractに定義した型キャスト、計算ロジック、JOIN条件が意図通りに実装
  • 増分処理やdbtのincremental設定も適切に生成

成功要因

  • データアーキテクチャとデータモデルの2軸でコンテキストを整理
  • データ項目やデータ型、制約などのスキーマ情報に加えて、transformSourceObjectstransformLogicでデータリネージを明示
  • ローカル完結型の開発環境で生成AIの試行錯誤を高速化

所感

データエンジニアリングのAI駆動開発に向けたドキュメント整備方法の1つを見出すことができて良かったと思います。
データ基盤/データ分析基盤を概念的に分解し、既存の専門用語に当てはめて構造化して説明することで生成AIが体系的かつ詳細に理解するようになることが実感できました。

また、特定の製品・サービスに依存せずに進められましたので、再利用性が比較的に高いと考えており、今後の応用として関連の製品・サービスと連携しDataOpsやAIOpsへ昇華できればと期待しています。

データマネジメントのより広い範囲への活用の可能性
本記事ではデータパイプラインのコード生成に焦点を当てましたが、Data Contract自体の生成やデータ品質テストの自動生成など、より広い範囲への活用も考えられます。実際にData Contractのqualityプロパティを応用したデータ品質テストコードの生成を検証しており、その内容は別途記事化の予定です。
また、データの意味的な定義やデータ統合に関するマッピング情報などを整備してData Contractを拡張することで、例えばデータマネジメント全般における機械判読可能なドキュメントとしての活用やAI伴走型の分析データモデル開発への応用も検討しています。
データエンジニアとして、データマネジメントにおけるData Contractと生成AIの活用可能性については、今後も継続的に検証を進めていく予定です。

私たちは一緒に働いてくれる仲間を募集しています!

電通総研 キャリア採用サイト 電通総研 新卒採用サイト

執筆:@shikarashika
レビュー:@yamada.y
Shodoで執筆されました