こんにちは。スマートソサエティセンター 行政デジタル部所属の宇佐美です。
今回は、Vertex AIのGeminiバッチ予測機能を使って、大量のテキストデータを一括で処理する方法をご紹介します。
バッチ予測とは
Vertex AIのバッチ予測は、大量のプロンプトを非同期で一括処理できる機能です。リアルタイム推論と比較して以下のメリットがあります。
- コスト効率: リアルタイム推論と比較して安価な料金で利用可能。特にGemini 2.5 Flash はリアルタイム推論のおよそ半額で利用することが出来ます。
- 高スループット: 1回のジョブで数十万件のリクエストを処理可能
- シンプルなワークフロー: 個別リクエストの管理が不要
データの入出力先としてBigQueryまたはCloud Storageを使用できます。今回はBigQueryを使用します。
BigQueryについて
BigQueryはGoogle Cloudが提供するフルマネージドのデータウェアハウスです。大規模データの分析に特化しており、ペタバイト規模のデータに対してもSQLで高速にクエリを実行できます。
BigQueryのデータ構造は以下の階層になっています。
今回使用するデータセット
今回はJaGovFaqs-22kという日本の官公庁のFAQデータセットを使用します。各レコードには質問と回答が含まれており、今回はそれらを要約対象とします。
前提条件
実装手順
1. 必要なライブラリのインストール
pip install google-cloud-bigquery google-genai datasets
2. BigQueryスキーマの作成
まず、BigQueryのデータセットと、入力用データを管理するためのテーブル(faq_input)を定義します。
from google.cloud import bigquery PROJECT_ID = "your-project-id" LOCATION = "us-central1" client = bigquery.Client(project=PROJECT_ID) # データセット作成 dataset = bigquery.Dataset(f"{PROJECT_ID}.batch_demo") dataset.location = LOCATION client.create_dataset(dataset, exists_ok=True) # テーブル作成 schema = [ bigquery.SchemaField("Id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("Question", "STRING"), bigquery.SchemaField("Answer", "STRING"), bigquery.SchemaField("copyright", "STRING"), bigquery.SchemaField("url", "STRING"), bigquery.SchemaField("request", "STRING", mode="REQUIRED"), ] table = bigquery.Table(f"{PROJECT_ID}.batch_demo.faq_input", schema=schema) client.create_table(table, exists_ok=True)
ポイント:
requestカラムは、GenerateContentRequestの構造に準拠する必要があります。
コードを実行すると、以下のとおりデータセットとテーブルが作成されます。

3. データ登録
Hugging Faceからデータセットを取得し、バッチ予測用のフォーマットに変換してBigQueryにロードします。
せっかくなので、構造化出力でレスポンスのフォーマットを指定してみましょう。
import json import pandas as pd from datasets import load_dataset response_schema = { "type": "OBJECT", "properties": { "summary": {"type": "STRING", "description": "質問と回答の要約"}, }, } hf_dataset = load_dataset("matsuxr/JaGovFaqs-22k") rows = [{ "Id": i, "Question": item["Question"], "Answer": item["Answer"], "copyright": item["copyright"], "url": item["url"], "request": json.dumps({ "contents": [{"role": "user", "parts": [{"text": f"以下の質問と回答を140文字以内で要約してください。\n\n## 質問\n{item['Question']}\n\n## 回答\n{item['Answer']}" }]}], "generationConfig": {"responseMimeType": "application/json", "responseSchema": response_schema}, }, ensure_ascii=False) } for i, item in enumerate(hf_dataset["train"])] df = pd.DataFrame(rows) job = client.load_table_from_dataframe(df, f"{PROJECT_ID}.batch_demo.faq_input") job.result()
余談ですが、日本語のような非ASCIIコードの文字は出力時にエスケープされてしまうため、ensure_ascii=Falseでエスケープしないように設定しています。
4. バッチ予測タスクの作成
次に、バッチ予測ジョブを作成して実行します。
モデルは速度と品質、利用料金のバランスに優れているGemini 2.5 Flashを利用します。
出力先には別のテーブル(faq_output)を指定します。
from google import genai from google.genai.types import CreateBatchJobConfig genai_client = genai.Client( vertexai=True, project=PROJECT_ID, location=LOCATION ) job = genai_client.batches.create( model="gemini-2.5-flash", src=f"bq://{PROJECT_ID}.batch_demo.faq_input", config=CreateBatchJobConfig(dest=f"bq://{PROJECT_ID}.batch_demo.faq_output"), )
5. 結果の確認
Vertex AI → バッチ推論から結果を確認してみましょう。正常に終了していれば、ステータスが「完了」になっています。

