メインコンテンツまでスキップ
バージョン: 26.x

Flink Doris Connector

Flink Doris Connectorは、Flinkを通じてDorisクラスターからデータを読み取り、データを書き込むために使用されます。また、FlinkCDCも統合されており、MySQLなどの上流データベースとのより便利な全データベース同期が可能です。

Flink Connectorを使用すると、以下の操作を実行できます:

  • Dorisからデータを読み取り: Flink ConnectorはBEからの並列読み取りをサポートし、データ取得効率を向上させます。

  • Dorisへデータを書き込み: Flinkでバッチング処理後、Stream Loadを使用してデータを一括でDorisにインポートします。

  • Lookup Joinでディメンションテーブル結合を実行: バッチングと非同期クエリにより、ディメンションテーブル結合を高速化します。

  • 全データベース同期: Flink CDCを使用して、MySQL、Oracle、PostgreSQLなどのデータベース全体を同期でき、自動テーブル作成とDDL操作が含まれます。

バージョンの説明

Connector VersionFlink VersionDoris VersionJava VersionScala Version
1.0.31.11,1.12,1.13,1.140.15+82.11,2.12
1.1.11.141.0+82.11,2.12
1.2.11.151.0+8-
1.3.01.161.0+8-
1.4.01.15,1.16,1.171.0+8-
1.5.21.15,1.16,1.17,1.181.0+8-
1.6.11.15,1.16,1.17,1.18,1.191.0+8-
24.0.11.15,1.16,1.17,1.18,1.19,1.201.0+8-
24.1.01.15,1.16,1.17,1.18,1.19,1.201.0+8-
25.0.01.15,1.16,1.17,1.18,1.19,1.201.0+8-
25.1.01.15,1.16,1.17,1.18,1.19,1.201.0+8-

使用方法

Flink Doris Connectorは2つの方法で使用できます:JarまたはMaven経由。

Jar

対応するバージョンのFlink Doris Connector Jarファイルをこちらからダウンロードし、このファイルをFlinkセットアップのclasspathにコピーしてFlink-Doris-Connectorを使用できます。StandaloneモードのFlinkデプロイメントの場合、このファイルをlib/フォルダー下に配置してください。Yarnモードで実行されるFlinkクラスターの場合、ファイルを事前デプロイメントパッケージに配置してください。

Maven

Mavenで使用するには、Pomファイルに以下の依存関係を追加するだけです:

<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-${flink.version}</artifactId>
<version>${connector.version}</version>
</dependency>

例えば:

<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.16</artifactId>
<version>25.1.0</version>
</dependency>

動作原理

Dorisからのデータ読み込み

FlinkConnectorPrinciples-JDBC-Doris

データ読み込み時、Flink Doris ConnectorはFlink JDBC Connectorと比較して高いパフォーマンスを提供し、使用が推奨されます:

  • Flink JDBC Connector: DorisはMySQLプロトコルと互換性がありますが、Dorisクラスターへの読み書きにFlink JDBC Connectorを使用することは推奨されません。このアプローチは単一のFEノードでのシリアル読み書き操作となり、ボトルネックを作成してパフォーマンスに影響を与えます。

  • Flink Doris Connector: Doris 2.1以降、ADBCはFlink Doris Connectorのデフォルトプロトコルです。読み込みプロセスは以下の手順に従います:

    a. Flink Doris Connectorはまずクエリプランに基づいてFEからTablet ID情報を取得します。

    b. クエリステートメントを生成します:SELECT * FROM tbs TABLET(id1, id2, id3)

    c. クエリはその後FEのADBCポートを通じて実行されます。

    d. データはFEをバイパスしてBEから直接返され、単一点のボトルネックを排除します。

Dorisへのデータ書き込み

データ書き込みにFlink Doris Connectorを使用する場合、Stream Load経由での一括インポート前にFlinkのメモリ内でバッチ処理が実行されます。Doris Flink Connectorは2つのバッチモードを提供し、Flink Checkpointベースのストリーミング書き込みがデフォルトです:

Streaming WriteBatch Write
トリガー条件Flink Checkpointsに依存し、FlinkのCheckpointサイクルに従ってDorisに書き込みコネクター定義の時間またはデータ量の閾値に基づく定期的な送信
一貫性Exactly-OnceAt-Least-Once;プライマリキーモデルでExactly-Onceを保証可能
レイテンシFlinkのCheckpoint間隔によって制限され、一般的により高い柔軟な調整が可能な独立したバッチメカニズム
障害許容性と復旧Flink状態復旧と完全に一致外部の重複排除ロジック(例:Dorisプライマリキー重複排除)に依存

クイックスタート

準備

Flinkクラスターデプロイメント

Standaloneクラスターを例にとって:

  1. Flinkインストールパッケージをダウンロードします。例:Flink 1.18.1
  2. 解凍後、Flink Doris Connectorパッケージを<FLINK_HOME>/libに配置します;
  3. <FLINK_HOME>ディレクトリに移動し、bin/start-cluster.shを実行してFlinkクラスターを開始します;
  4. jpsコマンドを使用してFlinkクラスターが正常に開始されたかを確認できます。

Dorisテーブルの初期化

以下のステートメントを実行してDorisテーブルを作成します:

CREATE DATABASE test;

CREATE TABLE test.student (
`id` INT,
`name` VARCHAR(256),
`age` INT
)
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);

INSERT INTO test.student values(1,"James",18);
INSERT INTO test.student values(2,"Emily",28);

CREATE TABLE test.student_trans (
`id` INT,
`name` VARCHAR(256),
`age` INT
)
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);

