Broker Load
Broker LoadはMySQL APIから開始されます。DorisはLOAD文の情報に基づいて、ソースからデータを能動的に取得します。Broker Loadは非同期インポート方式です。Broker Loadタスクの進捗と結果は、SHOW LOAD文で確認できます。
Broker Loadは、ソースデータがHDFSなどのリモートストレージシステムに格納されており、データ量が比較的大きいシナリオに適しています。
HDFSやS3からの直接読み取りは、レイクハウス/TVFのHDFS TVFやS3 TVFを通じてもインポートできます。TVFに基づく現在の「Insert Into」は同期インポートですが、Broker Loadは非同期インポート方式です。
Dorisの初期バージョンでは、S3 LoadとHDFS Loadの両方がWITH BROKERを使用して特定のBrokerプロセスに接続することで実装されていました。
新しいバージョンでは、S3 LoadとHDFS Loadは最も一般的に使用されるインポート方式として最適化されており、追加のBrokerプロセスに依存しなくなりましたが、Broker Loadと類似の構文を使用しています。
歴史的な理由と構文の類似性により、S3 Load、HDFS Load、およびBroker Loadは総称してBroker Loadと呼ばれています。
制限事項
サポートされているデータソース:
- S3プロトコル
- HDFSプロトコル
- カスタムプロトコル(brokerプロセスが必要)
サポートされているデータ型:
- CSV
- JSON
- PARQUET
- ORC
サポートされている圧縮タイプ:
- PLAIN
- GZ
- LZO
- BZ2
- LZ4FRAME
- DEFLATE
- LZOP
- LZ4BLOCK
- SNAPPYBLOCK
- ZLIB
- ZSTD
基本原理
ユーザーがインポートタスクを送信した後、Frontend(FE)が対応するプランを生成します。現在のBackend(BE)ノード数とファイルサイズに基づいて、プランは複数のBEノードに分散して実行され、各BEノードがインポートデータの一部を処理します。
実行中、BEノードはBrokerからデータを取得し、必要な変換を行った後、システムにデータをインポートします。すべてのBEノードがインポートを完了すると、FEがインポートの成功可否について最終判定を行います。

