Event Enrichment

Sometimes, you may need to pre-process events before they are ingested into OpenMeter. This can be useful for normalizing data, enriching events, or calculating derived fields like cost.

OpenMeter supports this through the OpenMeter Collector and Bloblang mappings.

[Figure: Benthos ingestion data pipeline for OpenMeter]

Example: Weighted Cost Calculation

Suppose you want to calculate the cost of a container from the resources it uses and ingest that cost into OpenMeter. With the OpenMeter Collector, you can compute the cost in a pre-processing step before the event is ingested.

Tip

Ingesting cost and tracking balances with Entitlements enables implementing currency-based wallets.

Example Event

The incoming event before enrichment:

{
  "id": "00001",
  "specversion": "1.0",
  "type": "container_duration_miliseconds",
  "source": "worker",
  "subject": "customer-1",
  "time": "2024-01-01T00:00:00.001Z",
  "data": {
    "container_id": "container-1",
    "duration_ms": "1000",
    "cpu_family": "intel",
    "cpu_cores": "4",
    "gpu": "A100-40",
    "gpu_count": "2",
    "mem_mb": "4096"
  }
}

The event after enrichment:

{
  "id": "00001",
  "specversion": "1.0",
  "type": "container_duration_miliseconds",
  "source": "worker",
  "subject": "customer-1",
  "time": "2024-01-01T00:00:00.001Z",
  "data": {
    "container_id": "container-1",
    "duration_ms": "1000",
    "cpu_family": "intel",
    "cpu_cores": "4",
    "gpu": "A100-40",
    "gpu_count": "2",
    "mem_mb": "4096",
    "cost": "5.0"
  }
}
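
Note

The cost field was added to the event by the enrichment step. The value shown is illustrative: the actual value depends on the rates in your mapping. With the sample rates used in the mapping in the next section, this input works out to about 10.096 (4.096 for memory, 4 for CPU, and 2 for GPU), and because the mapping assigns a numeric result, cost is written as a JSON number rather than a string.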

Event Enrichment

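To calculate the cost of the container, you can use the following Bloblang mapping. It parses the string fields in data with int64(), applies the per-resource rates, and writes the result to data.cost. See the OpenMeter Collector documentation to learn how to run the pipeline.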

pipeline:
  processors:
    - mapping: |
        root = this
        # initialize cost
        let cost = 0
        # memory cost: 0.001 per MB
        let cost = $cost + this.data.mem_mb.int64() * 0.001
        # CPU core cost depends on the CPU family
        let cost = $cost + this.data.cpu_cores.int64() * match this.data.cpu_family {
          "intel" => 1.0,
          "graviton" => 1.5,
          _ => 0, # Default case for unmatched CPU family: could be some default price
        }
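        # GPU cost depends on the GPU type, with a volume discount based on GPU count
        # (gpu_count arrives as a string, so parse it before the numeric comparisons)
        let cost = $cost + this.data.gpu_count.int64() * match this.data.gpu_count.int64() {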
          this > 5 => 0.5,
          this > 2 => 0.8,
          _ => 1,
        } * match this.data.gpu {
          "A100-40" => 1.0,
          "A100-60" => 1.5,
          _ => 0, # Default case for unmatched GPU type: could be some default price
        }
        root.data.cost = $cost
        # For advanced mapping logic, consider writing unit tests:
        # https://docs.redpanda.com/redpanda-connect/guides/bloblang/walkthrough/#unit-testing
output:
  stdout:
    codec: lines
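
Before deploying a mapping like the one above, it can help to cross-check the arithmetic outside the Collector. The following is a minimal Python sketch that mirrors the same pricing rules. It is not part of OpenMeter or the Collector; the names container_cost, gpu_volume_multiplier, and enrich are hypothetical and exist only for this illustration, and the rates are copied from the sample mapping.

```python
# Hypothetical reference implementation of the sample pricing rules.
# Rates are copied from the Bloblang mapping; function names are illustrative.

MEM_RATE_PER_MB = 0.001
CPU_RATES = {"intel": 1.0, "graviton": 1.5}    # unmatched family -> 0
GPU_RATES = {"A100-40": 1.0, "A100-60": 1.5}   # unmatched GPU type -> 0


def gpu_volume_multiplier(gpu_count: int) -> float:
    """Volume discount: more than 5 GPUs -> 0.5, more than 2 -> 0.8, else 1."""
    if gpu_count > 5:
        return 0.5
    if gpu_count > 2:
        return 0.8
    return 1.0


def container_cost(data: dict) -> float:
    """Compute the cost from the event's data payload (numeric fields are strings)."""
    mem_mb = int(data["mem_mb"])
    cpu_cores = int(data["cpu_cores"])
    gpu_count = int(data["gpu_count"])

    cost = mem_mb * MEM_RATE_PER_MB
    cost += cpu_cores * CPU_RATES.get(data["cpu_family"], 0.0)
    cost += gpu_count * gpu_volume_multiplier(gpu_count) * GPU_RATES.get(data["gpu"], 0.0)
    return cost


def enrich(event: dict) -> dict:
    """Return a copy of the event with data.cost added; the input is not modified."""
    enriched = {**event, "data": dict(event["data"])}
    enriched["data"]["cost"] = container_cost(event["data"])
    return enriched


EXAMPLE_DATA = {
    "container_id": "container-1",
    "duration_ms": "1000",
    "cpu_family": "intel",
    "cpu_cores": "4",
    "gpu": "A100-40",
    "gpu_count": "2",
    "mem_mb": "4096",
}

if __name__ == "__main__":
    # Memory 4.096 + CPU 4.0 + GPU 2.0, rounded for display
    print(round(container_cost(EXAMPLE_DATA), 3))
```

Comparing the output of a sketch like this with the Collector's output for a handful of representative events is a quick way to catch a mistyped rate or a wrong threshold. For anything beyond a spot check, the Bloblang unit tests linked in the mapping comments are the better fit, since they exercise the actual mapping.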
OpenMeter Collector Presets

Check out other OpenMeter pipeline presets.