FlinkSQLタスクの実行

FlinkSQLクライアントの開始

bin/sql-client.sh

FlinkSQLの実行

CREATE TABLE Student (
id STRING,
name STRING,
age INT
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'test.student',
'username' = 'root',
'password' = ''
);

CREATE TABLE StudentTrans (
id STRING,
name STRING,
age INT
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'test.student_trans',
'username' = 'root',
'password' = '',
'sink.label-prefix' = 'doris_label'
);

INSERT INTO StudentTrans SELECT id, concat('prefix_',name), age+1 FROM Student;

クエリデータ

mysql> select * from test.student_trans;
+------+--------------+------+
| id | name | age |
+------+--------------+------+
| 1 | prefix_James | 19 |
| 2 | prefix_Emily | 29 |
+------+--------------+------+
2 rows in set (0.02 sec)

シナリオと操作

Dorisからのデータ読み取り

FlinkがDorisからデータを読み取る際、Doris Sourceは現在有界ストリームであり、CDCによる継続的な読み取りはサポートしていません。DorisからのデータはThriftまたはArrowFlightSQL(バージョン24.0.0以降でサポート)を使用して読み取ることができます。バージョン2.1以降では、ArrowFlightSQLが推奨される方法です。

  • Thrift: BEのThriftインターフェースを呼び出してデータを読み取ります。詳細な手順については、Reading Data via Thrift Interfaceを参照してください。
  • ArrowFlightSQL: Doris 2.1をベースとしたこの方法では、Arrow Flight SQLプロトコルを使用して大量のデータを高速で読み取ることができます。詳細については、High-speed Data Transfer via Arrow Flight SQLを参照してください。

FlinkSQLを使用したデータ読み取り

Thrift方式
CREATE TABLE student (
id INT,
name STRING,
age INT
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030', -- Fe的host:HttpPort
'table.identifier' = 'test.student',
'username' = 'root',
'password' = ''
);

SELECT * FROM student;
ArrowFlightSQL
CREATE TABLE student (
id INT,
name STRING,
age INT
)
WITH (
'connector' = 'doris',
'fenodes' = '{fe.conf:http_port}',
'table.identifier' = 'test.student',
'source.use-flight-sql' = 'true',
'source.flight-sql-port' = '{fe.conf:arrow_flight_sql_port}',
'username' = 'root',
'password' = ''
);

SELECT * FROM student;

DataStream APIを使用したデータの読み取り

DataStream APIを使用してデータを読み取る場合は、「使用方法」セクションで説明されているように、事前にプログラムのPOMファイルに依存関係を含める必要があります。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DorisOptions option = DorisOptions.builder()
.setFenodes("127.0.0.1:8030")
.setTableIdentifier("test.student")
.setUsername("root")
.setPassword("")
.build();

DorisReadOptions readOptions = DorisReadOptions.builder().build();
DorisSource<List<?>> dorisSource = DorisSource.<List<?>>builder()
.setDorisOptions(option)
.setDorisReadOptions(readOptions)
.setDeserializer(new SimpleListDeserializationSchema())
.build();

env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();
env.execute("Doris Source Test");

完全なコードについては、DorisSourceDataStream.javaを参照してください。

Dorisへのデータ書き込み

FlinkはStream Load方式を使用してDorisにデータを書き込み、ストリーミングモードとバッチ挿入モードの両方をサポートしています。

ストリーミングとバッチ挿入の違い

Connector 1.5.0以降、バッチ挿入がサポートされています。バッチ挿入はCheckpointに依存せず、データをメモリ内でバッファし、バッチパラメータに基づいて書き込みタイミングを制御します。ストリーミング挿入はCheckpointの有効化が必要で、Checkpoint期間全体を通して上流データをDorisに継続的に書き込み、データをメモリ内に継続的に保持しません。

FlinkSQLを使用したデータ書き込み

テスト用に、FlinkのDatagenを使用して、継続的に生成される上流データをシミュレートします。

-- enable checkpoint
SET 'execution.checkpointing.interval' = '30s';

CREATE TABLE student_source (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.name.length' = '20',
'fields.id.min' = '1',
'fields.id.max' = '100000',
'fields.age.min' = '3',
'fields.age.max' = '30'
);

-- doris sink
CREATE TABLE student_sink (
id INT,
name STRING,
age INT
)
WITH (
'connector' = 'doris',
'fenodes' = '10.16.10.6:28737',
'table.identifier' = 'test.student',
'username' = 'root',
'password' = 'password',
'sink.label-prefix' = 'doris_label'
--'sink.enable.batch-mode' = 'true' Adding this configuration enables batch writing
);

INSERT INTO student_sink SELECT * FROM student_source;

DataStream APIを使用したデータ書き込み

DataStream APIを使用してデータを書き込む場合、異なるシリアライゼーション方法を使用して、上流データをDorisテーブルに書き込む前にシリアライズできます。

備考

Connectorには既にHttpClient4.5.13バージョンが含まれています。プロジェクトで個別にHttpClientを参照する場合は、バージョンの整合性を確保する必要があります。

標準文字列フォーマット

上流データがCSVまたはJSONフォーマットの場合、SimpleStringSerializerを直接使用してデータをシリアライズできます。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30000);
DorisSink.Builder<String> builder = DorisSink.builder();

DorisOptions dorisOptions = DorisOptions.builder()
.setFenodes("10.16.10.6:28737")
.setTableIdentifier("test.student")
.setUsername("root")
.setPassword("")
.build();

Properties properties = new Properties();
// When the upstream data is in json format, the following configuration needs to be enabled
properties.setProperty("read_json_by_line", "true");
properties.setProperty("format", "json");

