負荷内部構造とパフォーマンス最適化
概要
Apache Dorisは、MPP(Massively Parallel Processing)アーキテクチャを採用した高性能な分散分析データベースで、リアルタイムデータ分析、データウェアハウジング、ストリーム計算のシナリオで広く使用されています。データロードはDorisのコア機能であり、データ分析のリアルタイム性と正確性に直接影響します。効率的なロード機構により、大規模データが迅速かつ確実にシステムに入力され、後続のクエリのサポートを提供します。本記事では、Dorisデータロードのload内部構造を分析し、主要なプロセス、コンポーネント、トランザクション管理などをカバーし、ロードパフォーマンスに影響する要因を探り、実用的な最適化方法とベストプラクティスを提供して、ユーザが適切なロード戦略を選択し、ロードパフォーマンスを最適化できるよう支援します。
Data Load内部構造
Load内部構造概要
Dorisのデータロード内部構造は、その分散アーキテクチャ上に構築されており、主にFrontendノード(FE)とBackendノード(BE)が関与します。FEはメタデータ管理、クエリ解析、タスクスケジューリング、トランザクション調整を担当し、BEは実際のデータストレージ、計算、書き込み操作を処理します。Dorisのデータロード設計は、リアルタイム書き込み、ストリーミング同期、バッチロード、外部データソース統合を含む多様なビジネスニーズを満たすことを目的としています。そのコア概念には以下が含まれます:
- 一貫性と原子性:各ロードタスクはトランザクションとして機能し、データ書き込みの原子性を保証し、部分書き込みを回避します。Labelメカニズムにより、ロードされたデータの紛失や重複を防ぎます。
- 柔軟性:複数のデータソース(ローカルファイル、HDFS、S3、Kafkaなど)とフォーマット(CSV、JSON、Parquet、ORCなど)をサポートし、異なるシナリオに対応します。
- 効率性:分散アーキテクチャを活用した並列データ処理により、複数のBEノードがデータを並列処理してスループットを向上させます。
- 簡潔性:軽量ETL機能を提供し、ユーザがロード中に直接データクリーニングと変換を実行でき、外部ツールへの依存を削減します。
- 柔軟なモデリング:詳細モデル(Duplicate Key)、主キーモデル(Unique Key)、集約モデル(Aggregate Key)をサポートし、ロード中にデータ集約または重複排除が可能です。
一般的なLoadプロセス
Dorisのデータロードプロセスは、いくつかの直感的なステップに分割できます。使用するロード方法(Stream Load、Broker Load、Routine Loadなど)に関係なく、コアプロセスは基本的に一貫しています。
-
Loadタスクの送信
- ユーザはクライアント(HTTP、JDBC、MySQLクライアントなど)を通じてロードリクエストを送信し、データソース(ローカルファイル、Kafka Topics、HDFSファイルパスなど)、ターゲットテーブル、ファイルフォーマット、ロードパラメータ(区切り文字、エラー許容度など)を指定します。
- 各タスクは、タスク識別と冪等性サポート(重複ロードの防止)のために一意のLabelを指定できます。たとえば、ユーザはStream LoadでHTTPヘッダーを通じてLabelを指定します。
- DorisのFrontendノード(FE)がリクエストを受信し、権限を検証し、ターゲットテーブルの存在を確認し、ロードパラメータを解析します。
-
タスクの割り当てと調整
- FEがデータ分散を分析(テーブルパーティショニングとバケットルールに基づく)し、ロードプランを生成し、Backendノード(BE)をCoordinatorとして選択してタスク全体を調整します。
- ユーザが直接BEに送信する場合(Stream Loadなど)、BEは直接Coordinatorとして機能できますが、それでもFEからメタデータ(テーブルSchemaなど)を取得する必要があります。
- ロードプランはデータを複数のBEノードに分散し、並列処理を保証して効率を向上させます。
-
データ読み取りと分散
- Coordinator BEがデータソースからデータを読み取ります(たとえば、Kafkaからメッセージをプル、S3からファイルを読み取り、またはHTTPデータストリームを直接受信)。
- Dorisがデータフォーマットを解析(CSV分割、JSON解析など)し、ユーザ定義の軽量ETL操作をサポートします:
- 事前フィルタリング:生データをフィルタリングして処理オーバーヘッドを削減。
- カラムマッピング:データカラムとターゲットテーブルカラムの対応を調整。
- データ変換:式を通じてデータを処理。
- 事後フィルタリング:変換されたデータをフィルタリング。
- データ解析後、Coordinator BEはパーティショニングとバケットルールに従って、複数の下流Executor BEにデータを分散します。
-
データ書き込み
- データが複数のBEノードに分散され、メモリテーブル(MemTable)に書き込まれ、Keyカラムによってソートされます。AggregateまたはUnique Keyモデルの場合、DorisはKeysに従って集約または重複排除を実行します(SUMやREPLACEなど)。
- MemTableが満杯(デフォルト200MB)になるかタスクが終了すると、データは非同期でディスクに書き込まれ、カラム型ストレージSegmentファイルを形成し、Rowsetsを構成します。
- 各BEは割り当てられたデータを独立して処理し、書き込み完了後にCoordinatorにステータスを報告します。
-
トランザクションCommitとPublishing
- CoordinatorがFEにトランザクションcommit(Commit)を開始します。FEが大部分のレプリカの書き込み成功を確認後、BEにデータバージョンの公開(Publish Version)を通知します。BE Publishが成功後、FEはトランザクションをVISIBLEとしてマークし、この時点でデータがクエリ可能になります。
- 失敗した場合、FEはrollback(Rollback)をトリガーし、一時データを削除し、データの一貫性を保証します。
-
結果の返却
- 同期方法(Stream Load、Insert Intoなど)は直接ロード結果を返し、成功/失敗ステータスとエラー詳細(ErrorURLなど)を含みます。
- 非同期方法(Broker Loadなど)はタスクIDとLabelを提供します。ユーザはSHOW LOADを通じて進捗、エラー行数、詳細情報を確認できます。
- 操作は監査ログに記録され、後続の追跡が可能です。
Memtable Forwarding
Memtable forwardingは、Apache Doris 2.1.0で導入された最適化メカニズムで、INSERT INTO…SELECTロード方式のパフォーマンスを大幅に向上させます。公式テストでは、シングルレプリカシナリオでロード時間が36%に、3レプリカシナリオで54%に短縮され、全体的なパフォーマンス向上は100%を超えます。従来のプロセスでは、SinkノードがデータをBlockフォーマットにエンコードし、Ping-pong RPCを通じて下流ノードに送信する必要があり、複数回のエンコード・デコード操作がオーバーヘッドを増加させます。Memtable forwardingはこのプロセスを最適化します:SinkノードがMemTableを直接処理し、Segmentデータを生成し、Streaming RPCを通じて送信することで、エンコード/デコードと送信待機を削減し、より正確なメモリバックプレッシャーを提供します。現在、この機能はストレージ・計算統合デプロイメントモードのみをサポートしています。
ストレージ・計算分離Load
ストレージ・計算分離アーキテクチャでは、ロード最適化はデータストレージとトランザクション管理の分離に焦点を当てます:
- データストレージ:BEはデータを永続化しません。MemTable flush後、Segmentファイルは共有ストレージ(S3、HDFSなど)に直接アップロードされ、オブジェクトストレージの高可用性と低コストを活用して弾性スケーリングをサポートします。BEローカルFile Cacheはホットデータを非同期でキャッシュし、TTLとWarmup戦略を通じてクエリヒット率を向上させます。メタデータ(Tablet、Rowsetメタデータなど)はBEローカルRocksDBではなく、Meta ServiceがFoundationDBに保存します。
- トランザクション処理:トランザクション管理はFEからMeta Serviceに移行し、FE Edit Logの書き込みボトルネックを排除します。Meta Serviceは標準インターフェース(beginTransaction、commitTransaction)を通じてトランザクションを管理し、FoundationDBのグローバルトランザクション機能に依存して一貫性を保証します。BE coordinatorはMeta Serviceと直接やり取りし、トランザクション状態を記録し、アトミック操作を通じて競合とタイムアウト回復を処理し、同期ロジックを簡素化し、高並行小バッチロードのスループットを向上させます。
Load方法
Dorisは複数のロード方法を提供し、上記の原則を共有しますが、異なるシナリオに最適化されています。ユーザはデータソースとビジネスニーズに基づいて選択できます:
- Stream Load:HTTP経由でローカルファイルやデータストリームをロードし、結果を同期で返します。リアルタイム書き込み(アプリケーションデータプッシュなど)に適しています。
- Broker Load:SQL経由でHDFS、S3などの外部ストレージをロードし、非同期で実行します。大規模バッチロードに適しています。
- Routine Load:Kafkaからデータを継続的に消費し、Exactly-Onceサポート付きの非同期ストリーミングロードです。メッセージキューデータのリアルタイム同期に適しています。
- Insert Into/Select:SQL経由でDorisテーブルや外部ソース(Hive、MySQL、S3 TVFなど)からロードします。ETLジョブと外部データ統合に適しています。
- MySQL Load:MySQL LOAD DATA構文と互換性があり、ローカルCSVファイルをロードし、データがFE経由でStream Loadとして転送されます。小規模テストやMySQLユーザ移行に適しています。
DorisのLoadパフォーマンスを向上させる方法
Dorisのロードパフォーマンスは、その分散アーキテクチャとストレージメカニズムの影響を受け、コア側面はFEメタデータ管理、BE並列処理、MemTableキャッシュフラッシュ、トランザクション管理を含みます。以下の最適化戦略とその効果を、テーブル構造設計、バッチ戦略、バケット設定、メモリ管理、並行制御の次元から、ロード原理と組み合わせて説明します。
テーブル構造設計最適化:分散オーバーヘッドとメモリ圧力の削減
Dorisのロードプロセスでは、データはFEによって解析され、次にテーブルパーティショニングとバケットルールに従ってBEノード上のTablets(データシャード)に分散され、BEメモリでMemTableを通じてキャッシュ・ソートされ、その後ディスクにフラッシュされてSegmentファイルを生成します。テーブル構造(パーティショニング、モデル、インデックス)は、データ分散効率、計算負荷、ストレージフラグメンテーションに直接影響します。
- パーティション設計:データ範囲の分離、分散とメモリ圧力の削減
ビジネスクエリパターン(時間、地域など)に従ってパーティショニングすることで、ロード時にデータはターゲットパーティションのみに分散され、無関係なパーティションからのメタデータとファイル処理を回避します。複数のパーティションへの同時書き込みは多くのTabletsをアクティブにし、各Tabletが独立したMemTableを占有するため、BEメモリ圧力を大幅に増加させ、早期Flushをトリガーして多数の小さなSegmentファイルを生成する可能性があります。これはディスクまたはオブジェクトストレージI/Oオーバーヘッドを増加させるだけでなく、小ファイルによる頻繁なCompactionと書き込み増幅を引き起こし、パフォーマンスを低下させます。アクティブなパーティション数を制限することで(日次ロードなど)、同時にアクティブなTablets数を削減し、メモリ圧力を軽減し、より大きなSegmentファイルを生成し、Compaction負担を削減し、並列書き込み効率と後続のクエリパフォーマンスを向上させることができます。
- モデル選択:計算負荷の削減、書き込みの高速化
詳細モデル(Duplicate Key)は集約や重複排除計算なしに生データのみを保存します。一方、Aggregateモデルはキーカラムによる集約が必要で、Unique Keyモデルは重複排除が必要であり、どちらもCPUとメモリ消費を増加させます。重複排除や集約の必要がないシナリオでは、詳細モデルを優先することで、BEノードのMemTableステージでの追加計算(ソート、重複排除など)を回避し、メモリ使用量とCPU圧力を削減し、データ書き込みプロセスを高速化できます。
- インデックス制御:クエリと書き込みオーバーヘッドのバランス
インデックス(bitmapインデックス、転置インデックスなど)は、ロード中に同期更新が必要で、書き込み時のメンテナンスコストを増加させます。高頻度クエリフィールドのみにインデックスを作成し、冗長インデックスを回避することで、BE書き込み時のインデックス更新操作(インデックス構築、検証など)を削減し、CPUとメモリ使用量を削減してロードスループットを向上させることができます。
バッチ最適化:トランザクションとストレージフラグメンテーションの削減
Dorisの各ロードタスクは独立したトランザクションで、FE Edit Log書き込み(メタデータ変更の記録)とBE MemTableフラッシュ(Segmentファイルの生成)を含みます。高頻度小バッチロード(KB レベルなど)は、頻繁なEdit Log書き込み(FEディスクI/Oの増加)と頻繁なMemTableフラッシュ(多数の小Segmentファイル生成、Compaction書き込み増幅のトリガー)を引き起こし、パフォーマンスを大幅に低下させます。
- クライアント側バッチ:トランザクション数の削減、メタデータオーバーヘッドの軽減
クライアントが数百MBから数GBまでデータを蓄積してから一度にロードすることで、トランザクション数を削減します。複数の小トランザクションを単一の大トランザクションに置き換えることで、FE Edit Log書き込み頻度(メタデータ操作の削減)とBE MemTableフラッシュ頻度(小ファイル生成の削減)を削減し、ストレージフラグメンテーションと後続のCompactionリソース消費を回避できます。
- サーバ側バッチ(Group Commit):小トランザクションの結合、ストレージ効率の最適化
Group Commit有効化後、サーバは短時間内の複数の小バッチロードを単一のトランザクションにマージし、Edit Log書き込み回数とMemTableフラッシュ頻度を削減します。マージされた大トランザクションは、より大きなSegmentファイルを生成し(小ファイルの削減)、バックグラウンドCompaction圧力を軽減し、高頻度小バッチシナリオ(ログ記録、IoTデータ書き込みなど)に特に適しています。
バケット数最適化:負荷と分散効率のバランス
バケット数はTablet数を決定し(各バケットが1つのTabletに対応)、BEノード上のデータ分散に直接影響します。バケット数が少なすぎるとデータスキューが起きやすく(単一BEの過負荷)、多すぎるとメタデータ管理と分散オーバーヘッド(BEがより多くのTabletsのMemTableとSegmentファイルを処理する必要)が増加します。
- 適切なバケット数設定:バランスの取れたTabletサイズの確保
バケット数はBEノード数とデータ量に従って設定し、推奨される単一Tablet圧縮データサイズは1-10GB(計算式:バケット数 = 総データ量 / (1-10GB))です。同時に、バケットキー(ランダム数カラムなど)を調整してデータスキューを回避します。適切なバケット設定により、BEノード負荷をバランスさせ、単一ノードの過負荷や複数ノードのリソース浪費を回避し、並列書き込み効率を向上させることができます。
- ランダムバケット最適化:RPCオーバーヘッドとCompaction圧力の削減
ランダムバケットシナリオでは、load_to_single_tablet=trueを有効にすることで、データを単一のTabletに直接書き込み、複数のTabletsへの分散をバイパスできます。これにより、Tablet分散を計算するCPUオーバーヘッドとBE間のRPC送信オーバーヘッドが排除され、書き込み速度が大幅に向上します。同時に、単一のTabletへの集中書き込みにより、小Segmentファイル生成が削減され、頻繁なCompactionによる書き込み増幅を回避し、BEリソース消費とストレージフラグメンテーションを削減し、ロードとクエリ効率を向上させます。
メモリ最適化:フラッシュとリソース影響の削減
データロード中、BEは最初にデータをメモリMemTable(デフォルト200MB)に書き込み、満杯になると非同期でディスクにフラッシュしてSegmentファイルを生成します(ディスクI/Oをトリガー)。高頻度フラッシュはディスクまたはオブジェクトストレージ(ストレージ・計算分離シナリオ)のI/O圧力を増加させ、メモリ不足はMemTable分散(マルチパーティション/バケットシナリオ)を引き起こし、頻繁なフラッシュやOOMを容易にトリガーします。
- パーティション別順次ロード:メモリ使用の集中
パーティション順序でのロード(日次など)により、データ書き込みを単一パーティションに集中させ、MemTable分散(マルチパーティションは各パーティションにMemTable割り当てが必要)とフラッシュ頻度を削減し、メモリフラグメンテーションとI/O圧力を削減します。
- 大規模データバッチロード:リソース影響の削減
大ファイルまたはマルチファイルロード(Broker Loadなど)の場合、バッチング(バッチあたり≤100GB)を推奨し、ロードエラー後の高い再試行コストを回避しながら、BEメモリとディスクの集中占有を削減します。ローカル大ファイルはstreamloaderツールを使用して自動バッチロードが可能です。
並行最適化:スループットとリソース競合のバランス
Dorisの分散アーキテクチャは、マルチBE並列書き込みをサポートします。並行性を増加させることでスループットを向上させることができますが、過度な並行性はCPU、メモリ、またはオブジェクトストレージQPS競合(ストレージ・計算分離シナリオではS3などのAPIのQPS制限を考慮する必要)を引き起こし、トランザクション競合とレイテンシを増加させます。
- 適切な並行制御:ハードウェアリソースとのマッチング
BEノード数とハードウェアリソース(CPU、メモリ、ディスクI/O)に基づいて並行スレッドを設定します。適度な並行性により、BE並列処理能力を十分に活用してスループットを向上させることができ、過度な並行性はリソース競合により効率を低下させます。
- 低レイテンシシナリオ:並行性の削減と非同期送信
低レイテンシ要件シナリオ(リアルタイム監視など)では、並行数を削減し(リソース競合の回避)、Group Commitの非同期モード(async_mode)と組み合わせて小トランザクションをマージし、トランザクションコミットレイテンシを削減します。
Dorisデータロードレイテンシとスループットのトレードオフ
Apache Dorisを使用する際、実際のビジネスシナリオでは、データロードのレイテンシとスループットのバランスを取る必要があることがよくあります:
- 低レイテンシ:ユーザが最新のデータをより迅速に見ることができることを意味しますが、小さな書き込みバッチと高い書き込み頻度により、より頻繁なバックグラウンドCompactionが発生し、より多くのCPU、IO、メモリリソースを消費し、メタデータ管理圧力が増加します。
- 高スループット:単一ロードデータ量を増加させることでロード回数を削減し、メタデータ圧力とバックグラウンドCompactionオーバーヘッドを大幅に削減し、システム全体のパフォーマンスを向上させることができます。ただし、データ書き込みから可視性までのレイテンシが増加します。
そのため、ユーザはビジネスレイテンシ要件を満たしながら単一ロードデータ量を最大化し、スループットを向上させ、システムオーバーヘッドを削減することを推奨します。
テストデータ
Flink End-to-Endレイテンシ
Flink Connectorのバッチモードでの書き込みを使用し、主にend-to-endレイテンシとロードスループットに焦点を当てます。バッチ時間は、Flink Connectorのsink.buffer-flush.intervalパラメータで制御されます。Flink Connectorの詳細な使用方法については、Flink-Doris-Connectorを参照してください。
マシン構成:
- 1 FE:8コアCPU、16GBメモリ
- 3 BE:16コアCPU、64GBメモリ
データセット:
- TPCH lineitemデータ
異なるバッチ時間と並行レベル下でのロードパフォーマンス、テスト結果:
| バッチ時間 (s) | ロード並行数 | バケット数 | スループット (rows/s) | End-to-End平均レイテンシ (s) | End-to-End P99レイテンシ (s) |
|---|---|---|---|---|---|
| 0.2 | 1 | 32 | 6073 | 0.211 | 0.517 |
| 1 | 1 | 32 | 31586 | 0.71 | 1.39 |
| 10 | 1 | 32 | 67437 | 5.65 | 10.90 |
| 20 | 1 | 32 | 93769 | 10.962 | 20.682 |
| 60 | 1 | 32 | 125000 | 32.46 | 62.17 |
| 0.2 | 10 | 32 | 9300 | 0.38 | 0.704 |
| 1 | 10 | 32 | 34633 | 0.75 | 1.47 |
| 10 | 10 | 32 | 82023 | 5.44 | 10.43 |
| 20 | 10 | 32 | 139731 | 11.12 | 22.68 |
| 60 | 10 | 32 | 171642 | 32.37 | 61.93 |
異なるバケット数がロードパフォーマンスに与える影響、テスト結果:
| バッチ時間 (s) | ロード並行数 | バケット数 | スループット (rows/s) | End-to-End平均レイテンシ (s) | End-to-End P99レイテンシ (s) |
|---|---|---|---|---|---|
| 1 | 10 | 4 | 34722 | 0.86 | 2.28 |
| 1 | 10 | 16 | 34526 | 0.8 | 1.52 |
| 1 | 10 |