Flink Doris Connector
Flink Doris Connectorは、Flinkを通じてDorisクラスターからデータを読み取り、データを書き込むために使用されます。また、FlinkCDCも統合されており、MySQLなどの上流データベースとのより便利な全データベース同期が可能です。
Flink Connectorを使用して、以下の操作を実行できます:
-
Dorisからデータを読み取り: Flink ConnectorはBEからの並列読み取りをサポートし、データ取得効率を向上させます。
-
Dorisにデータを書き込み: Flinkでバッチ処理した後、Stream Loadを使用してデータを一括でDorisにインポートします。
-
Lookup JoinでディメンションTable結合を実行: バッチ処理と非同期クエリによってディメンションTable結合を高速化します。
-
全データベース同期: Flink CDCを使用して、自動Table作成およびDDL操作を含む、MySQL、Oracle、PostgreSQLなどのデータベース全体を同期できます。
バージョン説明
| Connector Version | Flink Version | Doris Version | Java Version | Scala Version |
|---|---|---|---|---|
| 1.0.3 | 1.11,1.12,1.13,1.14 | 0.15+ | 8 | 2.11,2.12 |
| 1.1.1 | 1.14 | 1.0+ | 8 | 2.11,2.12 |
| 1.2.1 | 1.15 | 1.0+ | 8 | - |
| 1.3.0 | 1.16 | 1.0+ | 8 | - |
| 1.4.0 | 1.15,1.16,1.17 | 1.0+ | 8 | - |
| 1.5.2 | 1.15,1.16,1.17,1.18 | 1.0+ | 8 | - |
| 1.6.1 | 1.15,1.16,1.17,1.18,1.19 | 1.0+ | 8 | - |
| 24.0.1 | 1.15,1.16,1.17,1.18,1.19,1.20 | 1.0+ | 8 | - |
| 24.1.0 | 1.15,1.16,1.17,1.18,1.19,1.20 | 1.0+ | 8 | - |
| 25.0.0 | 1.15,1.16,1.17,1.18,1.19,1.20 | 1.0+ | 8 | - |
| 25.1.0 | 1.15,1.16,1.17,1.18,1.19,1.20 | 1.0+ | 8 | - |
使用方法
Flink Doris Connectorは、JarまたはMavenの2つの方法で使用できます。
Jar
対応するバージョンのFlink Doris Connector Jarファイルをこちらからダウンロードし、このファイルをFlinkセットアップのclasspathにコピーしてFlink-Doris-Connectorを使用できます。StandaloneモードのFlink展開の場合、このファイルをlib/フォルダー下に配置してください。Yarnモードで実行されるFlinkクラスターの場合、ファイルをプリデプロイメントパッケージに配置してください。
Maven
Mavenで使用するには、Pomファイルに以下の依存関係を追加するだけです:
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-${flink.version}</artifactId>
<version>${connector.version}</version>
</dependency>
例えば:
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.16</artifactId>
<version>25.1.0</version>
</dependency>
動作原理
Dorisからのデータ読み取り

