電通総研 テックブログ

電通総研が運営する技術ブログ

Amazon Kinesis Data StreamsでIoTデバイスのリアルタイム異常検知

こんにちは。コミュニケーションIT事業部 ITソリューション部の英です。

普段はWebアプリやスマホアプリの案件などを担当しています。あと、趣味で AI を勉強しています。

今回は IoTデバイス(温度センサー)の異常検知 をテーマに記事を書いてみます。

温度センサーからAmazon Kinesis Data Streamsにデータが送信され、送信されたデータに対してAmazon Managed Service for Apache Flinkでリアルタイムにデータを処理します。
今回は閾値で異常を検知し、SNSに通知するシンプルな仕組みにしています。

Amazon Kinesis Data Streams(KDS)とは

Amazon Kinesis Data Streams(KDS)は、リアルタイムで大量のデータを収集、処理、および分析するためのサービスです。データのストリーミング処理を簡単に行うことができます。

  • 高いスケーラビリティ:必要に応じてスループットを調整可能で、大規模なデータ処理にも対応します。
  • データの分割:データを複数のシャードに分けることで、並列処理が可能です。
  • データ保持:デフォルトで24時間、最大8760 時間(365日)までデータを保持できます。

Amazon Managed Service for Apache Flinkとは?

Apache Flink は、リアルタイムでデータを処理・分析するためのソリューションです。
マネージドサービスなのでインフラの管理が不要で、ユーザーはコードの記述に集中できます。
Kinesis Data Streams、Kinesis Data Firehose、Amazon S3などのAWSサービスと簡単に統合できます。


ここから本題

STEP1:データストリームの作成

Kinesis Data Streamsを選択した状態で「データストリームの作成」を押下します。
以下のように設定します。今回はシャード分割なしとします。

STEP2:通知用のSNSを作成する

Amazon SNSのトピックを作成し、Eメールでサブスクライブしておきます。
とくに難しい手順はないのでこの記事では解説しません。

STEP3:Cloud9にKinesisへの権限を与える

Cloud9から先ほどのデータストリームに送信できるように権限を設定します。
新規のIAMポリシーを作成して、新規ロールにアタッチしましょう。
Cloud9用のデフォルトロールにポリシーをアタッチすることはおすすめしません。
特定のデータストリームにだけ権限を与えればよいのでARNを指定しましょう。
その後、Cloud9が稼働しているEC2のIAMロールを切り替えます。

STEP4:テストデータの送信

Cloud9を立ち上げ、以下のスクリプトを実行します。
1秒ごとに温度データが正弦波で連携され、0.5%の確率で異常データが生成されます。
以下のようにデータが流れ始めます。

  • 正常なデータ:20~30℃の範囲内
  • 異常なデータ:35~50℃の範囲内

送信されたデータはKinesisのデータビューワーでも確認できます。

STEP5:Apache Flinkのノートブックを作成

データ分析タブからノートブックを新規作成できます。
ボタン一つでGlueのDBも自動生成されます。

作成が完了すると「Open in Apache Zeppelin」からノートブックを開くことができます。

STEP6:サンプルSQLの確認

初期状態から全文SELECTのSQLが作成されています。
connectorにkinesisを指定し、rawデータを取得しています。

STEP7:新規ノートブックを作成する(カスタマイズ用)

Managed Apache Flinkの画面で「Studio ノートブックを作成」を押下します。
先ほど自動生成されたDBを流用します。


ノートブックに紐づいているIAMロールに以下の権限を追加します。

  • Kinesis Data Streamsの読み書き
  • Glueの読み書き ※すでにGlueを利用している場合はリソースを指定してください
  • SNSの発行
     {
           "Effect": "Allow",
            "Action": [
                "sns:Publish"
            ],
            "Resource": "arn:aws:sns:ap-northeast-1:(アカウントID):(トピック名)"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:ListShards",
                "kinesis:DescribeStream",
                "kinesis:GetRecords",
                "kinesis:GetShardIterator",
                "kinesis:PutRecord",
                "kinesis:PutRecords"
            ],
            "Resource": "arn:aws:kinesis:ap-northeast-1:(アカウントID):stream/TemperatureStream"
        },
       {
            "Effect": "Allow",
            "Action": [
                "glue:GetPartitions",
                "glue:GetTable",
                "glue:GetTables",
                "glue:GetDatabase",
                "glue:GetDatabases",
                "glue:DeleteTable",
                "glue:CreateTable"
            ],
            "Resource": [
                "arn:aws:glue:ap-northeast-1:(アカウントID):catalog",
                "arn:aws:glue:ap-northeast-1:(アカウントID):database/*",
                "arn:aws:glue:ap-northeast-1:(アカウントID):table/*"
            ]
        }

