Event Enrichment
Sometimes, you may need to pre-process events before they are ingested into OpenMeter. This can be useful for normalizing data, enriching events, or calculating derived fields like cost.
OpenMeter supports this through the OpenMeter Collector and Bloblang mappings.
Example: Weighted Cost Calculation
Suppose you want to calculate the cost of a container from its resources and ingest that cost into OpenMeter. With the OpenMeter Collector, you can pre-process each event before it is ingested.
Ingesting cost and tracking balances with Entitlements lets you implement currency-based wallets.
Example Event
The incoming event before enrichment:
{
  "id": "00001",
  "specversion": "1.0",
  "type": "container_duration_miliseconds",
  "source": "worker",
  "subject": "customer-1",
  "time": "2024-01-01T00:00:00.001Z",
  "data": {
    "container_id": "container-1",
    "duration_ms": "1000",
    "cpu_family": "intel",
    "cpu_cores": "4",
    "gpu": "A100-40",
    "gpu_count": "2",
    "mem_mb": "4096"
  }
}
The event after enrichment:
{
  "id": "00001",
  "specversion": "1.0",
  "type": "container_duration_miliseconds",
  "source": "worker",
  "subject": "customer-1",
  "time": "2024-01-01T00:00:00.001Z",
  "data": {
    "container_id": "container-1",
    "duration_ms": "1000",
    "cpu_family": "intel",
    "cpu_cores": "4",
    "gpu": "A100-40",
    "gpu_count": "2",
    "mem_mb": "4096",
    "cost": "5.0"
  }
}
Note that the cost field was added to the event after enrichment.
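The numeric fields in the sample event (duration_ms, cpu_cores, gpu_count, mem_mb) arrive as JSON strings, so any arithmetic or comparison needs them parsed first. As a rough illustration of that normalization step outside the Collector, here is a minimal Python sketch. The `normalize` helper and the `NUMERIC_FIELDS` list are hypothetical names chosen for this example and are not part of OpenMeter.

```python
import json

# Assumption: these are the string-encoded integer fields in the sample event.
NUMERIC_FIELDS = ("duration_ms", "cpu_cores", "gpu_count", "mem_mb")


def normalize(event: dict) -> dict:
    """Return a copy of the event whose numeric data fields are parsed to int."""
    data = dict(event["data"])  # shallow copy so the input event is not mutated
    for name in NUMERIC_FIELDS:
        if name in data:
            data[name] = int(data[name])
    return {**event, "data": data}


raw = json.loads(
    '{"type": "container_duration_miliseconds", "subject": "customer-1",'
    ' "data": {"duration_ms": "1000", "cpu_family": "intel", "cpu_cores": "4",'
    ' "gpu": "A100-40", "gpu_count": "2", "mem_mb": "4096"}}'
)
normalized = normalize(raw)

# Numeric comparisons are only meaningful once the value is an integer.
print(normalized["data"]["gpu_count"] > 1)
```

In the Collector itself, the equivalent conversion is done in Bloblang, for example with the .int64() method used in the mapping in the next section.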
Event Enrichment
To calculate the cost of the container, you can use the following Bloblang mapping. Check out the OpenMeter Collector to learn how to run the Bloblang pipeline.
pipeline:
  processors:
    - mapping: |
        root = this
        # Initialize cost
        let cost = 0
        # Memory cost: 0.001 per MB
        let cost = $cost + this.data.mem_mb.int64() * 0.001
        # CPU core cost depends on the CPU family
        let cost = $cost + this.data.cpu_cores.int64() * match this.data.cpu_family {
          "intel" => 1.0,
          "graviton" => 1.5,
          _ => 0, # Default case for unmatched CPU family: could be some default price
        }
        # Volume discount for GPU count (cast to an integer so the comparisons apply to a number)
        let cost = $cost + this.data.gpu_count.int64() * match this.data.gpu_count.int64() {
          this > 5 => 0.5,
          this > 2 => 0.8,
          _ => 1,
        } * match this.data.gpu {
          "A100-40" => 1.0,
          "A100-60" => 1.5,
          _ => 0, # Default case for unmatched GPU type: could be some default price
        }
        root.data.cost = $cost
        # For advanced mapping logic, consider writing unit tests:
        # https://docs.redpanda.com/redpanda-connect/guides/bloblang/walkthrough/#unit-testing
output:
  stdout:
    codec: lines
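To sanity-check the rates before deploying a mapping like the one above, it can help to reproduce the arithmetic in a few lines of ordinary code. The Python sketch below mirrors the three cost terms (memory, CPU, GPU with a volume discount) using the same rates. The function and constant names are hypothetical, and it is an independent re-implementation for checking numbers, not how the Collector evaluates Bloblang.

```python
# Rates copied from the mapping; unknown families or GPU types fall back to 0.
MEM_RATE_PER_MB = 0.001
CPU_RATES = {"intel": 1.0, "graviton": 1.5}
GPU_RATES = {"A100-40": 1.0, "A100-60": 1.5}


def gpu_discount(count: int) -> float:
    """Volume discount multiplier based on the number of GPUs."""
    if count > 5:
        return 0.5
    if count > 2:
        return 0.8
    return 1.0


def container_cost(data: dict) -> float:
    """Sum the memory, CPU, and GPU cost terms for one event's data payload."""
    mem = int(data["mem_mb"]) * MEM_RATE_PER_MB
    cpu = int(data["cpu_cores"]) * CPU_RATES.get(data["cpu_family"], 0.0)
    gpus = int(data["gpu_count"])
    gpu = gpus * gpu_discount(gpus) * GPU_RATES.get(data["gpu"], 0.0)
    return mem + cpu + gpu


sample = {
    "duration_ms": "1000",
    "cpu_family": "intel",
    "cpu_cores": "4",
    "gpu": "A100-40",
    "gpu_count": "2",
    "mem_mb": "4096",
}
cost = container_cost(sample)
print(round(cost, 3))  # 10.096 = 4.096 (memory) + 4.0 (CPU) + 2.0 (GPU)
```

With these rates the sample event works out to about 10.096, so the 5.0 shown in the enriched sample event is best read as an illustrative placeholder rather than the output of this exact mapping.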
Check out other OpenMeter pipeline presets.