メインコンテンツまでスキップ
バージョン: 26.x

KafkaからVeloDBへのデータ取り込み(Routine Loadを使用)

Routine Loadは、Kafkaトピックからメッセージを継続的に消費し、VeloDBTableにロードする永続的なストリーミングジョブを作成します。

前提条件

VeloDB Cloudアカウント(必須)

アクティブなクラスターを持つVeloDB Cloudアカウントが必要です。アカウントをお持ちでない場合は、VeloDB Quick Start Guideに従ってアカウントとクラスターを作成してください。

VeloDBでSQLクエリを実行し、データベースを作成するには、VeloDB Consoleサイドバーの組み込みSQL Editorを使用してください:

SQL Editor

Kafkaソース要件

要件詳細
Kafkaバージョン0.10.0.0以上(推奨)
データ形式JSONまたはCSV
ネットワークアクセスVeloDBへのパブリックエンドポイントまたはPrivate Link

クラウドKafkaプロバイダー

ネットワーク: デフォルトでパブリックアクセスが有効

認証: API KeyとSecretを使用するSASL/PLAIN

SSL証明書: 不要(証明書検証を無効にする)

セットアップ:

  1. Cluster SettingsからBootstrapサーバーを取得
  2. Cluster 概要 → API KeysでAPI Keyを作成
  3. サンプルデータには、JSON形式のDatagen Sourceコネクターを使用
Critical Requirement
  • Confluent Cloud: SASL/PLAIN認証をAPI Key/Secretで使用
  • その他のKafkaプロバイダー: SCRAM-SHA-512を使用(SCRAM-SHA-256は使用しない。「Broker transport failure」エラーで失敗する)

1. VeloDBでSSL証明書を作成

SQL Editorで、CREATE FILEを使用してSSL証明書をアップロードします。

Important

証明書ファイルは、Routine Loadジョブを作成する同じデータベースに作成する必要があります。まずデータベースを作成または選択してから、ファイルを作成してください。

SSL証明書は不要です。 Confluent CloudはSSLを内部で処理します。

この手順をスキップし、ターゲットTableの作成に進んでください。

Routine Loadジョブを作成する際は、以下を使用してください:

"property.enable.ssl.certificate.verification" = "false"
備考

ファイル作成後、Routine LoadでFILE:<filename>.pemとして参照できます。

ヒント

現在のデータベースの既存証明書を確認するには:SHOW FILE;


2. (オプション) サンプルデータを含むトピックの作成

既存のKafkaトピックがない場合、テスト用にサンプルデータを含むトピックを作成してください。

