メインコンテンツまでスキップ
バージョン: 4.x

PostgreSQL から VeloDB へのデータ取り込み(Flink CDC を使用)

前提条件

VeloDB Cloud アカウント(必須)

アクティブなクラスターを持つ VeloDB Cloud アカウントが必要です。アカウントをお持ちでない場合は、VeloDB Quick Start Guide に従ってアカウントとクラスターを作成してください。

VeloDB で SQL クエリを実行し、データベースを作成するには、VeloDB Console サイドバーの組み込み SQL Editor を使用してください:

SQL Editor

PostgreSQL ソース設定

PostgreSQL データベースが CDC 用に設定されていることを確認してください:

ソースタイプセットアップガイド
セルフホスト PostgreSQLLocal PostgreSQL Source Setup
Amazon Aurora RDS PostgreSQLAurora RDS PostgreSQL Source Setup
Amazon Aurora Serverless v2 PostgreSQLAurora Serverless PostgreSQL Source Setup

必須の PostgreSQL 設定:

  • wal_level = logical
  • max_replication_slots >= 1
  • max_wal_senders >= 1
  • REPLICATION, LOGIN 権限とTableに対する SELECT 権限を持つ CDC ユーザー
  • Aurora の場合:rds_replicationrds_superuserpg_read_all_data ロールを持つ CDC ユーザー

1. (オプション)サンプルTableの作成

PostgreSQL データベースで以下の SQL を実行してください:

CREATE DATABASE test_db;

\c test_db

CREATE TABLE student (
id INT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
age INT,
email VARCHAR(255),
phone VARCHAR(20),
score NUMERIC(5,2),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

INSERT INTO student (id, name, age, email, phone, score) VALUES
(1, 'Alice Zhang', 22, 'alice@example.com', '13800138000', 89.50),
(2, 'Bob Li', 21, 'bob@example.com', '13900139000', 76.80),
(3, 'Charlie Wang', 23, 'charlie@example.com', '13600136000', 92.00);
備考

DELETE操作をキャプチャする場合: TableでREPLICA IDENTITY FULLを有効にしてください:

ALTER TABLE student REPLICA IDENTITY FULL;

これにより、DELETEイベントに主キーだけでなく、完全な行データが含まれることが保証されます。


Flink、必要なコネクタ、およびJDKがすべて含まれた事前パッケージ化されたFlinkバンドルをダウンロードしてください:

curl -O https://apache-doris-releases.oss-accelerate.aliyuncs.com/extention/flink-1.17.2-with-doris-connector.tar.gz
tar -xzf flink-1.17.2-with-doris-connector.tar.gz
cd flink-1.17.2-with-doris-connector
export JAVA_HOME=$(pwd)/jdk
備考

このパッケージには、Flink 1.17.2、Doris Connector、PostgreSQL CDC Connector、JDBCドライバー、およびLinux用のバンドルされたJDKが含まれています。別途Javaのインストールは不要です。


3. VeloDBでターゲットデータベースを作成

Flink CDCを実行する前に、SQL Editorを使用してVeloDBにターゲットデータベースを作成します:

CREATE DATABASE IF NOT EXISTS <your_velodb_database>;
備考

データベースはVeloDBに存在する必要があります。TableはPostgreSQLスキーマに基づいてFlink Doris Connectorによって自動作成されます。


4. 実行

bin/flink run -t local \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.17-25.1.0.jar \
postgres-sync-database \
--database "<YOUR_VELODB_DATABASE>" \
--postgres-conf hostname="<YOUR_POSTGRES_HOST>" \
--postgres-conf port="5432" \
--postgres-conf username="<YOUR_POSTGRES_USER>" \
--postgres-conf password="<YOUR_POSTGRES_PASSWORD>" \
--postgres-conf database-name="<YOUR_POSTGRES_DATABASE>" \
--postgres-conf schema-name="public" \
--postgres-conf slot.name="velodb_cdc_slot" \
--postgres-conf decoding.plugin.name=pgoutput \
--including-tables ".*" \
--sink-conf fenodes="<YOUR_VELODB_HOST>:8080" \
--sink-conf username="<YOUR_VELODB_USER>" \
--sink-conf password="<YOUR_VELODB_PASSWORD>" \
--sink-conf jdbc-url="jdbc:mysql://<YOUR_VELODB_HOST>:9030" \
--sink-conf sink.label-prefix=postgres_cdc
備考

Aurora/RDS の場合: --postgres-conf publication.name="<YOUR_PUBLICATION_NAME>" パラメータを追加してください。


5. 検証

VeloDB ConsoleのSQL Editorに移動し、データベースを選択します。

VeloDBでの初期同期を確認する:

SELECT * FROM student;

期待される結果:

+----+--------------+------+---------------------+-------------+-------+---------------------+
| id | name | age | email | phone | score | created_at |
+----+--------------+------+---------------------+-------------+-------+---------------------+
| 1 | Alice Zhang | 22 | alice@example.com | 13800138000 | 89.50 | 2025-12-22 14:30:25 |
| 2 | Bob Li | 21 | bob@example.com | 13900139000 | 76.80 | 2025-12-22 14:30:25 |
| 3 | Charlie Wang | 23 | charlie@example.com | 13600136000 | 92.00 | 2025-12-22 14:30:25 |
+----+--------------+------+---------------------+-------------+-------+---------------------+

リアルタイムCDCのテスト:

-- Run in PostgreSQL
INSERT INTO student (id, name, age, email, phone, score)
VALUES (4, 'David Chen', 24, 'david@example.com', '13400134000', 88.75);

UPDATE student SET score = 95.00 WHERE id = 2;

DELETE FROM student WHERE id = 3;
-- Verify in VeloDB (changes appear within seconds)
SELECT * FROM student ORDER BY id;

期待される結果:

+----+-------------+------+-------------------+-------------+-------+---------------------+
| id | name | age | email | phone | score | created_at |
+----+-------------+------+-------------------+-------------+-------+---------------------+
| 1 | Alice Zhang | 22 | alice@example.com | 13800138000 | 89.50 | 2025-12-22 14:30:25 |
| 2 | Bob Li | 21 | bob@example.com | 13900139000 | 95.00 | 2025-12-22 14:30:25 |
| 4 | David Chen | 24 | david@example.com | 13400134000 | 88.75 | 2025-12-22 14:35:40 |
+----+-------------+------+-------------------+-------------+-------+---------------------+
-- id=2: score updated 76.80 → 95.00
-- id=3: deleted
-- id=4: inserted

参考資料