はじめに
こんにちは。2年目の大林です。
今回は2度目のブログ執筆なのですが、前回の執筆から約3か月も経過していました...。
1年目の頃より2年目の方が時が経つのが早い気がしています。
本ブログでは、Amazon CloudFrontのアクセスログの保存パスをAthena用に最適化する方法を書いていきたいと思います。
課題
Amazon CloudFrontに対してアドホックなログ分析をする際、Amazon Athenaを利用することが多いのではないでしょうか。 Amazon Athenaを利用する際は、パーティションを指定してスキャン量を最小限に抑えることがコスト効率の上昇につながります。 しかし、Amazon CloudFrontのデフォルトのアクセスログ保存のパス体系はAthenaで分析するに当たって最適な形にはなっていません。 part1では、Lambda関数を使用してこの課題を解決してみたいと思います。
想定しているケース
ケース1
・CloudFrontのアクセスログが有効になっていない
・単一のS3バケットを使用したパス最適化を実施したい
ケース2
・CloudFrontのアクセスログが有効になっていない
・パス最適化処理の途中でエラーが起きた場合でもログファイルを消失したくない場合
ケース1
ケース1の設計と処理の流れ

①Amazon CloudFrontのアクセスログがS3バケットに保存される

②Lambda関数でアクセスログ保存のパスを最適化する
・ログファイルをパス(ディストリビューション名/yyyy/mm/dd)を指定してコピーします。
・上記の処理が終了した時点で、パス最適化前と最適化後のログファイルが存在することになるのでパスが最適化できていないほうを削除します。
使用するLambda関数のコードは以下です。
import re
import boto3
import os
import json
def lambda_handler(event, context):
# イベントログを出力
recieved_event = json.dumps(event)
print('Received event:' + recieved_event)
s3 = boto3.client('s3')
date_pattern = r'[^\\d](\d{4})-(\d{2})-(\d{2})-(\d{2})[^\\d]'
filename_pattern = r'[^/]+$'
bucket = event['Records'][0]['s3']['bucket']['name']
source_key = event['Records'][0]['s3']['object']['key']
target = '/'
target_find = source_key.find(target)
target_key_prefix = source_key[:target_find]
# ファイル名が定義したパターンにマッチしているか判定する
source_regex = re.compile(date_pattern)
match = source_regex.search(source_key)
if match is None:
print('ファイルが見つかりませんでした。')
else:
year, month, day, hour = match.groups()
filename_regex = re.compile(filename_pattern)
filename = filename_regex.search(source_key).group(0)
target_key = target_key_prefix + '/' + year + '/' + month + '/' + day + '/' + filename
print(source_key + ' to ' + target_key)
# パスを指定して、オブジェクトを移動させる
copy_result =s3.copy_object(Bucket=bucket, Key=target_key,
CopySource={'Bucket': bucket, 'Key': source_key})
print(copy_result)
# パス最適化前のオブジェクトを削除する
s3.delete_object(Bucket=bucket, Key=source_key)
③Amazon Athenaでアクセスログをクエリする
分析用のテンプレートは以下です。
AWSTemplateFormatVersion: "2010-09-09"
#----------------------------------------------------------------------#
# Athenaを利用したCloudFrontのアクセスログ分析用テンプレート
# 作成されるリソース:Glue:Database, Glue:Table, S3bucket, Athenaワークグループ
#----------------------------------------------------------------------#
Parameters:
#アカウントIDを入力
AccountId:
Description: "input Account Id "
Type: String
#環境名を入力
Env:
Description: dev or prod
Type: String
#システム名を入力
SystemName:
Description: System name
Type: String
#CloudFrontのアクセスログが保存されているS3バケットを指定する
S3bucketCloudFrontLogs:
Type: String
#オブジェクトの有効期限を入力
ExpirationInDays:
Type: String
Default: 1825
#Glueテーブルのパーティションに設定するCloudFrontディストリビューションを入力
distributions:
Description: "enter distribution name by Comma Delimiter list (XXXXXXXXXXXX,XXXXXXXXXXXX...)"
Type: String
Resources:
#------------------------------------------------------------#
# Glue
# アクセスログ用のデータベースとテーブルを作成する
#------------------------------------------------------------#
GlueDatabase:
Type: AWS::Glue::Database
Properties:
CatalogId: !Ref AWS::AccountId
DatabaseInput:
Name: !Sub '${Env}_${SystemName}_cloudfront_database'
#CloudFrontアクセスログのGlueテーブル
GlueTableCloudFrontLogs:
Type: AWS::Glue::Table
Properties:
CatalogId: !Ref AWS::AccountId
DatabaseName: !Ref GlueDatabase
TableInput:
Name: !Sub '${Env}_${SystemName}_cloudfront_table'
TableType: EXTERNAL_TABLE
Parameters:
EXTERNAL: true
projection.enabled: true
projection.distribution.type: enum
projection.distribution.values: !Sub '${distributions}'
projection.datetime.type: date
projection.datetime.format: yyyy/MM/dd
projection.datetime.range: 2023/01/01,NOW
storage.location.template: !Sub 's3://${S3bucketCloudFrontLogs}/${!distribution}/${!datetime}'
skip.header.line.count: '2'
serialization.encoding: utf-8
PartitionKeys:
- Name: distribution
Type: string
- Name: datetime
Type: string
StorageDescriptor:
Columns:
- Name: date
Type: date
- Name: time
Type: string
- Name: location
Type: string
- Name: bytes
Type: bigint
- Name: requestip
Type: string
- Name: method
Type: string
- Name: host
Type: string
- Name: uri
Type: string
- Name: status
Type: int
- Name: referrer
Type: string
- Name: useragent
Type: string
- Name: querystring
Type: string
- Name: cookie
Type: string
- Name: resulttype
Type: string
- Name: requestid
Type: string
- Name: hostheader
Type: string
- Name: requestprotocol
Type: string
- Name: requestbytes
Type: bigint
- Name: timetaken
Type: float
- Name: xforwardedfor
Type: string
- Name: sslprotocol
Type: string
- Name: sslcipher
Type: string
- Name: responseresulttype
Type: string
- Name: httpversion
Type: string
- Name: filestatus
Type: string
- Name: encryptedfields
Type: int
- Name: cPort
Type: string
- Name: timeToFirstByte
Type: string
- Name: xEdgeDetailedResultType
Type: string
- Name: scContentType
Type: string
- Name: scContentLen
Type: string
- Name: scRangeStart
Type: string
- Name: scRangeEnd
Type: string
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Location: !Sub 's3://${S3bucketCloudFrontLogs}/'
SerdeInfo:
SerializationLibrary: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Parameters:
serialization.format: "\t"
#------------------------------------------------------------#
# Athena
#------------------------------------------------------------#
AthenaWorkGroup:
Type: AWS::Athena::WorkGroup
Properties:
Name: !Sub "${Env}-athena-work-group"
WorkGroupConfiguration:
ResultConfiguration:
OutputLocation: !Sub "s3://${AthenaQueryResultBucket}/data"
EnforceWorkGroupConfiguration: true
PublishCloudWatchMetricsEnabled: true
#------------------------------------------------------------#
# S3 Backet
# Athenaでクエリを実行した結果が格納されるS3バケットを作成する
#------------------------------------------------------------#
AthenaQueryResultBucket:
Type: AWS::S3::Bucket
DeletionPolicy: 'Retain'
Properties:
BucketName: !Sub '${Env}-${SystemName}-cloudfront-athena-result'
AccessControl: Private
PublicAccessBlockConfiguration:
BlockPublicAcls: True
BlockPublicPolicy: True
IgnorePublicAcls: True
RestrictPublicBuckets: True
LifecycleConfiguration:
Rules:
- Id: life-cycle-rule
Status: Enabled
ExpirationInDays: !Sub '${ExpirationInDays}'
BucketEncryption:
ServerSideEncryptionConfiguration:
-
ServerSideEncryptionByDefault:
SSEAlgorithm: AES256
ケース1のデモ
まずはコンテンツにアクセスして、CloudFrontのアクセスログがS3バケットに作成アップロードされるのを待ちます。
ログファイルは、指定したプレフィックス名のフォルダの配下にアップロードされます。
※CloudFrontのログファイルは、ログ有効化設定時に指定したプレフィックス名のフォルダ配下にアップロードされます。

