メインコンテンツまでスキップ
バージョン: 4.x

Doris Kafka Connector

Kafka Connectは、Apache Kafkaと他のシステム間でのデータ伝送のためのスケーラブルで信頼性の高いツールです。Connectorは、Kafkaに対して大量のデータの入出力を定義できます。

Dorisコミュニティではdoris-kafka-connectorプラグインを提供しており、KafkaトピックのデータをDorisに書き込むことができます。

バージョンの説明

Connector VersionKafka VersionDoris VersionJava Version
1.0.02.4+2.0+8
1.1.02.4+2.0+8
24.0.02.4+2.0+8
25.0.02.4+2.0+8

使用方法

ダウンロード

doris-kafka-connector

maven依存関係

<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>doris-kafka-connector</artifactId>
<version>25.0.0</version>
</dependency>

Standalone mode startup

$KAFKA_HOME 配下にpluginsディレクトリを作成し、ダウンロードしたdoris-kafka-connector jarパッケージをそこに配置してください
config/connect-standalone.properties を設定してください

# Modify broker address
bootstrap.servers=127.0.0.1:9092

# Modify to the created plugins directory
# Note: Please fill in the direct path to Kafka here. For example: plugin.path=/opt/kafka/plugins
plugin.path=$KAFKA_HOME/plugins

# It is recommended to increase the max.poll.interval.ms time of Kafka to more than 30 minutes, the default is 5 minutes
# Avoid Stream Load import data consumption timeout and consumers being kicked out of the consumer group
max.poll.interval.ms=1800000
consumer.max.poll.interval.ms=1800000

doris-connector-sink.propertiesを設定する

configディレクトリにdoris-connector-sink.propertiesを作成し、以下の内容を設定してください:

name=test-doris-sink
connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
topics=topic_test
doris.topic2table.map=topic_test:test_kafka_tbl
buffer.count.records=10000
buffer.flush.time=120
buffer.size.bytes=5000000
doris.urls=10.10.10.1
doris.http.port=8030
doris.query.port=9030
doris.user=root
doris.password=
doris.database=test_db
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

Standalone を開始

$KAFKA_HOME/bin/connect-standalone.sh -daemon $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/doris-connector-sink.properties
注記

注意: 本番環境では一般的にスタンドアロンモードの使用は推奨されません。

Distributed modeの起動

$KAFKA_HOME配下にpluginsディレクトリを作成し、ダウンロードしたdoris-kafka-connectorのjarパッケージをその中に配置します

config/connect-distributed.propertiesを設定します

# Modify kafka server address
bootstrap.servers=127.0.0.1:9092

# Modify group.id, the same cluster needs to be consistent
group.id=connect-cluster

# Modify to the created plugins directory
# Note: Please fill in the direct path to Kafka here. For example: plugin.path=/opt/kafka/plugins
plugin.path=$KAFKA_HOME/plugins

# It is recommended to increase the max.poll.interval.ms time of Kafka to more than 30 minutes, the default is 5 minutes
# Avoid Stream Load import data consumption timeout and consumers being kicked out of the consumer group
max.poll.interval.ms=1800000
consumer.max.poll.interval.ms=1800000

分散開始

$KAFKA_HOME/bin/connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties

Connectorを追加

curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
"name":"test-doris-sink-cluster",
"config":{
"connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
"topics":"topic_test",
"doris.topic2table.map": "topic_test:test_kafka_tbl",
"buffer.count.records":"10000",
"buffer.flush.time":"120",
"buffer.size.bytes":"5000000",
"doris.urls":"10.10.10.1",
"doris.user":"root",
"doris.password":"",
"doris.http.port":"8030",
"doris.query.port":"9030",
"doris.database":"test_db",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter"
}
}'

Operation Connector

# View connector status
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/status -X GET
# Delete connector
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster -X DELETE
# Pause connector
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/pause -X PUT
# Restart connector
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/resume -X PUT
# Restart tasks within the connector
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/tasks/0/restart -X POST

参照:Connect REST Interface

注記

kafka-connectが初回起動時に、kafkaクラスタ内にkafka-connectの共有コネクタ設定を記録するため、3つのトピックconfig.storage.topicoffset.storage.topicstatus.storage.topicが作成されることに注意してください。オフセットデータとステータス更新が記録されます。How to Use Kafka Connect - Get Started

