
Integrating Vector with Doris

About Vector

Vector is a high-performance observability data pipeline written in Rust, specifically designed for collecting, transforming, and routing logs, metrics, and traces. To better support the Doris ecosystem, we have developed a dedicated Doris Sink component for Vector, enabling efficient data ingestion from various data sources into Doris for analysis.

Installation

Download Installation Package

wget https://apache-doris-releases.oss-cn-beijing.aliyuncs.com/extension/vector-x86_64-unknown-linux-gnu.tar.gz

Build from Source

cd ${Vector_HOME}

## Choose the appropriate option based on your deployment environment. Multiple options are available in the Makefile.
make package-x86_64-unknown-linux-gnu

Configuration Parameters

Doris Sink supports extensive configuration options to meet data writing requirements in different scenarios:

Basic Configuration

| Parameter | Type | Default | Description |
|---|---|---|---|
| type | string | - | Fixed as doris |
| inputs | array | - | List of upstream data source names |
| endpoints | array<string> | - | Doris FE HTTP/HTTPS addresses; must include protocol and port, e.g., ["https://fe1:8030"] |
| database | string/template | - | Target database name, supports templates |
| table | string/template | - | Target table name, supports templates |
| label_prefix | string | "vector" | Stream Load label prefix; the final label format is {label_prefix}_{database}_{table}_{timestamp}_{uuid} |
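
For reference, a minimal sink definition using only these basic parameters might look like the following sketch; the source name, FE address, database, and table are placeholders.

[sinks.doris_basic]
type = "doris"
inputs = ["my_source"]                  # name of an upstream source or transform
endpoints = ["http://fe_host:8030"]     # FE HTTP address, including protocol and port
database = "log_db"
table = "app_log"                       # database/table also accept Vector templates, e.g. "{{ app_name }}"
label_prefix = "vector"                 # final label: {label_prefix}_{database}_{table}_{timestamp}_{uuid}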

Authentication Configuration

| Parameter | Type | Default | Description |
|---|---|---|---|
| auth.strategy | string | "basic" | Authentication strategy; Doris currently only supports Basic Auth |
| auth.user | string | - | Doris username |
| auth.password | string | - | Doris password; can be supplied via environment variables or a secret management system |
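
As a sketch, Basic Auth for the sink above could be configured as follows; the DORIS_PASSWORD environment variable is an illustrative name, relying on Vector's environment variable interpolation to keep the password out of the config file.

[sinks.doris_basic.auth]
strategy = "basic"
user = "root"
password = "${DORIS_PASSWORD}"   # resolved from the environment at startup; variable name is hypothetical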

Request and Concurrency Configuration

| Parameter | Type | Default | Description |
|---|---|---|---|
| request.concurrency | string/integer | "adaptive" | Concurrency strategy: "adaptive", "none" (serial), or a positive integer concurrency limit |
| request.timeout_secs | integer | 60 | Timeout for a single Stream Load request (seconds) |
| request.rate_limit_duration_secs | integer | 1 | Rate limit time window (seconds) |
| request.rate_limit_num | integer | i64::MAX | Number of requests allowed per time window; the default is effectively unlimited |
| request.retry_attempts | integer | usize::MAX | Maximum retry attempts for the Tower middleware; the default means unlimited retries |
| request.retry_initial_backoff_secs | integer | 1 | Wait time before the first retry (seconds); subsequent retries use Fibonacci backoff |
| request.retry_max_duration_secs | integer | 30 | Maximum wait time for a single retry backoff (seconds) |
| request.retry_jitter_mode | string | "full" | Retry jitter mode: full or none |
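
A possible request block, assuming you want a fixed concurrency and tighter retry limits than the defaults:

[sinks.doris_basic.request]
concurrency = 8                  # fixed concurrency instead of "adaptive" or "none"
timeout_secs = 60                # per Stream Load request
rate_limit_duration_secs = 1
rate_limit_num = 100             # at most 100 requests per 1-second window
retry_attempts = 10              # cap retries instead of the unlimited default
retry_initial_backoff_secs = 1   # later retries back off following a Fibonacci sequence
retry_max_duration_secs = 30
retry_jitter_mode = "full"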

Adaptive Concurrency (request.adaptive_concurrency, only effective when request.concurrency = "adaptive")

| Parameter | Type | Default | Description |
|---|---|---|---|
| request.adaptive_concurrency.initial_concurrency | integer | 1 | Initial value for adaptive concurrency |
| request.adaptive_concurrency.max_concurrency_limit | integer | 200 | Upper limit for adaptive concurrency, to prevent overload |
| request.adaptive_concurrency.decrease_ratio | float | 0.9 | Reduction ratio applied when a slowdown is triggered |
| request.adaptive_concurrency.ewma_alpha | float | 0.4 | Exponential moving average weight for RTT measurements |
| request.adaptive_concurrency.rtt_deviation_scale | float | 2.5 | RTT deviation amplification factor, used to ignore normal fluctuations |
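
If adaptive concurrency is used instead of a fixed limit, the controller can be tuned as sketched below; the values shown are the documented defaults.

[sinks.doris_basic.request]
concurrency = "adaptive"

[sinks.doris_basic.request.adaptive_concurrency]
initial_concurrency = 1        # start with a single in-flight request
max_concurrency_limit = 200    # never exceed 200 concurrent requests
decrease_ratio = 0.9           # shrink concurrency when latency degrades
ewma_alpha = 0.4               # weight of the latest RTT sample in the moving average
rtt_deviation_scale = 2.5      # tolerate this much RTT deviation before reacting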

Encoding and Data Format

Doris Sink uses the encoding block to control event serialization behavior, defaulting to NDJSON (newline-delimited JSON):

| Parameter | Type | Default | Description |
|---|---|---|---|
| encoding.codec | string | "json" | Serialization codec: json, text, csv, etc. |
| encoding.timestamp_format | string | - | Timestamp output format: rfc3339, unix, etc. |
| encoding.only_fields / encoding.except_fields | array<string> | - | Field whitelist or blacklist |
| encoding.framing.method | string | auto-inferred | Set when a custom framing format is needed, e.g., newline_delimited, character_delimited |
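
For example, NDJSON output with Unix timestamps and a trimmed field set might be configured as follows; the excluded field names are illustrative.

[sinks.doris_basic.encoding]
codec = "json"                             # serialized as newline-delimited JSON by default
timestamp_format = "unix"
except_fields = ["source_type", "file"]    # drop Vector metadata fields not present in the target table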

Stream Load Headers (headers)

headers is a key-value map that is passed directly as HTTP headers to Doris Stream Load, so any parameter supported by Stream Load headers can be used. Common settings are listed below (all values must be strings):

| Parameter | Type | Default | Description |
|---|---|---|---|
| headers.format | string | "json" | Data format: json, csv, parquet, etc. |
| headers.read_json_by_line | string | "true" | Whether to read JSON line by line (NDJSON) |
| headers.strip_outer_array | string | "false" | Whether to remove the outermost array |
| headers.column_separator | string | - | CSV column separator (effective when format = csv) |
| headers.columns | string | - | Column order for CSV/JSON mapping, e.g., timestamp,client_ip,status_code |
| headers.where | string | - | Stream Load where filter condition |
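
As a sketch, loading CSV instead of the default NDJSON could combine the headers below with a matching CSV encoding; the column names and filter condition are assumptions for illustration.

[sinks.doris_basic.headers]
format = "csv"
column_separator = ","
columns = "timestamp,client_ip,status_code"   # must match the order of the serialized fields
where = "status_code >= 400"                  # optional server-side filter applied by Stream Load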

Batch Configuration

| Parameter | Type | Default | Description |
|---|---|---|---|
| batch.max_bytes | integer | 10485760 | Maximum bytes per batch (10 MB) |
| batch.max_events | integer/null | null | Maximum events per batch; unlimited by default, batches are primarily bounded by byte count |
| batch.timeout_secs | float | 1 | Maximum wait time for a batch (seconds) |
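
For example, to flush larger and less frequent batches than the defaults:

[sinks.doris_basic.batch]
max_bytes = 52428800    # 50 MB per batch
max_events = 500000     # also cap the batch by event count
timeout_secs = 5        # flush at least every 5 seconds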

Reliability and Security Configuration

| Parameter | Type | Default | Description |
|---|---|---|---|
| max_retries | integer | -1 | Maximum retries at the Sink level; -1 means unlimited |
| log_request | boolean | false | Whether to log each Stream Load request and response (enable only as needed in production) |
| compression | - | Not supported | - |
| distribution.retry_initial_backoff_secs | integer | 1 | Initial backoff for endpoint health-check recovery (seconds) |
| distribution.retry_max_duration_secs | integer | 3600 | Maximum health-check backoff duration (seconds) |
| tls.verify_certificate | boolean | true | Enable/disable upstream certificate verification |
| tls.verify_hostname | boolean | true | Enable/disable hostname verification |
| tls.ca_file / tls.crt_file / tls.key_file / tls.key_pass / tls.alpn_protocols / tls.server_name | various | - | Standard Vector TLS client options for custom CAs, mutual authentication, or SNI |
| acknowledgements.enabled | boolean | false | Enable end-to-end acknowledgements, for use with Sources that support them |
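
A sketch of TLS and acknowledgement settings for an HTTPS FE endpoint signed by a private CA; the sink name, endpoint, and certificate path are placeholders.

[sinks.doris_secure]
type = "doris"
inputs = ["my_source"]
endpoints = ["https://fe_host:8030"]
database = "log_db"
table = "app_log"
max_retries = 10                   # cap Sink-level retries instead of the unlimited default (-1)
log_request = false                # keep per-request logging off in production

[sinks.doris_secure.tls]
verify_certificate = true
verify_hostname = true
ca_file = "/path/to/ca.pem"        # private CA that signed the FE certificate

[sinks.doris_secure.acknowledgements]
enabled = true                     # end-to-end acknowledgements, if the upstream source supports them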

Usage Examples

TEXT Log Collection Example

This example demonstrates TEXT log collection, using Doris FE logs as the data source.

1. Data

FE log files are typically located at fe/log/fe.log under the Doris installation directory. They are typical Java application logs containing fields such as timestamp, log level, thread name, code position, and log message. Besides regular log lines, there are exception logs whose stack traces span multiple lines, so collection and storage must combine the main log line and its stack trace into a single log entry.

2024-07-08 21:18:01,432 INFO (Statistics Job Appender|61) [StatisticsJobAppender.runAfterCatalogReady():70] Stats table not available, skip
2024-07-08 21:18:53,710 WARN (STATS_FETCH-0|208) [StmtExecutor.executeInternalQuery():3332] Failed to run internal SQL: OriginStatement{originStmt='SELECT * FROM __internal_schema.column_statistics WHERE part_id is NULL ORDER BY update_time DESC LIMIT 500000', idx=0}
org.apache.doris.common.UserException: errCode = 2, detailMessage = tablet 10031 has no queryable replicas. err: replica 10032's backend 10008 does not exist or not alive
at org.apache.doris.planner.OlapScanNode.addScanRangeLocations(OlapScanNode.java:931) ~[doris-fe.jar:1.2-SNAPSHOT]
at org.apache.doris.planner.OlapScanNode.computeTabletInfo(OlapScanNode.java:1197) ~[doris-fe.jar:1.2-SNAPSHOT]

2. Create Table

The table structure includes fields for log generation time, collection time, hostname, log file path, log type, log level, thread name, code position, and log message.

CREATE TABLE `doris_log` (
`log_time` datetime NULL COMMENT 'log content time',
`collect_time` datetime NULL COMMENT 'log agent collect time',
`host` text NULL COMMENT 'hostname or ip',
`path` text NULL COMMENT 'log file path',
`type` text NULL COMMENT 'log type',
`level` text NULL COMMENT 'log level',
`thread` text NULL COMMENT 'log thread',
`position` text NULL COMMENT 'log code position',
`message` text NULL COMMENT 'log message',
INDEX idx_host (`host`) USING INVERTED COMMENT '',
INDEX idx_path (`path`) USING INVERTED COMMENT '',
INDEX idx_type (`type`) USING INVERTED COMMENT '',
INDEX idx_level (`level`) USING INVERTED COMMENT '',
INDEX idx_thread (`thread`) USING INVERTED COMMENT '',
INDEX idx_position (`position`) USING INVERTED COMMENT '',
INDEX idx_message (`message`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true") COMMENT ''
) ENGINE=OLAP
DUPLICATE KEY(`log_time`)
COMMENT 'OLAP'
PARTITION BY RANGE(`log_time`) ()
DISTRIBUTED BY RANDOM BUCKETS 10
PROPERTIES (
"replication_num" = "1",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-7",
"dynamic_partition.end" = "1",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "10",
"dynamic_partition.create_history_partition" = "true",
"compaction_policy" = "time_series"
);

3. Vector Configuration

# ==================== Sources ====================
[sources.fe_log_input]
type = "file"
include = ["/path/fe/log/fe.log"]
read_from = "beginning"
max_line_bytes = 102400
ignore_older_secs = 0
fingerprint.strategy = "device_and_inode"

# Multi-line log handling - corresponds to Logstash's multiline codec
# Lines starting with a timestamp are new logs, other lines are merged with the previous line (handling stack traces)
multiline.start_pattern = "^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}"
multiline.mode = "halt_before"
multiline.condition_pattern = "^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}"
multiline.timeout_ms = 10000

# ==================== Transforms ====================
# Use grok to parse log content
[transforms.parse_log]
inputs = ["fe_log_input"]
type = "remap"
source = '''
# Add type field (corresponds to Logstash's add_field)
.type = "fe.log"

# Add collect_time (corresponds to Logstash's @timestamp)
# Use Asia/Shanghai timezone, consistent with log_time
.collect_time = format_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S", timezone: "Asia/Shanghai")

# Parse log format: 2024-01-01 12:00:00,123 INFO (thread-name) [position] message
# Use (?s) to enable DOTALL mode, allowing .* to match newlines (handling multi-line logs)
parsed, err = parse_regex(.message, r'(?s)^(?P<log_time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) (?P<level>[A-Z]+) \((?P<thread>[^\)]+)\) \[(?P<position>[^\]]+)\] (?P<content>.*)')

# Extract parsed fields
if err == null {
.log_time = parsed.log_time
.level = parsed.level
.thread = parsed.thread
.position = parsed.position
# Keep the complete original message (including multi-line stack traces)
} else {
# If parsing fails, set default values to avoid NULL (avoid partition errors)
.log_time = .collect_time
.level = "UNKNOWN"
.thread = ""
.position = ""
}

# Extract host and path (Vector automatically adds these metadata)
.host = .host
.path = .file
'''

# ==================== Sinks ====================
[sinks.doris]
inputs = ["parse_log"]
type = "doris"
endpoints = ["http://fe_ip:http_port"]
database = "log_db"
table = "doris_log"
label_prefix = "vector_fe_log"
log_request = true

[sinks.doris.auth]
user = "root"
password = ""
strategy = "basic"

[sinks.doris.encoding]
codec = "json"

[sinks.doris.framing]
method = "newline_delimited"

[sinks.doris.request]
concurrency = 10

[sinks.doris.headers]
format = "json"
read_json_by_line = "true"
load_to_single_tablet = "true"

[sinks.doris.batch]
max_events = 10000
timeout_secs = 3
max_bytes = 100000000

4. Run Vector


${VECTOR_HOME}/bin/vector --config vector_fe_log.toml

# When log_request is true, the Vector log outputs the request parameters and response of each Stream Load
2025-11-19T10:14:40.822071Z INFO sink{component_kind="sink" component_id=doris component_type=doris}:request{request_id=82}: vector::sinks::doris::service: Doris stream load response received. status_code=200 OK stream_load_status=Successful response={
"TxnId": 169721,
"Label": "vector_fe_log_log_db_doris_log_1763547280791_e2e619ee-4067-4fe8-974e-9f35f0d4e48e",
"Comment": "",
"TwoPhaseCommit": "false",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 10,
"NumberLoadedRows": 10,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 7301,
"LoadTimeMs": 30,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 1,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 8,
"ReceiveDataTimeMs": 2,
"CommitAndPublishTimeMs": 18
} internal_log_rate_limit=true

JSON Log Collection Example

This example demonstrates JSON log collection using GitHub events archive data.

1. Data

The GitHub events archive contains archived GitHub user activity events in JSON format. You can download it from https://www.gharchive.org/; for example, the following command downloads the data for 15:00 on January 1, 2024.

wget https://data.gharchive.org/2024-01-01-15.json.gz

Below is a sample data entry. The actual data is one entry per line; formatting is added here for display purposes.

{
"id": "37066529221",
"type": "PushEvent",
"actor": {
"id": 46139131,
"login": "Bard89",
"display_login": "Bard89",
"gravatar_id": "",
"url": "https://api.github.com/users/Bard89",
"avatar_url": "https://avatars.githubusercontent.com/u/46139131?"
},
"repo": {
"id": 780125623,
"name": "Bard89/talk-to-me",
"url": "https://api.github.com/repos/Bard89/talk-to-me"
},
"payload": {
"repository_id": 780125623,
"push_id": 17799451992,
"size": 1,
"distinct_size": 1,
"ref": "refs/heads/add_mvcs",
"head": "f03baa2de66f88f5f1754ce3fa30972667f87e81",
"before": "85e6544ede4ae3f132fe2f5f1ce0ce35a3169d21"
},
"public": true,
"created_at": "2024-04-01T23:00:00Z"
}

2. Create Doris Table

CREATE DATABASE log_db;
USE log_db;

CREATE TABLE github_events
(
`created_at` DATETIME,
`id` BIGINT,
`type` TEXT,
`public` BOOLEAN,
`actor` VARIANT,
`repo` VARIANT,
`payload` TEXT,
INDEX `idx_id` (`id`) USING INVERTED,
INDEX `idx_type` (`type`) USING INVERTED,
INDEX `idx_actor` (`actor`) USING INVERTED,
INDEX `idx_repo` (`repo`) USING INVERTED,
INDEX `idx_payload` (`payload`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true")
)
ENGINE = OLAP
DUPLICATE KEY(`created_at`)
PARTITION BY RANGE(`created_at`) ()
DISTRIBUTED BY RANDOM BUCKETS 10
PROPERTIES (
"replication_num" = "1",
"inverted_index_storage_format"= "v2",
"compaction_policy" = "time_series",
"enable_single_replica_compaction" = "true",
"dynamic_partition.enable" = "true",
"dynamic_partition.create_history_partition" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-30",
"dynamic_partition.end" = "1",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "10",
"dynamic_partition.replication_num" = "1"
);

3. Vector Configuration

# ==================== Sources ====================
[sources.github_events_reload]
type = "file"
include = ["/path/2024-01-01-15.json"]
read_from = "beginning"
ignore_checkpoints = true
max_line_bytes = 10485760
ignore_older_secs = 0
line_delimiter = "\n"
fingerprint.strategy = "device_and_inode"

# ==================== Transforms ====================
# Parse JSON format GitHub Events data, VARIANT type can directly store nested objects
[transforms.parse_json]
inputs = ["github_events_reload"]
type = "remap"
source = '''
# Parse JSON data (each line is a complete JSON object)
. = parse_json!(.message)

# Convert payload field to JSON string (TEXT type)
.payload = encode_json(.payload)

# Keep only the fields needed for the table
. = {
"created_at": .created_at,
"id": .id,
"type": .type,
"public": .public,
"actor": .actor,
"repo": .repo,
"payload": .payload
}
'''

# ==================== Sinks ====================
[sinks.doris]
inputs = ["parse_json"]
type = "doris"
endpoints = ["http://fe_ip:http_port"]
database = "log_db"
table = "github_events"
label_prefix = "vector_github_events"
log_request = true

[sinks.doris.auth]
user = "root"
password = ""
strategy = "basic"

[sinks.doris.encoding]
codec = "json"

[sinks.doris.framing]
method = "newline_delimited"

[sinks.doris.request]
concurrency = 10

[sinks.doris.headers]
format = "json"
read_json_by_line = "true"
load_to_single_tablet = "true"

[sinks.doris.batch]
max_events = 10000
timeout_secs = 3
max_bytes = 100000000

4. Run Vector

Use the following command to start the Vector service:

vector --config vector_config.toml