Stage Import
Currently VeloDB Cloud supports two stage import methods:
- Pull-based batch import by creating an external stage on object storage. This is mainly suitable for large-scale data import and requires that the user owns the object storage and its access key.
- Push-based import through the built-in (internal) stage. This is mainly suitable for pushing small batches and is easy to use.
Create external stage
Create an external stage for importing data files into VeloDB Cloud tables.
Suggestion: Create a sub-account dedicated to data import and use a bucket policy to grant it read permission on a specific prefix, so that VeloDB Cloud can read only the object data to be imported.
Syntax
CREATE STAGE [IF NOT EXISTS] <stage_name> PROPERTIES (
{stage_properties}
)
- `stage_properties`: Specify stage-related parameters. The following parameters are currently supported:
  - `endpoint`: Endpoint of the object storage. Required.
  - `region`: Region of the object storage. Required.
  - `bucket`: Bucket of the object storage. Required.
  - `prefix`: Prefix path of the user's data files under the bucket. Not required; defaults to the root path of the bucket.
  - `provider`: The cloud platform that provides the object storage. Required. Currently supported:
    - `OSS`: Alibaba Cloud
    - `COS`: Tencent Cloud
    - `OBS`: Huawei Cloud
    - `S3`: AWS
  - `access_type`: Authorization method. Required. Currently supported:
    - `AKSK`: Authorization by AK/SK. Used in combination with the `ak` and `sk` parameters.
    - `IAM`: Authorization by IAM. Configure an IAM role on the cloud platform.
  - `ak`: Access Key ID of the object storage. Required.
  - `sk`: Secret Access Key of the object storage. Required.
  - `default.file.type`: Default type of the files stored in the stage. Currently supports `csv`, `json`, `orc`, `parquet`. Not required; this parameter can be overridden when importing.
  - `default.file.compression`: Default compression type of the files stored in the stage. Currently supports `gz`, `bz2`, `lz4`, `lzo`, `deflate`. Not required; this parameter can be overridden when importing.
  - `default.file.column_separator`: Default column separator of the files stored in the stage. Defaults to `\t`. Not required; this parameter can be overridden when importing.
  - `default.file.line_delimiter`: Default line delimiter of the files stored in the stage. Defaults to `\n`. Not required; this parameter can be overridden when importing.
  - `default.copy.size_limit`: Default import size limit, in bytes, when importing files from this stage. Defaults to unlimited. Not required; this parameter can be overridden when importing.
  - `default.copy.on_error`: Default error handling policy when the data quality does not meet the standard during import. Not required; this parameter can be overridden when importing. Currently supported:
    - `max_filter_ratio_{number}`: Set the maximum error ratio to `{number}`, a floating-point number in the interval `[0-1]`. If the error ratio of the imported data is below this threshold, the erroneous rows are ignored and the remaining correct rows are imported.
    - `abort_statement`: Abort the import when the data contains error rows; equivalent to `max_filter_ratio_0`. This is the default behavior.
    - `continue`: Ignore error rows and import the correct rows; equivalent to `max_filter_ratio_1`.
  - `default.copy.strict_mode`: Strict filtering of column type conversions during the import; refer to Doris's import strict mode. Defaults to `false`. Not required; this parameter can be overridden when importing.
Example
- Create a stage named `test_stage`:
CREATE STAGE test_stage PROPERTIES (
'endpoint' = 'cos.ap-beijing.myqcloud.com',
'region' = 'ap-beijing',
'bucket' = 'velodb_test',
'prefix' = 'test_stage',
'provider' = 'cos',
'ak' = 'XX',
'sk' = 'XX'
)
- Create a stage named `test_stage`, specifying the default file type and column separator:
CREATE STAGE test_stage PROPERTIES (
'endpoint' = 'cos.ap-beijing.myqcloud.com',
'region' = 'ap-beijing',
'bucket' = 'velodb_test',
'prefix' = 'test_stage',
'provider' = 'cos',
'ak' = 'XX',
'sk' = 'XX',
'default.file.type' = 'csv',
'default.file.column_separator' = ','
)
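- For illustration, a stage can also carry default import behavior such as the error-handling policy and size limit described above; assuming the sample values below (an error tolerance of 0.4 and a 10 GB size limit are arbitrary illustrative values), such a stage might be created as:
CREATE STAGE test_stage PROPERTIES (
'endpoint' = 'cos.ap-beijing.myqcloud.com',
'region' = 'ap-beijing',
'bucket' = 'velodb_test',
'prefix' = 'test_stage',
'provider' = 'cos',
'ak' = 'XX',
'sk' = 'XX',
'default.copy.on_error' = 'max_filter_ratio_0.4',
'default.copy.size_limit' = '10737418240'
)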
Other operations
SHOW STAGES
Display information about all external stages that the logged-in user has permission to access, including `name`, `id`, `endpoint`, `region`, `bucket`, `prefix`, `ak`, `sk`, and the default properties.
Example
mysql> SHOW STAGES;
+----------------------------+--------------------------------------+-----------------------------+------------+------------------------+---------------------+--------------------------------------+----------------------------------+----------+-----------------------------------------------------------------+
| StageName | StageId | Endpoint | Region | Bucket | Prefix | AK | SK | Provider | DefaultProperties |
+----------------------------+--------------------------------------+-----------------------------+------------+------------------------+---------------------+--------------------------------------+----------------------------------+----------+-----------------------------------------------------------------+
| regression_test_copy_stage | e8ed6ea0-33c8-4381-b7a9-c19ea1801bca | cos.ap-beijing.myqcloud.com | ap-beijing | doris-build-1308700295 | regression/tpch/sf1 | AKXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX | SKXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX | COS | {"default.file.column_separator":"|"} |
| root_stage | 8b8329de-be1a-40a8-9eab-91d31f9798bf | cos.ap-beijing.myqcloud.com | ap-beijing | justtmp-bj-1308700295 | | AKXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX | SKXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX | COS | {"default.file.type":"CSV","default.file.column_separator":","} |
| admin_stage | 9284a9ec-3ba7-47b9-b276-1ccde875469c | cos.ap-beijing.myqcloud.com | ap-beijing | justtmp-bj-1308700295 | meiyi_cloud_test | AKXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX | SKXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX | COS | {"default.file.column_separator":",","default.file.type":"CSV"} |
+----------------------------+--------------------------------------+-----------------------------+------------+------------------------+---------------------+--------------------------------------+----------------------------------+----------+-----------------------------------------------------------------+
3 rows in set (0.15 sec)
DROP STAGE
Delete an external stage. The user needs the ADMIN privilege on the stage.
Syntax
DROP STAGE [IF EXISTS] <stage_name>
Example
Delete the stage named `test_stage`:
DROP STAGE test_stage
GRANT STAGE and REVOKE
This section describes how to grant and revoke user access permissions on external stages in the compute-storage separated edition.
Note: The user names mentioned in this document are SQL user names; for example, in `mysql -ujack`, `jack` is the user name.
Grant stage access permission to a user
- Create a new user with the MySQL client if needed.
- Syntax
GRANT USAGE_PRIV ON STAGE {stage_name} TO {user}
- Example
// create user jack
mysql> CREATE USER jack IDENTIFIED BY '123456' DEFAULT ROLE "admin";
Query OK, 0 rows affected (0.01 sec)
mysql> GRANT USAGE_PRIV ON STAGE not_exist_stage TO jack;
Query OK, 0 rows affected (0.01 sec)
mysql> show all grants;
+--------------+----------+-------------------------------+--------------+---------------------------------------------------+------------+---------------+--------------+--------------------------------------+
| UserIdentity | Password | GlobalPrivs | CatalogPrivs | DatabasePrivs | TablePrivs | ResourcePrivs | CloudCluster | CloudStage |
+--------------+----------+-------------------------------+--------------+---------------------------------------------------+------------+---------------+--------------+--------------------------------------+
| 'jack'@'%' | Yes | (false) | NULL | internal.information_schema: Select_priv (false) | NULL | NULL | NULL | not_exist_stage: Usage_priv (false) |
| 'root'@'%' | No | Node_priv Admin_priv (false) | NULL | NULL | NULL | NULL | NULL | NULL |
| 'admin'@'%' | No | Admin_priv (false) | NULL | NULL | NULL | NULL | NULL | NULL |
+--------------+----------+-------------------------------+--------------+---------------------------------------------------+------------+---------------+--------------+--------------------------------------+
3 rows in set (0.00 sec)
Revoke stage access permission from a user
- Syntax
REVOKE USAGE_PRIV ON STAGE {stage_name} FROM {user}
- Example:
// revoke the stage privilege from user jack
mysql> revoke USAGE_PRIV ON STAGE not_exist_stage FROM jack;
Query OK, 0 rows affected (0.01 sec)
mysql> show all grants;
+--------------+----------+-------------------------------+--------------+---------------------------------------------------+------------+---------------+--------------+------------+
| UserIdentity | Password | GlobalPrivs | CatalogPrivs | DatabasePrivs | TablePrivs | ResourcePrivs | CloudCluster | CloudStage |
+--------------+----------+-------------------------------+--------------+---------------------------------------------------+------------+---------------+--------------+------------+
| 'root'@'%' | No | Node_priv Admin_priv (false) | NULL | NULL | NULL | NULL | NULL | NULL |
| 'admin'@'%' | No | Admin_priv (false) | NULL | NULL | NULL | NULL | NULL | NULL |
| 'jack'@'%' | Yes | (false) | NULL | internal.information_schema: Select_priv (false) | NULL | NULL | NULL | NULL |
+--------------+----------+-------------------------------+--------------+---------------------------------------------------+------------+---------------+--------------+------------+
3 rows in set (0.00 sec)
COPY INTO
Import the data files in a stage into a VeloDB Cloud table.
Note: A file with the same name and content under a stage can only be imported into a table once, and cannot be imported repeatedly.
Syntax
COPY INTO [<db_name>.]<table_name> FROM {copy_from_param} PROPERTIES (
{copy_into_properties}
)
- `copy_from_param`: Specifies the stage name, files, column conversion, mapping, and filtering rules for the import.
  copy_from_param ::= {stage_and_glob} | ( SELECT {copy_select_expr_list} FROM {stage_and_glob} {copy_where_expr_list} )
  stage_and_glob ::= @{stage_name} | @{stage_name}('{file_glob}')
  - `stage_name`
    - The external stage name created by the user
    - The user's default internal stage, named `~`
  - `file_glob`
    - Use glob syntax to specify the files to be imported
  - `copy_select_expr_list`
    - Performs column conversion, mapping, etc. Mapping to different columns of the target table can be achieved by adjusting the column order of the input data source (note: only the entire row can be mapped):
      copy_select_expr_list ::= * | { $<file_col_num> | <expr> }[ , ... ]
    - `file_col_num`: the ordinal of a column in the import file, as split by the specified separator (e.g. `$1` is column 1)
    - `expr`: an expression, such as an arithmetic operation
  - `copy_where_expr_list`
    - Filters the rows in the file according to the expression; rows that are filtered out are not imported into the table:
      copy_where_expr_list ::= WHERE <predicate_expr>
- `copy_into_properties`: Specify the parameters related to COPY INTO. The following parameters are currently supported:
  - `file.type`: Type of the imported files. Currently supports `csv`, `json`, `orc`, `parquet`. Not required. If not set, the default file type configured on the stage is used first; if not set on the stage either, the system automatically infers the type.
  - `file.compression`: Compression type of the imported files. Currently supports `gz`, `bz2`, `lz4`, `lzo`, `deflate`. Not required. If not set, the default compression type configured on the stage is used first; if not set on the stage either, the system automatically infers the type.
  - `file.column_separator`: Column separator of the imported files. Not required. If not set, the default column separator configured on the stage is used first; if not set on the stage either, the system default `\t` is used.
  - `file.line_delimiter`: Line delimiter of the imported files. Not required. If not set, the default line delimiter configured on the stage is used first; if not set on the stage either, the system default `\n` is used.
  - `copy.size_limit`: Size limit of the import, in bytes. If the matching files exceed the limit, only a subset of files within the limit is imported. Not required. If not set, the default import size configured on the stage is used first; if not set on the stage either, there is no limit by default.
  - `copy.on_error`: Error handling policy when the data quality does not meet the standard during import. Currently supported:
    - `max_filter_ratio_{number}`: Set the maximum error ratio to `{number}`, a floating-point number in the interval `[0-1]`. If the error ratio of the imported data is below this threshold, the erroneous rows are ignored and the remaining correct rows are imported.
    - `abort_statement`: Abort the import when the data contains error rows; equivalent to `max_filter_ratio_0`. This is the default behavior.
    - `continue`: Ignore error rows and import the correct rows; equivalent to `max_filter_ratio_1`.
    Not required. If not set, the default error handling policy configured on the stage is used first; if not set on the stage either, the system default policy is used.
  - `copy.async`: Whether the import is executed asynchronously. Supports `true`, `false`. The default is `true`, i.e. asynchronous execution; use `SHOW COPY` to view the asynchronously executed copy task.
  - `copy.strict_mode`: Strict filtering of column type conversions during the import; refer to Doris's import strict mode. Defaults to `false`. Not required. If not set, the default mode configured on the stage is used first; if not set on the stage either, the system default policy is used.
Output
COPY INTO is executed asynchronously by default and returns a query `id`, for example:
mysql> copy into db.t1 from @exs('2.csv');
+-----------------------------------+---------+------+------+------------+------------+--------------+------+
| id | state | type | msg | loadedRows | filterRows | unselectRows | url |
+-----------------------------------+---------+------+------+------------+------------+--------------+------+
| 8fcf20b156dc4f66_99aa062042941aff | PENDING | | | | | | |
+-----------------------------------+---------+------+------+------------+------------+--------------+------+
1 row in set (0.14 sec)
Using the returned `id`, query the execution result with the `SHOW COPY` command:
mysql> SHOW COPY WHERE id = '8fcf20b156dc4f66_99aa062042941aff';
+-----------------------------------+-------+----------------------------------------+----------+---------------------+------+-----------------------------------------------------+-----------------------------------------------------+----------+---------------------+---------------------+---------------------+---------------------+---------------------+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------+--------------+-------------------------------------------------------+
| Id | JobId | Label | State | Progress | Type | EtlInfo | TaskInfo | ErrorMsg | CreateTime | EtlStartTime | EtlFinishTime | LoadStartTime | LoadFinishTime | URL | JobDetails | TransactionId | ErrorTablets | Files |
+-----------------------------------+-------+----------------------------------------+----------+---------------------+------+-----------------------------------------------------+-----------------------------------------------------+----------+---------------------+---------------------+---------------------+---------------------+---------------------+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------+--------------+-------------------------------------------------------+
| 8fcf20b156dc4f66_99aa062042941aff | 17012 | copy_f8a124900f7d42f6_91dad473d45a34bd | FINISHED | ETL:100%; LOAD:100% | COPY | unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=2 | cluster:N/A; timeout(s):14400; max_filter_ratio:0.0 | NULL | 2022-10-21 09:06:48 | 2022-10-21 09:06:54 | 2022-10-21 09:06:54 | 2022-10-21 09:06:54 | 2022-10-21 09:06:55 | NULL | {"Unfinished backends":{"3e2fc170198240c0-929be46e8ca47838":[]},"ScannedRows":2,"TaskNumber":1,"LoadBytes":30,"All backends":{"3e2fc170198240c0-929be46e8ca47838":[10003]},"FileNumber":1,"FileSize":14} | 6141324627542016 | {} | ["s3://justtmp-bj-1308700295/meiyi_cloud_test/2.csv"] |
+-----------------------------------+-------+----------------------------------------+----------+---------------------+------+-----------------------------------------------------+-----------------------------------------------------+----------+---------------------+---------------------+---------------------+---------------------+---------------------+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------+--------------+-------------------------------------------------------+
1 row in set (0.01 sec)
The `Files` column lists the files imported by this copy task.
Example
- Import the data in the stage named `ext_stage` into the table `test_table`:
  COPY INTO test_table FROM @ext_stage
  The system automatically scans the stage for files that have not yet been imported into `test_table` and imports them.
- Import the data file `1.csv` in the stage named `ext_stage` into the table `test_table`:
  COPY INTO test_table FROM @ext_stage('1.csv')
- Import the data file `dir1/subdir_2/1.csv` in the stage named `ext_stage` into the table `test_table`:
  If the prefix of `ext_stage` is empty, the import statement is:
  COPY INTO test_table FROM @ext_stage('dir1/subdir_2/1.csv')
  If the prefix of `ext_stage` is `dir1`, the import statement is:
  COPY INTO test_table FROM @ext_stage('subdir_2/1.csv')
  If the prefix of `ext_stage` is `dir1/subdir_2`, the import statement is:
  COPY INTO test_table FROM @ext_stage('1.csv')
- Import the files ending in `.csv` under the path `dir1/subdir_2/` in the stage named `ext_stage` into the table `test_table`:
  If the prefix of `ext_stage` is empty, the import statement is:
  COPY INTO test_table FROM @ext_stage('dir1/subdir_2/*.csv')
  If the prefix of `ext_stage` is `dir1`, the import statement is:
  COPY INTO test_table FROM @ext_stage('subdir_2/*.csv')
  If the prefix of `ext_stage` is `dir1/subdir_2`, the import statement is:
  COPY INTO test_table FROM @ext_stage('*.csv')
- Import the files ending in `.csv` in the directory `dir1` and all of its subdirectories in the stage named `ext_stage` into the table `test_table`:
  COPY INTO test_table FROM @ext_stage('dir1/**.csv')
- Import the data file `1.csv` in the stage named `ext_stage` into the table `test_table`, specifying `,` as the column separator and `\n` as the line delimiter:
  COPY INTO test_table FROM @ext_stage('1.csv') PROPERTIES ( 'file.column_separator' = ',', 'file.line_delimiter' = '\n' )
- Import the data file `1.csv` in the stage named `ext_stage` into the table `test_table`, specifying synchronous execution:
  COPY INTO test_table FROM @ext_stage('1.csv') PROPERTIES ( 'copy.async' = 'false' )
- Import the data file `1.csv` in the user's default internal stage into the table `test_table`:
  COPY INTO test_table FROM @~('1.csv')
- Column mapping, conversion, filtering, etc.
  If the file has 3 columns, `$1` (column 1), `$2` (column 2), `$3` (column 3), and they are imported into the three table columns `id`, `name`, `score`, i.e. `$1` → `id`, `$2` → `name`, `$3` → `score`, the following statements are equivalent:
  COPY INTO test_table FROM (SELECT * FROM @ext_stage('1.csv'))
  COPY INTO test_table FROM (SELECT $1, $2, $3 FROM @ext_stage('1.csv'))
  If the file has 4 columns, `$1` (column 1), `$2` (column 2), `$3` (column 3), `$4` (column 4), and columns 1, 3, and 4 are imported into the three table columns `id`, `name`, `score`, i.e. `$1` → `id`, `$3` → `name`, `$4` → `score`:
  COPY INTO test_table FROM (SELECT $1, $3, $4 FROM @ext_stage('1.csv'))
  If the file has 2 columns, `$1` (column 1), `$2` (column 2), and they are imported into the first two table columns `id`, `name`, while `score` takes the table's default value or `NULL`, i.e. `$1` → `id`, `$2` → `name`, `NULL` → `score`:
  COPY INTO test_table FROM (SELECT $1, $2, NULL FROM @ext_stage('1.csv'))
  If the file has 3 columns, `$1` (column 1), `$2` (column 2), `$3` (column 3), and columns 1, 3, and 2 are imported into the three table columns `id`, `name`, `score`, i.e. `$1` → `id`, `$3` → `name`, `$2` → `score`:
  COPY INTO test_table FROM (SELECT $1, $3, $2 FROM @ext_stage('1.csv'))
  If the file has 3 columns, `$1` (column 1), `$2` (column 2), `$3` (column 3), keep only the rows whose third column is greater than 60 and import them into the three table columns `id`, `name`, `score`:
  COPY INTO test_table FROM (SELECT $1, $2, $3 FROM @ext_stage('1.csv') WHERE $3 > 60)
  If the file has 3 columns, `$1` (column 1), `$2` (column 2), `$3` (column 3), add 10 to the third column and then import into the three table columns `id`, `name`, `score`:
  COPY INTO test_table FROM (SELECT $1, $2, $3 + 10 FROM @ext_stage('1.csv'))
  If the file has 3 columns, `$1` (column 1), `$2` (column 2), `$3` (column 3), keep only the rows whose third column is less than 60, add 10 to the third column, and then import into the three table columns `id`, `name`, `score`:
  COPY INTO test_table FROM (SELECT $1, $2, $3 + 10 FROM @ext_stage('1.csv') WHERE $3 < 60)
  If the file has 3 columns, `$1` (column 1), `$2` (column 2), `$3` (column 3), take a substring of the second column and then import into the three table columns `id`, `name`, `score`:
  COPY INTO test_table FROM (SELECT $1, substring($2, 2), $3 FROM @ext_stage('1.csv'))
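- For illustration, the error-handling and size-limit properties described above can also be set per import; assuming a tolerance of 10% error rows (the value 0.1 is an arbitrary sample value), such a statement might look like:
  COPY INTO test_table FROM @ext_stage('1.csv') PROPERTIES ( 'copy.on_error' = 'max_filter_ratio_0.1' )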
Push import through built-in stage
If the user does not have external object storage, data files can be temporarily stored in the default object storage provided by VeloDB Cloud, which is called the internal stage.
Differences from an external stage:
- The internal stage does not need to be created manually; it is created automatically the first time the user uses it. Its name is fixed as `~`.
- It can only be accessed by its owner; granting permissions to other users is not supported.
- Its files cannot be listed; the user needs to keep track of which files have been uploaded.
After uploading files to the internal stage, call the COPY INTO statement to import them into the table.
Syntax
Upload a file:
curl -u {user}:{password} -H "fileName: {file_name_in_storage}" -T {local_file_path} -L '{velodb_host}:{velodb_copy_port}/copy/upload'
Import the file into a VeloDB Cloud table (note: the request body is in JSON format, and some characters in the SQL need to be escaped):
curl -X POST -u {user}:{password} '{velodb_host}:{velodb_copy_port}/copy/query' -H "Content-Type: application/json" -d '{"sql": "{copy_into_sql}"}'
Example
User `user1` uploads the local file `data/2022-10-20/1.csv` to the internal stage, naming the uploaded file `2022-10-20/1.csv`:
curl -u user1:passwd -H "fileName: 2022-10-20/1.csv" -T data/2022-10-20/1.csv -L '172.21.21.12:8035/copy/upload'
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
0 14 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0
100 14 0 0 100 14 0 28 --:--:-- --:--:-- --:--:-- 28
Execute the import:
curl -X POST -u user1:password '172.21.21.12:8035/copy/query' -H "Content-Type: application/json" -d '{"sql": "copy into db1.t5 from @~(\"2022-10-20/1.csv\")"}'
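For reference, COPY INTO properties can be passed through the same endpoint; a minimal sketch of a synchronous import, assuming double-quoted (escaped) literals are accepted in the PROPERTIES clause just as in the escaped file name above:
curl -X POST -u user1:password '172.21.21.12:8035/copy/query' -H "Content-Type: application/json" -d '{"sql": "copy into db1.t5 from @~(\"2022-10-20/1.csv\") properties (\"copy.async\" = \"false\")"}'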