NRIネットコム社員が様々な視点で、日々の気づきやナレッジを発信するメディアです

注目のタグ

    AWS SAM(Python)でAthenaの操作を実行する

    本記事は NRIネットコム Advent Calendar 2022 7日目の記事です。
    🎁 6日目 ▶▶本記事 ▶▶ 8日目 🎄

    初めに

    初めまして、基盤デザイン事業部の北野と申します。アドベントカレンダー7日目の記事を担当します。 私は昨年12月に中途入社しまして今月で1年が経ちました。 現在の業務ではクラウドサービス(AWS)を用いたアプリケーション実行基盤の構築、運用を主として行っております。 最近AWS SAMを用いたサーバレスアプリケーションにての実装に携わったのでその時の知見を記事にしたいと思います。 今回はAWS SAMを使用してLambda(Python) + Athenaを用いたデータ抽出の実装を紹介いたします。

    AWS SAMとは

    サーバレスのアプリケーションを構築するためのフレームワークであり、簡単な記述でAWSのサーバレスアプリケーションを構築できます。

    aws.amazon.com

    AWS Lambdaとは

    おなじみのサーバを立てずにイベント駆動でコードを実行するサービス(FaaS)

    aws.amazon.com

    Amazon Athenaとは

    S3に保存されたデータ(CSV、JSON等)に対してSQLの形式でクエリを投げてデータを分析できるサービス

    aws.amazon.com

    AWS Glueとは

    AWSが提供するサーバレスなデータ統合(ETL)サービス。指定したデータソース(S3上のファイル等)を指定の形式に変換することができます。 今回はS3に置いたcsvファイルをデータソースとし、AthenaからSQLでデータクエリを投げられる形式に変換します。

    docs.aws.amazon.com

    今回はAthenaを用いてS3に保存されたcsvファイルに対してSQLを発行し、そのクエリ結果を返却するWEB APIを実装します。

    実装した構成

    API Gateway + Lambda + Athena

    データ

    以下のようなユーザー情報(id,name,age)

    id,name,age
    1,yamada,22
    2,kitano,33
    3,tanaka,40
    4,hamada,27
    5,shimada,27

    Getメソッドでリクエストされた際URLパラメータとして指定されたageを受け取ってそのage以上のレコードを返す簡易なアプリケーションを作成します。

    実装

    こちらの手順をもとにsam cliをインストールしましょう。

    https://docs.aws.amazon.com/ja_jp/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html

    以下コマンドでサーバレスアプリケーションの雛形が作成できます。(今回はPythonを使用します)

    sam init --runtime=python3.9

    ウィザードに従っていけば雛形が完成します。

    Which template source would you like to use?
        1 - AWS Quick Start Templates
        2 - Custom Template Location
    Choice: 1
    
    Choose an AWS Quick Start application template
        1 - Hello World Example
        2 - Infrastructure event management
        3 - Multi-step workflow
        4 - Lambda EFS example
        5 - Serverless Connector Hello World Example
        6 - Multi-step workflow with Connectors
    Template: 1
    
    Based on your selections, the only Package type available is Zip.
    We will proceed to selecting the Package type as Zip.
    
    Based on your selections, the only dependency manager available is pip.
    We will proceed copying the template using pip.
    
    Would you like to enable X-Ray tracing on the function(s) in your application?  [y/N]: N
    
    Project name [sam-app]: lambdaAthena
    
    Cloning from https://github.com/aws/aws-sam-cli-app-templates (process may take a moment)
    
        -----------------------
        Generating application:
        -----------------------
        Name: lambdaAthena
        Runtime: python3.9
        Architectures: x86_64
        Dependency Manager: pip
        Application Template: hello-world
        Output Directory: .
    
        Next steps can be found in the README file at ./lambdaAthena/README.md
    
    
        Commands you can use next
        =========================
        [*] Create pipeline: cd lambdaAthena && sam pipeline init --bootstrap
        [*] Validate SAM template: cd lambdaAthena && sam validate
        [*] Test Function in the Cloud: cd lambdaAthena && sam sync --stack-name {stack-name} --watch
    
    template.yaml

    template.yamlに使用するリソースの情報を記載していきます。

    AWSTemplateFormatVersion: '2010-09-09'
    Transform: AWS::Serverless-2016-10-31
    Description: >
      lambdaAthena
    
      Sample SAM Template for lambdaAthena
    
    Globals:
      Function:
        Timeout: 15
    
    Resources:
      AppDataBucket:
        Type: AWS::S3::Bucket
        Properties:
          BucketName: app-users-dataset-bucket
          PublicAccessBlockConfiguration:
            BlockPublicAcls: True
            BlockPublicPolicy: True
            IgnorePublicAcls: True
            RestrictPublicBuckets: True
      GlueDatabase:
        Type: AWS::Glue::Database
        Properties:
          CatalogId: !Ref AWS::AccountId
          DatabaseInput:
            Name: !Sub app_database
      GlueTable:
        Type: AWS::Glue::Table
        Properties:
          CatalogId: !Ref AWS::AccountId
          DatabaseName: !Ref GlueDatabase
          TableInput:
            Name: users
            TableType: EXTERNAL_TABLE
            StorageDescriptor:
              Columns:
                - Name: id
                  Type: int
                - Name: name
                  Type: string
                - Name: age
                  Type: int
              InputFormat: org.apache.hadoop.mapred.TextInputFormat
              OutputFormat: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
              Location: !Sub s3://${AppDataBucket}/data
              SerdeInfo:
                Parameters:
                  field.delim: ","
                  serialization.format: ","
                SerializationLibrary: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
    
      AthenaQueryResultBucket:
        Type: AWS::S3::Bucket
        Properties:
          BucketName: !Sub app-work-group-query-result
          PublicAccessBlockConfiguration:
            BlockPublicAcls: True
            BlockPublicPolicy: True
            IgnorePublicAcls: True
            RestrictPublicBuckets: True
      AthenaWorkGroup:
        Type: AWS::Athena::WorkGroup
        Properties:
          Name: !Sub app-work-group
          RecursiveDeleteOption: true
          WorkGroupConfiguration:
            ResultConfiguration:
              OutputLocation: !Sub s3://${AthenaQueryResultBucket}/data
            EnforceWorkGroupConfiguration: true
            PublishCloudWatchMetricsEnabled: true
    
      GetAthenaDataFunction:
        Type: AWS::Serverless::Function 
        Properties:
          CodeUri: src/
          Handler: app.lambda_handler
          Runtime: python3.9
          Architectures:
            - x86_64
          Events:
            GetAthenaData:
              Type: Api 
              Properties:
                Path: /athena
                Method: get
          Policies:
            - Statement:
              - Sid: AllowAthenaExecution
                Effect: Allow
                Action:
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:ListMultipartUploadParts
                  - s3:AbortMultipartUpload
                  - s3:PutObject
                  - glue:GetDatabase
                  - glue:GetTable
                  - glue:GetPartitions
                  - glue:UpdateTable
                  - athena:GetQueryExecution
                  - athena:GetQueryResults
                  - athena:StartQueryExecution
                  - athena:StopQueryExecution
                Resource:
                  - !Sub arn:aws:s3:::${AthenaQueryResultBucket}
                  - !Sub arn:aws:s3:::${AthenaQueryResultBucket}/*
                  - !Sub arn:aws:s3:::${AppDataBucket}
                  - !Sub arn:aws:s3:::${AppDataBucket}/*
                  - !Sub arn:aws:athena:${AWS::Region}:${AWS::AccountId}:workgroup/${AthenaWorkGroup}
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${GlueDatabase}/${GlueTable}
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${GlueDatabase}
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog
    Outputs:
      athenaApi:
        Description: "API Gateway endpoint URL for Prod stage for athena function"
        Value: !Sub "https://${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/athena/"
    • AppDataBucket
      • データのcsvを配置するバケットです。パブリックアクセスはOFFにしておきましょう。
    • GlueDatabase
      • Athenaから参照するデータストアを定義します。仮想的なデータベースとして下記GlueTableをまとめます。
    • GlueTable
      • ここに参照するcsvファイルのs3URLとテーブルのスキーマ定義を記述します。
    • AthenaQueryResultBucket
      • athenaのクエリ実行結果を配置するバケットになります。Athenaを使用する際は必要なので定義します。
    • AthenaWorkGroup
      • Athenaのリソースになります。
    • GetAthenaDataFunction
      • Athenaにクエリを投げるLambdaリソースになります。

    このファイルでのポイントはAthenaリソースを定義するためにはGlueDatabase,GlueTableと言ったリソースを定義し、ここでS3のcsvファイルの参照設定やテーブルのスキーマ定義を行うことと、LambdaのリソースにAthenaに関するリソースの参照権限を付与する必要があることです。(policyに関してはもう少し分割したほうが良いかも)

    src/app.py

    Lambdaのコードを以下のように実装します。

    import json
    import boto3
    from time import sleep
    
    SLEEP_SECONDS = 1
    RETRY_COUNT = 10
    WORK_GROUP = "app-work-group"
    DATABASE_NAME = "app_database"
    
    athena = boto3.client("athena")
    
    
    def lambda_handler(event, context):
        # クエリの作成
        age = event["queryStringParameters"]["age"]
        query_string = "SELECT id,name,age FROM users WHERE age > {0}".format(age)
        # クエリ実行開始
        query_execution_response = athena.start_query_execution(
            WorkGroup=WORK_GROUP,
            QueryExecutionContext={"Database": DATABASE_NAME},
            QueryString=query_string,
        )
        query_execution_id = query_execution_response["QueryExecutionId"]
    
        # クエリ実行状況の確認
        query_execution_status = ""
        retry_count = 0
        while query_execution_status != "SUCCEEDED":
            query_status = athena.get_query_execution(QueryExecutionId=query_execution_id)
            query_execution_status = query_status["QueryExecution"]["Status"]["State"]
    
            if query_execution_status == "FAILED":
                raise Exception("get_query_execution:FAILED")
            else:
                sleep(SLEEP_SECONDS)
                retry_count += 1
                if retry_count == RETRY_COUNT:
                    raise Exception("query TIMEOUT")
    
        # クエリ実行結果の取得
        query_result_paginator = athena.get_paginator("get_query_results")
        query_result_iter = query_result_paginator.paginate(
            QueryExecutionId=query_execution_id, PaginationConfig={"PageSize": 1000}
        )
        user_data = sum(list(map(lambda x: x["ResultSet"]["Rows"], query_result_iter)), [])
    
        return {
            "statusCode": 200,
            "body": json.dumps(user_data),
        }
    

    ここでのポイントはAthenaへのクエリ実行とデータの抽出は非同期で行われるため、Python上でAthenaのクエリが完了するまで待つ処理を実装する必要があることです。 大まかなAthenaの処理シーケンスは以下のようになっております。

    処理シーケンス

    もう一つポイントはget_query_resultsメソッドにてデータを受ける際は以下のようにathena.get_paginatorを生成してデータを取得する必要があります。

        query_result_paginator = athena.get_paginator("get_query_results")
        query_result_iter = query_result_paginator.paginate(
            QueryExecutionId=query_execution_id, PaginationConfig={"PageSize": 1000}
        )
        user_data = sum(list(map(lambda x: x["ResultSet"]["Rows"], query_result_iter)), [])
    

    よく以下のように実装して結果を取得するサンプルはネット上見かけるのですが、この場合1000件以上のデータを取得できないといった問題があります。

    query_result = athena.get_query_results(QueryExecutionId=query_execution_id)

    以下のコマンドでビルド・デプロイしてみましょう。

    sam build
    sam deploy --guided

    リージョンやstack名など入力していくとデプロイが完了するかと思います。 以下Out Putsに書かれているURLは動作確認で使用するので控えておいてください。

    Outputs
    -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    Key                 athenaApi
    Description         API Gateway endpoint URL for Prod stage for athena function
    Value               https://jcbtagzepk.execute-api.us-east-1.amazonaws.com/Prod/athena/
    -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    実行結果

    上記テストデータをdatas.csvとして保存しておき、以下のコマンドでS3にコピーしておきましょう。

    aws s3 cp datas.csv s3://app-users-dataset-bucket/data/datas.csv

    curlを用いて動作を検証します。(URLに関しては上記で控えておいたURLを使用します。)

    curl https://jcbtagzepk.execute-api.us-east-1.amazonaws.com/Prod/athena?age=25 | jq .
    [
      {
        "Data": [
          {
            "VarCharValue": "id"
          },
          {
            "VarCharValue": "name"
          },
          {
            "VarCharValue": "age"
          }
        ]
      },
      {
        "Data": [
          {
            "VarCharValue": "2"
          },
          {
            "VarCharValue": "kitano"
          },
          {
            "VarCharValue": "33"
          }
        ]
      },
      {
        "Data": [
          {
            "VarCharValue": "3"
          },
          {
            "VarCharValue": "tanaka"
          },
          {
            "VarCharValue": "40"
          }
        ]
      },
      {
        "Data": [
          {
            "VarCharValue": "4"
          },
          {
            "VarCharValue": "hamada"
          },
          {
            "VarCharValue": "27"
          }
        ]
      },
      {
        "Data": [
          {
            "VarCharValue": "5"
          },
          {
            "VarCharValue": "shimada"
          },
          {
            "VarCharValue": "27"
          }
        ]
      }
    ]

    age > 25以上のユーザー情報を取得できました。

    所感・まとめ

    あまりAPI Gatewayから直接Athenaを使用するサンプルがないので今回実装してみました。 Lambdaから実行した所感としては、

    • Athenaの終了を同期する実装が必要なため、普段使いのRDBのコネクタ等の感覚で使用するとなかなか苦労しそう。
    • データは1000件ごとにページングされるため、通常のget_query_resultsメソッドで取得すると1000件でデータが切れてしまう。

    あくまでサンプルなので簡易な作りですが、本格的にAPIで使用するとなるとAthenaの前段にキャッシュを用意したりする必要があるのかなと思います。あとQuery Resultのバケットにはライフサイクルを設定しないとどんどんデータが溜まっていってしまいますね。 Athenaの実行方法のセオリーとしてはStepFunctionsなどで実行する方法が一般的だと思うのでまたそちらに関しても記事が書けたらと思います。

    執筆者 : 北野友亮
    インフラエンジニア。元アプリケーションエンジニア(業務系)AWSのインフラ構築・設計を行なっています。 2025 Japan All AWS Certifications Engineers