製品をチェック

製品の詳細・30日間の無償トライアルはこちら

CData Sync

Apache Kafka へのMySQL のデータのETL / ELT パイプラインを作ってデータを統合する方法

ETL / ELT ツールのCData Sync を使って、MySQL のデータのCDC を使ったApache Kafka へのETL パイプラインをノーコードで作成する方法を解説します。

宮本航太
プロダクトスペシャリスト

最終更新日:2024-01-19
mysql ロゴ

CData

sync ロゴ画像
Kafka ロゴ

こんにちは!プロダクトスペシャリストの宮本です。

データ分析基盤へのMySQL のデータの取り込みのニーズが高まっています。CData Sync は、数百のSaaS / DB のデータをApache Kafka をはじめとする各種DB / データウェアハウスにノーコードで統合・レプリケーション(複製)が可能なETL / ELT ツールです。

本記事では、MySQL のデータをCData Sync を使ってApache Kafka に統合するデータパイプラインを作っていきます。

CData Sync とは?

CData Sync は、レポーティング、アナリティクス、機械学習、AI などで使えるよう、社内のデータを一か所に統合して管理できるデータ基盤をノーコードで構築できるETL ツールで、以下の特徴を持っています。

  1. MySQL をはじめとする数百種類のSaaS / DB データに対応
  2. Apache Kafka など多くのRDB、データレイク、データストア、データウェアハウスに同期可能
  3. 業務データのデータ分析基盤へのETL / ELT 機能に特化し、極限まで設定操作をシンプルに
  4. 主要なSaaS データの差分更新やCDC(Change Data Capture、変更データキャプチャ)のサポート
  5. フレキシブルなSQL / dbt 連携での取得データの変換

CData Sync を使い始める

CData Sync はフルマネージド(SaaS)型・オンプレミス型・AWS でのホスティング、と多様なホスティング環境に対応しています。各オプションで無償トライアルを提供していますので、自社のニーズにフィットするオプションを以下から選択してお試しください。

無償トライアルへ

CData Sync では、1.データソースとしてMySQL の接続を設定、2.同期先としてApache Kafka の接続を設定、3.MySQL からApache Kafka へのレプリケーションジョブの作成、という3つのステップだけでレプリケーション処理を作成可能です。以下に具体的な設定手順を説明します。

1. データソースとしてMySQL の接続を設定

まずはじめに、CData Sync のブラウザ管理コンソールにログインします。CData Sync のインストールをまだ行っていない方は、本記事の製品リンクから「CData Sync」をクリックしてCData Sync をインストールしてください。30日間の無償トライアルをご利用いただけます。インストール後にCData Sync が起動して、ブラウザ設定画面が開きます。

それでは、データソース側にMySQL を設定していきましょう。左の「接続」タブをクリックします。

  1. 「+接続の追加」ボタンをクリックします。 コネクションの追加。
  2. 「データソース」タブを選択して、リスト表示されるデータソースを選ぶか、検索バーにデータソース名を入力して、MySQL を見つけます。
  3. MySQL の右側の「→」をクリックして、MySQL アカウントへの接続画面を開きます。もし、MySQL のコネクタがデフォルトでCData Sync にインストールされていない場合には、ダウンロードアイコン(コネクタのアップロードアイコン)をクリックし、「ダウンロード」をクリックすると、CData Sync にコネクタがインストールされます。 データソースの追加。
  4. 接続プロパティにMySQL に接続するアカウント情報を入力をします。

    Server およびPort プロパティがMySQL への接続には必須です。IntegratedSecurity をFALSE に設定した場合、User、Password も必須になります。 オプションで、Database を設定することもできます。Database は設定がない場合すべてのデータベースを使えるようになります。

    パスワード方式によるSSH 接続

    パスワード方式によるSSH接続時に必要なプロパティ一覧を以下に示します。

    • User: MySQL のユーザ
    • Password: MySQL のパスワード
    • Database: MySQL の接続先データベース
    • Server: MySQL のサーバー
    • Port: MySQL のポート
    • UserSSH: "true"
    • SSHAuthMode: "Password"
    • SSHPort: SSH のポート
    • SSHServer: SSH サーバー
    • SSHUser: SSH ユーザー
    • SSHPassword: SSH パスワード

    接続文字列形式では以下のようになります。

    User=admin;Password=adminpassword;Database=test;Server=mysql-server;Port=3306;UseSSH=true;SSHAuthMode=Password;SSHPort=22;SSHServer=ssh-server;SSHUser=root;SSHPassword=sshpasswd;

    公開鍵認証方式方式によるSSH 接続

    公開鍵認証によるSSH接続時に必要なプロパティ一覧を以下に示します。

    • User: MySQL のユーザ
    • Password: MySQL のパスワード
    • Database: MySQL の接続先データベース
    • Server: MySQL のサーバー
    • Port: MySQL のポート
    • UserSSH: "true"
    • SSHAuthMode: "Public_Key"
    • SSHClientCertType: キーストアの種類
    • SSHPort: SSH のポート
    • SSHServer: SSH サーバー
    • SSHUser: SSH ユーザー
    • SSHClientCert: 秘密鍵ファイルのパス

    接続文字列形式では以下のようになります。

    User=admin;Password=adminpassword;Database=test;Server=mysql-server;Port=3306;UseSSH=true;SSHAuthMode=Public_Key;SSHClientCertType=PUBLIC_KEY_FILE;SSHPort=22;SSHServer=ssh-server;SSHUser=root;SSHClientCert=C:\Keys\key.pem; データソースの追加。
  5. 「作成およびテスト」をクリックして、正しくMySQL に接続できているかをテストして保存します。これでレプリケーションのデータソースとしてMySQL への接続が設定されました。

