Logstash
Logstash is a server-side data processing pipeline that ingests, transforms, and forwards logs and events. When used with the OpenSearch Observability Stack, Logstash can serve as an alternative or complement to Data Prepper for log ingestion, particularly when you need complex transformations or conditional routing, or when you already have Logstash pipelines.
Architecture
    graph LR
        subgraph Sources
            F[Filebeat]
            S[Syslog]
            K[Kafka]
            H[HTTP]
        end
        subgraph Processing
            LS[Logstash]
        end
        subgraph Observability Stack
            OS[OpenSearch]
            DP[Data Prepper]
        end
        F --> LS
        S --> LS
        K --> LS
        H --> LS
        LS -->|OpenSearch output| OS
        DP -->|traces, logs| OS

Prerequisites
- Java 11 or later (Java 17 recommended)
- Logstash 8.x with the OpenSearch output plugin
- Network access to OpenSearch on port 9200
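
The Java requirement above can be checked before installing anything. The following is a minimal sketch, not part of the original guide: `java_major` is a hypothetical helper that pulls the major version out of `java -version` output, assuming the usual `version "..."` line is present.

```shell
#!/bin/sh
# Hypothetical helper: print the major Java version found in the output of
# `java -version`, e.g. a line such as:  openjdk version "17.0.8" 2023-07-18
java_major() {
  v=$(printf '%s\n' "$1" | sed -n 's/.*version "\([^"]*\)".*/\1/p')
  case "$v" in
    1.*) v=${v#1.}; printf '%s\n' "${v%%.*}" ;;   # legacy scheme: 1.8.0_382 -> 8
    *)   printf '%s\n' "${v%%[.-]*}" ;;           # modern scheme: 17.0.8 -> 17
  esac
}

# Usage: `java -version` writes to stderr, so redirect it before parsing.
if command -v java >/dev/null 2>&1; then
  major=$(java_major "$(java -version 2>&1)")
  if [ "${major:-0}" -ge 11 ]; then
    echo "Java $major satisfies the Java 11+ requirement"
  else
    echo "Java ${major:-unknown} is older than Java 11" >&2
  fi
fi
```

Port reachability still has to be confirmed separately, for example from the host that will run Logstash.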
Install Logstash with OpenSearch output
Install Logstash

    # Download and extract Logstash bundled with the OpenSearch output plugin
    wget https://artifacts.opensearch.org/logstash/logstash-oss-with-opensearch-output-plugin-8.9.0-linux-x64.tar.gz
    tar -xzf logstash-oss-with-opensearch-output-plugin-8.9.0-linux-x64.tar.gz
    cd logstash-8.9.0

Or install the plugin into an existing Logstash installation:

    bin/logstash-plugin install logstash-output-opensearch

Docker deployment

    services:
      logstash:
        image: opensearchproject/logstash-oss-with-opensearch-output-plugin:8.9.0
        volumes:
          - ./pipeline:/usr/share/logstash/pipeline
          - ./config/logstash.yml:/usr/share/logstash/config/logstash.yml
        ports:
          - "5044:5044"  # Beats input
          - "5000:5000"  # TCP input
          - "9600:9600"  # Monitoring API
        environment:
          LS_JAVA_OPTS: "-Xmx512m -Xms512m"

Pipeline configuration
Logstash pipelines consist of three stages: input, filter, and output.
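
To see the three stages in isolation, the sketch below writes a minimal pipeline that reads from standard input, adds one field, and prints each event to standard output. The helper name `write_minimal_pipeline` and the file name `minimal.conf` are illustrative assumptions, not part of the stack.

```shell
#!/bin/sh
# Hypothetical helper: write a minimal input -> filter -> output pipeline to $1.
write_minimal_pipeline() {
  cat > "$1" <<'EOF'
input {
  stdin { }
}

filter {
  mutate {
    add_field => { "pipeline" => "logstash" }
  }
}

output {
  stdout { codec => rubydebug }
}
EOF
}

# Illustrative location; any writable path works.
conf="${TMPDIR:-/tmp}/minimal.conf"
write_minimal_pipeline "$conf"

# From the Logstash directory, check the syntax without starting the pipeline:
#   bin/logstash -f "$conf" --config.test_and_exit
# Then run it and type a line to see the resulting event:
#   bin/logstash -f "$conf"
```

Validating a small file like this first makes it easier to tell configuration syntax errors apart from connectivity problems once the OpenSearch output is added.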
Basic log pipeline

    input {
      beats {
        port => 5044
      }
      tcp {
        port => 5000
        codec => json_lines
      }
    }

    filter {
      # Parse JSON log bodies
      if [message] =~ /^\{/ {
        json {
          source => "message"
          target => "parsed"
        }
      }

      # Set @timestamp from the parsed timestamp field, if present
      date {
        match => ["[parsed][timestamp]", "ISO8601", "yyyy-MM-dd HH:mm:ss"]
        target => "@timestamp"
      }

      # Add environment metadata
      mutate {
        add_field => {
          "environment" => "production"
          "pipeline" => "logstash"
        }
      }
    }

    output {
      opensearch {
        hosts => ["https://opensearch:9200"]
        index => "logs-%{+YYYY.MM.dd}"
        # Demo credentials; certificate verification is disabled for local testing only
        user => "admin"
        password => "admin"
        ssl_certificate_verification => false
      }
    }

Syslog ingestion

    input {
      syslog {
        port => 1514
        type => "syslog"
      }
    }

    filter {
      if [type] == "syslog" {
        grok {
          match => {
            "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:hostname} %{DATA:program}(?:\[%{POSINT:pid}\])?: %{GREEDYDATA:log_message}"
          }
        }
        # Two spaces before "d" match the padded single-digit days syslog emits
        date {
          match => ["syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss"]
        }
        # Replace the raw line with the parsed message body. rename overwrites
        # the existing field; a remove_field on "message" in the same mutate
        # would run after the rename and drop the renamed field.
        mutate {
          rename => { "log_message" => "message" }
        }
      }
    }

    output {
      opensearch {
        hosts => ["https://opensearch:9200"]
        index => "syslog-%{+YYYY.MM.dd}"
        user => "admin"
        password => "admin"
        ssl_certificate_verification => false
      }
    }

Conditional routing
Route different log types to different indices:

    filter {
      if [fields][log_type] == "application" {
        mutate { add_field => { "[@metadata][target_index]" => "app-logs" } }
      } else if [fields][log_type] == "access" {
        mutate { add_field => { "[@metadata][target_index]" => "access-logs" } }
      } else {
        mutate { add_field => { "[@metadata][target_index]" => "misc-logs" } }
      }
    }

    output {
      opensearch {
        hosts => ["https://opensearch:9200"]
        index => "%{[@metadata][target_index]}-%{+YYYY.MM.dd}"
        user => "admin"
        password => "admin"
        ssl_certificate_verification => false
      }
    }

OpenSearch output plugin reference
| Parameter | Default | Description |
|---|---|---|
| hosts | (none) | Array of OpenSearch endpoints |
| index | logstash-%{+YYYY.MM.dd} | Index name pattern |
| user | (none) | Basic auth username |
| password | (none) | Basic auth password |
| ssl_certificate_verification | true | Verify SSL certificates |
| template_name | logstash | Index template name |
| bulk_size | 500 | Number of events per bulk request |
| retry_max_interval | 64 | Max seconds between retries |
| pipeline | (none) | Ingest pipeline to apply |
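
The default `index` pattern in the table carries a date suffix, so each day's events land in a separate index. Logstash fills `%{+YYYY.MM.dd}` from the event's `@timestamp`, which is in UTC. As a rough sketch, the hypothetical `daily_index` helper below reproduces that naming in the shell, which can be handy when working out which index to query.

```shell
#!/bin/sh
# Hypothetical helper: print the index name that a pattern of the form
# "<prefix>-%{+YYYY.MM.dd}" resolves to. $1 is the prefix; $2 is an optional
# date in YYYY.MM.dd form and defaults to today's date in UTC.
daily_index() {
  prefix=$1
  day=${2:-$(date -u +%Y.%m.%d)}
  printf '%s-%s\n' "$prefix" "$day"
}

daily_index logstash              # today's index for the default pattern
daily_index logstash 2024.01.15   # prints logstash-2024.01.15
```

For example, `curl -k -u admin:admin "https://opensearch:9200/$(daily_index logs)/_count"` would count today's events written by the basic log pipeline, assuming the host name and demo credentials used in the examples on this page.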
When to use Logstash vs Data Prepper
| Consideration | Logstash | Data Prepper |
|---|---|---|
| Primary protocol | Beats, TCP, Syslog | OTLP, HTTP |
| Transform language | Ruby-based filter DSL | YAML pipeline config |
| Plugin ecosystem | 200+ plugins (inputs, filters, outputs) | Focused on OTel and OpenSearch |
| OTel integration | Limited (requires plugins) | Native OTLP support |
| Trace processing | Not designed for traces | Built-in trace analytics |
| Resource usage | Higher (JVM-based, full pipeline) | Moderate (JVM-based, focused) |
| Best for | Complex log transformations, legacy systems | OTel-native pipelines, trace/log correlation |
Recommendation: Use Data Prepper when your telemetry sources support OpenTelemetry. Use Logstash when you need to ingest from non-OTel sources (syslog, Beats, Kafka), require complex log transformations with Grok patterns, or have existing Logstash pipelines you want to keep.
Monitoring Logstash
Logstash exposes a monitoring API on port 9600:

    # Pipeline stats
    curl -s http://localhost:9600/_node/stats/pipelines | jq .

    # Check if Logstash is running
    curl -s http://localhost:9600/ | jq '.status'

    # Event counters for the main pipeline (in, filtered, out); sample twice to derive a rate
    curl -s http://localhost:9600/_node/stats/pipelines | jq '.pipelines.main.events'

Related links
- Infrastructure Monitoring Overview
- Fluentd & Fluent Bit
- Docker
- OpenSearch Logstash documentation: Official OpenSearch Logstash plugin reference