22794件のデータに対して15分弱で処理が完了しています。

先ほど作成したfaq_outputテーブルも確認してみます。response.candidates[0].content.parts[0].textの中身が実際に出力された回答です。

指定されたフォーマット通りに出力されています。
{"text":"{\n \"summary\": \"会計検査院は、国や地方公共団体の公共工事に対し、会計経理面だけでなく設計・積算・施工まで実地で検査します。現場では資料確認や担当者への聴取に加え、非破壊検査装置でコンクリート強度や鉄筋を確認し、必要に応じて破壊検査も実施。不適切な事態の是正を図っています。\"\n}"}
まとめ
ここまでの処理をまとめて一つのサンプルコードにします。
import json import pandas as pd from datasets import load_dataset from google import genai from google.cloud import bigquery from google.genai.types import CreateBatchJobConfig, HttpOptions PROJECT_ID = "your-project-id" LOCATION = "us-central1" # BigQueryセットアップ client = bigquery.Client(project=PROJECT_ID) # データセット作成 dataset = bigquery.Dataset(f"{PROJECT_ID}.batch_demo") dataset.location = LOCATION client.create_dataset(dataset, exists_ok=True) # テーブル作成 schema = [ bigquery.SchemaField("Id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("Question", "STRING"), bigquery.SchemaField("Answer", "STRING"), bigquery.SchemaField("copyright", "STRING"), bigquery.SchemaField("url", "STRING"), bigquery.SchemaField("request", "STRING", mode="REQUIRED"), ] table = bigquery.Table(f"{PROJECT_ID}.batch_demo.faq_input", schema=schema) client.create_table(table, exists_ok=True) # データ準備 response_schema = { "type": "OBJECT", "properties": { "summary": {"type": "STRING", "description": "質問と回答の要約"}, }, } hf_dataset = load_dataset("matsuxr/JaGovFaqs-22k") rows = [{ "Id": i, "Question": item["Question"], "Answer": item["Answer"], "copyright": item["copyright"], "url": item["url"], "request": json.dumps({ "contents": [{"role": "user", "parts": [{"text": f"以下の質問と回答を140文字以内で要約してください。\n\n## 質問\n{item['Question']}\n\n## 回答\n{item['Answer']}" }]}], "generationConfig": {"responseMimeType": "application/json", "responseSchema": response_schema}, }, ensure_ascii=False) } for i, item in enumerate(hf_dataset["train"])] df = pd.DataFrame(rows) job = client.load_table_from_dataframe(df, f"{PROJECT_ID}.batch_demo.faq_input") job.result() # バッチ予測実行 genai_client = genai.Client( vertexai=True, project=PROJECT_ID, location=LOCATION, http_options=HttpOptions(api_version="v1") ) job = genai_client.batches.create( model="gemini-2.5-flash", src=f"bq://{PROJECT_ID}.batch_demo.faq_input", config=CreateBatchJobConfig(dest=f"bq://{PROJECT_ID}.batch_demo.faq_output"), )
注意点
リージョンの制約
バッチ予測ジョブとBigQueryデータセットは同じリージョンに配置する必要があります。マルチリージョンデータセット(US、EUなど)はサポートされていないため、us-central1のような特定のリージョンを指定してください。
最後に
Vertex AIのGeminiバッチ予測を使用することで、大量のデータを効率的に処理できます。
データ分析、コンテンツ生成、文書分類など、即時性を必要としない大規模なLLM処理タスクには、バッチ予測の活用をぜひご検討ください。
執筆:@usami.tsubasa
レビュー:@miyazawa.hibiki
(Shodoで執筆されました)



