メインコンテンツまでスキップ
バージョン: 4.x

BROKER LOAD

概要

Broker LoadはDorisにおけるデータインポート手法の一つで、主にHDFSやS3などのリモートストレージシステムから大規模データをインポートするために使用されます。MySQL APIを通じて開始され、非同期インポート手法です。インポートの進捗と結果はSHOW LOAD文を使用して照会できます。

以前のバージョンでは、S3およびHDFS LoadはBrokerプロセスに依存していました。現在は、追加のBrokerプロセスに依存することなく、データソースから直接データを読み取ります。それでも、構文が類似しているため、S3 Load、HDFS Load、およびBroker Loadは総称してBroker Loadと呼ばれています。

構文

LOAD LABEL [<db_name>.]<load_label>
(
[ { MERGE | APPEND | DELETE } ]
DATA INFILE
(
"<file_path>"[, ...]
)
[ NEGATIVE ]
INTO TABLE `<table_name>`
[ PARTITION ( <partition_name> [ , ... ] ) ]
[ COLUMNS TERMINATED BY "<column_separator>" ]
[ LINES TERMINATED BY "<line_delimiter>" ]
[ FORMAT AS "<file_type>" ]
[ COMPRESS_TYPE AS "<compress_type>" ]
[ (<column_list>) ]
[ COLUMNS FROM PATH AS (<column_name> [ , ... ] ) ]
[ SET (<column_mapping>) ]
[ PRECEDING FILTER <predicate> ]
[ WHERE <predicate> ]
[ DELETE ON <expr> ]
[ ORDER BY <source_sequence> ]
[ PROPERTIES ("<key>"="<value>" [, ...] ) ]
)
WITH BROKER "<broker_name>"
( <broker_properties>
[ , ... ])
[ PROPERTIES (
<load_properties>
[ , ... ]) ]
[COMMENT "<comment>" ];

必須パラメータ

1. <db_name>

インポート用データベースの名前を指定します。

2. <load_label>

各インポートタスクには一意のLabelを指定する必要があります。このLabelを使用して後でジョブの進行状況を照会できます。

3. <table_name>

インポートタスクに対応するTableを指定します。

4. <file_path>

インポートするファイルパスを指定します。複数のパスを指定でき、ワイルドカードを使用できます。パスは最終的にファイルと一致する必要があります。ディレクトリのみと一致する場合、インポートは失敗します。

5. <broker_name>

使用するBrokerサービスの名前を指定します。例えば、public - cloud DorisでのBrokerサービス名はbosです。

6. <broker_properties>

brokerが必要とする情報を指定します。この情報は通常、BrokerがBOSやHDFSなどのリモートストレージシステムにアクセスできるようにするために使用されます。

 (
"username" = "user",
"password" = "pass",
...
)

オプションパラメータ

1. merge | append | delete

データマージタイプ。デフォルトはappendで、このインポートが通常の追記書き込み操作であることを示します。mergedeleteタイプは、unique keyモデルを持つTableにのみ適用されます。mergeタイプは[delete on]文と組み合わせて使用し、削除フラグ列をマークする必要があります。deleteタイプは、今回インポートされるすべてのデータが削除データであることを示します。

2. negative

「負の」インポートを示します。この方法は整数sum集約タイプの集約データTableにのみ適用されます。インポートされたデータのsum集約列に対応する整数値を負の値にし、誤ったデータを相殺するために使用されます。

3. <partition_name>

Tableの特定のパーティションのみをインポートすることを指定します。例:partition (p1, p2,...)。パーティション範囲外の他のデータは無視されます。

4. <column_separator>

列セパレータを指定します。CSV形式でのみ有効で、単一バイトセパレータのみを指定できます。

5. <line_delimiter>

行セパレータを指定します。CSV形式でのみ有効で、単一バイトセパレータのみを指定できます。

6. <file_type>

ファイル形式を指定します。csv(デフォルト)、parquetorc形式をサポートしています。

7. <compress_type>

ファイル圧縮タイプを指定します。gzbz2lz4frameをサポートしています。

8. <column_list>

元ファイルの列順序を指定します。

9. columns from path as (<c1>, <c2>,...)

インポートファイルパスから抽出する列を指定します。

10. <column_mapping>

列変換関数を指定します。

11. preceding filter <predicate>

