日々変化するゆるふわフォーマットをBigQueryでおいしく料理する方法。Athenaユーザも必見だよ!

3行まとめ

  • BigQueryはいいぞ
  • 外部テーブルはすごいぞ
  • Scheduled Queryも便利だぞ

こんにちは。ひむ(@himu)です。
株式会社fluctでエンジニアとして働いていたり、ボルダリングしたりガチャを回したり健康で文化的な生活をしています。

fluctはインターネット広告プラットフォームのサービスなどを提供しており、毎日億単位の大量のイベントログが発生しています。
イベントログには、売上の計算に必要なデータから、アプリケーションを改善する上で必要なデータなど、様々なデータが入り混じっており、情報が追加されることも度々あります。
今回は、そんな日々変化するログを分析基盤に取り込み、活用しやすくするためにやってきたことを紹介します。

背景

今回扱うサービスは社内では比較的新しいもので、Athenaでログをクエリするしくみが早い段階から作られていました。

アプリケーションはAWSのEC2インスタンス上で動いており、ログファイルに吐き出したデータをkinesis agentがストリームに取り込み、Kinesis Firehoseを使ってS3に保管するという流れです。
当初は取引量も少なく、S3に置いたログをAthenaでクエリするので十分データを活用できていました。

しかし取引量が増えるにつれてログの量やフィールドが増えたことで、1日あたり10億レコード、1TB近くと、単純にデータサイズが大きくなってきました。
Athenaで分析しようとしても、クエリごとに数分、あるいはそれ以上かかるようになり、データ活用の大きな障害になってしまいました。Athenaのパーティション自動生成のしくみがわかりづらかったことも頭を悩ませる原因になっていました。

そこで、もともとチームの分析基盤としてBigQueryを使っていたこともあり、そちらに取り込もうと考えました。

データの流れ

f:id:himuk:20201002185620j:plain

そのままコピーするだけのLambda

S3からGCSへのコピーでは、今回パスも含めてそのままコピーするLambdaを用意しました。

import io
import json
from urllib.parse import unquote_plus

import boto3
from google.cloud import storage
from google.oauth2 import service_account

s3_client = boto3.client('s3')
sm_client = boto3.client('secretsmanager')


def init_gcs():
    cred_json = sm_client.get_secret_value(SecretId='gcp_serviceaccount_json')['SecretString']
    cred = json.loads(cred_json)
    return storage.Client(
        credentials=service_account.Credentials.from_service_account_info(cred),
        project='project_name',
    )

def handler(event, context):
    gcs_client = init_gcs()

    for record in event['Records']:
        stream = io.BytesIO()

        s3_bucket = record['s3']['bucket']['name']
        key = unquote_plus(record['s3']['object']['key'])
        s3_client.download_fileobj(s3_bucket, key, stream)

        gcs_bucket = gcs_client.bucket('bucket_name')
        blob = gcs_bucket.blob(key)
        blob.upload_from_string(stream.getvalue())

外部テーブルを使おう

ログはJSON Lines(BigQueryでいうところのNEWLINE_DELIMITED_JSON)で、配列データがあったり、デバッグ用のフィールドがomitemptyされたりしています。
また、スキーマの変更もあり得るため、そのたびにテーブルスキーマ定義も変更しないと取り込みエラーになる可能性があります。
いわゆる「ゆるふわフォーマット」というやつですね(たぶん)。

Athenaはそういったデータをクエリするのに便利なツールで、データの実体はクラウドストレージにあり、テーブルはビューのような感覚で使えるようになっています。

GCPにも似た機能があり、それがBigQueryの「外部テーブル(External Table)」です。

まさにGCP版Athenaとも言える機能で、GCSやGoogle Driveに置いたファイルをBigQueryから直接クエリすることができます。
Athenaと同様にhive partitionを扱えて、BigQueryのパーサをそのまま使えるので対応フォーマットも多いです。 少し前にGoogle SheetでBigQueryが使えるようになったりしていて、BigQueryの多機能ぶりには頭が上がらないですね。

クエリ速度ですが、Athenaと比較するのが馬鹿らしくなるくらい速いです。
Athenaで10分以上かかっていたクエリが数秒で返ってきたときは感動しました。
今回はGCSのNearline Storageに置いたので、ストレージクラスが違ったりGoogle Driveなどにクエリすると遅くなるのかもしれません。

ゆるふわをゆるふわのまま扱う

外部テーブルを使えばBigQueryで扱うことができるようになりますが、ゆるふわフォーマットをゆるふわたらしめているのはスキーマで、外部テーブルも例外なくスキーマ定義をしなければなりません(auto detectは使えます)。

正直これが一番面倒な作業です。ログに出しているデータすべてを定義しなくてはならないし、変更にも都度対応しなければなりません。
日々活用しているのは一部のフィールドだけだったりしていて、深くネストされたフィールドなどはデバッグでたまに見るくらいだったりします。

であれば、一部だけクエリできるようにして、あとはJSON文字列のままのほうがクエリしやすくもなるのでは?
必要なフィールドだけパースするようにすれば、エラーの可能性も低くできるのでは?

そこで、「JSONはJSON文字列のまま取り込んで、そのあと必要なフィールドだけ抽出する」ようにしました。
外部テーブルではJSON Linesをそのまま1カラムのレコードとして取り込み、その後BigQueryの関数を使ってパースしたテーブルを定期的に作ります。

こうすれば、外部テーブルは一度作ってしまえばずっと使えるし、パースするクエリを変更したタイミングから新しいスキーマでテーブルが作られるようにできます。

これを実現するために、

  • JSON Linesを1カラムのレコードとして取り込む
  • 定期的に外部テーブルにクエリして結果を保存する

