Kafka から VeloDB へのデータ取り込み(Routine Load を使用)
Routine Load は、Kafka トピックからメッセージを継続的に消費し、VeloDB テーブルに読み込む永続的なストリーミングジョブを作成します。
前提条件
VeloDB Cloud アカウント(必須)
アクティブなクラスタを持つ VeloDB Cloud アカウントが必要です。アカウントをお持ちでない場合は、VeloDB Quick Start Guide に従ってアカウントとクラスタを作成してください。
VeloDB で SQL クエリを実行し、データベースを作成するには、VeloDB Console サイドバーの組み込み SQL Editor を使用します:

Kafka ソース要件
| 要件 | 詳細 |
|---|---|
| Kafka バージョン | 0.10.0.0 以上(推奨) |
| データ形式 | JSON または CSV |
| ネットワークアクセス | VeloDB へのパブリックエンドポイントまたは Private Link |
クラウド Kafka プロバイダー
- Confluent Cloud
- Generic Kafka
- Amazon MSK
- Redpanda Cloud
ネットワーク: デフォルトでパブリックアクセスが有効
認証: API Key と Secret を使用した SASL/PLAIN
SSL 証明書: 不要(証明書検証を無効にする)
セットアップ:
- Cluster Settings から Bootstrap サーバーを取得
- Cluster Overview → API Keys で API Key を作成
- サンプルデータには、JSON 形式の Datagen Source コネクターを使用
ネットワーク: VeloDB Cloud が Kafka ブローカーに到達できることを確認(パブリック IP または Private Link)
認証オプション:
- SASL_SSL と SCRAM-SHA-512(セキュア接続に推奨)
- PLAINTEXT 認証なしの内部/信頼ネットワーク用
SSL 証明書: Kafka クラスタの CA 証明書を使用
ネットワーク: クラスタ設定で「Public access」を有効にするか、Private Link を使用
認証: SASL/SCRAM-SHA-512(認証情報を AWS Secrets Manager に保存)
SSL 証明書: Amazon Root CA 1
ネットワーク: デフォルトでパブリックアクセスが有効
認証: SASL/SCRAM-SHA-512
SSL 証明書: ISRG Root X1 (Let's Encrypt)
ユーザーセットアップ:
- Redpanda Console → Security → Users でユーザーを作成
- メカニズムとして SCRAM-SHA-512 を選択
rpkCLI を使用して ACL 権限を付与:
rpk security acl create \
--allow-principal "User:<USERNAME>" \
--operation read,describe \
--topic <YOUR_TOPIC> \
--group "*"
- Confluent Cloud: API Key/SecretでSASL/PLAIN認証を使用します
- その他のKafkaプロバイダー: SCRAM-SHA-512を使用してください(SCRAM-SHA-256は「Broker transport failure」エラーで失敗します)
1. VeloDBでSSL証明書を作成
SQL Editorで、CREATE FILEを使用してSSL証明書をアップロードします。
証明書ファイルは、Routine Loadジョブを作成する同じデータベース内に作成する必要があります。最初にデータベースを作成または選択してから、ファイルを作成してください。
- Confluent Cloud
- Generic Kafka
- Amazon MSK
- Redpanda Cloud
SSL証明書は不要です。 Confluent CloudはSSLを内部で処理します。
この手順をスキップして、ターゲットテーブルの作成に進んでください。
Routine Loadジョブを作成する際は、以下を使用してください:
"property.enable.ssl.certificate.verification" = "false"
KafkaクラスターのCA証明書をアップロードしてください。証明書がURL経由でアクセス可能な場合:
-- Create database first (if not exists)
CREATE DATABASE IF NOT EXISTS demo;
USE demo;
-- Create SSL certificate in this database
CREATE FILE "kafka-ca.pem"
PROPERTIES (
"url" = "https://your-domain.com/path/to/ca.pem",
"catalog" = "internal"
);
または、ローカル証明書をアップロードするにはCREATE FILEを確認してください。
-- Create database first (if not exists)
CREATE DATABASE IF NOT EXISTS demo;
USE demo;
-- Create SSL certificate in this database
CREATE FILE "AmazonRootCA1.pem"
PROPERTIES (
"url" = "https://www.amazontrust.com/repository/AmazonRootCA1.pem",
"catalog" = "internal"
);
-- Create database first (if not exists)
CREATE DATABASE IF NOT EXISTS demo;
USE demo;
-- Create SSL certificate in this database
CREATE FILE "isrgrootx1.pem"
PROPERTIES (
"url" = "https://letsencrypt.org/certs/isrgrootx1.pem",
"catalog" = "internal"
);
ファイル作成後、Routine LoadでFILE:<filename>.pemとして参照できます。
現在のデータベースの既存証明書を確認するには:SHOW FILE;
2. (任意) サンプルデータ付きトピックの作成
既存のKafkaトピックがない場合は、テスト用のサンプルデータ付きトピックを作成してください。
- Confluent Cloud
- Kafka CLI
- Redpanda (rpk)
Confluent Cloud ConsoleでDatagen Sourceコネクタを使用:
- クラスター内のConnectorsに移動
- Add connectorをクリック → Datagen Sourceを検索
- 設定:
- Topic:トピック名を入力(例:
orders) - Output record value format:JSONを選択(必須 - VeloDBはAVROをサポートしません)
- Template:テンプレートを選択(例:
Orders、Users、Pageviews)
- Topic:トピック名を入力(例:
- 残りのステップでContinueをクリック
- コネクタを起動
コネクタは自動的にトピックを作成し、サンプルメッセージを生成します。
出力フォーマットとしてJSONを選択してください。AVROおよびJSON_SRフォーマットはVeloDB Routine Loadでサポートされていません。
# Create topic
kafka-topics.sh --create --topic clickstream \
--bootstrap-server <BROKER>:9092 \
--partitions 3 --replication-factor 1
# Produce sample messages
echo '{"event_time": "2025-01-15 10:30:00", "user_id": 1001, "event_type": "page_view", "page": "/home"}
{"event_time": "2025-01-15 10:31:00", "user_id": 1002, "event_type": "click", "page": "/products"}
{"event_time": "2025-01-15 10:32:00", "user_id": 1001, "event_type": "purchase", "page": "/checkout"}' | \
kafka-console-producer.sh --topic clickstream --bootstrap-server <BROKER>:9092
# Create topic
rpk topic create clickstream
# Produce sample messages
echo '{"event_time": "2025-01-15 10:30:00", "user_id": 1001, "event_type": "page_view", "page": "/home"}
{"event_time": "2025-01-15 10:31:00", "user_id": 1002, "event_type": "click", "page": "/products"}
{"event_time": "2025-01-15 10:32:00", "user_id": 1001, "event_type": "purchase", "page": "/checkout"}' | rpk topic produce clickstream
# Verify messages
rpk topic consume clickstream --num 3
3. VeloDBでターゲットテーブルを作成
Kafkaメッセージフォーマットに基づいてVeloDBテーブルスキーマを設計します。
スキーママッピング: KafkaからVeloDBへ
JSONメッセージの例:
{"event_time": "2025-01-15 10:30:00", "user_id": 1001, "event_type": "page_view", "page": "/home"}
VeloDBテーブルへのマッピング:
| Kafka JSON Field | JSON Type | VeloDB Column | VeloDB Type |
|---|---|---|---|
event_time | string | event_time | DATETIME |
user_id | number | user_id | BIGINT |
event_type | string | event_type | VARCHAR(50) |
page | string | page | VARCHAR(255) |
型マッピングリファレンス
VeloDBデータ型の完全な情報については、Data Type Overviewを参照してください。
| Kafka/JSON Type | VeloDB Type |
|---|---|
| string (datetime) | DATETIME, DATE |
| string | VARCHAR(n), STRING, TEXT |
| integer | INT, BIGINT, SMALLINT, TINYINT |
| number (decimal) | DOUBLE, FLOAT, DECIMAL(p,s) |
| boolean | BOOLEAN |
| nested object | JSON, VARIANT |
| array | ARRAY<T>, JSON |
テーブルの作成
SQL Editorで、データベースとテーブルを作成します:
CREATE DATABASE IF NOT EXISTS demo;
USE demo;
CREATE TABLE events (
event_time DATETIME NOT NULL,
user_id BIGINT NOT NULL,
event_type VARCHAR(50),
page VARCHAR(255)
)
DUPLICATE KEY(event_time, user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;
Routine Loadのjsonpathsパラメータは、JSON フィールドをテーブルカラムに順序通りにマップします。テーブルカラムがjsonpathsで指定された順序と一致することを確認してください。
4. Routine Load ジョブの作成
CREATE ROUTINE LOADを使用してKafkaからの消費を開始します:
- Confluent Cloud
- Cloud Kafka (SASL_SSL)
- Self-hosted Kafka (No Auth)
- CSV形式
CREATE ROUTINE LOAD demo.kafka_load ON events
COLUMNS(event_time, user_id, event_type, page)
PROPERTIES (
"format" = "json",
"jsonpaths" = "[\"$.event_time\",\"$.user_id\",\"$.event_type\",\"$.page\"]"
)
FROM KAFKA (
"kafka_broker_list" = "<BOOTSTRAP_SERVER>",
"kafka_topic" = "<YOUR_TOPIC>",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.security.protocol" = "SASL_SSL",
"property.sasl.mechanism" = "PLAIN",
"property.sasl.username" = "<API_KEY>",
"property.sasl.password" = "<API_SECRET>",
"property.enable.ssl.certificate.verification" = "false",
"property.group.id" = "velodb-consumer"
);
プレースホルダーを置き換える:
| プレースホルダー | 例 |
|---|---|
<BOOTSTRAP_SERVER> | pkc-xxxxx.us-east-1.aws.confluent.cloud:9092 |
<YOUR_TOPIC> | orders |
<API_KEY> | Your Confluent Cloud API Key |
<API_SECRET> | Your Confluent Cloud API Secret |
- Bootstrap server: Cluster Overview → Cluster Settings
- API Key/Secret: Cluster Overview → API Keys → Create key
Confluent Cloud Datagen connectorはJSON形式を使用する必要があります。AVROおよびJSON_SR形式はVeloDB Routine Loadではサポートされていません。
CREATE ROUTINE LOAD demo.kafka_load ON events
COLUMNS(event_time, user_id, event_type, page)
PROPERTIES (
"format" = "json",
"jsonpaths" = "[\"$.event_time\",\"$.user_id\",\"$.event_type\",\"$.page\"]"
)
FROM KAFKA (
"kafka_broker_list" = "<YOUR_BROKER>:9092",
"kafka_topic" = "<YOUR_TOPIC>",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.security.protocol" = "SASL_SSL",
"property.sasl.mechanism" = "SCRAM-SHA-512",
"property.sasl.username" = "<YOUR_USERNAME>",
"property.sasl.password" = "<YOUR_PASSWORD>",
"property.ssl.ca.location" = "FILE:isrgrootx1.pem",
"property.group.id" = "velodb-consumer"
);
プレースホルダーを置き換えます:
| プレースホルダー | 例 |
|---|---|
<YOUR_BROKER> | abc123.any.us-west-2.mpx.prd.cloud.redpanda.com (Redpanda) |
<YOUR_TOPIC> | clickstream |
<YOUR_USERNAME> | velodb-user |
<YOUR_PASSWORD> | your-password |
rpk cluster infoを実行してbrokerアドレスを確認するか、Redpanda Console → Overviewで確認できます。
CREATE ROUTINE LOAD demo.kafka_load ON events
COLUMNS(event_time, user_id, event_type, page)
PROPERTIES (
"format" = "json",
"jsonpaths" = "[\"$.event_time\",\"$.user_id\",\"$.event_type\",\"$.page\"]"
)
FROM KAFKA (
"kafka_broker_list" = "<YOUR_BROKER>:9092",
"kafka_topic" = "<YOUR_TOPIC>",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.group.id" = "velodb-consumer"
);
CREATE ROUTINE LOAD demo.kafka_load ON events
COLUMNS TERMINATED BY ",",
COLUMNS(event_time, user_id, event_type, page)
FROM KAFKA (
"kafka_broker_list" = "<YOUR_BROKER>:9092",
"kafka_topic" = "<YOUR_TOPIC>",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.security.protocol" = "SASL_SSL",
"property.sasl.mechanism" = "SCRAM-SHA-512",
"property.sasl.username" = "<YOUR_USERNAME>",
"property.sasl.password" = "<YOUR_PASSWORD>",
"property.ssl.ca.location" = "FILE:isrgrootx1.pem",
"property.group.id" = "velodb-consumer"
);
5. 検証
Routine Loadジョブステータスを確認:
SHOW ROUTINE LOAD FOR demo.kafka_load;
期待される出力ではRUNNING状態が表示されます:
+-------+-------------+-----------+--------+-----------+
| Id | Name | TableName | State | DataSource|
+-------+-------------+-----------+--------+-----------+
| 12345 | kafka_load | events | RUNNING| KAFKA |
+-------+-------------+-----------+--------+-----------+
読み込まれたデータを確認:
SELECT * FROM demo.events ORDER BY event_time LIMIT 10;
期待される結果:
+---------------------+---------+------------+------------+
| event_time | user_id | event_type | page |
+---------------------+---------+------------+------------+
| 2025-01-15 10:30:00 | 1001 | page_view | /home |
| 2025-01-15 10:31:00 | 1002 | click | /products |
| 2025-01-15 10:32:00 | 1001 | purchase | /checkout |
+---------------------+---------+------------+------------+
タスクの進行状況を監視:
SHOW ROUTINE LOAD TASK WHERE JobName = "kafka_load";
6. Routine Loadジョブの管理
| アクション | SQLコマンド |
|---|---|
| ジョブの一時停止 | PAUSE ROUTINE LOAD FOR demo.kafka_load; |
| ジョブの再開 | RESUME ROUTINE LOAD FOR demo.kafka_load; |
| ジョブの停止 | STOP ROUTINE LOAD FOR demo.kafka_load; |
| 全ジョブの表示 | SHOW ALL ROUTINE LOAD; |
ネットワーク接続
オプション1: パブリックネットワークアクセス
Kafkaプロバイダーが提供するパブリックエンドポイントを使用します。VeloDB CloudがポートKafkaがプロバイダーが提供するパブリックエンドポイントを使用します。VeloDB Cloudがポート9092(または設定されたポート)でブローカーにアクセスできることを確認してください。
オプション2: Private Link(本番環境推奨)
本番環境のワークロードでは、クラウドプロバイダーのネットワーク内でトラフィックを保持するためにPrivate Linkを設定します:
- VeloDB ConsoleでSettings → Endpoint Servicesに移動
- KafkaプロバイダーへのPrivate Link接続を作成
kafka_broker_listでプライベートエンドポイントを使用
Routine Loadパラメータ
| パラメータ | 説明 |
|---|---|
kafka_broker_list | Kafkaブローカーアドレス(複数の場合はカンマ区切り) |
kafka_topic | 消費対象のトピック |
property.security.protocol | クラウドKafkaの場合はSASL_SSL、認証なしの場合はPLAINTEXT |
property.sasl.mechanism | Confluent Cloudの場合はPLAIN、その他の場合はSCRAM-SHA-512(SCRAM-SHA-256ではない) |
property.sasl.username | SASLユーザー名 |
property.sasl.password | SASLパスワード |
property.ssl.ca.location | アップロードされたCA証明書へのパス(例:FILE:isrgrootx1.pem)。Confluent Cloudでは不要 |
property.enable.ssl.certificate.verification | Confluent Cloudの場合はfalseに設定(証明書不要) |
property.group.id | コンシューマーグループID |
property.kafka_default_offsets | OFFSET_BEGINNING、OFFSET_END、または特定のオフセット |
トラブルシューティング
| 問題 | 解決方法 |
|---|---|
| "Broker transport failure" | Confluent Cloudの場合:PLAINを使用。その他の場合:SCRAM-SHA-512を使用(SCRAM-SHA-256ではない) |
| "SSL handshake failed" | CA証明書がプロバイダーと一致することを確認(RedpandaはISRG Root X1、MSKはAmazon Root CA 1) |
| "Authentication failed" | ユーザー名/パスワードを確認し、ユーザーがトピック権限を持っていることを確認 |
| "Topic authorization failed" | ACL権限を付与:rpk security acl create --allow-principal "User:xxx" --operation read,describe --topic xxx --group "*" |
| 接続タイムアウト | パブリックアクセスを有効にするかPrivate Linkを設定 |
| ジョブの自動一時停止:"max_error_number exceeded" | JSONとテーブル間のスキーマ不一致。jsonpathsがJSONフィールドと一致することを確認。一部のエラーを許容するために"max_filter_ratio" = "0.5"を追加 |
ジョブステータスがPAUSEDを表示 | SHOW ROUTINE LOAD FOR <job_name>;で理由を確認し、修正後にRESUME ROUTINE LOAD FOR <job_name>;を実行 |
| ファイルが既に存在 | 作成前にSHOW FILE;で既存ファイルを確認 |
デバッグのヒント
ジョブエラーの詳細を確認:
SHOW ROUTINE LOAD FOR demo.kafka_load;
-- Look at ReasonOfStateChanged and ErrorLogUrls columns
タスクレベルのステータスを表示:
SHOW ROUTINE LOAD TASK WHERE JobName = "kafka_load";
新しいメッセージのみでテスト(古いデータをスキップ):
-- Use OFFSET_END to start from latest messages
"property.kafka_default_offsets" = "OFFSET_END"
参考資料
- Routine Load Manual
- CREATE ROUTINE LOAD
- CREATE FILE
- Doris Kafka Connector (Kafka Connectを使用した代替方法)