
Benthos-based collectors

Benthos is a robust stream processing tool that can connect to a wide range of data sources and sinks. The tool's strong delivery guarantees and capability to retry failed messages make it an excellent choice as a collector for OpenMeter.

This page covers the basics of using Benthos, focusing on how to configure it as a collector for OpenMeter. To learn more about Benthos, please review the Benthos documentation, which includes excellent videos and tutorials that will help you get started.

Architecture

Benthos connects data sources to sinks through a pipeline comprising various processing steps. It reads messages (often referred to as documents in Benthos and its mapping language, bloblang) from a streaming source, processes them (validation, transformation, enrichment, etc.), and then writes them to one or more sinks. It does so with strong delivery guarantees: messages are not lost on failure and are retried until they are successfully delivered to their destination.

Benthos ingestion data pipeline for OpenMeter

A Benthos pipeline has three major components:

  • Inputs read data from various sources
  • Processors validate, transform, and filter individual pieces of data
  • Outputs are sinks where data is sent after being consumed and processed

There are other components (described in the Benthos documentation), but these are the ones generally required to ingest data into OpenMeter.
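
As a rough sketch of how the three components fit together, the following skeleton reads JSON lines from standard input, applies a trivial mapping, and prints the result. The `stdin` and `stdout` components are stand-ins chosen for illustration, and the `processed` field is hypothetical; a real collector would use a production input and the OpenMeter output described later on this page.

```yaml
# Skeleton only: stdin and stdout stand in for a real source and sink.

# Input: reads one message per line from standard input.
input:
  stdin: {}

# Processors: a single Bloblang mapping applied to every message.
pipeline:
  processors:
    - mapping: |
        root = this
        # Hypothetical field, added only to show that the mapping ran.
        root.processed = true

# Output: prints each processed message to standard output.
output:
  stdout: {}
```

Piping a line such as `{"path": "/app"}` into Benthos with this configuration should print the same document with the extra field added.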

Benthos Collector

Benthos offers a plugin architecture that allows users to extend the tool's functionality. To achieve optimal performance, OpenMeter offers a custom Benthos plugin and distribution that implements ingesting data via the OpenMeter API.

Users with no prior experience with Benthos can use the OpenMeter distribution available as a binary, container image and Helm chart. Users with custom Benthos builds can use the OpenMeter Benthos plugin.

Ingesting data into OpenMeter

Let's work backward through an example of ingesting data into OpenMeter. In this example, we will use the access logs from an API gateway as our data source.

When using Benthos as an OpenMeter collector, the output of the pipeline is always an OpenMeter instance (cloud or self-hosted):

output:
  openmeter:
    url: https://openmeter.cloud # defaults to OpenMeter Cloud; required when self-hosting
    token: '<YOUR OPENMETER CLOUD TOKEN>' # required for OpenMeter Cloud

As of today, OpenMeter requires ingested data to be in the CloudEvents format. Benthos offers a range of processors that can transform, validate, and enrich messages. The most commonly used processor is known as mapping, which utilizes Benthos' mapping language, bloblang.

Let's assume the API gateway access logs are in the following format:

{
  "timestamp": "2021-01-01T00:00:00.000Z",
  "method": "GET",
  "path": "/app",
  "user": "USERID",
  "duration": 100
}

We can use the mapping processor to transform the data into CloudEvents:

pipeline:
  processors:
    - mapping: |
        root = {
          "id": uuid_v4(),
          "specversion": "1.0",
          "type": "api-call",
          "source": "api-gateway",
          "time": this.timestamp,
          "subject": this.user,
          "data": {
            "duration": this.duration,
          },
        }
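
The same processor type can also validate messages before they are transformed. The sketch below assumes you want to discard access logs that have no `user` value, which is an illustrative policy rather than an OpenMeter requirement. It would sit ahead of the CloudEvents mapping in the `processors` list.

```yaml
pipeline:
  processors:
    # Hypothetical validation step, placed before the CloudEvents mapping.
    - mapping: |
        # Start from the incoming document so valid messages pass through unchanged.
        root = this
        # Drop logs with a missing or empty user, since they cannot be
        # attributed to a subject.
        root = if this.user.or("") == "" { deleted() }
```

Assigning `deleted()` to `root` removes the message from the pipeline, so it never reaches the output.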

Finally, you need an input to read data. Given the wide range of data sources supported by Benthos, we won't be able to go into detail here. Check out the documentation to learn more about the available inputs or reach out to us on one of our support channels to get help integrating Benthos with your system.
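
To make the pieces above concrete, here is a minimal end-to-end sketch. It assumes the gateway writes one JSON access log per line to a local file; the `./access.log` path is hypothetical, and the `file` input is only one of many possible choices. The token is read from an `OPENMETER_TOKEN` environment variable using Benthos' environment variable interpolation.

```yaml
# Input: read newline-delimited JSON access logs from a file.
input:
  file:
    paths: [ ./access.log ] # hypothetical path, adjust to your environment

# Processor: convert each access log into a CloudEvent.
pipeline:
  processors:
    - mapping: |
        root = {
          "id": uuid_v4(),
          "specversion": "1.0",
          "type": "api-call",
          "source": "api-gateway",
          "time": this.timestamp,
          "subject": this.user,
          "data": {
            "duration": this.duration,
          },
        }

# Output: send the events to OpenMeter.
output:
  openmeter:
    url: https://openmeter.cloud
    token: '${OPENMETER_TOKEN}' # resolved from the environment at startup
```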

Installation

OpenMeter provides a custom Benthos distribution, called the Benthos Collector. It is available as a binary, a container image, and a Helm chart.

The installation method depends on your use case and environment. For example, Kubernetes is a common installation target:

export OPENMETER_TOKEN=om_...
 
helm install --wait --create-namespace \
  --namespace openmeter-collector \
  --set openmeter.token="${OPENMETER_TOKEN}" \
  openmeter-collector oci://ghcr.io/openmeterio/helm-charts/benthos-openmeter

Check out the Helm chart README for configuration details.

Performance tuning

Achieving high throughput is often an essential requirement for usage metering, yet it can be challenging and depends on many factors. Benthos offers several performance tuning options that can be used to optimize throughput.

Additionally, the OpenMeter Benthos plugin provides various options that can be utilized to enhance performance.
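
As a starting point, a tuning sketch might look like the following. The `threads` setting is a Benthos core option. The commented-out output fields are an assumption based on the conventions many Benthos outputs follow; this page does not confirm that the OpenMeter plugin supports them, so check the plugin documentation before enabling them. All numeric values are arbitrary examples.

```yaml
pipeline:
  # Benthos core option: number of parallel processing threads.
  # The value 4 is an arbitrary example; tune it for your workload.
  threads: 4
  processors: [] # your processors go here

output:
  openmeter:
    url: https://openmeter.cloud
    token: '${OPENMETER_TOKEN}'
    # ASSUMPTION: field names below follow common Benthos output conventions
    # and are not confirmed for the OpenMeter plugin. Verify before use.
    # max_in_flight: 10
    # batching:
    #   count: 50
    #   period: 1s
```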

Please contact us on one of our support channels to learn more about performance tuning.

Last edited on May 8, 2024