
Data Prepper

Data Prepper is the pipeline engine that receives OTLP telemetry, routes events by signal type, applies processors, and writes to OpenSearch indices. The default configuration handles traces, logs, and service map generation.

Prerequisites:

  • OpenSearch Observability Stack running (see Quickstart)
  • OTel Collector configured to export to Data Prepper on port 21890
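As a point of reference, the Collector side of that second prerequisite might look like the fragment below. The `data-prepper` hostname and plaintext gRPC are assumptions that match the `ssl: false` source shown later; adjust both for your deployment:

```yaml
exporters:
  otlp/data-prepper:
    endpoint: data-prepper:21890
    tls:
      insecure: true
```

Reference this exporter from the Collector's `service.pipelines` section for each signal you want Data Prepper to receive.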

The pipeline configuration file (`pipelines.yaml`) defines a directed acyclic graph of named pipelines. Each pipeline has a source, optional processors, optional routing rules, and one or more sinks.

The entry-point pipeline receives all OTLP data and routes events by type:

```yaml
otlp-pipeline:
  workers: 5
  delay: 10
  source:
    otlp:
      ssl: false
      port: 21890
  router:
    - "LOG": '/eventType == "LOG"'
    - "TRACE": '/eventType == "TRACE"'
  route:
    - LOG: [otel-logs-pipeline]
    - TRACE: [otel-traces-pipeline]
```
| Parameter | Description |
| --- | --- |
| `workers` | Number of threads processing events in parallel |
| `delay` | Milliseconds to wait before flushing a partial batch |
| `source.otlp.port` | gRPC port for receiving OTLP data (default: 21890) |
| `router` | Conditional expressions that classify events by type |
| `route` | Maps router labels to downstream pipeline names |
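The routing behavior can be pictured as a lookup from the matched label to a list of downstream pipelines. This Python sketch is purely illustrative (it is not Data Prepper internals); the labels and pipeline names mirror the config above:

```python
def route_event(event: dict) -> list[str]:
    """Return the downstream pipelines an event is sent to, per the routes above."""
    routes = {
        "LOG": ["otel-logs-pipeline"],
        "TRACE": ["otel-traces-pipeline"],
    }
    # The router conditions ('/eventType == "LOG"', etc.) reduce to a label lookup here.
    label = event.get("eventType")
    return routes.get(label, [])

# A log event goes to the log pipeline; events matching no route go nowhere.
print(route_event({"eventType": "LOG"}))     # ['otel-logs-pipeline']
print(route_event({"eventType": "METRIC"}))  # []
```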

The log pipeline receives routed log events, copies the timestamp field, and writes to the log analytics index:

```yaml
otel-logs-pipeline:
  workers: 5
  delay: 10
  source:
    pipeline:
      name: "otlp-pipeline"
  processor:
    - copy_values:
        entries:
          - from_key: "time"
            to_key: "@timestamp"
  sink:
    - opensearch:
        hosts: ["https://opensearch:9200"]
        username: "admin"
        password: "${OPENSEARCH_PASSWORD}"
        insecure: true
        index_type: log-analytics-plain
```

The `copy_values` processor maps the OTel `time` field to the `@timestamp` field that OpenSearch log analytics expects.
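The effect on an event can be sketched in a few lines of Python (an illustration of the semantics, not the processor's implementation): the value under `from_key` is duplicated into `to_key`, and the original field stays in place.

```python
def copy_values(event: dict, entries: list[dict]) -> dict:
    """Copy each entry's from_key value to its to_key, keeping the original."""
    out = dict(event)
    for entry in entries:
        if entry["from_key"] in out:
            out[entry["to_key"]] = out[entry["from_key"]]
    return out

event = {"time": "2024-05-01T12:00:00Z", "body": "user login"}
result = copy_values(event, [{"from_key": "time", "to_key": "@timestamp"}])
print(result["@timestamp"])  # 2024-05-01T12:00:00Z
print("time" in result)      # True (original field is preserved)
```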

Traces fan out to two downstream pipelines for parallel processing:

```yaml
otel-traces-pipeline:
  source:
    pipeline:
      name: "otlp-pipeline"
  sink:
    - pipeline:
        name: "traces-raw-pipeline"
    - pipeline:
        name: "service-map-pipeline"
```

The `traces-raw-pipeline` processes raw span data and writes it to the trace analytics index:

```yaml
traces-raw-pipeline:
  source:
    pipeline:
      name: "otel-traces-pipeline"
  processor:
    - otel_traces:
  sink:
    - opensearch:
        hosts: ["https://opensearch:9200"]
        username: "admin"
        password: "${OPENSEARCH_PASSWORD}"
        insecure: true
        index_type: trace-analytics-plain-raw
```

The `otel_traces` processor converts OpenTelemetry span data into the document format that the OpenSearch trace analytics UI expects.
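Conceptually, that conversion flattens each span into an index document. The sketch below is a rough illustration only; the exact schema is defined by Data Prepper, and the field names shown (`traceId`, `spanId`, `durationInNanos`, `serviceName`) are assumptions for demonstration:

```python
def span_to_doc(span: dict) -> dict:
    """Flatten a span-like dict into a trace-analytics-style document."""
    return {
        "traceId": span["trace_id"],
        "spanId": span["span_id"],
        "name": span["name"],
        # Duration is derived from start/end timestamps rather than stored directly.
        "durationInNanos": span["end_nanos"] - span["start_nanos"],
        "serviceName": span.get("resource", {}).get("service.name"),
    }

doc = span_to_doc({
    "trace_id": "abc123", "span_id": "def456", "name": "GET /orders",
    "start_nanos": 1_000, "end_nanos": 4_500,
    "resource": {"service.name": "checkout"},
})
print(doc["durationInNanos"])  # 3500
```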

The `service-map-pipeline` aggregates span relationships to build a service dependency map:

```yaml
service-map-pipeline:
  source:
    pipeline:
      name: "otel-traces-pipeline"
  processor:
    - otel_apm_service_map:
        group_by_attributes:
          - telemetry.sdk.language
        window_duration: "10s"
  route:
    - SERVICE_MAP: [opensearch-service-map-sink]
    - METRIC: [prometheus-service-map-sink]
  sink:
    - opensearch:
        name: opensearch-service-map-sink
        hosts: ["https://opensearch:9200"]
        username: "admin"
        password: "${OPENSEARCH_PASSWORD}"
        insecure: true
        index_type: otel-v2-apm-service-map
    - prometheus_remote_write:
        name: prometheus-service-map-sink
        url: "http://prometheus:9090/api/v1/write"
        max_events: 500
        flush_timeout: "5s"
```
| Parameter | Description |
| --- | --- |
| `group_by_attributes` | Attributes to include as dimensions in the service map (e.g., `telemetry.sdk.language`) |
| `window_duration` | Time window for aggregating service relationships |
| `max_events` | Maximum events per Prometheus remote write batch |
| `flush_timeout` | Maximum wait before flushing a partial Prometheus batch |

The service map pipeline has a dual sink: it writes service relationship documents to OpenSearch for the service map UI, and exports derived metrics to Prometheus via remote write.
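The core aggregation idea can be shown with a toy sketch: within a window, parent/child span pairs that cross a service boundary reduce to (caller, callee) edges. This is only an illustration of the windowed aggregation, not the processor's actual logic, and it omits the `group_by_attributes` dimensions:

```python
from collections import Counter

def aggregate_edges(spans: list[dict]) -> Counter:
    """Count cross-service caller->callee edges among the spans in one window."""
    by_id = {s["span_id"]: s for s in spans}
    edges: Counter = Counter()
    for span in spans:
        parent = by_id.get(span.get("parent_span_id"))
        # Only parent/child pairs in *different* services form a map edge.
        if parent and parent["service"] != span["service"]:
            edges[(parent["service"], span["service"])] += 1
    return edges

spans = [
    {"span_id": "a", "parent_span_id": None, "service": "frontend"},
    {"span_id": "b", "parent_span_id": "a", "service": "checkout"},
    {"span_id": "c", "parent_span_id": "b", "service": "payments"},
]
edges = aggregate_edges(spans)
print(edges[("frontend", "checkout")])  # 1
print(len(edges))                       # 2
```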

Some processors, like `otel_apm_service_map` and the Prometheus sink, require explicit opt-in via `data-prepper-config.yaml`:

```yaml
ssl: false
experimental:
  enabled_plugins:
    processor:
      - otel_apm_service_map
    sink:
      - prometheus
```
| Index Type | Pipeline | Description |
| --- | --- | --- |
| `log-analytics-plain` | `otel-logs-pipeline` | Plain-text log events with `@timestamp` |
| `trace-analytics-plain-raw` | `traces-raw-pipeline` | Raw span documents for trace analytics |
| `otel-v2-apm-service-map` | `service-map-pipeline` | Service dependency relationships |

Insert processors into the `processor` list of any pipeline. Processors execute in order.

Common examples:

```yaml
# Filter events by attribute
processor:
  - filter:
      condition: '/severity_text == "DEBUG"'
      action: drop
```

```yaml
# Add a static field
processor:
  - add_entries:
      entries:
        - key: "environment"
          value: "production"
```

```yaml
# Truncate long fields
processor:
  - truncate:
      entries:
        - source_key: "body"
          length: 10000
```

Chain multiple processors to build a transformation pipeline:

```yaml
processor:
  - copy_values:
      entries:
        - from_key: "time"
          to_key: "@timestamp"
  - add_entries:
      entries:
        - key: "cluster"
          value: "prod-us-east-1"
  - filter:
      condition: '/severity_text == "DEBUG"'
      action: drop
```

Use environment variable substitution to avoid hardcoding credentials:

```yaml
sink:
  - opensearch:
      hosts: ["${OPENSEARCH_HOSTS}"]
      username: "${OPENSEARCH_USERNAME}"
      password: "${OPENSEARCH_PASSWORD}"
```

Set these in your deployment environment or Docker Compose file:

```sh
export OPENSEARCH_HOSTS="https://opensearch:9200"
export OPENSEARCH_USERNAME="admin"
export OPENSEARCH_PASSWORD="your-secure-password"
```
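In Docker Compose, the same variables can be set on the Data Prepper service; the `data-prepper` service name below is an assumption for your compose file:

```yaml
services:
  data-prepper:
    environment:
      OPENSEARCH_HOSTS: "https://opensearch:9200"
      OPENSEARCH_USERNAME: "admin"
      OPENSEARCH_PASSWORD: "${OPENSEARCH_PASSWORD}"
```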