メインコンテンツまでスキップ
バージョン: 26.x

AutoMQ Load

AutoMQは、S3のようなオブジェクトストレージにストレージを分離することでクラウドネイティブ化されたKafkaのフォークです。Apache Kafka®との100%の互換性を保ちながら、最大10倍のコスト効率性と100倍の弾力性をユーザーに提供します。革新的な共有ストレージアーキテクチャにより、高いスループットと低レイテンシを確保しながら、数秒でのパーティション再配置、セルフバランシング、オートスケーリングなどの機能を実現しています。 AutoMQ Storage Architecture

この記事では、Apache Doris Routine Loadを使用してAutoMQからDorisにデータをインポートする方法について説明します。Routine Loadの詳細については、Routine Loadドキュメントを参照してください。

環境の準備

Apache Dorisとテストデータの準備

動作するApache Dorisクラスターが既にセットアップされていることを確認してください。デモンストレーション目的で、Quick Startedドキュメントに従ってLinux上にテスト用のApache Doris環境をデプロイしています。 データベースとテストテーブルを作成します:

create database automq_db;
CREATE TABLE automq_db.users (
id bigint NOT NULL,
name string NOT NULL,
timestamp string NULL,
status string NULL

) DISTRIBUTED BY hash (id) PROPERTIES ('replication_num' = '1');

Kafka Command Line Tools の準備

AutoMQ Releases から最新のTGZパッケージをダウンロードして展開してください。展開ディレクトリを$AUTOMQ_HOMEとすると、この記事では$AUTOMQ_HOME/bin配下のスクリプトを使用してトピックの作成とテストデータの生成を行います。

AutoMQ とテストデータの準備

AutoMQ 公式デプロイメントドキュメント を参照して機能的なクラスターをデプロイし、AutoMQ と Apache Doris 間のネットワーク接続を確保してください。 以下の手順に従って、AutoMQ で example_topic という名前のトピックを素早く作成し、テスト用のJSONデータを書き込みます。

トピックの作成

AutoMQ の Apache Kafka® command line tool を使用してトピックを作成します。Kafka 環境にアクセスできること、および Kafka サービスが実行されていることを確認してください。トピックを作成するコマンドの例は以下の通りです:

$AUTOMQ_HOME/bin/kafka-topics.sh --create --topic exampleto_topic --bootstrap-server 127.0.0.1:9092  --partitions 1 --replication-factor 1

ヒント: コマンドを実行する際は、topicbootstarp-serverを実際のAutoMQ Bootstrap Serverアドレスに置き換えてください。

topicを作成した後、以下のコマンドを使用してtopicが正常に作成されたことを確認できます。

$AUTOMQ_HOME/bin/kafka-topics.sh --describe example_topic --bootstrap-server 127.0.0.1:9092

テストデータの生成

前述のテーブルに対応するJSON形式のテストデータエントリを作成します。

{
"id": 1,
"name": "testuser",
"timestamp": "2023-11-10T12:00:00",
"status": "active"
}

テストデータの書き込み

Kafkaのコマンドラインツールまたはプログラミングアプローチを使用して、example_topicという名前のトピックにテストデータを書き込みます。以下はコマンドラインツールを使用した例です:

echo '{"id": 1, "name": "testuser", "timestamp": "2023-11-10T12:00:00", "status": "active"}' | sh kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic example_topic

トピックに書き込まれたデータを表示するには、以下のコマンドを使用してください:

sh $AUTOMQ_HOME/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic example_topic --from-beginning

ヒント: コマンドを実行する際は、topicbootstarp-serverを実際のAutoMQ Bootstrap Serverアドレスに置き換えてください。

Routine Loadインポートジョブの作成

Apache Dorisコマンドラインで、AutoMQ Kafkaトピックからデータを継続的にインポートするためのJSONデータを受け入れるRoutine Loadジョブを作成します。Routine Loadの詳細なパラメータ情報については、[Doris Routine Load]を参照してください。

CREATE ROUTINE LOAD automq_example_load ON users
COLUMNS(id, name, timestamp, status)
PROPERTIES
(
"format" = "json",
"jsonpaths" = "[\"$.id\",\"$.name\",\"$.timestamp\",\"$.status\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "127.0.0.1:9092",
"kafka_topic" = "example_topic",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

Tips: コマンドを実行する際は、kafka_broker_listを実際のAutoMQ Bootstrap Serverアドレスに置き換える必要があります。

データインポートの検証

まず、Routine Loadインポートジョブのステータスを確認して、タスクが実行されていることを確認します。

show routine load\G;

次に、Apache Doris データベースの関連テーブルをクエリすると、データが正常にインポートされたことが確認できます。

select * from users;
+------+--------------+---------------------+--------+
| id | name | timestamp | status |
+------+--------------+---------------------+--------+
| 1 | testuser | 2023-11-10T12:00:00 | active |
| 2 | testuser | 2023-11-10T12:00:00 | active |
+------+--------------+---------------------+--------+
2 rows in set (0.01 sec)