メインコンテンツまでスキップ
バージョン: 4.x

Doris Kafka Connector

Kafka Connect は、Apache Kafka と他のシステム間でのスケーラブルで信頼性の高いデータ転送ツールです。Connectorを定義することで、大量のデータをKafkaに出し入れできます。

Dorisコミュニティでは、Kafkaトピック内のデータをDorisに書き込むことができる doris-kafka-connector プラグインを提供しています。

バージョン説明

Connector VersionKafka VersionDoris VersionJava Version
1.0.02.4+2.0+8
1.1.02.4+2.0+8
24.0.02.4+2.0+8
25.0.02.4+2.0+8

使用方法

ダウンロード

doris-kafka-connector

maven dependencies

<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>doris-kafka-connector</artifactId>
<version>25.0.0</version>
</dependency>

スタンドアロンモード起動

$KAFKA_HOME配下にpluginsディレクトリを作成し、ダウンロードしたdoris-kafka-connectorのjarパッケージを配置します
config/connect-standalone.propertiesを設定します

# Modify broker address
bootstrap.servers=127.0.0.1:9092

# Modify to the created plugins directory
# Note: Please fill in the direct path to Kafka here. For example: plugin.path=/opt/kafka/plugins
plugin.path=$KAFKA_HOME/plugins

# It is recommended to increase the max.poll.interval.ms time of Kafka to more than 30 minutes, the default is 5 minutes
# Avoid Stream Load import data consumption timeout and consumers being kicked out of the consumer group
max.poll.interval.ms=1800000
consumer.max.poll.interval.ms=1800000

doris-connector-sink.propertiesを設定する

configディレクトリにdoris-connector-sink.propertiesを作成し、以下の内容を設定します:

name=test-doris-sink
connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
topics=topic_test
doris.topic2table.map=topic_test:test_kafka_tbl
buffer.count.records=10000
buffer.flush.time=120
buffer.size.bytes=5000000
doris.urls=10.10.10.1
doris.http.port=8030
doris.query.port=9030
doris.user=root
doris.password=
doris.database=test_db
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

スタンドアロンを開始

$KAFKA_HOME/bin/connect-standalone.sh -daemon $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/doris-connector-sink.properties
注記

注意: 本番環境では一般的にスタンドアロンモードの使用は推奨されません。

Distributed mode startup

$KAFKA_HOME配下にpluginsディレクトリを作成し、ダウンロードしたdoris-kafka-connectorのjarパッケージをその中に配置します

config/connect-distributed.propertiesを設定します

# Modify kafka server address
bootstrap.servers=127.0.0.1:9092

# Modify group.id, the same cluster needs to be consistent
group.id=connect-cluster

# Modify to the created plugins directory
# Note: Please fill in the direct path to Kafka here. For example: plugin.path=/opt/kafka/plugins
plugin.path=$KAFKA_HOME/plugins

# It is recommended to increase the max.poll.interval.ms time of Kafka to more than 30 minutes, the default is 5 minutes
# Avoid Stream Load import data consumption timeout and consumers being kicked out of the consumer group
max.poll.interval.ms=1800000
consumer.max.poll.interval.ms=1800000

分散処理を開始

$KAFKA_HOME/bin/connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties

コネクタを追加

curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
"name":"test-doris-sink-cluster",
"config":{
"connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
"topics":"topic_test",
"doris.topic2table.map": "topic_test:test_kafka_tbl",
"buffer.count.records":"10000",
"buffer.flush.time":"120",
"buffer.size.bytes":"5000000",
"doris.urls":"10.10.10.1",
"doris.user":"root",
"doris.password":"",
"doris.http.port":"8030",
"doris.query.port":"9030",
"doris.database":"test_db",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter"
}
}'

Operation Connector

# View connector status
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/status -X GET
# Delete connector
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster -X DELETE
# Pause connector
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/pause -X PUT
# Restart connector
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/resume -X PUT
# Restart tasks within the connector
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/tasks/0/restart -X POST

参照: Connect REST Interface

注記

kafka-connectが初回起動時に、kafka-connectの共有コネクタ設定を記録するために、kafkaクラスタに3つのトピック config.storage.topic offset.storage.topic status.storage.topic が作成されることに注意してください。オフセットデータとステータス更新。How to Use Kafka Connect - Get Started

