NRIネットコム Blog

NRIネットコム社員が様々な視点で、日々の気づきやナレッジを発信するメディアです

AWS SAM(Python)でAthenaの操作を実行する

本記事は NRIネットコム Advent Calendar 2022 7日目の記事です。
🎁 6日目 ▶▶本記事 ▶▶ 8日目 🎄

初めに

初めまして、基盤デザイン事業部の北野と申します。アドベントカレンダー7日目の記事を担当します。 私は昨年12月に中途入社しまして今月で1年が経ちました。 現在の業務ではクラウドサービス(AWS)を用いたアプリケーション実行基盤の構築、運用を主として行っております。 最近AWS SAMを用いたサーバレスアプリケーションにての実装に携わったのでその時の知見を記事にしたいと思います。 今回はAWS SAMを使用してLambda(Python) + Athenaを用いたデータ抽出の実装を紹介いたします。

AWS SAMとは

サーバレスのアプリケーションを構築するためのフレームワークであり、簡単な記述でAWSのサーバレスアプリケーションを構築できます。

aws.amazon.com

AWS Lambdaとは

おなじみのサーバを立てずにイベント駆動でコードを実行するサービス(FaaS)

aws.amazon.com

Amazon Athenaとは

S3に保存されたデータ(CSV、JSON等)に対してSQLの形式でクエリを投げてデータを分析できるサービス

aws.amazon.com

AWS Glueとは

AWSが提供するサーバレスなデータ統合(ETL)サービス。指定したデータソース(S3上のファイル等)を指定の形式に変換することができます。 今回はS3に置いたcsvファイルをデータソースとし、AthenaからSQLでデータクエリを投げられる形式に変換します。

docs.aws.amazon.com

今回はAthenaを用いてS3に保存されたcsvファイルに対してSQLを発行し、そのクエリ結果を返却するWEB APIを実装します。

実装した構成

API Gateway + Lambda + Athena

データ

以下のようなユーザー情報(id,name,age)

id,name,age
1,yamada,22
2,kitano,33
3,tanaka,40
4,hamada,27
5,shimada,27

Getメソッドでリクエストされた際URLパラメータとして指定されたageを受け取ってそのage以上のレコードを返す簡易なアプリケーションを作成します。

実装

こちらの手順をもとにsam cliをインストールしましょう。

https://docs.aws.amazon.com/ja_jp/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html

以下コマンドでサーバレスアプリケーションの雛形が作成できます。(今回はPythonを使用します)

sam init --runtime=python3.9

ウィザードに従っていけば雛形が完成します。

Which template source would you like to use?
    1 - AWS Quick Start Templates
    2 - Custom Template Location
Choice: 1

Choose an AWS Quick Start application template
    1 - Hello World Example
    2 - Infrastructure event management
    3 - Multi-step workflow
    4 - Lambda EFS example
    5 - Serverless Connector Hello World Example
    6 - Multi-step workflow with Connectors
Template: 1

Based on your selections, the only Package type available is Zip.
We will proceed to selecting the Package type as Zip.

Based on your selections, the only dependency manager available is pip.
We will proceed copying the template using pip.

Would you like to enable X-Ray tracing on the function(s) in your application?  [y/N]: N

Project name [sam-app]: lambdaAthena

Cloning from https://github.com/aws/aws-sam-cli-app-templates (process may take a moment)

    -----------------------
    Generating application:
    -----------------------
    Name: lambdaAthena
    Runtime: python3.9
    Architectures: x86_64
    Dependency Manager: pip
    Application Template: hello-world
    Output Directory: .

    Next steps can be found in the README file at ./lambdaAthena/README.md


    Commands you can use next
    =========================
    [*] Create pipeline: cd lambdaAthena && sam pipeline init --bootstrap
    [*] Validate SAM template: cd lambdaAthena && sam validate
    [*] Test Function in the Cloud: cd lambdaAthena && sam sync --stack-name {stack-name} --watch
template.yaml

template.yamlに使用するリソースの情報を記載していきます。

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  lambdaAthena

  Sample SAM Template for lambdaAthena

Globals:
  Function:
    Timeout: 15

