Y_Yamashitaのブログ

勉強したことのアウトプット・メモが中心。記事の内容は個人の見解であり、所属組織を代表するものではありません。

DynamoDB StreamsのログをCloudWatch Logsに記録する

今回は、DynamoDB StreamsのログをLambdaを使ってCloudWatch Logsに記録してみます。

DynamoDB Streamsとは

DynamoDB テーブル内の項目レベルの変更をキャプチャし、時系列順にストリームデータとして保存してくれる機能です。
ただし、保存期間は24時間となっており、長期保存のためのサービスではありません。長期保存したい場合は、データを別途S3やCloudWatch Logsに保存する必要があります。
詳細は下記公式ドキュメントをご参照ください。

docs.aws.amazon.com

事前準備

DynamoDB、Lambda、CloudWatch Logsの設定を行います。

DynamoDB

DynamoDBでストリームをオンにします。「エクスポートおよびストリーム」で「オンにする」ボタンを押します。

あとは表示タイプを選ぶだけです。今回は「新旧イメージ」を選択します。項目が変更された際に、変更前の値と変更後の値が両方記録されます。

オンになると、ストリーム用のARNが発行されます。後ほどLambdaのIAMロールに権限を追加する際に、このARNを指定します。

CloudWatch Logs

ロググループとログストリームを新規作成します。テスト用なので暗号化の設定などは特に行いません。

Lambda

環境変数

CloudWatchロググループ名とログストリーム名を環境変数として指定します。今回はどちらも同じ名前にしたので、同じ値が指定されています。

アクセス権

LambdaのIAMロールに、以下の権限を追加します。

  1. DynamoDB Streamのデータを取得する権限
  2. 今回作成したCloudWatch Logsにログを書き込む権限

1の権限については、以下のポリシーをインラインポリシーで追加します。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "APIAccessForDynamoDBStreams",
            "Effect": "Allow",
            "Action": [
                "dynamodb:GetRecords",
                "dynamodb:GetShardIterator",
                "dynamodb:DescribeStream",
                "dynamodb:ListStreams"
            ],
            "Resource": "arn:aws:dynamodb:ap-northeast-1:xxxxxxxxxxxx:table/yamashita_test_table/stream/2024-11-09T08:12:19.138"
        }
    ]
}

詳細は以下ドキュメントを参照ください。

docs.aws.amazon.com

2については、Lambda作成時に自動で作られるIAMロールのポリシーにて、Resouceに今回作成したCloudWatchのARNを追加します。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "logs:CreateLogGroup",
            "Resource": "arn:aws:logs:ap-northeast-1:xxxxxxxxxxxx:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:ap-northeast-1:xxxxxxxxxxxx:log-group:/aws/lambda/dynamodb_stream_put_cloudwatch_logs:*",
                "arn:aws:logs:ap-northeast-1:xxxxxxxxxxxx:log-group:yamashita_test_table_stream_logs:*"
            ]
        }
    ]
}

トリガー

今回作成したDynamoDBを指定してトリガーを作成します。テストのため、設定値は全てデフォルトです。
なお、LambdaのIAMロールにDynamoDBへのアクセス権を付与していないとトリガーが作成できないのでご注意ください。

コード

今回のLambdaのコードは以下です。Boto3を使います。

import boto3
import os
import json

def lambda_handler(event, context):
    records = event['Records']
    cloudwatch = boto3.client('logs')

    for record in records:
        timestamp = 1000 * int(record['dynamodb']['ApproximateCreationDateTime'])
        response = cloudwatch.put_log_events(
            logGroupName=os.environ['CLOUDWATCH_LOG_GROUP'],
            logStreamName=os.environ['CLOUDWATCH_LOG_STREAM'],
            logEvents=[
                {
                    'timestamp': timestamp,
                    'message': json.dumps(record)
                },
            ]
        )
    
        print(response)

