メインコンテンツまでスキップ
バージョン: 4.x

Routine Loadを使用してKafkaからVeloDBにデータを取り込む

Routine Loadは、Kafkaトピックからメッセージを継続的に消費し、VeloDBTableに読み込む永続的なストリーミングジョブを作成します。

前提条件

VeloDB Cloudアカウント(必須)

アクティブなクラスターを持つVeloDB Cloudアカウントが必要です。アカウントをお持ちでない場合は、VeloDB Quick Start Guideに従ってアカウントとクラスターを作成してください。

VeloDBでSQLクエリを実行し、データベースを作成するには、VeloDB Consoleサイドバーの組み込みSQL Editorを使用してください:

SQL Editor

Kafkaソース要件

要件詳細
Kafkaバージョン0.10.0.0以上(推奨)
データフォーマットJSONまたはCSV
ネットワークアクセスVeloDBへのパブリックエンドポイントまたはPrivate Link

クラウドKafkaプロバイダー

ネットワーク: デフォルトでパブリックアクセスが有効

認証: API KeyとSecretを使用したSASL/PLAIN

SSL証明書: 不要(証明書検証を無効化)

セットアップ:

  1. Cluster SettingsからBootstrapサーバーを取得
  2. Cluster 概要 → API KeysでAPI Keyを作成
  3. サンプルデータには、JSONフォーマットのDatagen Sourceコネクタを使用
Critical Requirement
  • Confluent Cloud: SASL/PLAIN認証をAPI Key/Secretで使用します
  • その他のKafkaプロバイダー: SCRAM-SHA-512を使用してください(SCRAM-SHA-256は使用しないでください。「Broker transport failure」エラーで失敗します)

1. VeloDBでSSL証明書を作成

SQL Editorで、CREATE FILEを使用してSSL証明書をアップロードします。

Important

証明書ファイルは、Routine Loadジョブを作成する同じデータベースで作成する必要があります。まずデータベースを作成または選択してから、ファイルを作成してください。

SSL証明書は不要です。 Confluent CloudはSSLを内部で処理します。

この手順をスキップして、対象Tableの作成に進んでください。

Routine Loadジョブを作成する際は、以下を使用してください:

"property.enable.ssl.certificate.verification" = "false"
備考

ファイルを作成した後、Routine Load内で FILE:<filename>.pem として参照できます。

ヒント

現在のデータベース内の既存の証明書を確認するには:SHOW FILE;


2. (オプション) サンプルデータを含むサンプルトピックの作成

既存のKafkaトピックがない場合は、テスト用にサンプルデータを含むトピックを作成します。