図に示すように、BEノードは対応するリモートストレージシステムからデータを読み取るためにBrokerプロセスに依存しています。Brokerプロセスの導入は、主に異なるリモートストレージシステムに対応することを目的としています。ユーザーは確立された標準に従って独自のBrokerプロセスを開発できます。Javaを使用して開発できるこれらのBrokerプロセスは、ビッグデータエコシステム内のさまざまなストレージシステムとの互換性を向上させます。BrokerプロセスとBEノードの分離により、両者間のエラー分離が確保され、BEの安定性が向上します。
現在、BEノードにはHDFSおよびS3 Brokerのサポートが組み込まれています。そのため、HDFSやS3からデータをインポートする際は、追加でBrokerプロセスを開始する必要がありません。ただし、カスタマイズされたBroker実装が必要な場合は、対応するBrokerプロセスをデプロイする必要があります。
クイックスタート
このセクションでは、S3 Loadのデモを示します。 使用方法の具体的な構文については、SQLマニュアルのBROKER LOADを参照してください。
前提条件の確認
- Tableへの権限付与
Broker Loadは対象Tableに対するINSERT権限が必要です。INSERT権限がない場合は、GRANTコマンドを通じてユーザーに権限を付与できます。
- S3認証と接続情報
ここでは主に、AWS S3に格納されたデータをインポートする方法を紹介します。S3プロトコルをサポートする他のオブジェクトストレージシステムからデータをインポートする場合は、AWS S3の手順を参考にしてください。
-
AKとSK:まず、AWSの
Access Keysを見つけるか再生成する必要があります。生成方法の説明は、AWSコンソールのMy Security Credentialsで確認できます。 -
REGIONとENDPOINT:REGIONはバケット作成時に選択するか、バケットリストで確認できます。各REGIONのS3 ENDPOINTはAWSドキュメントで確認できます。
ロードジョブの作成
- CSVファイルbrokerload_example.csvを作成します。このファイルはS3に格納されており、その内容は以下の通りです:
1,Emily,25
2,Benjamin,35
3,Olivia,28
4,Alexander,60
5,Ava,17
6,William,69
7,Sophia,32
8,James,64
9,Emma,37
10,Liam,64
- ロード用のDorisTableを作成する
Dorisでインポート用のTableを作成します。SQL文は以下の通りです:
CREATE TABLE testdb.test_brokerload(
user_id BIGINT NOT NULL COMMENT "user id",
name VARCHAR(20) COMMENT "name",
age INT COMMENT "age"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;
- Broker Loadを使用してS3からデータをインポートします。バケット名とS3認証情報は実際の状況に応じて入力してください:
LOAD LABEL broker_load_2022_04_01
(
DATA INFILE("s3://your_bucket_name/brokerload_example.csv")
INTO TABLE test_brokerload
COLUMNS TERMINATED BY ","
FORMAT AS "CSV"
(user_id, name, age)
)
WITH S3
(
"provider" = "S3",
"AWS_ENDPOINT" = "s3.us-west-2.amazonaws.com",
"AWS_ACCESS_KEY" = "<your-ak>",
"AWS_SECRET_KEY"="<your-sk>",
"AWS_REGION" = "us-west-2",
"compress_type" = "PLAIN"
)
PROPERTIES
(
"timeout" = "3600"
);
providerはS3サービスのベンダーを指定します。
サポートされているS3プロバイダー一覧:
- "S3" (AWS, Amazon Web Services)
- "AZURE" (Microsoft Azure)
- "GCP" (GCP, Google Cloud Platform)
- "OSS" (Alibaba Cloud)
- "COS" (Tencent Cloud)
- "OBS" (Huawei Cloud)
- "BOS" (Baidu Cloud)
お使いのサービスがリストにない場合(MinIOなど)、"S3"(AWS互換モード)を使用することができます。
インポートステータスの確認
Broker Loadは非同期インポート方式であり、具体的なインポート結果はSHOW LOADコマンドで確認できます。
mysql> show load order by createtime desc limit 1\G;
*************************** 1. row ***************************
JobId: 41326624
Label: broker_load_2022_04_01
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=27
TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1
ErrorMsg: NULL
CreateTime: 2022-04-01 18:59:06
EtlStartTime: 2022-04-01 18:59:11
EtlFinishTime: 2022-04-01 18:59:11
LoadStartTime: 2022-04-01 18:59:11
LoadFinishTime: 2022-04-01 18:59:11
URL: NULL
JobDetails: {"Unfinished backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[]},"ScannedRows":27,"TaskNumber":1,"All backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[36728051]},"FileNumber":1,"FileSize":5540}
1 row in set (0.01 sec)
インポートのキャンセル
Broker LoadジョブのステータスがCANCELLEDまたはFINISHEDでない場合、ユーザーが手動でキャンセルできます。キャンセルするには、ユーザーはキャンセルするインポートタスクのlabelを指定する必要があります。cancel importコマンドの構文は、CANCEL LOADを実行することで確認できます。
例:DEMOデータベースでlabelが"broker_load_2022_04_01"のインポートジョブをキャンセルする場合。
CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_04_01";
Compute Groupの選択
ストレージ・コンピュート分離モードにおいて、Broker LoadがCompute Groupを選択する優先順位のロジックは以下の通りです:
use db@cluster statementで指定されたCompute Groupを選択する;- ユーザープロパティ
default_compute_groupで指定されたCompute Groupを選択する; - 現在のユーザーがアクセス権限を持つCompute Groupから1つを選択する;
統合ストレージ・コンピュートモードでは、ユーザープロパティresource_tags.locationで指定されたCompute Groupを選択します;
ユーザープロパティで指定されていない場合は、defaultという名前のCompute Groupを使用します;
リファレンスマニュアル
broker loadのSQL構文
LOAD LABEL load_label
(
data_desc1[, data_desc2, ...]
[format_properties]
)
WITH [S3|HDFS|BROKER broker_name]
[broker_properties]
[load_properties]
[COMMENT "comments"];
WITH句はストレージシステムへのアクセス方法を指定し、broker_propertiesはアクセス方法の設定パラメータです。
S3: S3プロトコルを使用するストレージシステムHDFS: HDFSプロトコルを使用するストレージシステムBROKER broker_name: その他のプロトコルを使用するストレージシステム。現在利用可能なbroker_nameのリストはSHOW BROKERで確認できます。詳細については、よくある問題セクションの「その他のBrokerインポート」を参照してください。
関連設定
Load Properties
| Property Name | タイプ | デフォルト値 | デスクリプション |
|---|---|---|---|
| "timeout" | Long | 14400 | インポートのタイムアウトを秒単位で指定するために使用されます。設定可能な範囲は1秒から259200秒です。 |
| "max_filter_ratio" | Float | 0.0 | フィルタリング可能な(不正または問題のある)データの最大許容率を指定するために使用され、デフォルトはゼロ許容です。値の範囲は0から1です。インポートされたデータのエラー率がこの値を超えた場合、インポートは失敗します。不正データにはwhere条件によってフィルタリングされた行は含まれません。 |
| "strict_mode" | Boolean | false | このインポートに対してストリクトモードを有効にするかどうかを指定するために使用されます。 |
| "partial_columns" | Boolean | false | 部分カラム更新を有効にするかどうかを指定するために使用され、デフォルト値はfalseです。このパラメータはUnique Key + Merge on WriteTableでのみ利用可能です。 |
| "timezone" | String | "Asia/Shanghai" | このインポートで使用するタイムゾーンを指定するために使用されます。このパラメータは、インポートに関わるすべてのタイムゾーン関連関数の結果に影響します。 |
| "load_parallelism" | Integer | 8 | 各backendでの最大並列インスタンス数を制限します。 |
| "send_batch_parallelism" | Integer | 1 | memtable_on_sink_nodeが無効な場合の、sinkノードがデータを送信する際の並列度です。 |
| "load_to_single_tablet" | Boolean | "false" | パーティションに対応する単一のtabletにのみデータをロードするかどうかを指定するために使用されます。このパラメータは、ランダムバケッティングを持つOLAPTableにロードする場合にのみ利用可能です。 |
| "priority" | oneof "HIGH", "NORMAL", "LOW" | "NORMAL" | タスクの優先度です。 |
Format Properties
| Property Name | タイプ | デフォルト値 | デスクリプション |
|---|---|---|---|
skip_lines | Integer | 0 | CSVファイルの開始時にスキップする行数。csv_with_namesまたはcsv_with_names_and_typesを使用している場合は無視されます。 |
trim_double_quotes | Boolean | false | trueの場合、各フィールドから最外側のダブルクォートを除去します。 |
enclose | String | "" | 区切り文字や改行を含むフィールドの囲み文字。例:区切り文字が,で囲み文字が'の場合、'b,c'は1つのフィールドとして解析されます。 |
escape | String | "" | フィールド内容に囲み文字を含めるためのエスケープ文字。例:'が囲み文字で\がエスケープ文字の場合、'b,\'c'は'b,'c'を1つのフィールドとして保持します。 |
注意: Format propertiesはソースファイルの解析方法(区切り文字、クォート処理など)を定義し、LOAD句内に設定する必要があります。Load propertiesは実行動作(タイムアウト、再試行など)を制御し、外側のPROPERTIESブロック内に設定する必要があります。
LOAD LABEL s3_load_example (
DATA INFILE("s3://bucket/path/file.csv")
INTO TABLE users
COLUMNS TERMINATED BY ","
FORMAT AS "CSV"
(user_id, name, age)
PROPERTIES (
"trim_double_quotes" = "true" -- format property
)
)
WITH S3 (
...
)
PROPERTIES (
"timeout" = "3600" -- load property
);
fe.conf
以下の設定項目は、すべてのBroker Loadインポートタスクに影響するBroker Load用のシステムレベル設定です。これらの設定はfe.confファイルを変更することで調整できます。
| Session Variable | タイプ | デフォルト値 | デスクリプション |
|---|---|---|---|
| min_bytes_per_broker_scanner | Long | 67108864 (64 MB) | Broker Loadジョブにおいて単一のBEが処理するデータの最小量(バイト単位)。 |
| max_bytes_per_broker_scanner | Long | 536870912000 (500 GB) | Broker Loadジョブにおいて単一のBEが処理するデータの最大量(バイト単位)。通常、インポートジョブでサポートされるデータの最大量はmax_bytes_per_broker_scanner * BEノード数です。より大量のデータをインポートする必要がある場合は、max_bytes_per_broker_scannerパラメータのサイズを適切に調整する必要があります。 |
| max_broker_concurrency | Integer | 10 | ジョブの最大同時インポート数を制限します。 |
| default_load_parallelism | Integer | 8 | BEノードあたりの最大同時実行インスタンス数 |
| broker_load_default_timeout_second | 14400 | Broker Loadインポートのデフォルトタイムアウト(秒単位)。 |
注意:min_bytes_per_broker_scanner、max_broker_concurrency、ソースファイルのサイズ、および現在のクラスター内のBE数が、このロードの同時実行インスタンス数を共同で決定します。
Import Concurrency = Math.min(Source File Size / min_bytes_per_broker_scanner, max_broker_concurrency, Current Number of BE Nodes * load_parallelism)
Processing Volume per BE for this Import = Source File Size / Import Concurrency
session variables
| Session Variable | タイプ | Default | デスクリプション |
|---|---|---|---|
| time_zone | String | "Asia/Shanghai" | デフォルトのタイムゾーンで、インポート時のタイムゾーン関連関数の結果に影響します。 |
| send_batch_parallelism | Integer | 1 | sink nodeのデータ送信の並行度で、enable_memtable_on_sink_nodeがfalseに設定されている場合のみ有効です。 |
よくある問題
よくあるエラー
1. インポートエラー: Scan bytes per broker scanner exceed limit:xxx
ドキュメントのベストプラクティスセクションを参照し、FE設定項目max_bytes_per_broker_scannerとmax_broker_concurrencyを変更してください。
2. インポートエラー: failed to send batchまたはTabletWriter add batch with unknown id
query_timeoutとstreaming_load_rpc_max_alive_time_secの設定を適切に調整してください。
3. インポートエラー: LOAD_RUN_FAIL; msg:Invalid Column Name:xxx
PARQUETまたはORC形式のデータの場合、ファイルヘッダーの列名はDorisTableの列名と一致する必要があります。例:
(tmp_c1,tmp_c2)
SET
(
id=tmp_c2,
name=tmp_c1
)
これはparquetまたはorcファイルの(tmp_c1, tmp_c2)という名前の列を取得し、それらをDorisTableの(id, name)列にマッピングすることを表しています。setが指定されていない場合、ファイルヘッダーの列がマッピングに使用されます。
特定のHiveバージョンを使用してORCファイルを直接生成した場合、ORCファイルの列ヘッダーがHiveメタデータではなく、(_col0, _col1, _col2, ...)になる可能性があり、これによりInvalid Column Nameエラーが発生する場合があります。この場合、SETを使用したマッピングが必要です。
5. インポートエラー: Failed to get S3 FileSystem for bucket is null/empty
bucketの情報が間違っているか存在しません。またはbucketの形式がサポートされていません。GCSを使用してs3://gs_bucket/load_tblのようにアンダースコアを含むbucket名を作成する場合、S3 ClientがGCSにアクセスする際にエラーを報告する可能性があります。bucketを作成する際はアンダースコアを使用しないことを推奨します。
6. インポートタイムアウト
インポートのデフォルトタイムアウトは4時間です。タイムアウトが発生した場合、問題を解決するために最大インポートタイムアウトを直接増やすことは推奨されません。単一のインポート時間がデフォルトのインポートタイムアウトである4時間を超える場合、インポートするファイルを分割して複数回のインポートを実行することで問題を解決するのが最適です。過度に長いタイムアウト時間を設定すると、失敗したインポートの再試行にかかるコストが高くなる可能性があります。
以下の式を使用してDorisクラスターの期待最大インポートファイルデータ量を計算できます:
期待最大インポートファイルデータ量 = 14400s * 10M/s * BEの数
例えば、クラスターに10個のBEがある場合:
期待最大インポートファイルデータ量 = 14400s * 10M/s * 10 = 1440000M ≈ 1440G
一般的に、ユーザー環境では10M/sの速度に達しない可能性があるため、500Gを超えるファイルはインポート前に分割することを推奨します。
S3 Load URL style
-
S3 SDKはデフォルトでvirtual-hosted styleメソッドを使用してオブジェクトにアクセスします。ただし、一部のオブジェクトストレージシステムではvirtual-hosted styleアクセスが有効化またはサポートされていない場合があります。このような場合、
use_path_styleパラメータを追加してpath styleメソッドの使用を強制できます:WITH S3
(
"AWS_ENDPOINT" = "AWS_ENDPOINT",
"AWS_ACCESS_KEY" = "AWS_ACCESS_KEY",
"AWS_SECRET_KEY"="AWS_SECRET_KEY",
"AWS_REGION" = "AWS_REGION",
"use_path_style" = "true"
)
S3 Load temporary credentials
-
一時的な認証情報(TOKEN)を使用してS3プロトコルをサポートするすべてのオブジェクトストレージシステムにアクセスするためのサポートが利用可能です。使用方法は以下の通りです:
WITH S3
(
"AWS_ENDPOINT" = "AWS_ENDPOINT",
"AWS_ACCESS_KEY" = "AWS_TEMP_ACCESS_KEY",
"AWS_SECRET_KEY" = "AWS_TEMP_SECRET_KEY",
"AWS_TOKEN" = "AWS_TEMP_TOKEN",
"AWS_REGION" = "AWS_REGION"
)
HDFS Simple 認証
Simple認証とは、hadoop.security.authenticationが"simple"に設定されたHadoopの構成を指します。
(
"username" = "user",
"password" = ""
);
usernameは、アクセス対象のユーザーとして設定する必要があり、passwordは空白のままにしておくことができます。
HDFS Kerberos認証
この認証方式では、以下の情報が必要です:
-
hadoop.security.authentication: 認証方式としてKerberosを指定します。
-
hadoop.kerberos.principal: Kerberosプリンシパルを指定します。
-
hadoop.kerberos.keytab: Kerberos keytabのファイルパスを指定します。このファイルは、Brokerプロセスが配置されているサーバー上の絶対パスである必要があり、Brokerプロセスからアクセス可能である必要があります。
-
kerberos_keytab_content: base64でエンコードされたKerberos keytabファイルの内容を指定します。これは、kerberos_keytab設定の代替として使用できます。
設定例:
(
"hadoop.security.authentication" = "kerberos",
"hadoop.kerberos.principal" = "doris@YOUR.COM",
"hadoop.kerberos.keytab" = "/home/doris/my.keytab"
)
(
"hadoop.security.authentication" = "kerberos",
"hadoop.kerberos.principal" = "doris@YOUR.COM",
"kerberos_keytab_content" = "ASDOWHDLAWIDJHWLDKSALDJSDIWALD"
)
Kerberos認証を使用するには、krb5.conf (opens new window)ファイルが必要です。krb5.confファイルにはKerberosの設定情報が含まれています。通常、krb5.confファイルは/etcディレクトリにインストールする必要があります。KRB5_CONFIG環境変数を設定することで、デフォルトの場所を上書きできます。krb5.confファイルの内容の例は以下の通りです:
[libdefaults]
default_realm = DORIS.HADOOP
default_tkt_enctypes = des3-hmac-sha1 des-cbc-crc
default_tgs_enctypes = des3-hmac-sha1 des-cbc-crc
dns_lookup_kdc = true
dns_lookup_realm = false
[realms]
DORIS.HADOOP = {
kdc = kerberos-doris.hadoop.service:7005
}
HDFS HA Mode
この設定は、HA(High Availability)モードでデプロイされたHDFSクラスターにアクセスするために使用されます。
-
dfs.nameservices: HDFSサービスの名前を指定します。これはカスタマイズ可能です。例: "dfs.nameservices" = "my_ha"
-
dfs.ha.namenodes.xxx: namenodeの名前をカスタマイズします。複数の名前はカンマで区切ります。ここで、xxxはdfs.nameservicesで指定されたカスタム名を表します。例: "dfs.ha.namenodes.my_ha" = "my_nn"
-
dfs.namenode.rpc-address.xxx.nn: namenodeのRPCアドレス情報を指定します。この文脈で、nnはdfs.ha.namenodes.xxxで設定されたnamenode名を表します。例: "dfs.namenode.rpc-address.my_ha.my_nn" = "host:port"
-
dfs.client.failover.proxy.provider.[nameservice ID]: namenodeへのクライアント接続のプロバイダーを指定します。デフォルトはorg.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProviderです。
設定例は以下の通りです:
(
"fs.defaultFS" = "hdfs://my_ha",
"dfs.nameservices" = "my_ha",
"dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
"dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
"dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
"dfs.client.failover.proxy.provider.my_ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)
HAモードは、クラスタアクセス用の前述の2つの認証方法と組み合わせることができます。例えば、simple認証を通じてHA HDFSにアクセスする場合:
(
"username"="user",
"password"="passwd",
"fs.defaultFS" = "hdfs://my_ha",
"dfs.nameservices" = "my_ha",
"dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
"dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
"dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
"dfs.client.failover.proxy.provider.my_ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)
他のbrokerでの読み込み
他のリモートストレージシステム用のBrokerは、Dorisクラスターにおけるオプションのプロセスであり、主にDorisがリモートストレージ上のファイルやディレクトリの読み書きをサポートするために使用されます。
現在、Dorisは様々なリモートストレージシステム向けのBroker実装を提供しています。
以前のバージョンでは、異なるオブジェクトストレージBrokerも利用可能でしたが、現在はオブジェクトストレージからデータをインポートする際にはWITH S3メソッドの使用が推奨されており、WITH BROKERメソッドはもはや推奨されません。
- Tencent Cloud CHDFS
- Tencent Cloud GFS
- JuiceFS
BrokerはRPCサービスポートを通じてサービスを提供し、ステートレスなJavaプロセスとして動作します。その主な責任は、open、pread、pwrite等のリモートストレージに対するPOSIXライクなファイル操作をカプセル化することです。さらに、Brokerは他の情報を一切記録しないため、リモートストレージに関連するすべての接続詳細、ファイル情報、および権限詳細は、RPC呼び出し時にパラメータを通じてBrokerプロセスに渡す必要があります。これにより、Brokerが正しくファイルの読み書きを行えることが保証されます。
Brokerは純粋にデータパスウェイとしてのみ機能し、計算タスクは一切関与しないため、必要なメモリ使用量は最小限です。通常、Dorisシステムでは1つまたは複数のBrokerプロセスがデプロイされます。さらに、同じタイプのBrokerはグループ化され、一意の名前(Broker name)が割り当てられます。
このセクションでは主に、接続情報や認証詳細等、Brokerが異なるリモートストレージシステムにアクセスする際に必要なパラメータに焦点を当てています。これらのパラメータを理解し、正しく設定することは、Dorisとリモートストレージシステム間の成功かつ安全なデータ交換にとって重要です。
Broker情報
Brokerの情報は2つの部分から構成されます:名前(Broker name)と認証情報です。通常の構文フォーマットは以下の通りです:
WITH BROKER "broker_name"
(
"username" = "xxx",
"password" = "yyy",
"other_prop" = "prop_value",
...
);
Broker Name
通常、ユーザーは操作コマンドでWITH BROKER "broker_name"句を通じて既存のBroker Nameを指定する必要があります。Broker Nameは、ALTER SYSTEM ADD BROKERコマンドを通じてBrokerプロセスを追加する際にユーザーが指定する名前です。1つの名前は通常、1つ以上のBrokerプロセスに対応します。Dorisは名前に基づいて利用可能なBrokerプロセスを選択します。ユーザーはSHOW BROKERコマンドを通じて、クラスター内に現在存在するBrokerを確認できます。
Broker Nameは単にユーザー定義の名前であり、Brokerのタイプを表すものではありません。
認証情報
異なるBrokerタイプとアクセス方法には、異なる認証情報が必要です。認証情報は通常、WITH BROKER "broker_name"の後にKey-Value形式でProperty Mapで提供されます。
Broker Loadの例
HDFSからTXTファイルをインポートする
LOAD LABEL demo.label_20220402
(
DATA INFILE("hdfs://host:port/tmp/test_hdfs.txt")
INTO TABLE `load_hdfs_file_test`
COLUMNS TERMINATED BY "\t"
(id,age,name)
)
with HDFS
(
"fs.defaultFS"="hdfs://host:port",
"hadoop.username" = "user"
)
PROPERTIES
(
"timeout"="1200",
"max_filter_ratio"="0.1"
);
HDFSはNameNode HA(High Availability)の設定が必要です
LOAD LABEL demo.label_20220402
(
DATA INFILE("hdfs://hafs/tmp/test_hdfs.txt")
INTO TABLE `load_hdfs_file_test`
COLUMNS TERMINATED BY "\t"
(id,age,name)
)
with HDFS
(
"hadoop.username" = "user",
"fs.defaultFS"="hdfs://hafs",
"dfs.nameservices" = "hafs",
"dfs.ha.namenodes.hafs" = "my_namenode1, my_namenode2",
"dfs.namenode.rpc-address.hafs.my_namenode1" = "nn1_host:rpc_port",
"dfs.namenode.rpc-address.hafs.my_namenode2" = "nn2_host:rpc_port",
"dfs.client.failover.proxy.provider.hafs" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)
PROPERTIES
(
"timeout"="1200",
"max_filter_ratio"="0.1"
);
ワイルドカードを使用してHDFSから2つのファイルバッチにマッチするデータをインポートし、それらを2つの別々のTableにインポートする
LOAD LABEL example_db.label2
(
DATA INFILE("hdfs://host:port/input/file-10*")
INTO TABLE `my_table1`
PARTITION (p1)
COLUMNS TERMINATED BY ","
(k1, tmp_k2, tmp_k3)
SET (
k2 = tmp_k2 + 1,
k3 = tmp_k3 + 1
),
DATA INFILE("hdfs://host:port/input/file-20*")
INTO TABLE `my_table2`
COLUMNS TERMINATED BY ","
(k1, k2, k3)
)
with HDFS
(
"fs.defaultFS"="hdfs://host:port",
"hadoop.username" = "user"
);
ワイルドカード file-10* と file-20* にマッチする2つのバッチのファイルをHDFSからインポートし、それらを2つの別々のTable my_table1 と my_table2 にロードするには。この場合、my_table1はデータをパーティション p1 にインポートするよう指定し、ソースファイルの2列目と3列目の値をインポート前に1ずつ増加させます。
ワイルドカードを使用してHDFSからデータのバッチをインポートする
LOAD LABEL example_db.label3
(
DATA INFILE("hdfs://host:port/user/doris/data/*/*")
INTO TABLE `my_table`
COLUMNS TERMINATED BY "\\x01"
)
with HDFS
(
"fs.defaultFS"="hdfs://host:port",
"hadoop.username" = "user"
);
Hiveで一般的に使用されるデフォルトデリミタである\x01をデリミタとして指定し、ワイルドカード文字*を使用してデータディレクトリ下のすべてのディレクトリ内のすべてのファイルを参照します。
Parquet形式のデータをインポートし、FORMATをparquetとして指定する
LOAD LABEL example_db.label4
(
DATA INFILE("hdfs://host:port/input/file")
INTO TABLE `my_table`
FORMAT AS "parquet"
(k1, k2, k3)
)
with HDFS
(
"fs.defaultFS"="hdfs://host:port",
"hadoop.username" = "user"
);
デフォルトの方法は、ファイル拡張子によって判定することです。
データをインポートし、ファイルパスからパーティションフィールドを抽出する
LOAD LABEL example_db.label5
(
DATA INFILE("hdfs://host:port/input/city=beijing/*/*")
INTO TABLE `my_table`
FORMAT AS "csv"
(k1, k2, k3)
COLUMNS FROM PATH AS (city, utc_date)
)
with HDFS
(
"fs.defaultFS"="hdfs://host:port",
"hadoop.username" = "user"
);
my_table内のカラムはk1、k2、k3、city、およびutc_dateです。
ディレクトリhdfs://hdfs_host:hdfs_port/user/doris/data/input/dir/city=beijingには以下のファイルが含まれています:
hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-01/0000.csv
hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-02/0000.csv
hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-03/0000.csv
hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-04/0000.csv
このファイルには3つのデータ列のみが含まれています:k1、k2、k3。他の2つの列であるcityとutc_dateは、ファイルパスから抽出されます。
インポートしたデータをフィルタする
LOAD LABEL example_db.label6
(
DATA INFILE("hdfs://host:port/input/file")
INTO TABLE `my_table`
(k1, k2, k3)
SET (
k2 = k2 + 1
)
PRECEDING FILTER k1 = 1
WHERE k1 > k2
)
with HDFS
(
"fs.defaultFS"="hdfs://host:port",
"hadoop.username" = "user"
);
k1 = 1である元データの行で、かつ変換後にk1 > k2となる行のみがインポートされます。
データをインポートし、ファイルパスから時間パーティションフィールドを抽出する。
LOAD LABEL example_db.label7
(
DATA INFILE("hdfs://host:port/user/data/*/test.txt")
INTO TABLE `tbl12`
COLUMNS TERMINATED BY ","
(k2,k3)
COLUMNS FROM PATH AS (data_time)
SET (
data_time=str_to_date(data_time, '%Y-%m-%d %H%%3A%i%%3A%s')
)
)
with HDFS
(
"fs.defaultFS"="hdfs://host:port",
"hadoop.username" = "user"
);
時刻には"%3A"が含まれています。HDFSパスではコロン":"は許可されていないため、すべてのコロンは"%3A"に置き換えられます。
パス配下には以下のファイルがあります:
/user/data/data_time=2020-02-17 00%3A00%3A00/test.txt
/user/data/data_time=2020-02-18 00%3A00%3A00/test.txt
Table構造は以下の通りです:
CREATE TABLE IF NOT EXISTS tbl12 (
data_time DATETIME,
k2 INT,
k3 INT
) DISTRIBUTED BY HASH(data_time) BUCKETS 10
PROPERTIES (
"replication_num" = "3"
);
インポート時にMergeモードを使用する
LOAD LABEL example_db.label8
(
MERGE DATA INFILE("hdfs://host:port/input/file")
INTO TABLE `my_table`
(k1, k2, k3, v2, v1)
DELETE ON v2 > 100
)
with HDFS
(
"fs.defaultFS"="hdfs://host:port",
"hadoop.username"="user"
)
PROPERTIES
(
"timeout" = "3600",
"max_filter_ratio" = "0.1"
);
Merge モードをインポートに使用するには、"my_table" は Unique Key Tableである必要があります。インポートされるデータの "v2" 列の値が 100 より大きい場合、その行は削除行として扱われます。インポートタスクのタイムアウトは 3600 秒で、最大 10% のエラー率が許可されます。
置換の順序を保証するために、インポート時に "source_sequence" 列を指定します。
LOAD LABEL example_db.label9
(
DATA INFILE("hdfs://host:port/input/file")
INTO TABLE `my_table`
COLUMNS TERMINATED BY ","
(k1,k2,source_sequence,v1,v2)
ORDER BY source_sequence
)
with HDFS
(
"fs.defaultFS"="hdfs://host:port",
"hadoop.username"="user"
);
The "my_table" must be a Unique Key model table and have a specified Sequence column. The data will maintain its order based on the values in the "source_sequence" column in the source data.
指定されたファイル形式をjsonとしてインポートし、それに応じてjson_rootとjsonpathsを指定します。
LOAD LABEL example_db.label10
(
DATA INFILE("hdfs://host:port/input/file.json")
INTO TABLE `my_table`
FORMAT AS "json"
PROPERTIES(
"json_root" = "$.item",
"jsonpaths" = "[\"$.id\", \"$.city\", \"$.code\"]"
)
)
with HDFS
(
"fs.defaultFS"="hdfs://host:port",
"hadoop.username"="user"
);
jsonpathsは、カラムリストおよびSET (column_mapping)と組み合わせて使用することもできます:
LOAD LABEL example_db.label10
(
DATA INFILE("hdfs://host:port/input/file.json")
INTO TABLE `my_table`
FORMAT AS "json"
(id, code, city)
SET (id = id * 10)
PROPERTIES(
"json_root" = "$.item",
"jsonpaths" = "[\"$.id\", \"$.city\", \"$.code\"]"
)
)
with HDFS
(
"fs.defaultFS"="hdfs://host:port",
"hadoop.username"="user"
);
JSONファイルのルートノードでJSONオブジェクトを読み込む必要がある場合、jsonpathsは $.として指定する必要があります。例:PROPERTIES("jsonpaths"="$.")"
他のブローカーからの読み込み
-
Alibaba Cloud OSS
(
"fs.oss.accessKeyId" = "",
"fs.oss.accessKeySecret" = "",
"fs.oss.endpoint" = ""
) -
JuiceFS
(
"fs.defaultFS" = "jfs://xxx/",
"fs.jfs.impl" = "io.juicefs.JuiceFileSystem",
"fs.AbstractFileSystem.jfs.impl" = "io.juicefs.JuiceFS",
"juicefs.meta" = "xxx",
"juicefs.access-log" = "xxx"
) -
GCS
Brokerを使用してGCSにアクセスする場合、Project IDが必要ですが、その他のパラメータはオプションです。すべてのパラメータ設定については、GCS Configを参照してください。
(
"fs.gs.project.id" = "Your Project ID",
"fs.AbstractFileSystem.gs.impl" = "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
"fs.gs.impl" = "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
)
その他のヘルプ
Broker Loadの使用に関するより詳細な構文とベストプラクティスについては、Broker Loadコマンドマニュアルを参照してください。また、MySQLクライアントのコマンドラインでHELP BROKER LOADと入力することで、より多くのヘルプ情報を取得できます。