Version: 4.x

Ingesting data from Kafka to VeloDB (using Routine Load)

Routine Load creates a persistent streaming job that continuously consumes messages from Kafka topics and loads them into VeloDB tables.

Prerequisites

VeloDB Cloud Account (Required)

You need a VeloDB Cloud account with an active cluster. If you don't have one, follow the VeloDB Quick Start Guide to create your account and cluster.

To run SQL queries and create databases in VeloDB, use the built-in SQL Editor in the VeloDB Console sidebar:

(Screenshot: the SQL Editor in the VeloDB Console)

Kafka Source Requirements

Requirement | Details
Kafka Version | 0.10.0.0 or higher (recommended)
Data Format | JSON or CSV
Network Access | Public endpoint or Private Link to VeloDB

Cloud Kafka Providers

Network: Public access enabled by default

Authentication: SASL/PLAIN with API Key and Secret

SSL Certificate: Not required (disable certificate verification)

Setup:

  1. Get Bootstrap server from Cluster Settings
  2. Create API Key in Cluster Overview → API Keys
  3. For sample data, use Datagen Source connector with JSON format
Critical Requirement
  • Confluent Cloud: Uses SASL/PLAIN authentication with API Key/Secret
  • Other Kafka providers: Use SCRAM-SHA-512 (not SCRAM-SHA-256, which will fail with "Broker transport failure" errors)

1. Create SSL Certificate in VeloDB

If your Kafka provider requires a CA certificate (for example, Redpanda or Amazon MSK), use CREATE FILE in the SQL Editor to upload it, as shown in the sketch after the note below.

Important

The certificate file must be created in the same database where you will create the Routine Load job. First create or select your database, then create the file.
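
A minimal sketch of the upload, assuming your provider publishes its CA certificate at a reachable URL (the file name, URL, and the "kafka" catalog label are placeholders, not values from this guide):

-- Run in the database that will own the Routine Load job
USE demo;

-- Register the provider's CA certificate so jobs can reference it as FILE:ca.pem
CREATE FILE "ca.pem"
PROPERTIES (
    "url" = "https://<URL_OF_YOUR_PROVIDER_CA_CERTIFICATE>",
    "catalog" = "kafka"
);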

For Confluent Cloud, no SSL certificate is required; Confluent Cloud handles SSL internally. Skip this step and proceed to Create Target Table.

When creating the Routine Load job, set:

"property.enable.ssl.certificate.verification" = "false"
info

After creating the file, it can be referenced in Routine Load as FILE:<filename>.pem.

tip

To check existing certificates in the current database, run SHOW FILE;


2. (Optional) Create Sample Topic with Data

If you don't have an existing Kafka topic, create one with sample data for testing.

Use the Datagen Source connector in Confluent Cloud Console:

  1. Go to Connectors in your cluster
  2. Click Add connector → search for Datagen Source
  3. Configure:
    • Topic: Enter a topic name (e.g., orders)
    • Output record value format: Select JSON (required - VeloDB does not support AVRO)
    • Template: Choose a template (e.g., Orders, Users, Pageviews)
  4. Click Continue through the remaining steps
  5. Launch the connector

The connector will automatically create the topic and produce sample messages.

Important

Select JSON as the output format. AVRO and JSON_SR formats are not supported by VeloDB Routine Load.


3. Create Target Table in VeloDB

Design your VeloDB table schema based on your Kafka message format.

Schema Mapping: Kafka to VeloDB

JSON Message Example:

{"event_time": "2025-01-15 10:30:00", "user_id": 1001, "event_type": "page_view", "page": "/home"}

Mapping to VeloDB Table:

Kafka JSON Field | JSON Type | VeloDB Column | VeloDB Type
event_time | string | event_time | DATETIME
user_id | number | user_id | BIGINT
event_type | string | event_type | VARCHAR(50)
page | string | page | VARCHAR(255)

Type Mapping Reference

For complete VeloDB data types, see Data Type Overview.

Kafka/JSON Type | VeloDB Type
string (datetime) | DATETIME, DATE
string | VARCHAR(n), STRING, TEXT
integer | INT, BIGINT, SMALLINT, TINYINT
number (decimal) | DOUBLE, FLOAT, DECIMAL(p,s)
boolean | BOOLEAN
nested object | JSON, VARIANT
array | ARRAY<T>, JSON
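
To illustrate the nested object and array rows above, a hypothetical table sketch (the tags and payload columns are invented for illustration and are not used elsewhere in this guide):

CREATE TABLE events_nested (
    event_time DATETIME NOT NULL,
    user_id    BIGINT NOT NULL,
    tags       ARRAY<STRING>,  -- JSON array of strings
    payload    JSON            -- nested JSON object stored as-is
)
DUPLICATE KEY(event_time, user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;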

Create the Table

In the SQL Editor, create the database and table:

CREATE DATABASE IF NOT EXISTS demo;

USE demo;

CREATE TABLE events (
    event_time DATETIME NOT NULL,
    user_id    BIGINT NOT NULL,
    event_type VARCHAR(50),
    page       VARCHAR(255)
)
DUPLICATE KEY(event_time, user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;
tip

The jsonpaths parameter in Routine Load extracts JSON fields in order and maps them positionally to the columns listed in COLUMNS. Keep the order in jsonpaths aligned with the COLUMNS list (in this guide, the same as the table's column order).


4. Create Routine Load Job

Use CREATE ROUTINE LOAD to start consuming from Kafka:

CREATE ROUTINE LOAD demo.kafka_load ON events
COLUMNS(event_time, user_id, event_type, page)
PROPERTIES (
    "format" = "json",
    "jsonpaths" = "[\"$.event_time\",\"$.user_id\",\"$.event_type\",\"$.page\"]"
)
FROM KAFKA (
    "kafka_broker_list" = "<BOOTSTRAP_SERVER>",
    "kafka_topic" = "<YOUR_TOPIC>",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING",
    "property.security.protocol" = "SASL_SSL",
    "property.sasl.mechanism" = "PLAIN",
    "property.sasl.username" = "<API_KEY>",
    "property.sasl.password" = "<API_SECRET>",
    "property.enable.ssl.certificate.verification" = "false",
    "property.group.id" = "velodb-consumer"
);

Replace the placeholders:

Placeholder | Example
<BOOTSTRAP_SERVER> | pkc-xxxxx.us-east-1.aws.confluent.cloud:9092
<YOUR_TOPIC> | orders
<API_KEY> | Your Confluent Cloud API Key
<API_SECRET> | Your Confluent Cloud API Secret
Finding Confluent Cloud connection details
  • Bootstrap server: Cluster Overview → Cluster Settings
  • API Key/Secret: Cluster Overview → API Keys → Create key
Data Format

Confluent Cloud Datagen connector must use JSON format. AVRO and JSON_SR formats are not supported by VeloDB Routine Load.
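
The prerequisites above also list CSV as a supported data format. A minimal sketch of a CSV variant of the job, assuming comma-separated values arrive in the same column order (topic name and consumer group are placeholders):

CREATE ROUTINE LOAD demo.kafka_load_csv ON events
COLUMNS TERMINATED BY ",",
COLUMNS(event_time, user_id, event_type, page)
PROPERTIES (
    "format" = "csv"
)
FROM KAFKA (
    "kafka_broker_list" = "<BOOTSTRAP_SERVER>",
    "kafka_topic" = "<YOUR_CSV_TOPIC>",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING",
    "property.security.protocol" = "SASL_SSL",
    "property.sasl.mechanism" = "PLAIN",
    "property.sasl.username" = "<API_KEY>",
    "property.sasl.password" = "<API_SECRET>",
    "property.enable.ssl.certificate.verification" = "false",
    "property.group.id" = "velodb-consumer-csv"
);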


5. Verify

Check Routine Load job status:

SHOW ROUTINE LOAD FOR demo.kafka_load;

Expected output shows RUNNING state:

+-------+------------+-----------+---------+------------+
| Id    | Name       | TableName | State   | DataSource |
+-------+------------+-----------+---------+------------+
| 12345 | kafka_load | events    | RUNNING | KAFKA      |
+-------+------------+-----------+---------+------------+

Check loaded data:

SELECT * FROM demo.events ORDER BY event_time LIMIT 10;

Expected result:

+---------------------+---------+------------+-----------+
| event_time          | user_id | event_type | page      |
+---------------------+---------+------------+-----------+
| 2025-01-15 10:30:00 | 1001    | page_view  | /home     |
| 2025-01-15 10:31:00 | 1002    | click      | /products |
| 2025-01-15 10:32:00 | 1001    | purchase   | /checkout |
+---------------------+---------+------------+-----------+
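
To confirm the job keeps ingesting over time, an optional follow-up check (a simple sketch against the example table; rerun it after a minute and both values should grow):

SELECT COUNT(*) AS row_count, MAX(event_time) AS latest_event
FROM demo.events;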

Monitor task progress:

SHOW ROUTINE LOAD TASK WHERE JobName = "kafka_load";

6. Manage Routine Load Jobs

Action | SQL Command
Pause job | PAUSE ROUTINE LOAD FOR demo.kafka_load;
Resume job | RESUME ROUTINE LOAD FOR demo.kafka_load;
Stop job | STOP ROUTINE LOAD FOR demo.kafka_load;
View all jobs | SHOW ALL ROUTINE LOAD;

Network Connectivity

Option 1: Public Network Access

Use the public endpoint provided by your Kafka provider. Ensure VeloDB Cloud can reach the broker on port 9092 (or your configured port).

Option 2: Private Link

For production workloads, configure Private Link to keep traffic within the cloud provider's network:

  1. Navigate to Settings → Endpoint Services in VeloDB Console
  2. Create a Private Link connection to your Kafka provider
  3. Use the private endpoint in your kafka_broker_list

Routine Load Parameters

Parameter | Description
kafka_broker_list | Kafka broker addresses (comma-separated for multiple)
kafka_topic | Topic to consume from
property.security.protocol | SASL_SSL for cloud Kafka, PLAINTEXT for no auth
property.sasl.mechanism | PLAIN for Confluent Cloud, SCRAM-SHA-512 for others (not SCRAM-SHA-256)
property.sasl.username | SASL username
property.sasl.password | SASL password
property.ssl.ca.location | Path to uploaded CA certificate (e.g., FILE:isrgrootx1.pem). Not needed for Confluent Cloud
property.enable.ssl.certificate.verification | Set to false for Confluent Cloud (no certificate needed)
property.group.id | Consumer group ID
property.kafka_default_offsets | OFFSET_BEGINNING, OFFSET_END, or specific offset
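
Putting the non-Confluent rows of this table together, a hedged sketch of a job for a provider that uses SCRAM-SHA-512 and an uploaded CA certificate (broker address, topic, and credentials are placeholders; ca.pem refers to the file created in step 1):

CREATE ROUTINE LOAD demo.kafka_load_scram ON events
COLUMNS(event_time, user_id, event_type, page)
PROPERTIES (
    "format" = "json",
    "jsonpaths" = "[\"$.event_time\",\"$.user_id\",\"$.event_type\",\"$.page\"]"
)
FROM KAFKA (
    "kafka_broker_list" = "<BROKER_HOST>:<PORT>",
    "kafka_topic" = "<YOUR_TOPIC>",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING",
    "property.security.protocol" = "SASL_SSL",
    "property.sasl.mechanism" = "SCRAM-SHA-512",
    "property.sasl.username" = "<USERNAME>",
    "property.sasl.password" = "<PASSWORD>",
    "property.ssl.ca.location" = "FILE:ca.pem",
    "property.group.id" = "velodb-consumer"
);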

Troubleshooting

Issue | Solution
"Broker transport failure" | For Confluent Cloud: use PLAIN. For others: use SCRAM-SHA-512 (not SCRAM-SHA-256)
"SSL handshake failed" | Verify the CA certificate matches your provider (ISRG Root X1 for Redpanda, Amazon Root CA 1 for MSK)
"Authentication failed" | Check username/password, ensure the user has topic permissions
"Topic authorization failed" | Grant ACL permissions: rpk security acl create --allow-principal "User:xxx" --operation read,describe --topic xxx --group "*"
Connection timeout | Enable public access or configure Private Link
Job auto-paused: "max_error_number exceeded" | Schema mismatch between JSON and table. Check that jsonpaths matches your JSON fields. Add "max_filter_ratio" = "0.5" to tolerate some errors
Job status shows PAUSED | Check the reason with SHOW ROUTINE LOAD FOR <job_name>; then fix it and run RESUME ROUTINE LOAD FOR <job_name>;
File already exists | Check existing files with SHOW FILE; before creating
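
For reference, the max_filter_ratio setting mentioned above is a job property; a sketch of where it sits in the step 4 PROPERTIES clause (0.5 is only an example threshold):

PROPERTIES (
    "format" = "json",
    "jsonpaths" = "[\"$.event_time\",\"$.user_id\",\"$.event_type\",\"$.page\"]",
    "max_filter_ratio" = "0.5"
)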

Debugging Tips

Check job error details:

SHOW ROUTINE LOAD FOR demo.kafka_load;
-- Look at ReasonOfStateChanged and ErrorLogUrls columns

View task-level status:

SHOW ROUTINE LOAD TASK WHERE JobName = "kafka_load";

Test with new messages only (skip old data):

-- Use OFFSET_END to start from latest messages
"property.kafka_default_offsets" = "OFFSET_END"

References