SSL認証されたKafkaクラスタへのアクセス

kafka-connectを通じてSSL認証されたKafkaクラスタにアクセスするには、Kafka Brokerの公開鍵を認証するために使用する証明書ファイル(client.truststore.jks)をユーザーが提供する必要があります。connect-distributed.propertiesファイルに以下の設定を追加できます:

# Connect worker
security.protocol=SSL
ssl.truststore.location=/var/ssl/private/client.truststore.jks
ssl.truststore.password=test1234

# Embedded consumer for sink connectors
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/var/ssl/private/client.truststore.jks
consumer.ssl.truststore.password=test1234

SSL認証を通じてKafka clusterに接続されたkafka-connectの設定手順については、以下を参照してください:Configure Kafka Connect

Dead letter queue

デフォルトでは、変換中またはその間に発生したエラーはconnectorの失敗を引き起こします。各connector設定は、そのようなエラーをスキップすることで許容することもでき、オプションで各エラーと失敗した操作の詳細、および問題のあるレコード(詳細レベルは様々)をログ記録のためにdead-letter queueに書き込むことができます。

errors.tolerance=all
errors.deadletterqueue.topic.name=test_error_topic
errors.deadletterqueue.context.headers.enable=true
errors.deadletterqueue.topic.replication.factor=1

設定項目

KeyEnumDefault ValueRequiredDescription
name--YConnectアプリケーション名。Kafka Connect環境内で一意である必要があります
connector.class--Yorg.apache.doris.kafka.connector.DorisSinkConnector
topics--Y購読するトピックのリスト。カンマ区切りで指定します。例: topic1, topic2
doris.urls--YDoris FE接続アドレス。複数ある場合はカンマで区切ります。例: 10.20.30.1,10.20.30.2,10.20.30.3
doris.http.port--YDoris HTTPプロトコルポート
doris.query.port--YDoris MySQLプロトコルポート
doris.user--YDorisユーザー名
doris.password--YDorisパスワード
doris.database--Y書き込み先データベース。複数のライブラリがある場合は空にできます。同時に、topic2table.mapで特定のライブラリ名を設定する必要があります。
doris.topic2table.map--Ntopicとテーブルの対応関係。例: topic1:tb1,topic2:tb2
デフォルトは空で、topicとテーブル名が1対1で対応することを示します。
複数ライブラリの形式はtopic1:db1.tbl1,topic2:db2.tbl2
buffer.count.records-50000Ndorisにフラッシュする前に各Kafkaパーティションがメモリにバッファするレコード数。デフォルト50000レコード
buffer.flush.time-120Nバッファ更新間隔(秒)。デフォルト120秒
buffer.size.bytes-10485760(100MB)N各Kafkaパーティションのメモリにバッファされるレコードの累積サイズ(バイト)。デフォルト100MB
jmx-trueNJMXを通じてconnector内部監視指標を取得するには、以下を参照してください: Doris-Connector-JMX
enable.2pc-trueNStream Loadの二相コミット(TwoPhaseCommit)を有効にするかどうか。デフォルトはtrue。
enable.delete-falseNレコードを同期的に削除するかどうか。デフォルトfalse
label.prefix-${name}Nデータインポート時のStream loadラベルプレフィックス。デフォルトはConnectorアプリケーション名。
auto.redirect-trueNStreamLoadリクエストをリダイレクトするかどうか。有効にすると、StreamLoadはFEを通じてデータを書き込む必要があるBEにリダイレクトされ、BE情報は表示されなくなります。
sink.properties.*-'sink.properties.format':'json',
'sink.properties.read_json_by_line':'true'
NStream Loadのインポートパラメータ。
例: 列区切り文字定義 'sink.properties.column_separator':','
詳細なパラメータリファレンスはこちら

Group Commitを有効にする、例: sync_modeモードでgroup commitを有効にする場合: "sink.properties.group_commit":"sync_mode"。Group Commitは3つのモードで設定できます: off_modesync_modeasync_mode。具体的な使用方法については以下を参照してください: Group-Commit

