Routine Load の内部動作とベストプラクティス
1. 概要
Routine LoadはKafkaデータを継続的に取得してApache Dorisに書き込むために設計されています。ユーザーはRoutine Load Jobを作成して、指定されたKafka Topicに自動的にサブスクライブできます。その主要な機能は以下の通りです:
-
高可用性: 24時間365日の無停止Kafkaデータ取得をサポートし、障害後の自動復旧機能を提供します。
-
低レイテンシ: Kafkaメッセージは秒レベルでの可視性を実現できます。
-
Exactly-Once セマンティクス: Kafkaデータの取得において損失や重複なしを保証し、exactly-once処理を実現します。
本ドキュメントでは、実装原理の詳細な分析、典型的なシナリオにおけるベストプラクティス、一般的な問題のトラブルシューティング手法を提供し、ユーザーが迅速に開始して効率的に運用できるよう支援します。
2. 実装原理
Kafkaデータはストリーミング形式で存在する一方、DorisはKafkaストリーミングデータを「マイクロバッチ」方式で取得します。routine load jobの作成後、システムは設定された並行性レベルに基づいて、ジョブを複数のタスクに分割して並行実行します。各タスクはKafka topicの特定のパーティションからのデータ取得を担当します。各タスクは1つのトランザクションに対応し、完了後に新しいタスクが生成されて次のデータバッチの取得を継続します。以下のセクションでは、job/taskスケジューリング、Exactly-Onceセマンティクス実装、単一ストリームからの複数Table書き込みの3つの観点から説明します。
2.1 JobとTaskのスケジューリング
Routine Loadは2レベルのスケジューリング方式を採用しています:
-
Jobスケジューリング: タスクの分割、障害復旧、ライフサイクル管理を担当します。
-
Taskスケジューリング: 具体的なデータ取得、変換、書き込み操作をBEノードに配信して実行することを担当します。
2.1.1 Jobスケジューリング
Job State Machine:
| State | デスクリプション |
|---|---|
| NEED_SCHEDULE | 初期スケジューリング待機またはリスケジュール要求中 |
| RUNNING | 正常な取得進行中 |
| PAUSED | 能動的または異常に一時停止、自動復旧可能 |
| CANCELLED | データベース/Table削除などの回復不可能なエラーにより終了 |
| STOPPED | 手動停止され回復不可能 |
異なるjob stateに基づいて、スケジューリングスレッドは各サイクル(10秒)で以下のアクションを実行します:
- NEED_SCHEDULE: topicメタデータ(パーティション数、開始オフセット)を取得し、以下に従ってタスクを分割します:
taskNum = min(topic_partition_num,
desired_concurrent_number,
max_routine_load_task_concurrent_num)
タスクをneedScheduleTasksQueueに配置し、タスクスケジューリングスレッドがスケジューリングを開始するのを待機します。
-
RUNNING: 定期的にトピックメタデータを取得し、パーティション数が変更された場合は即座に再スケジュールします。
-
PAUSED: ジョブの高可用性を確保するため、自動復旧メカニズムが導入されています。予期しない一時停止の場合、Routine Load Schedulerスレッドがジョブの自動復旧を試行します。予期しないKafka側の障害やその他の非機能的な状況に対して、自動復旧メカニズムによりKafka復旧後、インポートジョブが手動介入なしに正常動作を継続できることを保証します。なお、自動復旧されない3つの状況があります:
- ユーザーが手動でPAUSE ROUTINE LOADコマンドを実行した場合。
- データ品質の問題が存在する場合。
- データベース/Table削除など復旧不可能な状況。
上記3つの状況を除き、その他すべての一時停止されたジョブは自動復旧を試行します。
-
CANCELLED / STOPPED: 遅延リソースクリーンアップ。
2.1.2 タスクスケジューリング
スケジューリング条件
-
タスクがパーティションの終端に到達しておらず、消費すべきデータが残存することで、非効果的なリソース占有を回避します。
-
前回の実行でEOFに到達した場合、前回の実行開始から
max_batch_interval以上経過した場合のみ新しいスケジューリングラウンドを開始します。これは消費速度が生産速度を上回る場合にデータを適切にバッチ化し、過多な小規模トランザクションの生成を防ぐことを目的とします。
負荷分散戦略
-
現在実行中のTaskが最少のBEノードを優先選択します。
-
複数のBEが同じTask数を持つ場合、初期化オーバーヘッドを削減するため、Kafka Consumerをキャッシュしているノードの再利用を優先します。
バッチ境界
現在のタスクは以下のいずれかの条件を満たした時点で終了します:
-
max_batch_intervalで定義された時間制限に到達。 -
max_batch_rowsで定義された行数に到達。 -
max_batch_sizeで定義されたバイトサイズに到達。 -
Kafka EOFを読み取り、つまりストリームの終端まで消費。
タスク完了後、トランザクションがコミットされ、新しいタスクが即座に生成されて次のスケジューリングサイクルのためにキューに配置され、継続的な消費を可能にします。
2.2 Exactly-Onceセマンティクス
Routine Loadは「永続化消費進捗」+「コミット検証」の双重メカニズムによりKafkaデータの喪失も重複も発生しないことを保証します。
2.2.1 永続化消費進捗
各タスクはトランザクションコミット時に消費進捗をトランザクション情報と共にFEのedit logに書き込み、Berkeley DB JEを利用してすべてのFE Followerに同期します。進捗情報はMaster切り替え/再起動後も正確性を保持します。
2.2.2 コミット検証
手動一時停止、master切り替え、またはトピックメタデータ変更によりJobが再スケジュールされる際、2つのタスクが同一パーティションを同時消費する短時間のシナリオが発生する可能性があります。重複書き込みを防ぐため:
-
各Jobはメモリ内で
routineLoadTaskInfoListを維持します。 -
コミット前にタスクは自身が依然として
routineLoadTaskInfoListに存在するかを検証し、そうでない場合コミットは拒否されます。
2.3 単一ストリームからの複数Table書き込み
複数Table書き込みは単一のRoutine Load Jobが複数の対象Tableに同時書き込みを可能にします。核心プロセスは以下の通りです:
-
計画フェーズ:Job作成時に対象Tableが完全に決定できないため、実行計画は実行時まで遅延され、BEがFE Masterから動的に取得します。
-
データキャッシュ:BEは最初にローカルの複数Tableパイプにデータをキャッシュします。200レコードがキャッシュされた場合、または実行計画をまだリクエストしていない新しいTableが5つある場合、実行計画リクエストが開始・実行されデータバックログを防ぎます。
-
実行計画再利用:同一トランザクション内ではキャッシュされた実行計画を再利用し、トランザクション間では新しいリクエストを行いメタデータの適時性を確保します。
3. ベストプラクティス
Routine Loadのデフォルトパラメータは大部分のシナリオを満足します。以下の3つの状況では手動調整が必要です:
| シナリオ | 推奨パラメータ修正 |
|---|---|
| 低レイテンシ要件 | デフォルト60sからmax_batch_intervalを削減 |
| 小データ量、リソース敏感 | desired_concurrent_numberを削減 |
| 高スループット | デフォルト60sからmax_batch_intervalを120-180sに増加 |
4. 一般的な問題のトラブルシューティング
4.1 データバックログ
SHOW ROUTINE LOAD\Gを通じてタスクステータスを確認:
-
StateがRUNNINGかどうか;他のステータスの場合、理由を
ReasonOfStateChangedフィールドで確認。 -
OtherMsgにエラー情報が含まれているかどうか。
-
BEログを使用してスループット制限に到達しているかを判定
consumer group doneログを検索し、left_time / left_rows / left_bytesで最初にトリガーされた閾値を表示し、max_batch_sizeまたはmax_batch_rowsの対象増加を可能にします:consumer group done: 894fc32d5b9d3e93-7387a02da6dafd88. consume time(ms)=34004, received rows=2679540, received bytes=2147484043, eos: 0, left_time: 25996, left_rows: 17320460, left_bytes: -395, blocking get time(us): 949236, blocking put time(us): 28730419, id=69616a41fc064f1e-a93ff0ddd217f0a0, job_id=48121487, txn_id=61763720, label=ods_hq_market_unique_jobs_0-48121487-69616a41fc064f1e-a93ff0ddd217f0a0-61763720, elapse(s)=34
上記の例では、left_bytes: -395は、34秒以内にmax_batch_size制限に達したためにバッチが終了したことを示しています。この場合、max_batch_sizeを適切に増加させることで、単一バッチがmax_batch_interval内でフル容量に到達できるようになり、スループットを向上させることができます。
- 並行性とスループットの向上
-
desired_concurrent_numberをTopicパーティション数に合わせて増加させます。 -
max_batch_interval(例:120s ~ 180s)/max_batch_size/max_batch_rowsを適度に増加させて、単一トランザクションのデータ量を改善し、単一バッチのデータ量を増加させ、トランザクションオーバーヘッドを削減します。
4.2 タスクの異常停止
Routine Loadには自動回復メカニズムが組み込まれており、予期しない停止の大部分は再試行されます。タスクがPAUSED状態のままで自動回復できない場合は、SHOW ROUTINE LOADを実行してトラブルシューティングを行います:
-
PAUSE ROUTINE LOADが手動で実行されたかどうか。 -
データ品質の問題が存在するかどうか(フォーマットエラー、欠落フィールドなど)。
-
Kafkaデータが
out of rangeエラーで期限切れになっているかどうか。