
Ingest API

The OpenSearch Bulk API provides a direct ingestion path for sending telemetry data without going through the OTel Collector and Data Prepper pipeline. Use it when you have pre-processed data or need a lightweight integration.

| Scenario | Recommended approach |
| --- | --- |
| Application traces and logs via OTel SDKs | OTel Collector + Data Prepper pipeline |
| Pre-processed or transformed data | Direct Ingest API |
| Migrating historical data | Direct Ingest API |
| Custom log formats from legacy systems | Direct Ingest API |
| Infrastructure metrics from Prometheus | OTel Collector + Data Prepper pipeline |
| One-off data imports or backfills | Direct Ingest API |

The OTel pipeline is preferred for production application telemetry because it handles batching, retries, schema transformation, and service map generation automatically. Use the Ingest API when your data is already in the correct format or when you need direct control over the indexing process.

To use the Ingest API directly, you need:

  • An OpenSearch cluster accessible on port 9200
  • Valid credentials with write permissions to the target indices

The Observability Stack uses specific index types. When ingesting directly, target the correct index pattern.

| Signal | Index Pattern | Description |
| --- | --- | --- |
| Traces | otel-v1-apm-span-* | Raw span documents |
| Logs | ss4o_logs-* | Simple Schema for Observability log documents |
| Metrics | ss4o_metrics-* | Simple Schema for Observability metric documents |

Send span documents to the trace analytics index:

curl -X POST "https://localhost:9200/otel-v1-apm-span-000001/_bulk" \
-H "Content-Type: application/x-ndjson" \
-u "admin:${OPENSEARCH_PASSWORD}" \
--insecure \
-d '
{"index": {}}
{"traceId":"abc123","spanId":"span456","traceState":"","parentSpanId":"","name":"HTTP GET /api/users","kind":"SPAN_KIND_SERVER","startTime":"2025-01-15T10:30:00Z","endTime":"2025-01-15T10:30:00.150Z","durationInNanos":150000000,"serviceName":"user-service","status.code":0,"resource.attributes.service.name":"user-service","resource.attributes.telemetry.sdk.language":"python"}
'

Send log documents to the log analytics index:

curl -X POST "https://localhost:9200/ss4o_logs-application-000001/_bulk" \
-H "Content-Type: application/x-ndjson" \
-u "admin:${OPENSEARCH_PASSWORD}" \
--insecure \
-d '
{"index": {}}
{"@timestamp":"2025-01-15T10:30:00Z","severity_text":"ERROR","severity_number":17,"body":"Connection timeout to database host db-primary:5432","resource.attributes.service.name":"user-service","resource.attributes.host.name":"pod-abc123","attributes.error.type":"ConnectionTimeout","attributes.db.system":"postgresql"}
'

Send metric documents to the metrics index:

curl -X POST "https://localhost:9200/ss4o_metrics-application-000001/_bulk" \
-H "Content-Type: application/x-ndjson" \
-u "admin:${OPENSEARCH_PASSWORD}" \
--insecure \
-d '
{"index": {}}
{"@timestamp":"2025-01-15T10:30:00Z","name":"http.server.request.duration","kind":"HISTOGRAM","unit":"s","value":0.152,"resource.attributes.service.name":"user-service","attributes.http.request.method":"GET","attributes.http.route":"/api/users","attributes.http.response.status_code":200}
'

The Bulk API expects newline-delimited JSON (NDJSON). Each operation requires two lines: an action line and a document line.

{"index": {"_index": "target-index"}}
{"field1": "value1", "field2": "value2"}
{"index": {"_index": "target-index"}}
{"field1": "value3", "field2": "value4"}

Rules for the Bulk API:

  • Each line must be valid JSON terminated by a newline (\n).
  • The request body must end with a newline.
  • Do not include pretty-printed JSON — each document must be a single line.
  • Set Content-Type: application/x-ndjson.
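The rules above can be sketched as a small Python helper (a hypothetical utility, not part of any official client) that serializes documents into a valid NDJSON bulk body:

```python
import json

def to_bulk_body(index, docs):
    """Serialize documents into an NDJSON bulk body for a single target index."""
    lines = []
    for doc in docs:
        # Each operation is an action line followed by the document line
        lines.append(json.dumps({"index": {"_index": index}}))
        lines.append(json.dumps(doc))
    # The Bulk API requires the request body to end with a newline
    return "\n".join(lines) + "\n"

body = to_bulk_body("ss4o_logs-application-000001", [
    {"@timestamp": "2025-01-15T10:30:00Z", "body": "Connection timeout"},
])
```

Because `json.dumps` always emits a single line, this also guarantees no pretty-printed documents slip into the body.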

| Batch Size | Use Case |
| --- | --- |
| 100-500 documents | Low-volume or latency-sensitive ingestion |
| 500-2,000 documents | General-purpose bulk ingestion |
| 2,000-5,000 documents | High-volume backfills and migrations |
| 5-15 MB per request | Size-based limit (stay under 15 MB) |
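To stay within both the document-count and request-size guidance above, batching can be sketched as a generator that caps each batch on whichever limit is hit first (`chunk_documents` is a hypothetical helper, and the defaults are illustrative):

```python
import json

def chunk_documents(docs, max_docs=1000, max_bytes=10 * 1024 * 1024):
    """Yield batches capped by document count and serialized size."""
    batch, batch_bytes = [], 0
    for doc in docs:
        size = len(json.dumps(doc).encode("utf-8"))
        # Flush the current batch before either limit would be exceeded
        if batch and (len(batch) >= max_docs or batch_bytes + size > max_bytes):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(doc)
        batch_bytes += size
    if batch:
        yield batch  # final partial batch

batches = list(chunk_documents([{"n": i} for i in range(2500)], max_docs=1000))
```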

Monitor the response for errors. The Bulk API returns a per-document status, so partial failures are possible:

# Check for errors in bulk response
curl -s -X POST "https://localhost:9200/_bulk" \
-H "Content-Type: application/x-ndjson" \
-u "admin:${OPENSEARCH_PASSWORD}" \
--insecure \
--data-binary @documents.ndjson | jq '.errors, (.items[] | select(.index.error))'
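The same check in Python, operating on the parsed bulk response (the response structure follows the Bulk API; `failed_items` itself is a hypothetical helper):

```python
def failed_items(bulk_response):
    """Return (position, error) pairs for documents that failed to index."""
    if not bulk_response.get("errors"):
        return []  # fast path: no failures anywhere in the batch
    failures = []
    for pos, item in enumerate(bulk_response.get("items", [])):
        # Each item is keyed by its action type ("index", "create", ...)
        action, result = next(iter(item.items()))
        if "error" in result:
            failures.append((pos, result["error"]))
    return failures

# Example response with one failure out of two documents
response = {
    "errors": True,
    "items": [
        {"index": {"status": 201}},
        {"index": {"status": 400, "error": {"type": "mapper_parsing_exception"}}},
    ],
}
```

The position of each failed item matters: it maps back to the document's position in the request body, which is what lets you retry only the failures.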

For production workloads, use an OpenSearch client library instead of curl:

import os

from opensearchpy import OpenSearch, helpers

client = OpenSearch(
    hosts=["https://localhost:9200"],
    http_auth=("admin", os.environ["OPENSEARCH_PASSWORD"]),
    use_ssl=True,
    verify_certs=False,  # matches the --insecure curl examples; verify certs in production
)

documents = [
    {
        "_index": "ss4o_logs-application-000001",
        "_source": {
            "@timestamp": "2025-01-15T10:30:00Z",
            "severity_text": "ERROR",
            "body": "Connection timeout",
            "resource.attributes.service.name": "user-service",
        },
    },
]

# raise_on_error=False returns per-document errors instead of raising BulkIndexError
success, errors = helpers.bulk(client, documents, raise_on_error=False)
print(f"Indexed {success} documents, {len(errors)} errors")
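Because the direct Ingest API skips the retry handling that the OTel pipeline provides, production callers should retry failed documents themselves. A transport-agnostic sketch (the `send_batch` callable interface is an assumption for illustration, not an opensearch-py API):

```python
import time

def bulk_with_retry(send_batch, docs, max_retries=3, backoff_s=1.0):
    """Re-send only failed documents, with exponential backoff between attempts.

    `send_batch` is any callable that takes a list of documents and returns
    the list of documents that failed (a hypothetical interface).
    """
    pending = list(docs)
    for attempt in range(max_retries):
        pending = send_batch(pending)
        if not pending:
            return []  # everything indexed successfully
        time.sleep(backoff_s * (2 ** attempt))  # back off before retrying failures
    return pending  # documents still failing after exhausting retries

# Stub transport for demonstration: fails the last document on the first call only
calls = []
def flaky_send(batch):
    calls.append(len(batch))
    return batch[-1:] if len(calls) == 1 else []

leftover = bulk_with_retry(flaky_send, [{"id": 1}, {"id": 2}], backoff_s=0)
```

Retrying only the failed subset matters with the Bulk API: re-sending the whole batch after a partial failure would duplicate the documents that already succeeded.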