SSL認証されたKafkaクラスタへのアクセス

kafka-connectを通してSSL認証されたKafkaクラスタにアクセスするには、ユーザーはKafka Brokerの公開鍵を認証するために使用する証明書ファイル(client.truststore.jks)を提供する必要があります。connect-distributed.propertiesファイルに以下の設定を追加できます:

# Connect worker
security.protocol=SSL
ssl.truststore.location=/var/ssl/private/client.truststore.jks
ssl.truststore.password=test1234

# Embedded consumer for sink connectors
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/var/ssl/private/client.truststore.jks
consumer.ssl.truststore.password=test1234

kafka-connectを通じてSSL認証に接続されたKafkaクラスターの設定手順については、次を参照してください: Configure Kafka Connect

Dead letter queue

デフォルトでは、変換中またはその間に発生したエラーは、コネクターの失敗を引き起こします。各コネクター設定では、そのようなエラーをスキップすることで許容することもでき、オプションで各エラーと失敗した操作の詳細、および問題となったレコード(詳細レベルは様々)をログ記録のためのdead-letter queueに書き込むことができます。

errors.tolerance=all
errors.deadletterqueue.topic.name=test_error_topic
errors.deadletterqueue.context.headers.enable=true
errors.deadletterqueue.topic.replication.factor=1

設定項目

KeyEnumデフォルト値Requiredデスクリプション
name--YConnect アプリケーション名。Kafka Connect 環境内で一意である必要があります
connector.class--Yorg.apache.doris.kafka.connector.DorisSinkConnector
topics--Y購読するトピックのリスト。カンマで区切ります。例: topic1, topic2
doris.urls--YDoris FE 接続アドレス。複数ある場合はカンマで区切ります。例: 10.20.30.1,10.20.30.2,10.20.30.3
doris.http.port--YDoris HTTPプロトコルポート
doris.query.port--YDoris MySQLプロトコルポート
doris.user--YDoris ユーザー名
doris.password--YDoris パスワード
doris.database--Y書き込み先のデータベース。複数のライブラリがある場合は空にできます。同時に、topic2table.map で具体的なライブラリ名を設定する必要があります。
doris.topic2table.map--Ntopic とTableの対応関係。例: topic1:tb1,topic2:tb2
デフォルトは空で、topic とTable名が一対一対応することを示します。
複数ライブラリの形式は topic1:db1.tbl1,topic2:db2.tbl2 です
buffer.count.records-50000Ndoris にフラッシュする前に各 Kafka パーティションがメモリにバッファするレコード数。デフォルト 50000 レコード
buffer.flush.time-120Nバッファリフレッシュ間隔(秒単位)。デフォルト 120 秒
buffer.size.bytes-10485760(100MB)N各 Kafka パーティションでメモリにバッファされるレコードの累積サイズ(バイト単位)。デフォルト 100MB
jmx-trueNJMX を通じて connector 内部監視指標を取得します。参考: Doris-Connector-JMX
enable.2pc-trueNStream Load の二相コミット(TwoPhaseCommit)を有効にするかどうか。デフォルトは true です。
enable.delete-falseNレコードを同期的に削除するかどうか。デフォルト false
label.prefix-${name}Nデータインポート時の Stream load ラベルプレフィックス。デフォルトは Connector アプリケーション名です。
auto.redirect-trueNStreamLoad リクエストをリダイレクトするかどうか。有効にすると、StreamLoad は FE を通じてデータを書き込む必要がある BE にリダイレクトされ、BE 情報は表示されなくなります。
sink.properties.*-'sink.properties.format':'json',
'sink.properties.read_json_by_line':'true'
NStream Load のインポートパラメータ。
例: カラム区切り文字を定義 'sink.properties.column_separator':','
詳細パラメータはこちらを参照

Group Commit を有効にする場合、例: sync_mode モードで group commit を有効にする: "sink.properties.group_commit":"sync_mode"。Group Commit は off_modesync_modeasync_mode の3つのモードで設定できます。具体的な使用方法は Group-Commit を参照してください

