メインコンテンツまでスキップ
バージョン: 4.x

Confluent Cloudに接続

このガイドでは、ビジュアルインターフェースを使用してVeloDB CloudをConfluent Cloud Kafkaに接続する手順を説明します。

警告

前提条件: 先に進む前に、Confluent Cloud Setup Guideを完了して、クラスター、APIキー、およびサンプルデータトピックを作成してください。

ステップ1: Importに移動

VeloDBウェアハウスで、左サイドバーのDataセクションを探し、Importをクリックします。

Createをクリックして新しいインポートジョブを開始します。

New Import Page

ステップ2: Confluent Cloudを選択

Event Streamsの下で、Confluent Cloudをクリックしてストリーミングインポートウィザードを開始します。

ステップ3: 接続を設定

Confluent Cloud Setupからの接続詳細を入力します:

Connection 構成

フィールド説明
Task Nameこのインポートジョブの一意の名前confluent_orders
Consumer GroupKafkaコンシューマーグループIDvelodb-consumer
BrokersBootstrapサーバーURLpkc-xxxxx.us-east-1.aws.confluent.cloud:9092
API KeyConfluent Cloud APIキーYour API Key
API SecretConfluent Cloud APIシークレットYour API Secret
SASL Mechanism認証方法PLAIN

すべてのフィールドを入力後、Nextをクリックします。

ステップ4: ソースデータを選択

Kafkaトピックとデータフォーマットを設定します:

Source Data 構成

フィールド説明
TopicドロップダウンからKafkaトピックを選択
Offset Selectionすべてのデータを読み込む場合はFrom beginning、新しいデータのみの場合はFrom latest
Data FormatJSONを選択
データフォーマット

VeloDBはJSONCSVフォーマットのみをサポートします。ConfluentトピックでAVROを使用している場合は、Confluent CloudでJSONフォーマットの新しいトピックを作成する必要があります。

Nextをクリックして続行します。

ステップ5: 宛先Tableを設定

VeloDBはKafkaメッセージからスキーマを自動的に検出します:

Data Preview

Data Previewセクションには、トピックからのサンプルレコードが表示されます。

Table設定を構成

Destination Table

フィールド説明
Load Data to新しいTableを作成するにはNew Tableを選択
Databaseデータベースを選択または作成
TableTable名を入力

カラム設定

カラムマッピングを確認して調整します:

Column Settings

設定説明
Source FieldKafkaメッセージからのJSONフィールド
Column NameVeloDBTableのカラム名
Data タイプVeloDBデータタイプ(自動検出)

詳細設定

設定説明
Table Models追記専用の場合はDUPLICATE、アップサートの場合はUNIQUE
Sorting Keyデータ順序付けのためのカラム
バケット Keyデータ分散のためのカラム
バケット NumberAUTOを推奨

Nextをクリックして続行します。

ステップ6: 設定を構成

インポートジョブの設定を調整します:

Settings

設定デフォルト説明
Concurrency256並列コンシューマー数
Max Batch Interval (s)60コミット前の最大待機時間
Max Batch Rows20000000バッチあたりの最大行数
Max Batch Size (MB)1024最大バッチサイズ

デフォルトは大部分のユースケースで適切に動作します。Nextをクリックして続行します。

ステップ7: 検証

VeloDBが設定を検証します:

Verification

チェックリストが以下を検証します:

  • Kafka Validation - Confluent Cloudへの接続
  • Warehouse Connectivity Test - VeloDBがKafkaに到達可能か
  • User 許可 Check - インポート作成の権限があるか

すべてのチェックがSucceededと表示された場合、Startをクリックしてデータストリーミングを開始します。


データインポートを確認

インポートを開始した後、データが流れているか確認します:

インポートステータスを確認

サイドバーのImportに移動してジョブステータスを確認します:

  • RUNNING - ジョブがアクティブにデータを消費中
  • PAUSED - ジョブが一時停止中(エラーを確認)

データをクエリ

SQL Editorに移動して次を実行します:

-- Check row count
SELECT COUNT(*) FROM your_database.your_table;

-- View sample data
SELECT * FROM your_database.your_table LIMIT 10;

インポートジョブの管理

アクション方法
一時停止ジョブをクリックし、Pauseをクリック
再開ジョブをクリックし、Resumeをクリック
削除ジョブをクリックし、Deleteをクリック

またはSQLを使用:

-- Pause job
PAUSE ROUTINE LOAD FOR database.job_name;

-- Resume job
RESUME ROUTINE LOAD FOR database.job_name;

-- Stop job
STOP ROUTINE LOAD FOR database.job_name;

-- View job status
SHOW ROUTINE LOAD FOR database.job_name;

トラブルシューティング

問題解決方法
"Incorrect credentials"Confluent CloudからAPI KeyとSecretを確認してください
"Broker transport failure"SASL MechanismがPLAINに設定されていることを確認してください
"Topic not found"トピック名が正確に一致することを確認してください(大文字小文字を区別)
JSON parse errorConfluentトピックがAVROではなくJSON形式を使用していることを確認してください
Job paused with errorsエラーの詳細についてSHOW ROUTINE LOADを確認してください

参考資料