// When writing csv data from the upstream, the following configurations need to be enabled
//properties.setProperty("format", "csv");
//properties.setProperty("column_separator", ",");

DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
.setLabelPrefix("label-doris")
.setDeletable(false)
//.setBatchMode(true) Enable batch writing
.setStreamLoadProp(properties)
.build();

builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionOptions)
.setSerializer(new SimpleStringSerializer())
.setDorisOptions(dorisOptions);

List<String> data = new ArrayList<>();
data.add("{\"id\":3,\"name\":\"Michael\",\"age\":28}");
data.add("{\"id\":4,\"name\":\"David\",\"age\":38}");

env.fromCollection(data).sinkTo(builder.build());
env.execute("doris test");

完全なコードについては、DorisSinkExample.javaを参照してください。

RowData形式

RowDataはFlinkの内部形式です。上流のデータがRowData形式の場合、RowDataSerializerを使用してデータをシリアライズする必要があります。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000);
env.setParallelism(1);

DorisSink.Builder<RowData> builder = DorisSink.builder();

Properties properties = new Properties();
properties.setProperty("column_separator", ",");
properties.setProperty("line_delimiter", "\n");
properties.setProperty("format", "csv");
// When writing json data from the upstream, the following configuration needs to be enabled
// properties.setProperty("read_json_by_line", "true");
// properties.setProperty("format", "json");
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder
.setFenodes("10.16.10.6:28737")
.setTableIdentifier("test.student")
.setUsername("root")
.setPassword("");
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix(UUID.randomUUID().toString()).setDeletable(false).setStreamLoadProp(properties);

// flink rowdata‘s schema
String[] fields = {"id","name", "age"};
DataType[] types = {DataTypes.INT(), DataTypes.VARCHAR(256), DataTypes.INT()};

builder.setDorisExecutionOptions(executionBuilder.build())
.setSerializer(
RowDataSerializer.builder() // serialize according to rowdata
.setType(LoadConstants.CSV)
.setFieldDelimiter(",")
.setFieldNames(fields)
.setFieldType(types)
.build())
.setDorisOptions(dorisBuilder.build());

// mock rowdata source
DataStream<RowData> source =
env.fromElements("")
.flatMap(
new FlatMapFunction<String, RowData>() {
@Override
public void flatMap(String s, Collector<RowData> out)
throws Exception {
GenericRowData genericRowData = new GenericRowData(3);
genericRowData.setField(0, 1);
genericRowData.setField(1, StringData.fromString("Michael"));
genericRowData.setField(2, 18);
out.collect(genericRowData);

GenericRowData genericRowData2 = new GenericRowData(3);
genericRowData2.setField(0, 2);
genericRowData2.setField(1, StringData.fromString("David"));
genericRowData2.setField(2, 38);
out.collect(genericRowData2);
}
});

source.sinkTo(builder.build());
env.execute("doris test");

完全なコードについては、DorisSinkExampleRowData.javaを参照してください。

Debezium Format

FlinkCDCからのデータやKafkaのDebezium formatなど、Debezium形式の上流データについては、JsonDebeziumSchemaSerializerを使用してデータをシリアライズできます。

// enable checkpoint
env.enableCheckpointing(10000);

Properties props = new Properties();
props.setProperty("format", "json");
props.setProperty("read_json_by_line", "true");
DorisOptions dorisOptions = DorisOptions.builder()
.setFenodes("127.0.0.1:8030")
.setTableIdentifier("test.student")
.setUsername("root")
.setPassword("").build();

DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-prefix")
.setStreamLoadProp(props)
.setDeletable(true);

DorisSink.Builder<String> builder = DorisSink.builder();
builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionBuilder.build())
.setDorisOptions(dorisOptions)
.setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());

env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
.sinkTo(builder.build());

完全なコードについては、CDCSchemaChangeExample.javaを参照してください。

マルチテーブル書き込み形式

現在、DorisSinkは単一のSinkで複数のテーブルの同期をサポートしています。データとデータベース/テーブル情報の両方をSinkに渡し、RecordWithMetaSerializerを使用してシリアライズする必要があります。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DorisSink.Builder<RecordWithMeta> builder = DorisSink.builder();
Properties properties = new Properties();
properties.setProperty("column_separator", ",");
properties.setProperty("line_delimiter", "\n");
properties.setProperty("format", "csv");
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder
.setFenodes("10.16.10.6:28737")
.setTableIdentifier("")
.setUsername("root")
.setPassword("");

DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();

executionBuilder
.setLabelPrefix("label-doris")
.setStreamLoadProp(properties)
.setDeletable(false)
.setBatchMode(true);

builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionBuilder.build())
.setDorisOptions(dorisBuilder.build())
.setSerializer(new RecordWithMetaSerializer());

RecordWithMeta record = new RecordWithMeta("test", "student_1", "1,David,18");
RecordWithMeta record1 = new RecordWithMeta("test", "student_2", "1,Jack,28");
env.fromCollection(Arrays.asList(record, record1)).sinkTo(builder.build());

完全なコードについては、DorisSinkMultiTableExample.javaを参照してください。

Lookup Join

Lookup Joinを使用することで、FlinkにおけるディメンションテーブルのJoinを最適化できます。ディメンションテーブルのJoinにFlink JDBC Connectorを使用する場合、以下の問題が発生する可能性があります:

  • Flink JDBC Connectorは同期クエリモードを使用するため、上流のデータ(例:Kafkaからのデータ)がレコードを送信した後、即座にDorisディメンションテーブルにクエリを実行します。これにより、高並行性シナリオにおいて高いクエリレイテンシが発生します。

  • JDBCを介して実行されるクエリは、通常レコードごとのポイントルックアップですが、Dorisでは効率性を向上させるためにバッチクエリを推奨しています。