データ読み取り時、Flink Doris ConnectorはFlink JDBC Connectorと比較して高いパフォーマンスを提供するため、使用が推奨されています:
-
Flink JDBC Connector: DorisはMySQLプロトコルと互換性がありますが、Dorisクラスターへの読み書きにFlink JDBC Connectorを使用することは推奨されません。このアプローチでは単一のFEノードでシリアルな読み書き操作が発生し、ボトルネックが生じてパフォーマンスに影響します。
-
Flink Doris Connector: Doris 2.1以降、ADBCがFlink Doris Connectorのデフォルトプロトコルです。読み取りプロセスは以下の手順に従います:
a. Flink Doris Connectorはまず、クエリプランに基づいてFEからTablet ID情報を取得します。
b. クエリステートメントを生成します:
SELECT * FROM tbs TABLET(id1, id2, id3)。c. その後、クエリはFEのADBCポート経由で実行されます。
d. データはFEをバイパスしてBEから直接返され、シングルポイントボトルネックが解消されます。
Dorisへのデータ書き込み
データ書き込みにFlink Doris Connectorを使用する場合、Flinkのメモリ内でバッチ処理が実行され、その後Stream Load経由でバルクインポートされます。Doris Flink Connectorは2つのバッチングモードを提供し、Flink Checkpointベースのストリーミング書き込みがデフォルトです:
| Streaming Write | Batch Write | |
|---|---|---|
| トリガー条件 | Flink Checkpointに依存し、Flinkのcheckpointサイクルに従ってDorisに書き込み | コネクター定義の時間またはデータ量しきい値に基づく定期的な送信 |
| 整合性 | Exactly-Once | At-Least-Once; 主キーモデルでExactly-Onceを保証可能 |
| レイテンシー | Flink checkpointインターバルによって制限され、一般的に高い | 独立したバッチメカニズムで柔軟な調整が可能 |
| 耐障害性と回復 | Flinkステート回復と完全に一致 | 外部重複排除ロジックに依存(例:Doris主キー重複排除) |
クイックスタート
準備
Flinkクラスターのデプロイ
Standaloneクラスターを例に:
- Flinkインストールパッケージをダウンロードします(例:Flink 1.18.1);
- 展開後、Flink Doris Connectorパッケージを
<FLINK_HOME>/libに配置します; <FLINK_HOME>ディレクトリに移動し、bin/start-cluster.shを実行してFlinkクラスターを開始します;jpsコマンドを使用してFlinkクラスターが正常に開始されたかを確認できます。
DorisTableの初期化
以下のステートメントを実行してDorisTableを作成します:
CREATE DATABASE test;
CREATE TABLE test.student (
`id` INT,
`name` VARCHAR(256),
`age` INT
)
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);
INSERT INTO test.student values(1,"James",18);
INSERT INTO test.student values(2,"Emily",28);
CREATE TABLE test.student_trans (
`id` INT,
`name` VARCHAR(256),
`age` INT
)
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);
FlinkSQL タスクの実行
FlinkSQL クライアントの開始
bin/sql-client.sh
FlinkSQLの実行
CREATE TABLE Student (
id STRING,
name STRING,
age INT
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'test.student',
'username' = 'root',
'password' = ''
);
CREATE TABLE StudentTrans (
id STRING,
name STRING,
age INT
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'test.student_trans',
'username' = 'root',
'password' = '',
'sink.label-prefix' = 'doris_label'
);
INSERT INTO StudentTrans SELECT id, concat('prefix_',name), age+1 FROM Student;
クエリデータ
mysql> select * from test.student_trans;
+------+--------------+------+
| id | name | age |
+------+--------------+------+
| 1 | prefix_James | 19 |
| 2 | prefix_Emily | 29 |
+------+--------------+------+
2 rows in set (0.02 sec)
シナリオと操作
Doris からのデータ読み取り
Flink が Doris からデータを読み取る場合、Doris Source は現在有界ストリームであり、CDC 方式での継続的な読み取りはサポートしていません。Doris からのデータ読み取りには、Thrift または ArrowFlightSQL(バージョン 24.0.0 以降でサポート)を使用できます。バージョン 2.1 以降では、ArrowFlightSQL が推奨されるアプローチです。
- Thrift: BE の Thrift インターフェースを呼び出すことでデータを読み取ります。詳細な手順については、Reading Data via Thrift Interface を参照してください。
- ArrowFlightSQL: Doris 2.1 をベースとし、この方法では Arrow Flight SQL プロトコルを使用して大量のデータを高速で読み取ることができます。詳細については、High-speed Data Transfer via Arrow Flight SQL を参照してください。
FlinkSQL を使用したデータ読み取り
Thrift 方式
CREATE TABLE student (
id INT,
name STRING,
age INT
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030', -- Fe的host:HttpPort
'table.identifier' = 'test.student',
'username' = 'root',
'password' = ''
);
SELECT * FROM student;
ArrowFlightSQL
CREATE TABLE student (
id INT,
name STRING,
age INT
)
WITH (
'connector' = 'doris',
'fenodes' = '{fe.conf:http_port}',
'table.identifier' = 'test.student',
'source.use-flight-sql' = 'true',
'source.flight-sql-port' = '{fe.conf:arrow_flight_sql_port}',
'username' = 'root',
'password' = ''
);
SELECT * FROM student;
DataStream APIを使用したデータの読み取り
DataStream APIを使用してデータを読み取る場合、「Usage」セクションで説明されているように、事前にプログラムのPOMファイルに依存関係を含める必要があります。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DorisOptions option = DorisOptions.builder()
.setFenodes("127.0.0.1:8030")
.setTableIdentifier("test.student")
.setUsername("root")
.setPassword("")
.build();
DorisReadOptions readOptions = DorisReadOptions.builder().build();
DorisSource<List<?>> dorisSource = DorisSource.<List<?>>builder()
.setDorisOptions(option)
.setDorisReadOptions(readOptions)
.setDeserializer(new SimpleListDeserializationSchema())
.build();
env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();
env.execute("Doris Source Test");
完全なコードについては、DorisSourceDataStream.javaを参照してください。
DorisへのデータWriting
FlinkはStream Load方式を使用してDorisにデータをwriteし、streamingとbatch-insertionの両方のモードをサポートします。
Connector 1.5.0から、batch-insertionがサポートされています。Batch-insertionはCheckpointsに依存せず、データをメモリ内でバッファリングし、batchパラメータに基づいてwriting timingを制御します。Streaming insertionはCheckpointsの有効化が必要で、Checkpoint期間全体にわたって継続的にupstreamのデータをDorisにwriteし、データを継続的にメモリ内に保持することはありません。
FlinkSQLを使用したデータWriting
テスト用途で、FlinkのDatagenを使用して継続的に生成されるupstreamデータをシミュレートします。
-- enable checkpoint
SET 'execution.checkpointing.interval' = '30s';
CREATE TABLE student_source (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.name.length' = '20',
'fields.id.min' = '1',
'fields.id.max' = '100000',
'fields.age.min' = '3',
'fields.age.max' = '30'
);
-- doris sink
CREATE TABLE student_sink (
id INT,
name STRING,
age INT
)
WITH (
'connector' = 'doris',
'fenodes' = '10.16.10.6:28737',
'table.identifier' = 'test.student',
'username' = 'root',
'password' = 'password',
'sink.label-prefix' = 'doris_label'
--'sink.enable.batch-mode' = 'true' Adding this configuration enables batch writing
);
INSERT INTO student_sink SELECT * FROM student_source;
DataStream APIを使用したデータ書き込み
DataStream APIを使用してデータを書き込む場合、異なるシリアライゼーション方法を使用して、DorisTableに書き込む前に上流データをシリアライズできます。
ConnectorにはすでにHttpClient4.5.13バージョンが含まれています。プロジェクトでHttpClientを別途参照する場合は、バージョンの整合性を確保する必要があります。
標準文字列フォーマット
上流データがCSVまたはJSON形式の場合、SimpleStringSerializerを直接使用してデータをシリアライズできます。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30000);
DorisSink.Builder<String> builder = DorisSink.builder();
DorisOptions dorisOptions = DorisOptions.builder()
.setFenodes("10.16.10.6:28737")
.setTableIdentifier("test.student")
.setUsername("root")
.setPassword("")
.build();
Properties properties = new Properties();
// When the upstream data is in json format, the following configuration needs to be enabled
properties.setProperty("read_json_by_line", "true");
properties.setProperty("format", "json");
// When writing csv data from the upstream, the following configurations need to be enabled
//properties.setProperty("format", "csv");
//properties.setProperty("column_separator", ",");
DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
.setLabelPrefix("label-doris")
.setDeletable(false)
//.setBatchMode(true) Enable batch writing
.setStreamLoadProp(properties)
.build();
builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionOptions)
.setSerializer(new SimpleStringSerializer())
.setDorisOptions(dorisOptions);
List<String> data = new ArrayList<>();
data.add("{\"id\":3,\"name\":\"Michael\",\"age\":28}");
data.add("{\"id\":4,\"name\":\"David\",\"age\":38}");
env.fromCollection(data).sinkTo(builder.build());
env.execute("doris test");
完全なコードについては、DorisSinkExample.javaを参照してください。
RowData Format
RowDataはFlinkの内部フォーマットです。上流のデータがRowDataフォーマットの場合、データをシリアライズするためにRowDataSerializerを使用する必要があります。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000);
env.setParallelism(1);
DorisSink.Builder<RowData> builder = DorisSink.builder();
Properties properties = new Properties();
properties.setProperty("column_separator", ",");
properties.setProperty("line_delimiter", "\n");
properties.setProperty("format", "csv");
// When writing json data from the upstream, the following configuration needs to be enabled
// properties.setProperty("read_json_by_line", "true");
// properties.setProperty("format", "json");
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder
.setFenodes("10.16.10.6:28737")
.setTableIdentifier("test.student")
.setUsername("root")
.setPassword("");
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix(UUID.randomUUID().toString()).setDeletable(false).setStreamLoadProp(properties);
// flink rowdata‘s schema
String[] fields = {"id","name", "age"};
DataType[] types = {DataTypes.INT(), DataTypes.VARCHAR(256), DataTypes.INT()};
builder.setDorisExecutionOptions(executionBuilder.build())
.setSerializer(
RowDataSerializer.builder() // serialize according to rowdata
.setType(LoadConstants.CSV)
.setFieldDelimiter(",")
.setFieldNames(fields)
.setFieldType(types)
.build())
.setDorisOptions(dorisBuilder.build());
// mock rowdata source
DataStream<RowData> source =
env.fromElements("")
.flatMap(
new FlatMapFunction<String, RowData>() {
@Override
public void flatMap(String s, Collector<RowData> out)
throws Exception {
GenericRowData genericRowData = new GenericRowData(3);
genericRowData.setField(0, 1);
genericRowData.setField(1, StringData.fromString("Michael"));
genericRowData.setField(2, 18);
out.collect(genericRowData);
GenericRowData genericRowData2 = new GenericRowData(3);
genericRowData2.setField(0, 2);
genericRowData2.setField(1, StringData.fromString("David"));
genericRowData2.setField(2, 38);
out.collect(genericRowData2);
}
});
source.sinkTo(builder.build());
env.execute("doris test");
完全なコードについては、DorisSinkExampleRowData.javaを参照してください。
Debezium Format
FlinkCDCからのデータやKafkaのDebezium formatなど、Debezium形式の上流データに対しては、JsonDebeziumSchemaSerializerを使用してデータをシリアル化できます。
// enable checkpoint
env.enableCheckpointing(10000);
Properties props = new Properties();
props.setProperty("format", "json");
props.setProperty("read_json_by_line", "true");
DorisOptions dorisOptions = DorisOptions.builder()
.setFenodes("127.0.0.1:8030")
.setTableIdentifier("test.student")
.setUsername("root")
.setPassword("").build();
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-prefix")
.setStreamLoadProp(props)
.setDeletable(true);
DorisSink.Builder<String> builder = DorisSink.builder();
builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionBuilder.build())
.setDorisOptions(dorisOptions)
.setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
.sinkTo(builder.build());
完全なコードについては、以下を参照してください:CDCSchemaChangeExample.java
マルチTable書き込み形式
現在、DorisSinkは単一のSinkで複数のTableの同期をサポートしています。データとデータベース/Table情報の両方をSinkに渡し、RecordWithMetaSerializerを使用してシリアライズする必要があります。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DorisSink.Builder<RecordWithMeta> builder = DorisSink.builder();
Properties properties = new Properties();
properties.setProperty("column_separator", ",");
properties.setProperty("line_delimiter", "\n");
properties.setProperty("format", "csv");
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder
.setFenodes("10.16.10.6:28737")
.setTableIdentifier("")
.setUsername("root")
.setPassword("");
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder
.setLabelPrefix("label-doris")
.setStreamLoadProp(properties)
.setDeletable(false)
.setBatchMode(true);
builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionBuilder.build())
.setDorisOptions(dorisBuilder.build())
.setSerializer(new RecordWithMetaSerializer());
RecordWithMeta record = new RecordWithMeta("test", "student_1", "1,David,18");
RecordWithMeta record1 = new RecordWithMeta("test", "student_2", "1,Jack,28");
env.fromCollection(Arrays.asList(record, record1)).sinkTo(builder.build());
完全なコードについては、DorisSinkMultiTableExample.javaを参照してください。
Lookup Join
Lookup Joinを使用すると、FlinkでディメンションTableのjoinを最適化できます。ディメンションTableのjoinにFlink JDBC Connectorを使用する場合、以下の問題が発生する可能性があります:
-
Flink JDBC Connectorは同期クエリモードを使用するため、上流のデータ(例:Kafkaから)がレコードを送信した後、すぐにDorisディメンションTableに対してクエリを実行します。これにより、高並行処理のシナリオでクエリレイテンシが高くなります。
-
JDBC経由で実行されるクエリは、通常レコードごとのポイントルックアップですが、Dorisでは効率を向上させるためにバッチクエリを推奨しています。
Flink Doris ConnectorでディメンションTableのjoinにLookup Joinを使用すると、以下の利点が得られます:
-
上流データのバッチキャッシュにより、レコードごとのクエリによって引き起こされる高レイテンシとデータベースの負荷を回避します。
-
joinクエリの非同期実行により、データスループットを向上させ、Dorisへのクエリ負荷を軽減します。
CREATE TABLE fact_table (
`id` BIGINT,
`name` STRING,
`city` STRING,
`process_time` as proctime()
) WITH (
'connector' = 'kafka',
...
);
create table dim_city(
`city` STRING,
`level` INT ,
`province` STRING,
`country` STRING
) WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
'table.identifier' = 'dim.dim_city',
'username' = 'root',
'password' = ''
);
SELECT a.id, a.name, a.city, c.province, c.country,c.level
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city
フルデータベース同期
Flink Doris ConnectorはFlink CDC (Flink CDC Documentation)を統合し、MySQLなどのリレーショナルデータベースをDorisに同期することをより簡単にします。この統合には、自動Table作成、スキーマ変更なども含まれます。同期でサポートされているデータベースには次のものがあります:MySQL、Oracle、PostgreSQL、SQLServer、MongoDB、DB2。
- フルデータベース同期を使用する場合、
$FLINK_HOME/libディレクトリに対応するFlink CDC依存関係(Fat Jar)を追加する必要があります。例えばflink-sql-connector-mysql-cdc-${version}.jar、flink-sql-connector-oracle-cdc-${version}.jarなど。FlinkCDCバージョン3.1以降は以前のバージョンと互換性がありません。以下のリンクから依存関係をダウンロードできます:FlinkCDC 3.x、FlinkCDC 2.x。 - Connector 24.0.0以降のバージョンでは、必要なFlink CDCバージョンは3.1以上でなければなりません。こちらからダウンロードできます。Flink CDCを使用してMySQLとOracleを同期する場合、関連するJDBCドライバーも
$FLINK_HOME/lib配下に追加する必要があります。
MySQLホールデータベース同期
Flinkクラスターを開始した後、以下のコマンドを直接実行できます:
<FLINK_HOME>bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-24.0.1.jar \
mysql-sync-database \
--database test_db \
--mysql-conf hostname=127.0.0.1 \
--mysql-conf port=3306 \
--mysql-conf username=root \
--mysql-conf password=123456 \
--mysql-conf database-name=mysql_db \
--including-tables "tbl1|test.*" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1
Oracle全データベース同期
<FLINK_HOME>bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
./lib/flink-doris-connector-1.16-24.0.1.jar \
oracle-sync-database \
--database test_db \
--oracle-conf hostname=127.0.0.1 \
--oracle-conf port=1521 \
--oracle-conf username=admin \
--oracle-conf password="password" \
--oracle-conf database-name=XE \
--oracle-conf schema-name=ADMIN \
--including-tables "tbl1|tbl2" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password=\
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1
PostgreSQL データベース全体の同期
<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1\
-c org.apache.doris.flink.tools.cdc.CdcTools \
./lib/flink-doris-connector-1.16-24.0.1.jar \
postgres-sync-database \
--database db1\
--postgres-conf hostname=127.0.0.1 \
--postgres-conf port=5432 \
--postgres-conf username=postgres \
--postgres-conf password="123456" \
--postgres-conf database-name=postgres \
--postgres-conf schema-name=public \
--postgres-conf slot.name=test \
--postgres-conf decoding.plugin.name=pgoutput \
--including-tables "tbl1|tbl2" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password=\
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1
SQLServer 全データベース同期
<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
./lib/flink-doris-connector-1.16-24.0.1.jar \
sqlserver-sync-database \
--database db1\
--sqlserver-conf hostname=127.0.0.1 \
--sqlserver-conf port=1433 \
--sqlserver-conf username=sa \
--sqlserver-conf password="123456" \
--sqlserver-conf database-name=CDC_DB \
--sqlserver-conf schema-name=dbo \
--including-tables "tbl1|tbl2" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password=\
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1
DB2 データベース全体の同期
<FLINK_HOME>bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-24.0.1.jar \
db2-sync-database \
--database db2_test \
--db2-conf hostname=127.0.0.1 \
--db2-conf port=50000 \
--db2-conf username=db2inst1 \
--db2-conf password=doris123456 \
--db2-conf database-name=testdb \
--db2-conf schema-name=DB2INST1 \
--including-tables "FULL_TYPES|CUSTOMERS" \
--single-sink true \
--use-new-schema-change true \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1
MongoDB データベース全体同期
<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
./lib/flink-doris-connector-1.18-24.0.1.jar \
mongodb-sync-database \
--database doris_db \
--schema-change-mode debezium_structure \
--mongodb-conf hosts=127.0.0.1:27017 \
--mongodb-conf username=flinkuser \
--mongodb-conf password=flinkpwd \
--mongodb-conf database=test \
--mongodb-conf scan.startup.mode=initial \
--mongodb-conf schema.sample-percent=0.2 \
--including-tables "tbl1|tbl2" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password= \
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--sink-conf sink.enable-2pc=false \
--table-conf replication_num=1
AWS Aurora MySQL 全データベース同期
<FLINK_HOME>bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.18-25.0.0.jar \
mysql-sync-database \
--database testwd \
--mysql-conf hostname=xxx.us-east-1.rds.amazonaws.com \
--mysql-conf port=3306 \
--mysql-conf username=admin \
--mysql-conf password=123456 \
--mysql-conf database-name=test \
--mysql-conf server-time-zone=UTC \
--including-tables "student" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password= \
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1
AWS RDS MySQL データベース全体同期
<FLINK_HOME>bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.18-25.0.0.jar \
mysql-sync-database \
--database testwd \
--mysql-conf hostname=xxx.ap-southeast-1.rds.amazonaws.com \
--mysql-conf port=3306 \
--mysql-conf username=admin \
--mysql-conf password=123456 \
--mysql-conf database-name=test \
--mysql-conf server-time-zone=UTC \
--including-tables "student" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password= \
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1
使用方法
パラメータ設定
一般設定項目
| Key | デフォルト値 | 必須 | コメント |
|---|---|---|---|
| fenodes | -- | Y | Doris FE httpアドレス。複数のアドレスがサポートされており、カンマで区切る必要があります。 |
| benodes | -- | N | Doris BE httpアドレス。複数のアドレスがサポートされており、カンマで区切る必要があります。 |
| jdbc-url | -- | N | JDBC接続情報、例:jdbc:mysql://127.0.0.1:9030。 |
| table.identifier | -- | Y | DorisTable名、例:db.tbl。 |
| username | -- | Y | Dorisにアクセスするためのユーザー名。 |
| password | -- | Y | Dorisにアクセスするためのパスワード。 |
| auto-redirect | TRUE | N | StreamLoadリクエストをリダイレクトするかどうか。有効にすると、StreamLoadはFEを通して書き込み、明示的にBE情報を取得しなくなります。 |
| doris.request.retries | 3 | N | Dorisへのリクエスト送信の再試行回数。 |
| doris.request.connect.timeout | 30s | N | Dorisへのリクエスト送信の接続タイムアウト。 |
| doris.request.read.timeout | 30s | N | Dorisへのリクエスト送信の読み取りタイムアウト。 |
Source設定
| Key | デフォルト値 | 必須 | コメント |
|---|---|---|---|
| doris.request.query.timeout | 21600s | N | Dorisクエリのタイムアウト。デフォルト値は6時間です。 |
| doris.request.tablet.size | 1 | N | 1つのPartitionに対応するDoris Tabletsの数。この値を小さく設定するほど、より多くのPartitionが生成され、Flink側での並列性を高めることができます。ただし、Dorisにより多くの負荷をかけることにもなります。 |
| doris.batch.size | 4064 | N | BEから一度に読み取る行の最大数。この値を増やすと、FlinkとDoris間で確立される接続数を減らすことができ、ネットワーク遅延による追加的な時間オーバーヘッドを削減できます。 |
| doris.exec.mem.limit | 8192mb | N | 単一クエリのメモリ制限。デフォルトは8GB、バイト単位。 |
| source.use-flight-sql | FALSE | N | 読み取りにArrow Flight SQLを使用するかどうか。 |
| source.flight-sql-port | - | N | Arrow Flight SQLを読み取りに使用する場合のFEのarrow_flight_sql_port。 |
DataStream専用設定
| Key | デフォルト値 | 必須 | コメント |
|---|---|---|---|
| doris.read.field | -- | N | DorisTableを読み取るための列名のリスト。複数の列はカンマで区切る必要があります。 |
| doris.filter.query | -- | N | 読み取りデータをフィルタリングするための式。この式はDorisに渡されます。Dorisはこの式を使用してソースデータのフィルタリングを完了します。例:age=18。 |
Sink設定
| Key | デフォルト値 | 必須 | コメント |
|---|---|---|---|
| sink.label-prefix | -- | Y | Stream loadインポートに使用されるラベルプレフィックス。2pcシナリオでは、FlinkのEOSセマンティクスを保証するためにグローバルに一意である必要があります。 |
| sink.properties.* | -- | N | Stream Loadのインポートパラメータ。例:'sink.properties.column_separator' = ', 'は列区切り文字を定義し、'sink.properties.escape_delimiters' = 'true'は区切り文字として使用される\x01などの特殊文字がバイナリ0x01に変換されることを意味します。JSON形式のインポートの場合、'sink.properties.format' = 'json'、'sink.properties.read_json_by_line' = 'true'。詳細なパラメータについては、ここを参照してください。Group Commitモードの場合、例:'sink.properties.group_commit' = 'sync_mode'はgroup commitを同期モードに設定します。Flinkコネクタは、バージョン1.6.2からインポート設定group commitをサポートしています。詳細な使用方法と制限については、group commitを参照してください。 |
| sink.enable-delete | TRUE | N | 削除を有効にするかどうか。このオプションは、DorisTableでバッチ削除機能が有効になっている必要があり(Doris 0.15+バージョンではデフォルトで有効)、Uniqueモデルのみをサポートします。 |
| sink.enable-2pc | TRUE | N | 2相コミット(2pc)を有効にするかどうか。デフォルトはtrue、Exactly-Onceセマンティクスを保証します。2相コミットの詳細については、ここを参照してください。 |
| sink.buffer-size | 1MB | N | 書き込みデータキャッシュバッファのサイズ、バイト単位。変更は推奨されず、デフォルト設定を使用できます。 |
| sink.buffer-count | 3 | N | 書き込みデータキャッシュバッファの数。変更は推奨されず、デフォルト設定を使用できます。 |
| sink.max-retries | 3 | N | Commit失敗後の最大再試行回数。デフォルトは3回。 |
| sink.enable.batch-mode | FALSE | N | DorisへのバッチモードでWriteを使用するかどうか。有効にすると、書き込みタイミングはCheckpointに依存せず、sink.buffer-flush.max-rows、sink.buffer-flush.max-bytes、sink.buffer-flush.intervalなどのパラメータによって制御されます。同時に、有効にした後はExactly-onceセマンティクスは保証されませんが、Uniqモデルの助けを借りて冪等性を実現できます。 |
| sink.flush.queue-size | 2 | N | バッチモードでのキャッシュキューのサイズ。 |
| sink.buffer-flush.max-rows | 500000 | N | バッチモードでの単一バッチで書き込まれる行の最大数。 |
| sink.buffer-flush.max-bytes | 100MB | N | バッチモードでの単一バッチで書き込まれるバイトの最大数。 |
| sink.buffer-flush.interval | 10s | N | バッチモードでの非同期キャッシュフラッシュの間隔。 |
| sink.ignore.update-before | TRUE | N | update-beforeイベントを無視するかどうか。デフォルトは無視します。 |
Lookup Join設定
| Key | デフォルト値 | 必須 | コメント |
|---|---|---|---|
| lookup.cache.max-rows | -1 | N | lookupキャッシュ内の最大行数。デフォルト値は-1、キャッシュが有効でないことを意味します。 |
| lookup.cache.ttl | 10s | N | lookupキャッシュの最大時間。デフォルトは10秒。 |
| lookup.max-retries | 1 | N | lookupクエリ失敗後の再試行回数。 |
| lookup.jdbc.async | FALSE | N | 非同期lookupを有効にするかどうか。デフォルトはfalse。 |
| lookup.jdbc.read.batch.size | 128 | N | 非同期lookupでの各クエリの最大バッチサイズ。 |
| lookup.jdbc.read.batch.queue-size | 256 | N | 非同期lookup中の中間バッファキューのサイズ。 |
| lookup.jdbc.read.thread-size | 3 | N | 各タスクでのlookup用jdbcスレッド数。 |
フルデータベース同期設定
構文
<FLINK_HOME>bin/flink run \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.6.1.jar \
<mysql-sync-database|oracle-sync-database|postgres-sync-database|sqlserver-sync-database|mongodb-sync-database> \
--database <doris-database-name> \
[--job-name <flink-job-name>] \
[--table-prefix <doris-table-prefix>] \
[--table-suffix <doris-table-suffix>] \
[--including-tables <mysql-table-name|name-regular-expr>] \
[--excluding-tables <mysql-table-name|name-regular-expr>] \
--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...] \
--oracle-conf <oracle-cdc-source-conf> [--oracle-conf <oracle-cdc-source-conf> ...] \
--postgres-conf <postgres-cdc-source-conf> [--postgres-conf <postgres-cdc-source-conf> ...] \
--sqlserver-conf <sqlserver-cdc-source-conf> [--sqlserver-conf <sqlserver-cdc-source-conf> ...] \
--sink-conf <doris-sink-conf> [--table-conf <doris-sink-conf> ...] \
[--table-conf <doris-table-conf> [--table-conf <doris-table-conf> ...]]
設定
| Key | Comment |
|---|---|
| --job-name | Flinkタスクの名前です。オプションです。 |
| --database | Dorisに同期するデータベースの名前です。 |
| --table-prefix | DorisTableのプレフィックス名です。例:--table-prefix ods_。 |
| --table-suffix | DorisTableのサフィックス名です。プレフィックスと同様です。 |
| --including-tables | 同期が必要なMySQLTableです。複数のTableは|で区切ることができ、正規表現がサポートされています。例:--including-tables table1。 |
| --excluding-tables | 同期が不要なTableです。使用方法は--including-tablesと同じです。 |
| --mysql-conf | MySQL CDCSourceの設定です。例:--mysql-conf hostname=127.0.0.1。MySQL-CDCのすべての設定はこちらで確認できます。その中で、hostname、username、password、database-nameは必須です。同期するデータベースとTableに主キーのないTableが含まれる場合、scan.incremental.snapshot.chunk.key-columnを設定する必要があり、非null型のフィールドを1つだけ選択できます。例:scan.incremental.snapshot.chunk.key-column=database.table:column,database.table1:column...、異なるデータベースとTableのカラムはカンマで区切ります。 |
| --oracle-conf | Oracle CDCSourceの設定です。例:--oracle-conf hostname=127.0.0.1。Oracle-CDCのすべての設定はこちらで確認できます。その中で、hostname、username、password、database-name、schema-nameは必須です。 |
| --postgres-conf | Postgres CDCSourceの設定です。例:--postgres-conf hostname=127.0.0.1。Postgres-CDCのすべての設定はこちらで確認できます。その中で、hostname、username、password、database-name、schema-name、slot.nameは必須です。 |
| --sqlserver-conf | SQLServer CDCSourceの設定です。例:--sqlserver-conf hostname=127.0.0.1。SQLServer-CDCのすべての設定はこちらで確認できます。その中で、hostname、username、password、database-name、schema-nameは必須です。 |
| --db2-conf | SQLServer CDCSourceの設定です。例:--db2-conf hostname=127.0.0.1。DB2-CDCのすべての設定はこちらで確認できます。その中で、hostname、username、password、database-name、schema-nameは必須です。 |
| --sink-conf | Doris Sinkのすべての設定は[こちら](https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector/#General 構成 Items)で確認できます。 |
| --mongodb-conf | MongoDB CDCSourceの設定です。例:--mongodb-conf hosts=127.0.0.1:27017。Mongo-CDCのすべての設定はこちらで確認できます。その中で、hosts、username、password、databaseは必須です。--mongodb-conf schema.sample-percentは、DorisでTableを作成するためにMongoDBデータを自動的にサンプリングする設定で、デフォルト値は0.2です。 |
| --table-conf | DorisTableの設定項目です。つまり、propertiesに含まれる内容です(table-bucketsを除く。これはpropertiesの属性ではありません)。例:--table-conf replication_num=1、--table-conf table-buckets="tbl1:10,tbl2:20,a.:30,b.:40,.*:50"は、正規表現の順序で異なるTableのバケット数を指定することを意味します。一致しない場合は、BUCKETS AUTOメソッドを使用してTableを作成します。 |
| --schema-change-mode | スキーマ変更を解析するモードで、debezium_structureとsql_parserが含まれます。デフォルトでdebezium_structureモードが使用されます。debezium_structureモードは、上流のCDCがデータを同期する際に使用されるデータ構造を解析し、この構造を解析してDDL変更操作を判断します。sql_parserモードは、上流のCDCがデータを同期する際のDDL文を解析してDDL変更操作を判断するため、この解析モードはより正確です。使用例:--schema-change-mode debezium_structure。この機能は24.0.0以降のバージョンで利用可能です。 |
| --single-sink | 単一のSinkを使用してすべてのTableを同期するかどうかです。有効にすると、上流で新規作成されたTableも自動的に識別し、Tableを自動作成できます。 |
| --multi-to-one-origin | 複数の上流Tableを同じTableに書き込む場合のソースTableの設定です。例:--multi-to-one-origin "a_.*|b_.*"。#208を参照してください。 |
| --multi-to-one-target | multi-to-one-originと組み合わせて使用し、ターゲットTableの設定です。例:--multi-to-one-target "a|b" |
| --create-table-only | Tableの構造のみを同期するかどうかです。 |
型マッピング
| Doris Type | Flink Type |
|---|---|
| NULL_TYPE | NULL |
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DATE | DATE |
| DATETIME | TIMESTAMP |
| DECIMAL | DECIMAL |
| CHAR | STRING |
| LARGEINT | STRING |
| VARCHAR | STRING |
| STRING | STRING |
| DECIMALV2 | DECIMAL |
| ARRAY | ARRAY |
| MAP | STRING |
| JSON | STRING |
| VARIANT | STRING |
| IPV4 | STRING |
| IPV6 | STRING |
監視メトリクス
Flinkは、Flinkクラスターの指標を監視するための複数のMetricsを提供します。以下は、Flink Doris Connectorで新たに追加された監視メトリクスです。
| Name | Metric Type | デスクリプション |
|---|---|---|
| totalFlushLoadBytes | Counter | フラッシュされてインポートされた総バイト数です。 |
| flushTotalNumberRows | Counter | インポートされて処理された総行数です。 |
| totalFlushLoadedRows | Counter | 正常にインポートされた総行数です。 |
| totalFlushTimeMs | Counter | 正常なインポートが完了するまでにかかった総時間です。 |
| totalFlushSucceededNumber | Counter | インポートが正常に完了した回数です。 |
| totalFlushFailedNumber | Counter | インポートが失敗した回数です。 |
| totalFlushFilteredRows | Counter | データ品質が不適格な総行数です。 |
| totalFlushUnselectedRows | Counter | where条件によってフィルタリングされた総行数です。 |
| beginTxnTimeMs | Histogram | Feにトランザクション開始を要求する時間(ミリ秒)です。 |
| putDataTimeMs | Histogram | Feにインポートデータ実行プランの取得を要求する時間です。 |
| readDataTimeMs | Histogram | データを読み取る時間です。 |
| writeDataTimeMs | Histogram | データ書き込み操作を実行する時間です。 |
| commitAndPublishTimeMs | Histogram | Feにトランザクションのコミットと公開を要求する時間です。 |
| loadTimeMs | Histogram | インポートが完了するまでの時間です。 |
ベストプラクティス
FlinkSQLでCDC経由でMySQLデータに高速接続
-- enable checkpoint
SET 'execution.checkpointing.interval' = '10s';
CREATE TABLE cdc_mysql_source (
id int
,name VARCHAR
,PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = 'password',
'database-name' = 'database',
'table-name' = 'table'
);
-- Supports synchronizing insert/update/delete events
CREATE TABLE doris_sink (
id INT,
name STRING
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'database.table',
'username' = 'root',
'password' = '',
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true',
'sink.enable-delete' = 'true', -- Synchronize delete events
'sink.label-prefix' = 'doris_label'
);
insert into doris_sink select id,name from cdc_mysql_source;
Flink は部分的なカラム更新を実行する
CREATE TABLE doris_sink (
id INT,
name STRING,
bank STRING,
age int
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'database.table',
'username' = 'root',
'password' = '',
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true',
'sink.properties.columns' = 'id,name,bank,age', -- Columns that need to be updated
'sink.properties.partial_columns' = 'true' -- Enable partial column updates
);
Flink が Bitmap データをインポートする
CREATE TABLE bitmap_sink (
dt int,
page string,
user_id int
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'test.bitmap_test',
'username' = 'root',
'password' = '',
'sink.label-prefix' = 'doris_label',
'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
)
FlinkCDC Updates Key Columns
一般的に、ビジネスデータベースでは、数値がTableの主キーとしてよく使用されます。例えば、StudentTableでは、番号(id)が主キーとして使用されます。しかし、ビジネスの発展に伴い、データに対応する番号が変更される場合があります。このシナリオでは、Flink CDC + Doris Connectorを使用してデータを同期する際、Doris内の主キー列のデータが自動的に更新されます。
原理
Flink CDCの基盤となる収集ツールはDebeziumです。Debeziumは内部的にopフィールドを使用して対応する操作を識別します。opフィールドの値はc、u、d、rであり、それぞれcreate、update、delete、readに対応します。主キー列の更新については、Flink CDCは下流にDELETEとINSERTイベントを送信し、データがDorisに同期された後、Doris内の主キー列のデータが自動的に更新されます。
使用方法
FlinkプログラムはCDC同期の上記の例を参照できます。タスクの送信が成功した後、MySQL側で主キー列を更新するステートメントを実行し(例:update student set id = '1002' where id = '1001')、その後Doris内のデータが変更されます。
Flink Deletes Data According to Specified Columns
一般的に、Kafkaのメッセージは特定のフィールドを使用して操作タイプをマークします。例:{"op_type":"delete",data:{...}}。この種のデータでは、op_type=deleteのデータを削除することが望まれます。
DorisSinkは、デフォルトでRowKindに従ってイベントのタイプを区別します。通常、CDCの場合、イベントタイプを直接取得でき、隠し列__DORIS_DELETE_SIGN__に値を割り当てることで削除の目的を達成できます。しかし、Kafkaの場合、ビジネスロジックに従って判断し、隠し列の値を明示的に渡す必要があります。
-- For example, the upstream data:{"op_type":"delete",data:{"id":1,"name":"zhangsan"}}
CREATE TABLE KAFKA_SOURCE(
data STRING,
op_type STRING
) WITH (
'connector' = 'kafka',
...
);
CREATE TABLE DORIS_SINK(
id INT,
name STRING,
__DORIS_DELETE_SIGN__ INT
) WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'db.table',
'username' = 'root',
'password' = '',
'sink.enable-delete' = 'false', -- false means not to obtain the event type from RowKind
'sink.properties.columns' = 'id, name, __DORIS_DELETE_SIGN__' -- Explicitly specify the import columns of streamload
);
INSERT INTO DORIS_SINK
SELECT json_value(data,'$.id') as id,
json_value(data,'$.name') as name,
if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__
from KAFKA_SOURCE;
Flink CDC DDLステートメントの同期
一般的に、MySQLなどのアップストリームデータソースを同期する際、アップストリームでフィールドの追加や削除を行う場合、DorisでSchema Change操作を同期する必要があります。
このシナリオでは、通常DataStream API用のプログラムを作成し、DorisSinkが提供するJsonDebeziumSchemaSerializerシリアライザーを使用してSchemaChangeを自動実行する必要があります。詳細については、CDCSchemaChangeExample.javaを参照してください。
Connectorが提供する全データベース同期ツールでは、追加設定は不要で、アップストリームのDDLが自動的に同期され、DorisでSchemaChange操作が実行されます。
よくある質問 (FAQ)
-
errCode = 2, detailMessage = Label [label_0_1] has already been used, relate to txn [19650]
Exactly-Onceシナリオでは、Flink Jobは最新のCheckpoint/Savepointから再起動する必要があります。そうでなければ、上記のエラーが報告されます。Exactly-Onceが不要な場合は、2PC送信を無効にする(sink.enable-2pc=false)か、異なるsink.label-prefixに変更することでもこの問題を解決できます。
-
errCode = 2, detailMessage = transaction [19650] not found
これはCommitステージで発生します。チェックポイントに記録されたトランザクションIDがFE側で期限切れになっています。この時点で再度コミットすると、上記のエラーが発生します。この時点では、チェックポイントから開始することは不可能です。その後、
fe.confのstreaming_label_keep_max_second設定を変更して有効期限を延長できます。デフォルトの有効期限は12時間です。dorisバージョン2.0以降では、fe.confのlabel_num_threshold設定(デフォルト2000)によっても制限されます。これは増加させるか、-1に変更できます(-1は時間によってのみ制限されることを意味します)。 -
errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100
これは同一データベースへの並行インポートが100を超えているためです。
fe.confのパラメータmax_running_txn_num_per_dbを調整することで解決できます。具体的な詳細については、max_running_txn_num_per_dbを参照してください。一方、labelの頻繁な変更やタスクの再起動もこのエラーにつながる可能性があります。2pcシナリオ(Duplicate/Aggregateモデル用)では、各タスクのlabelが一意である必要があります。そして、チェックポイントから再起動する際、Flinkタスクは事前コミットに成功したがまだコミットされていないトランザクションを積極的にアボートします。頻繁なlabel変更と再起動により、アボートできない事前コミット成功トランザクションが大量に発生し、トランザクションを占有します。Uniqueモデルでは、2pcを無効にしてべき等書き込みを実現することも可能です。
-
tablet writer write failed, tablet_id=190958, txn_id=3505530, err=-235
これは通常Connectorバージョン1.1.0以前で発生し、書き込み頻度が高すぎることが原因でバージョン数が過多になることによります。
sink.batch.sizeとsink.batch.intervalパラメータを設定してStreamloadの頻度を減らすことができます。Connectorバージョン1.1.0以降では、デフォルトの書き込みタイミングはCheckpointによって制御され、Checkpointの間隔を増やすことで書き込み頻度を減らすことができます。 -
Flinkインポート時にダーティデータをスキップする方法は?
Flinkがデータをインポートする際、フィールド形式や長さの問題などのダーティデータがあると、StreamLoadがエラーを報告します。この時、Flinkは再試行を続けます。このようなデータをスキップする必要がある場合は、StreamLoadのstrictモードを無効にする(
strict_mode=falseとmax_filter_ratio=1を設定)か、Sinkオペレータ前でデータをフィルタリングできます。 -
FlinkマシンとBEマシン間のネットワークが接続されていない場合の設定方法は?
FlinkがDorisへの書き込みを開始すると、DorisはBEに書き込み操作をリダイレクトします。この時、返されるアドレスはBEの内部ネットワークIPで、これは
show backendsコマンドで確認できるIPです。この時FlinkとDorisの間にネットワーク接続がない場合、エラーが報告されます。この場合、benodesでBEの外部ネットワークIPを設定できます。 -
stream load error: HTTP/1.1 307 Temporary Redirect
FlinkはまずFEにリクエストし、307を受信した後、リダイレクト後にBEにリクエストします。FEがFullGC/高負荷/ネットワーク遅延状態にある場合、HttpClientはデフォルトで一定期間(3秒)内にレスポンスを待たずにデータを送信します。リクエストボディはデフォルトでInputStreamなので、307レスポンスを受信した際にデータを再生できず、直接エラーが報告されます。この問題を解決する方法は3つあります:1. Connector25.1.0以降にアップグレードしてデフォルト時間を増加;2. auto-redirect=falseに変更してBEに直接リクエスト(一部のクラウドシナリオには適用不可);3. unique keyモデルではbatchモードを有効化。