Collect Usage Data from Anywhere: Database, Logs, Kubernetes

Using Benthos to ingest usage from anywhere

Sági-Kazár Márk (@sagikazarmark)

Companies building in the cloud must meter usage accurately for billing, chargeback, and FinOps use cases. Because metering lacks standardization, collecting usage data from all the different cloud components is challenging. To run a usage-based business, engineers must extract consumption data from databases, Kubernetes, S3, logs, and vendors. Each of these sources has its own API and data format, and all of them must be turned into a unified usage format without losing accuracy.

At OpenMeter, we help businesses meter customer and internal consumption to power usage-based billing and cloud cost use cases. Learning from our customers' experience, we sought a solution to simplify this task for them.

Today, we announce our Benthos-based open-source project to extract usage data from any cloud infrastructure and vendor. This blog post will dive deep into why and how we built it.

The Data Collection Challenge

Having gathered considerable customer feedback, we learned that collecting, formatting, and forwarding usage data often requires significant effort due to the many different data sources and unique metering requirements around accuracy and scale.

For example, when usage data is used for billing, it must be accurate, which requires a reliable method of sending data to OpenMeter. This involves retries, handling back-pressure, and other tasks to ensure the metering system receives all events successfully.

The ideal solution for our customers therefore reads data from various sources, makes it easy to enrich and convert that data to CloudEvents, and reliably sends it to OpenMeter.

Creating a robust data pipeline is challenging due to the wide range of requirements around accuracy and scale. Knowing this, our first instinct after envisioning the ideal solution was to look for an existing (ideally open-source) tool that either meets these requirements or can be extended to meet them.

Fortunately, we didn't have to look for long. As it turned out, we had already been using a tool for testing OpenMeter that fits these needs.

Introducing Benthos

Benthos is a robust stream processing tool that we have been using to test OpenMeter for quite some time. Soon after deciding to invest time in improving our customers' experience with ingesting data into OpenMeter, Benthos came up as one of the potential solutions.

Architecturally, Benthos manages data processing in a way that closely reflects the challenges our customers face. It supports reading data from a broad array of inputs, validates and modifies that data using a set of processing tools, and can then send the processed data to a wide range of outputs.

[Figure: Benthos ingestion data pipeline for OpenMeter]

These characteristics make Benthos a flexible yet powerful tool for easily ingesting data from our customers' systems into OpenMeter. In Benthos terminology, a data pipeline has three major components:

  • Inputs read data from various sources
  • Processors validate, transform, and filter individual pieces of data
  • Outputs are sinks where data is sent after being consumed and processed

Although Benthos has other components and tools, these three directly correspond to the primary challenges of sending data to a usage metering solution like OpenMeter.
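
As a rough illustration of how these three components fit together, here is a minimal configuration sketch that is not specific to OpenMeter. It reads lines from standard input, upper-cases each message with a mapping processor, and writes the result to standard output. The choice of stdin, stdout, and the upper-casing mapping is only for illustration.

```yaml
# Input: where messages come from (here: lines read from standard input)
input:
  stdin: {}

# Processors: applied to every message, in order
pipeline:
  processors:
    - mapping: root = content().uppercase()

# Output: where processed messages are sent (here: standard output)
output:
  stdout: {}
```

Swapping any one of the three sections leaves the other two untouched, which is what makes this layout a good fit for collecting usage from many different sources.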

Ingesting data into OpenMeter

Benthos is capable on its own, and it is also quite easy to extend. So, our first step was to create a plugin that allows data ingestion into OpenMeter by integrating our Go SDK into Benthos.

After the integration, instructing Benthos to ingest data into OpenMeter requires only four lines of configuration:

output:
  openmeter:
    url: https://openmeter.cloud # optional
    token: '<YOUR OPENMETER CLOUD TOKEN>'

To use the plugin, a custom Benthos binary needs to be compiled. To simplify things for our customers, we decided to publish a custom Benthos distribution that acts as a drop-in replacement for the upstream Benthos distribution. You can use the official Helm chart or even Benthos Studio with our custom distribution.

Example

Let’s see an actual example. To follow along, you are going to need a running OpenMeter instance. The fastest way is to sign up for a free trial of OpenMeter Cloud or follow our OSS quickstart guide.

First, we need an input that collects data from a data source. If you only want to get started quickly, the generate input (as explained in our previous blog post) can create some dummy data instead; a generate example follows the database example.

In this example, we are going to use PostgreSQL (but you can use any of the supported databases):

input:
  sql_select:
    driver: postgres
    dsn: '${DATABASE_DSN}'
    table: messages
    columns:
      - message_id
      - account_id
      - message
      - time
    where: time >= ?
    args_mapping: root = [ now().ts_unix() - 30 ]

The above input gathers data from an imaginary chat application. To keep things simple, this chat application charges its customers for every message sent, so let's create a meter in OpenMeter with event type chat-message and aggregation COUNT. Fill in the rest of the details as you see fit or use our cloud template:

Create Meter in Cloud

If you don't have a database at hand, you can use the following generate input instead (or you can follow our database example):

input:
  generate:
    count: 1000 # Generate 1000 messages
    mapping: |
      let accounts = ["acmeinc", "dundermifflin", "sabre"]
 
      let account = $accounts.index(random_int(seed: timestamp_unix_nano()) % $accounts.length())
 
      root = {
        "message_id": uuid_v4(),
        "account_id": $account,
        "sender": fake("first_name").lowercase(),
        "recipient": fake("first_name").lowercase(),
        "message": fake("paragraph"),
        "time": now(),
      }

Next, we are going to convert our data to CloudEvents format using the mapping processor:

pipeline:
  processors:
    - mapping: |
        root = {
          "id": this.message_id,
          "specversion": "1.0",
          "type": "chat-message",
          "source": "postgres",
          "time": this.time,
          "subject": this.account_id,
          "data": {},
        }
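
To gain some confidence in the mapping above, Benthos' built-in unit testing can be used. The following is a minimal sketch, assuming the mapping is the first processor in the pipeline and the test definition is added to the same config file. The sample field values are made up for illustration.

```yaml
tests:
  - name: chat message is converted to a CloudEvent
    # Only run the first processor (the mapping), not the whole pipeline
    target_processors: '/pipeline/processors/0'
    input_batch:
      # Hypothetical row, shaped like the input columns used in this post
      - json_content:
          message_id: msg-1
          account_id: acmeinc
          message: hello
          time: '2024-01-01T00:00:00Z'
    output_batches:
      # One output batch containing one message; json_contains checks that
      # the result is a superset of the fields listed here
      - - json_contains:
            id: msg-1
            specversion: '1.0'
            type: chat-message
            subject: acmeinc
            time: '2024-01-01T00:00:00Z'
```

Assuming the config is saved as config.yaml, running benthos test ./config.yaml should execute the test case.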

The mapping processor uses Benthos' mapping language, Bloblang. As an extra safety measure, we can add two additional processors to make sure the conversion to CloudEvents was successful:

pipeline:
  processors:
    # ...
    - json_schema:
        schema_path: 'file://./cloudevents.spec.json'
    - catch:
        - log:
            level: ERROR
            message: 'Schema validation failed due to: ${!error()}'
        - mapping: 'root = deleted()'

The first processor validates the data against the CloudEvents spec. The second one logs the error (if any) and drops the message, since there is no point in sending invalid data to OpenMeter.

Finally, we need to tell Benthos to send events to OpenMeter:

output:
  openmeter:
    url: https://openmeter.cloud # optional
    token: '<YOUR OPENMETER CLOUD TOKEN>'

(You can create a cloud token here.)
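
For billing use cases, the retry behavior mentioned earlier can also be made explicit in the output. The following is a sketch that wraps the output in Benthos' generic retry output, assuming the openmeter output plugin can be nested like any other output. The interval values are arbitrary examples, not recommendations.

```yaml
output:
  retry:
    # 0 means there is no limit on the number of attempts
    max_retries: 0
    backoff:
      initial_interval: 500ms
      max_interval: 10s
      # 0s means retries are never abandoned because of elapsed time
      max_elapsed_time: 0s
    # The wrapped output that actually delivers the events
    output:
      openmeter:
        url: https://openmeter.cloud # optional
        token: '<YOUR OPENMETER CLOUD TOKEN>'
```

Benthos already retries failed writes by propagating the error back to the input, so this wrapper is optional. It mainly helps when you want retries to happen at the output with a controlled backoff.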

You are now ready to ingest data into OpenMeter:

docker run --rm -it \
  -v "$PWD/YOUR_CONFIG.yaml:/etc/benthos/config.yaml" \
  -e DATABASE_DSN='<YOUR DB DSN>' \
  ghcr.io/openmeterio/benthos-openmeter benthos \
  -c /etc/benthos/config.yaml

You can check your ingested events by querying them or using the event debugger.

Check out our open-source repository or our previous testing example for more options and additional examples.
