ELTツール+Dataform でBigQuery へのデータロード・チェック・マージを自動化してみた:CDataSync

by 宮本航太 | 2020年12月14日

こんにちは、CData Software でエンジニアをしている宮本です。

先日、Google からDataform がGCP の傘下になったとアナウンスがありました。

cloud.google.com

自分自身、Dataform というサービスを知らなかったのですが、Webサイトやドキュメントなどを読んでみると、どうやらELT というか、ロードしたあとのデータ変換(ELTのTの部分)に特化したサービスというのがなんとなくわかってきました。

Dataform が接続できるデータウェアハウスは、BigQuery をはじめRedShift やSnowflake などに接続して利用できるようです。
機能面では、例えばBigQuery ではいくつかの処理を1つにまとめてストアドプロシージャとして作成し、それを外部から呼ぶことはできますが、Dataform ではその部分がより扱いやすくなった印象です。(まだ全容を把握できていませんが)

今回はそのDataform でいくつかBigQuery 上で行う処理を作成し、外部のELT ツールからDataform を実行するような構成を組んでみたいと思います。

※ELT について 最近使ったETL、ELTサービス(ツール)でデータ収集タスクについて考える / etl-elt-datacollect-task - Speaker Deck

f:id:sennanvolar44:20201214182532p:plain

やってみること

f:id:sennanvolar44:20201214182347p:plain CData Sync でSaaS(今回はSalesforce) → BigQuery のロードを行ったあと、Dataform で①連携データのチェック、②連携用テーブルとメインテーブルのマージを行います。

※CDataSync では差分データのみの転送ができるが、BigQuery へはInsertAll で追加されていく形なので、メインテーブルを用意して差分データを毎回マージして管理するイメージ

ELT のEL (Extract Load)部分をCDataSync、T(Transform)をDataform という構成をとります。

CData Sync とは

クラウドサービスからデータベースへのノーコードレプリケートアプリケーションになります。レプリケートとは同期するという意味になりますので、CDataSync から接続できるクラウドサービスのデータをデータベースにまるっとコピーすることができます。
https://www.cdata.com/jp/
f:id:sennanvolar44:20200501163232p:plain

手順

CData Sync のインストール~ジョブ作成まで

データをBigQuery にロードする部分は、CDataSyncのインストールからSalesforce → BigQuery のジョブ実行を以下記事にて紹介していますので、本記事では省略します。

www.cdatablog.jp ※CDataSync は30日間の無償評価版があります。

ジョブ実行~メインテーブル作成

ジョブ実行

今回はLead_Sync という名前でBigQuery にテーブルを作成して連携するジョブを作成してみます。 以下のクエリをカスタムクエリボタンをクリックして入力します。

REPLICATE [Lead_Sync] SELECT * FROM [Lead] f:id:sennanvolar44:20201213222956p:plain

作成したらチェックボックスをオンにして、実行ボタンを押すことでBigQuery にSalesforce のLead データが連携されます。(22件連携されたようです)
f:id:sennanvolar44:20201213222102p:plain

Salesforce のLead データに変更がない状態でもう一度実行すると、0件で完了しました。なので差分更新が効いていることが確認できますね。

f:id:sennanvolar44:20201213222606p:plain

メインテーブル作成

連携用テーブルはLead_Sync という名前で先ほど作成しました。Lead_Sync には常に前回からの差分データだけを存在するようなテーブルとします。

ただ初回ジョブ実行時はLead_Sync に全件連携されてきていますので、それををもとにしてメインテーブルを作成します。

メインテーブルの作成はBigQuery コンソールで「テーブルコピー」&「Insert into select * ~」で作成しました。最後に、連携用テーブル(Lead_Sync) にTruncate をかけてレコードを全削除しておきます。
(ここの部分もDataform で多分やれるはずです)

これでBigQuery へロードするジョブと連携用テーブル、メインテーブルの作成が完了しました。

f:id:sennanvolar44:20201213230250p:plain

Dataform でプロジェクトを作成

