メインコンテンツまでスキップ
バージョン: 2.1

Job Scheduler

背景

洗練されたデータ管理に対する要求が高まる中、スケジュール調整は重要な役割を果たします。通常、以下のシナリオで適用されます:

  • 定期的なデータ更新(定期的なデータインポートやETL操作など)により、手動介入を削減し、データ処理の効率性と正確性を向上させます。

  • 外部データソースとCatalogを同期し、複数ソースのデータを対象システムに効率的かつ正確に統合することを保証し、複雑なビジネス分析ニーズを満たします。

  • 期限切れ/無効なデータを定期的にクリーンアップし、ストレージ容量を解放し、過度の期限切れ/無効なデータがシステムパフォーマンスに影響を与えることを防ぎます。

Apache Doris 2.1より前のバージョンでは、通常、外部スケジューリングシステムに依存する必要がありました。例えば、ビジネスコードによるスケジューリングや、サードパーティのスケジューリングツールや分散スケジューリングプラットフォームの導入により、上記の要件を満たしていました。しかし、外部システム自体の制限により、Dorisのスケジューリング戦略やリソース管理の柔軟性に対する要件を満たすことができない可能性があります。さらに、外部スケジューリングシステムに障害が発生した場合、ビジネスリスクが増加するだけでなく、対処するための追加の運用時間と人員が必要となります。

Job Scheduler

上記の問題を解決するため、Apache Dorisはバージョン2.1でJob Scheduler機能を導入し、秒レベルの精度に達するスケジューリング精度で自律的なタスクスケジューリング機能を実現しました。この機能の導入により、データインポートの整合性と一貫性が保証されるだけでなく、ユーザーは柔軟かつ便利にスケジューリング戦略を調整できます。同時に、外部システムへの依存を減らすことで、システム障害のリスクと運用コストも削減し、コミュニティユーザーにより統一された信頼性の高いユーザーエクスペリエンスを提供します。

Doris Job Schedulerは、事前設定されたスケジュールに基づくタスク管理システムで、特定の時点や指定された時間間隔で事前定義された操作をトリガーし、自動化されたタスク実行を実現できます。Job Schedulerには以下の特徴があります:

  • 効率的なスケジューリング: Job Schedulerは指定された時間間隔内でタスクとイベントを配置し、データ処理の効率性を保証します。時間ホイールアルゴリズムを使用して、イベントを秒レベルで正確にトリガーできることを保証します。
  • 柔軟なスケジューリング: Job Schedulerは、分、時間、日、または週間隔でのスケジューリングなど、複数のスケジューリングオプションを提供します。また、一回限りのスケジューリングや繰り返し(定期的)イベントスケジューリングもサポートし、定期スケジューリングでは開始時刻と終了時刻を指定できます。
  • イベントプールと高性能処理キュー: Job SchedulerはDisruptorを使用して高性能なproducer-consumerモデルを実装し、タスク実行の過負荷を最大限回避します。
  • 追跡可能なスケジューリング記録: Job Schedulerは最新のTask実行記録を保存します(設定可能)。Task実行記録は簡単なコマンドで閲覧でき、プロセスの追跡可能性を保証します。
  • 高可用性: Doris独自の高可用性メカニズムを活用して、Job Schedulerは自己復旧と高可用性を容易に実現できます。

関連ドキュメント: CREATE-JOB

構文説明

有効なJob文には以下を含める必要があります:

  • キーワードCREATE JOBの後にジョブ名を続ける必要があり、これがデータベース内のイベントを一意に識別します。

  • ON SCHEDULE句は、Jobのタイプ、トリガー時刻、頻度を指定するために使用されます。

    • AT timestampは一回限りのイベントに使用されます。JOBが指定された日時に一度だけ実行されることを指定し、AT current_timestampは現在の日時を指定します。JOBが作成されると即座に実行され、非同期タスク作成にも使用できます。

    • EVERY: 定期的なジョブに使用され、ジョブの実行頻度を指定します。キーワードの後に時間間隔(週、日、時間、分)を指定する必要があります。

      • Interval: ジョブ実行の頻度を指定します。1 DAYはジョブが1日に1回実行されることを意味し、1 HOURは1時間に1回、1 MINUTEは1分に1回、1 WEEKは1週間に1回を意味します。

      • EVERY句にはオプションのSTARTS句が含まれます。STARTSの後はタイムスタンプ値で、繰り返しの開始時刻を定義し、CURRENT_TIMESTAMPは現在の日時を指定します。JOBが作成されると即座に実行されます。

      • EVERY句にはオプションのENDS句が含まれます。ENDSキーワードの後はタイムスタンプ値で、JOBイベントが実行を停止する時刻を定義します。

  • DO句は、Jobがトリガーされたときに実行される操作を指定するために使用されます。現在、INSERT文のみがサポートされています。

    CREATE
    JOB
    job_name
    ON SCHEDULE schedule
    [COMMENT 'string']
    DO execute_sql;

    schedule: {
    AT timestamp
    | EVERY interval
    [STARTS timestamp ]
    [ENDS timestamp ]
    }
    interval:
    quantity { WEEK |DAY | HOUR | MINUTE}