CloudWatch Logsにログを保存する際にはタイムスタンプを指定します。注意点として、UNIX Time形式で、ミリ秒で指定する必要があります。今回は、DynamoDB Streams内のApproximateCreationDateTimeを利用します。ApproximateCreationDateTimeはUNIX Timeなので、1,000倍してミリ秒に変換しています。
また、バッチ処理を行う場合、一つのレコード内に複数のログが配列として記録されます。そのため、for文を使い繰り返し配列からログを取り出して記録するようにします。
CloudWatchロググループ名とログストリーム名は、先ほど指定した環境変数を使って指定します。

テストする

それではテストしてみます。DynamoDBテーブルデータの書き込み・更新・削除を実施します。

書き込み

まずは書き込みです。方法は何でも良いのですが、今回は書き込み用のLambdaを別で作成して書き込みます。

書き込み用Lambdaのコードは以下です。書き込むデータは何でも良いので、今回はLambdaのコンテキスト情報を書き込みます。
DynamoDBのテーブル名は環境変数で指定しています。

import boto3
import os

def lambda_handler(event, context):
    region = os.environ['AWS_REGION']
    tablename = os.environ['DYNAMODB_TABLE_NAME']
    
    item = {
        "lambda_request_id": {"S": context.aws_request_id},
        "lambda_name": {"S": context.function_name},
        "lambda_version": {"S": context.function_version},
        "lambda_memory_limit": {"N": context.memory_limit_in_mb},
        "lambda_log_group": {"S": context.log_group_name},
        "lambda_log_stream": {"S": context.log_stream_name}
    }
    
    dynamodb = boto3.client('dynamodb', region_name=region)
    dynamodb.put_item(TableName=tablename, Item=item)

書き込み用Lambdaでテストを実行します。

DynamoDBでスキャンを行います。問題なく書き込まれています。

CloudWatch Logsを確認すると、ログが書き込まれています。

書き込みログの中身は以下です。新規書き込みなので、NewImageのみ記録されています。

書き込みログ(クリックまたはタップで詳細表示)

{
    "eventID": "6c5a2c4d4029a60a8e21614b87544a90",
    "eventName": "INSERT",
    "eventVersion": "1.1",
    "eventSource": "aws:dynamodb",
    "awsRegion": "ap-northeast-1",
    "dynamodb": {
        "ApproximateCreationDateTime": 1731150824,
        "Keys": {
            "lambda_request_id": {
                "S": "44b7e3ea-91b5-4149-a932-1e5d930907a7"
            }
        },
        "NewImage": {
            "lambda_memory_limit": {
                "N": "128"
            },
            "lambda_log_stream": {
                "S": "2024/11/09/[$LATEST]8e459ecb2e12456b80841137639eb172"
            },
            "lambda_request_id": {
                "S": "44b7e3ea-91b5-4149-a932-1e5d930907a7"
            },
            "lambda_version": {
                "S": "$LATEST"
            },
            "lambda_name": {
                "S": "dynamodb_put_item"
            },
            "lambda_log_group": {
                "S": "/aws/lambda/dynamodb_put_item"
            }
        },
        "SequenceNumber": "116900000000065479831528",
        "SizeBytes": 291,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
    },
    "eventSourceARN": "arn:aws:dynamodb:ap-northeast-1:xxxxxxxxxxxx:table/yamashita_test_table/stream/2024-11-09T08:12:19.138"
}

更新

続いて更新です。更新もLambdaで行ってみます。更新用のコードは以下です。今回は lambda_memory_limit の値を更新します。

import boto3
import os

def lambda_handler(event, context):
    region = os.environ['AWS_REGION']
    tablename = os.environ['DYNAMODB_TABLE_NAME']
    
    dynamodb = boto3.client('dynamodb', region_name=region)
    response = dynamodb.update_item(
        TableName=tablename,
        Key={"lambda_request_id":  {"S": "44b7e3ea-91b5-4149-a932-1e5d930907a7"}},
        UpdateExpression="SET lambda_memory_limit = :lambda_memory_limit",
        ExpressionAttributeValues={":lambda_memory_limit": {"N": "1024"}}
    )

    print(response)

更新用Lambdaでテストを実施します。

DynamoDBでスキャンを行います。想定通り、lambda_memory_limit の値が更新されています。

