たそらぼ

日頃思ったこととかメモとか。

S3イベント→SNS→SQSの構成を試した。

S3イベント→SNS→SQSの構成を試した。
SQSの先にはLambdaが繋がっているようなイメージ。
意外と設定をズバリ書いてくれいてる文献がなかったのでまとめる。

設定手順

S3バケットの作成

イベントを起こすS3バケットがなければ、バケットを作成する。

SNSの設定

SNSでトピックを作成する。
このとき、アクセスポリシーにS3のイベントを検知するための権限設定をすることを忘れない。
アクセスポリシーは以下の公式ドキュメントを参考にするとよい。
チュートリアル: 通知 (SNS トピックまたは SQS キュー) のバケットを設定する - Amazon Simple Storage Service
以下のjsonは上記ドキュメントの「ステップ 2: Amazon SNS トピックを作成する」の3の引用。
"SNS-topic-ARN"、"bucket-name"、"bucket-owner-account-id"は変更する。

{
 "Version": "2012-10-17",
 "Id": "example-ID",
 "Statement": [
  {
   "Sid": "example-statement-ID",
   "Effect": "Allow",
   "Principal": {
    "AWS":"*"  
   },
   "Action": [
    "SNS:Publish"
   ],
   "Resource": "SNS-topic-ARN",
   "Condition": {
      "ArnLike": { "aws:SourceArn": "arn:aws:s3:*:*:bucket-name" },
      "StringEquals": { "aws:SourceAccount": "bucket-owner-account-id" }
   }
  }
 ]
}

これはIAMのポリシー作成画面から作成するのではなく、SNSの画面から直接設定する。Cloudformationテンプレートだとどうなるんだろう?

S3バケットのイベント設定

再度S3バケットに戻る。「プロパティ」>「イベント通知」で発火させたいイベントを設定する。今回はとりあえずPUTをイベントタイプにした。SNSトピックは先に作成したものを選ぶ。トピックにポリシーを付け損ねていた場合は、ここで権限エラーになる。

SQSキュー作成、SNSサブスクリプション設定

SQSキューを作成し、SNSサブスクリプションを設定する。
SQSの画面から適当にキューを作る。作成後、キューの画面から「SNS サブスクリプション」で先ほど作成したSNSトピックのARNを設定する。

  • デッドレターキューは設定しておく。

なお、少し脱線するが、Lambdaのトリガーにするようなキューを作る場合は、忘れずにデッドレターキューを設定する。
Amazon SQS デッドレターキュー - Amazon Simple Queue Service

デッドレターキューを設定しておくと、検証中にSQSからデータを取ったLambdaでなにかエラーが吐いた際に、可視性リトライで知らないうちにリトライされて続けることを防ぐことができる。

  • サブスクライブ設定の動作確認

SNSからのサブスクライブがちゃんとできているかは、以下の方法で分かる。

  1. SNSトピックの画面から「メッセージの発行」でテストメッセージを送る。
  2. SQSの画面で「メッセージを受信」>「メッセージをポーリング」でメッセージを受け付け、中身を見て送ったメッセージであることを確認する。

動作確認

とりあえずバケットとキーを取り出す。

Lambdaを作成し、動作確認を行う。
だいぶ泥臭いが、以下のような感じのコードでPUTされたS3バケットとオブジェクトを取ることができた。

import json

def lambda_handler(event, context):
    
    for msg in event["Records"]:
        notification = json.loads(msg["body"])
        records = json.loads(notification["Message"])
        for record in records["Records"]:
            print("Backet is: ", record["s3"]["bucket"]["name"])
            print("Object is: ", record["s3"]["object"]["key"])
            
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }

メッセージの構造

以下の3つのメッセージ構造が組み合わさる。

S3のイベントメッセージがSNSのHTTP/HTTPS 通知の JSONの"Message"キーのバリューに入り、さらにこの全体がlambdaでSQSからメッセージを取り出した時の形式の"body"キーのバリューに入るような感じになる。むずい。