部分カラム更新を有効にする場合、例: 指定した col2 の部分カラム更新を有効にする: "sink.properties.partial_columns":"true""sink.properties.columns": " col2"
delivery.guaranteeat_least_once,
exactly_once
at_least_onceNKafka データを消費して Doris にインポートする際のデータ整合性保証方法。at_least_once exactly_once をサポートし、デフォルトは at_least_once です。データ exactly_once を保証するには Doris を 2.1.0 以上にアップグレードする必要があります
converter.modenormal,
debezium_ingestion
normalNConnector を使用して Kafka データを消費する際の上流データの型変換モード。
normal は Kafka 内のデータを通常通り消費し、型変換を行いません。
debezium_ingestion は Kafka 上流データが Debezium などの CDC(Changelog Data Capture)ツールを通じて収集される場合、上流データをサポートするために特別な型変換を行う必要があることを意味します。
debezium.schema.evolutionnone,
basic
noneNDebezium を使用して上流データベースシステム(MySQL など)を収集し、構造変更が発生した場合、追加されたフィールドを Doris に同期できます。
none は上流データベースシステムの構造が変更された場合、変更された構造は Doris に同期されないことを意味します。
basic は上流データベースのデータ変更操作を同期することを意味します。カラム構造の変更は危険な操作(Doris Table構造のカラムを誤って削除する可能性があります)であるため、現在は上流のカラム追加操作の同期のみをサポートしています。カラムがリネームされた場合、古いカラムは変更されず、Connector は対象Tableに新しいカラムを追加し、リネームされた新しいデータを新しいカラムにシンクします。
database.time_zone-UTCNconverter.modenormal モード以外の場合、日付データ型(datetime、date、timestamp など)のタイムゾーン変換を指定する方法を提供します。デフォルトは UTC タイムゾーンです。
avro.topic2schema.filepath--Nローカルに提供された Avro Schema ファイルを読み取ることで、Topic 内の Avro ファイル内容を解析し、Confluent が提供する Schema 登録センターからの分離を実現します。
この設定は key.converter または value.converter プレフィックスと組み合わせて使用する必要があります。例: avro-user と avro-product Topic のローカル Avro Schema ファイルを設定する場合: "value.converter.avro.topic2schema.filepath":"avro-user:file:///opt/avro_user.avsc, avro-product:file:///opt/avro_product.avsc"
具体的な使用方法は #32 を参照してください
record.tablename.field--Nこのパラメータを設定すると、一つの kafka topic からのデータを複数の doris Tableに流すことができます。設定詳細は #58 を参照してください
enable.combine.flushtrue,
false
falseN全パーティションからのデータをまとめて書き込むかどうか。デフォルト値は false です。有効にした場合、at_least_once セマンティクスのみが保証されます。
max.retries-10Nタスクを失敗させる前にエラー時に再試行する最大回数。
retry.interval.ms-6000Nエラー後、再試行を試みる前に待機する時間(ミリ秒)。
behavior.on.null.valuesignore,
fail
ignoreNnull 値を持つレコードの処理方法を定義します。

その他の Kafka Connect Sink 共通設定項目については、connect_configuring を参照してください

型マッピング

Doris-kafka-connector は論理型またはプリミティブ型マッピングを使用してカラムのデータ型を解決します。
プリミティブ型は Kafka connect の Schema を使用して表現される単純なデータ型を指します。論理データ型は通常、Struct 構造を使用して複合型、または日付と時刻型を表現します。

Kafka Primitive TypeDoris Type
INT8TINYINT
INT16SMALLINT
INT32INT
INT64BIGINT
FLOAT32FLOAT
FLOAT64DOUBLE
BOOLEANBOOLEAN
STRINGSTRING
BYTESSTRING
Kafka Logical TypeDoris Type
org.apache.kafka.connect.data.DecimalDECIMAL
org.apache.kafka.connect.data.DateDATE
org.apache.kafka.connect.data.TimeSTRING
org.apache.kafka.connect.data.TimestampDATETIME
Debezium Logical TypeDoris Type
io.debezium.time.DateDATE
io.debezium.time.TimeString
io.debezium.time.MicroTimeDATETIME
io.debezium.time.NanoTimeDATETIME
io.debezium.time.ZonedTimeDATETIME
io.debezium.time.TimestampDATETIME
io.debezium.time.MicroTimestampDATETIME
io.debezium.time.NanoTimestampDATETIME
io.debezium.time.ZonedTimestampDATETIME
io.debezium.data.VariableScaleDecimalDOUBLE