部分列更新を有効にする、例: 指定されたcol2の部分列更新を有効にする場合: "sink.properties.partial_columns":"true""sink.properties.columns": " col2"
delivery.guaranteeat_least_once,
exactly_once
at_least_onceNKafkaデータをDorisにインポートする際のデータ整合性を確保する方法。at_least_once exactly_onceをサポート。デフォルトはat_least_once。データexactly_onceを保証するにはDorisを2.1.0以上にアップグレードする必要があります
converter.modenormal,
debezium_ingestion
normalNConnectorでKafkaデータを消費する際のアップストリームデータの型変換モード。
normalはKafka内のデータを通常通り消費し、型変換を行わないことを意味します。
debezium_ingestionは、KafkaアップストリームデータがDebeziumなどのCDC(Changelog Data Capture)ツールで収集される場合、アップストリームデータをサポートするために特別な型変換を行う必要があることを意味します。
debezium.schema.evolutionnone,
basic
noneNDebeziumを使用してアップストリームデータベースシステム(MySQLなど)を収集し、構造変更が発生した場合、追加されたフィールドをDorisに同期できます。
noneはアップストリームデータベースシステムの構造が変更された場合、変更された構造はDorisに同期されないことを意味します。
basicはアップストリームデータベースのデータ変更操作を同期することを意味します。列構造の変更は危険な操作であるため(Dorisテーブル構造の列を誤って削除する可能性があります)、現在は列を追加する操作の同期アップストリームのみをサポートしています。列名が変更された場合、古い列は変更されず、Connectorはターゲットテーブルに新しい列を追加し、名前変更された新しいデータを新しい列にシンクします。
database.time_zone-UTCNconverter.modenormalモードでない場合、日付データ型(datetime、date、timestampなど)のタイムゾーン変換を指定する方法を提供します。デフォルトはUTCタイムゾーンです。
avro.topic2schema.filepath--Nローカルで提供されるAvro Schemaファイルを読み取ることにより、Topic内のAvroファイルコンテンツを解析し、Confluentが提供するSchema登録センターからの分離を実現します。
この設定はkey.converterまたはvalue.converterプレフィックスと組み合わせて使用する必要があります。例: avro-userとavro-product TopicのローカルAvro Schemaファイル設定は以下のとおりです: "value.converter.avro.topic2schema.filepath":"avro-user:file:///opt/avro_user.avsc, avro-product:file:///opt/avro_product.avsc"
具体的な使用方法については以下を参照してください: #32
record.tablename.field--Nこのパラメータを設定すると、1つのkafka topicからのデータを複数のdorisテーブルに流すことができます。設定の詳細については以下を参照してください: #58
enable.combine.flushtrue,
false
falseNすべてのパーティションからのデータをまとめて書き込むかどうか。デフォルト値はfalse。有効にした場合、at_least_onceセマンティクスのみが保証されます。
max.retries-10Nタスクが失敗する前にエラー時に再試行する最大回数。
retry.interval.ms-6000Nエラー発生後、再試行を試みる前に待機する時間(ミリ秒)。
behavior.on.null.valuesignore,
fail
ignoreNnull値を持つレコードの処理方法を定義します。

その他のKafka Connect Sink共通設定項目については、以下を参照してください: connect_configuring

型マッピング

Doris-kafka-connectorは論理または基本型マッピングを使用して列のデータ型を解決します。
基本型は、Kafka connectのSchemaを使用して表現される単純なデータ型を指します。論理データ型は通常、Struct構造を使用して複合型、または日付と時刻型を表します。

Kafka Primitive TypeDoris Type
INT8TINYINT
INT16SMALLINT
INT32INT
INT64BIGINT
FLOAT32FLOAT
FLOAT64DOUBLE
BOOLEANBOOLEAN
STRINGSTRING
BYTESSTRING
Kafka Logical TypeDoris Type
org.apache.kafka.connect.data.DecimalDECIMAL
org.apache.kafka.connect.data.DateDATE
org.apache.kafka.connect.data.TimeSTRING
org.apache.kafka.connect.data.TimestampDATETIME
Debezium Logical TypeDoris Type
io.debezium.time.DateDATE
io.debezium.time.TimeString
io.debezium.time.MicroTimeDATETIME
io.debezium.time.NanoTimeDATETIME
io.debezium.time.ZonedTimeDATETIME
io.debezium.time.TimestampDATETIME
io.debezium.time.MicroTimestampDATETIME
io.debezium.time.NanoTimestampDATETIME
io.debezium.time.ZonedTimestampDATETIME
io.debezium.data.VariableScaleDecimalDOUBLE

