Migrating Data from Other OLTP Systems
There are several ways to migrate data from other TP systems, such as MySQL, SQL Server, and Oracle, to Doris.
Multi-Catalog
Use a Catalog to map the source database as external tables, then load the data with an INSERT INTO or CREATE TABLE AS SELECT (CTAS) statement.
For example, with MySQL:
CREATE CATALOG mysql_catalog properties(
'type' = 'jdbc',
'user' = 'root',
'password' = '123456',
'jdbc_url' = 'jdbc:mysql://host:3306/mysql_db',
'driver_url' = 'mysql-connector-java-8.0.25.jar',
'driver_class' = 'com.mysql.cj.jdbc.Driver'
);
-- Load via INSERT (the target table must already exist)
INSERT INTO internal.doris_db.tbl1
SELECT * FROM mysql_catalog.mysql_db.table1;
-- Load via CTAS (creates the target table)
CREATE TABLE internal.doris_db.tbl1
PROPERTIES('replication_num' = '1')
AS
SELECT * FROM mysql_catalog.mysql_db.table1;
For more details, refer to Catalog Data Load.
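Before running either load, it can help to confirm that the catalog resolves the source table, and afterwards that the row counts match. The statements below are a minimal sketch that reuses the catalog, database, and table names from the example above; they are placeholders for your own objects, and the count comparison assumes the source table is not being written to during the check.

```sql
-- List the databases visible through the JDBC catalog.
SHOW DATABASES FROM mysql_catalog;

-- Preview a few source rows to confirm the column mapping.
SELECT * FROM mysql_catalog.mysql_db.table1 LIMIT 10;

-- After the load, compare row counts between source and target.
SELECT COUNT(*) FROM mysql_catalog.mysql_db.table1;
SELECT COUNT(*) FROM internal.doris_db.tbl1;
```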
Flink Doris Connector
You can leverage Flink to achieve offline and real-time synchronization for TP systems.
- Offline synchronization: use Flink's JDBC source together with the Doris sink to load the data. For example, using Flink SQL:
CREATE TABLE student_source (
    id INT,
    name STRING,
    age INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/mydatabase',
    'table-name' = 'students',
    'username' = 'username',
    'password' = 'password'
);
CREATE TABLE student_sink (
    id INT,
    name STRING,
    age INT
) WITH (
    'connector' = 'doris',
    'fenodes' = '127.0.0.1:8030',
    'table.identifier' = 'test.students',
    'username' = 'root',
    'password' = 'password',
    'sink.label-prefix' = 'doris_label'
);
INSERT INTO student_sink SELECT * FROM student_source;
For more details, refer to Flink JDBC.
- Real-time synchronization: use Flink CDC to read both the full snapshot and the incremental changes. For example, using Flink SQL:
SET 'execution.checkpointing.interval' = '10s';
CREATE TABLE cdc_mysql_source (
    id INT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '127.0.0.1',
    'port' = '3306',
    'username' = 'root',
    'password' = 'password',
    'database-name' = 'database',
    'table-name' = 'table'
);
-- Supports synchronization of insert/update/delete events.
CREATE TABLE doris_sink (
    id INT,
    name STRING
) WITH (
    'connector' = 'doris',
    'fenodes' = '127.0.0.1:8030',
    'table.identifier' = 'database.table',
    'username' = 'root',
    'password' = '',
    'sink.properties.format' = 'json',
    'sink.properties.read_json_by_line' = 'true',
    'sink.enable-delete' = 'true', -- Synchronize delete events.
    'sink.label-prefix' = 'doris_label'
);
INSERT INTO doris_sink SELECT id, name FROM cdc_mysql_source;
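Synchronizing delete events with 'sink.enable-delete' relies on the Doris batch delete feature, which applies to Unique Key tables. A target table for the sink above might therefore look like the following sketch. The VARCHAR length, bucket count, and replication setting are assumptions to adjust for your cluster, and `database` and `table` are the placeholder names from the example, quoted with backticks because they are reserved words.

```sql
-- Hypothetical target table for the CDC sink; run this in Doris, not in Flink.
CREATE TABLE IF NOT EXISTS `database`.`table` (
    id INT,
    name VARCHAR(255)
)
UNIQUE KEY(id)                      -- Unique Key model so updates and deletes apply by key
DISTRIBUTED BY HASH(id) BUCKETS 10  -- bucket count is an assumption
PROPERTIES ('replication_num' = '1');
```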
To synchronize an entire database or multiple tables from a TP database, you can use the full-database synchronization feature of the Flink Doris Connector, which loads the whole TP database with a single command, as shown below:
<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-24.0.1.jar \
    mysql-sync-database \
    --database test_db \
    --mysql-conf hostname=127.0.0.1 \
    --mysql-conf port=3306 \
    --mysql-conf username=root \
    --mysql-conf password=123456 \
    --mysql-conf database-name=mysql_db \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=127.0.0.1:8030 \
    --sink-conf username=root \
    --sink-conf password=123456 \
    --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
    --sink-conf sink.label-prefix=label \
    --table-conf replication_num=1
For more details, refer to Full Database Synchronization.
Spark Connector
You can read from the source database with Spark's JDBC data source and write to Doris with the Doris sink of the Spark Doris Connector. The example below reads from PostgreSQL:
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()
jdbcDF.write.format("doris")
.option("doris.table.identifier", "db.table")
.option("doris.fenodes", "127.0.0.1:8030")
.option("user", "root")
.option("password", "")
.save()
For more details, refer to JDBC To Other Databases and Spark Doris Connector.
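The same load can also be expressed in Spark SQL by registering both ends as temporary views. This is a sketch under stated assumptions: the view names jdbc_source and doris_sink are hypothetical, the connection values are copied from the example above, and it assumes that the JDBC driver and the Spark Doris Connector JAR are on the Spark classpath and that the option keys match your connector version.

```sql
-- Source: Spark's built-in JDBC data source.
CREATE TEMPORARY VIEW jdbc_source
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:postgresql:dbserver",
  dbtable "schema.tablename",
  user "username",
  password "password"
);

-- Sink: the Doris table exposed through the Spark Doris Connector.
CREATE TEMPORARY VIEW doris_sink
USING doris
OPTIONS (
  "table.identifier" = "db.table",
  "fenodes" = "127.0.0.1:8030",
  "user" = "root",
  "password" = ""
);

-- Copy every row from the source view into Doris.
INSERT INTO doris_sink SELECT * FROM jdbc_source;
```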
DataX / SeaTunnel / CloudCanal and Other Third-Party Tools
You can also use third-party tools for data synchronization. For more details, refer to: