みなさんこんにちは、電通国際情報サービス(ISID)コーポレート本部 システム推進部の佐藤太一です。
このエントリではGoogle Dataflowを使ったデータ分析パイプライン構築において中心的なAPIの使い方について説明します。
- Google Dataflowとはなにか
- Dataflowの開発環境構築
- Apache Beamの基礎
- ParDoを使った逐次処理の書き方
- Dataflowによるユニットテストの書き方
- フィルター
- フィルターのテスト
- 値の増幅処理
- 増幅処理のテスト
- PCollectionの分岐
- PCollectionの分岐をテストする
- まとめ
Google Dataflowとはなにか
Google DataflowはいわゆるExtract/Transform/Load(ETL)ツールの一種です。Apache Beamというバッチ処理基盤をGCPの分散処理環境で動かしてくれます。
Apache Beam自体は、Apache FlinkやApache Spark、Hazelcast Jetといったオンプレミスで動作する実行環境を利用することもできます。
Apache Beamでは、JavaやPython、Goといった言語で処理を記述できますが、今回はJavaを使って説明します。
Dataflowの開発環境構築
まずは、Dataflowの開発環境を作っていきましょう。
開発環境として使うマシンには、事前にJava17とGradle7.5以上をインストールしておいてください。
GradleによるDataflowプロジェクトの作り方
最初にプロジェクト全体を格納するためのディレクトリを作成しましょう。
ここからは、この記事内でシェルコマンドを実行するよう説明している部分では、必ずこのルートディレクトリで実行してください。
作るプロジェクトは、説明のために dataflow-example
とします。作ったdataflow-example
ディレクトリの中で、以下のコマンドを実行して最小限のプロジェクトを作成します。
gradle init --type basic --dsl kotlin --project-name dataflow-example --incubating
最小限とはいえ、Gradle Wrapperとなるシェルスクリプトやgit用の設定ファイルが生成されていますね。
この中から、 build.gradle.kts
を以下のように編集します。
plugins { id("java") } group = "com.example.dataflow" version = "0.1.0-SNAPSHOT" java.toolchain { languageVersion.set(JavaLanguageVersion.of(17)) // 1. } repositories { mavenCentral() maven("https://packages.confluent.io/maven/") // 2. } dependencies { var beamVersion = "2.41.0" // 3. var slf4jVersion = "1.7.36" implementation(platform("com.google.cloud:libraries-bom:25.4.0")) // 4. implementation("org.apache.beam:beam-sdks-java-core:${beamVersion}") // 5. implementation("org.apache.beam:beam-sdks-java-io-google-cloud-platform:${beamVersion}") // 5. implementation("org.apache.beam:beam-runners-google-cloud-dataflow-java:${beamVersion}") // 5. implementation("org.apache.commons:commons-csv:1.9.0") implementation("org.slf4j:slf4j-api:${slf4jVersion}") implementation("org.slf4j:slf4j-jdk14:${slf4jVersion}") testImplementation("junit:junit:4.13.2") // 6. testImplementation("org.apache.beam:beam-runners-direct-java:${beamVersion}") // 7. } tasks.withType<JavaCompile>().configureEach { options.encoding = "UTF-8" }
- このビルドスクリプトで利用するコンパイラやランタイムのバージョン番号を指定しています。
- 依存ライブラリをダウンロードする先を宣言しています。
- Apache Beamのバージョンを参照する依存性がいくつかあるので、ここでは変数として切り出しています。
- Dataflowを動かすために必要なGCPのSDKに対する依存性を宣言しています。GradleのPlatform機能を使っていますね。
- Apache Beamに対する依存性を宣言しています。後半の二つはGCPでBeamを動かすために必要な依存性です。
- ユニットテスト用の依存性としてJUnit4を指定しています。JUnitの最新版はJUnit5系ですが、記事執筆時点においてApache BeamはJUnit5をサポートしていません。cf. JUnit5 support
- Apache Beam用のユニットテストライブラリに対する依存性を宣言しています。
これでDataflow用のローカルビルド環境の構築は完了です。
Apache Beamの基礎
Apache Beamを理解するなら、まずはPipelineとPCollectionをしっかり理解してください。
他のコンセプトについては、Dataflowのドキュメントを参照してください。 * Apache Beam のプログラミング モデル
Pipelineについて
Pipelineは複数のステップから構成される処理全体を表すオブジェクトで、データの読み取り処理から始まりフィルターや変換を経て、出力処理までを行います。一つのパイプラインが一つのジョブとなります。
Pipelineを構成する各ステップは、実行環境が必要に応じて分散処理してくれます。つまり、各ステップを効率よく動作させるには、それぞれのステップが全く違ったプロセスの上で非同期に実行されても問題がおきないようにしましょう。 具体的には、処理単位になるデータの独立性をできるかぎり高めるようにします。つまり、RDBにおける非正規化を積極的に行うようなデータの持ち方をします。
順序に強い整合性を求める書き方もできますが、そうすると分散処理環境がもつ性能を十分に引き出せません。
PCollectionについて
PCollectionはPipelineを流れるデータの集合を表すオブジェクトです。 逐次的に要素を扱えるのでJavaのCollection Frameworkと似ていますが、PCollectionのAPIだけではデータの開始と終了を明示的に調べられません。 また、PCollectionに格納されている要素は、分散処理環境内における実行環境の都合でシリアライズされたりコピーされる可能性があります。つまり、処理に必要な情報は全て要素内に内包する必要があります。
ParDoを使った逐次処理の書き方
基本的な概念が分かった所で本題に入っていきましょう。
最初に作るのは、PCollectionを流れる要素を1:1で変換していく処理です。
この図は四角い枠がPCollectionで、〇が各要素、矢印が処理です。つまり、全体がPipelineとなります。
今回の記事では、全てのコードをテストコードとして実装しますので、以下のようにディレクトリを作成します。
mkdir src/test/java/com/example/dataflow
出来たディレクトリに CsvFn.java
というファイルを以下の内容で作成します。
package com.example.dataflow; import org.apache.beam.sdk.transforms.DoFn; import java.util.*; public class CsvFn extends DoFn<String, List<String>> { // 1. @ProcessElement // 2. public void processElement(ProcessContext c) throws Exception { // 3. var element = c.element(); // 4. var list = Arrays.asList(element.split(",")); // 5. c.output(list); // 6. } }
この処理では、単一の文字列を入力すると、それをカンマ区切りで分割したリストとして後続の処理に引き渡します。
- 逐次処理を実装する際に使うクラスは
DoFn
を継承します。 - 一つ目の型パラメータは、各入力要素を表す型を設定します。ここでは
String
を設定しています。 - 二つ目の型パラメータは、各出力要素を表す型を設定します。ここでは
List<String>
を設定しています。 - 逐次処理を行うメソッドは
@ProcessElement
アノテーションを付与します。 - 逐次処理を行うメソッドのアクセス修飾子は
public
、戻り値はvoid
です。送出される例外としてはException
を定義しておきます。 - なお、このメソッドの中から例外を送出するとジョブ全体が停止します。
ProcessContext
のelement
メソッドを呼ぶと、DoFn
を継承する際に設定した一番目の型パラメータの変数が得られます。ここではString
型の変数が得られるわけです。String
のsplit
メソッドを呼びだして得られた配列をList
に格納しています。ProcessContext
のoutput
メソッドを呼ぶ際には、DoFn
を継承する際に設定した二番目の型パラメータの変数を渡します。ここでは既に作成済みのlist
を渡していますね。
Dataflowによるユニットテストの書き方
次は、逐次処理をユニットテストしてみましょう。
CsvFn.java
と同じディレクトリ内に CsvFnTest.java
というファイルを以下の内容で作成します。
package com.example.dataflow; import org.apache.beam.sdk.testing.*; import org.apache.beam.sdk.transforms.*; import org.junit.*; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import java.util.List; @RunWith(JUnit4.class) public class CsvFnTest { static final List<String> values = List.of( "foo,bar,baz", "fo1,ba2,ba3", "ba1,ba2,ba3"); @Rule public TestPipeline pipeline = TestPipeline.create(); // 1. @Test public void testSimplePipeline() throws Exception { var output = pipeline .apply(Create.of(values)) // 2. .apply(ParDo.of(new CsvFn())); // 3. PAssert.that(output).containsInAnyOrder( // 4. List.of("foo", "bar", "baz"), List.of("ba1", "ba2", "ba3"), // 5. List.of("fo1", "ba2", "ba3") ); pipeline.run().waitUntilFinish(); // 6. } }
- ユニットテスト用のパイプラインを生成しています。パイプラインを構成するための共通処理があるので
@Rule
を付与しています。 Create
のof
メソッドを使って文字列のリストをパイプラインに流せる形に変換しています。ここでは、List
の各要素がパイプラインを流れていきます。ParDo
のof
メソッドに先ほど実装したCsvFn
をインスタンス化して渡しています。これによって、パイプラインを流れる各要素ごとにCsvFn
のprocessElement
メソッドが呼びだされます。- パイプラインを流れる要素が正しく変換されているか確認するには、Apache Beamで用意されている専用の
PAssert
を使います。ここではcontainsInAnyOrder
メソッドを使ってそれぞれの要素が正しくカンマ区切りで分解されたか確認しています。 values
変数として定義した要素の順序とは違った順序で要素を検証しています。これは、パイプラインを流れる要素の処理順序は保証されておらず、実行環境の都合で任意に入れ替わる可能性があることを意図しています。つまり、Create
のof
メソッドで作った要素がそのままの順序でCsvFn
のprocessElement
メソッドに入ってくるとは限りません。TestPipeline
のrun
メソッドを呼びだした上で、さらにwaitUntilFinish
メソッドを呼んでパイプラインの処理が終わるのを待っています。デバッガで実行する際に注意してほしいのは、この時点で初めてパイプラインの処理が動き始めることです。つまり、 4. の時点では、まだCsvFn
のprocessElement
メソッドは呼びだされません。
フィルター
CsvFnでは単純な1:1の変換処理を実装しましたので、次はフィルター処理を実装してみましょう。
フィルター処理として作るのは、指定した長さよりも長い文字列だけを後続の処理に流すフィルターです。
CsvFn.java
と同じディレクトリ内に FilterFn.java
というファイルを以下の内容で作成します。
package com.example.dataflow; import org.apache.beam.sdk.transforms.DoFn; public class FilterFn extends DoFn<String, String> { // 1. final int size; // 2. public FilterFn(int size) { this.size = size; } @ProcessElement public void processElement(ProcessContext c) { var element = c.element(); if (size < element.length()) { // 3. c.output(element); } } }
- ここで実装するのはフィルター処理なので、入力と出力の型は同じです。
- コンストラクタで受け取った長さをメンバ変数として格納しています。Apache BeamではPCollectionの要素だけでなく、各処理のステップを表すオブジェクトも実行環境の都合で直列化される可能性があります。つまり、メンバ変数としてはSerializableな型(もしくは、Externalizableな型)だけを定義できます。
- 条件分岐に基づいて
ProcessContext
のoutput
メソッドを呼びだすかどうかを決めています。
フィルターのテスト
では、フィルター処理をユニットテストしてみましょう。
CsvFn.java
と同じディレクトリ内に FilterFnTest.java
というファイルを以下の内容で作成します。
package com.example.dataflow; import org.apache.beam.sdk.testing.*; import org.apache.beam.sdk.transforms.*; import org.junit.*; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import java.util.List; @RunWith(JUnit4.class) public class FilterFnTest { static final List<String> values = List.of( "alpha", "beta", "gamma"); @Rule public TestPipeline pipeline = TestPipeline.create(); @Test public void testSimplePipeline() throws Exception { var output = pipeline .apply(Create.of(values)) .apply(ParDo.of(new FilterFn(4))); PAssert.that(output).containsInAnyOrder("gamma", "alpha"); pipeline.run().waitUntilFinish(); } }
長さが4文字より大きい単語をフィルターできていますね。
値の増幅処理
次は、一つの入力から複数回の出力を行う処理を実装してみましょう。
増幅処理として作るのは、文字列をカンマ区切りで分割した各要素をそのまま後続に渡す処理です。
CsvFn.java
と同じディレクトリ内に FlatValuesFn.java
というファイルを以下の内容で作成します。
package com.example.dataflow; import org.apache.beam.sdk.transforms.DoFn; import java.util.Arrays; public class FlatValuesFn extends DoFn<String, String> { // 1. @ProcessElement public void processElement(ProcessContext c) { var element = c.element(); var list = Arrays.asList(element.split(",")); list.forEach(c::output); // 2. } }
- ここで実装するのは増幅処理なので、入力と出力の型は同じです。
- 文字列を分割して得られた要素全てについて
ProcessContext
のouput
メソッドを呼びだしています。
増幅処理のテスト
では、増幅処理をユニットテストしてみましょう。
CsvFn.java
と同じディレクトリ内に FlatValuesFnTest.java
というファイルを以下の内容で作成します。
package com.example.dataflow; import org.apache.beam.sdk.testing.*; import org.apache.beam.sdk.transforms.*; import org.junit.*; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import java.util.List; @RunWith(JUnit4.class) public class FlatValuesFnTest { static final List<String> values = List.of( "foo,bar,baz", "fo1,ba2,ba3", "ba1,ba2,ba3"); @Rule public TestPipeline pipeline = TestPipeline.create(); @Test public void testSimplePipeline() throws Exception { var output = pipeline .apply(Create.of(values)) .apply(ParDo.of(new FlatValuesFn())); PAssert.that(output).containsInAnyOrder( "foo", "bar", "baz", "ba1", "ba2", "ba3", "fo1", "ba2", "ba3" ); pipeline.run().waitUntilFinish(); } }
カンマ区切りで3つずつに分割できる要素を3回FlatValuesFn
で処理したので9つの要素が出力されていますね。
PCollectionの分岐
ここまでの処理では、処理の流れであるPCollection自体は1つのまま要素が流れていきました。 しかし、データの1カラム目だけを見て後続の処理を切り替えるといった処理構造を実現したくなることはあります。
ここでは、入力された文字列の1文字目を使って後続の処理を切り替えるためにPCollectionを分岐してみましょう。
CsvFn.java
と同じディレクトリ内に BranchFn.java
というファイルを以下の内容で作成します。
package com.example.dataflow; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.TupleTag; import java.util.*; public class BranchFn extends DoFn<String, List<String>> { static final TupleTag<List<String>> MAIN = new TupleTag<>() { // 1. }; static final TupleTag<List<String>> SUB = new TupleTag<>() { }; @ProcessElement public void processElement(ProcessContext c) { var element = c.element(); var list = Arrays.asList(element.split(",")); if (list.get(0).equals("M")) { c.output(MAIN, list.subList(1, list.size())); // 2. } else { c.output(SUB, list.subList(1, list.size())); } } }
TupleTag
は実行環境全体で一意のIDを付与する必要があります。ここでは、ややトリッキーなテクニックを使ってそれを実現しています。コンストラクタ呼び出しの後ろについている中括弧{}
によってインナークラスを作成していることがポイントです。実装の詳細が気になる方は是非コードを読んでみてください。ProcessContext
のouput
メソッドを呼びだす際に、TupleTag
を渡しています。これによって各要素にタグ付けをすることで、PCollectionの分岐を実現しているのです。
PCollectionの分岐をテストする
では、分岐したPCollectionをどのように扱うのかテストコードで確認してみましょう。
CsvFn.java
と同じディレクトリ内に BranchFnTest.java
というファイルを以下の内容で作成します。
package com.example.dataflow; import org.apache.beam.sdk.testing.*; import org.apache.beam.sdk.transforms.*; import org.apache.beam.sdk.values.TupleTagList; import org.junit.*; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import java.util.List; @RunWith(JUnit4.class) public class BranchFnTest { static final List<String> values = List.of( "M,foo,bar,baz", "S,fo1,ba2,ba3", "M,ba1,ba2,ba3"); // 1. @Rule public TestPipeline pipeline = TestPipeline.create(); @Test public void testSimplePipeline() throws Exception { var output = pipeline .apply(Create.of(values)) .apply(ParDo.of(new BranchFn()) .withOutputTags(BranchFn.MAIN, TupleTagList.of(List.of(BranchFn.SUB))) // 2. ); PAssert.that(output.get(BranchFn.MAIN)).containsInAnyOrder( // 3. List.of("foo", "bar", "baz"), List.of("ba1", "ba2", "ba3") ); PAssert.that(output.get(BranchFn.SUB)).containsInAnyOrder( // 4. List.of("fo1", "ba2", "ba3") ); pipeline.run().waitUntilFinish(); } }
- テストデータとして、各要素の先頭に分岐の条件となる
M
やS
を配置しています。 ParDo
のof
メソッドを呼びだして得られた変数に対して、withOutputTags
を呼びだすことでこのパイプラインが分岐することを宣言しています。- ここでは二つに分岐していますが、三つや四つ、それよりも多くのPCollectionに分岐できます。
- 分岐されたパイプラインから
MAIN
でタグ付けされたPCollection
を取り出しています。1. では文字列の先頭がM
になっているものがこれにあたります。 - 分岐されたパイプラインから
SUB
でタグ付けされたPCollection
を取り出しています。1. では文字列の先頭がS
になっているものがこれにあたります。
まとめ
Apache Beamを使ったバッチ処理を書く上で最も汎用性の高いParDo
を使ったスタイルをいくつか紹介しました。
今日紹介したスタイルは、それぞれ専用のAPIが用意されていますが、必要に応じてAPIを覚えなおすのはやや面倒です。 例えば、型を1:1で変換するなら、MapElementsという専用のAPIがあります。フィルターしたいならFilterがあります。
ParDo
には、この記事では紹介しきれなかった便利な機能が他にもありますので是非試してみてください。
Google Dataflowは非常に巨大なデータをバッチ処理するための実行基盤として非常に安価に利用できる上にハイパフォーマンスに動作する環境です。 例えば、筆者の業務ではGCSにおいたログファイルをBigQueryへ投入する手段としてDataflowを利用しています。テラバイトクラスのログファイルが分散処理によって数十分でDBに投入されていく様子は圧巻というほかありません。
この記事を読んだ皆様がDataflowを使って、筆者が受けた感銘を共有していただけたら非常に嬉しいです。
私たちは同じチームで働いてくれる仲間を探しています。今回のエントリで紹介したような仕事に興味のある方、ご応募をお待ちしています。
執筆:@sato.taichi、レビュー:@handa.kenta (Shodoで執筆されました)