メインコンテンツまでスキップ
バージョン: 26.x

Kafka から VeloDB へのデータ取り込み(Routine Load を使用)

Routine Load は、Kafka トピックからメッセージを継続的に消費し、VeloDB テーブルに読み込む永続的なストリーミングジョブを作成します。

前提条件

VeloDB Cloud アカウント(必須)

アクティブなクラスタを持つ VeloDB Cloud アカウントが必要です。アカウントをお持ちでない場合は、VeloDB Quick Start Guide に従ってアカウントとクラスタを作成してください。

VeloDB で SQL クエリを実行し、データベースを作成するには、VeloDB Console サイドバーの組み込み SQL Editor を使用します:

SQL Editor

Kafka ソース要件

要件詳細
Kafka バージョン0.10.0.0 以上(推奨)
データ形式JSON または CSV
ネットワークアクセスVeloDB へのパブリックエンドポイントまたは Private Link

クラウド Kafka プロバイダー

ネットワーク: デフォルトでパブリックアクセスが有効

認証: API Key と Secret を使用した SASL/PLAIN

SSL 証明書: 不要(証明書検証を無効にする)

セットアップ:

  1. Cluster Settings から Bootstrap サーバーを取得
  2. Cluster Overview → API Keys で API Key を作成
  3. サンプルデータには、JSON 形式の Datagen Source コネクターを使用
重要な要件
  • Confluent Cloud: API Key/SecretでSASL/PLAIN認証を使用します
  • その他のKafkaプロバイダー: SCRAM-SHA-512を使用してください(SCRAM-SHA-256は「Broker transport failure」エラーで失敗します)

1. VeloDBでSSL証明書を作成

SQL Editorで、CREATE FILEを使用してSSL証明書をアップロードします。

重要

証明書ファイルは、Routine Loadジョブを作成する同じデータベース内に作成する必要があります。最初にデータベースを作成または選択してから、ファイルを作成してください。

SSL証明書は不要です。 Confluent CloudはSSLを内部で処理します。

この手順をスキップして、ターゲットテーブルの作成に進んでください。

Routine Loadジョブを作成する際は、以下を使用してください:

"property.enable.ssl.certificate.verification" = "false"
備考

ファイル作成後、Routine LoadでFILE:<filename>.pemとして参照できます。

ヒント

現在のデータベースの既存証明書を確認するには:SHOW FILE;


2. (任意) サンプルデータ付きトピックの作成

既存のKafkaトピックがない場合は、テスト用のサンプルデータ付きトピックを作成してください。

