# Spark VeloDB Connector

## Quick introduction

The Spark VeloDB Connector supports writing large amounts of upstream data to VeloDB Cloud.
## Implementation principle

The Spark VeloDB Connector is built on the stage import method of VeloDB Cloud. The connector calls the VeloDB Cloud API (/copy/upload), which returns a redirected object storage address; the connector then sends the byte stream to that address over HTTP. Finally, it issues a Copy Into request (/copyinto) to import the data from the object storage bucket into VeloDB Cloud. For details on the stage import method, see User Guide / Data Import / Stage Import.
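To make the flow concrete, the sketch below walks through the three steps with plain HTTP calls. It is illustrative only: the endpoint paths /copy/upload and /copyinto come from the description above, while the credentials, file name, and the shape of the Copy Into request body are assumptions, not the connector's actual internals.

```scala
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets
import java.util.Base64

object StageImportSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical credentials, matching the examples later on this page.
    val auth = "Basic " + Base64.getEncoder.encodeToString(
      "admin:selectdb2022".getBytes(StandardCharsets.UTF_8))

    // Step 1: call /copy/upload and capture the redirect to object storage
    // instead of following it.
    val uploadConn = new URL("http://127.0.0.1:47968/copy/upload")
      .openConnection().asInstanceOf[HttpURLConnection]
    uploadConn.setRequestMethod("PUT")
    uploadConn.setRequestProperty("Authorization", auth)
    uploadConn.setInstanceFollowRedirects(false)
    val storageUrl = uploadConn.getHeaderField("Location")

    // Step 2: PUT the data bytes to the redirected object storage address.
    val putConn = new URL(storageUrl).openConnection().asInstanceOf[HttpURLConnection]
    putConn.setRequestMethod("PUT")
    putConn.setDoOutput(true)
    val out = putConn.getOutputStream
    out.write("{\"order_id\":\"1\",\"order_amount\":100}\n".getBytes(StandardCharsets.UTF_8))
    out.close()
    println(s"upload status: ${putConn.getResponseCode}")

    // Step 3: ask VeloDB Cloud to run Copy Into over the staged file
    // (statement body is illustrative; see the Stage Import docs for the real syntax).
    val copyConn = new URL("http://127.0.0.1:47968/copyinto")
      .openConnection().asInstanceOf[HttpURLConnection]
    copyConn.setRequestMethod("POST")
    copyConn.setRequestProperty("Authorization", auth)
    copyConn.setDoOutput(true)
    val copyOut = copyConn.getOutputStream
    copyOut.write("COPY INTO test.test_order FROM @~(\"batch_0.json\")".getBytes(StandardCharsets.UTF_8))
    copyOut.close()
    println(s"copy into status: ${copyConn.getResponseCode}")
  }
}
```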
## Version support

| Connector | Runtime Jar |
| --- | --- |
| 2.3.4-2.11-1.0 | spark-selectdb-connector-2.3.4_2.11-1.0 |
| 3.1.2-2.12-1.0 | spark-selectdb-connector-3.1.2_2.12-1.0 |
| 3.2.0-2.12-1.0 | spark-selectdb-connector-3.2.0_2.12-1.0 |
Notice:

- The naming format of the connector jar is spark-selectdb-connector-${spark.version}_${scala.version}-${connector.version}-SNAPSHOT.jar.
- All jar packages are compiled with Java 8.
- If you need another version, contact us through the contact information on the VeloDB official website.
## How to use

Copy the downloaded jar package into Spark's ClassPath to use spark-selectdb-connector. For example, if Spark runs in Local mode, put the jar under the jars/ folder. If Spark runs in Yarn cluster mode, put the jar into the pre-deployment package: upload spark-selectdb-connector-2.3.4_2.11-1.0-SNAPSHOT.jar to HDFS and add its HDFS path to the spark.yarn.jars parameter.
- Upload spark-selectdb-connector-2.3.4_2.11-1.0-SNAPSHOT.jar to HDFS:

```
hdfs dfs -mkdir /spark-jars/
hdfs dfs -put /your_local_path/spark-selectdb-connector-2.3.4_2.11-1.0-SNAPSHOT.jar /spark-jars/
```

- Add the jar to the cluster dependencies through spark.yarn.jars:

```
spark.yarn.jars=hdfs:///spark-jars/spark-selectdb-connector-2.3.4_2.11-1.0-SNAPSHOT.jar
```
To reference the connector in a project, run mvn install to install it into the local repository, then add the dependency in Maven:
```xml
<dependency>
  <groupId>com.selectdb.spark</groupId>
  <artifactId>spark-selectdb-connector-3.1_2.12</artifactId>
  <version>1.0</version>
</dependency>
```
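If the project builds with sbt instead, the same coordinates can be used; this is a minimal sketch assuming the artifact was installed to the local Maven repository via mvn install as above:

```scala
// build.sbt: resolve the locally installed connector artifact
resolvers += Resolver.mavenLocal
libraryDependencies += "com.selectdb.spark" % "spark-selectdb-connector-3.1_2.12" % "1.0"
```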
## Example of use

### Write through Spark SQL
```scala
val selectdbHttpPort = "127.0.0.1:47968"
val selectdbJdbc = "jdbc:mysql://127.0.0.1:18836/test"
val selectdbUser = "admin"
val selectdbPwd = "selectdb2022"
val selectdbTable = "test.test_order"

// Register the VeloDB table as a temporary view; the ${...} placeholders
// are filled in by Scala string interpolation.
spark.sql(
  s"""
     |CREATE TEMPORARY VIEW test_order
     |USING selectdb
     |OPTIONS(
     |  "table.identifier"="test.test_order",
     |  "jdbc.url"="${selectdbJdbc}",
     |  "http.port"="${selectdbHttpPort}",
     |  "user"="${selectdbUser}",
     |  "password"="${selectdbPwd}",
     |  "sink.properties.file.type"="json"
     |)
     |""".stripMargin)

spark.sql("insert into test_order select order_id, order_amount, order_status from tmp_tb")
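```

The INSERT above reads from a source view named tmp_tb, which this page does not define. A minimal sketch that provides it for testing, assuming the column names match test_order:

```scala
// Hypothetical source data; register it under the name the INSERT expects.
val sourceDf = spark.createDataFrame(Seq(
  ("1", 100, "Pending"),
  ("2", 200, "Shipped")
)).toDF("order_id", "order_amount", "order_status")
sourceDf.createOrReplaceTempView("tmp_tb")
```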
### Write via DataFrame
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()

// Build a small test DataFrame matching the target table's columns.
val df = spark.createDataFrame(Seq(
  ("1", 100, "Pending"),
  ("2", 200, null),
  ("3", 300, "received")
)).toDF("order_id", "order_amount", "order_status")

df.write
  .format("selectdb")
  .option("selectdb.http.port", selectdbHttpPort)
  .option("selectdb.table.identifier", selectdbTable)
  .option("user", selectdbUser)
  .option("password", selectdbPwd)
  .option("sink.batch.size", 4)
  .option("sink.max-retries", 2)
  .option("sink.properties.file.column_separator", "\t")
  .option("sink.properties.file.line_delimiter", "\n")
  .save()
```
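To check the result, the table can be read back with Spark's built-in JDBC source. This uses only standard Spark API and reuses the vals defined in the Spark SQL example; the MySQL JDBC driver must be on the classpath:

```scala
// Read the target table back over JDBC to verify the write.
val readBack = spark.read
  .format("jdbc")
  .option("url", selectdbJdbc)
  .option("dbtable", selectdbTable)
  .option("user", selectdbUser)
  .option("password", selectdbPwd)
  .load()
readBack.show()
```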
## Configuration

| Key | Default Value | Comment | Required |
| --- | --- | --- | --- |
| selectdb.http.port | -- | VeloDB Cloud HTTP address | Y |
| selectdb.jdbc.url | -- | VeloDB Cloud JDBC address; this configuration applies to Spark SQL | Y |
| selectdb.table.identifier | -- | VeloDB Cloud table name, in the format database.table, for example: db1.tbl1 | Y |
| user | -- | Username for accessing VeloDB Cloud | Y |
| password | -- | Password for accessing VeloDB Cloud | Y |
| sink.batch.size | 100000 | Maximum number of rows written to VeloDB Cloud at a time | N |
| sink.max-retries | 3 | Number of retries after a write to VeloDB Cloud fails | N |
| sink.properties.* | -- | Import parameters for Copy Into, for example: "sink.properties.file.type"="json". For more Copy Into parameter descriptions, see the Copy Into section of the VeloDB official website. | N |
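As an illustration of the sink.properties.* prefix, several such options can be set on one writer; only keys already shown on this page are used here, and each is forwarded to Copy Into as an import parameter:

```scala
// Forward Copy Into import parameters via the sink.properties.* prefix.
df.write
  .format("selectdb")
  .option("selectdb.http.port", selectdbHttpPort)
  .option("selectdb.table.identifier", selectdbTable)
  .option("user", selectdbUser)
  .option("password", selectdbPwd)
  .option("sink.properties.file.type", "json") // format of the staged file
  .save()
```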