CloudWatch Logsを確認すると、ログが書き込まれています。

更新ログの中身は以下です。NewImageとOldImageが記録されています。

更新ログ(クリックまたはタップで詳細表示)

{
    "eventID": "812c7eca2bb587ea0a8c17b3e7921d20",
    "eventName": "MODIFY",
    "eventVersion": "1.1",
    "eventSource": "aws:dynamodb",
    "awsRegion": "ap-northeast-1",
    "dynamodb": {
        "ApproximateCreationDateTime": 1731152303,
        "Keys": {
            "lambda_request_id": {
                "S": "44b7e3ea-91b5-4149-a932-1e5d930907a7"
            }
        },
        "NewImage": {
            "lambda_memory_limit": {
                "N": "1024"
            },
            "lambda_log_stream": {
                "S": "2024/11/09/[$LATEST]8e459ecb2e12456b80841137639eb172"
            },
            "lambda_request_id": {
                "S": "44b7e3ea-91b5-4149-a932-1e5d930907a7"
            },
            "lambda_version": {
                "S": "$LATEST"
            },
            "lambda_name": {
                "S": "dynamodb_put_item"
            },
            "lambda_log_group": {
                "S": "/aws/lambda/dynamodb_put_item"
            }
        },
        "OldImage": {
            "lambda_memory_limit": {
                "N": "128"
            },
            "lambda_log_stream": {
                "S": "2024/11/09/[$LATEST]8e459ecb2e12456b80841137639eb172"
            },
            "lambda_request_id": {
                "S": "44b7e3ea-91b5-4149-a932-1e5d930907a7"
            },
            "lambda_version": {
                "S": "$LATEST"
            },
            "lambda_name": {
                "S": "dynamodb_put_item"
            },
            "lambda_log_group": {
                "S": "/aws/lambda/dynamodb_put_item"
            }
        },
        "SequenceNumber": "117000000000065481125080",
        "SizeBytes": 529,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
    },
    "eventSourceARN": "arn:aws:dynamodb:ap-northeast-1:xxxxxxxxxxxx:table/yamashita_test_table/stream/2024-11-09T08:12:19.138"
}

削除

最後は削除です。これはDynamoDBコンソール上で実施してしまいます。

削除されました。

CloudWatch Logsを確認すると、ログが書き込まれています。

削除ログの中身は以下です。削除の場合、OldImageのみ記録されています。

削除ログ(クリックまたはタップで詳細表示)

{
    "eventID": "03a08f9eaac5e874c10c24f75e35e52a",
    "eventName": "REMOVE",
    "eventVersion": "1.1",
    "eventSource": "aws:dynamodb",
    "awsRegion": "ap-northeast-1",
    "dynamodb": {
        "ApproximateCreationDateTime": 1731152855,
        "Keys": {
            "lambda_request_id": {
                "S": "44b7e3ea-91b5-4149-a932-1e5d930907a7"
            }
        },
        "OldImage": {
            "lambda_memory_limit": {
                "N": "1024"
            },
            "lambda_log_stream": {
                "S": "2024/11/09/[$LATEST]8e459ecb2e12456b80841137639eb172"
            },
            "lambda_request_id": {
                "S": "44b7e3ea-91b5-4149-a932-1e5d930907a7"
            },
            "lambda_version": {
                "S": "$LATEST"
            },
            "lambda_name": {
                "S": "dynamodb_put_item"
            },
            "lambda_log_group": {
                "S": "/aws/lambda/dynamodb_put_item"
            }
        },
        "SequenceNumber": "117100000000065481579503",
        "SizeBytes": 291,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
    },
    "eventSourceARN": "arn:aws:dynamodb:ap-northeast-1:xxxxxxxxxxxx:table/yamashita_test_table/stream/2024-11-09T08:12:19.138"
}



というわけで、無事にCloudWatch Logsに記録することが出来ました。LambdaのトリガーにDynamoDB Streamsを設定できるため、eventデータとしてログを受け取れるので、かなりシンプルなコードで済みました。

今回のブログは以上です。少しでも参考になることがあれば幸いです。