NRIネットコム Blog

NRIネットコム社員が様々な視点で、日々の気づきやナレッジを発信するメディアです

Amazon CloudFrontのアクセスログ保存パスをAthena用に最適化してみた~part1~

はじめに

こんにちは。2年目の大林です。 今回は2度目のブログ執筆なのですが、前回の執筆から約3か月も経過していました...。
1年目の頃より2年目の方が時が経つのが早い気がしています。
本ブログでは、Amazon CloudFrontのアクセスログの保存パスをAthena用に最適化する方法を書いていきたいと思います。

課題

Amazon CloudFrontに対してアドホックなログ分析をする際、Amazon Athenaを利用することが多いのではないでしょうか。 Amazon Athenaを利用する際は、パーティションを指定してスキャン量を最小限に抑えることがコスト効率の上昇につながります。 しかし、Amazon CloudFrontのデフォルトのアクセスログ保存のパス体系はAthenaで分析するに当たって最適な形にはなっていません。 part1では、Lambda関数を使用してこの課題を解決してみたいと思います。

想定しているケース

ケース1
 ・CloudFrontのアクセスログが有効になっていない
 ・単一のS3バケットを使用したパス最適化を実施したい
ケース2
 ・CloudFrontのアクセスログが有効になっていない
 ・パス最適化処理の途中でエラーが起きた場合でもログファイルを消失したくない場合

ケース1

ケース1の設計と処理の流れ

①Amazon CloudFrontのアクセスログがS3バケットに保存される

②Lambda関数でアクセスログ保存のパスを最適化する

・ログファイルをパス(ディストリビューション名/yyyy/mm/dd)を指定してコピーします。
・上記の処理が終了した時点で、パス最適化前と最適化後のログファイルが存在することになるのでパスが最適化できていないほうを削除します。
使用するLambda関数のコードは以下です。

import re
import boto3
import os
import json

def lambda_handler(event, context):
    # イベントログを出力
    recieved_event = json.dumps(event)
    print('Received event:' + recieved_event)
    
    s3 = boto3.client('s3')
    date_pattern = r'[^\\d](\d{4})-(\d{2})-(\d{2})-(\d{2})[^\\d]'
    filename_pattern = r'[^/]+$'

    bucket = event['Records'][0]['s3']['bucket']['name']
    source_key = event['Records'][0]['s3']['object']['key']
    target = '/'
    target_find = source_key.find(target)
    target_key_prefix = source_key[:target_find]
    
    # ファイル名が定義したパターンにマッチしているか判定する
    source_regex = re.compile(date_pattern)
    match = source_regex.search(source_key)
    if match is None:
        print('ファイルが見つかりませんでした。')
    else:
        year, month, day, hour = match.groups()
        filename_regex = re.compile(filename_pattern)
        filename = filename_regex.search(source_key).group(0)
        target_key = target_key_prefix + '/' + year + '/' + month + '/' + day + '/' + filename
        print(source_key + ' to ' + target_key)
        # パスを指定して、オブジェクトを移動させる
        copy_result =s3.copy_object(Bucket=bucket, Key=target_key, 
           CopySource={'Bucket': bucket, 'Key': source_key})
        print(copy_result)
        # パス最適化前のオブジェクトを削除する
        s3.delete_object(Bucket=bucket, Key=source_key)

③Amazon Athenaでアクセスログをクエリする

分析用のテンプレートは以下です。

AWSTemplateFormatVersion: "2010-09-09"
#----------------------------------------------------------------------#
# Athenaを利用したCloudFrontのアクセスログ分析用テンプレート
# 作成されるリソース:Glue:Database, Glue:Table, S3bucket, Athenaワークグループ
#----------------------------------------------------------------------#
Parameters:
 #アカウントIDを入力
  AccountId:
    Description: "input Account Id "
    Type: String
 #環境名を入力
  Env:
    Description: dev or prod
    Type: String
 #システム名を入力
  SystemName:
    Description: System name
    Type: String
 #CloudFrontのアクセスログが保存されているS3バケットを指定する
  S3bucketCloudFrontLogs:
    Type: String
 #オブジェクトの有効期限を入力
  ExpirationInDays:
    Type: String
    Default: 1825
 #Glueテーブルのパーティションに設定するCloudFrontディストリビューションを入力
  distributions:
    Description: "enter distribution name by Comma Delimiter list (XXXXXXXXXXXX,XXXXXXXXXXXX...)"
    Type: String

