こんにちは。株式会社JMDC データウェアハウス開発部の甲(きのえ)です。
今年、JMDCではアドベントカレンダーに参加しています。
qiita.com
本記事は、JMDC Advent Calendar 2025 6日目の記事です。
目次
はじめに
AWSを運用しているとCloudWatch LogsにVPCフローログ、各種アプリケーションのログがたまっていきます。
CloudWatch Logsはロググループごとに保存期間を設定できますが、長期保存したり分析したりするためにS3に保存したいニーズがあるかと思います。
CloudWatch LogsからS3にログを転送する方法はいくつかありますが、今回はAmazon Data Firehoseを使ってすべてのCloudWatch LogsをS3に転送する方法を紹介します。
注意事項
- この記事では、これから発生するすべてのCloudWatch LogsをS3に転送する方法を紹介します。すでに存在するログをS3に転送する方法ではありません
- この記事に載せるソースコードはすべてサンプルコードです
対象読者
- CloudWatch Logsに保存されているログをS3に転送したい方
- CloudWatch Logsのログを分析したい方
構成イメージ
実際にCloudWatch LogsからS3にログを転送する構成イメージは以下のとおりです。
- AWSサービスからCloudWatch Logsにログを送信する
- CloudWatch LogsのサブスクリプションフィルターでData Firehoseにログを転送する
- Data Firehoseがパーティショニング用のLambda関数を呼び出し、ログをS3に保存する

手順
データウェアハウス開発部ではクラウドのインフラストラクチャー構築においてIaC(Infrastructure as Code)を徹底しているため、AWSサービスの構築にAWSマネジメントコンソールはほとんど使用しません。
今回は一部AWSマネジメントコンソールの使用も交えつつ、CloudFormationで構築する例を紹介したいと思います。
※データウェアハウス開発部ではシステムによってTerraformやCloudFormationを使用しています。
CloudFormationの要所要所でリソースの説明をしますが、最終的なCloudFormationテンプレート全文はこの記事の最後に掲載します。
CloudFormationテンプレートの作成
① CloudWatch Logs保存用S3バケットの作成
まず保存先のS3を作成します。

削除ポリシーやバケットポリシーは要件にしたがって書き換えてください。
DestinationBucket: Type: AWS::S3::Bucket DeletionPolicy: Retain UpdateReplacePolicy: Retain Properties: BucketName: !Sub ${ProjectName}-${EnvType}-cwlogs-destination-bucket PublicAccessBlockConfiguration: BlockPublicAcls: true BlockPublicPolicy: true IgnorePublicAcls: true RestrictPublicBuckets: true OwnershipControls: Rules: - ObjectOwnership: BucketOwnerEnforced VersioningConfiguration: Status: Enabled
② CloudWatch LogsをS3に保存するためのパーティショニング用Lambda関数を作成
CloudWatch LogsをパーティショニングしながらS3に保存するためのLambda関数を作成します。
ただ転送するだけであればLambda関数を作る必要はないのですが、今回は将来的にAthenaなどで分析することを想定してパーティショニングします。