Confluent Cloud ConsoleでDatagen Sourceコネクターを使用します:

  1. クラスターのConnectorsに移動
  2. Add connectorをクリック → Datagen Sourceを検索
  3. 設定:
    • Topic: トピック名を入力(例:orders
    • Output record value format: JSONを選択(必須 - VeloDBはAVROをサポートしていません)
    • Template: テンプレートを選択(例:OrdersUsersPageviews
  4. 残りのステップでContinueをクリック
  5. コネクターを起動

コネクターは自動的にトピックを作成し、サンプルメッセージを生成します。

Important

出力形式としてJSONを選択してください。AVROおよびJSON_SR形式はVeloDB Routine Loadでサポートされていません。


3. VeloDBでターゲットTableを作成

KafkaメッセージフォーマットをベースにVeloDBTableスキーマを設計してください。

スキーママッピング: KafkaからVeloDBへ

JSONメッセージ例:

{"event_time": "2025-01-15 10:30:00", "user_id": 1001, "event_type": "page_view", "page": "/home"}

VeloDBTableへのマッピング:

Kafka JSON FieldJSON タイプVeloDB ColumnVeloDB タイプ
event_timestringevent_timeDATETIME
user_idnumberuser_idBIGINT
event_typestringevent_typeVARCHAR(50)
pagestringpageVARCHAR(255)

型マッピング参考資料

VeloDBの完全なデータ型については、Data タイプ 概要を参照してください。

Kafka/JSON タイプVeloDB タイプ
string (datetime)DATETIME, DATE
stringVARCHAR(n), STRING, TEXT
integerINT, BIGINT, SMALLINT, TINYINT
number (decimal)DOUBLE, FLOAT, DECIMAL(p,s)
booleanBOOLEAN
nested objectJSON, VARIANT
arrayARRAY<T>, JSON

Tableの作成

SQL Editorで、データベースとTableを作成します:

CREATE DATABASE IF NOT EXISTS demo;

USE demo;

CREATE TABLE events (
event_time DATETIME NOT NULL,
user_id BIGINT NOT NULL,
event_type VARCHAR(50),
page VARCHAR(255)
)
DUPLICATE KEY(event_time, user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;
ヒント

Routine Loadのjsonpathsパラメータは、JSON フィールドをTableカラムに順序通りにマッピングします。Tableカラムがjsonpathsで指定された順序と一致することを確認してください。


4. Routine Load ジョブの作成

CREATE ROUTINE LOADを使用してKafkaからの消費を開始します:

CREATE ROUTINE LOAD demo.kafka_load ON events
COLUMNS(event_time, user_id, event_type, page)
PROPERTIES (
"format" = "json",
"jsonpaths" = "[\"$.event_time\",\"$.user_id\",\"$.event_type\",\"$.page\"]"
)
FROM KAFKA (
"kafka_broker_list" = "<BOOTSTRAP_SERVER>",
"kafka_topic" = "<YOUR_TOPIC>",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.security.protocol" = "SASL_SSL",
"property.sasl.mechanism" = "PLAIN",
"property.sasl.username" = "<API_KEY>",
"property.sasl.password" = "<API_SECRET>",
"property.enable.ssl.certificate.verification" = "false",
"property.group.id" = "velodb-consumer"
);

プレースホルダーを置き換えてください:

プレースホルダー
<BOOTSTRAP_SERVER>pkc-xxxxx.us-east-1.aws.confluent.cloud:9092
<YOUR_TOPIC>orders
<API_KEY>Your Confluent Cloud API Key
<API_SECRET>Your Confluent Cloud API Secret
Confluent Cloud接続詳細の確認方法
  • Bootstrap server: Cluster 概要 → Cluster Settings
  • API Key/Secret: Cluster 概要 → API Keys → Create key
データフォーマット

Confluent Cloud DatagenコネクタはJSONフォーマットを使用する必要があります。AVROおよびJSON_SRフォーマットはVeloDB Routine Loadでサポートされていません。


5. 検証

Routine Loadジョブのステータスを確認:

SHOW ROUTINE LOAD FOR demo.kafka_load;

期待される出力ではRUNNING状態が表示されます:

+-------+-------------+-----------+--------+-----------+
| Id | Name | TableName | State | DataSource|
+-------+-------------+-----------+--------+-----------+
| 12345 | kafka_load | events | RUNNING| KAFKA |
+-------+-------------+-----------+--------+-----------+

読み込まれたデータの確認:

SELECT * FROM demo.events ORDER BY event_time LIMIT 10;

期待される結果:

+---------------------+---------+------------+------------+
| event_time | user_id | event_type | page |
+---------------------+---------+------------+------------+
| 2025-01-15 10:30:00 | 1001 | page_view | /home |
| 2025-01-15 10:31:00 | 1002 | click | /products |
| 2025-01-15 10:32:00 | 1001 | purchase | /checkout |
+---------------------+---------+------------+------------+

タスクの進行状況を監視:

SHOW ROUTINE LOAD TASK WHERE JobName = "kafka_load";

6. Routine Loadジョブの管理

アクションSQLコマンド
ジョブを一時停止PAUSE ROUTINE LOAD FOR demo.kafka_load;
ジョブを再開RESUME ROUTINE LOAD FOR demo.kafka_load;
ジョブを停止STOP ROUTINE LOAD FOR demo.kafka_load;
全てのジョブを表示SHOW ALL ROUTINE LOAD;

ネットワーク接続

オプション1: パブリックネットワークアクセス

Kafkaプロバイダーが提供するパブリックエンドポイントを使用します。VeloDB Cloudがポート9092(または設定されたポート)でブローカーに到達できることを確認してください。

オプション2: Private Link(本番環境推奨)

本番ワークロードでは、クラウドプロバイダーのネットワーク内でトラフィックを保持するためにPrivate Linkを設定します:

  1. VeloDB ConsoleのSettings → Endpoint Servicesに移動
  2. KafkaプロバイダーへのPrivate Link接続を作成
  3. kafka_broker_listでプライベートエンドポイントを使用

Routine Loadパラメータ

パラメータ説明
kafka_broker_listKafkaブローカーアドレス(複数の場合はカンマ区切り)
kafka_topic消費するトピック
property.security.protocolクラウドKafkaにはSASL_SSL、認証なしにはPLAINTEXT
property.sasl.mechanismConfluent CloudにはPLAIN、その他にはSCRAM-SHA-512(SCRAM-SHA-256ではない)
property.sasl.usernameSASLユーザー名
property.sasl.passwordSASLパスワード
property.ssl.ca.locationアップロードされたCA証明書のパス(例:FILE:isrgrootx1.pem)。Confluent Cloudでは不要
property.enable.ssl.certificate.verificationConfluent Cloudではfalseに設定(証明書不要)
property.group.idコンシューマーグループID
property.kafka_default_offsetsOFFSET_BEGINNINGOFFSET_END、または特定のオフセット

トラブルシューティング

問題解決策
"Broker transport failure"Confluent Cloudの場合:PLAINを使用。その他の場合:SCRAM-SHA-512を使用(SCRAM-SHA-256ではない)
"SSL handshake failed"CA証明書がプロバイダーと一致することを確認(RedpandaにはISRG Root X1、MSKにはAmazon Root CA 1)
"認証 failed"ユーザー名/パスワードを確認し、ユーザーがトピック権限を持っていることを確認
"Topic authorization failed"ACL権限を付与:rpk security acl create --allow-principal "User:xxx" --operation read,describe --topic xxx --group "*"
接続タイムアウトパブリックアクセスを有効化するかPrivate Linkを設定
ジョブの自動一時停止:"max_error_number exceeded"JSONとTable間のスキーマの不一致。jsonpathsがJSONフィールドと一致することを確認。一部のエラーを許容するために"max_filter_ratio" = "0.5"を追加
ジョブステータスがPAUSEDを表示SHOW ROUTINE LOAD FOR <job_name>;で理由を確認し、修正後にRESUME ROUTINE LOAD FOR <job_name>;を実行
ファイルが既に存在作成前にSHOW FILE;で既存ファイルを確認

デバッグのヒント

ジョブエラーの詳細を確認:

SHOW ROUTINE LOAD FOR demo.kafka_load;
-- Look at ReasonOfStateChanged and ErrorLogUrls columns

タスクレベルのステータスを表示:

SHOW ROUTINE LOAD TASK WHERE JobName = "kafka_load";

新しいメッセージのみでテスト(古いデータをスキップ):

-- Use OFFSET_END to start from latest messages
"property.kafka_default_offsets" = "OFFSET_END"

参考資料