Job Scheduler
背景
データ管理の精緻化に対する要求が高まる中、スケジューリングは重要な役割を果たしています。通常、以下のシナリオで適用されます:
-
定期的なデータインポートやETL操作などの定期的なデータ更新により、手動介入を減らし、データ処理の効率と精度を向上させます。
-
外部データソースとCatalogの同期により、複数ソースのデータを効率的かつ正確にターゲットシステムに統合し、複雑なビジネス分析ニーズを満たします。
-
期限切れ/無効データの定期的なクリーンアップにより、ストレージ容量を解放し、過度な期限切れ/無効データがシステムパフォーマンスに影響することを防ぎます。
Apache Doris 2.1より前のバージョンでは、通常、ビジネスコードによるスケジューリングや、サードパーティのスケジューリングツールおよび分散スケジューリングプラットフォームの導入など、外部スケジューリングシステムに依存して上記の要件を満たす必要がありました。しかし、外部システム自体の制限により、スケジューリング戦略とリソース管理の柔軟性に対するDorisの要件を満たすことができない場合があります。さらに、外部スケジューリングシステムが失敗した場合、ビジネスリスクが増加するだけでなく、対処するために追加の運用時間と人的リソースが必要になります。
Job Scheduler
上記の問題を解決するため、Apache Dorisはバージョン2.1でJob Scheduler機能を導入し、秒レベルまでのスケジューリング精度で自律的なタスクスケジューリング機能を実現しました。この機能の導入により、データインポートの整合性と一貫性が確保されるだけでなく、ユーザーが柔軟で便利にスケジューリング戦略を調整できるようになりました。同時に、外部システムへの依存を減らすことで、システム障害のリスクと運用コストも削減し、コミュニティユーザーに、より統一された信頼性の高いユーザー体験を提供します。
Doris Job Schedulerは、事前設定されたスケジュールに基づくタスク管理システムで、特定の時点または指定された時間間隔で事前定義された操作をトリガーし、自動化されたタスク実行を実現できます。Job Schedulerには以下の特徴があります:
- 効率的なスケジューリング: Job Schedulerは指定された時間間隔内でタスクとイベントを配置し、データ処理の効率を確保します。時間輪アルゴリズムを使用して、イベントが秒レベルまで正確にトリガーされることを保証します。
- 柔軟なスケジューリング: Job Schedulerは、分、時間、日、週間隔でのスケジューリングなど、複数のスケジューリングオプションを提供します。また、一回限りのスケジューリングと繰り返し(周期的)イベントスケジューリングをサポートし、周期的スケジューリングは開始時刻と終了時刻を指定できます。
- イベントプールと高性能処理キュー: Job SchedulerはDisruptorを使用して高性能なプロデューサー・コンシューマーモデルを実装し、タスク実行の過負荷を最大限回避します。
- 追跡可能なスケジューリングレコード: Job Schedulerは最新のTask実行レコードを保存します(設定可能)。Task実行レコードは簡単なコマンドで確認でき、プロセスの追跡可能性を確保します。
- 高可用性: Doris自体の高可用性メカニズムを活用して、Job Schedulerは簡単に自己回復と高可用性を実現できます。
関連ドキュメント: CREATE-JOB
構文説明
有効なJob文には以下を含める必要があります:
-
キーワードCREATE JOBの後にジョブ名を指定する必要があり、これによりデータベース内のイベントが一意に識別されます。
-
ON SCHEDULE句は、Jobのタイプ、トリガー時刻、および頻度を指定するために使用されます。
-
AT timestampは一回限りのイベントに使用されます。指定された日時にJOBが一度だけ実行されることを指定し、AT current_timestampは現在の日時を指定します。JOBが作成されると、すぐに実行され、非同期タスク作成にも使用できます。
-
EVERY: 周期的なジョブに使用され、ジョブの実行頻度を指定します。キーワードの後に時間間隔(週、日、時間、分)を指定する必要があります。
-
Interval: ジョブ実行の頻度を指定します。1 DAYはジョブが1日に1回実行されることを意味し、1 HOURは1時間に1回、1 MINUTEは1分に1回、1 WEEKは1週間に1回を意味します。
-
EVERY句には、オプションのSTARTS句が含まれます。STARTSの後はtimestamp値で、繰り返しの開始時刻を定義し、CURRENT_TIMESTAMPは現在の日時を指定します。JOBが作成されると、すぐに実行されます。
-
EVERY句には、オプションのENDS句が含まれます。ENDSキーワードの後はtimestamp値で、JOBイベントの実行が停止する時刻を定義します。
-
-
-
DO句は、Jobがトリガーされたときに実行される操作を指定するために使用されます。現在、INSERT文のみがサポートされています。
CREATE
JOB
job_name
ON SCHEDULE schedule
[COMMENT 'string']
DO execute_sql;
schedule: {
AT timestamp
| EVERY interval
[STARTS timestamp ]
[ENDS timestamp ]
}
interval:
quantity { WEEK |DAY | HOUR | MINUTE}
以下の例:
CREATE JOB my_job ON SCHEDULE EVERY 1 MINUTE DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2;
このステートメントはmy_jobという名前のジョブを作成し、1分ごとに実行されます。実行される操作はdb2.tbl2からdb1.tbl1にデータをインポートすることです。
使用例
一回限りのJobを作成:2025-01-01 00:00:00に一度だけ実行し、db2.tbl2からdb1.tbl1にデータをインポートします。
CREATE JOB my_job ON SCHEDULE AT '2025-01-01 00:00:00' DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2;
終了時刻を指定しない定期Jobを作成する:2025-01-01 00:00:00から開始し、1日1回実行して、db2.tbl2からdb1.tbl1にデータをインポートする。
CREATE JOB my_job ON SCHEDULE EVERY 1 DAY STARTS '2025-01-01 00:00:00' DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2 WHERE create_time >= days_add(now(),-1);
指定された終了時刻を持つ定期Jobを作成する:2025-01-01 00:00:00から開始し、1日に1回実行して、db2.tbl2からdb1.tbl1にデータをインポートし、2026-01-01 00:10:00に終了する。
CREATE JOB my_job ON SCHEDULE EVERY 1 DAY STARTS '2025-01-01 00:00:00' ENDS '2026-01-01 00:10:00' DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2 WHERE create_time >= days_add(now(),-1);
Jobを使用した非同期実行の実装: DorisのJobは同期タスクとして作成されますが、その実行プロセスは非同期です。この機能により、Jobは一般的なINSERT INTO SELECT操作などの非同期タスクの実装に非常に適しています。
例えば、db2.tbl2からdb1.tbl1にデータをインポートする必要がある場合、JOBを1回限りのタスクとして指定し、開始時刻を現在時刻に設定するだけです。
CREATE JOB my_job ON SCHEDULE AT current_timestamp DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2;
CatalogとJob Schedulerに基づくデータ自動同期
Eコマースシナリオを例として、ユーザーはしばしばMySQLからビジネスデータを抽出し、このデータをDorisに同期してデータ分析を行い、精密なマーケティング活動をサポートする必要があります。Job SchedulerはMulti Catalogデータレイク機能と連携して、データソース間の定期的なデータ同期を効率的に完了することができます。
CREATE TABLE IF NOT EXISTS user.activity (
`user_id` INT NOT NULL,
`date` DATE NOT NULL,
`city` VARCHAR(20),
`age` SMALLINT,
`sex` TINYINT,
`last_visit_date` DATETIME DEFAULT '1970-01-01 00:00:00',
`cost` BIGINT DEFAULT '0',
`max_dwell_time` INT DEFAULT '0',
`min_dwell_time` INT DEFAULT '99999'
);
INSERT INTO user.activity VALUES
(10000, '2017-10-01', 'BeiJing', 20, 0, '2017-10-01 06:00:00', 20, 10, 10),
(10000, '2017-10-01', 'BeiJing', 20, 0, '2017-10-01 07:00:00', 15, 2, 2),
(10001, '2017-10-01', 'BeiJing', 30, 1, '2017-10-01 17:05:00', 2, 22, 22),
(10002, '2017-10-02', 'ShangHai', 20, 1, '2017-10-02 12:59:00', 200, 5, 5),
(10003, '2017-10-02', 'GuangZhou', 32, 0, '2017-10-02 11:20:00', 30, 11, 11),
(10004, '2017-10-01', 'ShenZhen', 35, 0, '2017-10-01 10:00:00', 100, 3, 3),
(10004, '2017-10-03', 'ShenZhen', 35, 0, '2017-10-03 10:20:00', 11, 6, 6);
| user_id | date | city | age | sex | last_visit_date | cost | max_dwell_time | min_dwell_time |
|---|---|---|---|---|---|---|---|---|
| 10000 | 2017/10/1 | BeiJing | 20 | 0 | 2017/10/1 6:00 | 20 | 10 | 10 |
| 10000 | 2017/10/1 | BeiJing | 20 | 0 | 2017/10/1 7:00 | 15 | 2 | 2 |
| 10001 | 2017/10/1 | BeiJing | 30 | 1 | 2017/10/1 17:05 | 2 | 22 | 22 |
| 10002 | 2017/10/2 | ShangHai | 20 | 1 | 2017/10/2 12:59 | 200 | 5 | 5 |
| 10003 | 2017/10/2 | GuangZhou | 32 | 0 | 2017/10/2 11:20 | 30 | 11 | 11 |
| 10004 | 2017/10/1 | ShenZhen | 35 | 0 | 2017/10/1 10:00 | 100 | 3 | 3 |
| 10004 | 2017/10/3 | ShenZhen | 35 | 0 | 2017/10/3 10:20 | 11 | 6 | 6 |
上記のTableを例として、ユーザーは総支出金額、最終訪問時間、性別、都市などの特定の数値条件を満たすユーザーをクエリし、これらの条件を満たすユーザーの情報をDorisにインポートして、その後のターゲット広告に使用したいと考えています。
-
まず、DorisTableを作成します
CREATE TABLE IF NOT EXISTS user_activity
(
`user_id` LARGEINT NOT NULL,
`date` DATE NOT NULL,
`city` VARCHAR(20),
`age` SMALLINT,
`sex` TINYINT,
`last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00",
`cost` BIGINT SUM DEFAULT "0",
`max_dwell_time` INT MAX DEFAULT "0",
`min_dwell_time` INT MIN DEFAULT "99999"
)
AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
); -
次に、MySQLデータベースに対応するCatalogを作成します。
CREATE CATALOG activity PROPERTIES (
"type"="jdbc",
"user"="root",
"password"="123456",
"jdbc_url" = "jdbc:mysql://127.0.0.1:3306/user?useSSL=false",
"driver_url" = "mysql-connector-java-5.1.49.jar",
"driver_class" = "com.mysql.jdbc.Driver"
); -
最後に、MySQLデータをDorisにインポートします。カタログ + Insert Into方式を使用して全データセットをインポートします。フルインポートはシステムサービスの変動を引き起こす可能性があるため、通常はビジネスのオフピーク時間中にこの操作を実行することが推奨されます。
-
ワンタイム スケジューリング:以下のコードに示すように、ワンタイムタスクを使用して指定された時間にフルインポートタスクをトリガーし、トリガーは午前3:00に設定します。
CREATE JOB one_time_load_job
ON SCHEDULE
AT '2024-8-10 03:00:00'
DO
INSERT INTO user_activity SELECT * FROM activity.user.activity -
定期スケジューリング: ユーザーは定期スケジューリングタスクを作成して、最新データを定期的に更新することもできます。
CREATE JOB schedule_load
ON SCHEDULE EVERY 1 DAY
DO
INSERT INTO user_activity SELECT * FROM activity.user.activity where last_visit_date >= days_add(now(),-1)
設計と実装
効率的なスケジューリングは多くの場合、大幅なリソース消費を伴い、高精度のスケジューリングはさらに要求が厳しくなります。従来のアプローチでは、Javaの組み込みスケジューリング機能—定期的にスレッドにアクセスするスケジュールされたタスク—または様々なスケジューリングユーティリティライブラリを使用することが含まれます。しかし、これらの方法は精度とメモリ使用量の観点で重大な問題があります。パフォーマンスをより良く保証しながらリソース消費を削減するために、我々はTimingWheelアルゴリズムとDisruptorを組み合わせて秒レベルのタスクスケジューリングを実現することを選択しました。
具体的には、NettyのHashedWheelTimerを使用してTiming Wheelアルゴリズムを実装します。Job Managerは定期的に(デフォルトでは10分ごと)、将来のイベントをスケジューリングのためにタイミングホイールに配置します。効率的なタスクトリガーを確保し、過度なリソース使用を避けるために、Disruptorを使用してシングルプロデューサー、マルチコンシューマーモデルを構築します。タイミングホイールはイベントをトリガーするだけで、タスクを直接実行しません。期限切れ時にトリガーされる必要があるタスクについては、適切な実行スレッドプールにタスクを配布する責任を持つDispatchスレッドに配置されます。即座に実行される必要があるタスクについては、対応するタスク実行スレッドプールに直接配信されます。
一回限りのイベントについては、タスクがスケジュールされた後にイベント定義が削除されます。定期的なイベントについては、タイミングホイール内のシステムイベントが定期的に次の実行サイクルのタスクを取得します。これにより、大量のタスクが単一のバケットに集中することを避け、不要な走査を削減し、処理効率を向上させます。
トランザクションタスクについては、Job Schedulerはトランザクションとの強い関連付けとトランザクションコールバック機構により、トランザクションタスクの実行結果が期待と一致することを確保でき、データの整合性と一貫性を保証します。
今後の計画
Doris Job Schedulerは強力で柔軟なタスクスケジューリングツールであり、データ処理における必須機能です。データレイク分析や内部ETLなどの一般的な使用例に加えて、Job Schedulerは非同期マテリアライズドビューの実装においても重要な役割を果たします。非同期マテリアライズドビューは事前に計算され保存された結果セットであり、データ更新の頻度はソースTableの変更と密接に関連しています。ソースTableのデータが頻繁に更新される場合、マテリアライズドビューのデータを最新に保つために定期的なリフレッシュが必要です。バージョン2.1では、JOBスケジューリング機能を巧妙に活用してマテリアライズドビューとソースTableデータ間の一貫性を確保し、手動介入のコストを大幅に削減しました。
将来的に、Doris Job Schedulerは以下の機能もサポートする予定です:
- UI経由で異なる時間帯に実行されたタスクの分布を表示するサポート
- JOBワークフローオーケストレーション、すなわちDAG JOBのサポート。これにより内部データウェアハウスのタスクオーケストレーションを実装でき、Catalog機能と合わせて、データ処理と分析タスクをより効率的に完了できます
- インポートタスク、UPDATE、DELETE操作のスケジューリングサポート