Lambda関数にはgzip圧縮されたデータが渡されてくるので、パーティションキーを設定してから返してあげます。
CloudWatchLogProcessorLambda: Type: AWS::Lambda::Function Properties: FunctionName: !Sub ${ProjectName}-${EnvType}-cwlogs-processor Handler: index.lambda_handler Role: !GetAtt CloudWatchLogProcessorLambdaRole.Arn Runtime: python3.12 Timeout: 120 MemorySize: 256 Code: ZipFile: | # flake8: noqa: E501 import json import base64 import gzip from io import StringIO import datetime from zoneinfo import ZoneInfo JST_TZ = ZoneInfo("Asia/Tokyo") def parse_payload(data: str): raw = base64.b64decode(data) # CloudWatch Logs サブスクリプション経由は gzip 圧縮されているため解凍する try: raw = gzip.decompress(raw) except OSError: # 非圧縮の場合はそのまま pass return json.loads(raw.decode('utf-8')) def generate_partition_keys(log_group: str, timestamp: int): dt_jst = datetime.datetime.fromtimestamp( timestamp / 1000, tz=datetime.timezone.utc).astimezone(JST_TZ) return { 'log_group': log_group, 'year': f"{dt_jst.year:04d}", 'month': f"{dt_jst.month:02d}", 'day': f"{dt_jst.day:02d}", } def process_record(record: dict): record_id = record.get('recordId') data = record.get('data', '') try: payload = parse_payload(data) log_group = payload.get('logGroup', '') log_events = payload.get('logEvents', []) # CONTROL_MESSAGEの場合は保存しないためDroppedを設定 # https://docs.aws.amazon.com/ja_jp/AmazonCloudWatch/latest/logs/SubscriptionFilters.html # https://docs.aws.amazon.com/ja_jp/firehose/latest/dev/data-transformation-status-model.html if payload.get('messageType') == 'CONTROL_MESSAGE': return {'recordId': record_id, 'result': 'Dropped', 'data': data} if not log_group or not log_events: return {'recordId': record_id, 'result': 'Dropped', 'data': data} # ロググループ、タイムスタンプからパーティションキーを作成する # https://docs.aws.amazon.com/ja_jp/firehose/latest/dev/dynamic-partitioning-partitioning-keys.html # ロググループにスラッシュがある場合でもAthenaでクエリすることが可能なので置換は不要 # (S3の階層は分かれるが、Athenaでのクエリは可能) partition_keys = generate_partition_keys( log_group, log_events[0].get('timestamp', 0) ) buf = StringIO() for ev in log_events: ts = ev.get('timestamp') dt_jst = datetime.datetime.fromtimestamp( ts / 1000, tz=datetime.timezone.utc).astimezone(JST_TZ).isoformat() ev_obj = { 'id': ev.get('id'), 'timestamp': dt_jst, 'message': ev.get('message'), 'log_group': log_group, 'log_stream': payload.get('logStream') } buf.write(json.dumps(ev_obj, ensure_ascii=False) + '\n') encoded = base64.b64encode( buf.getvalue().encode('utf-8')).decode('utf-8') return { 'recordId': record_id, 'result': 'Ok', 'data': encoded, 'metadata': {'partitionKeys': partition_keys} } except json.JSONDecodeError as e: # エラーフォルダーに送られる print(f"JSON decode error recordId={record_id} msg={e}") return {'recordId': record_id, 'result': 'ProcessingFailed', 'data': data} except Exception as e: # エラーフォルダーに送られる print(f"Unexpected error recordId={record_id} type={type(e).__name__} msg={e}") return {'recordId': record_id, 'result': 'ProcessingFailed', 'data': data} def lambda_handler(event, context): out = [] for r in event.get('records', []): out.append(process_record(r)) return {'records': out}
③ CloudWatch LogsをS3に配置するためのFirehose Delivery Streamを作成します
②で作成したLambda関数でパーティションキーを設定しているのでそれを利用してS3に配置するようにFirehoseのパラメーターを調節します。

今回はHive形式で
s3//{S3バケット名}/log_group={ロググループ名}/year={年}/month={月}/day={日}/ログファイル名
がS3保存先のPathとなるように設定します。
この時のポイントはパーティショニングキーとしてpartitionKeyFromLambdaを使用していることです。
これにより、Lambda関数の戻り値に含まれるパーティショニングキーをS3の保存先として使用できます。
CloudWatchLogsDeliveryStream: Type: AWS::KinesisFirehose::DeliveryStream Properties: DeliveryStreamName: !Sub ${ProjectName}-${EnvType}-cwlogs-delivery-stream DeliveryStreamType: DirectPut ExtendedS3DestinationConfiguration: BucketARN: !Sub arn:aws:s3:::${DestinationBucketName} RoleARN: !GetAtt FirehoseDeliveryRole.Arn BufferingHints: IntervalInSeconds: 300 SizeInMBs: 64 CompressionFormat: GZIP # 動的パーティショニングの場合はpartitionKeyFromLambdaを使う Prefix: !Sub log_group=!{partitionKeyFromLambda:log_group}/year=!{partitionKeyFromLambda:year}/month=!{partitionKeyFromLambda:month}/day=!{partitionKeyFromLambda:day}/ ErrorOutputPrefix: !Sub cwlogs_error/!{firehose:error-output-type}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/ DynamicPartitioningConfiguration: Enabled: true RetryOptions: DurationInSeconds: 300 ProcessingConfiguration: Enabled: true Processors: - Type: Lambda Parameters: - ParameterName: LambdaArn ParameterValue: !GetAtt CloudWatchLogProcessorLambda.Arn
④ アカウントレベルでCloudWatch Logsのサブスクリプションフィルターを作成する