Flink Doris ConnectorでディメンションテーブルのJoinにLookup Joinを使用することで、以下の利点が得られます:

  • 上流データのバッチキャッシュにより、レコードごとのクエリによって引き起こされる高レイテンシとデータベース負荷を回避します。

  • Joinクエリの非同期実行により、データスループットを向上させ、Dorisへのクエリ負荷を軽減します。

CREATE TABLE fact_table (
`id` BIGINT,
`name` STRING,
`city` STRING,
`process_time` as proctime()
) WITH (
'connector' = 'kafka',
...
);

create table dim_city(
`city` STRING,
`level` INT ,
`province` STRING,
`country` STRING
) WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
'table.identifier' = 'dim.dim_city',
'username' = 'root',
'password' = ''
);

SELECT a.id, a.name, a.city, c.province, c.country,c.level
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city

完全データベース同期

Flink Doris ConnectorはFlink CDC (Flink CDC Documentation)と統合されており、MySQLなどのリレーショナルデータベースをDorisに同期することがより簡単になります。この統合には、自動テーブル作成、スキーマ変更なども含まれます。同期でサポートされているデータベースには、MySQL、Oracle、PostgreSQL、SQLServer、MongoDB、DB2があります。

Note
  1. 完全データベース同期を使用する場合、$FLINK_HOME/libディレクトリに対応するFlink CDCの依存関係(Fat Jar)を追加する必要があります。例えば、flink-sql-connector-mysql-cdc-${version}.jarflink-sql-connector-oracle-cdc-${version}.jarなどです。FlinkCDCバージョン3.1以降は、以前のバージョンとは互換性がありません。依存関係は以下のリンクからダウンロードできます:FlinkCDC 3.xFlinkCDC 2.x
  2. Connector 24.0.0以降のバージョンでは、必要なFlink CDCバージョンは3.1以上である必要があります。ここからダウンロードできます。Flink CDCを使用してMySQLとOracleを同期する場合は、$FLINK_HOME/libの下に関連するJDBCドライバーも追加する必要があります。

MySQL全データベース同期

Flinkクラスターを起動した後、以下のコマンドを直接実行できます:

<FLINK_HOME>bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-24.0.1.jar \
mysql-sync-database \
--database test_db \
--mysql-conf hostname=127.0.0.1 \
--mysql-conf port=3306 \
--mysql-conf username=root \
--mysql-conf password=123456 \
--mysql-conf database-name=mysql_db \
--including-tables "tbl1|test.*" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1

Oracle 全データベース同期

<FLINK_HOME>bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
./lib/flink-doris-connector-1.16-24.0.1.jar \
oracle-sync-database \
--database test_db \
--oracle-conf hostname=127.0.0.1 \
--oracle-conf port=1521 \
--oracle-conf username=admin \
--oracle-conf password="password" \
--oracle-conf database-name=XE \
--oracle-conf schema-name=ADMIN \
--including-tables "tbl1|tbl2" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password=\
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1

PostgreSQL データベース全体同期

<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1\
-c org.apache.doris.flink.tools.cdc.CdcTools \
./lib/flink-doris-connector-1.16-24.0.1.jar \
postgres-sync-database \
--database db1\
--postgres-conf hostname=127.0.0.1 \
--postgres-conf port=5432 \
--postgres-conf username=postgres \
--postgres-conf password="123456" \
--postgres-conf database-name=postgres \
--postgres-conf schema-name=public \
--postgres-conf slot.name=test \
--postgres-conf decoding.plugin.name=pgoutput \
--including-tables "tbl1|tbl2" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password=\
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1

SQLServer データベース全体同期

<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
./lib/flink-doris-connector-1.16-24.0.1.jar \
sqlserver-sync-database \
--database db1\
--sqlserver-conf hostname=127.0.0.1 \
--sqlserver-conf port=1433 \
--sqlserver-conf username=sa \
--sqlserver-conf password="123456" \
--sqlserver-conf database-name=CDC_DB \
--sqlserver-conf schema-name=dbo \
--including-tables "tbl1|tbl2" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password=\
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1

DB2 データベース全体同期

<FLINK_HOME>bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-24.0.1.jar \
db2-sync-database \
--database db2_test \
--db2-conf hostname=127.0.0.1 \
--db2-conf port=50000 \
--db2-conf username=db2inst1 \
--db2-conf password=doris123456 \
--db2-conf database-name=testdb \
--db2-conf schema-name=DB2INST1 \
--including-tables "FULL_TYPES|CUSTOMERS" \
--single-sink true \
--use-new-schema-change true \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1

MongoDB データベース全体の同期

<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
./lib/flink-doris-connector-1.18-24.0.1.jar \
mongodb-sync-database \
--database doris_db \
--schema-change-mode debezium_structure \
--mongodb-conf hosts=127.0.0.1:27017 \
--mongodb-conf username=flinkuser \
--mongodb-conf password=flinkpwd \
--mongodb-conf database=test \
--mongodb-conf scan.startup.mode=initial \
--mongodb-conf schema.sample-percent=0.2 \
--including-tables "tbl1|tbl2" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password= \
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--sink-conf sink.enable-2pc=false \
--table-conf replication_num=1

AWS Aurora MySQL データベース全体同期