データは最初にcolumn listcolumns from path asに従って元のデータ行に接合され、その後preceding filterの条件に従ってフィルタリングされます。

12. where <predicate>

条件に従ってインポートデータをフィルタリングします。

13. delete on <expr>

mergeインポートモードと組み合わせて使用し、unique keyモデルを持つTableにのみ適用されます。インポートされたデータの削除フラグを表す列と計算関係を指定します。

14. <source_sequence>

unique keyモデルを持つTableにのみ適用されます。インポートされたデータのsequence列を表す列を指定し、主にインポート時のデータ順序を保証するために使用されます。

15. properties ("<key>"="<value>",...)

インポートファイル形式のパラメータを指定します。CSV、JSONなどの形式に適用されます。例えば、json_rootjsonpathsfuzzy_parseなどのパラメータを指定できます。
enclose: 囲み文字。CSVデータフィールドに行セパレータまたは列セパレータが含まれている場合、単一バイト文字を囲み文字として指定して、誤った切り詰めを防ぐことができます。例えば、列セパレータが","で、囲み文字が"'"で、データが"a,'b,c'"の場合、"b,c"は1つのフィールドとして解析されます。
注意:enclose"に設定する場合、trim_double_quotestrueに設定する必要があります。
escape: エスケープ文字。フィールド内で囲み文字と同じ文字をエスケープするために使用されます。例えば、データが"a,'b,'c'"で、囲み文字が"'"で、"b,'c"を1つのフィールドとして解析したい場合、""などの単一バイトエスケープ文字を指定し、データを"a,'b,'c'"に変更する必要があります。

16. <load_properties>

オプションパラメータは以下のとおりで、実際の環境に基づいて追加できます。

