JMDC TECH BLOG

JMDCのエンジニアブログです

CloudWatch Logsをパーティショニングしつつ雑にS3に保存する方法

こんにちは。株式会社JMDC データウェアハウス開発部の甲(きのえ)です。
今年、JMDCではアドベントカレンダーに参加しています。
qiita.com

本記事は、JMDC Advent Calendar 2025 6日目の記事です。

目次

はじめに

AWSを運用しているとCloudWatch LogsにVPCフローログ、各種アプリケーションのログがたまっていきます。
CloudWatch Logsはロググループごとに保存期間を設定できますが、長期保存したり分析したりするためにS3に保存したいニーズがあるかと思います。

CloudWatch LogsからS3にログを転送する方法はいくつかありますが、今回はAmazon Data Firehoseを使ってすべてのCloudWatch LogsをS3に転送する方法を紹介します。

注意事項

  • この記事では、これから発生するすべてのCloudWatch LogsをS3に転送する方法を紹介します。すでに存在するログをS3に転送する方法ではありません
  • この記事に載せるソースコードはすべてサンプルコードです

対象読者

  • CloudWatch Logsに保存されているログをS3に転送したい方
  • CloudWatch Logsのログを分析したい方

構成イメージ

実際にCloudWatch LogsからS3にログを転送する構成イメージは以下のとおりです。

  1. AWSサービスからCloudWatch Logsにログを送信する
  2. CloudWatch LogsのサブスクリプションフィルターでData Firehoseにログを転送する
  3. Data Firehoseがパーティショニング用のLambda関数を呼び出し、ログをS3に保存する

手順

データウェアハウス開発部ではクラウドのインフラストラクチャー構築においてIaC(Infrastructure as Code)を徹底しているため、AWSサービスの構築にAWSマネジメントコンソールはほとんど使用しません。
今回は一部AWSマネジメントコンソールの使用も交えつつ、CloudFormationで構築する例を紹介したいと思います。

※データウェアハウス開発部ではシステムによってTerraformやCloudFormationを使用しています。

CloudFormationの要所要所でリソースの説明をしますが、最終的なCloudFormationテンプレート全文はこの記事の最後に掲載します。

CloudFormationテンプレートの作成

① CloudWatch Logs保存用S3バケットの作成

まず保存先のS3を作成します。


削除ポリシーやバケットポリシーは要件にしたがって書き換えてください。

  DestinationBucket:
    Type: AWS::S3::Bucket
    DeletionPolicy: Retain
    UpdateReplacePolicy: Retain
    Properties:
      BucketName: !Sub ${ProjectName}-${EnvType}-cwlogs-destination-bucket
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      OwnershipControls:
        Rules:
        - ObjectOwnership: BucketOwnerEnforced
      VersioningConfiguration:
        Status: Enabled

② CloudWatch LogsをS3に保存するためのパーティショニング用Lambda関数を作成

CloudWatch LogsをパーティショニングしながらS3に保存するためのLambda関数を作成します。
ただ転送するだけであればLambda関数を作る必要はないのですが、今回は将来的にAthenaなどで分析することを想定してパーティショニングします。