ベストプラクティス

プレーンJSONデータの読み込み

  1. インポートデータサンプル
    Kafkaには以下のサンプルデータがあります

    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-data-topic --from-beginning
    {"user_id":1,"name":"Emily","age":25}
    {"user_id":2,"name":"Benjamin","age":35}
    {"user_id":3,"name":"Olivia","age":28}
    {"user_id":4,"name":"Alexander","age":60}
    {"user_id":5,"name":"Ava","age":17}
    {"user_id":6,"name":"William","age":69}
    {"user_id":7,"name":"Sophia","age":32}
    {"user_id":8,"name":"James","age":64}
    {"user_id":9,"name":"Emma","age":37}
    {"user_id":10,"name":"Liam","age":64}
  2. インポートが必要なテーブルを作成する
    Dorisで、インポートするテーブルを作成します。具体的な構文は以下の通りです

    CREATE TABLE test_db.test_kafka_connector_tbl(
    user_id BIGINT NOT NULL COMMENT "user id",
    name VARCHAR(20) COMMENT "name",
    age INT COMMENT "age"
    )
    DUPLICATE KEY(user_id)
    DISTRIBUTED BY HASH(user_id) BUCKETS 12;
  3. インポートタスクを作成する
    Kafka-connectがデプロイされているマシンで、curlコマンドを通じて以下のインポートタスクを送信します

    curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
    "name":"test-doris-sink-cluster",
    "config":{
    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
    "tasks.max":"10",
    "topics":"test-data-topic",
    "doris.topic2table.map": "test-data-topic:test_kafka_connector_tbl",
    "buffer.count.records":"10000",
    "buffer.flush.time":"120",
    "buffer.size.bytes":"5000000",
    "doris.urls":"10.10.10.1",
    "doris.user":"root",
    "doris.password":"",
    "doris.http.port":"8030",
    "doris.query.port":"9030",
    "doris.database":"test_db",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"org.apache.kafka.connect.storage.StringConverter"
    }
    }'