Resources:
  AppDataBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: app-users-dataset-bucket
      PublicAccessBlockConfiguration:
        BlockPublicAcls: True
        BlockPublicPolicy: True
        IgnorePublicAcls: True
        RestrictPublicBuckets: True
  GlueDatabase:
    Type: AWS::Glue::Database
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseInput:
        Name: !Sub app_database
  GlueTable:
    Type: AWS::Glue::Table
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseName: !Ref GlueDatabase
      TableInput:
        Name: users
        TableType: EXTERNAL_TABLE
        StorageDescriptor:
          Columns:
            - Name: id
              Type: int
            - Name: name
              Type: string
            - Name: age
              Type: int
          InputFormat: org.apache.hadoop.mapred.TextInputFormat
          OutputFormat: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
          Location: !Sub s3://${AppDataBucket}/data
          SerdeInfo:
            Parameters:
              field.delim: ","
              serialization.format: ","
            SerializationLibrary: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  AthenaQueryResultBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub app-work-group-query-result
      PublicAccessBlockConfiguration:
        BlockPublicAcls: True
        BlockPublicPolicy: True
        IgnorePublicAcls: True
        RestrictPublicBuckets: True
  AthenaWorkGroup:
    Type: AWS::Athena::WorkGroup
    Properties:
      Name: !Sub app-work-group
      RecursiveDeleteOption: true
      WorkGroupConfiguration:
        ResultConfiguration:
          OutputLocation: !Sub s3://${AthenaQueryResultBucket}/data
        EnforceWorkGroupConfiguration: true
        PublishCloudWatchMetricsEnabled: true

  GetAthenaDataFunction:
    Type: AWS::Serverless::Function 
    Properties:
      CodeUri: src/
      Handler: app.lambda_handler
      Runtime: python3.9
      Architectures:
        - x86_64
      Events:
        GetAthenaData:
          Type: Api 
          Properties:
            Path: /athena
            Method: get
      Policies:
        - Statement:
          - Sid: AllowAthenaExecution
            Effect: Allow
            Action:
              - s3:GetBucketLocation
              - s3:GetObject
              - s3:ListBucket
              - s3:ListBucketMultipartUploads
              - s3:ListMultipartUploadParts
              - s3:AbortMultipartUpload
              - s3:PutObject
              - glue:GetDatabase
              - glue:GetTable
              - glue:GetPartitions
              - glue:UpdateTable
              - athena:GetQueryExecution
              - athena:GetQueryResults
              - athena:StartQueryExecution
              - athena:StopQueryExecution
            Resource:
              - !Sub arn:aws:s3:::${AthenaQueryResultBucket}
              - !Sub arn:aws:s3:::${AthenaQueryResultBucket}/*
              - !Sub arn:aws:s3:::${AppDataBucket}
              - !Sub arn:aws:s3:::${AppDataBucket}/*
              - !Sub arn:aws:athena:${AWS::Region}:${AWS::AccountId}:workgroup/${AthenaWorkGroup}
              - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${GlueDatabase}/${GlueTable}
              - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${GlueDatabase}
              - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog
Outputs:
  athenaApi:
    Description: "API Gateway endpoint URL for Prod stage for athena function"
    Value: !Sub "https://${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/athena/"
  • AppDataBucket
    • データのcsvを配置するバケットです。パブリックアクセスはOFFにしておきましょう。
  • GlueDatabase
    • Athenaから参照するデータストアを定義します。仮想的なデータベースとして下記GlueTableをまとめます。
  • GlueTable
    • ここに参照するcsvファイルのs3URLとテーブルのスキーマ定義を記述します。
  • AthenaQueryResultBucket
    • athenaのクエリ実行結果を配置するバケットになります。Athenaを使用する際は必要なので定義します。
  • AthenaWorkGroup
    • Athenaのリソースになります。
  • GetAthenaDataFunction
    • Athenaにクエリを投げるLambdaリソースになります。

このファイルでのポイントはAthenaリソースを定義するためにはGlueDatabase,GlueTableと言ったリソースを定義し、ここでS3のcsvファイルの参照設定やテーブルのスキーマ定義を行うことと、LambdaのリソースにAthenaに関するリソースの参照権限を付与する必要があることです。(policyに関してはもう少し分割したほうが良いかも)

src/app.py

Lambdaのコードを以下のように実装します。

import json
import boto3
from time import sleep

SLEEP_SECONDS = 1
RETRY_COUNT = 10
WORK_GROUP = "app-work-group"
DATABASE_NAME = "app_database"

athena = boto3.client("athena")


def lambda_handler(event, context):
    # クエリの作成
    age = event["queryStringParameters"]["age"]
    query_string = "SELECT id,name,age FROM users WHERE age > {0}".format(age)
    # クエリ実行開始
    query_execution_response = athena.start_query_execution(
        WorkGroup=WORK_GROUP,
        QueryExecutionContext={"Database": DATABASE_NAME},
        QueryString=query_string,
    )
    query_execution_id = query_execution_response["QueryExecutionId"]

    # クエリ実行状況の確認
    query_execution_status = ""
    retry_count = 0
    while query_execution_status != "SUCCEEDED":
        query_status = athena.get_query_execution(QueryExecutionId=query_execution_id)
        query_execution_status = query_status["QueryExecution"]["Status"]["State"]

        if query_execution_status == "FAILED":
            raise Exception("get_query_execution:FAILED")
        else:
            sleep(SLEEP_SECONDS)
            retry_count += 1
            if retry_count == RETRY_COUNT:
                raise Exception("query TIMEOUT")

    # クエリ実行結果の取得
    query_result_paginator = athena.get_paginator("get_query_results")
    query_result_iter = query_result_paginator.paginate(
        QueryExecutionId=query_execution_id, PaginationConfig={"PageSize": 1000}
    )
    user_data = sum(list(map(lambda x: x["ResultSet"]["Rows"], query_result_iter)), [])

    return {
        "statusCode": 200,
        "body": json.dumps(user_data),
    }

ここでのポイントはAthenaへのクエリ実行とデータの抽出は非同期で行われるため、Python上でAthenaのクエリが完了するまで待つ処理を実装する必要があることです。 大まかなAthenaの処理シーケンスは以下のようになっております。

処理シーケンス

もう一つポイントはget_query_resultsメソッドにてデータを受ける際は以下のようにathena.get_paginatorを生成してデータを取得する必要があります。

    query_result_paginator = athena.get_paginator("get_query_results")
    query_result_iter = query_result_paginator.paginate(
        QueryExecutionId=query_execution_id, PaginationConfig={"PageSize": 1000}
    )
    user_data = sum(list(map(lambda x: x["ResultSet"]["Rows"], query_result_iter)), [])

よく以下のように実装して結果を取得するサンプルはネット上見かけるのですが、この場合1000件以上のデータを取得できないといった問題があります。

query_result = athena.get_query_results(QueryExecutionId=query_execution_id)

以下のコマンドでビルド・デプロイしてみましょう。

sam build
sam deploy --guided

リージョンやstack名など入力していくとデプロイが完了するかと思います。 以下Out Putsに書かれているURLは動作確認で使用するので控えておいてください。

Outputs
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Key                 athenaApi
Description         API Gateway endpoint URL for Prod stage for athena function
Value               https://jcbtagzepk.execute-api.us-east-1.amazonaws.com/Prod/athena/
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
実行結果

上記テストデータをdatas.csvとして保存しておき、以下のコマンドでS3にコピーしておきましょう。

aws s3 cp datas.csv s3://app-users-dataset-bucket/data/datas.csv

curlを用いて動作を検証します。(URLに関しては上記で控えておいたURLを使用します。)

curl https://jcbtagzepk.execute-api.us-east-1.amazonaws.com/Prod/athena?age=25 | jq .
[
  {
    "Data": [
      {
        "VarCharValue": "id"
      },
      {
        "VarCharValue": "name"
      },
      {
        "VarCharValue": "age"
      }
    ]
  },
  {
    "Data": [
      {
        "VarCharValue": "2"
      },
      {
        "VarCharValue": "kitano"
      },
      {
        "VarCharValue": "33"
      }
    ]
  },
  {
    "Data": [
      {
        "VarCharValue": "3"
      },
      {
        "VarCharValue": "tanaka"
      },
      {
        "VarCharValue": "40"
      }
    ]
  },
  {
    "Data": [
      {
        "VarCharValue": "4"
      },
      {
        "VarCharValue": "hamada"
      },
      {
        "VarCharValue": "27"
      }
    ]
  },
  {
    "Data": [
      {
        "VarCharValue": "5"
      },
      {
        "VarCharValue": "shimada"
      },
      {
        "VarCharValue": "27"
      }
    ]
  }
]

age > 25以上のユーザー情報を取得できました。

所感・まとめ

あまりAPI Gatewayから直接Athenaを使用するサンプルがないので今回実装してみました。 Lambdaから実行した所感としては、

  • Athenaの終了を同期する実装が必要なため、普段使いのRDBのコネクタ等の感覚で使用するとなかなか苦労しそう。
  • データは1000件ごとにページングされるため、通常のget_query_resultsメソッドで取得すると1000件でデータが切れてしまう。

あくまでサンプルなので簡易な作りですが、本格的にAPIで使用するとなるとAthenaの前段にキャッシュを用意したりする必要があるのかなと思います。あとQuery Resultのバケットにはライフサイクルを設定しないとどんどんデータが溜まっていってしまいますね。 Athenaの実行方法のセオリーとしてはStepFunctionsなどで実行する方法が一般的だと思うのでまたそちらに関しても記事が書けたらと思います。

執筆者 : 北野友亮
インフラエンジニア。元アプリケーションエンジニア(業務系)AWSのインフラ構築・設計を行なっています。最近はPythonをよく触ります。