AccountPolicyを作成し、アカウント全体でCloudWatch Logsに対してサブスクリプションフィルターを設定します。
AccountPolicyを使うことでS3に転送したいCloudWatch Logsのロググループを個別に指定する必要がなくなり、
一括ですべてのロググループをS3に転送できます。
ただし、先に作成したLambdaのロググループは無限ループを防ぐために除外する必要があります。
CloudWatchLogsAccountPolicy: Type: AWS::Logs::AccountPolicy Properties: PolicyName: !Sub ${ProjectName}-${EnvType}-cwlogs-account-policy PolicyDocument: !Sub - | { "DestinationArn" : "${DeliveryStreamArn}", "RoleArn" : "${CloudWatchLogsSubscriptionRoleArn}", "FilterPattern" : "" } - DeliveryStreamArn: !GetAtt CloudWatchLogsDeliveryStream.Arn CloudWatchLogsSubscriptionRoleArn: !GetAtt CloudWatchLogsSubscriptionRole.Arn PolicyType: SUBSCRIPTION_FILTER_POLICY Scope: ALL # 除外するロググループを指定する # ログ転送で使用するLambda関数のロググループは除外する[重要] SelectionCriteria: !Sub 'LogGroupName NOT IN ["/aws/lambda/${ProjectName}-${EnvType}-cwlogs-processor"]'
※現時点(2025年11月)時点でAWS::Logs::AccountPolicyのPolicyDocumentはJSONで指定する必要があるので注意してください。
AWSマネジメントコンソールからCloudFormationをデプロイ
最後にCloudFormationのテンプレートをデプロイします。
デプロイ方法はいくつかありますが今回はAWSマネジメントコンソールから手動で実施することにしています。
- AWSマネジメントコンソールのCloudFormationの画面に移動します
- 「スタックの作成」->「新しいリソースを使用(標準)」をクリックします


- 「テンプレートの準備」画面で「テンプレートファイルのアップロード」を選択し、作成したCloudFormationテンプレートをアップロードします


- 今回のテンプレートではパラメーターとしてProjectName、EnvTypeを指定する必要があるため、適切な値を入力します
- 画面の指示に従い、CloudFormationスタックを作成します
デプロイしたログ転送設定が正しく動作していることを確認します。
- CloudWatch Logsのロググループにアカウントレベルのサブスクリプションフィルターが設定されていること


- S3にCloudWatch Logsのログが保存されていること