パラメータパラメータ説明
timeoutインポートタイムアウト期間。デフォルトは4時間で、単位は秒です。
max_filter_ratioフィルタリング可能なデータの最大許容比率(データの不規則性などの理由による)。デフォルトはゼロ許容で、値の範囲は0から1です。
exec_mem_limitインポートメモリ制限。デフォルトは2GBで、単位はバイトです。
strict_modeデータに対して厳格な制限を課すかどうか。デフォルトはfalseです。
partial_columnsboolean型。trueに設定すると、部分列更新の使用を示します。デフォルト値はfalseです。TableモデルがUniqueでMerge on Writeを使用している場合にのみ設定できます。
timezoneタイムゾーンを指定します。strftimealignment_timestampfrom_unixtimeなど、タイムゾーンの影響を受ける一部の関数に影響します。詳細については、[Time Zone](https://chatgpt.com/advanced/time - zone.md)ドキュメントを参照してください。指定しない場合、"Asia/Shanghai"が使用されます。
load_parallelismインポート並行性。デフォルトは1です。インポート並行性を増やすと、複数の実行プランが開始され、インポートタスクを同時に実行し、インポートプロセスを高速化します。
send_batch_parallelismバッチデータ送信の並列性を設定します。並列性の値がBE設定のmax_send_batch_parallelism_per_jobを超える場合、max_send_batch_parallelism_per_jobの値が使用されます。
load_to_single_tabletboolean型。trueに設定すると、対応するパーティションの単一タブレットへのデータインポートのサポートを示します。デフォルト値はfalseです。ジョブ内のタスク数は全体の並行性に依存し、ランダムバケットを持つOLAPTableをインポートする場合にのみ設定できます。
priorityインポートタスクの優先度を設定します。HIGH/NORMAL/LOWのオプションがあり、デフォルトはNORMALです。PENDING状態のインポートタスクについて、優先度の高いタスクが最初にLOADING状態に入ります。
commentインポートタスクの備考情報を指定します。

アクセス制御要件

このSQLコマンドを実行するユーザーは、少なくとも以下の権限を持っている必要があります:

権限オブジェクト備考
LOAD_PRIVTable指定されたデータベースTableのインポート権限。

  1. HDFSからデータのバッチをインポートします。インポートファイルはfile.txtで、カンマ区切りで、Tablemy_tableにインポートされます。

    LOAD LABEL example_db.label1
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file.txt")
    INTO TABLE `my_table`
    COLUMNS TERMINATED BY ","
    )
    WITH BROKER hdfs
    (
    "username"="hdfs_user",
    "password"="hdfs_password"
    );
  2. ワイルドカードを使用してHDFSからデータをインポートし、2つのファイルバッチをマッチさせて、それぞれを2つのTableにインポートします。ワイルドカードを使用して2つのファイルバッチ file - 10*file - 20* をマッチさせ、それぞれをTable my_table1my_table2 にインポートします。my_table1 については、パーティション p1 へのインポートを指定し、ソースファイルの2列目と3列目の値に1を加算した後にインポートします。

    LOAD LABEL example_db.label2
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file-10*")
    INTO TABLE `my_table1`
    PARTITION (p1)
    COLUMNS TERMINATED BY ","
    (k1, tmp_k2, tmp_k3)
    SET (
    k2 = tmp_k2 + 1,
    k3 = tmp_k3 + 1
    ),
    DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file-20*")
    INTO TABLE `my_table2`
    COLUMNS TERMINATED BY ","
    (k1, k2, k3)
    )
    WITH BROKER hdfs
    (
    "username"="hdfs_user",
    "password"="hdfs_password"
    );
  3. HDFSからデータのバッチをインポートします。区切り文字をデフォルトのHive区切り文字\\x01として指定し、ワイルドカード*を使用してdataディレクトリ配下のすべてのディレクトリ内のすべてのファイルを指定します。シンプル認証を使用し、同時にnamenode HAを設定します。

    LOAD LABEL example_db.label3
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/doris/data/*/*")
    INTO TABLE `my_table`
    COLUMNS TERMINATED BY "\\x01"
    )
    WITH BROKER my_hdfs_broker
    (
    "username" = "",
    "password" = "",
    "fs.defaultFS" = "hdfs://my_ha",
    "dfs.nameservices" = "my_ha",
    "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
    "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
    "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
    "dfs.client.failover.proxy.provider.my_ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    );
  4. Parquet形式でデータをインポートし、FORMATparquetとして指定します。デフォルトでは、ファイルの拡張子によって判定されます。

    LOAD LABEL example_db.label4
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file")
    INTO TABLE `my_table`
    FORMAT AS "parquet"
    (k1, k2, k3)
    )
    WITH BROKER hdfs
    (
    "username"="hdfs_user",
    "password"="hdfs_password"
    );
  5. ファイルパスからデータをインポートし、パーティションフィールドを抽出します。my_tableの列はk1, k2, k3, city, utc_dateです。ディレクトリhdfs://hdfs_host:hdfs_port/user/doris/data/input/dir/city = beijingには以下のファイルが含まれています:

    hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-01/0000.csv
    hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-02/0000.csv
    hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-03/0000.csv
    hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-04/0000.csv

ファイルにはk1, k2, k3の3つのデータ列のみが含まれており、cityutc_dateの2つのデータ列はファイルパスから抽出されます。

```sql
LOAD LABEL example_db.label10
(
DATA INFILE("hdfs://hdfs_host:hdfs_port/input/city=beijing/*/*")
INTO TABLE `my_table`
FORMAT AS "csv"
(k1, k2, k3)
COLUMNS FROM PATH AS (city, utc_date)
)
WITH BROKER hdfs
(
"username"="hdfs_user",
"password"="hdfs_password"
);
```

6. インポートするデータをフィルタリングします。元データでk1 = 1かつ変換後にk1 > k2となる行のみがインポートされます。

```sql
LOAD LABEL example_db.label6
(
DATA INFILE("hdfs://host:port/input/file")
INTO TABLE `my_table`
(k1, k2, k3)
SET (
k2 = k2 + 1
)
PRECEDING FILTER k1 = 1
WHERE k1 > k2
)
WITH BROKER hdfs
(
"username"="user",
"password"="pass"
);
```

7. データをインポートし、ファイルパスから時間パーティションフィールドを抽出します。この時間には%3Aが含まれます(HDFSパスでは:が許可されていないため、すべての:%3Aに置き換えられます)。

LOAD LABEL example_db.label7
(
DATA INFILE("hdfs://host:port/user/data/*/test.txt")
INTO TABLE `tbl12`
COLUMNS TERMINATED BY ","
(k2,k3)
COLUMNS FROM PATH AS (data_time)
SET (
data_time=str_to_date(data_time, '%Y-%m-%d %H%%3A%i%%3A%s')
)
)
WITH BROKER hdfs
(
"username"="user",
"password"="pass"
);

ディレクトリには以下のファイルが含まれています:

/user/data/data_time=2020-02-17 00%3A00%3A00/test.txt
/user/data/data_time=2020-02-18 00%3A00%3A00/test.txt

Table構造は以下の通りです:

data_time DATETIME,
k2 INT,
k3 INT
  1. HDFSからデータのバッチをインポートし、タイムアウト期間とフィルタリング比率を指定します。プレーンテキスト認証でbroker my_hdfs_brokerを使用します。インポートされたデータのv2 > 100の列に一致する元データの列を削除し、他の列は通常通りインポートします。

    LOAD LABEL example_db.label8
    (
    MERGE DATA INFILE("HDFS://test:802/input/file")
    INTO TABLE `my_table`
    (k1, k2, k3, v2, v1)
    DELETE ON v2 > 100
    )
    WITH HDFS
    (
    "hadoop.username"="user",
    "password"="pass"
    )
    PROPERTIES
    (
    "timeout" = "3600",
    "max_filter_ratio" = "0.1"
    );

インポートにはMERGEメソッドを使用してください。my_tableはUnique Keyモデルを持つTableである必要があります。インポートされるデータのv2列の値が100より大きい場合、その行は削除行として扱われます。

インポートタスクのタイムアウト期間は3600秒で、最大10%のエラー率が許容されます。

  1. インポート時にsource_sequence列を指定して、UNIQUE_KEYSTableでの置換順序を保証します:

    LOAD LABEL example_db.label9
    (
    DATA INFILE("HDFS://test:802/input/file")
    INTO TABLE `my_table`
    COLUMNS TERMINATED BY ","
    (k1,k2,source_sequence,v1,v2)
    ORDER BY source_sequence
    )
    WITH HDFS
    (
    "hadoop.username"="user",
    "password"="pass"
    )

my_tableはUnique Keyモデルを持つTableである必要があり、Sequence Colを指定する必要があります。データはソースデータのsource_sequence列の値に従って順序付けされます。

  1. HDFSからデータのバッチをインポートし、ファイル形式をjsonとして指定し、json_rootjsonpathsを設定します:

    LOAD LABEL example_db.label10
    (
    DATA INFILE("HDFS://test:port/input/file.json")
    INTO TABLE `my_table`
    FORMAT AS "json"
    PROPERTIES(
    "json_root" = "$.item",
    "jsonpaths" = "[$.id, $.city, $.code]"
    )
    )
    WITH BROKER HDFS (
    "hadoop.username" = "user",
    "password" = ""
    )
    PROPERTIES
    (
    "timeout"="1200",
    "max_filter_ratio"="0.1"
    );

jsonpathscolumn listおよびSET (column_mapping)と組み合わせて使用できます:

```sql
LOAD LABEL example_db.label10
(
DATA INFILE("HDFS://test:port/input/file.json")
INTO TABLE `my_table`
FORMAT AS "json"
(id, code, city)
SET (id = id * 10)
PROPERTIES(
"json_root" = "$.item",
"jsonpaths" = "[$.id, $.code, $.city]"
)
)
WITH BROKER HDFS (
"hadoop.username" = "user",
"password" = ""
)
PROPERTIES
(
"timeout"="1200",
"max_filter_ratio"="0.1"
);
```

11. Tencent Cloud COSからCSV形式でデータをインポートします。

```sql
LOAD LABEL example_db.label10
(
DATA INFILE("cosn://my_bucket/input/file.csv")
INTO TABLE `my_table`
(k1, k2, k3)
)
WITH BROKER "broker_name"
(
"fs.cosn.userinfo.secretId" = "xxx",
"fs.cosn.userinfo.secretKey" = "xxxx",
"fs.cosn.bucket.endpoint_suffix" = "cos.xxxxxxxxx.myqcloud.com"
)
```

12. CSVデータをインポートする際は、二重引用符を削除し、最初の5行をスキップしてください。

```sql 
LOAD LABEL example_db.label12
(
DATA INFILE("cosn://my_bucket/input/file.csv")
INTO TABLE `my_table`
(k1, k2, k3)
PROPERTIES("trim_double_quotes" = "true", "skip_lines" = "5")
)
WITH BROKER "broker_name"
(
"fs.cosn.userinfo.secretId" = "xxx",
"fs.cosn.userinfo.secretKey" = "xxxx",
"fs.cosn.bucket.endpoint_suffix" = "cos.xxxxxxxxx.myqcloud.com"
)
```