メインコンテンツまでスキップ
バージョン: 26.x

継続負荷

概要

Dorisでは、Job + TVFアプローチを使用して継続的なインポートタスクを作成できます。Jobを送信後、Dorisは継続的にインポートジョブを実行し、リアルタイムでTVFをクエリしてデータをDorisTableに書き込みます。

サポートされているTVF

S3 TVF

基本原理

S3

S3の指定されたディレクトリ内のファイルを反復処理し、各ファイルをリストに分割して小さなバッチでDorisTableに書き込みます。

増分読み込み方法

タスク作成後、Dorisは継続的に指定されたパスからデータを読み込み、固定の頻度で新しいファイルをポーリングします。

注意:新しいファイルの名前は、最後にインポートされたファイルの名前よりも辞書順で大きい必要があります。そうでない場合、Dorisはそれを新しいファイルとして扱いません。例えば、ファイルがfile1、file2、file3と命名されている場合、これらは順次インポートされます。後からfile0という名前の新しいファイルが追加された場合、最後にインポートされたファイルfile3よりも辞書順で小さいため、Dorisはそれをインポートしません。

クイックスタート

インポートジョブの作成

S3ディレクトリでCSV拡張子のファイルが定期的に生成されると仮定します。その場合、Jobを作成できます。

CREATE JOB my_job 
ON STREAMING
DO
INSERT INTO db1.tbl1
select * from S3(
"uri" = "s3://bucket/*.csv",
"s3.access_key" = "<s3_access_key>",
"s3.secret_key" = "<s3_secret_key>",
"s3.region" = "<s3_region>",
"s3.endpoint" = "<s3_endpoint>",
"format" = "<format>"
)

インポートステータスの確認

select * from job(type=insert) where ExecuteType = "streaming"
Id: 1758538737484
Name: my_job1
Definer: root
ExecuteType: STREAMING
RecurringStrategy: \N
Status: RUNNING
ExecuteSql: INSERT INTO test.`student1`
SELECT * FROM S3
(
"uri" = "s3://bucket/s3/demo/*.csv",
"format" = "csv",
"column_separator" = ",",
"s3.endpoint" = "s3.ap-southeast-1.amazonaws.com",
"s3.region" = "ap-southeast-1",
"s3.access_key" = "",
"s3.secret_key" = ""
)
CreateTime: 2025-09-22 19:24:51
SucceedTaskCount: 1
FailedTaskCount: 0
CanceledTaskCount: 0
Comment: \N
Properties: \N
CurrentOffset: {"fileName":"s3/demo/test/1.csv"}
EndOffset: {"fileName":"s3/demo/test/1.csv"}
LoadStatistic: {"scannedRows":20,"loadBytes":425,"fileNumber":2,"fileSize":256}
ErrorMsg: \N
JobRuntimeMsg: \N

インポートジョブの一時停止

PAUSE JOB WHERE jobname = <job_name> ;

インポートジョブを再開する

RESUME JOB where jobName = <job_name> ;

import job の変更

-- -- Supports modifying Job properties and insert statements
Alter Job jobName
PROPERTIES(
"session.insert_max_filter_ratio"="0.5"
)
INSERT INTO db1.tbl1
select * from S3(
"uri" = "s3://bucket/*.csv",
"s3.access_key" = "<s3_access_key>",
"s3.secret_key" = "<s3_secret_key>",
"s3.region" = "<s3_region>",
"s3.endpoint" = "<s3_endpoint>",
"format" = "<format>"
)

インポートされたジョブの削除

DROP JOB where jobName = <job_name> ;

Reference

Import command

Job + TVF常駐インポートジョブを作成する構文は以下の通りです:

CREATE JOB <job_name>
ON STREAMING
[job_properties]
[ COMMENT <comment> ]
DO <Insert_Command>

モジュールの説明は以下の通りです:

| Module | デスクリプション |

| -------------- | ------------------------------------------------------------ | | job_name | タスク名 | | job_properties | Jobを指定するために使用される一般的なインポートパラメータ | | comment | Jobを説明するために使用される備考 | | Insert_Command | 実行するSQL;現在はInsert into table select * from s3()のみサポート |

インポートパラメータ

FE設定パラメータ

Parameterデフォルト値
max_streaming_job_num1024Streamingジョブの最大数
job_streaming_task_exec_thread_num10StreamingTasksを実行するために使用されるスレッド数
max_streaming_task_show_count100StreamingTaskのメモリに保持されるタスク実行記録の最大数