ベストプラクティス

プレーン JSON データの読み込み

  1. インポートデータサンプル
    Kafka には以下のサンプルデータがあります

    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-data-topic --from-beginning
    {"user_id":1,"name":"Emily","age":25}
    {"user_id":2,"name":"Benjamin","age":35}
    {"user_id":3,"name":"Olivia","age":28}
    {"user_id":4,"name":"Alexander","age":60}
    {"user_id":5,"name":"Ava","age":17}
    {"user_id":6,"name":"William","age":69}
    {"user_id":7,"name":"Sophia","age":32}
    {"user_id":8,"name":"James","age":64}
    {"user_id":9,"name":"Emma","age":37}
    {"user_id":10,"name":"Liam","age":64}
  2. インポートが必要なTableを作成する
    Dorisで、インポートするTableを作成します。具体的な構文は以下の通りです

    CREATE TABLE test_db.test_kafka_connector_tbl(
    user_id BIGINT NOT NULL COMMENT "user id",
    name VARCHAR(20) COMMENT "name",
    age INT COMMENT "age"
    )
    DUPLICATE KEY(user_id)
    DISTRIBUTED BY HASH(user_id) BUCKETS 12;
  3. インポートタスクを作成する
    Kafka-connectがデプロイされているマシン上で、curlコマンドを通じて以下のインポートタスクを送信します

    curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
    "name":"test-doris-sink-cluster",
    "config":{
    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
    "tasks.max":"10",
    "topics":"test-data-topic",
    "doris.topic2table.map": "test-data-topic:test_kafka_connector_tbl",
    "buffer.count.records":"10000",
    "buffer.flush.time":"120",
    "buffer.size.bytes":"5000000",
    "doris.urls":"10.10.10.1",
    "doris.user":"root",
    "doris.password":"",
    "doris.http.port":"8030",
    "doris.query.port":"9030",
    "doris.database":"test_db",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"org.apache.kafka.connect.storage.StringConverter"
    }
    }'