<FLINK_HOME>bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.18-25.0.0.jar \
mysql-sync-database \
--database testwd \
--mysql-conf hostname=xxx.us-east-1.rds.amazonaws.com \
--mysql-conf port=3306 \
--mysql-conf username=admin \
--mysql-conf password=123456 \
--mysql-conf database-name=test \
--mysql-conf server-time-zone=UTC \
--including-tables "student" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password= \
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1

AWS RDS MySQL データベース全体同期

<FLINK_HOME>bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.18-25.0.0.jar \
mysql-sync-database \
--database testwd \
--mysql-conf hostname=xxx.ap-southeast-1.rds.amazonaws.com \
--mysql-conf port=3306 \
--mysql-conf username=admin \
--mysql-conf password=123456 \
--mysql-conf database-name=test \
--mysql-conf server-time-zone=UTC \
--including-tables "student" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password= \
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1

使用手順

パラメータ設定

一般設定項目

Keyデフォルト値必須コメント
fenodes--YDoris FE httpアドレス。複数のアドレスがサポートされており、カンマで区切る必要があります。
benodes--NDoris BE httpアドレス。複数のアドレスがサポートされており、カンマで区切る必要があります。
jdbc-url--NJDBC接続情報、例:jdbc:mysql://127.0.0.1:9030。
table.identifier--YDorisテーブル名、例:db.tbl。
username--YDorisにアクセスするためのユーザー名。
password--YDorisにアクセスするためのパスワード。
auto-redirectTRUENStreamLoadリクエストをリダイレクトするかどうか。有効にすると、StreamLoadはFE経由で書き込みを行い、BE情報を明示的に取得しなくなります。
doris.request.retries3NDorisへのリクエスト送信のリトライ回数。
doris.request.connect.timeout30sNDorisへのリクエスト送信の接続タイムアウト。
doris.request.read.timeout30sNDorisへのリクエスト送信の読み取りタイムアウト。

Source設定

Keyデフォルト値必須コメント
doris.request.query.timeout21600sNDorisクエリのタイムアウト。デフォルト値は6時間です。
doris.request.tablet.size1N1つのPartitionに対応するDoris Tabletsの数。この値を小さく設定するほど、より多くのPartitionsが生成され、Flink側の並列性を向上させることができます。ただし、Dorisにはより多くの負荷をかけることになります。
doris.batch.size4064NBEから一度に読み取る最大行数。この値を増やすことで、FlinkとDoris間で確立される接続数を削減し、ネットワーク遅延による追加の時間オーバーヘッドを減らすことができます。
doris.exec.mem.limit8192mbN単一クエリのメモリ制限。デフォルトは8GB、単位はバイトです。
source.use-flight-sqlFALSEN読み取りにArrow Flight SQLを使用するかどうか。
source.flight-sql-port-N読み取りにArrow Flight SQLを使用する際のFEのarrow_flight_sql_port。

DataStream固有の設定

Keyデフォルト値必須コメント
doris.read.field--NDorisテーブルを読み取るための列名のリスト。複数の列はカンマで区切る必要があります。
doris.filter.query--N読み取りデータをフィルタリングするための式。この式はDorisに渡されます。Dorisはこの式を使用してソースデータのフィルタリングを完了します。例:age=18。

Sink設定

Keyデフォルト値必須コメント
sink.label-prefix--YStream loadインポートに使用されるラベルプレフィックス。2pcシナリオでは、FlinkのEOSセマンティクスを保証するため、グローバルに一意である必要があります。
sink.properties.*--NStream Loadのインポートパラメータ。例:'sink.properties.column_separator' = ', 'は列区切り文字を定義し、'sink.properties.escape_delimiters' = 'true'は\x01のような区切り文字としての特殊文字をバイナリ0x01に変換することを意味します。JSON形式のインポートの場合、'sink.properties.format' = 'json'、'sink.properties.read_json_by_line' = 'true'。詳細なパラメータについては、こちらを参照してください。Group Commitモードの場合、例:'sink.properties.group_commit' = 'sync_mode'はgroup commitを同期モードに設定します。Flinkコネクタはバージョン1.6.2以降、インポート設定group commitをサポートしています。詳細な使用方法と制限については、group commitを参照してください。
sink.enable-deleteTRUEN削除を有効にするかどうか。このオプションはDorisテーブルでバッチ削除機能が有効になっている必要があり(Doris 0.15+バージョンではデフォルトで有効)、Uniqueモデルのみをサポートします。
sink.enable-2pcTRUEN2フェーズコミット(2pc)を有効にするかどうか。デフォルトはtrueで、Exactly-Onceセマンティクスを保証します。2フェーズコミットの詳細については、こちらを参照してください。
sink.buffer-size1MBN書き込みデータキャッシュバッファのサイズ(バイト単位)。変更は推奨されず、デフォルト設定を使用できます。
sink.buffer-count3N書き込みデータキャッシュバッファの数。変更は推奨されず、デフォルト設定を使用できます。
sink.max-retries3NCommit失敗後の最大リトライ回数。デフォルトは3回です。
sink.enable.batch-modeFALSENバッチモードでDorisに書き込むかどうか。有効にすると、書き込みタイミングはCheckpointに依存せず、sink.buffer-flush.max-rows、sink.buffer-flush.max-bytes、sink.buffer-flush.intervalなどのパラメータによって制御されます。同時に、有効にした後はExactly-onceセマンティクスは保証されませんが、Uniqモデルの助けを借りて冪等性を実現できます。
sink.flush.queue-size2Nバッチモードでのキャッシュキューのサイズ。
sink.buffer-flush.max-rows500000Nバッチモードで単一バッチで書き込まれる最大行数。
sink.buffer-flush.max-bytes100MBNバッチモードで単一バッチで書き込まれる最大バイト数。
sink.buffer-flush.interval10sNバッチモードでキャッシュを非同期でフラッシュする間隔。
sink.ignore.update-beforeTRUENupdate-beforeイベントを無視するかどうか。デフォルトは無視します。

