Cloud Storage + Cloud Functions + BigQuery で Avro ファイルを load するサンプルコード

Cloud Storage へアップロードされた Avro ファイルを Cloud Functions で BigQuery へ load するサンプルコードをご紹介します。

Google Cloud

背景 Cloud Storage -> Cloud Functions -> BigQuery

本記事では、以下のフローを実現するための事前準備と Cloud Functions サンプルコードをご紹介します。

  1. Cloud Storage へ Avro ファイルをアップロード
  2. Cloud Functions で BigQuery へ load

事前準備

BigQuery データセット作成

bq mk --dataset \
--location=asia-northeast1 \
your-dataset-name

Cloud Storage バケット作成

gsutil mb \
-c standard \
-l asia-northeast1 \
-p your-project-name \
gs://your-bucket-name

Cloud Functions へ Deploy

gcloud functions deploy loadTableGCSAvro \
--region asia-northeast1 \
--runtime nodejs14 \
--trigger-resource your-bucket-name \
--trigger-event google.storage.object.finalize

Cloud Functions サンプルコード

Cloud Storage へアップロードされた Avro ファイルを BigQuery へ load する Cloud Functions のサンプルコードは以下のとおりです。

'use strict';
const {BigQuery} = require('@google-cloud/bigquery');
const {Storage} = require('@google-cloud/storage');

const bigquery = new BigQuery();
const storage = new Storage();

exports.loadTableGCSAvro = async (file, context) => {
  console.info(`Uploaded file: ${file.name}`);

  if (file.size === '0') {
    console.info('file.size is 0');
    return;
  }

  // file.name の例: bucketName/fileName/20210801-1235.avro
  const fileNames = file.name.split('/');
  if (fileNames.length !== 3) {
    console.error(`fileName is invalid: ${file.name}`);
    return;
  }

  const datasetId = fileNames[0];
  const tableId = fileNames[1];
  await bqLoadAvro(file.bucket, file.name, datasetId, tableId);
};

async function bqLoadAvro(bucketName, fileName, datasetId, tableId) {

  // @doc https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad
  const jobConfigurationLoad = {
    sourceFormat: 'AVRO',
    useAvroLogicalTypes: true,
    writeDisposition: 'WRITE_APPEND'
  };

  // Load data from a Google Cloud Storage file into the table
  const [job] = await bigquery
    .dataset(datasetId)
    .table(tableId)
    .load(storage.bucket(bucketName).file(fileName), jobConfigurationLoad);

  // load() waits for the job to finish
  console.log(`Job ${job.id} completed.`);

  // Check the job's status for errors
  const errors = job.status.errors;
  if (errors && errors.length > 0) {
    throw errors;
  }
}

以上、Cloud Storage へアップロードされた Avro ファイルを Cloud Functions で BigQuery へ load させたい、現場からお送りしました。

参考情報