Lambda関数にはgzip圧縮されたデータが渡されてくるので、パーティションキーを設定してから返してあげます。

  CloudWatchLogProcessorLambda:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub ${ProjectName}-${EnvType}-cwlogs-processor
      Handler: index.lambda_handler
      Role: !GetAtt CloudWatchLogProcessorLambdaRole.Arn
      Runtime: python3.12
      Timeout: 120
      MemorySize: 256
      Code:
        ZipFile: |
          # flake8: noqa: E501

          import json
          import base64
          import gzip
          from io import StringIO
          import datetime
          from zoneinfo import ZoneInfo

          JST_TZ = ZoneInfo("Asia/Tokyo")


          def parse_payload(data: str):
              raw = base64.b64decode(data)

              # CloudWatch Logs サブスクリプション経由は gzip 圧縮されているため解凍する
              try:
                  raw = gzip.decompress(raw)
              except OSError:
                  # 非圧縮の場合はそのまま
                  pass

              return json.loads(raw.decode('utf-8'))


          def generate_partition_keys(log_group: str, timestamp: int):
              dt_jst = datetime.datetime.fromtimestamp(
                  timestamp / 1000, tz=datetime.timezone.utc).astimezone(JST_TZ)
              return {
                  'log_group': log_group,
                  'year': f"{dt_jst.year:04d}",
                  'month': f"{dt_jst.month:02d}",
                  'day': f"{dt_jst.day:02d}",
              }


          def process_record(record: dict):
              record_id = record.get('recordId')
              data = record.get('data', '')

              try:
                  payload = parse_payload(data)

                  log_group = payload.get('logGroup', '')
                  log_events = payload.get('logEvents', [])

                  # CONTROL_MESSAGEの場合は保存しないためDroppedを設定
                  # https://docs.aws.amazon.com/ja_jp/AmazonCloudWatch/latest/logs/SubscriptionFilters.html
                  # https://docs.aws.amazon.com/ja_jp/firehose/latest/dev/data-transformation-status-model.html
                  if payload.get('messageType') == 'CONTROL_MESSAGE':
                      return {'recordId': record_id, 'result': 'Dropped', 'data': data}

                  if not log_group or not log_events:
                      return {'recordId': record_id, 'result': 'Dropped', 'data': data}

                  # ロググループ、タイムスタンプからパーティションキーを作成する
                  # https://docs.aws.amazon.com/ja_jp/firehose/latest/dev/dynamic-partitioning-partitioning-keys.html
                  # ロググループにスラッシュがある場合でもAthenaでクエリすることが可能なので置換は不要
                  # (S3の階層は分かれるが、Athenaでのクエリは可能)
                  partition_keys = generate_partition_keys(
                      log_group, log_events[0].get('timestamp', 0)
                  )

                  buf = StringIO()
                  for ev in log_events:
                      ts = ev.get('timestamp')

                      dt_jst = datetime.datetime.fromtimestamp(
                          ts / 1000, tz=datetime.timezone.utc).astimezone(JST_TZ).isoformat()

                      ev_obj = {
                          'id': ev.get('id'),
                          'timestamp': dt_jst,
                          'message': ev.get('message'),
                          'log_group': log_group,
                          'log_stream': payload.get('logStream')
                      }

                      buf.write(json.dumps(ev_obj, ensure_ascii=False) + '\n')

                  encoded = base64.b64encode(
                      buf.getvalue().encode('utf-8')).decode('utf-8')

                  return {
                      'recordId': record_id,
                      'result': 'Ok',
                      'data': encoded,
                      'metadata': {'partitionKeys': partition_keys}
                  }

              except json.JSONDecodeError as e: # エラーフォルダーに送られる
                  print(f"JSON decode error recordId={record_id} msg={e}")
                  return {'recordId': record_id, 'result': 'ProcessingFailed', 'data': data}

              except Exception as e: # エラーフォルダーに送られる
                  print(f"Unexpected error recordId={record_id} type={type(e).__name__} msg={e}")
                  return {'recordId': record_id, 'result': 'ProcessingFailed', 'data': data}


          def lambda_handler(event, context):
              out = []
              for r in event.get('records', []):
                  out.append(process_record(r))
              return {'records': out}

③ CloudWatch LogsをS3に配置するためのFirehose Delivery Streamを作成します

②で作成したLambda関数でパーティションキーを設定しているのでそれを利用してS3に配置するようにFirehoseのパラメーターを調節します。

今回はHive形式で s3//{S3バケット名}/log_group={ロググループ名}/year={年}/month={月}/day={日}/ログファイル名 がS3保存先のPathとなるように設定します。

この時のポイントはパーティショニングキーとしてpartitionKeyFromLambdaを使用していることです。
これにより、Lambda関数の戻り値に含まれるパーティショニングキーをS3の保存先として使用できます。

  CloudWatchLogsDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: !Sub ${ProjectName}-${EnvType}-cwlogs-delivery-stream
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration:
        BucketARN: !Sub arn:aws:s3:::${DestinationBucketName}
        RoleARN: !GetAtt FirehoseDeliveryRole.Arn
        BufferingHints:
          IntervalInSeconds: 300
          SizeInMBs: 64
        CompressionFormat: GZIP
        # 動的パーティショニングの場合はpartitionKeyFromLambdaを使う
        Prefix: !Sub log_group=!{partitionKeyFromLambda:log_group}/year=!{partitionKeyFromLambda:year}/month=!{partitionKeyFromLambda:month}/day=!{partitionKeyFromLambda:day}/
        ErrorOutputPrefix: !Sub cwlogs_error/!{firehose:error-output-type}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/
        DynamicPartitioningConfiguration:
          Enabled: true
          RetryOptions:
            DurationInSeconds: 300
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: Lambda
              Parameters:
                - ParameterName: LambdaArn
                  ParameterValue: !GetAtt CloudWatchLogProcessorLambda.Arn