ログファイルがS3バケットにアップロードされたことをトリガーにLambda関数が起動され、パスが最適化されると以下のようになります。

ケース2
ケース2の設計と処理の流れ
ケース2では、S3バケットを2つ使用するためその分コストがかかってしまいます。
ライフサイクルポリシーを使用するなどコストを抑える対策を取る必要があります。

①Amazon CloudFrontのアクセスログがS3バケットに保存される
この段階では、通常のCloudFrontのアクセスログ保存パスと同じです。

②Lambda関数でアクセスログ保存のパスを最適化する
ログファイルをパス(ディストリビューション名/yyyy/mm/dd)を指定して別のS3バケットにコピーします。
使用するLambda関数のコードは以下です。
import re
import boto3
import os
import json
def lambda_handler(event, context):
# イベントログを出力
recieved_event = json.dumps(event)
print('Received event:' + recieved_event)
s3 = boto3.client('s3')
date_pattern = r'[^\\d](\d{4})-(\d{2})-(\d{2})-(\d{2})[^\\d]'
filename_pattern = r'[^/]+$'
destination_bucket = os.getenv('s3bucket')
bucket = event['Records'][0]['s3']['bucket']['name']
source_key = event['Records'][0]['s3']['object']['key']
target = '/'
target_find = source_key.find(target)
target_key_prefix = source_key[:target_find]
# ファイル名が定義したパターンにマッチしているか判定する
source_regex = re.compile(date_pattern)
match = source_regex.search(source_key)
if match is None:
print('ファイルが見つかりませんでした。')
else:
year, month, day, hour = match.groups()
filename_regex = re.compile(filename_pattern)
filename = filename_regex.search(source_key).group(0)
target_key = target_key_prefix + '/' + year + '/' + month + '/' + day + '/' + filename
print(source_key + ' to ' + target_key)
# パスを指定して、オブジェクトを移動させる
copy_result =s3.copy_object(Bucket=destination_bucket, Key=target_key,
CopySource={'Bucket': bucket, 'Key': source_key})
print(copy_result)
③Amazon Athenaでアクセスログをクエリする
ケース2で使用するテンプレートはケース1で使用したものと同じものを使用しますが、Glueのテーブルを作成する際に指定するS3バケットが異なります。 Glueのテーブルを作成する際は注意する必要があります。
ケース2のデモ
まずはコンテンツにアクセスして、CloudFrontのアクセスログがS3バケットに作成アップロードされるのを待ちます。
ログファイルは、指定したプレフィックス名のフォルダの配下にアップロードされます。
※CloudFrontのログファイルは、ログ有効化設定時に指定したプレフィックス名のフォルダ配下にアップロードされます。

ログファイルがS3バケットにアップロードされたことをトリガーにLambda関数が起動され、パスを最適化して別のS3バケットアップロードされます。

さいごに
今回はLambda関数でCloudFrontのアクセスログのパス保存を最適化する方法を紹介しました。 今回紹介した方法は、新規でS3バケットにアップロードされたログファイルのパスを最適化する際に使用できるものです。 次回のAmazon CloudFrontのアクセスログ保存パスをAthena用に最適化してみた~part2~では、既存のCloudFrontのアクセスログに対してパス保存最適化処理を行う方法を紹介します。