STEP8:SQLを定義していく

まずは、ストリームデータが取得できるか、デフォルトのSQLを投げてみます。
Cloud9からダミーデータを流してみると、JSON形式でrawデータが返ってきました。

次にJSON内の要素を分割してみます。
temperatureとtimestampの2つに分割することができました。

STEP10:異常検知のジョブを定義する

以下をノートブックに貼り付けて実行する。
<>内は環境に合わせて書き換えてください。

%flink.pyflink
import boto3
from pyflink.table import TableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.udf import udf
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

# ストリーミング実行環境とテーブル環境の設定
exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_env = StreamTableEnvironment.create(exec_env, environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().build())

# データソースの作成
t_env.execute_sql("""
CREATE TABLE raw_single_column_table (
  single_column_output STRING
) WITH (
  'connector' = 'kinesis',
  'stream' = '<ストリーム名>',
  'aws.region' = '<リージョン名>',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'raw'
)
""")

# データ解析ビューの作成
t_env.execute_sql("""
CREATE VIEW parsed_data AS
SELECT 
  CAST(JSON_VALUE(single_column_output, '$.temperature') AS DOUBLE) AS temperature,
  TO_TIMESTAMP(CAST(JSON_VALUE(single_column_output, '$.timestamp') AS STRING), 'yyyy-MM-dd''T''HH:mm:ss.SSSSSS''Z''') AS `timestamp`
FROM 
  raw_single_column_table
""")

# UDF(ユーザー定義関数)を作成して、SNS通知を送信
@udf(result_type=DataTypes.BOOLEAN())
def notify_sns(temperature, timestamp):
    if temperature >= 30:
        sns_client = boto3.client('sns', region_name='ap-northeast-1')
        message = f"Temperature alert: {temperature}℃ at {timestamp}"
        sns_client.publish(
            TopicArn='arn:aws:sns:ap-northeast-1:<アカウントID>:<トピック名>',
            Message=message
        )
        print(message)
        return True
    return False

# UDFをテーブル環境に登録
t_env.create_temporary_function("notify_sns", notify_sns)

# 結果を出力するテーブルを作成
t_env.execute_sql("""
CREATE TABLE alert_table (
  temperature DOUBLE,
  `timestamp` TIMESTAMP(3),
  notified BOOLEAN
) WITH (
  'connector' = 'print'
)
""")

# 温度が30℃以上のデータをフィルタリングし、通知を送信するSQLクエリを実行
result_table = t_env.sql_query("""
SELECT 
  temperature, 
  `timestamp`,
  notify_sns(temperature, `timestamp`) AS notified
FROM parsed_data
WHERE temperature >= 30
""")

# 結果をalert_tableに保存
result_table.execute_insert("alert_table")

# ストリーミングジョブの実行
exec_env.execute("Temperature Alert Job")

ジョブの実行状況はノートブックの右上の「FLINK JOB」を押下することで確認できます。
ログやチェックポイントなど詳細なジョブの実行状況を監視できます。

Cloud9からダミーデータを流すと、Eメールで通知されることが確認できました。
温度センサーのデータが30℃以上ならばメールで通知される仕組みです。↓は45.61℃。

まとめ

IoTデバイスの温度センサーから送信されるデータをリアルタイムで処理し、異常を検知するシステムの構築方法を紹介しました。
今回はメールで通知するだけでしたが、AWSの様々なサービスと統合できるので、業務シナリオに合わせて柔軟に組み替えてリアルタイム監視システムを構築していきましょう。

また今回は閾値で異常を検知しましたが、時系列データの異常検知にはRCF(ランダムカットフォレスト)を用いることが一般的になっています。
これはまた別の記事で紹介しようと思います。

↓ のスターを押していただけると嬉しいです。励みになります。

最後まで読んでいただき、ありがとうございました。

執筆:英 良治 (@hanabusa.ryoji)、レビュー:Ishizawa Kento (@kent)
Shodoで執筆されました