ではここからDataform の作業です。 以下のリンクからアカウントを作成します。

Dataform | Manage data pipelines in BigQuery

アカウント作成後、プロジェクトを新規作成します。
f:id:sennanvolar44:20201213211828p:plain

BigQuery に接続するにはサービスアカウントでの接続が必要なようですので、サービスアカウント作成後、JSONタイプの秘密鍵をダウンロードしときます。
サービスアカウントのロールは今回はBigQuery 管理者を付与しときました。
f:id:sennanvolar44:20201213211954p:plain

これでプロジェクトができたのでDataform を始めることができますね。
f:id:sennanvolar44:20201213212653p:plain

Dataform で実行させるSQLを作成

Schemas を選択してみるとBigQuery にあるテーブルがずらーっと表示されています。
f:id:sennanvolar44:20201213213141p:plain

テーブルの中もプレビューできますね。
f:id:sennanvolar44:20201213213431p:plain

①連携データのチェック

最初に連携データのチェックを行うSQL を作成するのですが、Dataform ではアサーションを使用できるのでこれを使ってチェックする処理を作成します。

Test data quality with assertions | Dataform

では、FILES からdefinitions にマウスカーソルをもってきて「・・・」をクリックします。
f:id:sennanvolar44:20201213214038p:plain

New file をクリックします。
f:id:sennanvolar44:20201213214133p:plain

ファイル名を設定し、SQL かJavaScript を選べますが、今回はSQLX を選択します。(SQLXはSQLの拡張機能版ライブラリのようです)
f:id:sennanvolar44:20201213215035p:plain

今回はEmail アドレスが入っていないレコードがあるのかをチェックしていきます。以下のコードをDataform に登録します。

config {
  type: "assertion",
}

SELECT
  Id,Name,Email
FROM
  demo.Lead_Sync
WHERE 
  Email IS NULL

入力したら自動的にコンパイルされて右側に結果が表示されます。エラーの場合はエラー箇所が表示されるようになってました。
f:id:sennanvolar44:20201213235735p:plain

コンパイルされたクエリは下のRUN STATEMENT で実行できます。
f:id:sennanvolar44:20201214000147p:plain

では実際にアサーションを実行してみます。
f:id:sennanvolar44:20201214001157p:plain

引っかかった場合は、クエリ結果が表示されます。
f:id:sennanvolar44:20201214001411p:plain

ちなみに結果はテーブルで保存されるようです。

これでデータチェックの部分は準備できました。

②連携用テーブルとメインテーブルのマージ

また同じようにNew でファイルを作成します。
先ほどはAssertion を選択しましたが、今回はテーブルでもビューでもどちらでも大丈夫です。

以下のマージを行うSQL をセットしました。 対象レコードがない場合は追加する処理内容となっています。
※Lead_Main がメインテーブルです。

MERGE demo.Lead_Main T USING demo.Lead_Sync S ON T.Id = S.Id
WHEN MATCHED THEN
UPDATE
SET
  Id = S.Id,
  Name = S.Name,
  LastName = S.LastName,
  FirstName = S.FirstName,
  Title = S.Title,
  Company = S.Company,
  Phone = S.Phone,
  MobilePhone = S.MobilePhone,
  Fax = S.Fax,
  Email = S.Email,
  AnnualRevenue = S.AnnualRevenue,
  LastModifiedDate = S.LastModifiedDate
  WHEN NOT MATCHED THEN
INSERT
  (
    Id,
    Name,
    LastName,
    FirstName,
    Title,
    Company,
    Phone,
    MobilePhone,
    Fax,
    Email,
    AnnualRevenue,
    LastModifiedDate
  )
VALUES(
    Id,
    Name,
    LastName,
    FirstName,
    Title,
    Company,
    Phone,
    MobilePhone,
    Fax,
    Email,
    AnnualRevenue,
    LastModifiedDate
  )

これでDataform で実行するSQL が完成です。

スケジューリング設定

外部から実行するためにはスケジューリング設定を行って、実行するSQL をひとまとめにする必要があるようです。