Resources:
#------------------------------------------------------------#
# Glue
# アクセスログ用のデータベースとテーブルを作成する
#------------------------------------------------------------#
  GlueDatabase:
    Type: AWS::Glue::Database
    Properties: 
      CatalogId: !Ref AWS::AccountId  
      DatabaseInput:
        Name: !Sub '${Env}_${SystemName}_cloudfront_database'
   
  #CloudFrontアクセスログのGlueテーブル   
  GlueTableCloudFrontLogs:
    Type: AWS::Glue::Table
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseName: !Ref GlueDatabase
      TableInput:
        Name: !Sub '${Env}_${SystemName}_cloudfront_table'
        TableType: EXTERNAL_TABLE
        Parameters:
          EXTERNAL: true
          projection.enabled: true
          projection.distribution.type: enum
          projection.distribution.values: !Sub '${distributions}'
          projection.datetime.type: date
          projection.datetime.format: yyyy/MM/dd
          projection.datetime.range: 2023/01/01,NOW
          storage.location.template: !Sub 's3://${S3bucketCloudFrontLogs}/${!distribution}/${!datetime}'
          skip.header.line.count: '2'
          serialization.encoding: utf-8
        PartitionKeys:
          - Name: distribution
            Type: string
          - Name: datetime
            Type: string
        StorageDescriptor:
          Columns:
            - Name: date
              Type: date
            - Name: time
              Type: string
            - Name: location
              Type: string
            - Name: bytes
              Type: bigint
            - Name: requestip
              Type: string
            - Name: method
              Type: string
            - Name: host
              Type: string
            - Name: uri
              Type: string
            - Name: status
              Type: int
            - Name: referrer
              Type: string
            - Name: useragent
              Type: string
            - Name: querystring
              Type: string
            - Name: cookie
              Type: string
            - Name: resulttype
              Type: string
            - Name: requestid
              Type: string
            - Name: hostheader
              Type: string
            - Name: requestprotocol
              Type: string
            - Name: requestbytes
              Type: bigint
            - Name: timetaken
              Type: float
            - Name: xforwardedfor
              Type: string
            - Name: sslprotocol
              Type: string
            - Name: sslcipher
              Type: string
            - Name: responseresulttype
              Type: string
            - Name: httpversion
              Type: string
            - Name: filestatus
              Type: string
            - Name: encryptedfields
              Type: int
            - Name: cPort
              Type: string
            - Name: timeToFirstByte
              Type: string       
            - Name: xEdgeDetailedResultType
              Type: string
            - Name: scContentType
              Type: string   
            - Name: scContentLen
              Type: string 
            - Name: scRangeStart
              Type: string
            - Name: scRangeEnd
              Type: string                   
          InputFormat: org.apache.hadoop.mapred.TextInputFormat
          OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
          Location: !Sub 's3://${S3bucketCloudFrontLogs}/'
          SerdeInfo:
            SerializationLibrary: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
            Parameters:
              serialization.format: "\t"
  #------------------------------------------------------------#
  # Athena
  #------------------------------------------------------------#
  AthenaWorkGroup:
    Type: AWS::Athena::WorkGroup
    Properties:
      Name: !Sub "${Env}-athena-work-group"
      WorkGroupConfiguration:
        ResultConfiguration:
          OutputLocation: !Sub "s3://${AthenaQueryResultBucket}/data"
        EnforceWorkGroupConfiguration: true
        PublishCloudWatchMetricsEnabled: true
  #------------------------------------------------------------#
  # S3 Backet
  # Athenaでクエリを実行した結果が格納されるS3バケットを作成する
  #------------------------------------------------------------#
  AthenaQueryResultBucket:
    Type: AWS::S3::Bucket
    DeletionPolicy: 'Retain'
    Properties: 
      BucketName:  !Sub '${Env}-${SystemName}-cloudfront-athena-result'
      AccessControl: Private
      PublicAccessBlockConfiguration:
        BlockPublicAcls: True
        BlockPublicPolicy: True
        IgnorePublicAcls: True
        RestrictPublicBuckets: True
      LifecycleConfiguration:
        Rules:
          - Id: life-cycle-rule
            Status: Enabled
            ExpirationInDays: !Sub '${ExpirationInDays}'
      BucketEncryption:
        ServerSideEncryptionConfiguration: 
          -
            ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256