インポート設定パラメータ

Parameterデフォルト値デスクリプション
session.*Nonejob_propertiesでのすべてのセッション変数の設定をサポート。インポート変数については、[Insert Into Select](../../data-operate/import/import-way/insert-into-manual.md#Import 構成 パラメータ)を参照してください
s3.max_batch_files256累積ファイル数がこの値に達したときにインポート書き込みをトリガーします。
s3.max_batch_bytes10G累積データ量がこの値に達したときにインポート書き込みをトリガーします。
max_interval10s上流で新しいファイルやデータが追加されていない場合のアイドル・スケジューリング間隔。

インポートステータス

Job

ジョブが正常に送信された後、**select * from job("insert") where ExecuteType = 'Streaming'**を実行してジョブの現在のステータスを確認できます。

select * from job(type=insert) where ExecuteType = "streaming"
Id: 1758538737484
Name: my_job1
Definer: root
ExecuteType: STREAMING
RecurringStrategy: \N
Status: RUNNING
ExecuteSql: INSERT INTO test.`student1`
SELECT * FROM S3
(
"uri" = "s3://wd-test123/s3/demo/*.csv",
"format" = "csv",
"column_separator" = ",",
"s3.endpoint" = "s3.ap-southeast-1.amazonaws.com",
"s3.region" = "ap-southeast-1",
"s3.access_key" = "",
"s3.secret_key" = ""
)
CreateTime: 2025-09-22 19:24:51
SucceedTaskCount: 5
FailedTaskCount: 0
CanceledTaskCount: 0
Comment:
Properties: {"s3.max_batch_files":"2","session.insert_max_filter_ratio":"0.5"}
CurrentOffset: {"fileName":"s3/demo/test/1.csv"}
EndOffset: {"fileName":"s3/demo/test/1.csv"}
LoadStatistic: {"scannedRows":0,"loadBytes":0,"fileNumber":0,"fileSize":0}
ErrorMsg: \N

特定のパラメータ結果は以下のように表示されます:

Result Columnsデスクリプション
IDJob ID
NAMEJob Name
DefinerJob Definer
ExecuteTypeJobスケジューリングタイプ: ONE_TIME/RECURRING/STREAMING/MANUAL
RecurringStrategy繰り返し戦略。通常のInsert操作で使用されます。ExecuteType=Streamingの場合は空です
StatusJobステータス
ExecuteSqlJobのInsert SQL文
CreateTimeJob作成時刻
SucceedTaskCount成功したタスクの数
FailedTaskCount失敗したタスクの数
CanceledTaskCountキャンセルされたタスクの数
CommentJobコメント
PropertiesJobプロパティ
CurrentOffsetJobの現在の完了オフセット。ExecuteType=Streamingのみ値を持ちます。
EndOffsetJobがデータソースから取得した最大EndOffset。ExecuteType=Streamingのみ値を持ちます。
LoadStatisticJob統計。
ErrorMsgJob実行中のエラーメッセージ。
JobRuntimeMsgJobの一部のランタイム情報。

Task

select \* from tasks(type='insert') where jobId='1758534452459'を実行して、各Taskの実行状態を確認できます。

注意:最新のTask情報のみが保持されます。

mysql> select * from tasks(type='insert') where jobId='1758534452459'\G
*************************** 1. row ***************************
TaskId: 1758534723330
JobId: 1758534452459
JobName: test_streaming_insert_job_name
Label: 1758534452459_1758534723330
Status: SUCCESS
ErrorMsg: \N
CreateTime: 2025-09-22 17:52:55
StartTime: \N
FinishTime: \N
TrackingUrl: \N
LoadStatistic: {"scannedRows":20,"loadBytes":425,"fileNumber":2,"fileSize":256}
User: root
FirstErrorMsg: \N
RunningOffset: {"startFileName":"s3/demo/1.csv","endFileName":"s3/demo/8.csv"}
Results Columnsデスクリプション
TaskIdタスクID
JobIDJobID
JobNameジョブ名
LabelInsertのラベル
Statusタスクのステータス
ErrorMsgタスク失敗情報
CreateTimeタスク作成時間
StartTimeタスク開始時間
FinishTimeタスク完了時間
TrackingUrlInsertのエラーURL
LoadStatisticタスク統計
Userタスクの実行者
FirstErrorMsg通常のInsertTaskにおける最初のデータ品質エラーに関する情報
RunningOffset現在のタスク同期のオフセット情報。Job.ExecuteType=Streamingの場合のみ値を持つ