電通総研 テックブログ

電通総研が運営する技術ブログ

golang.org/x/sync/errgroupパッケージを利用した並行処理の実装

こんにちは。金融ソリューション事業部の宮原です。本記事は電通国際情報サービス Advent Calendar 2023 7日目の記事となります。

みなさん、Go言語で並行処理は利用されているでしょうか?Go言語はゴルーチンやチャネルといった独自の並行処理機構を備えており、比較的簡単に並行処理を導入できます。
しかしながら、ゴルーチンやチャネルの仕組みを理解し、並行処理のパターンを独自に実装していくのは意外と難しいものです。

Go言語ではsyncパッケージをはじめとして、並行処理のパターンを簡単に導入できるようにライブラリとして提供しています。
本記事ではGo言語が提供しているライブラリの中でも準標準パッケージの位置付けであるgolang.org/x/sync/errgroupパッケージを利用した並行処理の実装方法をご紹介したいと思います。

前提

以降では「特定のURLにHTTPリクエストを並行で送信しながら、全てのURLへのHTTPリクエストが終わるまで待ち合わせる」という処理を前提としてサンプルコードを記載します。
今回のサンプルコードはsyncパッケージのサンプルコードを一部改変して作成しています。
また、Goのバージョンについては1.21.3を前提としています。

まずはシンプルに記述してみる

package main

import (
    "fmt"
    "net/http"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    var urls = []string{
        "http://www.golang.org/",
        "http://www.google.com/",
        "http://www.example.com/",
    }

    for i, url := range urls {
        i, url := i, url
        wg.Add(1)
        go func() {
            defer wg.Done()
            resp, err := http.Get(url)
            if err != nil {
                fmt.Println(err)
                return
            }
            defer resp.Body.Close()
            fmt.Printf("%d番目のリクエスト: %s\n", i+1, resp.Status)
        }()
    }
    wg.Wait()
}

まずはシンプルな形で処理を実装してみます。最初の例ではerrgroupパッケージは利用せず、syncパッケージのsync.WaitGroupを利用して処理を記述します。
後ほど詳しく説明しますが、errgroupパッケージはsync.WaitGroupの機能を拡張したものになります。そのため、まずはsync.WaitGroupをよく理解することが重要です。

処理としてはforループとgoキーワードを利用しゴルーチンを起動して、3つのURLに対して並行にリクエストを送信しています。

sync.WaitGroupを理解する上で重要なのはAddメソッド、Doneメソッド、Waitメソッドです。
今回のプログラムではgoキーワードでゴルーチンを起動する前にAddメソッドを呼び出し、処理が終わった後にDoneメソッドを呼び出しています。
そしてループの外側でWaitメソッドを呼び出し、すべてのゴルーチンが完了するのを待ちます。

sync.WaitGroupではAddメソッドで内部のカウンタをインクリメントし、Doneメソッドで内部のカウンタをデクリメントします。
そしてWaitメソッドでは内部のカウンタを監視し、カウンタが0になるまで待つようになっています。

実行結果は以下です。今回の実行では2番目、3番目、1番目の順で処理が完了したようです。処理が並行に行われているため、この出力は実行毎に変わる可能性があります。

2番目のリクエスト: 200 OK
3番目のリクエスト: 200 OK
1番目のリクエスト: 200 OK

sync.WaitGroupを利用するとシンプルに並行処理を記述できます。チャネルの処理を考慮する必要がなく、処理の流れも比較的イメージしやすいです。

エラーを考慮しながら関数化してみる

先ほどの例ではsync.WaitGroupを利用して処理を記述しました。
今度はURLを並行で処理する部分を関数化する例を考えてみましょう。
実装している中で特定の処理を関数化したくなったり、エラーハンドリングをしたくなったりすることはよくあります。

今回は以下のような関数を考えてみます

func CallURL(urls []string) error

リクエスト先のURLを引数にとり、URLに対して並行に処理を行うようなCallURLという関数を考えてみます。
また、関数内でエラーが発生した場合にはエラーを上位の関数に渡します。
しかし悩ましいのがエラーの扱いです。goキーワードを使った関数から直接エラーを受け取ることはできませんし、sync.WaitGroupのWaitメソッドを利用してエラーを取得することもできません。

そこで便利なのがgolang.org/x/sync/errgroupパッケージのerrgroup.Groupです。
errgroup.Groupを利用することでsync.WaitGroupと同等の機能を利用しながらエラーハンドリングを行うことが可能です。このerrgroup.Groupを利用して上記の関数を記述してみます。

package main

import (
    "fmt"
    "net/http"

    "golang.org/x/sync/errgroup"
)

func main() {
    var urls = []string{
        "http://www.golang.org/",
        "http://www.google.com/",
        "http://www.example.com/",
    }
    if err := CallURL(urls); err != nil {
        fmt.Println(err)
    }
}

func CallURL(urls []string) error {
    var eg errgroup.Group
    for i, url := range urls {
        i, url := i, url
        eg.Go(func() error {
            resp, err := http.Get(url)
            if err != nil {
                return err
            }
            defer resp.Body.Close()
            fmt.Printf("%d番目のリクエスト: %s\n", i+1, resp.Status)
            return nil
        })
    }
    if err := eg.Wait(); err != nil {
        return err
    }
    return nil
}

errgroup.Groupはsync.WaitGroupを内部に持つ構造体です。
errgroup.Groupを利用した今回のサンプルコードではAddメソッド、Doneメソッドやゴルーチンの起動処理は記載していません。
これらの処理は全てGoメソッドの中で呼び出されているため、明示的に呼びだす必要はありません。
Goメソッドに渡した関数は内部でgoキーワードを使って並行に起動するようになっています。