Lookup Join設定

Keyデフォルト値必須コメント
lookup.cache.max-rows-1Nlookupキャッシュ内の最大行数。デフォルト値は-1で、キャッシュが有効でないことを意味します。
lookup.cache.ttl10sNlookupキャッシュの最大時間。デフォルトは10秒です。
lookup.max-retries1Nlookupクエリ失敗後のリトライ回数。
lookup.jdbc.asyncFALSEN非同期lookupを有効にするかどうか。デフォルトはfalseです。
lookup.jdbc.read.batch.size128N非同期lookupで各クエリの最大バッチサイズ。
lookup.jdbc.read.batch.queue-size256N非同期lookup中の中間バッファキューのサイズ。
lookup.jdbc.read.thread-size3N各タスクでlookup用のjdbcスレッド数。

フルデータベース同期設定

構文

<FLINK_HOME>bin/flink run \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.6.1.jar \
<mysql-sync-database|oracle-sync-database|postgres-sync-database|sqlserver-sync-database|mongodb-sync-database> \
--database <doris-database-name> \
[--job-name <flink-job-name>] \
[--table-prefix <doris-table-prefix>] \
[--table-suffix <doris-table-suffix>] \
[--including-tables <mysql-table-name|name-regular-expr>] \
[--excluding-tables <mysql-table-name|name-regular-expr>] \
--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...] \
--oracle-conf <oracle-cdc-source-conf> [--oracle-conf <oracle-cdc-source-conf> ...] \
--postgres-conf <postgres-cdc-source-conf> [--postgres-conf <postgres-cdc-source-conf> ...] \
--sqlserver-conf <sqlserver-cdc-source-conf> [--sqlserver-conf <sqlserver-cdc-source-conf> ...] \
--sink-conf <doris-sink-conf> [--table-conf <doris-sink-conf> ...] \
[--table-conf <doris-table-conf> [--table-conf <doris-table-conf> ...]]

設定