MySQL の変更データキャプチャ(CDC 機能)とは?

MySQL では、binlog というサーバーから出力されるバイナリログがあり、レコードの更新内容をログとして保存してくれます。MySQL 8.0 以降ではデフォルトでこのbinlog 設定が[有効]になっています。8.0以前のMySQL を使っている場合には、binlog を有効化する必要があります。

このbinlog のレコード更新ログをCData Sync が読み込むことで、MySQL の変更をCData Sync の同期先DB/DWH に反映します。CDC を使わない場合だとMySQL の対象となるテーブルデータを全件SELECT してDB に負荷をかけてしまったり、フィルタリング条件を上手に設定するような手間がかかります。

Aurora MySQL におけるCDC の設定方法については、こちらのページをご確認ください。

2. 同期先としてApache Kafka の接続を設定

次に、MySQL のデータを書き込む先(=同期先)として、Apache Kafka を設定します。同じく「接続」タブを開きます。

  1. 「+接続の追加」ボタンをクリックします。
  2. 「同期先」タブを選択して、リスト表示されるデータソースを選ぶか、検索バーにデータソース名を入力して、Apache Kafka を見つけます。
  3. Apache Kafka の右側の「→」をクリックして、Apache Kafka データベースへの接続画面を開きます。もし、Apache Kafka のコネクタがデフォルトでCData Sync にインストールされていない場合には、ダウンロードアイコン(コネクタのアップロードアイコン)をクリックし、「ダウンロード」をクリックすると、CData Sync にコネクタがインストールされます。 Kafka を同期先に設定
  4. 必要な接続プロパティを入力します。

    • Bootstrap Servers - 接続するApache Kafka Bootstrap サーバーのアドレスを設定。
    • Auth Scheme - 認証スキームを選択。デフォルト設定はPlain で、ユーザーのログイン情報を使用します。
    • User - Apache Kafka への認証に使用するユーザー名を入力。
    • Password - Apache Kafka への認証に使用するパスワードを入力。
    • Type Detection Scheme - 使用する型検出用スキーム(NoneRowScanSchemaRegistryMessageOnly)を指定。デフォルトは「None」です。
    • Use SSL - Secure Sockets Layer(SSL)プロトコルを使用するかどうかを指定。デフォルト値はFalse です。
  5. 「作成およびテスト」をクリックして、正しく接続できているかをテストします。 同期先接続の設定
  6. これで同期先としてApache Kafka を設定できました。CData Sync では、Apache Kafka のデータベース名を指定するだけで同期するMySQL に併せたテーブルスキーマを自動的に作成(CREATE TABLE)してくれます。同期データに合わせたテーブルを事前に作成するなどの面倒な手順は必要ありません。もちろん、既存テーブルにマッピングを行いデータ同期を行うことも可能です。

3. MySQL からApache Kafka へのレプリケーションジョブの作成

CData Sync では、レプリケーションをジョブ単位で設定します。ジョブは、MySQL からApache Kafka という単位で設定し、複数のテーブルを含むことができます。レプリケーションジョブ設定には、「ジョブ」タブに進み、「+ジョブを追加」ボタンをクリックします。 ジョブの追加Salesforce の例)。