以下の例:

CREATE JOB my_job ON SCHEDULE EVERY 1 MINUTE DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2;

このステートメントは、毎分1回実行されるmy_jobという名前のジョブを作成します。実行される操作は、db2.tbl2からdb1.tbl1にデータをインポートすることです。

使用例

一回限りのJobを作成:2025-01-01 00:00:00に1回実行し、db2.tbl2からdb1.tbl1にデータをインポートします。

CREATE JOB my_job ON SCHEDULE AT '2025-01-01 00:00:00' DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2;

終了時刻を指定しない定期的なJobを作成する:2025-01-01 00:00:00から開始し、1日に1回実行して、db2.tbl2からdb1.tbl1にデータをインポートする。

CREATE JOB my_job ON SCHEDULE EVERY 1 DAY STARTS '2025-01-01 00:00:00' DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2 WHERE  create_time >=  days_add(now(),-1);

指定された終了時間を持つ定期Jobを作成する:2025-01-01 00:00:00から開始し、1日に1回実行して、db2.tbl2からdb1.tbl1にデータをインポートし、2026-01-01 00:10:00で終了する。

CREATE JOB my_job ON SCHEDULE EVERY 1 DAY STARTS '2025-01-01 00:00:00' ENDS '2026-01-01 00:10:00' DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2 WHERE create_time >=  days_add(now(),-1);

JobによるAsynchronous Executionの実装: DorisのJobは同期タスクとして作成されますが、実行プロセスは非同期です。この機能により、Jobは一般的なINSERT INTO SELECT操作などの非同期タスクの実装に非常に適しています。

例えば、db2.tbl2からdb1.tbl1にデータをimportする必要がある場合、JOBを一回限りのタスクとして指定し、開始時間を現在時刻に設定するだけです。

CREATE JOB my_job ON SCHEDULE AT current_timestamp DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2;

Catalogとジョブスケジューラーに基づくデータ自動同期

Eコマースシナリオを例に挙げると、ユーザーはしばしばMySQLからビジネスデータを抽出し、このデータをDorisに同期してデータ分析を行い、これによって精密なマーケティング活動をサポートする必要があります。Job Schedulerは、Multi Catalogデータレイク機能と連携して動作し、データソース間の定期的なデータ同期を効率的に完了することができます。

CREATE TABLE IF NOT EXISTS user.activity (
`user_id` INT NOT NULL,
`date` DATE NOT NULL,
`city` VARCHAR(20),
`age` SMALLINT,
`sex` TINYINT,
`last_visit_date` DATETIME DEFAULT '1970-01-01 00:00:00',
`cost` BIGINT DEFAULT '0',
`max_dwell_time` INT DEFAULT '0',
`min_dwell_time` INT DEFAULT '99999'
);
INSERT INTO user.activity VALUES
(10000, '2017-10-01', 'BeiJing', 20, 0, '2017-10-01 06:00:00', 20, 10, 10),
(10000, '2017-10-01', 'BeiJing', 20, 0, '2017-10-01 07:00:00', 15, 2, 2),
(10001, '2017-10-01', 'BeiJing', 30, 1, '2017-10-01 17:05:00', 2, 22, 22),
(10002, '2017-10-02', 'ShangHai', 20, 1, '2017-10-02 12:59:00', 200, 5, 5),
(10003, '2017-10-02', 'GuangZhou', 32, 0, '2017-10-02 11:20:00', 30, 11, 11),
(10004, '2017-10-01', 'ShenZhen', 35, 0, '2017-10-01 10:00:00', 100, 3, 3),
(10004, '2017-10-03', 'ShenZhen', 35, 0, '2017-10-03 10:20:00', 11, 6, 6);
user_iddatecityagesexlast_visit_datecostmax_dwell_timemin_dwell_time
100002017/10/1BeiJing2002017/10/1 6:00201010
100002017/10/1BeiJing2002017/10/1 7:001522
100012017/10/1BeiJing3012017/10/1 17:0522222
100022017/10/2ShangHai2012017/10/2 12:5920055
100032017/10/2GuangZhou3202017/10/2 11:20301111
100042017/10/1ShenZhen3502017/10/1 10:0010033
100042017/10/3ShenZhen3502017/10/3 10:201166

