Streams API
The Streams module provides real-time event collection, transformation, validation, and forwarding. It supports the standard event types (track, identify, page, screen, group) and includes event source management, schema contracts, JavaScript transformations, and forwarding rules.
Stream events are ingested through a dedicated ingest endpoint that authenticates via an event source key. Management endpoints for sources, contracts, transformations, and forwarding rules live under the standard /api/v1/workspaces/{id} namespace.
Endpoint Groups
| Group | Description |
|---|---|
| Event Ingestion | Track, identify, page, screen, and group calls |
| Event Sources | Manage SDK authentication keys for stream ingestion |
| Contracts | Define and enforce event schemas |
| Transformations | JavaScript event transformations |
| Forwarding Rules | Route events to destinations |
| Volume & Live | Event volume metrics and live stream |
Event Ingestion
Event ingestion endpoints live on the dedicated stream ingest service (not the control-plane /api/v1 namespace). Authentication uses an event source key passed via the X-Write-Key header, the HTTP Basic Auth username, or a ?writeKey= query parameter — the workspace context is derived from that key.
| Method | Path | Description |
|---|---|---|
| POST | /v1/track | Record a user action |
| POST | /v1/identify | Set user traits |
| POST | /v1/page | Record a page view |
| POST | /v1/screen | Record a mobile screen view |
| POST | /v1/group | Associate a user with a group |
| POST | /v1/events | Unified endpoint (event type from body) |
| POST | /v1/batch | Submit a batch of events in a single request |
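The three ways of passing the event source key can be sketched as small helpers. This is an illustrative sketch, not an official client: the host is taken from the curl example in this document, the helper names are hypothetical, and the empty Basic Auth password is an assumption.

```python
import base64
from urllib.parse import urlencode

# Host taken from the curl example in this document.
INGEST_BASE = "https://stream.composable.zeotap.com"


def header_auth(key: str) -> dict:
    """Option 1: send the key in the X-Write-Key header."""
    return {"X-Write-Key": key}


def basic_auth(key: str) -> dict:
    """Option 2: send the key as the HTTP Basic Auth username.

    Assumption: the password part is left empty.
    """
    token = base64.b64encode(f"{key}:".encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {token}"}


def query_auth(path: str, key: str) -> str:
    """Option 3: append the key as the writeKey query parameter."""
    return f"{INGEST_BASE}{path}?{urlencode({'writeKey': key})}"
```

Only one of the three is needed per request. The header form keeps the key out of URLs, which tend to end up in access logs.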
Track Event
POST /v1/track
Records a user action or event.
Request Body
| Field | Type | Required | Description |
|---|---|---|---|
| event | string | Yes | Event name (e.g., purchase_completed) |
| user_id | string | Conditional | Identified user ID (required if no anonymous_id) |
| anonymous_id | string | Conditional | Anonymous user ID (required if no user_id) |
| properties | object | No | Event-specific properties |
| timestamp | string (ISO 8601) | No | Event timestamp (defaults to server time) |
| context | object | No | Context metadata (device, location, etc.) |
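The conditional rule in the table above (either user_id or anonymous_id must be present) can be checked before a request is sent. The following is a minimal sketch; `build_track_event` is a hypothetical helper, and the server may apply further validation that is not documented here.

```python
from datetime import datetime, timezone
from typing import Optional


def build_track_event(
    event: str,
    user_id: Optional[str] = None,
    anonymous_id: Optional[str] = None,
    properties: Optional[dict] = None,
    timestamp: Optional[datetime] = None,
) -> dict:
    """Build a /v1/track request body, enforcing the documented field rules."""
    if not event:
        raise ValueError("event is required")
    if not user_id and not anonymous_id:
        raise ValueError("provide user_id or anonymous_id")

    body = {"event": event}
    if user_id:
        body["user_id"] = user_id
    if anonymous_id:
        body["anonymous_id"] = anonymous_id
    if properties:
        body["properties"] = properties
    if timestamp is not None:
        # ISO 8601 in UTC with a Z suffix, matching the sample payload.
        # A naive datetime is interpreted as local time by astimezone().
        body["timestamp"] = timestamp.astimezone(timezone.utc).strftime(
            "%Y-%m-%dT%H:%M:%SZ"
        )
    return body
```

Omitting `timestamp` leaves the field out of the body, so the server default (server time) applies.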
{
  "event": "purchase_completed",
  "user_id": "user-123",
  "properties": {
    "order_id": "ORD-456",
    "total": 99.99,
    "currency": "USD",
    "items": ["SKU-001", "SKU-002"]
  },
  "timestamp": "2024-01-15T15:00:00Z",
  "context": {
    "ip": "203.0.113.1",
    "user_agent": "Mozilla/5.0..."
  }
}
Example
curl -X POST https://stream.composable.zeotap.com/v1/track \
  -H "X-Write-Key: <event-source-key>" \
  -H "Content-Type: application/json" \
  -d '{
    "event": "purchase_completed",
    "user_id": "user-123",
    "properties": {
      "order_id": "ORD-456",
      "total": 99.99
    }
  }'
Identify User
POST /v1/identify
Associates traits with a user identity.
Request Body
| Field | Type | Required | Description |
|---|---|---|---|
| user_id | string | Conditional | User ID |
| anonymous_id | string | Conditional | Anonymous ID |
| traits | object | No | User traits to set |
{
  "user_id": "user-123",
  "traits": {
    "email": "alice@example.com",
    "name": "Alice Smith",
    "plan": "enterprise",
    "signup_date": "2024-01-01"
  }
}
Page View
POST /v1/page
Records a page view.
Request Body
| Field | Type | Required | Description |
|---|---|---|---|
| user_id | string | Conditional | User ID |
| anonymous_id | string | Conditional | Anonymous ID |
| name | string | No | Page name |
| properties | object | No | Page properties (url, title, referrer, etc.) |
{
  "user_id": "user-123",
  "name": "Pricing",
  "properties": {
    "url": "https://example.com/pricing",
    "title": "Pricing - Example App",
    "referrer": "https://google.com"
  }
}
Group Association
POST /v1/group
Associates a user with a group (company, team, account).
Request Body
| Field | Type | Required | Description |
|---|---|---|---|
| user_id | string | Conditional | User ID |
| anonymous_id | string | Conditional | Anonymous ID |
| group_id | string | Yes | Group identifier |
| traits | object | No | Group traits |
{
  "user_id": "user-123",
  "group_id": "company-456",
  "traits": {
    "name": "Acme Corp",
    "industry": "Technology",
    "employee_count": 250
  }
}
Event Sources
Event sources authenticate stream ingestion requests. Each source produces a single key that is scoped to a workspace. Source keys are public identifiers (Segment-style) — the full plaintext value is returned in every list and create response, since the key is embedded in client SDKs and visible in outbound requests anyway. (The API path remains event-keys for backwards compatibility.)
| Method | Path | Description |
|---|---|---|
| GET | /api/v1/workspaces/{id}/event-keys | List event sources |
| POST | /api/v1/workspaces/{id}/event-keys | Create an event source |
| DELETE | /api/v1/workspaces/{id}/event-keys/{keyId} | Revoke an event source |
| POST | /api/v1/workspaces/{id}/event-keys/{keyId}/test-event | Send a test event through the source |
Create Event Source
{
  "name": "Web App"
}
Event Source Object
| Field | Type | Description |
|---|---|---|
| id | string (UUID) | Unique identifier |
| workspace_id | string (UUID) | Owning workspace |
| name | string | Display name |
| key | string | Plaintext key; pass it in the X-Write-Key header (or as the HTTP Basic Auth username or ?writeKey= query parameter) |
| status | string | active or revoked |
| pubsub_topic | string | Pub/Sub topic for event routing |
| last_used_at | string (ISO 8601) or null | Last event received time |
| created_by | string (UUID) | Account that created the source |
| created_at | string (ISO 8601) | Creation timestamp |
| updated_at | string (ISO 8601) | Last update timestamp |
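A client that reads the list endpoint might map each Event Source object onto a typed record and keep only usable keys. The sketch below assumes the list response is a JSON array of objects with the fields in the table above; the response envelope is not specified in this document, and the class and function names are hypothetical.

```python
from dataclasses import dataclass, fields
from typing import Dict, List, Optional


@dataclass
class EventSource:
    id: str
    workspace_id: str
    name: str
    key: str
    status: str
    pubsub_topic: str
    last_used_at: Optional[str]  # null until the first event is received
    created_by: str
    created_at: str
    updated_at: str


def parse_source(raw: dict) -> EventSource:
    """Build an EventSource, ignoring any fields this sketch does not know."""
    known = {f.name for f in fields(EventSource)}
    return EventSource(**{k: v for k, v in raw.items() if k in known})


def active_keys(sources: List[dict]) -> Dict[str, str]:
    """Map source name to key for every source whose status is active."""
    parsed = [parse_source(s) for s in sources]
    return {s.name: s.key for s in parsed if s.status == "active"}
```

Filtering on `status` keeps revoked keys out of client configuration.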
Contracts
Event contracts define the expected schema for event types and enforce validation rules.
| Method | Path | Description |
|---|---|---|
| GET | /api/v1/workspaces/{id}/event-contracts | List contracts |
| POST | /api/v1/workspaces/{id}/event-contracts | Create a contract |
| GET | /api/v1/workspaces/{id}/event-contracts/{contractId} | Get a contract |
| PUT | /api/v1/workspaces/{id}/event-contracts/{contractId} | Update a contract |
| DELETE | /api/v1/workspaces/{id}/event-contracts/{contractId} | Delete a contract |
| GET | /api/v1/workspaces/{id}/event-contracts/{contractId}/violations | Get violations |
Create Contract
{
  "name": "Purchase Completed Schema",
  "event_type": "track",
  "event_name": "purchase_completed",
  "description": "Schema for purchase completion events",
  "schema_definition": {
    "properties": {
      "order_id": {"type": "string", "required": true},
      "total": {"type": "number", "required": true},
      "currency": {"type": "string", "required": true}
    }
  },
  "enforcement_mode": "warn",
  "undeclared_fields": "allow"
}
Enforcement Modes
| Mode | Description |
|---|---|
| allow | Schema is informational only; all events pass through |
| warn | Events pass through but violations are recorded |
| block | Events that violate the schema are rejected |
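To make the difference between the three modes concrete, the sketch below emulates them locally against a schema of the shape used in the Create Contract example. It is an illustration only: the server-side validator is not described in this document, the type mapping and violation messages are assumptions, and handling of undeclared_fields is left out.

```python
from typing import List, Tuple

# Assumed mapping from schema type names to Python types.
TYPE_MAP = {
    "string": str,
    "number": (int, float),
    "boolean": bool,
    "object": dict,
    "array": list,
}


def find_violations(schema_definition: dict, properties: dict) -> List[str]:
    """List missing required fields and type mismatches."""
    violations = []
    for field, rule in schema_definition.get("properties", {}).items():
        if field not in properties:
            if rule.get("required"):
                violations.append(f"missing required field: {field}")
            continue
        expected = TYPE_MAP.get(rule.get("type"))
        value = properties[field]
        # bool is a subclass of int in Python, so reject it for "number".
        is_bool_as_number = rule.get("type") == "number" and isinstance(value, bool)
        if expected and (not isinstance(value, expected) or is_bool_as_number):
            violations.append(f"wrong type for {field}: expected {rule['type']}")
    return violations


def enforce(mode: str, schema_definition: dict, properties: dict) -> Tuple[bool, List[str]]:
    """Return (accepted, recorded_violations) for one event."""
    if mode == "allow":
        return True, []  # informational only; assumed to record nothing
    violations = find_violations(schema_definition, properties)
    if mode == "warn":
        return True, violations  # event passes, violations are recorded
    if mode == "block":
        return not violations, violations  # event rejected on any violation
    raise ValueError(f"unknown enforcement mode: {mode}")
```

With the sample schema, an event missing `currency` is accepted under warn (with one recorded violation) and rejected under block.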
Contract Object
| Field | Type | Description |
|---|---|---|
| id | string (UUID) | Unique identifier |
| name | string | Display name |
| event_type | string | Event type (e.g., track, identify) |
| event_name | string | Event name to match |
| schema_definition | object | JSON schema for validation |
| enforcement_mode | string | allow, warn, or block |
| undeclared_fields | string | How to handle fields not in the schema |
| status | string | active or archived |
Transformations
Event transformations apply JavaScript functions to modify events before forwarding.
| Method | Path | Description |
|---|---|---|
| GET | /api/v1/workspaces/{id}/event-transformations | List transformations |
| POST | /api/v1/workspaces/{id}/event-transformations | Create a transformation |
| GET | /api/v1/workspaces/{id}/event-transformations/{transformId} | Get a transformation |
| PUT | /api/v1/workspaces/{id}/event-transformations/{transformId} | Update a transformation |
| DELETE | /api/v1/workspaces/{id}/event-transformations/{transformId} | Delete a transformation |
| POST | /api/v1/workspaces/{id}/event-transformations/{transformId}/test | Test a transformation against sample input |
Create Transformation
{
  "name": "Enrich Purchase Events",
  "description": "Add computed fields to purchase events",
  "source_code": "function transform(event) {\n if (event.properties.total > 100) {\n event.properties.tier = 'high_value';\n }\n return event;\n}",
  "test_input": {"event": "purchase_completed", "properties": {"total": 150}},
  "test_output": {"event": "purchase_completed", "properties": {"total": 150, "tier": "high_value"}}
}
Transformation Object
| Field | Type | Description |
|---|---|---|
| id | string (UUID) | Unique identifier |
| name | string | Display name |
| description | string | Description |
| source_code | string | JavaScript transformation function |
| test_input | object | Sample input for testing |
| test_output | object | Expected output for testing |
| status | string | draft, active, or error |
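Because source_code is a JavaScript function carried inside a JSON string, its newlines and quotes must be escaped. Letting a JSON encoder do this is less error-prone than escaping by hand. The sketch below assembles a create payload this way; `build_transformation` is a hypothetical helper, and the JavaScript is the function from the Create Transformation example.

```python
import json

# The JavaScript function from the Create Transformation example.
JS_SOURCE = """function transform(event) {
  if (event.properties.total > 100) {
    event.properties.tier = 'high_value';
  }
  return event;
}"""


def build_transformation(name: str, description: str, source_code: str,
                         test_input: dict, test_output: dict) -> str:
    """Serialize a create-transformation payload; json.dumps escapes the source."""
    payload = {
        "name": name,
        "description": description,
        "source_code": source_code,
        "test_input": test_input,
        "test_output": test_output,
    }
    return json.dumps(payload)


encoded = build_transformation(
    "Enrich Purchase Events",
    "Add computed fields to purchase events",
    JS_SOURCE,
    {"event": "purchase_completed", "properties": {"total": 150}},
    {"event": "purchase_completed", "properties": {"total": 150, "tier": "high_value"}},
)
```

Decoding `encoded` with `json.loads` returns the original multi-line source unchanged.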
Forwarding Rules
Forwarding rules define how events are routed to destinations.
| Method | Path | Description |
|---|---|---|
| GET | /api/v1/workspaces/{id}/event-forwarding-rules | List forwarding rules |
| POST | /api/v1/workspaces/{id}/event-forwarding-rules | Create a forwarding rule |
| GET | /api/v1/workspaces/{id}/event-forwarding-rules/{ruleId} | Get a forwarding rule |
| PUT | /api/v1/workspaces/{id}/event-forwarding-rules/{ruleId} | Update a forwarding rule |
| DELETE | /api/v1/workspaces/{id}/event-forwarding-rules/{ruleId} | Delete a forwarding rule |
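One plausible reading of how a rule selects events, using the enabled, event_types, and event_names fields of the forwarding rule payload, is sketched below. The matching semantics are assumptions made for illustration: this document does not state them, and in particular it does not confirm that an empty list means "no filter".

```python
from typing import Optional


def rule_matches(rule: dict, event_type: str, event_name: Optional[str] = None) -> bool:
    """Decide whether a forwarding rule would select an event.

    Assumptions: a disabled rule never matches, and an empty event_types
    or event_names list applies no filter on that dimension.
    """
    if not rule.get("enabled", False):
        return False
    types = rule.get("event_types") or []
    if types and event_type not in types:
        return False
    names = rule.get("event_names") or []
    return not names or event_name in names
```

Under these assumptions, a rule with `"event_types": ["track"]` and `"event_names": []` selects every track event and no identify, page, screen, or group events.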
Create Forwarding Rule
{
  "name": "All Track Events to Amplitude",
  "destination_id": "880e8400-e29b-41d4-a716-446655440000",
  "event_types": ["track"],
  "event_names": [],
  "transformation_id": null,
  "enabled": true
}
Volume & Live
Event Volume
GET /api/v1/workspaces/{id}/events/volume
Returns hourly aggregated event counts for the workspace.
Query Parameters
| Parameter | Type | Description |
|---|---|---|
| hours | integer | Number of hours to look back (default: 24) |
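A client might build the request URL with the hours parameter and then roll the hourly buckets up into totals per event. The sketch below assumes rows of the shape shown in this section's response sample; the base URL is a placeholder (the management API host is not given in this document) and both function names are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Tuple
from urllib.parse import urlencode


def volume_url(base_url: str, workspace_id: str, hours: int = 24) -> str:
    """Build the volume endpoint URL; base_url is supplied by the caller."""
    query = urlencode({"hours": hours})
    return f"{base_url}/api/v1/workspaces/{workspace_id}/events/volume?{query}"


def totals_by_event(rows: List[dict]) -> Dict[Tuple[str, str], int]:
    """Sum event_count across hour buckets for each (event_type, event_name)."""
    totals: Dict[Tuple[str, str], int] = defaultdict(int)
    for row in rows:
        totals[(row["event_type"], row["event_name"])] += row["event_count"]
    return dict(totals)
```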
Response
[
  {
    "event_type": "track",
    "event_name": "purchase_completed",
    "hour_bucket": "2024-01-15T14:00:00Z",
    "event_count": 1523
  }
]
Live Event Stream
GET /api/v1/workspaces/{id}/events/live
Returns a real-time stream of recent events via Server-Sent Events (SSE). Used by the live debugger in the UI.
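On the client side, an SSE response is a sequence of text lines in which `data:` lines accumulate and a blank line ends each event. The sketch below parses that framing from any iterable of lines. It covers only the `data` field, and because the payload format of the live stream is not specified here, it yields raw strings rather than decoded objects.

```python
from typing import Iterable, Iterator


def iter_sse_data(lines: Iterable[str]) -> Iterator[str]:
    """Yield the data payload of each complete SSE event."""
    buffer = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line dispatches the event collected so far.
            if buffer:
                yield "\n".join(buffer)
                buffer = []
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # A single leading space after the colon is not part of the value.
            if value.startswith(" "):
                value = value[1:]
            buffer.append(value)
        # Comment lines (starting with ":") and other fields are ignored here.
    # An event without a terminating blank line is incomplete and is dropped.
```

Any HTTP client that exposes the response body line by line can feed this generator.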