メインコンテンツまでスキップ
バージョン: 2.1

Seatunnel Doris Sink

SeaTunnelについて

SeaTunnelは、大量データのリアルタイム同期をサポートする、非常に使いやすい超高性能分散データ統合プラットフォームです。毎日数百億件のデータを安定的かつ効率的に同期できます。

Connector-V2

SeaTunnelのconnector-v2は、バージョン2.3.1以降でDoris Sinkをサポートし、exactly-onceライトとCDCデータ同期をサポートします

Plugin Code

SeaTunnel Doris Sink Plugin Code

Options

nametyperequireddefault value
fenodesstringyes-
usernamestringyes-
passwordstringyes-
table.identifierstringyes-
sink.label-prefixstringyes-
sink.enable-2pcboolnotrue
sink.enable-deleteboolnofalse
doris.configmapyes-

fenodes [string]

DorisクラスターのFE Nodesアドレス、形式は"fe_ip:fe_http_port, ..."です

username [string]

Dorisユーザーのユーザー名

password [string]

Dorisユーザーのパスワード

table.identifier [string]

DorisTableの名前、形式はDBName.TableNameです

sink.label-prefix [string]

stream loadインポートで使用されるラベルプレフィックス。2pcシナリオでは、SeaTunnelのEOSセマンティクスを保証するためにグローバル一意性が必要です。

sink.enable-2pc [bool]

2フェーズコミット(2pc)を有効にするかどうか、デフォルトはtrueで、Exactly-Onceセマンティクスを保証します。2フェーズコミットについては、こちらを参照してください。

sink.enable-delete [bool]

削除を有効にするかどうか。このオプションでは、DorisTableでbatch delete機能を有効にする必要があり(0.15+バージョンではデフォルトで有効)、Uniqueモデルのみをサポートします。詳細はこのリンクで確認できます:

batch delete

doris.config [map]

stream loadのdata_descパラメータ、詳細はこのリンクで確認できます:

More Stream Load parameters

Example

JSON形式を使用してデータをインポート

sink {
Doris {
fenodes = "doris_fe:8030"
username = root
password = ""
table.identifier = "test.table_sink"
sink.enable-2pc = "true"
sink.label-prefix = "test_json"
doris.config = {
format="json"
read_json_by_line="true"
}
}
}

CSV形式を使用してデータをインポートする

sink {
Doris {
fenodes = "doris_fe:8030"
username = root
password = ""
table.identifier = "test.table_sink"
sink.enable-2pc = "true"
sink.label-prefix = "test_csv"
doris.config = {
format = "csv"
column_separator = ","
}
}
}

Connector-V1

Plugin Code

Seatunnel Flink Sink Doris plugin code

Options

nametyperequireddefault valueengine
fenodesstringyes-Flink
databasestringyes-Flink
tablestringyes-Flink
userstringyes-Flink
passwordstringyes-Flink
batch_sizeintno100Flink
intervalintno1000Flink
max_retriesintno1Flink
doris.*-no-Flink

fenodes [string]

Doris Fe httpのURL、例: 127.0.0.1:8030

database [string]

Dorisデータベース

table [string]

DorisTable

user [string]

Dorisユーザー

password [string]

Dorisパスワード

batch_size [int]

一度にDorisに書き込む最大行数、デフォルト値は100

interval [int]

フラッシュ間隔(ミリ秒)、この時間の後、非同期スレッドがキャッシュ内のデータをDorisに書き込みます。0に設定すると定期書き込みをオフにします。

max_retries [int]

Dorisへの書き込みが失敗した後の再試行回数

doris.* [string]

Stream loadのインポートパラメータ。例: 'doris.column_separator' = ', ' など。

その他のStream Loadパラメータ設定

Examples

Socket To Doris

env {
execution.parallelism = 1
}
source {
SocketStream {
host = 127.0.0.1
port = 9999
result_table_name = "socket"
field_name = "info"
}
}
transform {
}
sink {
DorisSink {
fenodes = "127.0.0.1:8030"
user = root
password = 123456
database = test
table = test_tbl
batch_size = 5
max_retries = 1
interval = 5000
}
}

Start コマンド

sh bin/start-seatunnel-flink.sh --config config/flink.streaming.conf

Spark Sink Doris

プラグインコード

Seatunnel Spark Sink Doris plugin code

オプション

nametyperequireddefault valueengine
fenodesstringyes-Spark
databasestringyes-Spark
tablestringyes-Spark
userstringyes-Spark
passwordstringyes-Spark
batch_sizeintyes100Spark
doris.*stringno-Spark

fenodes [string]

Doris FEアドレス:8030

database [string]

Dorisターゲットデータベース名

table [string]

DorisターゲットTable名

user [string]

Dorisユーザー名

password [string]

Dorisユーザーのパスワード

batch_size [string]

バッチあたりのDoris送信件数

doris. [string] Doris stream_loadプロパティ。'doris.'プレフィックス + stream_loadプロパティを使用できます

その他のDoris stream_load設定

HiveからDorisへ

設定プロパティ

env{
spark.app.name = "hive2doris-template"
}

spark {
spark.sql.catalogImplementation = "hive"
}

source {
hive {
preSql = "select * from tmp.test"
result_table_name = "test"
}
}

transform {
}


sink {

Console {

}

Doris {
fenodes="xxxx:8030"
database="gl_mint_dim"
table="dim_date"
user="root"
password="root"
batch_size=1000
doris.column_separator="\t"
doris.columns="date_key,date_value,day_in_year,day_in_month"
}
}

Start コマンド

sh bin/start-waterdrop-spark.sh --master local[4] --deploy-mode client --config ./config/spark.conf