本記事は
NRIネットコム Advent Calendar 2022
7日目の記事です。
🎁 6日目
▶▶本記事 ▶▶
8日目
🎄
初めに
初めまして、基盤デザイン事業部の北野と申します。アドベントカレンダー7日目の記事を担当します。 私は昨年12月に中途入社しまして今月で1年が経ちました。 現在の業務ではクラウドサービス(AWS)を用いたアプリケーション実行基盤の構築、運用を主として行っております。 最近AWS SAMを用いたサーバレスアプリケーションにての実装に携わったのでその時の知見を記事にしたいと思います。 今回はAWS SAMを使用してLambda(Python) + Athenaを用いたデータ抽出の実装を紹介いたします。
AWS SAMとは
サーバレスのアプリケーションを構築するためのフレームワークであり、簡単な記述でAWSのサーバレスアプリケーションを構築できます。
AWS Lambdaとは
おなじみのサーバを立てずにイベント駆動でコードを実行するサービス(FaaS)
Amazon Athenaとは
S3に保存されたデータ(CSV、JSON等)に対してSQLの形式でクエリを投げてデータを分析できるサービス
AWS Glueとは
AWSが提供するサーバレスなデータ統合(ETL)サービス。指定したデータソース(S3上のファイル等)を指定の形式に変換することができます。 今回はS3に置いたcsvファイルをデータソースとし、AthenaからSQLでデータクエリを投げられる形式に変換します。
今回はAthenaを用いてS3に保存されたcsvファイルに対してSQLを発行し、そのクエリ結果を返却するWEB APIを実装します。
実装した構成
API Gateway + Lambda + Athena
データ
以下のようなユーザー情報(id,name,age)
id,name,age 1,yamada,22 2,kitano,33 3,tanaka,40 4,hamada,27 5,shimada,27
Getメソッドでリクエストされた際URLパラメータとして指定されたageを受け取ってそのage以上のレコードを返す簡易なアプリケーションを作成します。
実装
こちらの手順をもとにsam cliをインストールしましょう。
以下コマンドでサーバレスアプリケーションの雛形が作成できます。(今回はPythonを使用します)
sam init --runtime=python3.9
ウィザードに従っていけば雛形が完成します。
Which template source would you like to use? 1 - AWS Quick Start Templates 2 - Custom Template Location Choice: 1 Choose an AWS Quick Start application template 1 - Hello World Example 2 - Infrastructure event management 3 - Multi-step workflow 4 - Lambda EFS example 5 - Serverless Connector Hello World Example 6 - Multi-step workflow with Connectors Template: 1 Based on your selections, the only Package type available is Zip. We will proceed to selecting the Package type as Zip. Based on your selections, the only dependency manager available is pip. We will proceed copying the template using pip. Would you like to enable X-Ray tracing on the function(s) in your application? [y/N]: N Project name [sam-app]: lambdaAthena Cloning from https://github.com/aws/aws-sam-cli-app-templates (process may take a moment) ----------------------- Generating application: ----------------------- Name: lambdaAthena Runtime: python3.9 Architectures: x86_64 Dependency Manager: pip Application Template: hello-world Output Directory: . Next steps can be found in the README file at ./lambdaAthena/README.md Commands you can use next ========================= [*] Create pipeline: cd lambdaAthena && sam pipeline init --bootstrap [*] Validate SAM template: cd lambdaAthena && sam validate [*] Test Function in the Cloud: cd lambdaAthena && sam sync --stack-name {stack-name} --watch
template.yaml
template.yamlに使用するリソースの情報を記載していきます。
AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: > lambdaAthena Sample SAM Template for lambdaAthena Globals: Function: Timeout: 15 Resources: AppDataBucket: Type: AWS::S3::Bucket Properties: BucketName: app-users-dataset-bucket PublicAccessBlockConfiguration: BlockPublicAcls: True BlockPublicPolicy: True IgnorePublicAcls: True RestrictPublicBuckets: True GlueDatabase: Type: AWS::Glue::Database Properties: CatalogId: !Ref AWS::AccountId DatabaseInput: Name: !Sub app_database GlueTable: Type: AWS::Glue::Table Properties: CatalogId: !Ref AWS::AccountId DatabaseName: !Ref GlueDatabase TableInput: Name: users TableType: EXTERNAL_TABLE StorageDescriptor: Columns: - Name: id Type: int - Name: name Type: string - Name: age Type: int InputFormat: org.apache.hadoop.mapred.TextInputFormat OutputFormat: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat Location: !Sub s3://${AppDataBucket}/data SerdeInfo: Parameters: field.delim: "," serialization.format: "," SerializationLibrary: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe AthenaQueryResultBucket: Type: AWS::S3::Bucket Properties: BucketName: !Sub app-work-group-query-result PublicAccessBlockConfiguration: BlockPublicAcls: True BlockPublicPolicy: True IgnorePublicAcls: True RestrictPublicBuckets: True AthenaWorkGroup: Type: AWS::Athena::WorkGroup Properties: Name: !Sub app-work-group RecursiveDeleteOption: true WorkGroupConfiguration: ResultConfiguration: OutputLocation: !Sub s3://${AthenaQueryResultBucket}/data EnforceWorkGroupConfiguration: true PublishCloudWatchMetricsEnabled: true GetAthenaDataFunction: Type: AWS::Serverless::Function Properties: CodeUri: src/ Handler: app.lambda_handler Runtime: python3.9 Architectures: - x86_64 Events: GetAthenaData: Type: Api Properties: Path: /athena Method: get Policies: - Statement: - Sid: AllowAthenaExecution Effect: Allow Action: - s3:GetBucketLocation - s3:GetObject - s3:ListBucket - s3:ListBucketMultipartUploads - s3:ListMultipartUploadParts - s3:AbortMultipartUpload - s3:PutObject - glue:GetDatabase - glue:GetTable - glue:GetPartitions - glue:UpdateTable - athena:GetQueryExecution - athena:GetQueryResults - athena:StartQueryExecution - athena:StopQueryExecution Resource: - !Sub arn:aws:s3:::${AthenaQueryResultBucket} - !Sub arn:aws:s3:::${AthenaQueryResultBucket}/* - !Sub arn:aws:s3:::${AppDataBucket} - !Sub arn:aws:s3:::${AppDataBucket}/* - !Sub arn:aws:athena:${AWS::Region}:${AWS::AccountId}:workgroup/${AthenaWorkGroup} - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${GlueDatabase}/${GlueTable} - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${GlueDatabase} - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog Outputs: athenaApi: Description: "API Gateway endpoint URL for Prod stage for athena function" Value: !Sub "https://${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/athena/"
- AppDataBucket
- データのcsvを配置するバケットです。パブリックアクセスはOFFにしておきましょう。
- GlueDatabase
- Athenaから参照するデータストアを定義します。仮想的なデータベースとして下記GlueTableをまとめます。
- GlueTable
- ここに参照するcsvファイルのs3URLとテーブルのスキーマ定義を記述します。
- AthenaQueryResultBucket
- athenaのクエリ実行結果を配置するバケットになります。Athenaを使用する際は必要なので定義します。
- AthenaWorkGroup
- Athenaのリソースになります。
- GetAthenaDataFunction
- Athenaにクエリを投げるLambdaリソースになります。
このファイルでのポイントはAthenaリソースを定義するためにはGlueDatabase,GlueTableと言ったリソースを定義し、ここでS3のcsvファイルの参照設定やテーブルのスキーマ定義を行うことと、LambdaのリソースにAthenaに関するリソースの参照権限を付与する必要があることです。(policyに関してはもう少し分割したほうが良いかも)
src/app.py
Lambdaのコードを以下のように実装します。
import json import boto3 from time import sleep SLEEP_SECONDS = 1 RETRY_COUNT = 10 WORK_GROUP = "app-work-group" DATABASE_NAME = "app_database" athena = boto3.client("athena") def lambda_handler(event, context): # クエリの作成 age = event["queryStringParameters"]["age"] query_string = "SELECT id,name,age FROM users WHERE age > {0}".format(age) # クエリ実行開始 query_execution_response = athena.start_query_execution( WorkGroup=WORK_GROUP, QueryExecutionContext={"Database": DATABASE_NAME}, QueryString=query_string, ) query_execution_id = query_execution_response["QueryExecutionId"] # クエリ実行状況の確認 query_execution_status = "" retry_count = 0 while query_execution_status != "SUCCEEDED": query_status = athena.get_query_execution(QueryExecutionId=query_execution_id) query_execution_status = query_status["QueryExecution"]["Status"]["State"] if query_execution_status == "FAILED": raise Exception("get_query_execution:FAILED") else: sleep(SLEEP_SECONDS) retry_count += 1 if retry_count == RETRY_COUNT: raise Exception("query TIMEOUT") # クエリ実行結果の取得 query_result_paginator = athena.get_paginator("get_query_results") query_result_iter = query_result_paginator.paginate( QueryExecutionId=query_execution_id, PaginationConfig={"PageSize": 1000} ) user_data = sum(list(map(lambda x: x["ResultSet"]["Rows"], query_result_iter)), []) return { "statusCode": 200, "body": json.dumps(user_data), }
ここでのポイントはAthenaへのクエリ実行とデータの抽出は非同期で行われるため、Python上でAthenaのクエリが完了するまで待つ処理を実装する必要があることです。 大まかなAthenaの処理シーケンスは以下のようになっております。
処理シーケンス
もう一つポイントはget_query_resultsメソッドにてデータを受ける際は以下のようにathena.get_paginatorを生成してデータを取得する必要があります。
query_result_paginator = athena.get_paginator("get_query_results") query_result_iter = query_result_paginator.paginate( QueryExecutionId=query_execution_id, PaginationConfig={"PageSize": 1000} ) user_data = sum(list(map(lambda x: x["ResultSet"]["Rows"], query_result_iter)), [])
よく以下のように実装して結果を取得するサンプルはネット上見かけるのですが、この場合1000件以上のデータを取得できないといった問題があります。
query_result = athena.get_query_results(QueryExecutionId=query_execution_id)
以下のコマンドでビルド・デプロイしてみましょう。
sam build sam deploy --guided
リージョンやstack名など入力していくとデプロイが完了するかと思います。 以下Out Putsに書かれているURLは動作確認で使用するので控えておいてください。
Outputs ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Key athenaApi Description API Gateway endpoint URL for Prod stage for athena function Value https://jcbtagzepk.execute-api.us-east-1.amazonaws.com/Prod/athena/ -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
実行結果
上記テストデータをdatas.csvとして保存しておき、以下のコマンドでS3にコピーしておきましょう。
aws s3 cp datas.csv s3://app-users-dataset-bucket/data/datas.csv
curlを用いて動作を検証します。(URLに関しては上記で控えておいたURLを使用します。)
curl https://jcbtagzepk.execute-api.us-east-1.amazonaws.com/Prod/athena?age=25 | jq . [ { "Data": [ { "VarCharValue": "id" }, { "VarCharValue": "name" }, { "VarCharValue": "age" } ] }, { "Data": [ { "VarCharValue": "2" }, { "VarCharValue": "kitano" }, { "VarCharValue": "33" } ] }, { "Data": [ { "VarCharValue": "3" }, { "VarCharValue": "tanaka" }, { "VarCharValue": "40" } ] }, { "Data": [ { "VarCharValue": "4" }, { "VarCharValue": "hamada" }, { "VarCharValue": "27" } ] }, { "Data": [ { "VarCharValue": "5" }, { "VarCharValue": "shimada" }, { "VarCharValue": "27" } ] } ]
age > 25以上のユーザー情報を取得できました。
所感・まとめ
あまりAPI Gatewayから直接Athenaを使用するサンプルがないので今回実装してみました。 Lambdaから実行した所感としては、
- Athenaの終了を同期する実装が必要なため、普段使いのRDBのコネクタ等の感覚で使用するとなかなか苦労しそう。
- データは1000件ごとにページングされるため、通常のget_query_resultsメソッドで取得すると1000件でデータが切れてしまう。
あくまでサンプルなので簡易な作りですが、本格的にAPIで使用するとなるとAthenaの前段にキャッシュを用意したりする必要があるのかなと思います。あとQuery Resultのバケットにはライフサイクルを設定しないとどんどんデータが溜まっていってしまいますね。 Athenaの実行方法のセオリーとしてはStepFunctionsなどで実行する方法が一般的だと思うのでまたそちらに関しても記事が書けたらと思います。
インフラエンジニア。元アプリケーションエンジニア(業務系)AWSのインフラ構築・設計を行なっています。最近はPythonをよく触ります。