負荷内部構造とパフォーマンス最適化
概要
Apache Dorisは、MPP(Massively Parallel Processing)アーキテクチャを採用した高性能分散分析データベースであり、リアルタイムデータ分析、データウェアハウス、ストリーミング計算シナリオで広く使用されています。データロードはDorisのコア機能であり、データ分析のリアルタイム性と正確性に直接影響します。効率的なロードメカニズムにより、大規模データを迅速かつ確実にシステムに投入し、後続のクエリにサポートを提供することができます。本記事では、Dorisデータロードの内部メカニズムを分析し、主要なプロセス、コンポーネント、トランザクション管理などをカバーし、ロードパフォーマンスに影響する要因を探究し、実用的な最適化方法とベストプラクティスを提供して、ユーザーが適切なロード戦略を選択し、ロードパフォーマンスを最適化できるよう支援します。
データロード内部メカニズム
ロード内部メカニズム概要
Dorisのデータロード内部メカニズムは分散アーキテクチャをベースに構築されており、主にFrontendノード(FE)とBackendノード(BE)が関与します。FEはメタデータ管理、クエリ解析、タスクスケジューリング、トランザクション調整を担当し、BEは実際のデータストレージ、計算、書き込み操作を処理します。Dorisのデータロード設計は、リアルタイム書き込み、ストリーミング同期、バッチロード、外部データソース統合など、多様なビジネスニーズを満たすことを目指しています。その核となる概念には以下が含まれます:
- 一貫性と原子性:各ロードタスクはトランザクションとして動作し、データ書き込みの原子性を保証し、部分書き込みを回避します。Labelメカニズムにより、ロードされたデータの紛失や重複を防ぎます。
- 柔軟性:複数のデータソース(ローカルファイル、HDFS、S3、Kafkaなど)と形式(CSV、JSON、Parquet、ORCなど)をサポートし、異なるシナリオのニーズを満たします。
- 効率性:分散アーキテクチャを活用してデータ並列処理を行い、複数のBEノードがデータを並列処理してスループットを向上させます。
- 簡潔性:軽量なETL機能を提供し、ユーザーがロード中に直接データクリーニングと変換を実行でき、外部ツールへの依存を減らします。
- 柔軟なモデリング:詳細モデル(Duplicate Key)、主キーモデル(Unique Key)、集約モデル(Aggregate Key)をサポートし、ロード時のデータ集約や重複排除を可能にします。
一般的なロードプロセス
Dorisのデータロードプロセスは複数の直感的なステップに分けることができます。使用するロード方法(Stream Load、Broker Load、Routine Loadなど)に関係なく、コアプロセスは基本的に一貫しています。
-
ロードタスクの送信
- ユーザーはクライアント(HTTP、JDBC、MySQLクライアントなど)を通じてロードリクエストを送信し、データソース(ローカルファイル、Kafka Topics、HDFSファイルパスなど)、ターゲットTable、ファイル形式、ロードパラメータ(区切り文字、エラー許容度など)を指定します。
- 各タスクは一意のLabelを指定してタスク識別と冪等性サポート(重複ロードの防止)を行うことができます。例えば、Stream LoadではHTTPヘッダーを通じてLabelsを指定します。
- DorisのFrontendノード(FE)がリクエストを受信し、権限を検証し、ターゲットTableの存在を確認し、ロードパラメータを解析します。
-
タスク割り当てと調整
- FEはデータ分散(Tableのパーティショニングとバケット化ルールに基づく)を分析し、ロードプランを生成し、Backendノード(BE)をCoordinatorとして選択してタスク全体を調整します。
- ユーザーが直接BEに送信する場合(Stream Loadなど)、BEは直接Coordinatorとして機能できますが、FEからメタデータ(TableSchemaなど)を取得する必要があります。
- ロードプランはデータを複数のBEノードに分散し、並列処理による効率向上を保証します。
-
データ読み取りと分散
- Coordinator BEはデータソースからデータを読み取ります(例:Kafkaからのメッセージ取得、S3からのファイル読み取り、HTTPデータストリームの直接受信)。
- Dorisはデータ形式を解析し(CSV分割、JSON解析など)、ユーザー定義の軽量ETL操作をサポートします:
- 事前フィルタリング:生データをフィルタリングして処理オーバーヘッドを削減。
- カラムマッピング:データカラムとターゲットTableカラムの対応関係を調整。
- データ変換:式を通じてデータを処理。
- 事後フィルタリング:変換後のデータをフィルタリング。
- データ解析後、Coordinator BEはパーティショニングとバケット化ルールに従ってデータを複数の下流Executor BEsに分散します。
-
データ書き込み
- データは複数のBEノードに分散され、メモリTable(MemTable)に書き込まれ、Keyカラムでソートされます。AggregateまたはUnique Keyモデルでは、DorisはKeys(SUM、REPLACEなど)に従って集約や重複排除を実行します。
- MemTableが満杯になる(デフォルト200MB)かタスクが終了すると、データは非同期でディスクに書き込まれ、カラムストレージのSegmentファイルを形成し、Rowsetsを構成します。
- 各BEは割り当てられたデータを独立して処理し、書き込み完了後にCoordinatorにステータスを報告します。
-
トランザクションコミットと公開
- CoordinatorはFEにトランザクションコミット(Commit)を開始します。FEは大部分のレプリカが正常に書き込まれたことを確認後、BEにデータバージョンの公開(Publish Version)を通知します。BE Publishが成功後、FEはトランザクションをVISIBLEとしてマークし、この時点でデータがクエリ可能になります。
- 失敗した場合、FEはロールバック(Rollback)をトリガーし、一時データを削除してデータ一貫性を保証します。
-
結果返却
- 同期メソッド(Stream Load、Insert Intoなど)は直接ロード結果を返し、成功/失敗ステータスとエラー詳細(ErrorURLなど)を含みます。
- 非同期メソッド(Broker Loadなど)はタスクIDとLabelsを提供します。ユーザーはSHOW LOADを通じて進捗、エラー行数、詳細情報を確認できます。
- 操作は監査ログに記録され、後続のトレースが可能です。
Memtable転送
Memtable転送はApache Doris 2.1.0で導入された最適化メカニズムで、INSERT INTO…SELECTロードメソッドのパフォーマンスを大幅に改善します。公式テストによると、シングルレプリカシナリオでロード時間が36%に、3レプリカシナリオで54%に削減され、全体的なパフォーマンス向上は100%を超えます。従来のプロセスでは、SinkノードはデータをBlock形式にエンコードし、Ping-pong RPCを通じて下流ノードに送信する必要があり、複数のエンコードとデコード操作がオーバーヘッドを増加させます。Memtable転送はこのプロセスを最適化します:SinkノードはMemTableを直接処理し、Segmentデータを生成し、Streaming RPCを通じて送信し、エンコード/デコードと送信待機時間を削減し、より正確なメモリバックプレッシャーを提供します。現在、この機能はストレージコンピュート統合展開モードのみをサポートしています。
ストレージとコンピュートの分離ロード
ストレージコンピュート分離アーキテクチャでは、ロード最適化はデータストレージとトランザクション管理の分離に焦点を当てます:
- データストレージ:BEはデータを永続化しません。MemTableフラッシュ後、Segmentファイルは直接共有ストレージ(S3、HDFSなど)にアップロードされ、オブジェクトストレージの高可用性と低コストを活用して弾性スケーリングをサポートします。BEローカルFile Cacheは非同期でホットデータをキャッシュし、TTLとWarmup戦略を通じてクエリヒット率を向上させます。メタデータ(Tablet、Rowsetメタデータなど)はBEローカルRocksDBではなく、Meta ServiceがFoundationDBに格納します。
- トランザクション処理:トランザクション管理はFEからMeta Serviceに移行し、FE Edit Log書き込みボトルネックを解消します。Meta Serviceは標準インターフェース(beginTransaction、commitTransaction)を通じてトランザクションを管理し、FoundationDBのグローバルトランザクション機能に依存して一貫性を保証します。BE coordinatorsは直接Meta Serviceとやり取りし、トランザクション状態を記録し、アトミック操作を通じて競合とタイムアウト回復を処理し、同期ロジックを簡素化し、高並行性小バッチロードスループットを向上させます。
ロードメソッド
Dorisは上記の原理を共有する複数のロードメソッドを提供しますが、異なるシナリオに対して最適化されています。ユーザーはデータソースとビジネスニーズに基づいて選択できます:
- Stream Load:HTTPを通じてローカルファイルやデータストリームをロードし、結果を同期的に返し、リアルタイム書き込み(アプリケーションデータプッシュなど)に適しています。
- Broker Load:SQLを通じてHDFS、S3などの外部ストレージをロードし、非同期実行で、大規模バッチロードに適しています。
- Routine Load:Kafkaからデータを継続的に消費し、Exactly-Onceサポート付きの非同期ストリーミングロードで、メッセージキューデータのリアルタイム同期に適しています。
- Insert Into/Select:SQLを通じてDorisTableや外部ソース(Hive、MySQL、S3 TVFなど)からロードし、ETLジョブと外部データ統合に適しています。
- MySQL Load:MySQL LOAD DATA構文と互換性があり、ローカルCSVファイルをロードし、データはFE経由でStream Loadとして転送され、小規模テストやMySQLユーザー移行に適しています。
Dorisロードパフォーマンス向上方法
Dorisのロードパフォーマンスは分散アーキテクチャとストレージメカニズムの影響を受け、コア面はFEメタデータ管理、BE並列処理、MemTableキャッシュフラッシュ、トランザクション管理が関与します。以下では、Table構造設計、バッチ戦略、バケット設定、メモリ管理、並行制御の次元から最適化戦略とその効果をロード原理と組み合わせて説明します。
Table構造設計最適化:分散オーバーヘッドとメモリ圧迫を削減
Dorisのロードプロセスでは、データはFEで解析され、Tableのパーティショニングとバケット化ルールに従ってBEノード上のTablets(データシャード)に分散され、BEメモリでMemTableを通じてキャッシュ・ソートされ、その後ディスクにフラッシュしてSegmentファイルを生成します。Table構造(パーティショニング、モデル、インデックス)はデータ分散効率、計算負荷、ストレージフラグメンテーションに直接影響します。
- パーティション設計:データ範囲を分離し、分散とメモリ圧迫を削減
ビジネスクエリパターン(時間、地域など)に従ってパーティショニングすることで、ロード時にデータはターゲットパーティションのみに分散され、関連のないパーティションのメタデータとファイルの処理を回避します。複数パーティションへの同時書き込みは多数のTabletsをアクティブにし、各Tabletが独立したMemTableを占有し、BEメモリ圧迫を大幅に増加させ、早期フラッシュを引き起こし、多数の小さなSegmentファイルを生成する可能性があります。これはディスクやオブジェクトストレージのI/Oオーバーヘッドを増加させるだけでなく、小ファイルによる頻繁なCompactionと書き込み増幅を引き起こし、パフォーマンスを劣化させます。アクティブパーティション数を制限することで(日次ロードなど)、同時アクティブTablet数を削減し、メモリ圧迫を緩和し、より大きなSegmentファイルを生成し、Compaction負担を軽減し、並列書き込み効率と後続クエリパフォーマンスを向上させることができます。
- モデル選択:計算負荷を削減し、書き込みを高速化
詳細モデル(Duplicate Key)は生データのみを格納し、集約や重複排除計算を行いません。一方、AggregateモデルはKeyカラムによる集約が必要で、Unique Keyモデルは重複排除が必要で、両方ともCPUとメモリ消費を増加させます。重複排除や集約が不要なシナリオでは、詳細モデルを優先することで、BEノードのMemTable段階での追加計算(ソート、重複排除など)を回避し、メモリ使用量とCPU圧迫を削減し、データ書き込みプロセスを高速化できます。
- インデックス制御:クエリと書き込みオーバーヘッドのバランス
インデックス(ビットマップインデックス、逆インデックスなど)はロード時に同期更新が必要で、書き込み時の保守コストが増加します。高頻度クエリフィールドのみにインデックスを作成し、冗長インデックスを避けることで、BE書き込み時のインデックス更新操作(インデックス構築、検証など)を削減し、CPUとメモリ使用量を減らし、ロードスループットを向上させることができます。
バッチ最適化:トランザクションとストレージフラグメンテーションを削減
Dorisの各ロードタスクは独立したトランザクションで、FE Edit Log書き込み(メタデータ変更の記録)とBE MemTableフラッシュ(Segmentファイル生成)が関与します。高頻度小バッチロード(KBレベルなど)は頻繁なEdit Log書き込み(FEディスクI/O増加)と頻繁なMemTableフラッシュ(多数の小Segmentファイル生成、Compaction書き込み増幅をトリガー)を引き起こし、パフォーマンスを大幅に劣化させます。
- クライアント側バッチ:トランザクション数削減、メタデータオーバーヘッド低減
クライアントがデータを数百MBから数GBまで蓄積してから一度にロードすることで、トランザクション数を削減します。単一大トランザクションが複数小トランザクションを置き換えることで、FE Edit Log書き込み頻度(メタデータ操作削減)とBE MemTableフラッシュ頻度(小ファイル生成削減)を減らし、ストレージフラグメンテーションと後続Compactionリソース消費を回避できます。
- サーバー側バッチ(Group Commit):小トランザクション統合、ストレージ効率最適化
Group Commit有効化後、サーバーは短時間内の複数小バッチロードを単一トランザクションに統合し、Edit Log書き込み回数とMemTableフラッシュ頻度を削減します。統合された大トランザクションはより大きなSegmentファイル(小ファイル削減)を生成し、バックグラウンドCompaction圧迫を緩和し、高頻度小バッチシナリオ(ログ記録、IoTデータ書き込みなど)に特に適しています。
バケット数最適化:負荷と分散効率のバランス
バケット数はTablet数(各バケットが1つのTabletに対応)を決定し、BEノードでのデータ分散に直接影響します。バケット数が少なすぎるとデータ偏り(単一BE過負荷)が発生しやすく、多すぎるとメタデータ管理と分散オーバーヘッド(BEはより多くのTabletsのMemTableとSegmentファイルを処理する必要)が増加します。
- 合理的なバケット数設定:バランスの取れたTabletサイズを保証
バケット数はBEノード数とデータ量に応じて設定し、単一Tablet圧縮データサイズを1-10GB(計算式:バケット数 = 総データ量 / (1-10GB))に推奨します。同時に、バケットキー(乱数カラムなど)を調整してデータ偏りを回避します。合理的なバケット化によりBEノード負荷をバランスし、単一ノード過負荷や複数ノードリソース浪費を回避し、並列書き込み効率を向上させることができます。
- ランダムバケット化最適化:RPCオーバーヘッドとCompaction圧迫を削減
ランダムバケット化シナリオでは、load_to_single_tablet=trueを有効にすることで、データを単一Tabletに直接書き込み、複数Tabletsへの分散をバイパスできます。これによりTablet分散計算とBE間RPC送信のCPUオーバーヘッドを解消し、書き込み速度を大幅に向上させます。同時に、単一Tabletへの集中書き込みは小Segmentファイル生成を削減し、頻繁なCompactionによる書き込み増幅を回避し、BEリソース消費とストレージフラグメンテーションを削減し、ロードとクエリ効率を向上させます。
メモリ最適化:フラッシュとリソース影響を削減
データロード時、BEはまずデータをメモリMemTable(デフォルト200MB)に書き込み、満杯時にディスクに非同期フラッシュしてSegmentファイル(ディスクI/Oをトリガー)を生成します。高頻度フラッシュはディスクやオブジェクトストレージ(ストレージコンピュート分離シナリオ)のI/O圧迫を増加させ、メモリ不足はMemTable分散(マルチパーティション/バケットシナリオ)を引き起こし、頻繁なフラッシュやOOMを容易にトリガーします。
- パーティション順次ロード:メモリ使用集中
パーティション順序(日次など)でロードし、データ書き込みを単一パーティションに集中することで、MemTable分散(マルチパーティションは各パーティションにMemTable割り当てが必要)とフラッシュ頻度を削減し、メモリフラグメンテーションとI/O圧迫を軽減します。
- 大規模データバッチロード:リソース影響削減
大ファイルや複数ファイルロード(Broker Loadなど)では、バッチ化(バッチあたり≤100GB)を推奨し、ロードエラー後の高い再試行コストを回避し、BEメモリとディスクの集中占有を削減します。ローカル大ファイルはstreamloaderツールで自動バッチロードが可能です。
並行性最適化:スループットとリソース競合のバランス
Dorisの分散アーキテクチャは複数BE並列書き込みをサポートします。並行性増加によりスループット向上が可能ですが、過度な並行性はCPU、メモリ、オブジェクトストレージQPS競合(ストレージコンピュート分離シナリオではS3などのAPI QPS制限を考慮必要)を引き起こし、トランザクション競合とレイテンシを増加させます。
- 合理的な並行制御:ハードウェアリソースに適合
BEノード数とハードウェアリソース(CPU、メモリ、ディスクI/O)に基づいて並行スレッドを設定します。適度な並行性はBE並列処理能力を十分活用し、スループットを向上させ、過度な並行性はリソース競合により効率を低下させます。
- 低レイテンシシナリオ:並行性削減と非同期送信
低レイテンシ要件シナリオ(リアルタイム監視など)では、並行数を削減し(リソース競合回避)、Group Commitの非同期モード(async_mode)と組み合わせて小トランザクションを統合し、トランザクションコミットレイテンシを削減します。
Dorisデータロードレイテンシとスループットのトレードオフ
Apache Dorisを使用する際、実際のビジネスシナリオでデータロードのレイテンシとスループットのバランスが必要になることがよくあります:
- 低レイテンシ:ユーザーが最新データをより迅速に確認できることを意味しますが、小さな書き込みバッチと高い書き込み頻度により、バックグラウンドCompactionがより頻繁になり、より多くのCPU、IO、メモリリソースを消費し、メタデータ管理圧迫も増加します。
- 高スループット:単一ロードデータ量増加によりロード回数を削減し、メタデータ圧迫とバックグラウンドCompactionオーバーヘッドを大幅に削減し、システム全体のパフォーマンスを向上させることができます。ただし、データ書き込みから可視化までのレイテンシは増加します。
したがって、ユーザーはビジネスレイテンシ要件を満たしながら単一ロードデータ量を最大化してスループットを向上させ、システムオーバーヘッドを削減することが推奨されます。
テストデータ
Flinkエンドツーエンドレイテンシ
Flink Connectorのバッチモードで書き込みを使用し、主にエンドツーエンドレイテンシとロードスループットに焦点を当てます。バッチ時間はFlink Connectorのsink.buffer-flush.intervalパラメータで制御されます。Flink Connectorの詳細な使用方法については、Flink-Doris-Connectorを参照してください。
マシン構成:
- 1 FE:8コアCPU、16GBメモリ
- 3 BEs:16コアCPU、64GBメモリ
データセット:
- TPCH lineitemデータ
異なるバッチ時間と並行レベルでのロードパフォーマンス、テスト結果:
| バッチ時間(秒) | ロード並行性 | バケット数 | スループット(行/秒) | エンドツーエンド平均レイテンシ(秒) | エンドツーエンドP99レイテンシ(秒) |
|---|---|---|---|---|---|
| 0.2 | 1 | 32 | 6073 | 0.211 | 0.517 |
| 1 | 1 | 32 | 31586 | 0.71 | 1.39 |
| 10 | 1 | 32 | 67437 | 5.65 | 10.90 |
| 20 | 1 | 32 | 93769 | 10.962 | 20.682 |
| 60 | 1 | 32 | 125000 | 32.46 | 62.17 |
| 0.2 | 10 | 32 | 9300 | 0.38 | 0.704 |
| 1 | 10 | 32 | 34633 | 0.75 | 1.47 |
| 10 | 10 | 32 | 82023 | 5.44 | 10.43 |
| 20 | 10 | 32 | 139731 | 11.12 | 22.68 |
| 60 | 10 | 32 | 171642 | 32.37 | 61.93 |
異なるバケット数がロードパフォーマンスに与える影響、テスト結果:
| バッチ時間(秒) | ロード並行性 | バケット数 | スループット(行/秒) | エンドツーエンド平均レイテンシ(秒) | エンドツーエンドP99レイテンシ(秒) |
|---|---|---|---|---|---|
| 1 | 10 | 4 | 34722 | 0.86 | 2.28 |
| 1 | 10 | 16 | 34526 | 0.8 | 1.52 |
| 1 | 10 | 32 | 34633 | 0.75 | 1.47 |
| 1 | 10 | 64 | 34829 |