VeloDB Cloud
Integration
More
Seatunnel Doris Sink

Seatunnel Doris Sink

About SeaTunnel

SeaTunnel is a very easy-to-use ultra-high-performance distributed data integration platform that supports real-time synchronization of massive data. It can synchronize tens of billions of data stably and efficiently every day.

Connector-V2

The connector-v2 for SeaTunnel supports Doris Sink since version 2.3.1 and supports exactly-once write and CDC data synchronization

Plugin Code

SeaTunnel Doris Sink Plugin Code (opens in a new tab)

Options

nametyperequireddefault value
fenodesstringyes-
usernamestringyes-
passwordstringyes-
table.identifierstringyes-
sink.label-prefixstringyes-
sink.enable-2pcboolnotrue
sink.enable-deleteboolnofalse
doris.configmapyes-

fenodes [string]

Doris cluster FE Nodes address, the format is "fe_ip:fe_http_port, ..."

username [string]

Doris user username

password [string]

Doris`user password

table.identifier [string]

The name of Doris table,The format is DBName.TableName

sink.label-prefix [string]

The label prefix used by stream load imports. In the 2pc scenario, global uniqueness is required to ensure the EOS semantics of SeaTunnel.

sink.enable-2pc [bool]

Whether to enable two-phase commit (2pc), the default is true, to ensure Exactly-Once semantics. For two-phase commit, please refer to here.

sink.enable-delete [bool]

Whether to enable deletion. This option requires Doris table to enable batch delete function (0.15+ version is enabled by default), and only supports Unique model. you can get more detail at this link:

batch delete

doris.config [map]

The parameter of the stream load data_desc, you can get more detail at this link:

More Stream Load parameters

Example

Use JSON format to import data

sink {
    Doris {
        fenodes = "doris_fe:8030"
        username = root
        password = ""
        table.identifier = "test.table_sink"
        sink.enable-2pc = "true"
        sink.label-prefix = "test_json"
        doris.config = {
            format="json"
            read_json_by_line="true"
        }
    }
}

Use CSV format to import data

sink {
    Doris {
        fenodes = "doris_fe:8030"
        username = root
        password = ""
        table.identifier = "test.table_sink"
        sink.enable-2pc = "true"
        sink.label-prefix = "test_csv"
        doris.config = {
          format = "csv"
          column_separator = ","
        }
    }
}

Connector-V1

Flink Sink Doris

Plugin Code

Seatunnel Flink Sink Doris plugin code (opens in a new tab)

Options

nametyperequireddefault valueengine
fenodesstringyes-Flink
databasestringyes-Flink
tablestringyes-Flink
userstringyes-Flink
passwordstringyes-Flink
batch_sizeintno100Flink
intervalintno1000Flink
max_retriesintno1Flink
doris.*-no-Flink

fenodes [string]

Doris Fe http url, eg: 127.0.0.1:8030

database [string]

Doris database

table [string]

Doris table

user [string]

Doris user

password [string]

Doris password

batch_size [int]

The maximum number of lines to write to Doris at a time, the default value is 100

interval [int]

The flush interval (in milliseconds), after which the asynchronous thread writes the data in the cache to Doris. Set to 0 to turn off periodic writes.

max_retries [int]

Number of retries after writing to Doris fails

doris.* [string]

Import parameters for Stream load. For example: 'doris.column_separator' = ', ' etc.

More Stream Load parameter configuration

Examples

Socket To Doris

env {
  execution.parallelism = 1
}
source {
    SocketStream {
      host = 127.0.0.1
      port = 9999
      result_table_name = "socket"
      field_name = "info"
    }
}
transform {
}
sink {
  DorisSink {
      fenodes = "127.0.0.1:8030"
      user = root
      password = 123456
      database = test
      table = test_tbl
      batch_size = 5
      max_retries = 1
      interval = 5000
    }
}

Start command

sh bin/start-seatunnel-flink.sh --config config/flink.streaming.conf

Spark Sink Doris

Plugin Code

Seatunnel Spark Sink Doris plugin code (opens in a new tab)

Options

nametyperequireddefault valueengine
fenodesstringyes-Spark
databasestringyes-Spark
tablestringyes-Spark
userstringyes-Spark
passwordstringyes-Spark
batch_sizeintyes100Spark
doris.*stringno-Spark

fenodes [string]

Doris FE address:8030

database [string]

Doris target database name

table [string]

Doris target table name

user [string]

Doris user name

password [string]

Doris user's password

batch_size [string]

Doris number of submissions per batch

doris. [string] Doris stream_load properties,you can use 'doris.' prefix + stream_load properties

More Doris stream_load Configurations

Examples

Hive to Doris

Config properties

env{
  spark.app.name = "hive2doris-template"
}

spark {
  spark.sql.catalogImplementation = "hive"
}

source {
  hive {
    preSql = "select * from tmp.test"
    result_table_name = "test"
  }
}

transform {
}


sink {

Console {

  }

Doris {
   fenodes="xxxx:8030"
   database="gl_mint_dim"
   table="dim_date"
   user="root"
   password="root"
   batch_size=1000
   doris.column_separator="\t"
   doris.columns="date_key,date_value,day_in_year,day_in_month"
   }
}

Start command

sh bin/start-waterdrop-spark.sh --master local[4] --deploy-mode client --config ./config/spark.conf