KeyComment
--job-nameFlinkタスクの名前で、オプションです。
--databaseDorisに同期するデータベースの名前。
--table-prefixDorisテーブルのプレフィックス名。例:--table-prefix ods_。
--table-suffixDorisテーブルのサフィックス名。プレフィックスと同様です。
--including-tables同期が必要なMySQLテーブル。複数のテーブルは|で区切ることができ、正規表現がサポートされています。例:--including-tables table1。
--excluding-tables同期が不要なテーブル。使用方法は--including-tablesと同じです。
--mysql-confMySQL CDCSourceの設定。例:--mysql-conf hostname=127.0.0.1。MySQL-CDCのすべての設定はこちらで確認できます。その中で、hostname、username、password、database-nameは必須です。同期するデータベースとテーブルに非主キーテーブルが含まれている場合、scan.incremental.snapshot.chunk.key-columnを設定する必要があり、非null型のフィールドを1つだけ選択できます。例:scan.incremental.snapshot.chunk.key-column=database.table:column,database.table1:column...、異なるデータベースとテーブルの列はカンマで区切られます。
--oracle-confOracle CDCSourceの設定。例:--oracle-conf hostname=127.0.0.1。Oracle-CDCのすべての設定はこちらで確認できます。その中で、hostname、username、password、database-name、schema-nameは必須です。
--postgres-confPostgres CDCSourceの設定。例:--postgres-conf hostname=127.0.0.1。Postgres-CDCのすべての設定はこちらで確認できます。その中で、hostname、username、password、database-name、schema-name、slot.nameは必須です。
--sqlserver-confSQLServer CDCSourceの設定。例:--sqlserver-conf hostname=127.0.0.1。SQLServer-CDCのすべての設定はこちらで確認できます。その中で、hostname、username、password、database-name、schema-nameは必須です。
--db2-confSQLServer CDCSourceの設定。例:--db2-conf hostname=127.0.0.1。DB2-CDCのすべての設定はこちらで確認できます。その中で、hostname、username、password、database-name、schema-nameは必須です。
--sink-confDoris Sinkのすべての設定は[こちら](https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector/#General Configuration Items)で確認できます。
--mongodb-confMongoDB CDCSourceの設定。例:--mongodb-conf hosts=127.0.0.1:27017。Mongo-CDCのすべての設定はこちらで確認できます。その中で、hosts、username、password、databaseは必須です。--mongodb-conf schema.sample-percentは、MongoDBデータを自動的にサンプリングしてDorisにテーブルを作成するための設定で、デフォルト値は0.2です。
--table-confDorisテーブルの設定項目、つまりpropertiesに含まれる内容(table-bucketsを除く、これはpropertiesの属性ではありません)。例:--table-conf replication_num=1、--table-conf table-buckets="tbl1:10,tbl2:20,a.:30,b.:40,.*:50"は正規表現の順序で異なるテーブルのバケット数を指定することを意味します。マッチしない場合は、BUCKETS AUTOメソッドを使用してテーブルを作成します。
--schema-change-modeスキーマ変更を解析するモード。debezium_structureとsql_parserが含まれます。デフォルトでdebezium_structureモードが使用されます。debezium_structureモードは、上流のCDCがデータを同期する際に使用するデータ構造を解析し、この構造を解析してDDL変更操作を判断します。sql_parserモードは、上流のCDCがデータを同期する際のDDLステートメントを解析してDDL変更操作を判断するため、この解析モードはより正確です。使用例:--schema-change-mode debezium_structure。この機能は24.0.0以降のバージョンで利用可能です。
--single-sink単一のSinkを使用してすべてのテーブルを同期するかどうか。有効にした後、上流で新しく作成されたテーブルも自動的に識別し、自動的にテーブルを作成できます。
--multi-to-one-origin複数の上流テーブルを同じテーブルに書き込む際のソーステーブルの設定。例:--multi-to-one-origin "a_.*|b_.*"、#208を参照。
--multi-to-one-targetmulti-to-one-originと組み合わせて使用し、ターゲットテーブルの設定。例:--multi-to-one-target "a|b"
--create-table-onlyテーブルの構造のみを同期するかどうか。

型マッピング

Doris TypeFlink Type
NULL_TYPENULL
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
FLOATFLOAT
DOUBLEDOUBLE
DATEDATE
DATETIMETIMESTAMP
DECIMALDECIMAL
CHARSTRING
LARGEINTSTRING
VARCHARSTRING
STRINGSTRING
DECIMALV2DECIMAL
ARRAYARRAY
MAPSTRING
JSONSTRING
VARIANTSTRING
IPV4STRING
IPV6STRING

監視メトリクス

Flinkは、Flinkクラスターの指標を監視するための複数のMetricsを提供しています。以下は、Flink Doris Connectorに新しく追加された監視メトリクスです。

NameMetric TypeDescription
totalFlushLoadBytesCounterフラッシュおよびインポートされた総バイト数。
flushTotalNumberRowsCounterインポートおよび処理された総行数。
totalFlushLoadedRowsCounter正常にインポートされた総行数。
totalFlushTimeMsCounter正常なインポートが完了するまでにかかった総時間。
totalFlushSucceededNumberCounterインポートが正常に完了した回数。
totalFlushFailedNumberCounterインポートが失敗した回数。
totalFlushFilteredRowsCounterデータ品質が基準に満たない総行数。
totalFlushUnselectedRowsCounterwhere条件によってフィルタリングされた総行数。
beginTxnTimeMsHistogramFeにトランザクション開始を要求するのにかかった時間(ミリ秒)。
putDataTimeMsHistogramFeにインポートデータ実行プランの取得を要求するのにかかった時間。
readDataTimeMsHistogramデータの読み取りにかかった時間。
writeDataTimeMsHistogramデータ書き込み操作の実行にかかった時間。
commitAndPublishTimeMsHistogramFeにトランザクションのコミットおよび公開を要求するのにかかった時間。
loadTimeMsHistogramインポートが完了するまでにかかった時間。

ベストプラクティス

FlinkSQLでCDC経由でMySQLデータに迅速接続

-- enable checkpoint
SET 'execution.checkpointing.interval' = '10s';

CREATE TABLE cdc_mysql_source (
id int
,name VARCHAR
,PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = 'password',
'database-name' = 'database',
'table-name' = 'table'
);

-- Supports synchronizing insert/update/delete events
CREATE TABLE doris_sink (
id INT,
name STRING
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'database.table',
'username' = 'root',
'password' = '',
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true',
'sink.enable-delete' = 'true', -- Synchronize delete events
'sink.label-prefix' = 'doris_label'
);

insert into doris_sink select id,name from cdc_mysql_source;
CREATE TABLE doris_sink (
id INT,
name STRING,
bank STRING,
age int
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'database.table',
'username' = 'root',
'password' = '',
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true',
'sink.properties.columns' = 'id,name,bank,age', -- Columns that need to be updated
'sink.properties.partial_columns' = 'true' -- Enable partial column updates
);
CREATE TABLE bitmap_sink (
dt int,
page string,
user_id int
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'test.bitmap_test',
'username' = 'root',
'password' = '',
'sink.label-prefix' = 'doris_label',
'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
)

FlinkCDC キー列の更新

一般的に、ビジネスデータベースでは、番号がテーブルの主キーとしてよく使用されます。例えば、Studentテーブルでは、番号(id)が主キーとして使用されます。しかし、ビジネスが発展するにつれて、データに対応する番号が変更される場合があります。このシナリオでは、Flink CDC + Doris Connectorを使用してデータを同期する際、Dorisの主キー列のデータを自動的に更新できます。

原理

Flink CDCの基盤収集ツールはDebeziumです。Debeziumは内部的にopフィールドを使用して対応する操作を識別します。opフィールドの値は、c、u、d、rであり、それぞれcreate、update、delete、readに対応します。主キー列の更新については、Flink CDCはDELETEおよびINSERTイベントを下流に送信し、データがDorisに同期された後、Dorisの主キー列のデータが自動的に更新されます。

使用方法

Flinkプログラムは上記のCDC同期例を参照できます。タスクの送信が成功した後、MySQL側で主キー列を更新するステートメントを実行し(例:update student set id = '1002' where id = '1001')、その後Dorisのデータが変更されます。

指定された列に基づくFlinkのデータ削除

一般的に、Kafkaのメッセージは特定のフィールドを使用して操作タイプをマークします。例えば、{"op_type":"delete",data:{...}}のようなものです。この種のデータについては、op_type=deleteのデータを削除することが期待されます。

DorisSinkは、デフォルトでRowKindに従ってイベントのタイプを区別します。通常、CDCの場合、イベントタイプを直接取得でき、隠し列__DORIS_DELETE_SIGN__に値を割り当てて削除の目的を達成できます。しかし、Kafkaの場合は、ビジネスロジックに従って判断し、隠し列の値を明示的に渡す必要があります。

-- For example, the upstream data:{"op_type":"delete",data:{"id":1,"name":"zhangsan"}}
CREATE TABLE KAFKA_SOURCE(
data STRING,
op_type STRING
) WITH (
'connector' = 'kafka',
...
);

CREATE TABLE DORIS_SINK(
id INT,
name STRING,
__DORIS_DELETE_SIGN__ INT
) WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'db.table',
'username' = 'root',
'password' = '',
'sink.enable-delete' = 'false', -- false means not to obtain the event type from RowKind
'sink.properties.columns' = 'id, name, __DORIS_DELETE_SIGN__' -- Explicitly specify the import columns of streamload
);