まとめ
今回はCloudWatch LogsからS3にログを転送する方法を紹介しました。
- CloudWatch Logsのアカウントレベルのサブスクリプションフィルターを使用することで、すべてのロググループからログを転送できました
- Firehoseの動的パーティショニング機能を使用することで、パーティショニングしながらS3に保存できました
とにかくCloudWatch LogsをS3に保存したい、という場合は、ぜひ今回紹介した方法を参考にしてみてください!
CloudFormationテンプレート全文
最後に今回の記事で作成したCloudFormationテンプレート全部を掲載します
AWSTemplateFormatVersion: 2010-09-09 Description: Transfer CloudWatchLog to S3 Bucket Parameters: ProjectName: Description: Please input project name. Type: String AllowedPattern: '[-a-zA-Z0-9]*' EnvType: Default: dev Type: String Resources: DestinationBucket: Type: AWS::S3::Bucket DeletionPolicy: Retain UpdateReplacePolicy: Retain Properties: BucketName: !Sub ${ProjectName}-${EnvType}-cwlogs-destination-bucket PublicAccessBlockConfiguration: BlockPublicAcls: true BlockPublicPolicy: true IgnorePublicAcls: true RestrictPublicBuckets: true OwnershipControls: Rules: - ObjectOwnership: BucketOwnerEnforced VersioningConfiguration: Status: Enabled FirehoseDeliveryRole: Type: AWS::IAM::Role Properties: RoleName: !Sub ${ProjectName}-${EnvType}-firehose-delivery-role AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - firehose.amazonaws.com Action: - sts:AssumeRole Policies: - PolicyName: !Sub ${ProjectName}-${EnvType}-firehose-delivery-policy PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - s3:PutObject - s3:PutObjectAcl - s3:PutObjectTagging Resource: !Sub arn:aws:s3:::${DestinationBucket}/* - Effect: Allow Action: - s3:ListBucket Resource: !Sub arn:aws:s3:::${DestinationBucket} - Effect: Allow Action: - lambda:InvokeFunction - lambda:GetFunctionConfiguration # 公式ドキュメントには記載されいる。なくても動くが追加しておく Resource: - !GetAtt CloudWatchLogProcessorLambda.Arn CloudWatchLogsSubscriptionRole: Type: AWS::IAM::Role Properties: RoleName: !Sub ${ProjectName}-${EnvType}-common-cwlogs-subscription-role AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - !Sub logs.${AWS::Region}.amazonaws.com Action: - sts:AssumeRole Policies: - PolicyName: !Sub ${ProjectName}-${EnvType}-cwlogs-subscription-policy PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - firehose:PutRecord - firehose:PutRecordBatch Resource: !GetAtt CloudWatchLogsDeliveryStream.Arn ## -------------------------------------------- ## CloudWatch Logs to S3 via Kinesis Firehose ## -------------------------------------------- # アカウントレベルでCloudWatch Logsにサブスクリプションフィルターを設定する CloudWatchLogsAccountPolicy: Type: AWS::Logs::AccountPolicy Properties: PolicyName: !Sub ${ProjectName}-${EnvType}-cwlogs-account-policy PolicyDocument: !Sub - | { "DestinationArn" : "${DeliveryStreamArn}", "RoleArn" : "${CloudWatchLogsSubscriptionRoleArn}", "FilterPattern" : "" } - DeliveryStreamArn: !GetAtt CloudWatchLogsDeliveryStream.Arn CloudWatchLogsSubscriptionRoleArn: !GetAtt CloudWatchLogsSubscriptionRole.Arn PolicyType: SUBSCRIPTION_FILTER_POLICY Scope: ALL # 除外するロググループを指定する # ログ転送で使用するLambda関数のロググループは除外する[重要] SelectionCriteria: !Sub 'LogGroupName NOT IN ["/aws/lambda/${ProjectName}-${EnvType}-cwlogs-processor"]' CloudWatchLogsDeliveryStream: Type: AWS::KinesisFirehose::DeliveryStream Properties: DeliveryStreamName: !Sub ${ProjectName}-${EnvType}-cwlogs-delivery-stream DeliveryStreamType: DirectPut ExtendedS3DestinationConfiguration: BucketARN: !Sub arn:aws:s3:::${DestinationBucket} RoleARN: !GetAtt FirehoseDeliveryRole.Arn BufferingHints: IntervalInSeconds: 300 SizeInMBs: 64 CompressionFormat: GZIP # 動的パーティショニングの場合はpartitionKeyFromLambdaを使う Prefix: !Sub log_group=!{partitionKeyFromLambda:log_group}/year=!{partitionKeyFromLambda:year}/month=!{partitionKeyFromLambda:month}/day=!{partitionKeyFromLambda:day}/ ErrorOutputPrefix: !Sub cwlogs_error/!{firehose:error-output-type}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/ DynamicPartitioningConfiguration: Enabled: true RetryOptions: DurationInSeconds: 300 ProcessingConfiguration: Enabled: true Processors: - Type: Lambda Parameters: - ParameterName: LambdaArn ParameterValue: !GetAtt CloudWatchLogProcessorLambda.Arn CloudWatchLogProcessorLambda: Type: AWS::Lambda::Function Properties: FunctionName: !Sub ${ProjectName}-${EnvType}-cwlogs-processor Handler: index.lambda_handler Role: !GetAtt CloudWatchLogProcessorLambdaRole.Arn Runtime: python3.12 Timeout: 120 MemorySize: 256 Code: ZipFile: | # flake8: noqa: E501 import json import base64 import gzip from io import StringIO import datetime from zoneinfo import ZoneInfo JST_TZ = ZoneInfo("Asia/Tokyo") def parse_payload(data: str): raw = base64.b64decode(data) # CloudWatch Logs サブスクリプション経由は gzip 圧縮されているため解凍する try: raw = gzip.decompress(raw) except OSError: # 非圧縮の場合はそのまま pass return json.loads(raw.decode('utf-8')) def generate_partition_keys(log_group: str, timestamp: int): dt_jst = datetime.datetime.fromtimestamp( timestamp / 1000, tz=datetime.timezone.utc).astimezone(JST_TZ) return { 'log_group': log_group, 'year': f"{dt_jst.year:04d}", 'month': f"{dt_jst.month:02d}", 'day': f"{dt_jst.day:02d}", } def process_record(record: dict): record_id = record.get('recordId') data = record.get('data', '') try: payload = parse_payload(data) log_group = payload.get('logGroup', '') log_events = payload.get('logEvents', []) # CONTROL_MESSAGEの場合は保存しないためDroppedを設定 # https://docs.aws.amazon.com/ja_jp/AmazonCloudWatch/latest/logs/SubscriptionFilters.html # https://docs.aws.amazon.com/ja_jp/firehose/latest/dev/data-transformation-status-model.html if payload.get('messageType') == 'CONTROL_MESSAGE': return {'recordId': record_id, 'result': 'Dropped', 'data': data} if not log_group or not log_events: return {'recordId': record_id, 'result': 'Dropped', 'data': data} # ロググループ、タイムスタンプからパーティションキーを作成する # https://docs.aws.amazon.com/ja_jp/firehose/latest/dev/dynamic-partitioning-partitioning-keys.html # ロググループにスラッシュがある場合でもAthenaでクエリすることが可能なので置換は不要 # (S3の階層は分かれるが、Athenaでのクエリは可能) partition_keys = generate_partition_keys( log_group, log_events[0].get('timestamp', 0) ) buf = StringIO() for ev in log_events: ts = ev.get('timestamp') dt_jst = datetime.datetime.fromtimestamp( ts / 1000, tz=datetime.timezone.utc).astimezone(JST_TZ).isoformat() ev_obj = { 'id': ev.get('id'), 'timestamp': dt_jst, 'message': ev.get('message'), 'log_group': log_group, 'log_stream': payload.get('logStream') } buf.write(json.dumps(ev_obj, ensure_ascii=False) + '\n') encoded = base64.b64encode( buf.getvalue().encode('utf-8')).decode('utf-8') return { 'recordId': record_id, 'result': 'Ok', 'data': encoded, 'metadata': {'partitionKeys': partition_keys} } except json.JSONDecodeError as e: # エラーフォルダーに送られる print(f"JSON decode error recordId={record_id} msg={e}") return {'recordId': record_id, 'result': 'ProcessingFailed', 'data': data} except Exception as e: # エラーフォルダーに送られる print(f"Unexpected error recordId={record_id} type={type(e).__name__} msg={e}") return {'recordId': record_id, 'result': 'ProcessingFailed', 'data': data} def lambda_handler(event, context): out = [] for r in event.get('records', []): out.append(process_record(r)) return {'records': out} CloudWatchLogProcessorLambdaRole: Type: AWS::IAM::Role Properties: RoleName: !Sub ${ProjectName}-${EnvType}-common-cwlogs-processor-role AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
最後までお読みいただきありがとうございました。
JMDCでは、ヘルスケア領域の課題解決に一緒に取り組んでいただける方を積極採用中です!
フロントエンド /バックエンド/ データベースエンジニア等、様々なポジションで募集をしています。
詳細は下記の募集一覧からご確認ください。
hrmos.co
まずはカジュアルにJMDCメンバーと話してみたい/経験が活かせそうなポジションの話を聞いてみたい等ございましたら、下記よりエントリーいただけますと幸いです。 hrmos.co
★最新記事のお知らせはぜひ X(Twitter)、またはBlueskyをご覧ください! Tweets by jmdc_tech twitter.com