Debeziumコンポーネントによって収集されたデータを読み込む

  1. MySQLデータベースには以下のテーブルがあります
   CREATE TABLE test.test_user (
user_id int NOT NULL ,
name varchar(20),
age int,
PRIMARY KEY (user_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

insert into test.test_user values(1,'zhangsan',20);
insert into test.test_user values(2,'lisi',21);
insert into test.test_user values(3,'wangwu',22);
  1. Dorisでインポートされたテーブルを作成する
   CREATE TABLE test_db.test_user(
user_id BIGINT NOT NULL COMMENT "user id",
name VARCHAR(20) COMMENT "name",
age INT COMMENT "age"
)
UNIQUE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 12;
  1. MySQL用のDebeziumコネクターコンポーネントをデプロイします。参照: Debezium connector for MySQL
  2. doris-kafka-connectorインポートタスクを作成します
    Debeziumを通じて収集されたMySQLテーブルデータがmysql_debezium.test.test_user Topicにあると仮定します
   curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
"name":"test-debezium-doris-sink",
"config":{
"connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
"tasks.max":"10",
"topics":"mysql_debezium.test.test_user",
"doris.topic2table.map": "mysql_debezium.test.test_user:test_user",
"buffer.count.records":"10000",
"buffer.flush.time":"120",
"buffer.size.bytes":"5000000",
"doris.urls":"10.10.10.1",
"doris.user":"root",
"doris.password":"",
"doris.http.port":"8030",
"doris.query.port":"9030",
"doris.database":"test_db",
"converter.mode":"debezium_ingestion",
"enable.delete":"true",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter"
}
}'

Avro シリアライズデータの読み込み

curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{ 
"name":"doris-avro-test",
"config":{
"connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
"topics":"avro_topic",
"tasks.max":"10",
"doris.topic2table.map": "avro_topic:avro_tab",
"buffer.count.records":"100000",
"buffer.flush.time":"120",
"buffer.size.bytes":"10000000",
"doris.urls":"127.0.0.1",
"doris.user":"root",
"doris.password":"",
"doris.http.port":"8030",
"doris.query.port":"9030",
"doris.database":"test",
"load.model":"stream_load",
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://127.0.0.1:8081",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://127.0.0.1:8081"
}
}'

Protobufシリアライズされたデータを読み込む

curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{ 
"name":"doris-protobuf-test",
"config":{
"connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
"topics":"proto_topic",
"tasks.max":"10",
"doris.topic2table.map": "proto_topic:proto_tab",
"buffer.count.records":"100000",
"buffer.flush.time":"120",
"buffer.size.bytes":"10000000",
"doris.urls":"127.0.0.1",
"doris.user":"root",
"doris.password":"",
"doris.http.port":"8030",
"doris.query.port":"9030",
"doris.database":"test",
"load.model":"stream_load",
"key.converter":"io.confluent.connect.protobuf.ProtobufConverter",
"key.converter.schema.registry.url":"http://127.0.0.1:8081",
"value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
"value.converter.schema.registry.url":"http://127.0.0.1:8081"
}
}'

Kafka Connect Single Message Transformsを使用したデータの読み込み

例として、以下の形式のデータを考えてみます:

{
"registertime": 1513885135404,
"userid": "User_9",
"regionid": "Region_3",
"gender": "MALE"
}

Kafkaメッセージにハードコードされた列を追加するには、InsertFieldを使用できます。さらに、TimestampConverterを使用してBigint型のタイムスタンプを時刻文字列に変換できます。

curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
"name": "insert_field_tranform",
"config": {
"connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
"tasks.max": "1",
"topics": "users",
"doris.topic2table.map": "users:kf_users",
"buffer.count.records": "10",
"buffer.flush.time": "11",
"buffer.size.bytes": "5000000",
"doris.urls": "127.0.0.1:8030",
"doris.user": "root",
"doris.password": "123456",
"doris.http.port": "8030",
"doris.query.port": "9030",
"doris.database": "testdb",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"transforms": "InsertField,TimestampConverter",
// Insert Static Field
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.static.field": "repo",
"transforms.InsertField.static.value": "Apache Doris",
// Convert Timestamp Format
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.field": "registertime",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
"transforms.TimestampConverter.target.type": "string"
}
}'

InsertFieldとTimestampConverterの変換後、データは以下のようになります:

{
"userid": "User_9",
"regionid": "Region_3",
"gender": "MALE",
"repo": "Apache Doris",// Static field added
"registertime": "2017-12-21 03:38:55.404" // Unix timestamp converted to string
}

Kafka Connect Single Message Transforms (SMT)のより多くの例については、SMT documentationを参照してください。

FAQ

1. Json型データを読み取る際に以下のエラーが発生します:

Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:337)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:91)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:536)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)

reason: これは、org.apache.kafka.connect.json.JsonConverterコンバーターを使用する場合、"schema"と"payload"フィールドを一致させる必要があるためです。

2つの解決策のうち1つを選択してください:

  1. org.apache.kafka.connect.json.JsonConverterorg.apache.kafka.connect.storage.StringConverterに置き換える
  2. 起動モードがStandaloneモードの場合、config/connect-standalone.propertiesのvalue.converter.schemas.enableまたはkey.converter.schemas.enableをfalseに変更する; 起動モードがDistributedモードの場合、config/connect-distributed.propertiesのvalue.converter.schemas.enableまたはkey.converter.schemas.enableをfalseに変更する

2. 消費がタイムアウトし、consumerが消費グループから除外される:

org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1318)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:1127)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:1093)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1590)
at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitAsync(WorkerSinkTask.java:361)
at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:376)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:467)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:381)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:221)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)

解決策:

シナリオに応じてKafkaのmax.poll.interval.msを増加させます。デフォルト値は300000です

  • Standaloneモードで起動している場合、config/connect-standalone.propertiesの設定ファイルにmax.poll.interval.msconsumer.max.poll.interval.msパラメータを追加し、パラメータ値を設定します。
  • Distributedモードで起動している場合、config/connect-distributed.propertiesの設定ファイルにmax.poll.interval.msconsumer.max.poll.interval.msパラメータを追加し、パラメータ値を設定します。

パラメータ調整後、kafka-connectを再起動してください

