こんにちは。コミュニケーションIT事業部 ITソリューション部の英です。
普段はWebアプリやスマホアプリの案件などを担当しています。あと、趣味でAIを勉強しています。
いつもはAI関連の記事を書いていますが、今回はAWSの認証サービスであるAmazon Cognitoについて検証します。
近々案件で使いそうなので、そのための予習です。
Amazon CognitoにはユーザープールとIDプールの2つのサービスがありますが、今回はユーザープールにフォーカスして記事を書きます。
詳細は公式リファレンスを参照してください。
→ ユーザープールとIDプールの違いについて
また、各案件でCognitoを採用するべきか否かを判断するために、事前に割り当て(クォータ)を確認しておきましょう。
引き上げが可能なパラメータもあれば、事前相談が必要なものもあります。例えばユーザープール1つにつき、最大ユーザー数は4000万人となっていますが、これ以上使用する場合にはAWSアカウントチームと相談する必要があります。
BtoCの大きめの案件であれば、事前にこのあたりの調査や調整をしてケアしておきましょう。
→ クォータについて
さて、今回はLambdaを使って認証フローを実際に動かしてみましょう!
次回はそのフローで得られたトークンを用いて、アクセス制御を実装していきます。
Amazon Cognitoとは
AWSが提供するユーザー認証とアクセス管理サービスです。ユーザープールとIDプールを使用して、ユーザー認証を行い、アプリケーションへの安全なアクセスを実現します。
ユーザープールとは
アプリケーションユーザーのサインアップ、サインイン、多要素認証(MFA)を管理する機能です。これにより、ユーザー情報を安全に保存し、認証プロセスをシンプルにします。
外部IDプロバイダーの統合
ユーザープールに外部IDプロバイダー(Google、Facebook、SAMLなど)を追加することで、ユーザーはこれらのプロバイダーを使用してサインインできます。これにより、簡単に外部IDプロバイダーと統合でき、ユーザー管理が一元化されます。
各種トークンについて
トークン名 | 用途 |
---|---|
IDトークン | 認証後にユーザー情報を含むトークン。ユーザーの属性情報の取得や、各種AWSリソースへのアクセス制御に使用する |
アクセストークン | ユーザープール内の情報の参照および更新に使用する |
リフレッシュトークン | 新しいトークンを発行するためのトークン。IDトークンやアクセストークンの有効期限が切れた際に再発行を行う。 |
各トークンの具体的な用途については、公式リファレンスを参照してください。
→ IDトークンの使用
→ アクセストークンの使用
また、各トークンの有効期限はアプリケーションクライアントの設定から変更できます。
詳細は公式リファレンスを参照してください。
→ アプリケーションクライアントの設定
他サービスとの統合
API Gateway
Cognitoオーソライザーを使用して、認証されたユーザーのみがAPIにアクセスできるようにします。
これにより、APIアクセスのセキュリティが強化されます。(本記事の後編ではこちらの検証を行います)
CloudFront
署名付きURLやCookie、Lambda@Edgeと組み合わせて、認証されたユーザーのみがコンテンツにアクセスできるように設定できます。これにより、コンテンツ配信のセキュリティが向上します。
→Authorization@Edge using cookies: Protect your Amazon CloudFront content from being downloaded by unauthenticated users
Application Load Balancer
ALBはCognitoと統合できます。これにより、Cognitoで認証されたユーザーのみがアプリケーションにアクセスできます。
→Application Load Balancer を使用してユーザーを認証する
#ここから本題
今回の認証フローは以下のとおり。
- サインアップ:
- ユーザーがサインアップに必要な情報を入力する (メールアドレス、パスワード、ニックネーム)
- サインアップをすると、メールで認証コードが送信される。
- メールアドレスの確認:
- ユーザーがメールで受け取った認証コードを入力し、メールアドレスの認証を行う。
- サインイン:
- ユーザーがサインインに必要な情報を入力する (メールアドレス、パスワード)
- サインインをすると、SMSで認証コードが送信される。
- 二要素認証(SMSコードの確認):
- ユーザーがSMSで受け取った認証コードを入力し、電話番号の認証を行う。
- アクセストークンの取得:
これらのステップをLambda関数で実装し、実際に動かしてみましょう。
STEP1:ユーザープールの作成
サインインエクスペリエンスを設定
以下のように設定し、ユーザープールを新規作成します。
「フェデレーテッドアイデンティティプロバイダー」は後からでも設定が可能なので、いったんスキップします。
サインインオプションは後から変更できないので注意しましょう。今回はメールと電話番号を使用します。
セキュリティ要件を設定
ここではパスワードポリシーや、アカウントの復旧(パスワードを忘れた場合など)について設定できます。
多要素認証のMFAの方法については、「SMSメッセージ」を選択しておきます。
サインアップエクスペリエンスを設定
今回は「自己登録を有効化」しています。
開発中でまだオープンでないアプリで使用する場合は、無効化しておくことが推奨されています。
この場合、AWSコンソールから直接ユーザープールにユーザーを追加することになります。
後ほどnicknameをメール本文に埋め込みたいため、必須属性として選択しておきましょう。
メッセージ配信を設定
メッセージ配信のプロバイダーとしてSESかCognitoかを選べます。今回はCognitoを選択します。
IAMロールについては既存のポリシーを選択していますが、内容は↓に転記しておきます。
Amazon SNSでSMSメッセージを配信するために必要な権限となります。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "sns:publish" ], "Resource": [ "*" ] } ] }
アプリケーションを統合
ここで生成するクライアントシークレットは後ほどLambdaからCognitoを呼びだすときに使用します。
確認および作成
設定を確認して保存してください。
「email_varified」と「phone_number_varified」はCognitoの認証フローでのみ書き込みが可能となっています。
STEP2:カスタムメッセージの作成
Lambda関数でカスタムメッセージを作成できます。
以下のようにメールオブジェクトを返却するだけのシンプルな関数を作成しておきます。
このためだけに関数を作成するのはちょっと無駄な感じもしますが、デフォルトのメッセージテンプレートは自由度が低いためこの方法を採用しています。メール本文にCognitoの{nickname}を埋め込みます。
その他のtrigger sourcesについては公式リファレンスを参照してください。
→Custom message Lambda trigger sources
Cognitoのユーザープールのプロパティ画面で、Lambdaトリガーを作成して先ほどの関数を紐づけます。
STEP3:signup関数を定義する
今回は簡易化のためにクライアントID、クライアントシークレットを環境変数に設定していますが、実際の案件ではAmazon KMSなどを使用して暗号化してください。(※後述)
メールアドレス、電話番号、ニックネームを渡すことでcognitoクライアントのsign_up処理を呼び出しています。
SignUpのソースコード
import boto3 import json import os import hmac import hashlib import base64 cognito_client = boto3.client('cognito-idp') CLIENT_ID = os.environ['CLIENT_ID'] CLIENT_SECRET = os.environ['CLIENT_SECRET'] def get_secret_hash(username): msg = username + CLIENT_ID dig = hmac.new(str(CLIENT_SECRET).encode('utf-8'), msg=str(msg).encode('utf-8'), digestmod=hashlib.sha256).digest() return base64.b64encode(dig).decode() def lambda_handler(event, context): email = event['email'] password = event['password'] phone_number = event['phone_number'] nickname = event['nickname'] secret_hash = get_secret_hash(email) try: response = cognito_client.sign_up( ClientId=CLIENT_ID, SecretHash=secret_hash, Username=email, Password=password, UserAttributes=[ {'Name': 'email', 'Value': email}, {'Name': 'phone_number', 'Value': phone_number}, {'Name': 'nickname', 'Value': nickname} ] ) return { 'statusCode': 200, 'body': json.dumps({'message': 'ユーザー登録が成功しました'}, ensure_ascii=False) } except cognito_client.exceptions.UsernameExistsException: return { 'statusCode': 400, 'body': json.dumps({'error': 'ユーザー名は既に存在します'}, ensure_ascii=False) } except Exception as e: return { 'statusCode': 500, 'body': json.dumps({'error': str(e)}, ensure_ascii=False) }
※入力値検証については各々で実装してください。本題と逸れるため本記事では触れません。
※パラメータは以下の公式リファレンスを参照してください
→sign_up
SignUpのテストイベント (メールを受信できるアドレスを設定しましょう)
{ "email": "(任意)@example.com", "password": "(半角英数記号を使ったパスワード)", "phone_number": "+81(電話番号)", "nickname": "でんつーそーけん" }
STEP4:ConfirmSignUp関数を定義する
confirmation_codeを受け取って、cognitoクライアントのconfirm_sign_upを呼び出しています。
confirmation_codeはCognitoから配信されたメール本文に記載されます。(この記事の後半でテストします)
ConfirmSignUpのソースコード
import boto3 import json import os import hmac import hashlib import base64 cognito_client = boto3.client('cognito-idp') CLIENT_ID = os.environ['CLIENT_ID'] CLIENT_SECRET = os.environ['CLIENT_SECRET'] def get_secret_hash(username): msg = username + CLIENT_ID dig = hmac.new(str(CLIENT_SECRET).encode('utf-8'), msg=str(msg).encode('utf-8'), digestmod=hashlib.sha256).digest() return base64.b64encode(dig).decode() def lambda_handler(event, context): email = event['email'] confirmation_code = event['confirmation_code'] try: response = cognito_client.confirm_sign_up( ClientId=CLIENT_ID, Username=email, ConfirmationCode=confirmation_code, SecretHash=get_secret_hash(email) ) return { 'statusCode': 200, 'body': json.dumps({'message': 'ユーザーが正常に確認されました'}, ensure_ascii=False) } except cognito_client.exceptions.UserNotFoundException: return { 'statusCode': 400, 'body': json.dumps({'error': 'ユーザーが存在しません'}, ensure_ascii=False) } except cognito_client.exceptions.CodeMismatchException: return { 'statusCode': 400, 'body': json.dumps({'error': '無効な確認コードです'}, ensure_ascii=False) } except Exception as e: return { 'statusCode': 500, 'body': json.dumps({'error': str(e)}, ensure_ascii=False) }
※入力値検証については各々で実装してください。本題と逸れるため本記事では触れません。
※パラメータは以下の公式リファレンスを参照してください
→confirm_sign_up
ConfirmSignUpのテストイベント
{ "email": "(任意)@example.com", "confirmation_code": "(メールで受け取った認証コード)" }
STEP5:InitiateAuth関数を定義する
emailとpasswordを受け取って、cognitoクライアントのinitiate_authを呼び出しています。
ここで多要素認証のSMSが配信されます。(この記事の後半でテストします)
また、ここで生成されたSessionは次のステップ(Challenge)で使用するので控えておきましょう。
USER_ID_FOR_SRPはユーザープール内での一意なIDになります。
InitiateAuthのソースコード
import boto3 import json import os import hmac import hashlib import base64 cognito_client = boto3.client('cognito-idp') CLIENT_ID = os.environ['CLIENT_ID'] CLIENT_SECRET = os.environ['CLIENT_SECRET'] def get_secret_hash(username): msg = username + CLIENT_ID dig = hmac.new(str(CLIENT_SECRET).encode('utf-8'), msg=str(msg).encode('utf-8'), digestmod=hashlib.sha256).digest() return base64.b64encode(dig).decode() def lambda_handler(event, context): email = event['email'] password = event['password'] secret_hash = get_secret_hash(email) try: response = cognito_client.initiate_auth( AuthFlow='USER_PASSWORD_AUTH', AuthParameters={ 'USERNAME': email, 'PASSWORD': password, 'SECRET_HASH': secret_hash }, ClientId=CLIENT_ID ) if 'ChallengeName' in response: # 次の認証ステップが必要な場合 return { 'statusCode': 200, 'body': json.dumps({ 'challenge_name': response['ChallengeName'], 'session': response['Session'], 'challenge_parameters': response['ChallengeParameters'] }, ensure_ascii=False) } return { 'statusCode': 200, 'body': json.dumps({ 'access_token': response['AuthenticationResult']['AccessToken'], 'id_token': response['AuthenticationResult']['IdToken'], 'refresh_token': response['AuthenticationResult']['RefreshToken'] }, ensure_ascii=False) } except cognito_client.exceptions.NotAuthorizedException: return { 'statusCode': 401, 'body': json.dumps({'error': 'ユーザー名またはパスワードが正しくありません'}, ensure_ascii=False) } except Exception as e: return { 'statusCode': 500, 'body': json.dumps({'error': str(e)}, ensure_ascii=False) }
※入力値検証については各々で実装してください。本題と逸れるため本記事では触れません。
※パラメータは以下の公式リファレンスを参照してください
→initiate_auth
InitiateAuthのテストイベント
{ "email": "(任意)@example.com", "password": "(半角英数記号を使ったパスワード)" }
STEP6:ConfirmSMS関数を定義する
ここが少し複雑なのですが、respond_to_auth_challenge(SMS_MFA)で使用するsessionやuser_id_for_srpはSTEP5のresponseのchallenge_parametersに記載されています。JSONを解析して値を取り出し、同一セッション内で検証ステップを進めるようにしてください。
ConfirmSMSのソースコード
import boto3 import json import os import hmac import hashlib import base64 import logging cognito_client = boto3.client('cognito-idp') CLIENT_ID = os.environ['CLIENT_ID'] CLIENT_SECRET = os.environ['CLIENT_SECRET'] def get_secret_hash(username): msg = username + CLIENT_ID dig = hmac.new(str(CLIENT_SECRET).encode('utf-8'), msg=str(msg).encode('utf-8'), digestmod=hashlib.sha256).digest() return base64.b64encode(dig).decode() def lambda_handler(event, context): email = event['email'] mfa_code = event['mfa_code'] session = event['session'] user_id_for_srp = event['user_id_for_srp'] secret_hash = get_secret_hash(user_id_for_srp) try: response = cognito_client.respond_to_auth_challenge( ClientId=CLIENT_ID, ChallengeName='SMS_MFA', Session=session, ChallengeResponses={ 'USERNAME': user_id_for_srp, 'SMS_MFA_CODE': mfa_code, 'SECRET_HASH': secret_hash } ) logger.info(f"Response: {response}") return { 'statusCode': 200, 'body': json.dumps({ 'access_token': response['AuthenticationResult']['AccessToken'], 'id_token': response['AuthenticationResult']['IdToken'], 'refresh_token': response['AuthenticationResult']['RefreshToken'] }, ensure_ascii=False) } except cognito_client.exceptions.CodeMismatchException as e: logger.error(f"CodeMismatchException: {e}") return { 'statusCode': 400, 'body': json.dumps({'error': '無効なMFAコードです'}, ensure_ascii=False) } except cognito_client.exceptions.NotAuthorizedException as e: logger.error(f"NotAuthorizedException: {e}") return { 'statusCode': 401, 'body': json.dumps({'error': '認証が失敗しました'}, ensure_ascii=False) } except Exception as e: logger.error(f"Exception: {e}") return { 'statusCode': 500, 'body': json.dumps({'error': str(e)}, ensure_ascii=False) }
※入力値検証については各々で実装してください。本題と逸れるため本記事では触れません。
※パラメータは以下の公式リファレンスを参照してください
→respond_to_auth_challenge
ConfirmSMSのテストイベント
{ "email": "(任意)@example.com", "mfa_code": "(SMSで受け取った認証コード)", "session": "(STEP5で受け取ったセッションID)", "user_id_for_srp": "(STEP5で受け取ったuser_id_for_srp)" }
STEP7:検証
これで会員登録から初回ログインまでの関数を書き終わったので、さっそくテストしてみましょう。
まず、SignUpを呼び出します。
メールが飛んできました。
先ほどLambdaで作成したメールテンプレートが反映されています。
ニックネームもしっかり埋め込まれています。
この状態でユーザープールを確認すると、メールアドレスも電話番号も「未検証」のステータスで登録されています。
次にConfirmSignUpを呼び出します。
メールアドレスが「検証済み」のステータスになりました。
次にInitiateAuthを呼び出します。
SMS_MFAというチャレンジが発生し、セッションIDが返ってきています。
手元のスマートフォンにはSMSで認証コードが飛んできます。
飛んでこない場合はLambdaに紐づいているIAMロールを確認してください。(前述の通り)
次にConfirmSMSを呼び出します。
無事にaccess_token、id_token、refresh_tokenを取得することができました。
電話番号が「検証済み」のステータスになりました。
補足
先ほどスキップしましたが、Lambdaの環境変数を暗号化&復号化するには以下のようにしてください。
LambdaはKMSと統合されているので、暗号化はAWSコンソール上から行うことができます。
KMSを触るための権限をLambdaに付与する必要がありますので、各自で設定してください。以下は一例です。
他にも、AWS Secrets Managerに入れて保管するような方法もあります。
# KMSクライアントの作成 kms_client = boto3.client('kms') class KMSDecryptor: @staticmethod def decrypt_kms_key(ciphertext_blob: str) -> str: try: res = kms_client.decrypt( CiphertextBlob=bytes(base64.b64decode(ciphertext_blob)), EncryptionContext={'LambdaFunctionName': os.environ['AWS_LAMBDA_FUNCTION_NAME']} )['Plaintext'].decode('utf-8') return res except Exception as e: logger.error(f"Decryption error: {e}") raise e # 環境変数を確認し、復号化 try: encrypted_user_pool_id = os.environ['ENCRYPTED_USER_POOL_ID'] encrypted_client_id = os.environ['ENCRYPTED_CLIENT_ID'] encrypted_client_secret = os.environ['ENCRYPTED_CLIENT_SECRET'] USER_POOL_ID = KMSDecryptor.decrypt_kms_key(encrypted_user_pool_id) CLIENT_ID = KMSDecryptor.decrypt_kms_key(encrypted_client_id) CLIENT_SECRET = KMSDecryptor.decrypt_kms_key(encrypted_client_secret)
さいごに
今回はとても長い記事になってしまいました。
次回はこのトークンを使用してAWSリソースに対するアクセス制御を実装していきます。
これからもAWS×AIの検証記事をたくさん書いていきます。
↓ のスターを押していただけると嬉しいです。励みになります。
最後まで読んでいただき、ありがとうございました。
私たちは一緒に働いてくれる仲間を募集しています!
執筆:英 良治 (@hanabusa.ryoji)、レビュー:@akutsu.masahiro
(Shodoで執筆されました)