ケース1のデモ

まずはコンテンツにアクセスして、CloudFrontのアクセスログがS3バケットに作成アップロードされるのを待ちます。 ログファイルは、指定したプレフィックス名のフォルダの配下にアップロードされます。 ※CloudFrontのログファイルは、ログ有効化設定時に指定したプレフィックス名のフォルダ配下にアップロードされます。

ログファイルがS3バケットにアップロードされたことをトリガーにLambda関数が起動され、パスが最適化されると以下のようになります。


ケース2

ケース2の設計と処理の流れ

ケース2では、S3バケットを2つ使用するためその分コストがかかってしまいます。 ライフサイクルポリシーを使用するなどコストを抑える対策を取る必要があります。

①Amazon CloudFrontのアクセスログがS3バケットに保存される

この段階では、通常のCloudFrontのアクセスログ保存パスと同じです。

②Lambda関数でアクセスログ保存のパスを最適化する

ログファイルをパス(ディストリビューション名/yyyy/mm/dd)を指定して別のS3バケットにコピーします。
使用するLambda関数のコードは以下です。

import re
import boto3
import os
import json

def lambda_handler(event, context):
    # イベントログを出力
    recieved_event = json.dumps(event)
    print('Received event:' + recieved_event)
    
    s3 = boto3.client('s3')
    date_pattern = r'[^\\d](\d{4})-(\d{2})-(\d{2})-(\d{2})[^\\d]'
    filename_pattern = r'[^/]+$'
    destination_bucket = os.getenv('s3bucket')

    bucket = event['Records'][0]['s3']['bucket']['name']
    source_key = event['Records'][0]['s3']['object']['key']
    target = '/'
    target_find = source_key.find(target)
    target_key_prefix = source_key[:target_find]
    
    # ファイル名が定義したパターンにマッチしているか判定する
    source_regex = re.compile(date_pattern)
    match = source_regex.search(source_key)
    if match is None:
        print('ファイルが見つかりませんでした。')
    else:
        year, month, day, hour = match.groups()
        filename_regex = re.compile(filename_pattern)
        filename = filename_regex.search(source_key).group(0)
        target_key = target_key_prefix + '/' + year + '/' + month + '/' + day + '/' + filename
        print(source_key + ' to ' + target_key)
        # パスを指定して、オブジェクトを移動させる
        copy_result =s3.copy_object(Bucket=destination_bucket, Key=target_key, 
           CopySource={'Bucket': bucket, 'Key': source_key})
        print(copy_result)

③Amazon Athenaでアクセスログをクエリする

ケース2で使用するテンプレートはケース1で使用したものと同じものを使用しますが、Glueのテーブルを作成する際に指定するS3バケットが異なります。 Glueのテーブルを作成する際は注意する必要があります。

ケース2のデモ

まずはコンテンツにアクセスして、CloudFrontのアクセスログがS3バケットに作成アップロードされるのを待ちます。 ログファイルは、指定したプレフィックス名のフォルダの配下にアップロードされます。 ※CloudFrontのログファイルは、ログ有効化設定時に指定したプレフィックス名のフォルダ配下にアップロードされます。

ログファイルがS3バケットにアップロードされたことをトリガーにLambda関数が起動され、パスを最適化して別のS3バケットアップロードされます。

さいごに

今回はLambda関数でCloudFrontのアクセスログのパス保存を最適化する方法を紹介しました。 今回紹介した方法は、新規でS3バケットにアップロードされたログファイルのパスを最適化する際に使用できるものです。 次回のAmazon CloudFrontのアクセスログ保存パスをAthena用に最適化してみた~part2~では、既存のCloudFrontのアクセスログに対してパス保存最適化処理を行う方法を紹介します。

執筆者大林 優斗

クラウドエンジニア
AWSを活用したシステムの設計と開発をやらせていただいています。


執筆記事一覧:https://tech.nri-net.com/archive/author/y-obayashi