Seatunnel Doris Sink
SeaTunnelについて
SeaTunnelは、大量データのリアルタイム同期をサポートする、非常に使いやすい超高性能分散データ統合プラットフォームです。毎日数百億のデータを安定的かつ効率的に同期することができます。
Connector-V2
SeaTunnelのconnector-v2は、バージョン2.3.1からDoris Sinkをサポートし、exactly-once書き込みとCDCデータ同期をサポートしています
Plugin Code
SeaTunnel Doris Sink Plugin Code
Options
| name | type | required | default value |
|---|---|---|---|
| fenodes | string | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| table.identifier | string | yes | - |
| sink.label-prefix | string | yes | - |
| sink.enable-2pc | bool | no | true |
| sink.enable-delete | bool | no | false |
| doris.config | map | yes | - |
fenodes [string]
DorisクラスターのFE Nodesアドレス。フォーマットは"fe_ip:fe_http_port, ..."です
username [string]
Dorisユーザーのユーザー名
password [string]
Dorisユーザーのパスワード
table.identifier [string]
DorisTableの名前。フォーマットはDBName.TableNameです
sink.label-prefix [string]
stream loadインポートで使用されるラベルプレフィックス。2pcシナリオでは、SeaTunnelのEOSセマンティクスを保証するためにグローバルな一意性が必要です。
sink.enable-2pc [bool]
2相コミット(2pc)を有効にするかどうか。デフォルトはtrueで、Exactly-Onceセマンティクスを保証します。2相コミットについては、こちらを参照してください。
sink.enable-delete [bool]
削除を有効にするかどうか。このオプションは、DorisTableでバッチ削除機能を有効にする必要があり(0.15+バージョンではデフォルトで有効)、Uniqueモデルのみをサポートします。詳細はこちらのリンクで確認できます:
doris.config [map]
stream loadのdata_descパラメーター。詳細はこちらのリンクで確認できます:
Example
JSON形式を使用してデータをインポート
sink {
Doris {
fenodes = "doris_fe:8030"
username = root
password = ""
table.identifier = "test.table_sink"
sink.enable-2pc = "true"
sink.label-prefix = "test_json"
doris.config = {
format="json"
read_json_by_line="true"
}
}
}
CSV形式を使用してデータをインポートする
sink {
Doris {
fenodes = "doris_fe:8030"
username = root
password = ""
table.identifier = "test.table_sink"
sink.enable-2pc = "true"
sink.label-prefix = "test_csv"
doris.config = {
format = "csv"
column_separator = ","
}
}
}
Connector-V1
Flink Sink Doris
プラグインコード
Seatunnel Flink Sink Doris plugin code
オプション
| name | type | required | default value | engine |
|---|---|---|---|---|
| fenodes | string | yes | - | Flink |
| database | string | yes | - | Flink |
| table | string | yes | - | Flink |
| user | string | yes | - | Flink |
| password | string | yes | - | Flink |
| batch_size | int | no | 100 | Flink |
| interval | int | no | 1000 | Flink |
| max_retries | int | no | 1 | Flink |
| doris.* | - | no | - | Flink |
fenodes [string]
Doris Fe http url、例:127.0.0.1:8030
database [string]
Dorisデータベース
table [string]
DorisTable
user [string]
Dorisユーザー
password [string]
Dorisパスワード
batch_size [int]
一度にDorisに書き込む行数の最大値、デフォルト値は100
interval [int]
フラッシュ間隔(ミリ秒)。この間隔後に、非同期スレッドがキャッシュ内のデータをDorisに書き込む。0に設定すると定期的な書き込みを無効にする。
max_retries [int]
Dorisへの書き込みが失敗した後の再試行回数
doris.* [string]
Stream loadのインポートパラメータ。例:'doris.column_separator' = ', ' など。
例
Socket To Doris
env {
execution.parallelism = 1
}
source {
SocketStream {
host = 127.0.0.1
port = 9999
result_table_name = "socket"
field_name = "info"
}
}
transform {
}
sink {
DorisSink {
fenodes = "127.0.0.1:8030"
user = root
password = 123456
database = test
table = test_tbl
batch_size = 5
max_retries = 1
interval = 5000
}
}
Start コマンド
sh bin/start-seatunnel-flink.sh --config config/flink.streaming.conf
Spark Sink Doris
プラグインコード
Seatunnel Spark Sink Doris plugin code
オプション
| name | type | required | default value | engine |
|---|---|---|---|---|
| fenodes | string | yes | - | Spark |
| database | string | yes | - | Spark |
| table | string | yes | - | Spark |
| user | string | yes | - | Spark |
| password | string | yes | - | Spark |
| batch_size | int | yes | 100 | Spark |
| doris.* | string | no | - | Spark |
fenodes [string]
Doris FEアドレス:8030
database [string]
Dorisターゲットデータベース名
table [string]
DorisターゲットTable名
user [string]
Dorisユーザー名
password [string]
Dorisユーザーのパスワード
batch_size [string]
バッチあたりのDoris送信数
doris. [string]
Doris stream_loadプロパティ。'doris.'プレフィックス + stream_loadプロパティを使用できます
More Doris stream_load Configurations
例
Hive to Doris
設定プロパティ
env{
spark.app.name = "hive2doris-template"
}
spark {
spark.sql.catalogImplementation = "hive"
}
source {
hive {
preSql = "select * from tmp.test"
result_table_name = "test"
}
}
transform {
}
sink {
Console {
}
Doris {
fenodes="xxxx:8030"
database="gl_mint_dim"
table="dim_date"
user="root"
password="root"
batch_size=1000
doris.column_separator="\t"
doris.columns="date_key,date_value,day_in_year,day_in_month"
}
}
Start コマンド
sh bin/start-waterdrop-spark.sh --master local[4] --deploy-mode client --config ./config/spark.conf