Confluent Cloud ConsoleでDatagen Sourceコネクタを使用:

  1. クラスター内のConnectorsに移動
  2. Add connectorをクリック → Datagen Sourceを検索
  3. 設定:
    • Topic:トピック名を入力(例:orders
    • Output record value formatJSONを選択(必須 - VeloDBはAVROをサポートしません)
    • Template:テンプレートを選択(例:OrdersUsersPageviews
  4. 残りのステップでContinueをクリック
  5. コネクタを起動

コネクタは自動的にトピックを作成し、サンプルメッセージを生成します。

Important

出力フォーマットとしてJSONを選択してください。AVROおよびJSON_SRフォーマットはVeloDB Routine Loadでサポートされていません。


3. VeloDBでターゲットテーブルを作成

Kafkaメッセージフォーマットに基づいてVeloDBテーブルスキーマを設計します。

スキーママッピング: KafkaからVeloDBへ

JSONメッセージの例:

{"event_time": "2025-01-15 10:30:00", "user_id": 1001, "event_type": "page_view", "page": "/home"}

VeloDBテーブルへのマッピング:

Kafka JSON FieldJSON TypeVeloDB ColumnVeloDB Type
event_timestringevent_timeDATETIME
user_idnumberuser_idBIGINT
event_typestringevent_typeVARCHAR(50)
pagestringpageVARCHAR(255)

型マッピングリファレンス

VeloDBデータ型の完全な情報については、Data Type Overviewを参照してください。

Kafka/JSON TypeVeloDB Type
string (datetime)DATETIME, DATE
stringVARCHAR(n), STRING, TEXT
integerINT, BIGINT, SMALLINT, TINYINT
number (decimal)DOUBLE, FLOAT, DECIMAL(p,s)
booleanBOOLEAN
nested objectJSON, VARIANT
arrayARRAY<T>, JSON

テーブルの作成

SQL Editorで、データベースとテーブルを作成します:

CREATE DATABASE IF NOT EXISTS demo;

USE demo;

CREATE TABLE events (
event_time DATETIME NOT NULL,
user_id BIGINT NOT NULL,
event_type VARCHAR(50),
page VARCHAR(255)
)
DUPLICATE KEY(event_time, user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;
ヒント

Routine Loadのjsonpathsパラメータは、JSON フィールドをテーブルカラムに順序通りにマップします。テーブルカラムがjsonpathsで指定された順序と一致することを確認してください。


4. Routine Load ジョブの作成

CREATE ROUTINE LOADを使用してKafkaからの消費を開始します:

CREATE ROUTINE LOAD demo.kafka_load ON events
COLUMNS(event_time, user_id, event_type, page)
PROPERTIES (
"format" = "json",
"jsonpaths" = "[\"$.event_time\",\"$.user_id\",\"$.event_type\",\"$.page\"]"
)
FROM KAFKA (
"kafka_broker_list" = "<BOOTSTRAP_SERVER>",
"kafka_topic" = "<YOUR_TOPIC>",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.security.protocol" = "SASL_SSL",
"property.sasl.mechanism" = "PLAIN",
"property.sasl.username" = "<API_KEY>",
"property.sasl.password" = "<API_SECRET>",
"property.enable.ssl.certificate.verification" = "false",
"property.group.id" = "velodb-consumer"
);

プレースホルダーを置き換える:

プレースホルダー
<BOOTSTRAP_SERVER>pkc-xxxxx.us-east-1.aws.confluent.cloud:9092
<YOUR_TOPIC>orders
<API_KEY>Your Confluent Cloud API Key
<API_SECRET>Your Confluent Cloud API Secret
Confluent Cloud接続詳細の確認方法
  • Bootstrap server: Cluster Overview → Cluster Settings
  • API Key/Secret: Cluster Overview → API Keys → Create key
データ形式

Confluent Cloud Datagen connectorはJSON形式を使用する必要があります。AVROおよびJSON_SR形式はVeloDB Routine Loadではサポートされていません。


5. 検証

Routine Loadジョブステータスを確認:

SHOW ROUTINE LOAD FOR demo.kafka_load;

期待される出力ではRUNNING状態が表示されます:

+-------+-------------+-----------+--------+-----------+
| Id | Name | TableName | State | DataSource|
+-------+-------------+-----------+--------+-----------+
| 12345 | kafka_load | events | RUNNING| KAFKA |
+-------+-------------+-----------+--------+-----------+

読み込まれたデータを確認:

SELECT * FROM demo.events ORDER BY event_time LIMIT 10;

期待される結果:

+---------------------+---------+------------+------------+
| event_time | user_id | event_type | page |
+---------------------+---------+------------+------------+
| 2025-01-15 10:30:00 | 1001 | page_view | /home |
| 2025-01-15 10:31:00 | 1002 | click | /products |
| 2025-01-15 10:32:00 | 1001 | purchase | /checkout |
+---------------------+---------+------------+------------+

タスクの進行状況を監視:

SHOW ROUTINE LOAD TASK WHERE JobName = "kafka_load";

6. Routine Loadジョブの管理

アクションSQLコマンド
ジョブの一時停止PAUSE ROUTINE LOAD FOR demo.kafka_load;
ジョブの再開RESUME ROUTINE LOAD FOR demo.kafka_load;
ジョブの停止STOP ROUTINE LOAD FOR demo.kafka_load;
全ジョブの表示SHOW ALL ROUTINE LOAD;

ネットワーク接続

オプション1: パブリックネットワークアクセス

Kafkaプロバイダーが提供するパブリックエンドポイントを使用します。VeloDB CloudがポートKafkaがプロバイダーが提供するパブリックエンドポイントを使用します。VeloDB Cloudがポート9092(または設定されたポート)でブローカーにアクセスできることを確認してください。

オプション2: Private Link(本番環境推奨)

本番環境のワークロードでは、クラウドプロバイダーのネットワーク内でトラフィックを保持するためにPrivate Linkを設定します:

  1. VeloDB ConsoleでSettings → Endpoint Servicesに移動
  2. KafkaプロバイダーへのPrivate Link接続を作成
  3. kafka_broker_listでプライベートエンドポイントを使用

Routine Loadパラメータ

パラメータ説明
kafka_broker_listKafkaブローカーアドレス(複数の場合はカンマ区切り)
kafka_topic消費対象のトピック
property.security.protocolクラウドKafkaの場合はSASL_SSL、認証なしの場合はPLAINTEXT
property.sasl.mechanismConfluent Cloudの場合はPLAIN、その他の場合はSCRAM-SHA-512(SCRAM-SHA-256ではない)
property.sasl.usernameSASLユーザー名
property.sasl.passwordSASLパスワード
property.ssl.ca.locationアップロードされたCA証明書へのパス(例:FILE:isrgrootx1.pem)。Confluent Cloudでは不要
property.enable.ssl.certificate.verificationConfluent Cloudの場合はfalseに設定(証明書不要)
property.group.idコンシューマーグループID
property.kafka_default_offsetsOFFSET_BEGINNINGOFFSET_END、または特定のオフセット

トラブルシューティング

問題解決方法
"Broker transport failure"Confluent Cloudの場合:PLAINを使用。その他の場合:SCRAM-SHA-512を使用(SCRAM-SHA-256ではない)
"SSL handshake failed"CA証明書がプロバイダーと一致することを確認(RedpandaはISRG Root X1、MSKはAmazon Root CA 1)
"Authentication failed"ユーザー名/パスワードを確認し、ユーザーがトピック権限を持っていることを確認
"Topic authorization failed"ACL権限を付与:rpk security acl create --allow-principal "User:xxx" --operation read,describe --topic xxx --group "*"
接続タイムアウトパブリックアクセスを有効にするかPrivate Linkを設定
ジョブの自動一時停止:"max_error_number exceeded"JSONとテーブル間のスキーマ不一致。jsonpathsがJSONフィールドと一致することを確認。一部のエラーを許容するために"max_filter_ratio" = "0.5"を追加
ジョブステータスがPAUSEDを表示SHOW ROUTINE LOAD FOR <job_name>;で理由を確認し、修正後にRESUME ROUTINE LOAD FOR <job_name>;を実行
ファイルが既に存在作成前にSHOW FILE;で既存ファイルを確認

デバッグのヒント

ジョブエラーの詳細を確認:

SHOW ROUTINE LOAD FOR demo.kafka_load;
-- Look at ReasonOfStateChanged and ErrorLogUrls columns

タスクレベルのステータスを表示:

SHOW ROUTINE LOAD TASK WHERE JobName = "kafka_load";

新しいメッセージのみでテスト(古いデータをスキップ):

-- Use OFFSET_END to start from latest messages
"property.kafka_default_offsets" = "OFFSET_END"

参考資料