Routine Loadを使用してKafkaからVeloDBにデータを取り込む
Routine Loadは、Kafkaトピックからメッセージを継続的に消費し、VeloDBTableに読み込む永続的なストリーミングジョブを作成します。
前提条件
VeloDB Cloudアカウント(必須)
アクティブなクラスターを持つVeloDB Cloudアカウントが必要です。アカウントをお持ちでない場合は、VeloDB Quick Start Guideに従ってアカウントとクラスターを作成してください。
VeloDBでSQLクエリを実行し、データベースを作成するには、VeloDB Consoleサイドバーの組み込みSQL Editorを使用してください:

Kafkaソース要件
| 要件 | 詳細 |
|---|---|
| Kafkaバージョン | 0.10.0.0以上(推奨) |
| データフォーマット | JSONまたはCSV |
| ネットワークアクセス | VeloDBへのパブリックエンドポイントまたはPrivate Link |
クラウドKafkaプロバイダー
- Confluent Cloud
- Generic Kafka
- Amazon MSK
- Redpanda Cloud
ネットワーク: デフォルトでパブリックアクセスが有効
認証: API KeyとSecretを使用したSASL/PLAIN
SSL証明書: 不要(証明書検証を無効化)
セットアップ:
- Cluster SettingsからBootstrapサーバーを取得
- Cluster 概要 → API KeysでAPI Keyを作成
- サンプルデータには、JSONフォーマットのDatagen Sourceコネクタを使用
ネットワーク: VeloDB CloudがKafkaブローカーに到達できることを確認(パブリックIPまたはPrivate Link)
認証オプション:
- SASL_SSLとSCRAM-SHA-512(セキュアな接続に推奨)
- PLAINTEXTは内部/信頼できるネットワークで認証なし
SSL証明書: KafkaクラスターのCA証明書を使用
ネットワーク: クラスター設定で「パブリックアクセス」を有効化、またはPrivate Linkを使用
認証: SASL/SCRAM-SHA-512(認証情報をAWS Secrets Managerに保存)
SSL証明書: Amazon Root CA 1
ネットワーク: デフォルトでパブリックアクセスが有効
認証: SASL/SCRAM-SHA-512
SSL証明書: ISRG Root X1(Let's Encrypt)
ユーザー設定:
- Redpanda Console → Security → Usersでユーザーを作成
- メカニズムとしてSCRAM-SHA-512を選択
rpkCLIを使用してACL権限を付与:
rpk security acl create \
--allow-principal "User:<USERNAME>" \
--operation read,describe \
--topic <YOUR_TOPIC> \
--group "*"
- Confluent Cloud: SASL/PLAIN認証をAPI Key/Secretで使用します
- その他のKafkaプロバイダー: SCRAM-SHA-512を使用してください(SCRAM-SHA-256は使用しないでください。「Broker transport failure」エラーで失敗します)
1. VeloDBでSSL証明書を作成
SQL Editorで、CREATE FILEを使用してSSL証明書をアップロードします。
証明書ファイルは、Routine Loadジョブを作成する同じデータベースで作成する必要があります。まずデータベースを作成または選択してから、ファイルを作成してください。
- Confluent Cloud
- Generic Kafka
- Amazon MSK
- Redpanda Cloud
SSL証明書は不要です。 Confluent CloudはSSLを内部で処理します。
この手順をスキップして、対象Tableの作成に進んでください。
Routine Loadジョブを作成する際は、以下を使用してください:
"property.enable.ssl.certificate.verification" = "false"
KafkaクラスターのCA証明書をアップロードします。証明書がURL経由でアクセス可能な場合:
-- Create database first (if not exists)
CREATE DATABASE IF NOT EXISTS demo;
USE demo;
-- Create SSL certificate in this database
CREATE FILE "kafka-ca.pem"
PROPERTIES (
"url" = "https://your-domain.com/path/to/ca.pem",
"catalog" = "internal"
);
または、ローカル証明書のアップロードについてはCREATE FILEを確認してください。
-- Create database first (if not exists)
CREATE DATABASE IF NOT EXISTS demo;
USE demo;
-- Create SSL certificate in this database
CREATE FILE "AmazonRootCA1.pem"
PROPERTIES (
"url" = "https://www.amazontrust.com/repository/AmazonRootCA1.pem",
"catalog" = "internal"
);
-- Create database first (if not exists)
CREATE DATABASE IF NOT EXISTS demo;
USE demo;
-- Create SSL certificate in this database
CREATE FILE "isrgrootx1.pem"
PROPERTIES (
"url" = "https://letsencrypt.org/certs/isrgrootx1.pem",
"catalog" = "internal"
);
ファイルを作成した後、Routine Load内で FILE:<filename>.pem として参照できます。
現在のデータベース内の既存の証明書を確認するには:SHOW FILE;
2. (オプション) サンプルデータを含むサンプルトピックの作成
既存のKafkaトピックがない場合は、テスト用にサンプルデータを含むトピックを作成します。
- Confluent Cloud
- Kafka CLI
- Redpanda (rpk)
Confluent Cloud ConsoleのDatagen Sourceコネクタを使用します:
- クラスター内のConnectorsに移動
- Add connectorをクリック → Datagen Sourceを検索
- 設定:
- Topic:トピック名を入力(例:
orders) - Output record value format:JSONを選択(必須 - VeloDBはAVROをサポートしていません)
- Template:テンプレートを選択(例:
Orders、Users、Pageviews)
- Topic:トピック名を入力(例:
- 残りの手順でContinueをクリック
- コネクタを起動
コネクタは自動的にトピックを作成し、サンプルメッセージを生成します。
出力フォーマットとしてJSONを選択してください。AVROおよびJSON_SRフォーマットはVeloDB Routine Loadでサポートされていません。
# Create topic
kafka-topics.sh --create --topic clickstream \
--bootstrap-server <BROKER>:9092 \
--partitions 3 --replication-factor 1
# Produce sample messages
echo '{"event_time": "2025-01-15 10:30:00", "user_id": 1001, "event_type": "page_view", "page": "/home"}
{"event_time": "2025-01-15 10:31:00", "user_id": 1002, "event_type": "click", "page": "/products"}
{"event_time": "2025-01-15 10:32:00", "user_id": 1001, "event_type": "purchase", "page": "/checkout"}' | \
kafka-console-producer.sh --topic clickstream --bootstrap-server <BROKER>:9092
# Create topic
rpk topic create clickstream
# Produce sample messages
echo '{"event_time": "2025-01-15 10:30:00", "user_id": 1001, "event_type": "page_view", "page": "/home"}
{"event_time": "2025-01-15 10:31:00", "user_id": 1002, "event_type": "click", "page": "/products"}
{"event_time": "2025-01-15 10:32:00", "user_id": 1001, "event_type": "purchase", "page": "/checkout"}' | rpk topic produce clickstream
# Verify messages
rpk topic consume clickstream --num 3
3. VeloDBでターゲットTableを作成
Kafkaメッセージフォーマットに基づいてVeloDBTableスキーマを設計します。
スキーママッピング: KafkaからVeloDB
JSONメッセージの例:
{"event_time": "2025-01-15 10:30:00", "user_id": 1001, "event_type": "page_view", "page": "/home"}
VeloDBTableへのマッピング:
| Kafka JSON Field | JSON タイプ | VeloDB Column | VeloDB タイプ |
|---|---|---|---|
event_time | string | event_time | DATETIME |
user_id | number | user_id | BIGINT |
event_type | string | event_type | VARCHAR(50) |
page | string | page | VARCHAR(255) |
型マッピングリファレンス
VeloDBデータ型の完全な情報については、Data タイプ 概要を参照してください。
| Kafka/JSON タイプ | VeloDB タイプ |
|---|---|
| string (datetime) | DATETIME, DATE |
| string | VARCHAR(n), STRING, TEXT |
| integer | INT, BIGINT, SMALLINT, TINYINT |
| number (decimal) | DOUBLE, FLOAT, DECIMAL(p,s) |
| boolean | BOOLEAN |
| nested object | JSON, VARIANT |
| array | ARRAY<T>, JSON |
Tableの作成
SQL EditorでデータベースとTableを作成します:
CREATE DATABASE IF NOT EXISTS demo;
USE demo;
CREATE TABLE events (
event_time DATETIME NOT NULL,
user_id BIGINT NOT NULL,
event_type VARCHAR(50),
page VARCHAR(255)
)
DUPLICATE KEY(event_time, user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;
Routine Loadのjsonpathsパラメータは、JSONフィールドをTableカラムに順序通りにマッピングします。Tableカラムがjsonpathsで指定された順序と一致していることを確認してください。
4. Routine Loadジョブの作成
CREATE ROUTINE LOADを使用してKafkaからの消費を開始します:
- Confluent Cloud
- Cloud Kafka (SASL_SSL)
- Self-hosted Kafka (No Auth)
- CSV形式
CREATE ROUTINE LOAD demo.kafka_load ON events
COLUMNS(event_time, user_id, event_type, page)
PROPERTIES (
"format" = "json",
"jsonpaths" = "[\"$.event_time\",\"$.user_id\",\"$.event_type\",\"$.page\"]"
)
FROM KAFKA (
"kafka_broker_list" = "<BOOTSTRAP_SERVER>",
"kafka_topic" = "<YOUR_TOPIC>",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.security.protocol" = "SASL_SSL",
"property.sasl.mechanism" = "PLAIN",
"property.sasl.username" = "<API_KEY>",
"property.sasl.password" = "<API_SECRET>",
"property.enable.ssl.certificate.verification" = "false",
"property.group.id" = "velodb-consumer"
);
プレースホルダーを置換してください:
| プレースホルダー | 例 |
|---|---|
<BOOTSTRAP_SERVER> | pkc-xxxxx.us-east-1.aws.confluent.cloud:9092 |
<YOUR_TOPIC> | orders |
<API_KEY> | Your Confluent Cloud API Key |
<API_SECRET> | Your Confluent Cloud API Secret |
- Bootstrap server: Cluster 概要 → Cluster Settings
- API Key/Secret: Cluster 概要 → API Keys → Create key
Confluent Cloud DatagenコネクターはJSONフォーマットを使用する必要があります。AVROおよびJSON_SRフォーマットはVeloDB Routine Loadではサポートされていません。
CREATE ROUTINE LOAD demo.kafka_load ON events
COLUMNS(event_time, user_id, event_type, page)
PROPERTIES (
"format" = "json",
"jsonpaths" = "[\"$.event_time\",\"$.user_id\",\"$.event_type\",\"$.page\"]"
)
FROM KAFKA (
"kafka_broker_list" = "<YOUR_BROKER>:9092",
"kafka_topic" = "<YOUR_TOPIC>",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.security.protocol" = "SASL_SSL",
"property.sasl.mechanism" = "SCRAM-SHA-512",
"property.sasl.username" = "<YOUR_USERNAME>",
"property.sasl.password" = "<YOUR_PASSWORD>",
"property.ssl.ca.location" = "FILE:isrgrootx1.pem",
"property.group.id" = "velodb-consumer"
);
プレースホルダーを置き換えてください:
| プレースホルダー | 例 |
|---|---|
<YOUR_BROKER> | abc123.any.us-west-2.mpx.prd.cloud.redpanda.com (Redpanda) |
<YOUR_TOPIC> | clickstream |
<YOUR_USERNAME> | velodb-user |
<YOUR_PASSWORD> | your-password |
rpk cluster infoを実行してブローカーアドレスを確認するか、Redpanda Console → Overviewで確認できます。
CREATE ROUTINE LOAD demo.kafka_load ON events
COLUMNS(event_time, user_id, event_type, page)
PROPERTIES (
"format" = "json",
"jsonpaths" = "[\"$.event_time\",\"$.user_id\",\"$.event_type\",\"$.page\"]"
)
FROM KAFKA (
"kafka_broker_list" = "<YOUR_BROKER>:9092",
"kafka_topic" = "<YOUR_TOPIC>",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.group.id" = "velodb-consumer"
);
CREATE ROUTINE LOAD demo.kafka_load ON events
COLUMNS TERMINATED BY ",",
COLUMNS(event_time, user_id, event_type, page)
FROM KAFKA (
"kafka_broker_list" = "<YOUR_BROKER>:9092",
"kafka_topic" = "<YOUR_TOPIC>",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.security.protocol" = "SASL_SSL",
"property.sasl.mechanism" = "SCRAM-SHA-512",
"property.sasl.username" = "<YOUR_USERNAME>",
"property.sasl.password" = "<YOUR_PASSWORD>",
"property.ssl.ca.location" = "FILE:isrgrootx1.pem",
"property.group.id" = "velodb-consumer"
);
5. 検証
Routine Load ジョブステータスを確認する:
SHOW ROUTINE LOAD FOR demo.kafka_load;
期待される出力はRUNNING状態を示します:
+-------+-------------+-----------+--------+-----------+
| Id | Name | TableName | State | DataSource|
+-------+-------------+-----------+--------+-----------+
| 12345 | kafka_load | events | RUNNING| KAFKA |
+-------+-------------+-----------+--------+-----------+
読み込まれたデータの確認:
SELECT * FROM demo.events ORDER BY event_time LIMIT 10;
期待される結果:
+---------------------+---------+------------+------------+
| event_time | user_id | event_type | page |
+---------------------+---------+------------+------------+
| 2025-01-15 10:30:00 | 1001 | page_view | /home |
| 2025-01-15 10:31:00 | 1002 | click | /products |
| 2025-01-15 10:32:00 | 1001 | purchase | /checkout |
+---------------------+---------+------------+------------+
タスクの進行状況を監視する:
SHOW ROUTINE LOAD TASK WHERE JobName = "kafka_load";
6. Routine Loadジョブの管理
| 操作 | SQLコマンド |
|---|---|
| ジョブの一時停止 | PAUSE ROUTINE LOAD FOR demo.kafka_load; |
| ジョブの再開 | RESUME ROUTINE LOAD FOR demo.kafka_load; |
| ジョブの停止 | STOP ROUTINE LOAD FOR demo.kafka_load; |
| 全ジョブの表示 | SHOW ALL ROUTINE LOAD; |
ネットワーク接続
オプション1: パブリックネットワークアクセス
Kafkaプロバイダーが提供するパブリックエンドポイントを使用します。VeloDB Cloudがポート9092(または設定されたポート)でブローカーに到達できることを確認してください。
オプション2: Private Link(本番環境推奨)
本番ワークロードでは、トラフィックをクラウドプロバイダーのネットワーク内に保つためにPrivate Linkを設定します:
- VeloDB ConsoleのSettings → Endpoint Servicesに移動
- KafkaプロバイダーへのPrivate Link接続を作成
kafka_broker_listでプライベートエンドポイントを使用
Routine Loadパラメータ
| パラメータ | 説明 |
|---|---|
kafka_broker_list | Kafkaブローカーアドレス(複数の場合はカンマ区切り) |
kafka_topic | 消費するトピック |
property.security.protocol | クラウドKafkaの場合はSASL_SSL、認証なしの場合はPLAINTEXT |
property.sasl.mechanism | Confluent Cloudの場合はPLAIN、その他の場合はSCRAM-SHA-512(SCRAM-SHA-256ではない) |
property.sasl.username | SASLユーザー名 |
property.sasl.password | SASLパスワード |
property.ssl.ca.location | アップロードされたCA証明書へのパス(例:FILE:isrgrootx1.pem)。Confluent Cloudでは不要 |
property.enable.ssl.certificate.verification | Confluent Cloudの場合はfalseに設定(証明書不要) |
property.group.id | コンシューマーグループID |
property.kafka_default_offsets | OFFSET_BEGINNING、OFFSET_END、または特定のオフセット |
トラブルシューティング
| 問題 | 解決方法 |
|---|---|
| "Broker transport failure" | Confluent Cloudの場合:PLAINを使用。その他の場合:SCRAM-SHA-512を使用(SCRAM-SHA-256ではない) |
| "SSL handshake failed" | CA証明書がプロバイダーと一致することを確認(RedpandaはISRG Root X1、MSKはAmazon Root CA 1) |
| "認証 failed" | ユーザー名/パスワードを確認、ユーザーがトピック権限を持つことを確認 |
| "Topic authorization failed" | ACL権限を付与:rpk security acl create --allow-principal "User:xxx" --operation read,describe --topic xxx --group "*" |
| 接続タイムアウト | パブリックアクセスを有効化またはPrivate Linkを設定 |
| ジョブの自動一時停止:"max_error_number exceeded" | JSONとTable間のスキーマ不一致。jsonpathsがJSONフィールドと一致することを確認。エラーの一部を許容するために"max_filter_ratio" = "0.5"を追加 |
ジョブステータスがPAUSEDを表示 | SHOW ROUTINE LOAD FOR <job_name>;で理由を確認してから修正し、RESUME ROUTINE LOAD FOR <job_name>;を実行 |
| ファイルが既に存在 | 作成前にSHOW FILE;で既存ファイルを確認 |
デバッグのヒント
ジョブエラーの詳細を確認:
SHOW ROUTINE LOAD FOR demo.kafka_load;
-- Look at ReasonOfStateChanged and ErrorLogUrls columns
タスクレベルのステータスを表示:
SHOW ROUTINE LOAD TASK WHERE JobName = "kafka_load";
新しいメッセージのみでテストする(古いデータをスキップ):
-- Use OFFSET_END to start from latest messages
"property.kafka_default_offsets" = "OFFSET_END"
リファレンス
- Routine Load Manual
- CREATE ROUTINE LOAD
- CREATE FILE
- Doris Kafka Connector (Kafka Connectを使用する代替方法)