Debezium コンポーネントによって収集されたデータの読み込み

  1. MySQL データベースには以下のTableがあります
   CREATE TABLE test.test_user (
user_id int NOT NULL ,
name varchar(20),
age int,
PRIMARY KEY (user_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

insert into test.test_user values(1,'zhangsan',20);
insert into test.test_user values(2,'lisi',21);
insert into test.test_user values(3,'wangwu',22);
  1. DorisでインポートされたTableを作成する
   CREATE TABLE test_db.test_user(
user_id BIGINT NOT NULL COMMENT "user id",
name VARCHAR(20) COMMENT "name",
age INT COMMENT "age"
)
UNIQUE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 12;
  1. MySQL用のDebeziumコネクタコンポーネントをデプロイします。参照:Debezium connector for MySQL
  2. doris-kafka-connectorインポートタスクを作成します
    Debeziumを通じて収集されたMySQLTableデータがmysql_debezium.test.test_user Topicにあると仮定します
   curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
"name":"test-debezium-doris-sink",
"config":{
"connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
"tasks.max":"10",
"topics":"mysql_debezium.test.test_user",
"doris.topic2table.map": "mysql_debezium.test.test_user:test_user",
"buffer.count.records":"10000",
"buffer.flush.time":"120",
"buffer.size.bytes":"5000000",
"doris.urls":"10.10.10.1",
"doris.user":"root",
"doris.password":"",
"doris.http.port":"8030",
"doris.query.port":"9030",
"doris.database":"test_db",
"converter.mode":"debezium_ingestion",
"enable.delete":"true",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter"
}
}'

Avroシリアライズされたデータの読み込み

curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{ 
"name":"doris-avro-test",
"config":{
"connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
"topics":"avro_topic",
"tasks.max":"10",
"doris.topic2table.map": "avro_topic:avro_tab",
"buffer.count.records":"100000",
"buffer.flush.time":"120",
"buffer.size.bytes":"10000000",
"doris.urls":"127.0.0.1",
"doris.user":"root",
"doris.password":"",
"doris.http.port":"8030",
"doris.query.port":"9030",
"doris.database":"test",
"load.model":"stream_load",
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://127.0.0.1:8081",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://127.0.0.1:8081"
}
}'

Protobuf シリアル化データの読み込み

curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{ 
"name":"doris-protobuf-test",
"config":{
"connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
"topics":"proto_topic",
"tasks.max":"10",
"doris.topic2table.map": "proto_topic:proto_tab",
"buffer.count.records":"100000",
"buffer.flush.time":"120",
"buffer.size.bytes":"10000000",
"doris.urls":"127.0.0.1",
"doris.user":"root",
"doris.password":"",
"doris.http.port":"8030",
"doris.query.port":"9030",
"doris.database":"test",
"load.model":"stream_load",
"key.converter":"io.confluent.connect.protobuf.ProtobufConverter",
"key.converter.schema.registry.url":"http://127.0.0.1:8081",
"value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
"value.converter.schema.registry.url":"http://127.0.0.1:8081"
}
}'

Kafka Connect Single Message Transformsを使用したデータの読み込み

例として、以下の形式のデータを考えてみましょう:

{
"registertime": 1513885135404,
"userid": "User_9",
"regionid": "Region_3",
"gender": "MALE"
}

Kafkaメッセージにハードコードされた列を追加するには、InsertFieldを使用できます。さらに、TimestampConverterを使用してBigint型のタイムスタンプを時刻文字列に変換できます。

curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
"name": "insert_field_tranform",
"config": {
"connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
"tasks.max": "1",
"topics": "users",
"doris.topic2table.map": "users:kf_users",
"buffer.count.records": "10",
"buffer.flush.time": "11",
"buffer.size.bytes": "5000000",
"doris.urls": "127.0.0.1:8030",
"doris.user": "root",
"doris.password": "123456",
"doris.http.port": "8030",
"doris.query.port": "9030",
"doris.database": "testdb",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"transforms": "InsertField,TimestampConverter",
// Insert Static Field
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.static.field": "repo",
"transforms.InsertField.static.value": "Apache Doris",
// Convert Timestamp Format
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.field": "registertime",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
"transforms.TimestampConverter.target.type": "string"
}
}'

InsertFieldとTimestampConverterの変換後、データは次のようになります:

{
"userid": "User_9",
"regionid": "Region_3",
"gender": "MALE",
"repo": "Apache Doris",// Static field added
"registertime": "2017-12-21 03:38:55.404" // Unix timestamp converted to string
}

Kafka Connect Single Message Transforms (SMT)のその他の例については、SMT ドキュメントを参照してください。

FAQ

1. Json型データの読み取り時に以下のエラーが発生する場合:

Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:337)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:91)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:536)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)

reason: これは、org.apache.kafka.connect.json.JsonConverter コンバーターを使用する場合、"schema" と "payload" フィールドを一致させる必要があるためです。

2つの解決方法から1つを選択してください:

  1. org.apache.kafka.connect.json.JsonConverterorg.apache.kafka.connect.storage.StringConverter に置き換える
  2. 起動モードが Standalone モードの場合、config/connect-standalone.properties の value.converter.schemas.enable または key.converter.schemas.enable を false に変更する; 起動モードが Distributed モードの場合、config/connect-distributed.properties の value.converter.schemas.enable または key.converter.schemas.enable を false に変更する

2. 消費がタイムアウトし、コンシューマーが消費グループから除外される:

org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1318)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:1127)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:1093)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1590)
at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitAsync(WorkerSinkTask.java:361)
at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:376)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:467)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:381)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:221)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)

解決方法:

シナリオに応じてKafkaのmax.poll.interval.msを増加させます。デフォルト値は300000です

  • Standaloneモードで起動している場合は、config/connect-standalone.propertiesの設定ファイルにmax.poll.interval.msconsumer.max.poll.interval.msパラメータを追加し、パラメータ値を設定します。
  • Distributedモードで起動している場合は、config/connect-distributed.propertiesの設定ファイルにmax.poll.interval.msconsumer.max.poll.interval.msパラメータを追加し、パラメータ値を設定します。

パラメータを調整した後、kafka-connectを再起動してください

