メインコンテンツまでスキップ
バージョン: 2.1

VectorとDorisの統合

Vectorについて

VectorはRustで書かれた高性能なオブザーバビリティデータパイプラインで、ログ、メトリクス、トレースの収集、変換、ルーティングを目的として特別に設計されています。Dorisエコシステムをより良くサポートするため、私たちはVector用の専用Doris Sinkコンポーネントを開発し、様々なデータソースからDorisへの効率的なデータ取り込みを可能にして分析を行えるようにしました。

インストール

インストールパッケージのダウンロード

wget https://apache-doris-releases.oss-cn-beijing.aliyuncs.com/extension/vector-x86_64-unknown-linux-gnu.tar.gz

ソースからのビルド

cd ${Vector_HOME}

## Choose the appropriate option based on your deployment environment. Multiple options are available in the Makefile.
make package-x86_64-unknown-linux-gnu

設定パラメータ

Doris Sinkは、様々なシナリオでのデータ書き込み要件を満たすため、豊富な設定オプションをサポートしています:

基本設定

パラメータデフォルト説明
typestring-dorisに固定
inputsarray-上流データソース名のリスト
endpointsarray<string>-Doris FE HTTP/HTTPSアドレス、プロトコルとポートを含む必要があります(例:["https://fe1:8030"]
databasestring/template-ターゲットデータベース名、Templateをサポート
tablestring/template-ターゲットTable名、テンプレートをサポート
label_prefixstring"vector"Stream Loadラベルプレフィックス、最終ラベル形式は{label_prefix}_{database}_{table}_{timestamp}_{uuid}

認証設定

パラメータデフォルト説明
auth.strategystring"basic"認証戦略、Dorisは現在Basic Authのみをサポート
auth.userstring-Dorisユーザー名
auth.passwordstring-Dorisパスワード、環境変数やシークレット管理システムと組み合わせて使用可能

リクエストと同時実行設定

パラメータデフォルト説明
request.concurrencystring/integer"adaptive"同時実行戦略を制御、"adaptive""none"(直列)、または正の整数で同時実行制限をサポート
request.timeout_secsinteger60単一のStream Loadリクエストのタイムアウト(秒)
request.rate_limit_duration_secsinteger1レート制限時間窓(秒)
request.rate_limit_numintegeri64::MAX時間窓あたりの許可リクエスト数、デフォルトは事実上無制限
request.retry_attemptsintegerusize::MAXTowerミドルウェアの最大リトライ試行回数、デフォルトは無制限リトライ
request.retry_initial_backoff_secsinteger1最初のリトライ前の待機時間(秒)、以降のリトライはフィボナッチバックオフを使用
request.retry_max_duration_secsinteger30単一リトライバックオフの最大待機時間(秒)
request.retry_jitter_modestring"full"リトライジッターモード、fullまたはnoneをサポート

適応的同時実行(request.adaptive_concurrencyrequest.concurrency = "adaptive"の場合のみ有効)

パラメータデフォルト説明
request.adaptive_concurrency.initial_concurrencyinteger1適応的同時実行の初期値
request.adaptive_concurrency.max_concurrency_limitinteger200適応的同時実行の上限、過負荷を防ぐため
request.adaptive_concurrency.decrease_ratiofloat0.9速度低下をトリガーする際に使用される削減比率
request.adaptive_concurrency.ewma_alphafloat0.4RTTメトリクスの指数移動平均重み
request.adaptive_concurrency.rtt_deviation_scalefloat2.5RTT偏差増幅係数、正常な変動を無視するために使用

エンコーディングとデータ形式

Doris Sinkはencodingブロックを使用してイベントシリアライゼーション動作を制御し、デフォルトはNDJSON(改行区切りJSON)です:

パラメータデフォルト説明
encoding.codecstring"json"シリアライゼーションエンコーディング、オプションはjsontextcsvなど
encoding.timestamp_formatstring-タイムスタンプ出力形式の調整、rfc3339unixなどをサポート
encoding.only_fields / encoding.except_fieldsarray<string>-フィールドホワイトリストまたはブラックリストの制御
encoding.framing.methodstringauto-inferredカスタムフレーミング形式が必要な場合に設定(例:newline_delimitedcharacter_delimited

Stream Loadヘッダー(headers

headersは、Doris Stream LoadのHTTPヘッダーとして直接渡されるキー・バリューペアのマッピングです。stream loadヘッダーで利用可能なすべてのパラメータを使用できます。 一般的な設定は以下の通りです(すべての値は文字列である必要があります):

パラメータデフォルト説明
headers.formatstring"json"データ形式、jsoncsvparquetなどをサポート
headers.read_json_by_linestring"true"JSONを1行ずつ読み取るか(NDJSON)
headers.strip_outer_arraystring"false"最外側の配列を削除するか
headers.column_separatorstring-CSV列区切り文字(format = csvの場合に有効)
headers.columnsstring-CSV/JSONマッピングの列順序(例:timestamp,client_ip,status_code
headers.wherestring-Stream Load whereフィルタ条件

バッチ設定

パラメータデフォルト説明
batch.max_bytesinteger10485760バッチあたりの最大バイト数(10 MB)
batch.max_eventsinteger/nullnullバッチあたりの最大イベント数、デフォルトは無制限、主にバイト数で制御
batch.timeout_secsfloat1バッチの最大待機時間(秒)

信頼性とセキュリティ設定

パラメータデフォルト説明
max_retriesinteger-1Sinkレベルでの最大リトライ数、-1は無制限
log_requestbooleanfalse各Stream Loadリクエストとレスポンスを出力するか(本番環境では必要に応じて有効化)
compression-Not supported-
distribution.retry_initial_backoff_secsinteger1エンドポイントヘルスチェック復旧の初期バックオフ時間(秒)
distribution.retry_max_duration_secsinteger3600最大ヘルスチェックバックオフ期間(秒)
tls.verify_certificatebooleantrue上流証明書検証の有効/無効
tls.verify_hostnamebooleantrueホスト名検証の有効/無効
tls.ca_file / tls.crt_file / tls.key_file / tls.key_pass / tls.alpn_protocols / tls.server_namevarious-カスタムCA、相互認証、またはSNI用の標準Vector TLSクライアント設定オプション
acknowledgements.enabledbooleanfalseacknowledgementをサポートするSourcesで使用するためのエンドツーエンドacknowledgementsを有効化

使用例

TEXTログ収集例

この例では、Doris FEログを例としてTEXTログ収集を説明します。

1. データ

FEログファイルは通常、Dorisインストールディレクトリ下のfe/log/fe.logに配置されます。これは、タイムスタンプ、ログレベル、スレッド名、コード位置、ログメッセージなどのフィールドを含む典型的なJavaアプリケーションログです。通常のログに加えて、複数行にわたるスタックトレースを含む例外ログもあります。ログ収集と保存では、メインログとスタックトレースを単一のログエントリに結合する必要があります。

2024-07-08 21:18:01,432 INFO (Statistics Job Appender|61) [StatisticsJobAppender.runAfterCatalogReady():70] Stats table not available, skip
2024-07-08 21:18:53,710 WARN (STATS_FETCH-0|208) [StmtExecutor.executeInternalQuery():3332] Failed to run internal SQL: OriginStatement{originStmt='SELECT * FROM __internal_schema.column_statistics WHERE part_id is NULL ORDER BY update_time DESC LIMIT 500000', idx=0}
org.apache.doris.common.UserException: errCode = 2, detailMessage = tablet 10031 has no queryable replicas. err: replica 10032's backend 10008 does not exist or not alive
at org.apache.doris.planner.OlapScanNode.addScanRangeLocations(OlapScanNode.java:931) ~[doris-fe.jar:1.2-SNAPSHOT]
at org.apache.doris.planner.OlapScanNode.computeTabletInfo(OlapScanNode.java:1197) ~[doris-fe.jar:1.2-SNAPSHOT]

2. Create Table

Table構造には、ログ生成時刻、収集時刻、hostname、ログファイルパス、ログタイプ、ログレベル、thread名、コード位置、およびログメッセージのフィールドが含まれます。

CREATE TABLE `doris_log` (
`log_time` datetime NULL COMMENT 'log content time',
`collect_time` datetime NULL COMMENT 'log agent collect time',
`host` text NULL COMMENT 'hostname or ip',
`path` text NULL COMMENT 'log file path',
`type` text NULL COMMENT 'log type',
`level` text NULL COMMENT 'log level',
`thread` text NULL COMMENT 'log thread',
`position` text NULL COMMENT 'log code position',
`message` text NULL COMMENT 'log message',
INDEX idx_host (`host`) USING INVERTED COMMENT '',
INDEX idx_path (`path`) USING INVERTED COMMENT '',
INDEX idx_type (`type`) USING INVERTED COMMENT '',
INDEX idx_level (`level`) USING INVERTED COMMENT '',
INDEX idx_thread (`thread`) USING INVERTED COMMENT '',
INDEX idx_position (`position`) USING INVERTED COMMENT '',
INDEX idx_message (`message`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true") COMMENT ''
) ENGINE=OLAP
DUPLICATE KEY(`log_time`)
COMMENT 'OLAP'
PARTITION BY RANGE(`log_time`) ()
DISTRIBUTED BY RANDOM BUCKETS 10
PROPERTIES (
"replication_num" = "1",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-7",
"dynamic_partition.end" = "1",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "10",
"dynamic_partition.create_history_partition" = "true",
"compaction_policy" = "time_series"
);

3. Vector設定

# ==================== Sources ====================
[sources.fe_log_input]
type = "file"
include = ["/path/fe/log/fe.log"]
start_at_beginning = true
max_line_bytes = 102400
ignore_older_secs = 0
fingerprint.strategy = "device_and_inode"

# Multi-line log handling - corresponds to Logstash's multiline codec
# Lines starting with a timestamp are new logs, other lines are merged with the previous line (handling stack traces)
multiline.start_pattern = "^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}"
multiline.mode = "halt_before"
multiline.condition_pattern = "^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}"
multiline.timeout_ms = 10000

# ==================== Transforms ====================
# Use grok to parse log content
[transforms.parse_log]
inputs = ["fe_log_input"]
type = "remap"
source = '''
# Add type field (corresponds to Logstash's add_field)
.type = "fe.log"

# Add collect_time (corresponds to Logstash's @timestamp)
# Use Asia/Shanghai timezone, consistent with log_time
.collect_time = format_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S", timezone: "Asia/Shanghai")

# Parse log format: 2024-01-01 12:00:00,123 INFO (thread-name) [position] message
# Use (?s) to enable DOTALL mode, allowing .* to match newlines (handling multi-line logs)
parsed, err = parse_regex(.message, r'(?s)^(?P<log_time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) (?P<level>[A-Z]+) \((?P<thread>[^\)]+)\) \[(?P<position>[^\]]+)\] (?P<content>.*)')

# Extract parsed fields
if err == null {
.log_time = parsed.log_time
.level = parsed.level
.thread = parsed.thread
.position = parsed.position
# Keep the complete original message (including multi-line stack traces)
} else {
# If parsing fails, set default values to avoid NULL (avoid partition errors)
.log_time = .collect_time
.level = "UNKNOWN"
.thread = ""
.position = ""
}

# Extract host and path (Vector automatically adds these metadata)
.host = .host
.path = .file
'''

# ==================== Sinks ====================
[sinks.doris]
inputs = ["parse_log"]
type = "doris"
endpoints = ["http://fe_ip:http_port"]
database = "log_db"
table = "doris_log"
label_prefix = "vector_fe_log"
log_request = true

[sinks.doris.auth]
user = "root"
password = ""
strategy = "basic"

[sinks.doris.encoding]
codec = "json"

[sinks.doris.framing]
method = "newline_delimited"

[sinks.doris.request]
concurrency = 10

[sinks.doris.headers]
format = "json"
read_json_by_line = "true"
load_to_single_tablet = "true"

[sinks.doris.batch]
max_events = 10000
timeout_secs = 3
max_bytes = 100000000

4. Vectorを実行


${VECTOR_HOME}/bin/vector --config vector_fe_log.toml

# When log_request is true, the log will output the request parameters and response results of each Stream Load
2025-11-19T10:14:40.822071Z INFO sink{component_kind="sink" component_id=doris component_type=doris}:request{request_id=82}: vector::sinks::doris::service: Doris stream load response received. status_code=200 OK stream_load_status=Successful response={
"TxnId": 169721,
"Label": "vector_fe_log_log_db_doris_log_1763547280791_e2e619ee-4067-4fe8-974e-9f35f0d4e48e",
"Comment": "",
"TwoPhaseCommit": "false",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 10,
"NumberLoadedRows": 10,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 7301,
"LoadTimeMs": 30,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 1,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 8,
"ReceiveDataTimeMs": 2,
"CommitAndPublishTimeMs": 18
} internal_log_rate_limit=true

JSON ログ収集の例

この例では、GitHubイベントアーカイブデータを使用したJSONログ収集を説明します。

1. データ

GitHubイベントアーカイブには、GitHubユーザーの操作イベントのアーカイブデータがJSON形式で含まれています。https://www.gharchive.org/ からダウンロードできます。例えば、2024年1月1日の15:00のデータをダウンロードします。

wget https://data.gharchive.org/2024-01-01-15.json.gz

以下はサンプルデータエントリです。実際のデータは1行につき1つのエントリとなっています。ここではフォーマットは表示目的で追加されています。

{
"id": "37066529221",
"type": "PushEvent",
"actor": {
"id": 46139131,
"login": "Bard89",
"display_login": "Bard89",
"gravatar_id": "",
"url": "https://api.github.com/users/Bard89",
"avatar_url": "https://avatars.githubusercontent.com/u/46139131?"
},
"repo": {
"id": 780125623,
"name": "Bard89/talk-to-me",
"url": "https://api.github.com/repos/Bard89/talk-to-me"
},
"payload": {
"repository_id": 780125623,
"push_id": 17799451992,
"size": 1,
"distinct_size": 1,
"ref": "refs/heads/add_mvcs",
"head": "f03baa2de66f88f5f1754ce3fa30972667f87e81",
"before": "85e6544ede4ae3f132fe2f5f1ce0ce35a3169d21"
},
"public": true,
"created_at": "2024-04-01T23:00:00Z"
}

2. DorisTableの作成

CREATE DATABASE log_db;
USE log_db;

CREATE TABLE github_events
(
`created_at` DATETIME,
`id` BIGINT,
`type` TEXT,
`public` BOOLEAN,
`actor` VARIANT,
`repo` VARIANT,
`payload` TEXT,
INDEX `idx_id` (`id`) USING INVERTED,
INDEX `idx_type` (`type`) USING INVERTED,
INDEX `idx_actor` (`actor`) USING INVERTED,
INDEX `idx_host` (`repo`) USING INVERTED,
INDEX `idx_payload` (`payload`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true")
)
ENGINE = OLAP
DUPLICATE KEY(`created_at`)
PARTITION BY RANGE(`created_at`) ()
DISTRIBUTED BY RANDOM BUCKETS 10
PROPERTIES (
"replication_num" = "1",
"inverted_index_storage_format"= "v2",
"compaction_policy" = "time_series",
"enable_single_replica_compaction" = "true",
"dynamic_partition.enable" = "true",
"dynamic_partition.create_history_partition" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-30",
"dynamic_partition.end" = "1",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "10",
"dynamic_partition.replication_num" = "1"
);

3. Vector設定

# ==================== Sources ====================
[sources.github_events_reload]
type = "file"
include = ["/path/2024-01-01-15.json"]
read_from = "beginning"
ignore_checkpoints = true
max_line_bytes = 10485760
ignore_older_secs = 0
line_delimiter = "\n"
fingerprint.strategy = "device_and_inode"

# ==================== Transforms ====================
# Parse JSON format GitHub Events data, VARIANT type can directly store nested objects
[transforms.parse_json]
inputs = ["github_events_reload"]
type = "remap"
source = '''
# Parse JSON data (each line is a complete JSON object)
. = parse_json!(.message)

# Convert payload field to JSON string (TEXT type)
.payload = encode_json(.payload)

# Keep only the fields needed for the table
. = {
"created_at": .created_at,
"id": .id,
"type": .type,
"public": .public,
"actor": .actor,
"repo": .repo,
"payload": .payload
}
'''

# ==================== Sinks ====================
[sinks.doris]
inputs = ["parse_json"]
type = "doris"
endpoints = ["http://fe_ip:http_port"]
database = "log_db"
table = "github_events"
label_prefix = "vector_github_events"
log_request = true

[sinks.doris.auth]
user = "root"
password = ""
strategy = "basic"

[sinks.doris.encoding]
codec = "json"

[sinks.doris.framing]
method = "newline_delimited"

[sinks.doris.request]
concurrency = 10

[sinks.doris.headers]
format = "json"
read_json_by_line = "true"
load_to_single_tablet = "true"

[sinks.doris.batch]
max_events = 10000
timeout_secs = 3
max_bytes = 100000000

Start Vector

次のコマンドを使用してVectorサービスを開始します:

vector --config vector_config.toml