はじめに
電通総研XI本部AIトランスフォーメーションセンターの岩本です。この記事では、Azure Cosmos DB for MongoDBのデータベース移行手段としてAzure DatabricksとAzure Data Factoryを取り上げ、それぞれの方法のメリット・デメリットを比較します。また、Azure Data Factoryを用いた具体的な実装例を紹介します。
背景
AIトランスフォーメーションセンターでは、Azure OpenAI Serviceを活用したAIチャット・RAGの製品であるKnow Narratorを開発しています。Know NarratorのバックエンドではスケーラブルなNoSQLデータベースであるAzure Cosmos DB for MongoDBを利用しています。理由としては以下の2点です。
- Know Narratorのユーザー数は導入いただいているお客様によって様々であり、DBはスケーラブルである必要がある
- 頻繁なアップデートで後から機能を追加することを想定しているため、DBスキーマは柔軟に変更可能である必要がある
AIトランスフォーメーションセンターでは、今年3月にデータベース移行を伴うKnow Narratorの大型アップデートを顧客に提供しました。データベース移行手段として当初はAzure Data Factoryを検討していましたが、Data Factory上でのUUIDの取り扱いに制限があることが分かったため、最終的にはAzure Databricksを利用することに決定しました。
Azure Data Factory利用時の注意点
UUIDはMongoDB上ではbinary
とtype
の2つのフィールドを持つオブジェクトとして表現されます。type
はUUIDの生成方法の違いを表しており、これが異なるとUUIDの一意性を維持できません。
CosmosDB for Mongo APIに対しData Factoryを利用する際の注意として、元々DBにtype=4
として保存している値が、Data Factory上にデータを読み出す際にtype=3
として認識されてしまいます。Data Factory上でtype
を書き換えることはできますが、DBに書き出す際に再度type=3
となってしまいます。Microsoftサポートによると、これはData FactoryのCosmosDB for MongoDBコネクタの仕様であり、解消するには書き込み後にCosmosDB側でtype
を書き換えるコマンドを実行する必要があるとのことでした。マイグレーション作業手順が複雑になることを避けるため、今回はData Factoryを利用しませんでした。
Azure DatabricksとAzure Data Factoryの比較
以下のようにAzure DatabricksとAzure Data Factoryはそれぞれ異なる目的と機能を持っています。
Azure Databricks:
- 高度な分析、データサイエンス、機械学習のためのプラットフォーム。
- Apache Sparkベースで、高速なビッグデータ処理が可能。
- ノートブック環境を提供し、データ探索、視覚化、モデルのトレーニングが容易。
- Python、Scala、SQL、Rなど多くの言語をサポート。
Azure Data Factory:
- データ統合とETL(Extract, Transform, Load)パイプラインの構築に特化。
- データの移動、変換、オーケストレーションを視覚的にデザイン可能。
- さまざまなデータソース(オンプレミスやクラウド)と連携。
- 定義されたスケジュールやイベントに基づいたデータパイプラインの実行。
Databricksはデータ処理エンジンとしてApache Sparkを使用しており、Data Factoryに比べて非常に高速にデータ処理を行うことができます。また、料金体系に関してもDatabricksがランタイムの稼働時間のみに課金されるのに対し、Data Factoryはランタイムの稼働時間に加えてデータ量や計算量に応じた課金となるため、一度に大量のデータを処理する場合にはDatabricksが適していると言えます。
また、DatabricksはPythonやSQLで処理を記述できるため複雑なデータ操作に適しています。Data FactoryはGUIで直感的に処理フローを構成できますが、その分処理の自由度は落ちます。今回のDBマイグレーションでは単純にデータを移行するだけでなく、あるコレクションのドキュメントを別のコレクションのドキュメントのフィールドの入れ子にするなどある程度複雑な操作が伴いました。このような操作はData Factoryでも実現不可能ではありませんが、複雑な操作はGUIではかえって時間がかかってしまうためDatabricksを選択しました。
Azure Databricksの利用方法
Azure Databricksの作成
Azure Portal上でリソースを作成します。
DatabricksからCosmosDBに接続するには、DatabricksのリソースをCosmosDBと同じVNetに配置する(VNetインジェクション)必要があります。
DatabricksのVNetインジェクションでは2つのサブネット(コンテナサブネットとホストサブネット)を利用するため、これらのサブネットを新たにCosmosDBが配置されているVNet上に作成する必要があります。サブネットの範囲はDatabricksで使用できるクラスターノード数に影響するため、/26
より小さいサブネットは推奨されません。
サブネット範囲とクラスターノード数の具体的な関係については以下のページに記載されています。
クラスターの作成
ワークスペースを起動し、コンピューティングクラスターを作成します。デフォルトの設定値でも特に問題はありませんが、ワーカータイプを安価なものに変更したり、開発環境であればスポットインスタンスを選択するなどしてコストを抑えることができます。
クラスター作成後、ライブラリタブの[新規をインストール]をクリックし、以下のMaven座標を使ってApache SparkとMongoDBのコネクタをインストールします。
org.mongodb.spark:mongo-spark-connector_2.12:3.0.1
ノートブックを作成し、[接続]をクリックして先ほど作成したクラスターを選択します。
クラスターの起動が完了していれば以下のようにクラスター名が表示され、Pythonコードが実行可能な状態となります。
![](https://media.shodousercontents.com/task_images/983/14ccd520-da84-472e-b3b1-0985c4fea8ec.png
PySparkの記述方法
今回、具体的なデータ操作の記述にはApache SparkのPython APIであるPySparkを使用します。Pandasのデータフレームと同じような要領でデータを操作することができ、Pandasを使ったことがある場合はそれほど学習コストをかけずに利用できます。
2つのコレクション (stores, orders) を結合し、ネストされたフィールドをもつコレクションmerged_storesを作成するというシナリオでサンプルコードを以下に示します。
storesコレクション
_id | store_id | phone-number | region |
---|---|---|---|
60c5ba5e4f1a2f6d2c9b7f92 | {"type": 4, "data": "U3RvcmUx"} | 123-456-7890 | North |
60c5ba5e4f1a2f6d2c9b7f93 | {"type": 4, "data": "U3RvcmUy"} | 987-654-3210 | South |
ordersコレクション
_id | order_id | price | created_at | store_id |
---|---|---|---|---|
70d5ba6e4f1a2f6d2c9b7f94 | {"type": 4, "data": "T3JkZXIw"} | 100 | 2021-06-13 12:00:00 | {"type": 4, "data": "U3RvcmUx"} |
70d5ba6e4f1a2f6d2c9b7f95 | {"type": 4, "data": "T3JkZXIx"} | 150 | 2021-06-14 13:00:00 | {"type": 4, "data": "U3RvcmUx"} |
70d5ba6e4f1a2f6d2c9b7f96 | {"type": 4, "data": "T3JkZXIy"} | 200 | 2021-06-15 14:00:00 | {"type": 4, "data": "U3RvcmUy"} |
70d5ba6e4f1a2f6d2c9b7f97 | {"type": 4, "data": "T3JkZXIz"} | 250 | 2021-06-16 15:00:00 | {"type": 4, "data": "U3RvcmUy"} |
merged_storesコレクション
_id | store_id | phone-number | region | orders |
---|---|---|---|---|
60c5ba5e4f1a2f6d2c9b7f92 | {"type": 4, "data": "U3RvcmUx"} | 123-456-7890 | North | { id: 70d5ba6e4f1a2f6d2c9b7f94, order_id: {"type": 4, "data": "T3JkZXIw"}, price: 100, created_at: 2021-06-13 12:00:00 }, { id: 70d5ba6e4f1a2f6d2c9b7f95, order_id: {"type": 4, "data": "T3JkZXIx"}, price: 150, created_at: 2021-06-14 13:00:00 } |
60c5ba5e4f1a2f6d2c9b7f93 | {"type": 4, "data": "U3RvcmUy"} | 987-654-3210 | South | { id: 70d5ba6e4f1a2f6d2c9b7f96, order_id: {"type": 4, "data": "T3JkZXIy"}, price: 200, created_at: 2021-06-15 14:00:00 }, { id: 70d5ba6e4f1a2f6d2c9b7f97, order_id: {"type": 4, "data": "T3JkZXIz"}, price: 250, created_at: 2021-06-16 15:00:00 } |
# 必要なライブラリをインポート from pyspark.sql import SparkSession from pyspark.sql.functions import collect_list, struct from pyspark.sql.types import ( BinaryType, ByteType, IntegerType, StringType, StructField, StructType, TimestampType, ) # データベース接続に必要な情報を設定 src_connection_string = "<移行元DBの接続文字列>" dest_connection_string = "<移行先DBの接続文字列>" source_db = "<移行元DB名>" target_db = "<移行先DB名>" stores_collection = "stores" orders_collection = "orders" merged_stores_collection = "merged_stores" # SparkSessionの初期化 my_spark = SparkSession.builder.appName("myApp").getOrCreate() # storesコレクションのスキーマを定義 stores_schema = StructType( [ StructField("_id", StructType([StructField("oid", StringType(), True)]), True), StructField( "store_id", StructType( [ StructField("subType", ByteType(), False), StructField("data", BinaryType(), True), ] ), True, ), StructField("phone-number", StringType(), True), StructField("region", StringType(), True), ] ) # storesコレクションからデータを読み込み df_stores = ( my_spark.read.schema(stores_schema) .format("com.mongodb.spark.sql.DefaultSource") .option("uri", src_connection_string) .option("database", source_db) .option("collection", stores_collection) .load() ) # ordersコレクションのスキーマを定義 orders_schema = StructType( [ StructField("_id", StructType([StructField("oid", StringType(), True)]), True), StructField( "order_id", StructType( [ StructField("subType", ByteType(), False), StructField("data", BinaryType(), True), ] ), True, ), StructField("price", IntegerType(), True), StructField("created_at", TimestampType(), True), StructField( "store_id", StructType( [ StructField("subType", ByteType(), False), StructField("data", BinaryType(), True), ] ), True, ), ] ) # ordersコレクションからデータを読み込み df_orders = ( my_spark.read.schema(orders_schema) .format("com.mongodb.spark.sql.DefaultSource") .option("uri", src_connection_string) .option("database", source_db) .option("collection", orders_collection) .load() ) # 初期状態のstoresデータフレームの列名を保存 stores_new_columns = df_stores.columns # 結合時に列名の重複を避けるためstoresデータフレームの列名を変更 df_stores = df_stores.toDF( *[column_name + "_stores" for column_name in df_stores.columns] ) # storesデータフレームとordersデータフレームを結合 df_joined = df_stores.join( df_orders, df_stores.stores_id_stores == df_orders.stores_id, "left_outer", ) # stores_idを除外したうえでordersをグループ化 orders_new_columns = df_orders.columns orders_new_columns.remove("stores_id") df_merged_stores = df_joined.groupBy(df_stores.columns).agg( collect_list(struct(orders_new_columns)).alias("orders") ) # 列名リストにordersを追加したうえで重複を避けるために変更した列名を元に戻す stores_new_columns.append("orders") df_merged_stores = df_merged_stores.toDF(*stores_new_columns) # 結合されたデータを新しいコレクションに保存 df_merged_stores.write.format("mongo").mode("append").option( "uri", dest_connection_string ).option("maxBatchSize", 1024).option("database", target_db).option( "collection", merged_stores_collection ).save()
注意すべき点として、ネストされたフィールドをもつコレクションは読み込み時にスキーマが自動で推論されません。スキーマを指定しないと、ネストされたフィールドの値は構造化されたデータではなくただのテキストとして扱われてしまいます。これを避けるため、サンプルコードではDBからのデータ読み込み時にスキーマを指定するようにしています。
終わりに
この記事ではAzure Cosmos DB for MongoDBのデータベース移行に関してAzure DatabricksとAzure Data Factoryを比較し、Azure Databricksを使った実装例を紹介しました。最後までご覧いただきありがとうございました。
執筆:@iwamoto.yoshik85ca341e9dca4b22、レビュー:@miyazawa.hibiki
(Shodoで執筆されました)