Logo

Testing Stream Processing with BenthosMimicking real-world data to test stream processing

Sági-Kazár Márk
Sági-Kazár Márk@sagikazarmark
cover

We are building OpenMeter to help engineers collect and process millions of usage events to power usage-based pricing and product use cases. Working with large volumes of data used for accurate billing requires rigorous development and testing procedures. One significant challenge we faced in our journey was generating and ingesting sample data at a scale that would mimic real-world scenarios. We needed a tool that consistently generates random sample usage, formats it into CloudEvents, and sends it to our HTTP ingestion endpoint.

Our journey led us through several options. From thinking about writing simple scripts to trying out third-party tools. That’s when we stumbled upon Benthos.

Introducing Benthos

Benthos is a robust stream processing tool. Given our involvement in the event processing domain, Benthos felt familiar and synergistic with our goals. At its core, Benthos operates on a simple yet effective concept: it seamlessly ingests data from various inputs, sends it through custom-designed pipelines, and then directs it to one or multiple outputs.

One standout feature of Benthos is Bloblang (or blobl for short). Bloblang is a data mapping language that facilitates a wide range of transformations on structured data. For example, it can generate random data and act as a Benthos input, serving as a data source.

Utilizing Bloblang, we can shape our data into the desired CloudEvents format and randomize elements, ensuring that our testing data remains diverse and representative of real-world usage data.

Example Usage

Let’s look at an example of creating a Benthos configuration capable of generating and ingesting sample data.

First, we need an input that generates data. We use a Benthos input called generate for that and create a CloudEvents compatible structure using Bloblang:

input:
  generate:
    interval: '50ms' # Generate an event every 50ms
    mapping: |
      let max_subjects = 100
      let event_type = "api-calls"
      let source = "api-gateway"
      let methods = ["GET", "POST"]
      let paths = ["/", "/about", "/contact", "/pricing", "/docs"]
      let subject = "subject-%d".format(random_int(seed: timestamp_unix_nano()) % $max_subjects)
      let time = (now().ts_sub_iso8601("P3D").ts_unix() + random_int(min: 60, max: 60 * 60 * 24 * 3)).ts_format()
      let method = $methods.index(random_int(seed: timestamp_unix_nano()) % $methods.length())
      let path = $paths.index(random_int(seed: timestamp_unix_nano()) % $paths.length())
      let duration = random_int(seed: timestamp_unix_nano(), max: 1000) / 1000.0
      root = {
        "id": uuid_v4(),
        "specversion": "1.0",
        "type": $event_type,
        "source": $source,
        "subject": $subject,
        "time": $time,
        "data": {
          "method": $method,
          "path": $path,
          "duration_seconds": $duration,
        },
      }

The next step is sending the data to OpenMeter using the http_client output:

output:
  http_client:
    url: ${OPENMETER_BASE_URL:http://127.0.0.1:8888}/api/v1/events
    verb: POST
    headers:
      Content-Type: application/cloudevents+json
      Authorization: 'Bearer ${OPENMETER_TOKEN:}'
    max_in_flight: 64

Finally, run Benthos (check the Getting started guide for instructions to install Benthos):

benthos -c config.yaml

The above command will tell Benthos to generate a new event every 50ms and send it to the OpenMeter API.

Check out our the configuration we use for more options on our GitHub.