Doris Kafka Connector
Kafka Connectは、Apache Kafkaと他のシステム間でのデータ転送のためのスケーラブルで信頼性の高いツールです。Connectorを定義することで、Kafkaに大量のデータを出し入れすることができます。
Dorisコミュニティではdoris-kafka-connectorプラグインを提供しており、これによりKafka topic内のデータをDorisに書き込むことができます。
バージョン説明
| Connector Version | Kafka Version | Doris Version | Java Version |
|---|---|---|---|
| 1.0.0 | 2.4+ | 2.0+ | 8 |
| 1.1.0 | 2.4+ | 2.0+ | 8 |
| 24.0.0 | 2.4+ | 2.0+ | 8 |
| 25.0.0 | 2.4+ | 2.0+ | 8 |
使用方法
ダウンロード
maven dependencies
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>doris-kafka-connector</artifactId>
<version>25.0.0</version>
</dependency>
Standalone mode startup
$KAFKA_HOME の下に plugins ディレクトリを作成し、ダウンロードした doris-kafka-connector jar パッケージをその中に配置します
config/connect-standalone.properties を設定します
# Modify broker address
bootstrap.servers=127.0.0.1:9092
# Modify to the created plugins directory
# Note: Please fill in the direct path to Kafka here. For example: plugin.path=/opt/kafka/plugins
plugin.path=$KAFKA_HOME/plugins
# It is recommended to increase the max.poll.interval.ms time of Kafka to more than 30 minutes, the default is 5 minutes
# Avoid Stream Load import data consumption timeout and consumers being kicked out of the consumer group
max.poll.interval.ms=1800000
consumer.max.poll.interval.ms=1800000
doris-connector-sink.propertiesを設定する
configディレクトリにdoris-connector-sink.propertiesを作成し、以下の内容を設定します:
name=test-doris-sink
connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
topics=topic_test
doris.topic2table.map=topic_test:test_kafka_tbl
buffer.count.records=10000
buffer.flush.time=120
buffer.size.bytes=5000000
doris.urls=10.10.10.1
doris.http.port=8030
doris.query.port=9030
doris.user=root
doris.password=
doris.database=test_db
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
スタンドアロン起動
$KAFKA_HOME/bin/connect-standalone.sh -daemon $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/doris-connector-sink.properties
注意: 本番環境では、スタンドアロンモードの使用は一般的に推奨されません。
分散モードの起動
$KAFKA_HOME配下にpluginsディレクトリを作成し、ダウンロードしたdoris-kafka-connectorのjarパッケージをその中に配置します
config/connect-distributed.propertiesを設定します
# Modify kafka server address
bootstrap.servers=127.0.0.1:9092
# Modify group.id, the same cluster needs to be consistent
group.id=connect-cluster
# Modify to the created plugins directory
# Note: Please fill in the direct path to Kafka here. For example: plugin.path=/opt/kafka/plugins
plugin.path=$KAFKA_HOME/plugins
# It is recommended to increase the max.poll.interval.ms time of Kafka to more than 30 minutes, the default is 5 minutes
# Avoid Stream Load import data consumption timeout and consumers being kicked out of the consumer group
max.poll.interval.ms=1800000
consumer.max.poll.interval.ms=1800000
分散処理を開始
$KAFKA_HOME/bin/connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties
Connectorを追加
curl -i http://127.0.0.1:8083/connectors -H "Content-タイプ: application/json" -X POST -d '{
"name":"test-doris-sink-cluster",
"config":{
"connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
"topics":"topic_test",
"doris.topic2table.map": "topic_test:test_kafka_tbl",
"buffer.count.records":"10000",
"buffer.flush.time":"120",
"buffer.size.bytes":"5000000",
"doris.urls":"10.10.10.1",
"doris.user":"root",
"doris.password":"",
"doris.http.port":"8030",
"doris.query.port":"9030",
"doris.database":"test_db",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter"
}
}'
Operation Connector
# View connector status
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/status -X GET
# Delete connector
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster -X DELETE
# Pause connector
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/pause -X PUT
# Restart connector
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/resume -X PUT
# Restart tasks within the connector
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/tasks/0/restart -X POST
kafka-connectを初回起動時に、kafka-connectの共有コネクタ設定を記録するために、kafkaクラスタ内に3つのトピックconfig.storage.topic、offset.storage.topic、status.storage.topicが作成されることに注意してください。オフセットデータとステータス更新が記録されます。How to Use Kafka Connect - Get Started
SSL認証されたKafkaクラスタへのアクセス
kafka-connectを通じてSSL認証されたKafkaクラスタにアクセするには、ユーザーがKafka Brokerの公開鍵を認証するために使用する証明書ファイル(client.truststore.jks)を提供する必要があります。connect-distributed.propertiesファイルに以下の設定を追加することができます:
# Connect worker
security.protocol=SSL
ssl.truststore.location=/var/ssl/private/client.truststore.jks
ssl.truststore.password=test1234
# Embedded consumer for sink connectors
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/var/ssl/private/client.truststore.jks
consumer.ssl.truststore.password=test1234
kafka-connectを通じてSSL認証に接続されたKafkaクラスターの設定手順については、以下を参照してください: Configure Kafka Connect
Dead letter queue
デフォルトでは、変換中またはその間に発生したエラーはコネクターを失敗させます。各コネクター設定では、これらのエラーをスキップすることで許容することも可能で、オプションで各エラーと失敗した操作の詳細、および問題となったレコード(詳細レベルは様々)をログ記録用のdead-letter queueに書き込むことができます。
errors.tolerance=all
errors.deadletterqueue.topic.name=test_error_topic
errors.deadletterqueue.context.headers.enable=true
errors.deadletterqueue.topic.replication.factor=1
Configuration項目
| Key | Enum | デフォルト値 | Required | デスクリプション |
|---|---|---|---|---|
| name | - | - | Y | Connectアプリケーション名、Kafka Connect環境内で一意である必要があります |
| connector.class | - | - | Y | org.apache.doris.kafka.connector.DorisSinkConnector |
| topics | - | - | Y | 購読するtopicのリスト、カンマ区切り。例: topic1, topic2 |
| doris.urls | - | - | Y | Doris FE接続アドレス。複数ある場合は、カンマで区切ります。例: 10.20.30.1,10.20.30.2,10.20.30.3 |
| doris.http.port | - | - | Y | Doris HTTPプロトコルポート |
| doris.query.port | - | - | Y | Doris MySQLプロトコルポート |
| doris.user | - | - | Y | Dorisユーザー名 |
| doris.password | - | - | Y | Dorisパスワード |
| doris.database | - | - | Y | 書き込み先データベース。複数のライブラリがある場合は空にできます。同時に、topic2table.mapで具体的なライブラリ名を設定する必要があります。 |
| doris.topic2table.map | - | - | N | topicとtableTableの対応関係、例: topic1:tb1,topic2:tb2 デフォルトは空で、topicとTable名が一対一対応することを示します。 複数ライブラリの形式: topic1:db1.tbl1,topic2:db2.tbl2 |
| buffer.count.records | - | 50000 | N | dorisにフラッシュする前に各Kafkaパーティションがメモリ内でバッファするレコード数。デフォルト50000レコード |
| buffer.flush.time | - | 120 | N | バッファリフレッシュ間隔、秒単位、デフォルト120秒 |
| buffer.size.bytes | - | 10485760(100MB) | N | 各Kafkaパーティションでメモリ内にバッファされるレコードの累積サイズ、バイト単位、デフォルト100MB |
| jmx | - | true | N | JMXを通じてconnector内部監視指標を取得するには、以下を参照してください: Doris-Connector-JMX |
| enable.2pc | - | true | N | Stream Loadの2フェーズコミット(TwoPhaseCommit)を有効にするかどうか、デフォルトはtrueです。 |
| enable.delete | - | false | N | レコードを同期的に削除するかどうか、デフォルトfalse |
| label.prefix | - | ${name} | N | データインポート時のStream loadラベルプレフィックス。デフォルトはConnectorアプリケーション名です。 |
| auto.redirect | - | true | N | StreamLoadリクエストをリダイレクトするかどうか。オンにすると、StreamLoadはFEを通じてデータを書き込む必要があるBEにリダイレクトし、BE情報は表示されなくなります。 |
| sink.properties.* | - | 'sink.properties.format':'json', 'sink.properties.read_json_by_line':'true' | N | Stream Loadのインポートパラメータ。 例: カラム区切り文字の定義 'sink.properties.column_separator':',' 詳細パラメータはこちらを参照 Group Commitを有効にする、例: sync_modeモードでgroup commitを有効にする: "sink.properties.group_commit":"sync_mode"。Group Commitはoff_mode、sync_mode、async_modeの3つのモードで設定できます。具体的な使用方法については以下を参照してください: Group-Commit部分カラム更新を有効にする、例: 指定したcol2の部分カラム更新を有効にする: "sink.properties.partial_columns":"true", "sink.properties.columns": " col2", |
| delivery.guarantee | at_least_once,exactly_once | at_least_once | N | KafkaデータをDorisにインポートする際のデータ整合性を保証する方法。at_least_once exactly_onceをサポート、デフォルトはat_least_onceです。データexactly_onceを保証するには、Dorisを2.1.0以上にアップグレードする必要があります |
| converter.mode | normal,debezium_ingestion | normal | N | ConnectorでKafkaデータを消費する際のアップストリームデータの型変換モード。normalはKafka内のデータを通常通り消費し、型変換を行わないことを意味します。debezium_ingestionは、DebeziumなどのCDC(Changelog Data Capture)ツールでKafkaアップストリームデータが収集される場合、アップストリームデータをサポートするために特別な型変換を行う必要があることを意味します。 |
| debezium.schema.evolution | none,basic | none | N | Debeziumを使用してアップストリームデータベースシステム(MySQLなど)を収集し、構造変更が発生した場合、追加されたフィールドをDorisに同期できます。noneは、アップストリームデータベースシステムの構造が変更された場合、変更された構造がDorisに同期されないことを意味します。basicは、アップストリームデータベースのデータ変更操作を同期することを意味します。カラム構造の変更は危険な操作であるため(DorisTable構造のカラムを誤って削除する可能性があります)、現在はアップストリームのカラム追加操作の同期のみサポートしています。カラムの名前が変更された場合、古いカラムは変更されず、ConnectorはターゲットTableに新しいカラムを追加し、名前変更された新しいデータを新しいカラムにsinkします。 |
| database.time_zone | - | UTC | N | converter.modeがnormalモード以外の場合、日付データタイプ(datetime、date、timestampなど)のタイムゾーン変換を指定する方法を提供します。デフォルトはUTCタイムゾーンです。 |
| avro.topic2schema.filepath | - | - | N | ローカルに提供されたAvro Schemaファイルを読み取ることで、Topic内のAvroファイル内容を解析し、Confluentが提供するSchema登録センターからのデカップリングを実現します。 この設定は key.converterまたはvalue.converterプレフィックスと組み合わせて使用する必要があります。例: avro-userとavro-product Topicのローカル Avro Schemaファイルを設定する: "value.converter.avro.topic2schema. filepath":"avro-user:file:///opt/avro_user.avsc, avro-product:file:///opt/avro_product.avsc" 具体的な使用方法については以下を参照してください: #32 |
| record.tablename.field | - | - | N | このパラメータを設定すると、1つのkafka topicからのデータを複数のdorisTableに流すことができます。設定の詳細については以下を参照してください: #58 |
| enable.combine.flush | true,false | false | N | すべてのパーティションからのデータをまとめて書き込むかどうか。デフォルト値はfalseです。有効にした場合、at_least_onceセマンティクスのみが保証されます。 |
| max.retries | - | 10 | N | タスクを失敗させる前にエラー時に再試行する最大回数。 |
| retry.interval.ms | - | 6000 | N | エラー後に再試行を試みるまでの待機時間(ミリ秒)。 |
| behavior.on.null.values | ignore,fail | ignore | N | null値を持つレコードの処理方法を定義します。 |
その他のKafka Connect Sink共通設定項目については、以下を参照してください: connect_configuring
型マッピング
Doris-kafka-connectorは、カラムのデータ型を解決するために論理型またはプリミティブ型マッピングを使用します。
プリミティブ型は、Kafka connectのSchemaを使用して表現される単純なデータ型を指します。論理データ型は通常、Struct構造を使用して複合型、または日付と時間型を表現します。
| Kafka Primitive タイプ | Doris タイプ |
|---|---|
| INT8 | TINYINT |
| INT16 | SMALLINT |
| INT32 | INT |
| INT64 | BIGINT |
| FLOAT32 | FLOAT |
| FLOAT64 | DOUBLE |
| BOOLEAN | BOOLEAN |
| STRING | STRING |
| BYTES | STRING |
| Kafka Logical タイプ | Doris タイプ |
|---|---|
| org.apache.kafka.connect.data.Decimal | DECIMAL |
| org.apache.kafka.connect.data.Date | DATE |
| org.apache.kafka.connect.data.Time | STRING |
| org.apache.kafka.connect.data.Timestamp | DATETIME |
| Debezium Logical タイプ | Doris タイプ |
|---|---|
| io.debezium.time.Date | DATE |
| io.debezium.time.Time | String |
| io.debezium.time.MicroTime | DATETIME |
| io.debezium.time.NanoTime | DATETIME |
| io.debezium.time.ZonedTime | DATETIME |
| io.debezium.time.Timestamp | DATETIME |
| io.debezium.time.MicroTimestamp | DATETIME |
| io.debezium.time.NanoTimestamp | DATETIME |
| io.debezium.time.ZonedTimestamp | DATETIME |
| io.debezium.data.VariableScaleDecimal | DOUBLE |
ベストプラクティス
プレーンJSONデータのロード
-
インポートデータサンプル
Kafkaに以下のサンプルデータがありますkafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-data-topic --from-beginning
{"user_id":1,"name":"Emily","age":25}
{"user_id":2,"name":"Benjamin","age":35}
{"user_id":3,"name":"Olivia","age":28}
{"user_id":4,"name":"Alexander","age":60}
{"user_id":5,"name":"Ava","age":17}
{"user_id":6,"name":"William","age":69}
{"user_id":7,"name":"Sophia","age":32}
{"user_id":8,"name":"James","age":64}
{"user_id":9,"name":"Emma","age":37}
{"user_id":10,"name":"Liam","age":64} -
インポートする必要があるTableを作成する
Dorisで、インポートするTableを作成します。具体的な構文は以下の通りですCREATE TABLE test_db.test_kafka_connector_tbl(
user_id BIGINT NOT NULL COMMENT "user id",
name VARCHAR(20) COMMENT "name",
age INT COMMENT "age"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 12; -
インポートタスクを作成する
Kafka-connectがデプロイされているマシン上で、curlコマンドを通じて以下のインポートタスクを送信しますcurl -i http://127.0.0.1:8083/connectors -H "Content-タイプ: application/json" -X POST -d '{
"name":"test-doris-sink-cluster",
"config":{
"connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
"tasks.max":"10",
"topics":"test-data-topic",
"doris.topic2table.map": "test-data-topic:test_kafka_connector_tbl",
"buffer.count.records":"10000",
"buffer.flush.time":"120",
"buffer.size.bytes":"5000000",
"doris.urls":"10.10.10.1",
"doris.user":"root",
"doris.password":"",
"doris.http.port":"8030",
"doris.query.port":"9030",
"doris.database":"test_db",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.storage.StringConverter"
}
}'
Debeziumコンポーネントによって収集されたデータをロードする
- MySQLデータベースには以下のTableがあります
CREATE TABLE test.test_user (
user_id int NOT NULL ,
name varchar(20),
age int,
PRIMARY KEY (user_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
insert into test.test_user values(1,'zhangsan',20);
insert into test.test_user values(2,'lisi',21);
insert into test.test_user values(3,'wangwu',22);
- DorisでインポートTableを作成する
CREATE TABLE test_db.test_user(
user_id BIGINT NOT NULL COMMENT "user id",
name VARCHAR(20) COMMENT "name",
age INT COMMENT "age"
)
UNIQUE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 12;
- MySQL用のDebeziumコネクタコンポーネントをデプロイします。参照: Debezium connector for MySQL
- doris-kafka-connectorインポートタスクを作成します
Debeziumを通じて収集されたMySQLTableデータがmysql_debezium.test.test_userTopicにあると仮定します
curl -i http://127.0.0.1:8083/connectors -H "Content-タイプ: application/json" -X POST -d '{
"name":"test-debezium-doris-sink",
"config":{
"connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
"tasks.max":"10",
"topics":"mysql_debezium.test.test_user",
"doris.topic2table.map": "mysql_debezium.test.test_user:test_user",
"buffer.count.records":"10000",
"buffer.flush.time":"120",
"buffer.size.bytes":"5000000",
"doris.urls":"10.10.10.1",
"doris.user":"root",
"doris.password":"",
"doris.http.port":"8030",
"doris.query.port":"9030",
"doris.database":"test_db",
"converter.mode":"debezium_ingestion",
"enable.delete":"true",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter"
}
}'
Avroシリアライズされたデータの読み込み
curl -i http://127.0.0.1:8083/connectors -H "Content-タイプ: application/json" -X POST -d '{
"name":"doris-avro-test",
"config":{
"connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
"topics":"avro_topic",
"tasks.max":"10",
"doris.topic2table.map": "avro_topic:avro_tab",
"buffer.count.records":"100000",
"buffer.flush.time":"120",
"buffer.size.bytes":"10000000",
"doris.urls":"127.0.0.1",
"doris.user":"root",
"doris.password":"",
"doris.http.port":"8030",
"doris.query.port":"9030",
"doris.database":"test",
"load.model":"stream_load",
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://127.0.0.1:8081",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://127.0.0.1:8081"
}
}'
Protobufシリアライズされたデータの読み込み
curl -i http://127.0.0.1:8083/connectors -H "Content-タイプ: application/json" -X POST -d '{
"name":"doris-protobuf-test",
"config":{
"connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
"topics":"proto_topic",
"tasks.max":"10",
"doris.topic2table.map": "proto_topic:proto_tab",
"buffer.count.records":"100000",
"buffer.flush.time":"120",
"buffer.size.bytes":"10000000",
"doris.urls":"127.0.0.1",
"doris.user":"root",
"doris.password":"",
"doris.http.port":"8030",
"doris.query.port":"9030",
"doris.database":"test",
"load.model":"stream_load",
"key.converter":"io.confluent.connect.protobuf.ProtobufConverter",
"key.converter.schema.registry.url":"http://127.0.0.1:8081",
"value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
"value.converter.schema.registry.url":"http://127.0.0.1:8081"
}
}'
Kafka Connect Single Message Transformsを使用したデータの読み込み
例として、以下の形式のデータを考えてみましょう:
{
"registertime": 1513885135404,
"userid": "User_9",
"regionid": "Region_3",
"gender": "MALE"
}
Kafkaメッセージにハードコードされたカラムを追加するには、InsertFieldを使用できます。また、TimestampConverterを使用して、Bigint型のタイムスタンプを時刻文字列に変換できます。
curl -i http://127.0.0.1:8083/connectors -H "Content-タイプ: application/json" -X POST -d '{
"name": "insert_field_tranform",
"config": {
"connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
"tasks.max": "1",
"topics": "users",
"doris.topic2table.map": "users:kf_users",
"buffer.count.records": "10",
"buffer.flush.time": "11",
"buffer.size.bytes": "5000000",
"doris.urls": "127.0.0.1:8030",
"doris.user": "root",
"doris.password": "123456",
"doris.http.port": "8030",
"doris.query.port": "9030",
"doris.database": "testdb",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"transforms": "InsertField,TimestampConverter",
// Insert Static Field
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.static.field": "repo",
"transforms.InsertField.static.value": "Apache Doris",
// Convert Timestamp Format
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.field": "registertime",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
"transforms.TimestampConverter.target.type": "string"
}
}'
InsertField と TimestampConverter の変換後、データは以下のようになります:
{
"userid": "User_9",
"regionid": "Region_3",
"gender": "MALE",
"repo": "Apache Doris",// Static field added
"registertime": "2017-12-21 03:38:55.404" // Unix timestamp converted to string
}
Kafka Connect Single Message Transforms (SMT)のその他の例については、SMT ドキュメントを参照してください。
FAQ
1. Json型データの読み取り時に以下のエラーが発生する場合:
Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:337)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:91)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:536)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
reason:
これはorg.apache.kafka.connect.json.JsonConverterコンバーターを使用する場合、"schema"と"payload"フィールドを一致させる必要があるためです。
2つの解決方法から1つを選択してください:
org.apache.kafka.connect.json.JsonConverterをorg.apache.kafka.connect.storage.StringConverterに置き換える- 起動モードがStandaloneモードの場合、config/connect-standalone.propertiesの
value.converter.schemas.enableまたはkey.converter.schemas.enableをfalseに変更する; 起動モードがDistributedモードの場合、config/connect-distributed.propertiesのvalue.converter.schemas.enableまたはkey.converter.schemas.enableをfalseに変更する
2. 消費がタイムアウトしてコンシューマーが消費グループから除外される:
org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1318)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:1127)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:1093)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1590)
at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitAsync(WorkerSinkTask.java:361)
at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:376)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:467)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:381)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:221)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
解決策:
シナリオに応じてKafkaのmax.poll.interval.msを増加させます。デフォルト値は300000です。
- Standaloneモードで起動している場合は、config/connect-standalone.propertiesの設定ファイルに
max.poll.interval.msとconsumer.max.poll.interval.msパラメータを追加し、パラメータ値を設定します。 - Distributedモードで起動している場合は、config/connect-distributed.propertiesの設定ファイルに
max.poll.interval.msとconsumer.max.poll.interval.msパラメータを追加し、パラメータ値を設定します。
パラメータを調整した後、kafka-connectを再起動します。
3. Doris-kafka-connectorをバージョン1.0.0または1.1.0から24.0.0にアップグレードする際にエラーが発生する
org.apache.kafka.common.config.ConfigException: Topic 'connect-status' supplied via the 'status.storage.topic' property is required to have 'cleanup.policy=compact' to guarantee consistency and durability of connector and task statuses, but found the topic currently has 'cleanup.policy=delete'. Continuing would likely result in eventually losing connector and task statuses and problems restarting this Connect cluster in the future. Change the 'status.storage.topic' property in the Connect worker configurations to use a topic with 'cleanup.policy=compact'.
at org.apache.kafka.connect.util.TopicAdmin.verifyTopicCleanupPolicyOnlyCompact(TopicAdmin.java:581)
at org.apache.kafka.connect.storage.KafkaTopicBasedBackingStore.lambda$topicInitializer$0(KafkaTopicBasedBackingStore.java:47)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:247)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:231)
at org.apache.kafka.connect.storage.KafkaStatusBackingStore.start(KafkaStatusBackingStore.java:228)
at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:164)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run
解決方法:
connect-configs connect-status Topicのクリアリング戦略をcompactに調整する
$KAFKA_HOME/bin/kafka-configs.sh --alter --entity-type topics --entity-name connect-configs --add-config cleanup.policy=compact --bootstrap-server 127.0.0.1:9092
$KAFKA_HOME/bin/kafka-configs.sh --alter --entity-type topics --entity-name connect-status --add-config cleanup.policy=compact --bootstrap-server 127.0.0.1:9092
4. debezium_ingestion コンバーターモードでTableスキーマ変更が失敗しました
[2025-01-07 14:26:20,474] WARN [doris-normal_test_sink-connector|task-0] Table 'test_sink' cannot be altered because schema evolution is disabled. (org.apache.doris.kafka.connector.converter.RecordService:183)
[2025-01-07 14:26:20,475] ERROR [doris-normal_test_sink-connector|task-0] WorkerSinkTask{id=doris-normal_test_sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Cannot alter table org.apache.doris.kafka.connector.model.TableDescriptor@67cd8027 because schema evolution is disabled (org.apache.kafka.connect.runtime.WorkerSinkTask:612)
org.apache.doris.kafka.connector.exception.SchemaChangeException: Cannot alter table org.apache.doris.kafka.connector.model.TableDescriptor@67cd8027 because schema evolution is disabled
at org.apache.doris.kafka.connector.converter.RecordService.alterTableIfNeeded(RecordService.java:186)
at org.apache.doris.kafka.connector.converter.RecordService.checkAndApplyTableChangesIfNeeded(RecordService.java:150)
at org.apache.doris.kafka.connector.converter.RecordService.processStructRecord(RecordService.java:100)
at org.apache.doris.kafka.connector.converter.RecordService.getProcessedRecord(RecordService.java:305)
at org.apache.doris.kafka.connector.writer.DorisWriter.putBuffer(DorisWriter.java:155)
at org.apache.doris.kafka.connector.writer.DorisWriter.insertRecord(DorisWriter.java:124)
at org.apache.doris.kafka.connector.writer.StreamLoadWriter.insert(StreamLoadWriter.java:151)
at org.apache.doris.kafka.connector.service.DorisDefaultSinkService.insert(DorisDefaultSinkService.java:154)
at org.apache.doris.kafka.connector.service.DorisDefaultSinkService.insert(DorisDefaultSinkService.java:135)
at org.apache.doris.kafka.connector.DorisSinkTask.put(DorisSinkTask.java:97)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
解決策:
debezium_ingestionコンバータモードでは、Tableスキーマ変更はデフォルトで無効になっています。Tableスキーマ変更を有効にするには、debezium.schema.evolutionをbasicに設定する必要があります。
Table構造変更を有効にしても、この変更されたカラムをDorisTable内で唯一のカラムとして正確に保持されないことに注意する必要があります(詳細はdebezium.schema.evolutionパラメータの説明を参照)。上流と下流で一意のカラムのみを保持する必要がある場合は、変更されたカラムを手動でDorisTableに追加してから、Connectorタスクを再開することが最適です。Connectorは未消費のoffsetを継続的に消費してデータ整合性を維持します。