3. Doris-kafka-connectorをバージョン1.0.0または1.1.0から24.0.0にアップグレードする際にエラーが発生する

org.apache.kafka.common.config.ConfigException: Topic 'connect-status' supplied via the 'status.storage.topic' property is required to have 'cleanup.policy=compact' to guarantee consistency and durability of connector and task statuses, but found the topic currently has 'cleanup.policy=delete'. Continuing would likely result in eventually losing connector and task statuses and problems restarting this Connect cluster in the future. Change the 'status.storage.topic' property in the Connect worker configurations to use a topic with 'cleanup.policy=compact'.
at org.apache.kafka.connect.util.TopicAdmin.verifyTopicCleanupPolicyOnlyCompact(TopicAdmin.java:581)
at org.apache.kafka.connect.storage.KafkaTopicBasedBackingStore.lambda$topicInitializer$0(KafkaTopicBasedBackingStore.java:47)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:247)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:231)
at org.apache.kafka.connect.storage.KafkaStatusBackingStore.start(KafkaStatusBackingStore.java:228)
at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:164)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run

解決方法: connect-configs connect-status Topicのクリアリング戦略をcompactに調整する

$KAFKA_HOME/bin/kafka-configs.sh --alter --entity-type topics --entity-name connect-configs --add-config cleanup.policy=compact --bootstrap-server 127.0.0.1:9092
$KAFKA_HOME/bin/kafka-configs.sh --alter --entity-type topics --entity-name connect-status --add-config cleanup.policy=compact --bootstrap-server 127.0.0.1:9092

4. debezium_ingestion converterモードでTableスキーマ変更が失敗しました

[2025-01-07 14:26:20,474] WARN [doris-normal_test_sink-connector|task-0] Table 'test_sink' cannot be altered because schema evolution is disabled. (org.apache.doris.kafka.connector.converter.RecordService:183)
[2025-01-07 14:26:20,475] ERROR [doris-normal_test_sink-connector|task-0] WorkerSinkTask{id=doris-normal_test_sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Cannot alter table org.apache.doris.kafka.connector.model.TableDescriptor@67cd8027 because schema evolution is disabled (org.apache.kafka.connect.runtime.WorkerSinkTask:612)
org.apache.doris.kafka.connector.exception.SchemaChangeException: Cannot alter table org.apache.doris.kafka.connector.model.TableDescriptor@67cd8027 because schema evolution is disabled
at org.apache.doris.kafka.connector.converter.RecordService.alterTableIfNeeded(RecordService.java:186)
at org.apache.doris.kafka.connector.converter.RecordService.checkAndApplyTableChangesIfNeeded(RecordService.java:150)
at org.apache.doris.kafka.connector.converter.RecordService.processStructRecord(RecordService.java:100)
at org.apache.doris.kafka.connector.converter.RecordService.getProcessedRecord(RecordService.java:305)
at org.apache.doris.kafka.connector.writer.DorisWriter.putBuffer(DorisWriter.java:155)
at org.apache.doris.kafka.connector.writer.DorisWriter.insertRecord(DorisWriter.java:124)
at org.apache.doris.kafka.connector.writer.StreamLoadWriter.insert(StreamLoadWriter.java:151)
at org.apache.doris.kafka.connector.service.DorisDefaultSinkService.insert(DorisDefaultSinkService.java:154)
at org.apache.doris.kafka.connector.service.DorisDefaultSinkService.insert(DorisDefaultSinkService.java:135)
at org.apache.doris.kafka.connector.DorisSinkTask.put(DorisSinkTask.java:97)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)

解決策:

debezium_ingestion converterモードでは、Tableスキーマ変更はデフォルトで無効になっています。Tableスキーマ変更を有効にするには、debezium.schema.evolutionbasicに設定する必要があります。
Table構造変更を有効にしても、この変更されたカラムをDorisTable内の唯一のカラムとして正確に保持することはできないことに注意してください(詳細についてはdebezium.schema.evolutionパラメータの説明を参照してください)。上流と下流で一意のカラムのみを保持する必要がある場合は、変更されたカラムをDorisTableに手動で追加してから、Connectorタスクを再起動することが最善です。Connectorは未消費のoffsetの消費を継続し、データの一貫性を維持します。