Confluent Cloud ConsoleのDatagen Sourceコネクタを使用します:

  1. クラスター内のConnectorsに移動
  2. Add connectorをクリック → Datagen Sourceを検索
  3. 設定:
    • Topic:トピック名を入力(例:orders
    • Output record value formatJSONを選択(必須 - VeloDBはAVROをサポートしていません)
    • Template:テンプレートを選択(例:OrdersUsersPageviews
  4. 残りの手順でContinueをクリック
  5. コネクタを起動

コネクタは自動的にトピックを作成し、サンプルメッセージを生成します。

Important

出力フォーマットとしてJSONを選択してください。AVROおよびJSON_SRフォーマットはVeloDB Routine Loadでサポートされていません。


3. VeloDBでターゲットTableを作成

Kafkaメッセージフォーマットに基づいてVeloDBTableスキーマを設計します。

スキーママッピング: KafkaからVeloDB

JSONメッセージの例:

{"event_time": "2025-01-15 10:30:00", "user_id": 1001, "event_type": "page_view", "page": "/home"}

VeloDBTableへのマッピング:

Kafka JSON FieldJSON タイプVeloDB ColumnVeloDB タイプ
event_timestringevent_timeDATETIME
user_idnumberuser_idBIGINT
event_typestringevent_typeVARCHAR(50)
pagestringpageVARCHAR(255)

型マッピングリファレンス

VeloDBデータ型の完全な情報については、Data タイプ 概要を参照してください。

Kafka/JSON タイプVeloDB タイプ
string (datetime)DATETIME, DATE
stringVARCHAR(n), STRING, TEXT
integerINT, BIGINT, SMALLINT, TINYINT
number (decimal)DOUBLE, FLOAT, DECIMAL(p,s)
booleanBOOLEAN
nested objectJSON, VARIANT
arrayARRAY<T>, JSON

Tableの作成

SQL EditorでデータベースとTableを作成します:

CREATE DATABASE IF NOT EXISTS demo;

USE demo;

CREATE TABLE events (
event_time DATETIME NOT NULL,
user_id BIGINT NOT NULL,
event_type VARCHAR(50),
page VARCHAR(255)
)
DUPLICATE KEY(event_time, user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;
ヒント

Routine Loadのjsonpathsパラメータは、JSONフィールドをTableカラムに順序通りにマッピングします。Tableカラムがjsonpathsで指定された順序と一致していることを確認してください。


4. Routine Loadジョブの作成

CREATE ROUTINE LOADを使用してKafkaからの消費を開始します:

CREATE ROUTINE LOAD demo.kafka_load ON events
COLUMNS(event_time, user_id, event_type, page)
PROPERTIES (
"format" = "json",
"jsonpaths" = "[\"$.event_time\",\"$.user_id\",\"$.event_type\",\"$.page\"]"
)
FROM KAFKA (
"kafka_broker_list" = "<BOOTSTRAP_SERVER>",
"kafka_topic" = "<YOUR_TOPIC>",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.security.protocol" = "SASL_SSL",
"property.sasl.mechanism" = "PLAIN",
"property.sasl.username" = "<API_KEY>",
"property.sasl.password" = "<API_SECRET>",
"property.enable.ssl.certificate.verification" = "false",
"property.group.id" = "velodb-consumer"
);

プレースホルダーを置換してください:

プレースホルダー
<BOOTSTRAP_SERVER>pkc-xxxxx.us-east-1.aws.confluent.cloud:9092
<YOUR_TOPIC>orders
<API_KEY>Your Confluent Cloud API Key
<API_SECRET>Your Confluent Cloud API Secret
Confluent Cloudの接続詳細の確認方法
  • Bootstrap server: Cluster 概要 → Cluster Settings
  • API Key/Secret: Cluster 概要 → API Keys → Create key
データフォーマット

Confluent Cloud DatagenコネクターはJSONフォーマットを使用する必要があります。AVROおよびJSON_SRフォーマットはVeloDB Routine Loadではサポートされていません。


5. 検証

Routine Load ジョブステータスを確認する:

SHOW ROUTINE LOAD FOR demo.kafka_load;

期待される出力はRUNNING状態を示します:

+-------+-------------+-----------+--------+-----------+
| Id | Name | TableName | State | DataSource|
+-------+-------------+-----------+--------+-----------+
| 12345 | kafka_load | events | RUNNING| KAFKA |
+-------+-------------+-----------+--------+-----------+

読み込まれたデータの確認:

SELECT * FROM demo.events ORDER BY event_time LIMIT 10;

期待される結果:

+---------------------+---------+------------+------------+
| event_time | user_id | event_type | page |
+---------------------+---------+------------+------------+
| 2025-01-15 10:30:00 | 1001 | page_view | /home |
| 2025-01-15 10:31:00 | 1002 | click | /products |
| 2025-01-15 10:32:00 | 1001 | purchase | /checkout |
+---------------------+---------+------------+------------+

タスクの進行状況を監視する:

SHOW ROUTINE LOAD TASK WHERE JobName = "kafka_load";

6. Routine Loadジョブの管理

操作SQLコマンド
ジョブの一時停止PAUSE ROUTINE LOAD FOR demo.kafka_load;
ジョブの再開RESUME ROUTINE LOAD FOR demo.kafka_load;
ジョブの停止STOP ROUTINE LOAD FOR demo.kafka_load;
全ジョブの表示SHOW ALL ROUTINE LOAD;

ネットワーク接続

オプション1: パブリックネットワークアクセス

Kafkaプロバイダーが提供するパブリックエンドポイントを使用します。VeloDB Cloudがポート9092(または設定されたポート)でブローカーに到達できることを確認してください。

オプション2: Private Link(本番環境推奨)

本番ワークロードでは、トラフィックをクラウドプロバイダーのネットワーク内に保つためにPrivate Linkを設定します:

  1. VeloDB ConsoleのSettings → Endpoint Servicesに移動
  2. KafkaプロバイダーへのPrivate Link接続を作成
  3. kafka_broker_listでプライベートエンドポイントを使用

Routine Loadパラメータ

パラメータ説明
kafka_broker_listKafkaブローカーアドレス(複数の場合はカンマ区切り)
kafka_topic消費するトピック
property.security.protocolクラウドKafkaの場合はSASL_SSL、認証なしの場合はPLAINTEXT
property.sasl.mechanismConfluent Cloudの場合はPLAIN、その他の場合はSCRAM-SHA-512(SCRAM-SHA-256ではない)
property.sasl.usernameSASLユーザー名
property.sasl.passwordSASLパスワード
property.ssl.ca.locationアップロードされたCA証明書へのパス(例:FILE:isrgrootx1.pem)。Confluent Cloudでは不要
property.enable.ssl.certificate.verificationConfluent Cloudの場合はfalseに設定(証明書不要)
property.group.idコンシューマーグループID
property.kafka_default_offsetsOFFSET_BEGINNINGOFFSET_END、または特定のオフセット

トラブルシューティング

問題解決方法
"Broker transport failure"Confluent Cloudの場合:PLAINを使用。その他の場合:SCRAM-SHA-512を使用(SCRAM-SHA-256ではない)
"SSL handshake failed"CA証明書がプロバイダーと一致することを確認(RedpandaはISRG Root X1、MSKはAmazon Root CA 1)
"認証 failed"ユーザー名/パスワードを確認、ユーザーがトピック権限を持つことを確認
"Topic authorization failed"ACL権限を付与:rpk security acl create --allow-principal "User:xxx" --operation read,describe --topic xxx --group "*"
接続タイムアウトパブリックアクセスを有効化またはPrivate Linkを設定
ジョブの自動一時停止:"max_error_number exceeded"JSONとTable間のスキーマ不一致。jsonpathsがJSONフィールドと一致することを確認。エラーの一部を許容するために"max_filter_ratio" = "0.5"を追加
ジョブステータスがPAUSEDを表示SHOW ROUTINE LOAD FOR <job_name>;で理由を確認してから修正し、RESUME ROUTINE LOAD FOR <job_name>;を実行
ファイルが既に存在作成前にSHOW FILE;で既存ファイルを確認

デバッグのヒント

ジョブエラーの詳細を確認:

SHOW ROUTINE LOAD FOR demo.kafka_load;
-- Look at ReasonOfStateChanged and ErrorLogUrls columns

タスクレベルのステータスを表示:

SHOW ROUTINE LOAD TASK WHERE JobName = "kafka_load";

新しいメッセージのみでテストする(古いデータをスキップ):

-- Use OFFSET_END to start from latest messages
"property.kafka_default_offsets" = "OFFSET_END"

リファレンス