PostgreSQL から VeloDB へのデータ取り込み(Flink CDC を使用)
前提条件
VeloDB Cloud アカウント(必須)
アクティブなクラスターを持つ VeloDB Cloud アカウントが必要です。アカウントをお持ちでない場合は、VeloDB Quick Start Guide に従ってアカウントとクラスターを作成してください。
VeloDB で SQL クエリを実行し、データベースを作成するには、VeloDB Console サイドバーの組み込み SQL Editor を使用してください:

PostgreSQL ソース設定
PostgreSQL データベースが CDC 用に設定されていることを確認してください:
| ソースタイプ | セットアップガイド |
|---|---|
| セルフホスト PostgreSQL | Local PostgreSQL Source Setup |
| Amazon Aurora RDS PostgreSQL | Aurora RDS PostgreSQL Source Setup |
| Amazon Aurora Serverless v2 PostgreSQL | Aurora Serverless PostgreSQL Source Setup |
必須の PostgreSQL 設定:
wal_level = logicalmax_replication_slots >= 1max_wal_senders >= 1REPLICATION, LOGIN権限とTableに対するSELECT権限を持つ CDC ユーザー- Aurora の場合:
rds_replication、rds_superuser、pg_read_all_dataロールを持つ CDC ユーザー
1. (オプション)サンプルTableの作成
PostgreSQL データベースで以下の SQL を実行してください:
CREATE DATABASE test_db;
\c test_db
CREATE TABLE student (
id INT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
age INT,
email VARCHAR(255),
phone VARCHAR(20),
score NUMERIC(5,2),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO student (id, name, age, email, phone, score) VALUES
(1, 'Alice Zhang', 22, 'alice@example.com', '13800138000', 89.50),
(2, 'Bob Li', 21, 'bob@example.com', '13900139000', 76.80),
(3, 'Charlie Wang', 23, 'charlie@example.com', '13600136000', 92.00);
DELETE操作をキャプチャする場合: TableでREPLICA IDENTITY FULLを有効にしてください:
ALTER TABLE student REPLICA IDENTITY FULL;
これにより、DELETEイベントに主キーだけでなく、完全な行データが含まれることが保証されます。
2. Install Flink with Connectors
- Pre-packaged Bundle (Recommended)
- Manual Download
Flink、必要なコネクタ、およびJDKがすべて含まれた事前パッケージ化されたFlinkバンドルをダウンロードしてください:
curl -O https://apache-doris-releases.oss-accelerate.aliyuncs.com/extention/flink-1.17.2-with-doris-connector.tar.gz
tar -xzf flink-1.17.2-with-doris-connector.tar.gz
cd flink-1.17.2-with-doris-connector
export JAVA_HOME=$(pwd)/jdk
このパッケージには、Flink 1.17.2、Doris Connector、PostgreSQL CDC Connector、JDBCドライバー、およびLinux用のバンドルされたJDKが含まれています。別途Javaのインストールは不要です。
コンポーネントを個別にダウンロードする場合:
| Package | File |
|---|---|
| Flink 1.17 | flink-1.17.2-bin-scala_2.12.tgz |
| Flink PostgreSQL CDC | flink-sql-connector-postgres-cdc-3.1.0.jar |
| Flink Doris Connector | flink-doris-connector-1.17-25.1.0.jar |
| PostgreSQL JDBC Driver | postgresql-42.7.1.jar |
| MySQL JDBC Driver | mysql-connector-j-8.0.33.jar |
# Download Flink
curl -O https://archive.apache.org/dist/flink/flink-1.17.2/flink-1.17.2-bin-scala_2.12.tgz
tar -xzf flink-1.17.2-bin-scala_2.12.tgz
# Download connectors to lib/
curl -o flink-1.17.2/lib/flink-sql-connector-postgres-cdc-3.1.0.jar \
https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-postgres-cdc/3.1.0/flink-sql-connector-postgres-cdc-3.1.0.jar
curl -o flink-1.17.2/lib/flink-doris-connector-1.17-25.1.0.jar \
https://repo1.maven.org/maven2/org/apache/doris/flink-doris-connector-1.17/25.1.0/flink-doris-connector-1.17-25.1.0.jar
curl -o flink-1.17.2/lib/postgresql-42.7.1.jar \
https://jdbc.postgresql.org/download/postgresql-42.7.1.jar
curl -o flink-1.17.2/lib/mysql-connector-j-8.0.33.jar \
https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
3. VeloDBでターゲットデータベースを作成
Flink CDCを実行する前に、SQL Editorを使用してVeloDBにターゲットデータベースを作成します:
CREATE DATABASE IF NOT EXISTS <your_velodb_database>;
データベースはVeloDBに存在する必要があります。TableはPostgreSQLスキーマに基づいてFlink Doris Connectorによって自動作成されます。
4. 実行
- Local
- Standalone
- YARN
- Kubernetes
bin/flink run -t local \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.17-25.1.0.jar \
postgres-sync-database \
--database "<YOUR_VELODB_DATABASE>" \
--postgres-conf hostname="<YOUR_POSTGRES_HOST>" \
--postgres-conf port="5432" \
--postgres-conf username="<YOUR_POSTGRES_USER>" \
--postgres-conf password="<YOUR_POSTGRES_PASSWORD>" \
--postgres-conf database-name="<YOUR_POSTGRES_DATABASE>" \
--postgres-conf schema-name="public" \
--postgres-conf slot.name="velodb_cdc_slot" \
--postgres-conf decoding.plugin.name=pgoutput \
--including-tables ".*" \
--sink-conf fenodes="<YOUR_VELODB_HOST>:8080" \
--sink-conf username="<YOUR_VELODB_USER>" \
--sink-conf password="<YOUR_VELODB_PASSWORD>" \
--sink-conf jdbc-url="jdbc:mysql://<YOUR_VELODB_HOST>:9030" \
--sink-conf sink.label-prefix=postgres_cdc
Aurora/RDS の場合: --postgres-conf publication.name="<YOUR_PUBLICATION_NAME>" パラメータを追加してください。
Standalone クラスターデプロイメントの場合、Flink クラスターが稼働していることを確認し、-t remote を使用してジョブを送信してください:
bin/flink run -t remote \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.17-25.1.0.jar \
postgres-sync-database \
--database "<YOUR_VELODB_DATABASE>" \
--postgres-conf hostname="<YOUR_POSTGRES_HOST>" \
--postgres-conf port="5432" \
--postgres-conf username="<YOUR_POSTGRES_USER>" \
--postgres-conf password="<YOUR_POSTGRES_PASSWORD>" \
--postgres-conf database-name="<YOUR_POSTGRES_DATABASE>" \
--postgres-conf schema-name="public" \
--postgres-conf slot.name="velodb_cdc_slot" \
--postgres-conf decoding.plugin.name=pgoutput \
--including-tables ".*" \
--sink-conf fenodes="<YOUR_VELODB_HOST>:8080" \
--sink-conf username="<YOUR_VELODB_USER>" \
--sink-conf password="<YOUR_VELODB_PASSWORD>" \
--sink-conf jdbc-url="jdbc:mysql://<YOUR_VELODB_HOST>:9030" \
--sink-conf sink.label-prefix=postgres_cdc
YARN デプロイメントの場合、Hadoop が設定され、YARN にアクセス可能であることを確認してください:
bin/flink run -t yarn-per-job \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.17-25.1.0.jar \
postgres-sync-database \
--database "<YOUR_VELODB_DATABASE>" \
--postgres-conf hostname="<YOUR_POSTGRES_HOST>" \
--postgres-conf port="5432" \
--postgres-conf username="<YOUR_POSTGRES_USER>" \
--postgres-conf password="<YOUR_POSTGRES_PASSWORD>" \
--postgres-conf database-name="<YOUR_POSTGRES_DATABASE>" \
--postgres-conf schema-name="public" \
--postgres-conf slot.name="velodb_cdc_slot" \
--postgres-conf decoding.plugin.name=pgoutput \
--including-tables ".*" \
--sink-conf fenodes="<YOUR_VELODB_HOST>:8080" \
--sink-conf username="<YOUR_VELODB_USER>" \
--sink-conf password="<YOUR_VELODB_PASSWORD>" \
--sink-conf jdbc-url="jdbc:mysql://<YOUR_VELODB_HOST>:9030" \
--sink-conf sink.label-prefix=postgres_cdc
Kubernetesデプロイメントの場合、Flinkセッションクラスターが実行されていることを確認してください:
bin/flink run -t kubernetes-session \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.17-25.1.0.jar \
postgres-sync-database \
--database "<YOUR_VELODB_DATABASE>" \
--postgres-conf hostname="<YOUR_POSTGRES_HOST>" \
--postgres-conf port="5432" \
--postgres-conf username="<YOUR_POSTGRES_USER>" \
--postgres-conf password="<YOUR_POSTGRES_PASSWORD>" \
--postgres-conf database-name="<YOUR_POSTGRES_DATABASE>" \
--postgres-conf schema-name="public" \
--postgres-conf slot.name="velodb_cdc_slot" \
--postgres-conf decoding.plugin.name=pgoutput \
--including-tables ".*" \
--sink-conf fenodes="<YOUR_VELODB_HOST>:8080" \
--sink-conf username="<YOUR_VELODB_USER>" \
--sink-conf password="<YOUR_VELODB_PASSWORD>" \
--sink-conf jdbc-url="jdbc:mysql://<YOUR_VELODB_HOST>:9030" \
--sink-conf sink.label-prefix=postgres_cdc
5. 検証
VeloDB ConsoleのSQL Editorに移動し、データベースを選択します。
VeloDBでの初期同期を確認する:
SELECT * FROM student;
期待される結果:
+----+--------------+------+---------------------+-------------+-------+---------------------+
| id | name | age | email | phone | score | created_at |
+----+--------------+------+---------------------+-------------+-------+---------------------+
| 1 | Alice Zhang | 22 | alice@example.com | 13800138000 | 89.50 | 2025-12-22 14:30:25 |
| 2 | Bob Li | 21 | bob@example.com | 13900139000 | 76.80 | 2025-12-22 14:30:25 |
| 3 | Charlie Wang | 23 | charlie@example.com | 13600136000 | 92.00 | 2025-12-22 14:30:25 |
+----+--------------+------+---------------------+-------------+-------+---------------------+
リアルタイムCDCのテスト:
-- Run in PostgreSQL
INSERT INTO student (id, name, age, email, phone, score)
VALUES (4, 'David Chen', 24, 'david@example.com', '13400134000', 88.75);
UPDATE student SET score = 95.00 WHERE id = 2;
DELETE FROM student WHERE id = 3;
-- Verify in VeloDB (changes appear within seconds)
SELECT * FROM student ORDER BY id;
期待される結果:
+----+-------------+------+-------------------+-------------+-------+---------------------+
| id | name | age | email | phone | score | created_at |
+----+-------------+------+-------------------+-------------+-------+---------------------+
| 1 | Alice Zhang | 22 | alice@example.com | 13800138000 | 89.50 | 2025-12-22 14:30:25 |
| 2 | Bob Li | 21 | bob@example.com | 13900139000 | 95.00 | 2025-12-22 14:30:25 |
| 4 | David Chen | 24 | david@example.com | 13400134000 | 88.75 | 2025-12-22 14:35:40 |
+----+-------------+------+-------------------+-------------+-------+---------------------+
-- id=2: score updated 76.80 → 95.00
-- id=3: deleted
-- id=4: inserted