3. Doris-kafka-connectorでバージョンを1.0.0または1.1.0から24.0.0にアップグレードする際にエラーが発生する

org.apache.kafka.common.config.ConfigException: Topic 'connect-status' supplied via the 'status.storage.topic' property is required to have 'cleanup.policy=compact' to guarantee consistency and durability of connector and task statuses, but found the topic currently has 'cleanup.policy=delete'. Continuing would likely result in eventually losing connector and task statuses and problems restarting this Connect cluster in the future. Change the 'status.storage.topic' property in the Connect worker configurations to use a topic with 'cleanup.policy=compact'.
at org.apache.kafka.connect.util.TopicAdmin.verifyTopicCleanupPolicyOnlyCompact(TopicAdmin.java:581)
at org.apache.kafka.connect.storage.KafkaTopicBasedBackingStore.lambda$topicInitializer$0(KafkaTopicBasedBackingStore.java:47)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:247)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:231)
at org.apache.kafka.connect.storage.KafkaStatusBackingStore.start(KafkaStatusBackingStore.java:228)
at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:164)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run

解決方法: connect-configs connect-status Topicのクリアリング戦略をcompactに調整する

$KAFKA_HOME/bin/kafka-configs.sh --alter --entity-type topics --entity-name connect-configs --add-config cleanup.policy=compact --bootstrap-server 127.0.0.1:9092
$KAFKA_HOME/bin/kafka-configs.sh --alter --entity-type topics --entity-name connect-status --add-config cleanup.policy=compact --bootstrap-server 127.0.0.1:9092

4. debezium_ingestionコンバーターモードでテーブルスキーマの変更が失敗しました

[2025-01-07 14:26:20,474] WARN [doris-normal_test_sink-connector|task-0] Table 'test_sink' cannot be altered because schema evolution is disabled. (org.apache.doris.kafka.connector.converter.RecordService:183)
[2025-01-07 14:26:20,475] ERROR [doris-normal_test_sink-connector|task-0] WorkerSinkTask{id=doris-normal_test_sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Cannot alter table org.apache.doris.kafka.connector.model.TableDescriptor@67cd8027 because schema evolution is disabled (org.apache.kafka.connect.runtime.WorkerSinkTask:612)
org.apache.doris.kafka.connector.exception.SchemaChangeException: Cannot alter table org.apache.doris.kafka.connector.model.TableDescriptor@67cd8027 because schema evolution is disabled
at org.apache.doris.kafka.connector.converter.RecordService.alterTableIfNeeded(RecordService.java:186)
at org.apache.doris.kafka.connector.converter.RecordService.checkAndApplyTableChangesIfNeeded(RecordService.java:150)
at org.apache.doris.kafka.connector.converter.RecordService.processStructRecord(RecordService.java:100)
at org.apache.doris.kafka.connector.converter.RecordService.getProcessedRecord(RecordService.java:305)
at org.apache.doris.kafka.connector.writer.DorisWriter.putBuffer(DorisWriter.java:155)
at org.apache.doris.kafka.connector.writer.DorisWriter.insertRecord(DorisWriter.java:124)
at org.apache.doris.kafka.connector.writer.StreamLoadWriter.insert(StreamLoadWriter.java:151)
at org.apache.doris.kafka.connector.service.DorisDefaultSinkService.insert(DorisDefaultSinkService.java:154)
at org.apache.doris.kafka.connector.service.DorisDefaultSinkService.insert(DorisDefaultSinkService.java:135)
at org.apache.doris.kafka.connector.DorisSinkTask.put(DorisSinkTask.java:97)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)

解決策:

debezium_ingestion converterモードでは、テーブルスキーマ変更はデフォルトで無効になっています。テーブルスキーマ変更を有効にするには、debezium.schema.evolutionbasic に設定する必要があります。
テーブル構造変更を有効にしても、この変更されたカラムをDorisテーブルの唯一のカラムとして正確に保持されないことに注意してください(詳細は debezium.schema.evolution パラメータの説明を参照)。上流と下流で一意のカラムのみを保持する必要がある場合は、変更されたカラムをDorisテーブルに手動で追加してから、Connectorタスクを再起動することが最適です。Connectorは消費されていない offset を継続して消費し、データの一貫性を維持します。