上記のTableを例として、ユーザーは総消費金額、最終訪問時間、性別、都市などの特定の数値条件を満たすユーザーをクエリし、これらの条件を満たすユーザーの情報をDorisにインポートして、その後のターゲット広告に使用したいと考えています。

  1. まず、DorisTableを作成します

    CREATE TABLE IF NOT EXISTS user_activity
    (
    `user_id` LARGEINT NOT NULL,
    `date` DATE NOT NULL,
    `city` VARCHAR(20),
    `age` SMALLINT,
    `sex` TINYINT,
    `last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00",
    `cost` BIGINT SUM DEFAULT "0",
    `max_dwell_time` INT MAX DEFAULT "0",
    `min_dwell_time` INT MIN DEFAULT "99999"
    )
    AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`)
    DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
    PROPERTIES (
    "replication_allocation" = "tag.location.default: 1"
    );
  2. 次に、MySQLデータベースに対応するCatalogを作成します。

    CREATE CATALOG activity PROPERTIES (
    "type"="jdbc",
    "user"="root",
    "password"="123456",
    "jdbc_url" = "jdbc:mysql://127.0.0.1:3306/user?useSSL=false",
    "driver_url" = "mysql-connector-java-5.1.49.jar",
    "driver_class" = "com.mysql.jdbc.Driver"
    );
  3. 最後に、MySQLデータをDorisにインポートします。Catalog + Insert Intoメソッドを使用して、全データセットをインポートします。フルインポートはシステムサービスの変動を引き起こす可能性があるため、通常は業務のオフピーク時間にこの操作を実行することが推奨されます。

  • ワンタイムスケジューリング:以下のコードに示すように、ワンタイムタスクを使用して指定時間にフルインポートタスクをトリガーし、トリガーを午前3:00に設定します。

    CREATE JOB one_time_load_job
    ON SCHEDULE
    AT '2024-8-10 03:00:00'
    DO
    INSERT INTO user_activity SELECT * FROM activity.user.activity
  • 定期スケジューリング: ユーザーは定期スケジューリングタスクを作成して、最新データを定期的に更新することもできます。

    CREATE JOB schedule_load
    ON SCHEDULE EVERY 1 DAY
    DO
    INSERT INTO user_activity SELECT * FROM activity.user.activity where last_visit_date >= days_add(now(),-1)

設計と実装

効率的なスケジューリングには多くの場合、大幅なリソース消費を伴い、高精度スケジューリングはさらに多くのリソースを要求します。従来のアプローチでは、Javaの組み込みスケジューリング機能(定期的にスレッドにアクセスするスケジュールタスク)や、さまざまなスケジューリングユーティリティライブラリを使用していました。しかし、これらの方法は精度とメモリ使用量の面で重要な問題を抱えています。パフォーマンスをより良く保証しながらリソース消費を削減するため、TimingWheelアルゴリズムとDisruptorを組み合わせて秒レベルのタスクスケジューリングを実現することを選択しました。

具体的には、NettyのHashedWheelTimerを使用してTiming Wheelアルゴリズムを実装しています。Job Managerは定期的に(デフォルトでは10分ごと)将来のイベントをタイミングホイールに配置してスケジューリングします。効率的なタスクトリガーを確保し、過度なリソース使用を回避するため、Disruptorを使用してシングルプロデューサー・マルチコンシューマーモデルを構築します。タイミングホイールはイベントをトリガーするだけで、タスクを直接実行はしません。期限切れ時にトリガーが必要なタスクについては、Dispatchスレッドに配置され、適切な実行スレッドプールにタスクを分散する責任を持ちます。即座に実行が必要なタスクについては、対応するタスク実行スレッドプールに直接配信されます。

一回限りのイベントについては、タスクがスケジュールされた後にイベント定義が削除されます。定期イベントについては、タイミングホイール内のシステムイベントが次の実行サイクルのタスクを定期的に取得します。これにより、大量のタスクが単一のバケットに集中することを回避し、不要な走査を減らして処理効率を向上させます。

トランザクションタスクについては、Job Schedulerはトランザクションとの強力な関連付けとトランザクションコールバックメカニズムを通じて、トランザクションタスクの実行結果が期待通りになることを保証し、データの整合性と一貫性を確保できます。

今後の計画

Doris Job Schedulerは強力で柔軟なタスクスケジューリングツールであり、データ処理における必須機能です。データレイク分析や内部ETLなどの一般的な使用例に加えて、Job Schedulerは非同期マテリアライズドビューの実装においても重要な役割を果たします。非同期マテリアライズドビューは事前計算され保存された結果セットであり、データ更新の頻度はソースTableの変更と密接に関連しています。ソースTableのデータが頻繁に更新される場合、データを最新の状態に保つためにマテリアライズドビューの定期的な更新が必要です。バージョン2.1では、JOBスケジューリング機能を巧妙に活用してマテリアライズドビューとソースTableデータ間の一貫性を確保し、手動介入のコストを大幅に削減しました。

将来的に、Doris Job Schedulerは以下の機能もサポートします:

  • UI経由で異なる時間帯に実行されたタスクの分散を表示する機能のサポート。
  • JOBワークフローオーケストレーション、すなわちDAG JOBのサポート。これにより、内部データウェアハウスタスクオーケストレーションを実装でき、Catalog機能と組み合わせることで、データ処理と分析タスクをより効率的に完了できます。
  • インポートタスク、UPDATE、DELETE操作のスケジューリングのサポート。