各製品の資料を入手。
詳細はこちら →CData
こんにちは!ドライバー周りのヘルプドキュメントを担当している古川です。
Apache Airflow を使うと、データエンジニアリングワークフローの作成、スケジューリング、および監視を行うことができます。CData JDBC Driver for AlloyDB と組み合わせることで、Airflow からリアルタイムAlloyDB のデータに連携できます。 この記事では、Apache Airflow インスタンスからAlloyDB のデータに接続してクエリを実行し、結果をCSV ファイルに保存する方法を紹介します。
最適化されたデータ処理が組み込まれたCData JDBC Driver は、リアルタイムAlloyDB のデータを扱う上で高いパフォーマンスを提供します。 AlloyDB にSQL クエリを発行すると、CData ドライバーはフィルタや集計などのAlloyDB 側でサポートしているSQL 操作をAlloyDB に直接渡し、サポートされていない操作(主にSQL 関数とJOIN 操作)は組み込みSQL エンジンを利用してクライアント側で処理します。 組み込みの動的メタデータクエリを使用すると、ネイティブのデータ型を使ってAlloyDB のデータを操作および分析できます。
JDBC URL の作成の補助として、AlloyDB JDBC Driver に組み込まれている接続文字列デザイナーが使用できます。JAR ファイルをダブルクリックするか、コマンドラインからjar ファイルを実行します。
java -jar cdata.jdbc.alloydb.jar
接続プロパティを入力し、接続文字列をクリップボードにコピーします。
AlloyDB に接続するには、次の接続プロパティが必要です。
標準認証で接続する場合は、これ以上のアクションは必要ありません。
CData 製品がサポートしている他の認証方法では、AlloyDB サーバー上のpg_hba.conf ファイルで有効化する必要があります。
AlloyDB サーバーでの認証の設定については、こちらを参照してください。
MD5
pg_hba.conf ファイルのauth-method をmd5 に設定すると、MD5 パスワード検証を使用して認証できます。
SASL
CData 製品は、SASL(特にSCRAM-SHA-256)でパスワードを検証することで認証できます。
この認証方法を使用するには、pg_hba.conf ファイルのauth-method をscram-sha-256 に設定します。
Kerberos 認証は、CData 製品が接続を試行している際にAlloyDB サーバーで開始されます。この認証方法を有効化するには、AlloyDB サーバーでKerberos を設定します。AlloyDB サーバーでのKerberos 認証の設定を完了したら、CData 製品からKerberos 認証を行う方法については、ヘルプドキュメントの「Kerberos の使用」セクションを参照してください。
クラスタ環境またはクラウドでJDBC ドライバーをホストするには、ライセンス(フルまたはトライアル)およびランタイムキー(RTK)が必要です。本ライセンス(またはトライアル)の取得については、こちらからお問い合わせください。
以下は、JDBC 接続で要求される必須プロパティです。
プロパティ | 値 |
---|---|
Database Connection URL |
jdbc:alloydb:RTK=5246...;User=alloydb;Password=admin;Database=alloydb;Server=127.0.0.1;Port=5432
|
Database Driver Class Name | cdata.jdbc.alloydb.AlloyDBDriver |
jdbc:alloydb:RTK=5246...;User=alloydb;Password=admin;Database=alloydb;Server=127.0.0.1;Port=5432
Airflow におけるDAG は、ワークフローのプロセスを格納するエンティティであり、DAG にトリガーを設定することでワークフローを実行することができます。 今回のワークフローでは、シンプルにAlloyDB のデータに対してSQL クエリを実行し、結果をCSV ファイルに格納します。
import time from datetime import datetime from airflow.decorators import dag, task from airflow.providers.jdbc.hooks.jdbc import JdbcHook import pandas as pd # Dag の宣言 @dag(dag_id="alloydb_hook", schedule_interval="0 10 * * *", start_date=datetime(2022,2,15), catchup=False, tags=['load_csv']) # Dag となる関数を定義(取得するテーブルは必要に応じて変更してください) def extract_and_load(): # Define tasks @task() def jdbc_extract(): try: hook = JdbcHook(jdbc_conn_id="jdbc") sql = """ select * from Account """ df = hook.get_pandas_df(sql) df.to_csv("/{some_file_path}/{name_of_csv}.csv",header=False, index=False, quoting=1) # print(df.head()) print(df) tbl_dict = df.to_dict('dict') return tbl_dict except Exception as e: print("Data extract error: " + str(e)) jdbc_extract() sf_extract_and_load = extract_and_load()