④ アカウントレベルでCloudWatch Logsのサブスクリプションフィルターを作成する

AccountPolicyを作成し、アカウント全体でCloudWatch Logsに対してサブスクリプションフィルターを設定します。
AccountPolicyを使うことでS3に転送したいCloudWatch Logsのロググループを個別に指定する必要がなくなり、 一括ですべてのロググループをS3に転送できます。
ただし、先に作成したLambdaのロググループは無限ループを防ぐために除外する必要があります。

  CloudWatchLogsAccountPolicy:
    Type: AWS::Logs::AccountPolicy
    Properties:
      PolicyName: !Sub ${ProjectName}-${EnvType}-cwlogs-account-policy
      PolicyDocument: !Sub
        - |
          {
            "DestinationArn" : "${DeliveryStreamArn}",
            "RoleArn" : "${CloudWatchLogsSubscriptionRoleArn}",
            "FilterPattern" : ""
          }
        - DeliveryStreamArn: !GetAtt CloudWatchLogsDeliveryStream.Arn
          CloudWatchLogsSubscriptionRoleArn: !GetAtt CloudWatchLogsSubscriptionRole.Arn
      PolicyType: SUBSCRIPTION_FILTER_POLICY
      Scope: ALL
      # 除外するロググループを指定する
      # ログ転送で使用するLambda関数のロググループは除外する[重要]
      SelectionCriteria: !Sub 'LogGroupName NOT IN ["/aws/lambda/${ProjectName}-${EnvType}-cwlogs-processor"]'

※現時点(2025年11月)時点でAWS::Logs::AccountPolicyのPolicyDocumentはJSONで指定する必要があるので注意してください。

AWSマネジメントコンソールからCloudFormationをデプロイ

最後にCloudFormationのテンプレートをデプロイします。
デプロイ方法はいくつかありますが今回はAWSマネジメントコンソールから手動で実施することにしています。

  1. AWSマネジメントコンソールのCloudFormationの画面に移動します
  2. 「スタックの作成」->「新しいリソースを使用(標準)」をクリックします
  3. 「テンプレートの準備」画面で「テンプレートファイルのアップロード」を選択し、作成したCloudFormationテンプレートをアップロードします
  4. 今回のテンプレートではパラメーターとしてProjectName、EnvTypeを指定する必要があるため、適切な値を入力します
  5. 画面の指示に従い、CloudFormationスタックを作成します

デプロイしたログ転送設定が正しく動作していることを確認します。

  1. CloudWatch Logsのロググループにアカウントレベルのサブスクリプションフィルターが設定されていること
  2. S3にCloudWatch Logsのログが保存されていること

まとめ

今回はCloudWatch LogsからS3にログを転送する方法を紹介しました。

  • CloudWatch Logsのアカウントレベルのサブスクリプションフィルターを使用することで、すべてのロググループからログを転送できました
  • Firehoseの動的パーティショニング機能を使用することで、パーティショニングしながらS3に保存できました

とにかくCloudWatch LogsをS3に保存したい、という場合は、ぜひ今回紹介した方法を参考にしてみてください!

CloudFormationテンプレート全文

最後に今回の記事で作成したCloudFormationテンプレート全部を掲載します

AWSTemplateFormatVersion: 2010-09-09
Description: Transfer CloudWatchLog to S3 Bucket

Parameters:

  ProjectName:
    Description: Please input project name.
    Type: String
    AllowedPattern: '[-a-zA-Z0-9]*'

  EnvType:
    Default: dev
    Type: String