といったことをやっていきます。

JSON Linesを1カラムのレコードとして取り込む

BigQueryでJSONフォーマットを指定すると、すべてのフィールドをパースしようとします。「ここまでパースして!これより深いネストはパースしないで!」というワガママは聞いてくれないのです。

というわけで、「これはJSONフォーマットではありません」という風に教えてあげます。

具体的にどうするかというと、同じ1行1レコードであるCSVを指定します。
BigQueryのCSVフォーマットにはデリミタを指定することができるため、データに入り得ない文字を指定すれば1行1カラムとしてパースされます。

外部テーブルは、通常のテーブルと同様にGCPコンソールやコマンドラインツールから作成することができます。

bq mk --external_table_definition=tabledef.json dataset.table
{
    "csvOptions": {
        "allowJaggedRows": false,
        "allowQuotedNewlines": false,
        "encoding": "UTF-8",
        "fieldDelimiter": "",
        "quote": "",
        "skipLeadingRows": 0
    },
    "schema": {
        "fields": [
            {
                "name": "fields",
                "type": "STRING"
            }
        ]
    },
    "sourceFormat": "CSV",
    "sourceUris": [
        "gs://bucket/*.gz"
    ],
    "hivePartitioningOptions": {
        "mode": "STRINGS",
        "sourceUriPrefix": "gs://bucket/",
        "requirePartitionFilter": true
    }
}

デリミタには制御文字を指定しています。
JSONは制御文字をエスケープしなければならないため、エスケープされていない制御文字をデリミタに指定すれば1カラムのテーブルができます。

……そう、JSONは制御文字をエスケープしなければなりません。
しかし、外部テーブル定義ファイルはJSONです。つまり上の外部テーブル定義はJSONとして読み込めないのです。

File ".../bq/third_party/yaml/lib2/reader.py", line 144, in check_printable
    'unicode', "special characters are not allowed")
ReaderError: unacceptable character #x0001: special characters are not allowed
  in "tabledef.json", position 174

当時のわたしは「一度きりだし、今だけ動けばいいや」とエラーを出しているライブラリを書き換えてコマンドを実行してテーブルを作成しました。
今考えると、BigQueryクライアントライブラリを使えば問題なく作れますね(一度使うためだけに書くのが面倒ではありますが)。試してませんが、ブラウザで制御文字を入力してもできるかもしれません。

定期的に外部テーブルにクエリして結果を保存する

ここまでで、JSONログをBigQueryでクエリすることができるようになりました。
あとは、JSONをパースして必要なフィールドを抽出したテーブルをつくるだけです。

今回は1時間毎にhive partitionがつくられるようになっているため、1時間毎に外部テーブルをクエリして、結果をテーブルに出力するようにしました。

この定期的にクエリを実行するのにはBigQueryの機能である「スケジュールされたクエリ(Scheduled queries)」を使います。

文字通り定期的に指定したクエリを実行してくれる機能で、1日ごと1時間ごとなどの指定はもちろん、特殊な文法ではあるものの任意の間隔でクエリを実行することができます。
これにBigQueryの機能である「Destination Table」を合わせて使うことで、定期的にクエリを実行し、結果をテーブルに書き込むことができます。

クエリには、実行時間をTIMESTAMP型の変数 @run_time として使うことができます。

SELECT
  PARSE_TIMESTAMP('%Y-%m-%dT%H:%M:%E3S%z', JSON_EXTRACT_SCALAR(fields, '$.timestamp')) AS `timestamp`,
  JSON_EXTRACT_SCALAR(fields, '$.message') AS `message`,
  STRUCT<
    `id` STRING,
    `type` STRING
  >(
    JSON_EXTRACT_SCALAR(fields, '$.req.id'),
    JSON_EXTRACT_SCALAR(fields, '$.req.type')
  ) AS `req`,
  JSON_EXTRACT(fields, '$.buyers') AS `buyers`,
  STRUCT<
    `creative_id` STRING,
    `price` FLOAT64
  >(
    JSON_EXTRACT_SCALAR(fields, '$.res.creative_id'),
    CAST(JSON_EXTRACT_SCALAR(fields, '$.res.price') AS FLOAT64)
  ) AS `res`,
  STRUCT<
    `req` STRING,
    `res` STRING
  >(
    JSON_EXTRACT(fields, '$.debug.req'),
    JSON_EXTRACT(fields, '$.debug.res')
  ) AS `debug`
FROM `project.dataset.external_table`
WHERE ts = FORMAT_TIMESTAMP('%Y%m%d%H', TIMESTAMP_ADD(@run_time, INTERVAL -1 HOUR))

Destination Tableのテーブル名にも変数を使うことができます。
UTCからJSTは+9時間ですが、1時間前のデータなので+8hとしています(今回は日別にテーブルを作って append するようにしています)。

table_{run_time+8h|"%Y%m%d"}

まとめ

S3に置いたログファイルを、ゆるふわなままBigQueryで扱うまでの流れを紹介しました。参考になれば幸いです。

BigQueryは細かい調整をせずとも、大抵の問題をお金とパワーで解決できてしまいますが、スキーマなどどうしても融通がきかないこともあります。
そんなときでも、ちょっとした工夫をしてあげれば便利に使えますし、そういった工夫をしやすくするための機能も豊富で、とても魅力的なサービスだと思います。

fluctでは、そんなゆるふわデータを扱ったり、ゆるふわにならないようしっかりスキーマを定義したりして、一緒に事業を作っていく仲間を募集しています。

株式会社fluct SSP開発本部 ソフトウェアエンジニア | 株式会社VOYAGE GROUP
株式会社fluct エクスペリエンスデザインセンター フロントエンドエンジニア | 株式会社VOYAGE GROUP