Spark VeloDB Connector

Quick introduction

The Spark VeloDB Connector supports writing large volumes of upstream data to VeloDB Cloud.

Implementation principle

The Spark VeloDB Connector is built on the stage import method of VeloDB Cloud. The connector first calls the VeloDB Cloud API (/copy/upload), which returns a redirected object storage address, and then sends the byte stream to that address over HTTP. Finally, it issues a Copy Into statement (/copyinto) to import the data from the object storage bucket into VeloDB Cloud. For details on the stage import method, please refer to User Guide / Data Import / Stage Import.
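Conceptually, the flow looks like the sketch below. This is a minimal illustration only, assuming HTTP basic auth and a CSV payload; the host and credentials are the placeholder values used in the examples later in this page, and the real request headers, payload framing, and error handling are internal to the connector:

import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets
import java.util.Base64

// Placeholder endpoint and credentials, matching the examples below.
val host = "http://127.0.0.1:47968"
val auth = "Basic " + Base64.getEncoder.encodeToString(
  "admin:selectdb2022".getBytes(StandardCharsets.UTF_8))

// Step 1: call /copy/upload; the response redirects to an object storage address.
val upload = new URL(s"$host/copy/upload").openConnection().asInstanceOf[HttpURLConnection]
upload.setRequestMethod("PUT")
upload.setRequestProperty("Authorization", auth)
upload.setInstanceFollowRedirects(false) // read the Location header instead of following it
val storageUrl = upload.getHeaderField("Location")

// Step 2: send the data bytes to the redirected object storage address over HTTP.
val put = new URL(storageUrl).openConnection().asInstanceOf[HttpURLConnection]
put.setRequestMethod("PUT")
put.setDoOutput(true)
val os = put.getOutputStream
os.write("1,100,Pending\n".getBytes(StandardCharsets.UTF_8))
os.close()
println(s"upload status: ${put.getResponseCode}")

// Step 3: issue Copy Into (/copyinto) so the staged file is imported into the
// target table; the request payload for this step is internal to the connector
// and omitted here.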

Version support

Connector         Runtime Jar
2.3.4-2.11-1.0    spark-selectdb-connector-2.3.4_2.11-1.0
3.1.2-2.12-1.0    spark-selectdb-connector-3.1.2_2.12-1.0
3.2.0-2.12-1.0    spark-selectdb-connector-3.2.0_2.12-1.0

Notice:

  • The Connector naming format is: spark-selectdb-connector-${spark.version}_${scala.version}-${connector.version}-SNAPSHOT.jar (see the example after this list);
  • All jar packages are compiled with Java 8;
  • If you have other version requirements, you can contact us through the contact information on the VeloDB official website;
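For example, following this naming format, the jar for Spark 3.1.2 with Scala 2.12 and connector version 1.0 is named:

spark-selectdb-connector-3.1.2_2.12-1.0-SNAPSHOT.jar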

How to use

To use spark-selectdb-connector, copy the downloaded jar package into Spark's ClassPath. For example, if Spark runs in Local mode, put the file under the jars/ folder. If Spark runs in Yarn cluster mode, put the file into the pre-deployment package: for example, upload spark-selectdb-connector-2.3.4-2.11-1.0-SNAPSHOT.jar to HDFS and add the jar's HDFS path to the spark.yarn.jars parameter.

  • Upload spark-selectdb-connector-2.3.4-2.11-1.0-SNAPSHOT.jar to HDFS:
hdfs dfs -mkdir /spark-jars/
hdfs dfs -put /your_local_path/spark-selectdb-connector-2.3.4-2.11-1.0-SNAPSHOT.jar /spark-jars/
  • Add the HDFS path of spark-selectdb-connector-2.3.4-2.11-1.0-SNAPSHOT.jar to the cluster dependencies:
spark.yarn.jars=hdfs:///spark-jars/spark-selectdb-connector-2.3.4-2.11-1.0-SNAPSHOT.jar
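Alternatively, for a quick local test you can pass the jar directly on the command line instead of pre-deploying it (the local path below is a placeholder):

spark-shell --jars /your_local_path/spark-selectdb-connector-2.3.4-2.11-1.0-SNAPSHOT.jar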

If you want to reference it in a project, you can install it into your local Maven repository with mvn install and then add the dependency in Maven as follows:

<dependency>
  <groupId>com.selectdb.spark</groupId>
  <artifactId>spark-selectdb-connector-3.1_2.12</artifactId>
  <version>1.0</version>
</dependency>
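If you are working from a downloaded jar rather than building from source, the install step might look like the following sketch using Maven's install-file goal; the file path is a placeholder, and the coordinates are assumed to match the dependency above:

mvn install:install-file \
  -Dfile=/your_local_path/spark-selectdb-connector-3.1.2_2.12-1.0-SNAPSHOT.jar \
  -DgroupId=com.selectdb.spark \
  -DartifactId=spark-selectdb-connector-3.1_2.12 \
  -Dversion=1.0 \
  -Dpackaging=jar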

Usage examples

Write via Spark SQL

val selectdbHttpPort = "127.0.0.1:47968"
val selectdbJdbc = "jdbc:mysql://127.0.0.1:18836/test"
val selectdbUser = "admin"
val selectdbPwd = "selectdb2022"
val selectdbTable = "test.test_order"

// Register the VeloDB Cloud table as a temporary view, then write to it with plain SQL.
spark.sql(s"""
  CREATE TEMPORARY VIEW test_order
  USING selectdb
  OPTIONS(
    "table.identifier"="test.test_order",
    "jdbc.url"="${selectdbJdbc}",
    "http.port"="${selectdbHttpPort}",
    "user"="${selectdbUser}",
    "password"="${selectdbPwd}",
    "sink.properties.file.type"="json"
  )
""")

spark.sql("insert into test_order select order_id, order_amount, order_status from tmp_tb")
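Here tmp_tb is assumed to be an existing Spark table or view holding the upstream data. For a self-contained test, you could register one first (hypothetical sample rows):

import spark.implicits._ // needed outside spark-shell for .toDF on a local Seq

Seq(("1", 100, "Pending"), ("2", 200, "Shipped"))
  .toDF("order_id", "order_amount", "order_status")
  .createOrReplaceTempView("tmp_tb")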

Write via DataFrame

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()

// Build a small sample DataFrame; null order_status values are allowed.
val df = spark.createDataFrame(Seq(
  ("1", 100, "Pending"),
  ("2", 200, null),
  ("3", 300, "received")
)).toDF("order_id", "order_amount", "order_status")

// Write the DataFrame to VeloDB Cloud; the connection values are those
// defined in the Spark SQL example above.
df.write
  .format("selectdb")
  .option("selectdb.http.port", selectdbHttpPort)
  .option("selectdb.table.identifier", selectdbTable)
  .option("user", selectdbUser)
  .option("password", selectdbPwd)
  .option("sink.batch.size", 4)
  .option("sink.max-retries", 2)
  .option("sink.properties.file.column_separator", "\t")
  .option("sink.properties.file.line_delimiter", "\n")
  .save()
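With sink.batch.size set to 4 here, the three sample rows fit in a single batch; the column_separator and line_delimiter properties describe the staged file that Copy Into subsequently imports. All available options are listed below.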

Configuration

Key                        Default Value   Comment                                                                    Required
selectdb.http.port         --              VeloDB Cloud HTTP address                                                  Y
selectdb.jdbc.url          --              VeloDB Cloud JDBC address; this option applies to Spark SQL                Y
selectdb.table.identifier  --              VeloDB Cloud table name, in the format dbname.tablename, e.g. db1.tbl1    Y
user                       --              Username for accessing VeloDB Cloud                                        Y
password                   --              Password for accessing VeloDB Cloud                                        Y
sink.batch.size            100000          Maximum number of rows written to VeloDB Cloud at a time                   N
sink.max-retries           3               Number of retries after a failed write to VeloDB Cloud                     N
sink.properties.*          --              Import parameters for Copy Into, e.g. "sink.properties.file.type"="json"; for more Copy Into parameters, see the Copy Into section of the VeloDB official website.   N