
Kafka VeloDB Connector

Kafka Connect is a scalable and reliable tool for transmitting data between Apache Kafka and other systems. Connectors can be defined to move large amounts of data into and out of Kafka.

VeloDB provides a Sink Connector plug-in that writes JSON data from Kafka topics to VeloDB.
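The sink treats each record value as JSON; a natural layout is a flat object whose keys correspond to the target table's column names. As a minimal sketch, assuming a hypothetical table test_kafka_tbl(id BIGINT, name VARCHAR(64), age INT) and the broker address and topic name used in the examples below, a matching record can be produced with the standard console producer:

# Produce one JSON record whose keys match the (hypothetical) table columns
echo '{"id": 1, "name": "alice", "age": 30}' | \
  $KAFKA_HOME/bin/kafka-console-producer.sh \
    --broker-list 127.0.0.1:9092 --topic topic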

Version support

Kafka | Java Runtime | Jar
2.4.x | 8 | kafka-connect-selectdb-1.0.0

Kafka Connect Usage

Standalone mode

Configure connect-standalone.properties

# broker address
bootstrap.servers=127.0.0.1:9092

Configure connect-selectdb-sink.properties

# Connector name; must be unique within the Kafka Connect environment
name=test-selectdb-sink
connector.class=com.selectdb.kafka.connector.SelectdbSinkConnector
# Kafka topics to subscribe to
topics=topic
# Mapping between topics and target tables
selectdb.topic2table.map=topic:test_kafka_tbl
# Flush thresholds: record count, interval in seconds, buffer size in bytes
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
# VeloDB connection settings
selectdb.url=xxx.cn-beijing.privatelink.aliyuncs.com
selectdb.http.port=48614
selectdb.query.port=25865
selectdb.user=admin
selectdb.password=
selectdb.database=test_db
selectdb.cluster=cluster_name
# Converters: keys as plain strings, values as schemaless JSON
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

Start

$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-selectdb-sink.properties
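Once the worker is running and records have been produced to the topic, buffered data is flushed to VeloDB within buffer.flush.time seconds at the latest. A sketch of checking the result over the MySQL protocol port, using the connection parameters from the example configuration above:

mysql -h xxx.cn-beijing.privatelink.aliyuncs.com -P 25865 -u admin -p \
  -e "SELECT * FROM test_db.test_kafka_tbl LIMIT 10;"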

Distributed mode

Configure connect-distributed.properties

# broker address
bootstrap.servers=127.0.0.1:9092
 
# group.id must be identical for all workers in the same Connect cluster
group.id=connect-cluster

Start

$KAFKA_HOME/bin/connect-distributed.sh $KAFKA_HOME/config/connect-distributed.properties
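Once the worker has started, the Kafka Connect REST API (port 8083 by default) should respond; before any connector is registered it returns an empty list:

curl -s http://127.0.0.1:8083/connectors
# Expected output: []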

Add connector

curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
  "name":"test-selectdb-sink-cluster",
  "config":{
    "connector.class":"com.selectdb.kafka.connector.SelectdbSinkConnector",
    "topics":"topic",
    "selectdb.topic2table.map": "topic:test_kafka_tbl",
    "buffer.count.records":"10000",
    "buffer.flush.time":"60",
    "buffer.size.bytes":"5000000",
    "selectdb.url":"xx.cn-beijing.privatelink.aliyuncs.com",
    "selectdb.user":"admin",
    "selectdb.password":"",
    "selectdb.http.port":"48614",
    "selectdb.query.port":"25865",
    "selectdb.database":"test_db",
    "selectdb.cluster":"cluster_name",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable":"false",
    "value.converter.schemas.enable":"false",
  }
}'
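After submitting the connector, you can verify that it and its tasks are in the RUNNING state through the standard Kafka Connect REST endpoints:

# List registered connectors
curl -s http://127.0.0.1:8083/connectors

# Check the state of the connector and its tasks
curl -s http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/status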

Access an SSL-certified Kafka cluster

Accessing an SSL-authenticated Kafka cluster through Kafka Connect requires the user to provide the truststore file (client.truststore.jks) used to verify the Kafka broker's public key. You can add the following configuration to the connect-distributed.properties file:

# Connect worker
security.protocol=SSL
ssl.truststore.location=/var/ssl/private/client.truststore.jks
ssl.truststore.password=test1234
 
# Embedded consumer for sink connectors
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/var/ssl/private/client.truststore.jks
consumer.ssl.truststore.password=test1234
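If you do not already have a truststore, a common way to build client.truststore.jks is to import the CA certificate that signed the broker certificates using the JDK's keytool. The file paths, alias, and password below are placeholders:

# Import the broker CA certificate into a new truststore
keytool -keystore /var/ssl/private/client.truststore.jks \
  -alias CARoot -importcert -file ca-cert \
  -storepass test1234 -noprompt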

For instructions on connecting Kafka Connect to an SSL-authenticated Kafka cluster, please refer to: Configure Kafka Connect

Configuration items

Key | Default Value | Required | Description
name | - | Y | Connect application name; must be unique within the Kafka Connect environment
connector.class | - | Y | Must be com.selectdb.kafka.connector.SelectdbSinkConnector
topics | - | Y | List of topics to subscribe to, separated by commas, e.g. topic1,topic2
selectdb.url | - | Y | VeloDB connection address
selectdb.http.port | - | Y | VeloDB HTTP protocol port
selectdb.query.port | - | Y | VeloDB MySQL protocol port
selectdb.user | - | Y | VeloDB username
selectdb.password | - | Y | VeloDB password
selectdb.database | - | Y | Database to write to
selectdb.cluster | - | Y | Name of the VeloDB cluster to write to
selectdb.topic2table.map | - | N | Mapping between topics and tables, e.g. topic1:tb1,topic2:tb2. Empty by default, meaning topic names and table names correspond one-to-one
buffer.count.records | 10000 | N | Number of records buffered in memory per Kafka partition before flushing to VeloDB
buffer.flush.time | 120 | N | Buffer flush interval, in seconds
buffer.size.bytes | 5000000 (5 MB) | N | Cumulative size of records buffered in memory per Kafka partition, in bytes
jmx | true | N | Whether to enable JMX to expose the connector's internal monitoring metrics; enabled by default

For other general configuration items of the Kafka Connect sink, please refer to: Kafka Connect Sink Configuration Properties