CREATE ROUTINE LOAD
説明
Routine Load機能により、ユーザーは常駐インポートタスクを送信し、指定されたデータソースからデータを継続的に読み取り、Dorisにインポートすることができます。
現在、認証なしまたはSSL認証方式を通じて、KafkaからCSVまたはJson形式のデータのインポートのみをサポートしています。Json形式データのインポート例
構文
CREATE ROUTINE LOAD [<db>.]<job_name> [ON <tbl_name>]
[<merge_type>]
[<load_properties>]
[<job_properties>]
FROM <data_source> [<data_source_properties>]
[COMMENT "<comment>"]
必須パラメータ
1. [<db>.]<job_name>
インポートジョブの名前。同じデータベース内では、同じ名前のジョブは1つしか実行できません。
2. FROM <data_source>
データソースのタイプ。現在サポートしているのは: KAFKA
3. <data_source_properties>
<kafka_broker_list>Kafka brokerの接続情報。フォーマットはip:hostです。複数のbrokerはカンマで区切られます。
"kafka_broker_list" = "broker1:9092,broker2:9092"
<kafka_topic>購読するKafka topicを指定します。
"kafka_topic" = "my_topic"
オプションパラメータ
1. <tbl_name>
インポート先のTable名を指定します。これはオプションパラメータです。指定されない場合、動的Tableメソッドが使用され、Kafka内のデータにTable名情報が含まれている必要があります。
現在、KafkaのValueからTable名を取得することのみをサポートしており、次の形式に従う必要があります: jsonの例:
table_name|{"col1": "val1", "col2": "val2"}、 ここでtbl_nameはTable名で、|がTable名とTableデータの区切り文字です。csv形式データの場合も同様です:
table_name|val1,val2,val3。ここでtable_nameはDoris内のTable名と一致する必要があります。そうでなければインポートが失敗します。ヒント: 動的Tableは
columns_mappingパラメータをサポートしていません。Table構造がDoris内のTable構造と一致し、インポートするTable情報が大量にある場合、この方法が最良の選択となります。
2. <merge_type>
データマージタイプ。デフォルトはAPPENDで、インポートされたデータが通常の追加書き込み操作であることを意味します。MERGEおよびDELETEタイプはUnique KeyモデルTableでのみ利用可能です。MERGEタイプは[DELETE ON]ステートメントと併用してDelete Flagカラムをマークする必要があります。DELETEタイプは、すべてのインポートされたデータが削除データであることを意味します。
ヒント: 動的複数Tableを使用する場合、このパラメータは各動的Tableのタイプと一致している必要があります。そうでなければインポートが失敗します。
3. <load_properties>
インポートデータを記述するために使用されます。構成は以下の通りです:
[column_separator],
[columns_mapping],
[preceding_filter],
[where_predicates],
[partitions],
[DELETE ON],
[ORDER BY]
<column_separator>カラム区切り文字を指定します。デフォルトは
\tです
COLUMNS TERMINATED BY ","
<columns_mapping>ファイルのカラムとTableのカラム間のマッピング関係、および各種カラム変換を指定するために使用されます。この部分の詳細な説明については、[Column Mapping, Transformation and Filtering]ドキュメントを参照してください。
(k1, k2, tmpk1, k3 = tmpk1 + 1)ヒント: 動的Tableはこのパラメータをサポートしていません。
<preceding_filter>生データをフィルタリングします。この部分の詳細な情報については、[Column Mapping, Transformation and Filtering]ドキュメントを参照してください。
WHERE k1 > 100 and k2 = 1000ヒント: 動的Tableはこのパラメータをサポートしていません。
<where_predicates>条件に基づいてインポートデータをフィルタリングします。この部分の詳細な情報については、[Column Mapping, Transformation and Filtering]ドキュメントを参照してください。
WHERE k1 > 100 and k2 = 1000ヒント: 動的複数Tableを使用する場合、このパラメータは各動的Tableのカラムと一致する必要があります。そうでなければインポートが失敗します。動的複数Tableを使用する場合、共通のパブリックカラムにのみこのパラメータの使用を推奨します。
<partitions>インポート先Tableのどのパーティションにインポートするかを指定します。指定されない場合、データは自動的に対応するパーティションにインポートされます。
PARTITION(p1, p2, p3)ヒント: 動的複数Tableを使用する場合、このパラメータは各動的Tableと一致する必要があります。そうでなければインポートが失敗します。
<DELETE ON>MERGEインポートモードと併用する必要があり、Unique KeyモデルTableにのみ適用されます。インポートデータ内のDelete Flagカラムと計算関係を指定するために使用されます。
DELETE ON v3 >100ヒント: 動的複数Tableを使用する場合、このパラメータは各動的Tableと一致する必要があります。そうでなければインポートが失敗します。
<ORDER BY>Unique KeyモデルTableにのみ適用されます。インポートデータ内のSequence Colカラムを指定するために使用されます。主にインポート時のデータ順序を保証するために使用されます。
ヒント: 動的複数Tableを使用する場合、このパラメータは各動的Tableと一致する必要があります。そうでなければインポートが失敗します。
4. <job_properties>
ルーチンインポートジョブの一般的なパラメータを指定するために使用されます。
PROPERTIES (
"key1" = "val1",
"key2" = "val2"
)現在、以下のパラメータをサポートしています:
<desired_concurrent_number>希望並行数。ルーチンインポートジョブは複数のサブタスクに分割されて実行されます。このパラメータは、ジョブに対して同時に実行できるタスクの数を指定します。0より大きい値である必要があります。デフォルトは5です。
この並行数は実際の並行数ではありません。実際の並行数は、クラスターノード数、負荷状況、データソース状況を考慮して決定されます。
"desired_concurrent_number" = "3"
<max_batch_interval>/<max_batch_rows>/<max_batch_size>これら3つのパラメータは以下を表します:
- 各サブタスクの最大実行時間(秒)。1以上である必要があります。デフォルトは10です。
- 各サブタスクが読み取る最大行数。200000以上である必要があります。デフォルトは20000000です。
- 各サブタスクが読み取る最大バイト数。単位はバイトで、範囲は100MBから10GBです。デフォルトは1Gです。
これら3つのパラメータは、サブタスクの実行時間と処理量を制御するために使用されます。いずれかが閾値に達すると、タスクが終了します。
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"
<max_error_number>サンプリングウィンドウ内で許可される最大エラー行数。0以上である必要があります。デフォルトは0で、エラー行が許可されないことを意味します。
サンプリングウィンドウは
max_batch_rows * 10です。サンプリングウィンドウ内のエラー行数がmax_error_numberを超えた場合、ルーチンジョブが一時停止され、データ品質問題をチェックするための手動介入が必要になります。where条件でフィルタリングされた行はエラー行としてカウントされません。
<strict_mode>ストリクトモードを有効にするかどうか。デフォルトはoffです。有効にした場合、非null元データのカラムタイプ変換でNULLになった場合、フィルタリングされます。指定方法:
"strict_mode" = "true"ストリクトモードとは: インポート処理中のカラムタイプ変換を厳密にフィルタリングすることを意味します。厳密フィルタリング戦略は以下の通りです:
- カラムタイプ変換について、ストリクトモードがtrueの場合、エラーデータがフィルタリングされます。ここでエラーデータとは: nullでない元データが、カラムタイプ変換後にnull値になるデータを指します。
- インポート中の関数変換で生成されるカラムについて、ストリクトモードは影響しません。
- 範囲制限があるカラムについて、元データがタイプ変換を通過できるが範囲制限を通過できない場合、ストリクトモードは影響しません。例: タイプがdecimal(1,0)で元データが10の場合、タイプ変換は通過できますが、カラムの宣言範囲外です。このようなデータにストリクトモードは影響しません。
ストリクトモードとソースデータインポートの関係
TinyIntカラムタイプを使用した例
注: Table内のカラムがnull値を許可する場合
source data source data example string to int strict_mode result null \NN/A true or false NULL not null aaa or 2000 NULL true invalid data(filtered) not null aaa NULL false NULL not null 1 1 true or false correct data Decimal(1,0)カラムタイプを使用した例
注: Table内のカラムがnull値を許可する場合
source data source data example string to int strict_mode result null \NN/A true or false NULL not null aaa NULL true invalid data(filtered) not null aaa NULL false NULL not null 1 or 10 1 true or false correct data 注: 10は範囲を超える値ですが、そのタイプがdecimalの要求を満たすため、ストリクトモードは影響しません。10は最終的に他のETL処理フローでフィルタリングされますが、ストリクトモードではフィルタリングされません。
<timezone>インポートジョブで使用するタイムゾーンを指定します。デフォルトはSessionのtimezoneパラメータです。このパラメータは、インポートに関わるすべてのタイムゾーン関連関数の結果に影響します。
"timezone" = "Asia/Shanghai"
<format>インポートデータフォーマットを指定します。デフォルトはcsvで、json形式もサポートされています。
"format" = "json"
<jsonpaths>json形式データをインポートする際、jsonpathsを使用してJsonデータから抽出するフィールドを指定できます。
-H "jsonpaths: [\"$.k2\", \"$.k1\"]"
<strip_outer_array>json形式データをインポートする際、strip_outer_arrayをtrueに設定すると、Jsonデータが配列として表示され、データ内の各要素が1行として扱われます。デフォルト値はfalseです。
-H "strip_outer_array: true"
<json_root>json形式データをインポートする際、json_rootを使用してJsonデータのルートノードを指定できます。Dorisはjson_rootを通じてルートノードから抽出された要素を解析します。デフォルトは空です。
-H "json_root: $.RECORDS"
<send_batch_parallelism>整数タイプ。バッチデータ送信の並列度を設定するために使用されます。並列度の値がBE設定の
max_send_batch_parallelism_per_jobを超える場合、調整ポイントとして機能するBEはmax_send_batch_parallelism_per_jobの値を使用します。
"send_batch_parallelism" = "10"
<load_to_single_tablet>ブール型。trueは対応するパーティションの1つのtabletのみにデータをインポートするタスクをサポートすることを示します。デフォルト値はfalseです。このパラメータは、ランダムバケッティングを持つolapTableにデータをインポートする場合にのみ設定が許可されます。
"load_to_single_tablet" = "true"
<partial_columns>ブール型。trueは部分カラム更新の使用を示します。デフォルト値はfalseです。このパラメータは、TableモデルがUniqueでMerge on Writeを使用する場合にのみ設定が許可されます。動的複数Tableはこのパラメータをサポートしていません。
"partial_columns" = "true"
<max_filter_ratio>サンプリングウィンドウ内で許可される最大フィルター率。0以上1以下である必要があります。デフォルト値は0です。
サンプリングウィンドウは
max_batch_rows * 10です。サンプリングウィンドウ内で、エラー行/総行数がmax_filter_ratioを超えた場合、ルーチンジョブが一時停止され、データ品質問題をチェックするための手動介入が必要になります。where条件でフィルタリングされた行はエラー行としてカウントされません。
<enclose>囲み文字。csvデータフィールドに行または列の区切り文字が含まれている場合、誤った切り詰めを防ぐために、保護用の囲み文字として1バイト文字を指定できます。例えば、列区切り文字が","で囲み文字が"'"の場合、データ"a,'b,c'"に対して"b,c"が1つのフィールドとして解析されます。
注: encloseが
"に設定される場合、trim_double_quotesをtrueに設定する必要があります。
<escape>エスケープ文字。csvフィールド内の囲み文字と同じ文字をエスケープするために使用されます。例えば、データが"a,'b,'c'"、囲み文字が"'"で、"b,'c"を1つのフィールドとして解析したい場合、
\などの1バイトエスケープ文字を指定し、データをa,'b,\'c'に変更する必要があります。
5. data_source_properties内のオプションプロパティ
<kafka_partitions>/<kafka_offsets>購読するkafkaパーティションと各パーティションの開始オフセットを指定します。時刻が指定された場合、その時刻以上の最も近いオフセットから消費を開始します。
offsetは0以上の具体的なオフセット、または以下を指定できます:
OFFSET_BEGINNING: データが存在する場所から購読を開始します。OFFSET_END: 終端から購読を開始します。- 時刻形式、例: "2021-05-22 11:00:00"
指定されない場合、デフォルトで
OFFSET_ENDからtopic下のすべてのパーティションを購読します。"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END""kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "2021-05-22 11:00:00,2021-05-22 11:00:00,2021-05-22 11:00:00"注: 時刻形式とOFFSET形式を混在させることはできません。
<property>カスタムkafkaパラメータを指定します。kafka shellの"--property"パラメータと同じ機能です。
パラメータの値がファイルの場合、値の前にキーワード"FILE:"を追加する必要があります。
ファイルの作成方法については、CREATE FILEコマンドドキュメントを参照してください。
サポートされているカスタムパラメータの詳細については、librdkafkaの公式CONFIGURATIONドキュメントのクライアント設定項目を参照してください。例:
"property.client.id" = "12345",
"property.ssl.ca.location" = "FILE:ca.pem"2.1 SSLを使用してKafkaに接続する場合、以下のパラメータを指定する必要があります:
```text
"property.security.protocol" = "ssl",
"property.ssl.ca.location" = "FILE:ca.pem",
"property.ssl.certificate.location" = "FILE:client.pem",
"property.ssl.key.location" = "FILE:client.key",
"property.ssl.key.password" = "abcdefg"
```
このうち:
`property.security.protocol`と`property.ssl.ca.location`は必須で、接続方式をSSLに指定し、CA証明書の場所を指定するために使用されます。
Kafkaサーバー側でクライアント認証が有効になっている場合、以下も設定する必要があります:
```text
"property.ssl.certificate.location"
"property.ssl.key.location"
"property.ssl.key.password"
```
それぞれクライアントの公開鍵、秘密鍵、秘密鍵パスワードを指定するために使用されます。2.2 kafkaパーティションのデフォルト開始オフセットを指定
<kafka_partitions>/<kafka_offsets>が指定されない場合、デフォルトですべてのパーティションが消費されます。この場合、
<kafka_default_offsets>を指定して開始オフセットを設定できます。デフォルトはOFFSET_ENDで、終端から購読を開始することを意味します。例:
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
6. COMMENT
ルーチンロードタスクのコメント情報。
アクセス制御要件
このSQLコマンドを実行するユーザーは、少なくとも以下の権限を持つ必要があります:
| 権限 | オブジェクト | 注記 |
|---|---|---|
| LOAD_PRIV | Table | CREATE ROUTINE LOADはTableLOAD操作に属します |
使用上の注意
- 動的Tableは
columns_mappingパラメータをサポートしていません - 動的複数Tableを使用する場合、merge_type、where_predicatesなどのパラメータは、各動的Tableの要件に適合する必要があります
- 時刻形式とOFFSET形式を混在させることはできません
kafka_partitionsとkafka_offsetsは一対一で対応する必要がありますencloseが"に設定される場合、trim_double_quotesをtrueに設定する必要があります。
例
-
example_db内のexample_tblに対してtest1という名前のKafkaルーチンロードタスクを作成します。列区切り文字、group.idおよびclient.idを指定し、デフォルトですべてのパーティションを自動消費し、データが存在する場所(OFFSET_BEGINNING)から購読を開始します
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS TERMINATED BY ",",
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"property.group.id" = "xxx",
"property.client.id" = "xxx",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
); -
example_db用のtest1という名前のKafkaルーチン動的マルチTableロードタスクを作成します。列区切り文字、group.idとclient.idを指定し、デフォルトですべてのパーティションを自動的に消費し、データが存在する場所から購読を開始します(OFFSET_BEGINNING)
Kafkaからexample_db内のtest1とtest2Tableにデータをインポートする必要があると仮定し、test1という名前のroutine loadタスクを作成し、test1とtest2からのデータを
my_topicという名前のKafkaトピックに書き込みます。この方法により、1つのroutine loadタスクを通してKafkaから2つのTableにデータをインポートできます。CREATE ROUTINE LOAD example_db.test1
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"property.group.id" = "xxx",
"property.client.id" = "xxx",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
); -
example_db内のexample_tblに対してtest1という名前のKafka routine loadタスクを作成します。このインポートタスクはstrict modeで実行されます。
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
PRECEDING FILTER k1 = 1,
WHERE k1 > 100 and k2 like "%doris%"
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "true"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,0,200"
); -
SSL認証を使用してKafkaクラスターからデータをインポートします。また、client.idパラメータを設定します。インポートタスクは非厳密モードで、タイムゾーンはAfrica/Abidjanです。
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
WHERE k1 > 100 and k2 like "%doris%"
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"timezone" = "Africa/Abidjan"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"property.security.protocol" = "ssl",
"property.ssl.ca.location" = "FILE:ca.pem",
"property.ssl.certificate.location" = "FILE:client.pem",
"property.ssl.key.location" = "FILE:client.key",
"property.ssl.key.password" = "abcdefg",
"property.client.id" = "my_client_id"
); -
Json形式のデータをインポートします。デフォルトでJsonのフィールド名をカラム名マッピングとして使用します。インポートするパーティション0,1,2を指定し、すべての開始オフセットは0です
CREATE ROUTINE LOAD example_db.test_json_label_1 ON table1
COLUMNS(category,price,author)
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2",
"kafka_offsets" = "0,0,0"
); -
Jsonデータをインポートし、Jsonpathを通じてフィールドを抽出し、Jsonドキュメントのルートノードを指定する
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json",
"jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
"json_root" = "$.RECORDS"
"strip_outer_array" = "true"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2",
"kafka_offsets" = "0,0,0"
); -
example_db の example_tbl に対して条件フィルタリングを行う test1 という名前の Kafka routine load タスクを作成します。
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
WITH MERGE
COLUMNS(k1, k2, k3, v1, v2, v3),
WHERE k1 > 100 and k2 like "%doris%",
DELETE ON v3 >100
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,0,200"
); -
シーケンス列を含むUnique KeyモデルTableにデータをインポートする
CREATE ROUTINE LOAD example_db.test_job ON example_tbl
COLUMNS TERMINATED BY ",",
COLUMNS(k1,k2,source_sequence,v1,v2),
ORDER BY source_sequence
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "30",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"
) FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,0,200"
); -
指定された時点から消費を開始する
CREATE ROUTINE LOAD example_db.test_job ON example_tbl
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "30",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"
) FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092",
"kafka_topic" = "my_topic",
"kafka_default_offsets" = "2021-05-21 10:00:00"
);