Resources:

  DestinationBucket:
    Type: AWS::S3::Bucket
    DeletionPolicy: Retain
    UpdateReplacePolicy: Retain
    Properties:
      BucketName: !Sub ${ProjectName}-${EnvType}-cwlogs-destination-bucket
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      OwnershipControls:
        Rules:
        - ObjectOwnership: BucketOwnerEnforced
      VersioningConfiguration:
        Status: Enabled

  FirehoseDeliveryRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub ${ProjectName}-${EnvType}-firehose-delivery-role
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Effect: Allow
          Principal:
            Service:
            - firehose.amazonaws.com
          Action:
          - sts:AssumeRole
      Policies:
      - PolicyName: !Sub ${ProjectName}-${EnvType}-firehose-delivery-policy
        PolicyDocument:
          Version: '2012-10-17'
          Statement:
          - Effect: Allow
            Action:
              - s3:PutObject
              - s3:PutObjectAcl
              - s3:PutObjectTagging
            Resource: !Sub arn:aws:s3:::${DestinationBucket}/*
          - Effect: Allow
            Action:
              - s3:ListBucket
            Resource: !Sub arn:aws:s3:::${DestinationBucket}
          - Effect: Allow
            Action:
              - lambda:InvokeFunction
              - lambda:GetFunctionConfiguration # 公式ドキュメントには記載されいる。なくても動くが追加しておく
            Resource:
              - !GetAtt CloudWatchLogProcessorLambda.Arn

  CloudWatchLogsSubscriptionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub ${ProjectName}-${EnvType}-common-cwlogs-subscription-role
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Effect: Allow
          Principal:
            Service:
            - !Sub logs.${AWS::Region}.amazonaws.com
          Action:
          - sts:AssumeRole
      Policies:
      - PolicyName: !Sub ${ProjectName}-${EnvType}-cwlogs-subscription-policy
        PolicyDocument:
          Version: '2012-10-17'
          Statement:
          - Effect: Allow
            Action:
              - firehose:PutRecord
              - firehose:PutRecordBatch
            Resource: !GetAtt CloudWatchLogsDeliveryStream.Arn

  ## --------------------------------------------
  ## CloudWatch Logs to S3 via Kinesis Firehose
  ## --------------------------------------------

  # アカウントレベルでCloudWatch Logsにサブスクリプションフィルターを設定する
  CloudWatchLogsAccountPolicy:
    Type: AWS::Logs::AccountPolicy
    Properties:
      PolicyName: !Sub ${ProjectName}-${EnvType}-cwlogs-account-policy
      PolicyDocument: !Sub
        - |
          {
            "DestinationArn" : "${DeliveryStreamArn}",
            "RoleArn" : "${CloudWatchLogsSubscriptionRoleArn}",
            "FilterPattern" : ""
          }
        - DeliveryStreamArn: !GetAtt CloudWatchLogsDeliveryStream.Arn
          CloudWatchLogsSubscriptionRoleArn: !GetAtt CloudWatchLogsSubscriptionRole.Arn
      PolicyType: SUBSCRIPTION_FILTER_POLICY
      Scope: ALL
      # 除外するロググループを指定する
      # ログ転送で使用するLambda関数のロググループは除外する[重要]
      SelectionCriteria: !Sub 'LogGroupName NOT IN ["/aws/lambda/${ProjectName}-${EnvType}-cwlogs-processor"]'

  CloudWatchLogsDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: !Sub ${ProjectName}-${EnvType}-cwlogs-delivery-stream
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration:
        BucketARN: !Sub arn:aws:s3:::${DestinationBucket}
        RoleARN: !GetAtt FirehoseDeliveryRole.Arn
        BufferingHints:
          IntervalInSeconds: 300
          SizeInMBs: 64
        CompressionFormat: GZIP
        # 動的パーティショニングの場合はpartitionKeyFromLambdaを使う
        Prefix: !Sub log_group=!{partitionKeyFromLambda:log_group}/year=!{partitionKeyFromLambda:year}/month=!{partitionKeyFromLambda:month}/day=!{partitionKeyFromLambda:day}/
        ErrorOutputPrefix: !Sub cwlogs_error/!{firehose:error-output-type}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/
        DynamicPartitioningConfiguration:
          Enabled: true
          RetryOptions:
            DurationInSeconds: 300
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: Lambda
              Parameters:
                - ParameterName: LambdaArn
                  ParameterValue: !GetAtt CloudWatchLogProcessorLambda.Arn

  CloudWatchLogProcessorLambda:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub ${ProjectName}-${EnvType}-cwlogs-processor
      Handler: index.lambda_handler
      Role: !GetAtt CloudWatchLogProcessorLambdaRole.Arn
      Runtime: python3.12
      Timeout: 120
      MemorySize: 256
      Code:
        ZipFile: |
          # flake8: noqa: E501

          import json
          import base64
          import gzip
          from io import StringIO
          import datetime
          from zoneinfo import ZoneInfo

          JST_TZ = ZoneInfo("Asia/Tokyo")


          def parse_payload(data: str):
              raw = base64.b64decode(data)

              # CloudWatch Logs サブスクリプション経由は gzip 圧縮されているため解凍する
              try:
                  raw = gzip.decompress(raw)
              except OSError:
                  # 非圧縮の場合はそのまま
                  pass

              return json.loads(raw.decode('utf-8'))


          def generate_partition_keys(log_group: str, timestamp: int):
              dt_jst = datetime.datetime.fromtimestamp(
                  timestamp / 1000, tz=datetime.timezone.utc).astimezone(JST_TZ)
              return {
                  'log_group': log_group,
                  'year': f"{dt_jst.year:04d}",
                  'month': f"{dt_jst.month:02d}",
                  'day': f"{dt_jst.day:02d}",
              }


          def process_record(record: dict):
              record_id = record.get('recordId')
              data = record.get('data', '')

              try:
                  payload = parse_payload(data)

                  log_group = payload.get('logGroup', '')
                  log_events = payload.get('logEvents', [])

                  # CONTROL_MESSAGEの場合は保存しないためDroppedを設定
                  # https://docs.aws.amazon.com/ja_jp/AmazonCloudWatch/latest/logs/SubscriptionFilters.html
                  # https://docs.aws.amazon.com/ja_jp/firehose/latest/dev/data-transformation-status-model.html
                  if payload.get('messageType') == 'CONTROL_MESSAGE':
                      return {'recordId': record_id, 'result': 'Dropped', 'data': data}

                  if not log_group or not log_events:
                      return {'recordId': record_id, 'result': 'Dropped', 'data': data}

                  # ロググループ、タイムスタンプからパーティションキーを作成する
                  # https://docs.aws.amazon.com/ja_jp/firehose/latest/dev/dynamic-partitioning-partitioning-keys.html
                  # ロググループにスラッシュがある場合でもAthenaでクエリすることが可能なので置換は不要
                  # (S3の階層は分かれるが、Athenaでのクエリは可能)
                  partition_keys = generate_partition_keys(
                      log_group, log_events[0].get('timestamp', 0)
                  )

                  buf = StringIO()
                  for ev in log_events:
                      ts = ev.get('timestamp')

                      dt_jst = datetime.datetime.fromtimestamp(
                          ts / 1000, tz=datetime.timezone.utc).astimezone(JST_TZ).isoformat()

                      ev_obj = {
                          'id': ev.get('id'),
                          'timestamp': dt_jst,
                          'message': ev.get('message'),
                          'log_group': log_group,
                          'log_stream': payload.get('logStream')
                      }

                      buf.write(json.dumps(ev_obj, ensure_ascii=False) + '\n')

                  encoded = base64.b64encode(
                      buf.getvalue().encode('utf-8')).decode('utf-8')

                  return {
                      'recordId': record_id,
                      'result': 'Ok',
                      'data': encoded,
                      'metadata': {'partitionKeys': partition_keys}
                  }

              except json.JSONDecodeError as e: # エラーフォルダーに送られる
                  print(f"JSON decode error recordId={record_id} msg={e}")
                  return {'recordId': record_id, 'result': 'ProcessingFailed', 'data': data}

              except Exception as e: # エラーフォルダーに送られる
                  print(f"Unexpected error recordId={record_id} type={type(e).__name__} msg={e}")
                  return {'recordId': record_id, 'result': 'ProcessingFailed', 'data': data}


          def lambda_handler(event, context):
              out = []
              for r in event.get('records', []):
                  out.append(process_record(r))

              return {'records': out}

  CloudWatchLogProcessorLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub ${ProjectName}-${EnvType}-common-cwlogs-processor-role
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Effect: Allow
          Principal:
            Service:
            - lambda.amazonaws.com
          Action:
          - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

最後までお読みいただきありがとうございました。

JMDCでは、ヘルスケア領域の課題解決に一緒に取り組んでいただける方を積極採用中です!
フロントエンド /バックエンド/ データベースエンジニア等、様々なポジションで募集をしています。
詳細は下記の募集一覧からご確認ください。 hrmos.co

まずはカジュアルにJMDCメンバーと話してみたい/経験が活かせそうなポジションの話を聞いてみたい等ございましたら、下記よりエントリーいただけますと幸いです。 hrmos.co

★最新記事のお知らせはぜひ X(Twitter)、またはBlueskyをご覧ください! twitter.com

bsky.app