「ジョブを追加」画面が開き、以下を入力します:

  1. 名前:ジョブの名前
  2. データソース:ドロップダウンリストから先に設定したMySQL を選択
  3. 同期先:先に設定したApache Kafka を選択
データソースの設定Salesforce の例)。

すべてのオブジェクトをレプリケーションする場合

MySQL のすべてのオブジェクト / テーブルをレプリケーションするには、「種類」セクションで「すべて同期」を選択して、「ジョブを追加」ボタンで確定します。

作成したジョブ画面で、右上の「▷実行」ボタンをクリックするだけで、全MySQL テーブルのApache Kafka への同期を行うことができます。

オブジェクトを選択してレプリケーションする場合

MySQL から特定のオブジェクト / テーブルを選択してレプリケーションを行うことが可能です。「種類」セクションでは、「標準(個別設定)」を選んでください。

次に「ジョブ」画面で、「タスク」タブをクリックし、「タスクを追加」ボタンをクリックします。 ジョブへのタスク追加Salesforce の例)。

するとCData Sync で利用可能なオブジェクト / テーブルのリストが表示されるので、レプリケーションを行うオブジェクトにチェックを付けます(複数選択可)。「ジョブを追加」ボタンで確定します。

タスク選択(Salesforce の例)。

作成したジョブ画面で、「▷実行」ボタンをクリックして(もしくは各タスク毎の実行ボタンを押して)、レプリケーションジョブを実行します。 作成したジョブの実行(Salesforce の例)。

このようにとても簡単にMySQL からApache Kafka への同期を行うことができました。

CData Sync の主要な機能を試してみる:スケジューリング・差分更新・ETL

ジョブのスケジュール起動設定

CData Sync では、同期ジョブを1日に1回や15分に1回などのスケジュール起動をすることができます。ジョブ画面の「概要」タブから「スケジュール」パネルを選び、「⚙設定」ボタンをクリックします。「間隔」と同期時間の「毎時何分」を設定し、「保存」を押して設定を完了します。これでCData Sync が同期ジョブをスケジュール実行してくれます。ユーザーはダッシュボードで同期ジョブの状態をチェックするだけです。 スケジュール実行設定。

差分更新

CData Sync では、主要なデータソースでは、差分更新が可能です。差分更新では、最後のジョブ実行時からデータソース側でデータの追加・変更があったデータだけを同期するので、レプリケーションのクエリ・通信のコストを圧倒的に抑えることが可能です。

差分更新を有効化するには、ジョブの「概要」タブから「差分更新」パネルを選び、「⚙設定」ボタンをクリックします。「開始日」と「レプリケーション間隔」を設定して、「保存」します。

SQL での取得データのカスタマイズ

CData Sync は、デフォルトではMySQL のオブジェクト / テーブルをそのままApache Kafka に複製しますが、ここにSQL、またはdbt 連携でのETL 処理を組み込むことができます。テーブルカラムが多すぎる場合や、データ管理の観点から一部のカラムだけをレプリケーションしたり、さらにデータの絞り込み(フィルタリング)をしたデータだけをレプリケーションすることが可能です。

ジョブの「概要」タブ、「タスク」タブへと進みます。選択されたタスク(テーブル)の「▶」の左側のメニューをクリックし、「編集」を選びます。タスクの編集画面が開きます。

UI からカラムを選択する場合には、「カラム」タブから「マッピング編集」をクリックします。レプリケーションで使用しないカラムからチェックを外します。

SQL を記述して、フィルタリングなどのカスタマイズを行うには、「クエリ」タブをクリックし、REPLICATE 「テーブル名」の後に標準SQL でフィルタリングを行います。 レプリケーションのカスタマイズ設定。

MySQL からApache Kafka へのデータ同期には、ぜひCData Sync をご利用ください

このようにノーコードで簡単にMySQL のデータをApache Kafka にレプリケーションできます。データ分析、AI やノーコードツールからのデータ利用などさまざまな用途でCData Sync をご利用いただけます。30日の無償トライアルで、シンプルでパワフルなデータパイプラインを体感してください。

日本のユーザー向けにCData Sync は、UI の日本語化、ドキュメントの日本語化、日本語でのテクニカルサポートを提供しています。

CData Sync の 導入事例を併せてご覧ください。

関連コンテンツ

トライアル・お問い合わせ

30日間無償トライアルで、CData のリアルタイムデータ連携をフルにお試しいただけます。記事や製品についてのご質問があればお気軽にお問い合わせください。