INSERT INTO DORIS_SINK
SELECT json_value(data,'$.id') as id,
json_value(data,'$.name') as name,
if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__
from KAFKA_SOURCE;

一般的に、MySQLなどの上流データソースを同期する際、上流でフィールドの追加や削除を行う場合、DorisでSchema Change操作を同期する必要があります。

このシナリオでは、通常DataStream API用のプログラムを作成し、DorisSinkが提供するJsonDebeziumSchemaSerializerシリアライザーを使用してSchemaChangeを自動的に実行する必要があります。詳細については、CDCSchemaChangeExample.javaを参照してください。

Connectorが提供する全データベース同期ツールでは、追加の設定は不要で、上流のDDLが自動的に同期され、DorisでSchemaChange操作が実行されます。

よくある質問 (FAQ)

  1. errCode = 2, detailMessage = Label [label_0_1] has already been used, relate to txn [19650]

    Exactly-Onceシナリオでは、Flink Jobは最新のCheckpoint/Savepointから再起動する必要があります。そうしないと上記のエラーが報告されます。Exactly-Onceが不要な場合、この問題は2PC送信を無効にする(sink.enable-2pc=false)か、異なるsink.label-prefixに変更することでも解決できます。

  2. errCode = 2, detailMessage = transaction [19650] not found

    これはCommitステージ中に発生します。checkpointに記録されたトランザクションIDがFE側で期限切れになっています。この時点で再びコミットすると、上記のエラーが発生します。この時点では、checkpointから開始することは不可能です。その後、fe.confstreaming_label_keep_max_second設定を変更することで有効期限を延長できます。デフォルトの有効期限は12時間です。dorisバージョン2.0以降では、fe.conflabel_num_threshold設定(デフォルト2000)によっても制限され、これを増加させるか-1に変更できます(-1は時間によってのみ制限されることを意味します)。

  3. errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100

    これは同じデータベースへの同時インポートが100を超えているためです。fe.confmax_running_txn_num_per_dbパラメータを調整することで解決できます。詳細については、max_running_txn_num_per_dbを参照してください。

    同時に、labelを頻繁に変更してタスクを再起動することもこのエラーにつながる可能性があります。2pcシナリオ(Duplicate/Aggregateモデル用)では、各タスクのlabelは一意である必要があります。そしてcheckpointから再起動する際、Flinkタスクは事前コミットに成功したがまだコミットされていないトランザクションを能動的に中止します。頻繁なlabel変更と再起動により、中止できない事前コミット成功トランザクションが大量に発生し、トランザクションを占有します。Uniqueモデルでは、2pcを無効にしてべき等書き込みを実現することもできます。

  4. tablet writer write failed, tablet_id=190958, txn_id=3505530, err=-235

    これは通常Connectorバージョン1.1.0以前で発生し、書き込み頻度が高すぎてバージョン数が過多になることが原因です。sink.batch.sizesink.batch.intervalパラメータを設定してStreamloadの頻度を下げることができます。Connectorバージョン1.1.0以降では、デフォルトの書き込みタイミングはCheckpointによって制御され、Checkpoint間隔を増加させることで書き込み頻度を下げることができます。

  5. Flinkがインポートする際にダーティデータをスキップする方法は?

    Flinkがデータをインポートする際、フィールド形式や長さの問題などのダーティデータがあると、StreamLoadでエラーが発生します。この時、Flinkは再試行を続けます。このようなデータをスキップする必要がある場合、StreamLoadのstrict modeを無効にする(strict_mode=falsemax_filter_ratio=1を設定)か、Sinkオペレータの前でデータをフィルタリングできます。

  6. FlinkマシンとBEマシン間のネットワークが接続されていない場合の設定方法は?

    FlinkがDorisへの書き込みを開始すると、DorisはBEに書き込み操作をリダイレクトします。この時、返されるアドレスはBEの内部ネットワークIPで、これはshow backendsコマンドで確認できるIPです。この時FlinkとDorisの間にネットワーク接続がない場合、エラーが報告されます。この場合、benodesでBEの外部ネットワークIPを設定できます。

  7. stream load error: HTTP/1.1 307 Temporary Redirect

    FlinkはまずFEにリクエストし、307を受信後、リダイレクト後にBEにリクエストします。FEがFullGC/高負荷/ネットワーク遅延状態にある場合、HttpClientはデフォルトで一定期間(3秒)内にレスポンスを待たずにデータを送信します。リクエストボディがデフォルトでInputStreamであるため、307レスポンスを受信した際にデータを再実行できず、直接エラーが報告されます。この問題を解決する方法は3つあります:1. Connector25.1.0以上にアップグレードしてデフォルト時間を増加させる;2. auto-redirect=falseに変更してBEに直接リクエストを開始する(一部のクラウドシナリオには適用不可);3. unique keyモデルでbatch modeを有効にする。