また、Goメソッドにはエラーを返り値とした関数を渡し、Waitメソッドでは関数からのエラーを受け取れるようになっています。Goメソッドで起動した関数の中でエラーが発生した場合にWaitメソッド経由でエラーを受け取ることができます。

ここでポイントなのが、複数のエラーが発生した場合でもWaitメソッドで確認できるエラーは一つだけということです。
Waitメソッドでは複数の関数の中で最初に発生したエラーしか確認することができず、その他のエラーについては確認することはできません。

キャンセルを検討してみる

先ほどの例では一つの関数でエラーが発生しても、他の関数はそのエラーを気にせず処理を続けるようになっていました。
しかしながら場合によってはエラーが発生した場合に他の関数の処理を停止させたい、キャンセルしたい場合もあると思います。

実はキャンセルに関する機能もerrgroup.Groupには備わっています。
一つの関数でエラーが発生した場合に、他の関数の処理を停止するサンプルを作成してみます。

package main

import (
    "context"
    "fmt"
    "net/http"

    "golang.org/x/sync/errgroup"
)

func main() {
    var urls = []string{
        "http://www.golang.org/",
        "http://www.google.com/",
        "http://www.example.com/",
    }
    if err := CallURL(urls); err != nil {
        fmt.Println(err)
    }
}

func CallURL(urls []string) error {
    eg, ctx := errgroup.WithContext(context.Background())

    for i, url := range urls {
        i, url := i, url
        eg.Go(func() error {
            req, _ := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)

            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return err
            }

            defer resp.Body.Close()
            fmt.Printf("%d番目のリクエスト: %s\n", i+1, resp.Status)
            return nil
        })
    }

    if err := eg.Wait(); err != nil {
        return err
    }
    return nil
}

今回の例ではerrgroup.WithContext関数を最初に呼び出しています。
返り値はerrgroup.Groupとcontext.Contextです。
あとは返り値のerrgroup.Groupとcontext.Contextを利用して、処理を実装します。

今回の例ではAPI呼び出し部分も少し変わっています。htttp.NewRequestWithContext関数とhttp.DefaultClient.Doメソッドを呼びだすようにしています。
これらの関数を利用することで引数のcontext.Contextがキャンセルされているかを監視しながら処理を進めるようにしています。
http.DefaultClient.Doメソッド内部ではcontext.Contextのキャンセルを監視し、キャンセルされている場合は処理を終了するようになっています。

では肝心のcontext.Contextのキャンセル処理はどこで実行されているのでしょうか?
実はGoメソッドの内部で関数がエラーを返した場合にcontext.Contextのキャンセルを実行するようになっています。
WithContext関数の中ではcontext.WithCancelCause関数が呼び出されており、WithCancelCause関数の返り値のキャンセル関数がerrgroup.Groupのフィールドにセットされます。
Goメソッドの内部ではエラーが発生した場合に、このキャンセル関数を呼びだすことでcontext.Contextをキャンセルするようになっています。

(Goのバージョンが1.20以上の場合、WithContext関数の内部ではcontext.WithCancelCause関数が呼び出されますが、1.20以前の場合はcontext.WithCancel関数が呼び出されます)

同時実行数を制御する

errgroup.GroupにはGoメソッドで起動するゴルーチンの同時実行数を制御する機能があります。
例えば特定のURLに複数のリクエストを投げる際に、負荷を考慮しながらリクエストを投げたい場合などがあると思います。そういった際にはこの同時実行数制御の仕組みが役に立ちます。
サンプルコードを見てみましょう。

package main

import (
    "context"
    "fmt"
    "net/http"

    "golang.org/x/sync/errgroup"
)

func main() {
    var urls = []string{
        "http://www.golang.org/",
        "http://www.google.com/",
        "http://www.example.com/",
    }
    if err := CallURL(urls); err != nil {
        fmt.Println(err)
    }
}

func CallURL(urls []string) error {
    eg, ctx := errgroup.WithContext(context.Background())
    eg.SetLimit(2)

    for i, url := range urls {
        i, url := i, url
        eg.Go(func() error {
            req, _ := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)

            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return err
            }

            defer resp.Body.Close()
            fmt.Printf("%d番目のリクエスト: %s\n", i+1, resp.Status)
            return nil
        })
    }

    if err := eg.Wait(); err != nil {
        return err
    }
    return nil
}

一つ前の例から変わっている箇所は一箇所です。
今回の例ではerrgroup.GroupのSetLimitメソッドを呼び出しています。引数で同時実行数を渡すと、Goメソッド内で起動されるゴルーチンの数が制限されます。今回のサンプルコードではゴルーチンの同時実行数を2としています。
errgroup.Groupの内部ではバッファ付きチャネルを利用して、ゴルーチンの同時実行数を制御しています。
Goメソッド内部でゴルーチン起動前にバッファ付きチャネルに値を追加し、処理完了後にチャネルから値を取り出しています。バッファがいっぱいになった場合はそのタイミングで処理が停止するようになっています。

このようにSetLimitメソッドを使うだけでゴルーチンの同時実行数を制御できます。

まとめ

今回の記事ではgolang.org/x/sync/errgroupパッケージを利用した並行処理の実装について紹介しました。
errgroupパッケージを利用することで簡単に並行処理を記述できるだけではなくエラーハンドリング、キャンセル処理、ゴルーチンの同時実行数制御など並行処理のパターンを導入できます。

みなさんも並行処理を実装する際にはぜひgolang.org/x/sync/errgroupパッケージの利用を検討してみてください。

私たちは一緒に働いてくれる仲間を募集しています!

募集職種一覧

執筆:@miyahara.hikaru、レビュー:@yamashita.tsuyoshi
Shodoで執筆されました