DataX Doriswriter
DataX Doriswriterプラグインは、MySQL、Oracle、SQL Serverなどの様々なデータソースから、Stream Load方式を使用してDorisにデータを同期することをサポートしています。
このプラグインはDataXサービスと組み合わせて使用する必要があります。 DataXは複数のデータソースをサポートしています。詳細については、こちらを参照してください。
使用方法
DataXインストールパッケージの直接ダウンロード
DataXは、既にDataXが含まれている公式インストールパッケージを提供しており、直接ダウンロードして使用できます。詳細については、こちらを参照してください。
DorisWriterプラグインの手動コンパイル
DorisWriterプラグインのソースコードをダウンロードします。
-
init-env.shを実行する -
必要に応じて
DataX/doriswriter内のdoriswriterのコードを修正する -
doriswriterをビルドする
doriswriterを単独でビルド:
`mvn clean install -pl plugin-rdbms-util,doriswriter -DskipTests`DataXプロジェクト全体をコンパイルする必要がある場合は、こちらを参照してください
コンパイルエラー
以下のコンパイルエラーが発生した場合:
```
Could not find artifact com.alibaba.datax:datax-all:pom:0.0.1-SNAPSHOT ...
```
以下の解決策を試すことができます:
1. [alibaba-datax-maven-m2-20210928.tar.gz](https://doris-thirdparty-repo.bj.bcebos.com/thirdparty/alibaba-datax-maven-m2-20210928.tar.gz) をダウンロードします
2. 展開後、生成された `alibaba/datax/` ディレクトリを、使用している maven に対応する `.m2/repository/com/alibaba/` にコピーし、再度コンパイルを試行します。
Datax DorisWriter パラメータ紹介:
-
jdbcUrl
- 説明: Doris の JDBC 接続文字列で、ユーザーが preSql または postSQL を実行します。
- 必須: はい
- デフォルト: なし
-
loadUrl
- 説明: Stream Load の接続先として使用されます。形式は "ip:port" です。ここで IP は FE ノードの IP、port は FE ノードの http_port です。複数指定することができ、英語のカンマ
,で区切って指定すると、doriswriter はポーリング方式でアクセスします。 - 必須: はい
- デフォルト: なし
- 説明: Stream Load の接続先として使用されます。形式は "ip:port" です。ここで IP は FE ノードの IP、port は FE ノードの http_port です。複数指定することができ、英語のカンマ
-
username
- 説明: Doris データベースにアクセスするためのユーザー名
- 必須: はい
- デフォルト: なし
-
password
- 説明: Doris データベースにアクセスするためのパスワード
- 必須: いいえ
- デフォルト: 空
-
connection.selectedDatabase
- 説明: 書き込み先となる Doris データベースの名前。
- 必須: はい
- デフォルト: なし
-
connection. table
- 説明: 書き込み先となる Doris テーブルの名前。
- 必須: はい
- デフォルト: なし
- 説明: 書き込み先となる Doris テーブルの名前。
-
flushInterval
- 説明: データをバッチで書き込む時間間隔。この時間間隔を小さく設定しすぎると、Doris の書き込みブロッキング問題が発生し、エラーコード -235 になります。また、この時間間隔を小さく設定しすぎて
maxBatchRowsとbatchSizeパラメータを大きく設定した場合、設定したデータサイズに到達できない可能性もありますが、インポートは実行されます。 - 必須: いいえ
- デフォルト: 30000 (ms)
- 説明: データをバッチで書き込む時間間隔。この時間間隔を小さく設定しすぎると、Doris の書き込みブロッキング問題が発生し、エラーコード -235 になります。また、この時間間隔を小さく設定しすぎて
-
column
- 説明: 宛先テーブルにデータを書き込む必要があるフィールドで、これらのフィールドは生成される Json データのフィールド名として使用されます。フィールドはカンマで区切られます。例: "column": ["id","name","age"]。
- 必須: はい
- デフォルト: なし
-
preSql
- 説明: 宛先テーブルにデータを書き込む前に、ここの標準ステートメントが最初に実行されます。
- 必須: いいえ
- デフォルト: なし
-
postSql
- 説明: 宛先テーブルにデータを書き込んだ後に、ここの標準ステートメントが実行されます。
- 必須: いいえ
- デフォルト: なし
-
maxBatchRows
- 説明: インポートデータの各バッチの最大行数。batchSize と合わせて、バッチごとのインポートレコード行数を制御します。各バッチのデータが2つの閾値のいずれかに達すると、このバッチのデータのインポートが開始されます。
- 必須: いいえ
- デフォルト: 500000
-
batchSize
- 説明: 各バッチでインポートされるデータの最大量。maxBatchRows と連携してバッチごとのインポート数を制御します。各バッチのデータが2つの閾値のいずれかに達すると、このバッチのデータのインポートが開始されます。
- 必須: いいえ
- デフォルト: 94371840
-
maxRetries
- 説明: 各バッチのデータインポートが失敗した後の再試行回数。
- 必須: いいえ
- デフォルト: 3
-
labelPrefix
- 説明: 各バッチのインポートタスクのラベルプレフィックス。最終的なラベルは
labelPrefix + UUIDを持ち、グローバルに一意のラベルを形成して、データが重複してインポートされないことを保証します - 必須: いいえ
- デフォルト:
datax_doris_writer_
- 説明: 各バッチのインポートタスクのラベルプレフィックス。最終的なラベルは
-
loadProps
-
説明: StreamLoad のリクエストパラメータ。詳細については、StreamLoad 紹介ページを参照してください。Stream load - Apache Doris
これには、インポートデータフォーマット:format などが含まれます。インポートデータフォーマットのデフォルトは csv で、JSON をサポートしています。詳細については、下記のタイプ変換セクションを参照するか、上記の Stream load の公式情報を参照してください。
-
必須: いいえ
-
デフォルト: なし
-
例
1. Stream がデータを読み取り、Doris にインポートする
doriswriter プラグインの使用方法については、こちらを参照してください。
2.Mysql がデータを読み取り、Doris にインポートする
1.Mysql テーブル構造
CREATE TABLE `t_test`(
`id`bigint(30) NOT NULL,
`order_code` varchar(30) DEFAULT NULL COMMENT '',
`line_code` varchar(30) DEFAULT NULL COMMENT '',
`remark` varchar(30) DEFAULT NULL COMMENT '',
`unit_no` varchar(30) DEFAULT NULL COMMENT '',
`unit_name` varchar(30) DEFAULT NULL COMMENT '',
`price` decimal(12,2) DEFAULT NULL COMMENT '',
PRIMARY KEY(`id`) USING BTREE
)ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC COMMENT='';
2.Dorisテーブル構造
CREATE TABLE `ods_t_test` (
`id`bigint(30) NOT NULL,
`order_code` varchar(30) DEFAULT NULL COMMENT '',
`line_code` varchar(30) DEFAULT NULL COMMENT '',
`remark` varchar(30) DEFAULT NULL COMMENT '',
`unit_no` varchar(30) DEFAULT NULL COMMENT '',
`unit_name` varchar(30) DEFAULT NULL COMMENT '',
`price` decimal(12,2) DEFAULT NULL COMMENT ''
)ENGINE=OLAP
UNIQUE KEY(id`, `order_code`)
DISTRIBUTED BY HASH(`order_code`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"in_memory" = "false",
"storage_format" = "V2"
);
3.dataxスクリプトを作成する
my_import.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["id","order_code","line_code","remark","unit_no","unit_name","price"],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://localhost:3306/demo"],
"table": ["employees_1"]
}
],
"username": "root",
"password": "xxxxx",
"where": ""
}
},
"writer": {
"name": "doriswriter",
"parameter": {
"loadUrl": ["127.0.0.1:8030"],
"column": ["id","order_code","line_code","remark","unit_no","unit_name","price"],
"username": "root",
"password": "xxxxxx",
"postSql": ["select count(1) from all_employees_info"],
"preSql": [],
"flushInterval":30000,
"connection": [
{
"jdbcUrl": "jdbc:mysql://127.0.0.1:9030/demo",
"selectedDatabase": "demo",
"table": ["all_employees_info"]
}
],
"loadProps": {
"format": "json",
"strip_outer_array":"true",
"line_delimiter": "\\x02"
}
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
備考:
"loadProps": {
"format": "json",
"strip_outer_array": "true",
"line_delimiter": "\\x02"
}
- ここでは JSON 形式を使用してデータをインポートします
line_delimiterのデフォルトは改行文字ですが、データ内の値と競合する可能性があります。特殊文字や不可視文字を使用してインポートエラーを回避できます- strip_outer_array : インポートデータのバッチ内の複数行データを表します。Doris は解析時に配列を展開し、その中の各 Object を順次データの行として解析します。
- Stream load のその他のパラメータについては、Stream load - Apache Doris を参照してください
- CSV 形式の場合は、次のように使用できます
"loadProps": {
"format": "csv",
"column_separator": "\\x01",
"line_delimiter": "\\x02"
}CSV 形式では、データ内の特殊文字との競合を避けるために、行と列の区切り文字に特別な注意が必要です。ここでは非表示文字の使用を推奨します。デフォルトの列区切り文字は \t、行区切り文字は \n です
4.datax タスクを実行します。具体的な内容は datax 公式サイト を参照してください
python bin/datax.py my_import.json
実行後、以下の情報を確認できます
2022-11-16 14:28:54.012 [job-0] INFO JobContainer - jobContainer starts to do prepare ...
2022-11-16 14:28:54.012 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] do prepare work .
2022-11-16 14:28:54.013 [job-0] INFO JobContainer - DataX Writer.Job [doriswriter] do prepare work .
2022-11-16 14:28:54.020 [job-0] INFO JobContainer - jobContainer starts to do split ...
2022-11-16 14:28:54.020 [job-0] INFO JobContainer - Job set Channel-Number to 1 channels.
2022-11-16 14:28:54.023 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] splits to [1] tasks.
2022-11-16 14:28:54.023 [job-0] INFO JobContainer - DataX Writer.Job [doriswriter] splits to [1] tasks.
2022-11-16 14:28:54.033 [job-0] INFO JobContainer - jobContainer starts to do schedule ...
2022-11-16 14:28:54.036 [job-0] INFO JobContainer - Scheduler starts [1] taskGroups.
2022-11-16 14:28:54.037 [job-0] INFO JobContainer - Running by standalone Mode.
2022-11-16 14:28:54.041 [taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2022-11-16 14:28:54.043 [taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated.
2022-11-16 14:28:54.043 [taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.
2022-11-16 14:28:54.049 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2022-11-16 14:28:54.052 [0-0-0-reader] INFO CommonRdbmsReader$Task - Begin to read record by Sql: [select taskid,projectid,taskflowid,templateid,template_name,status_task from dwd_universal_tb_task
] jdbcUrl:[jdbc:mysql://localhost:3306/demo?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
Wed Nov 16 14:28:54 GMT+08:00 2022 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
2022-11-16 14:28:54.071 [0-0-0-reader] INFO CommonRdbmsReader$Task - Finished read record by Sql: [select taskid,projectid,taskflowid,templateid,template_name,status_task from dwd_universal_tb_task
] jdbcUrl:[jdbc:mysql://localhost:3306/demo?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
2022-11-16 14:28:54.104 [Thread-1] INFO DorisStreamLoadObserver - Start to join batch data: rows[2] bytes[438] label[datax_doris_writer_c4e08cb9-c157-4689-932f-db34acc45b6f].
2022-11-16 14:28:54.104 [Thread-1] INFO DorisStreamLoadObserver - Executing stream load to: 'http://127.0.0.1:8030/api/demo/dwd_universal_tb_task/_stream_load', size: '441'
2022-11-16 14:28:54.224 [Thread-1] INFO DorisStreamLoadObserver - StreamLoad response :{"Status":"Success","BeginTxnTimeMs":0,"Message":"OK","NumberUnselectedRows":0,"CommitAndPublishTimeMs":17,"Label":"datax_doris_writer_c4e08cb9-c157-4689-932f-db34acc45b6f","LoadBytes":441,"StreamLoadPutTimeMs":1,"NumberTotalRows":2,"WriteDataTimeMs":11,"TxnId":217056,"LoadTimeMs":31,"TwoPhaseCommit":"false","ReadDataTimeMs":0,"NumberLoadedRows":2,"NumberFilteredRows":0}
2022-11-16 14:28:54.225 [Thread-1] INFO DorisWriterManager - Async stream load finished: label[datax_doris_writer_c4e08cb9-c157-4689-932f-db34acc45b6f].
2022-11-16 14:28:54.249 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[201]ms
2022-11-16 14:28:54.250 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] completed it's tasks.
2022-11-16 14:29:04.048 [job-0] INFO StandAloneJobContainerCommunicator - Total 2 records, 214 bytes | Speed 21B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2022-11-16 14:29:04.049 [job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
2022-11-16 14:29:04.049 [job-0] INFO JobContainer - DataX Writer.Job [doriswriter] do post work.
Wed Nov 16 14:29:04 GMT+08:00 2022 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
2022-11-16 14:29:04.187 [job-0] INFO DorisWriter$Job - Start to execute preSqls:[select count(1) from dwd_universal_tb_task]. context info:jdbc:mysql://172.16.0.13:9030/demo.
2022-11-16 14:29:04.204 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] do post work.
2022-11-16 14:29:04.204 [job-0] INFO JobContainer - DataX jobId [0] completed successfully.
2022-11-16 14:29:04.204 [job-0] INFO HookInvoker - No hook invoked, because base dir not exists or is a file: /data/datax/hook
2022-11-16 14:29:04.205 [job-0] INFO JobContainer -
[total cpu info] =>
averageCpu | maxDeltaCpu | minDeltaCpu
-1.00% | -1.00% | -1.00%
[total gc info] =>
NAME | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime
PS MarkSweep | 1 | 1 | 1 | 0.017s | 0.017s | 0.017s
PS Scavenge | 1 | 1 | 1 | 0.007s | 0.007s | 0.007s
2022-11-16 14:29:04.205 [job-0] INFO JobContainer - PerfTrace not enable!
2022-11-16 14:29:04.206 [job-0] INFO StandAloneJobContainerCommunicator - Total 2 records, 214 bytes | Speed 21B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2022-11-16 14:29:04.206 [job-0] INFO JobContainer -
Task Start Time : 2022-11-16 14:28:53
Task End Time : 2022-11-16 14:29:04
Total Task Duration : 10s
Average Task Throughput : 21B/s
Record Write Speed : 0rec/s
Total Records Read : 2
Total Read/Write Failures : 0