では、Schedules & environments をクリックし、スケジューリング設定を行います。
今回はスケジューリング実行じゃなくて、外部からの実行となりますので、「Enable this schedule」はオフにしておきます。
f:id:sennanvolar44:20201214003523p:plain

外部から実行する際に、Schedule name が必要となりますのでコピーしておきます。

API Keyの取得&プロジェクトID の確認

Dataform での最後の作業はAPI Keyの取得&プロジェクトIDの確認です。

API KeyはホームボタンからProject settings をクリックします。
f:id:sennanvolar44:20201214004254p:plain

ここからAPI Key を取得することができます。
f:id:sennanvolar44:20201214004425p:plain

プロジェクトIDはURL のこの部分のID になります。
f:id:sennanvolar44:20201214004635p:plain

これでDataform での作業が終わりました。
最後にELT ツールのCData Sync から実行するように設定していきます。
※作成したSQL などはコミットしておく必要があるようです。コミット方法は画面上部のCOMMIT ボタン押下です。

外部(CDataSync)からDataform 実行

CData Sync では連携ジョブ実行後に行う処理を色々と定義することができるようになっています。

ジョブ画面のイベントタブのPost Job イベントに以下定義を追加します。

<!-- Code goes here -->
<api:set attr="http.url"  value="https://api.dataform.co/v1/project/xxxxxxxxxxxxx/run"/> 
<api:set attr="http.header:name#1"  value="Authorization"/>
<api:set attr="http.header:value#1"  value="Bearer 取得したAPIKey"/>
<api:set attr="http.header:name#2"  value="scheduleName"/>
<api:set attr="http.header:value#2"  value="SalesforceLeadTest"/>
<api:set attr="http.contenttype"  value="application/x-www-form-urlencoded"/>
<!-- ログを出力させたい場合↓↓ -->
<api:set attr="http.logfile"  value="C:\\log\\CDataSync-API\\cdatasync-dataform.log"/>
<api:set attr="http.verbosity"  value="5"/>
<!-- ログを出力させたい場合↑↑ -->

<api:call op="httpPost" in="http"/>

f:id:sennanvolar44:20201214005408p:plain

これで一連の処理の作成・設定が完了しました。

Salesforce のレコードを変更してみる

Salesforce でEmail がnull のレコードを作成しておきます。 現時点ではSalesforce でEmail 項目にnullのレコードが1件ある状態。
f:id:sennanvolar44:20201214010355p:plain

CData Sync から連携ジョブを実行

この状態でCData Sync のジョブを実行してみましょう。
結果はSaleforce 側で更新されたデータ1件が連携されました。
f:id:sennanvolar44:20201214010640p:plain

Dataform で実行結果をみてみると、マージ処理もアサーションも正常終了したようですね。
f:id:sennanvolar44:20201214024501p:plain

BigQuery コンソールでも確認してみます。
マージされたメインテーブルにEmail 項目がnull のレコードが反映されているのが確認できました。
f:id:sennanvolar44:20201214024954p:plain

また、dataform_assertions.assert_Lead というビューが作成されていて、アサーションでエラーとなったレコードが参照できるようになっていました。

f:id:sennanvolar44:20201214024625p:plain

これで、CData Sync でジョブをスケジューリングしてあげれば、あとは自動的にBigQuery のマージまでやってくれるようになります。
ちなみにDataform では結果をメールやSlack に通知できるようなので、エラーの時だけ通知がくるように設定することで運用も楽になりそうですね。

おわりに

いかがでしたでしょうか。Dataform のほんの一部分を使ってELT ツールと組み合わせてみました。マージ以外にも加工であったり、削除したりといろいろなケースに合わせて使えそうですね。
今日使ったCData Sync は30日間無償での利用が可能であり、Dataform については現在無料で使用できるようです。

www.cdata.com

関連コンテンツ

トライアル・お問い合わせ

30日間無償トライアルで、CData のリアルタイムデータ連携をフルにお試しいただけます。記事や製品についてのご質問があればお気軽にお問い合わせください。