Testing Stream Processing with Redpanda Connect

Mimicking real-world data to test stream processing

Sági-Kazár Márk (@sagikazarmark)

We are building OpenMeter to help engineers collect and process millions of usage events to power usage-based pricing and product use cases. Working with large volumes of data used for accurate billing requires rigorous development and testing procedures. One significant challenge we faced in our journey was generating and ingesting sample data at a scale that would mimic real-world scenarios. We needed a tool that consistently generates random sample usage, formats it into CloudEvents, and sends it to our HTTP ingestion endpoint.

Our journey led us through several options, from writing simple scripts to trying out third-party tools. That’s when we came across [Redpanda Connect](https://docs.redpanda.com/redpanda-connect/).

Introducing Redpanda Connect

Redpanda Connect is a stream processing tool. Because we work in the event processing domain ourselves, it felt familiar and a natural fit for our goals. At its core, Redpanda Connect operates on a simple concept: it ingests data from various inputs, passes it through configurable pipelines, and delivers it to one or more outputs.
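
The input, pipeline, output flow can be sketched with a minimal configuration. This one is not part of our setup; it assumes the built-in stdin and stdout components and only illustrates the three sections, uppercasing each line it reads:

```yaml
# Minimal sketch: read lines from stdin, transform them, print them.
input:
  stdin: {}

pipeline:
  processors:
    # A Bloblang mapping that replaces each message with its uppercased content
    - mapping: root = content().uppercase()

output:
  stdout: {}
```

Swapping the input or output for another component leaves the rest of the file untouched, which is what makes it easy to go from a local experiment to a real endpoint.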

One standout feature of Redpanda Connect is Bloblang (or blobl for short). Bloblang is a data mapping language that facilitates a wide range of transformations on structured data. For example, it can generate random data and act as a Redpanda Connect input, serving as a data source.

Utilizing Bloblang, we can shape our data into the desired CloudEvents format and randomize elements, ensuring that our testing data remains diverse and representative of real-world usage data.

Example Usage

Let’s look at an example of creating a Redpanda Connect configuration capable of generating and ingesting sample data.

First, we need an input that generates data. We use the Redpanda Connect input called generate for that and build a CloudEvents-compatible structure with Bloblang:

input:
  generate:
    interval: '50ms' # Generate an event every 50ms
    mapping: |
      # Constants that shape the generated events
      let max_subjects = 100
      let event_type = "api-calls"
      let source = "api-gateway"
      let methods = ["GET", "POST"]
      let paths = ["/", "/about", "/contact", "/pricing", "/docs"]
      # Pick one of max_subjects subjects at random
      let subject = "subject-%d".format(random_int(seed: timestamp_unix_nano()) % $max_subjects)
      # Random timestamp within the last three days
      let time = (now().ts_sub_iso8601("P3D").ts_unix() + random_int(min: 60, max: 60 * 60 * 24 * 3)).ts_format()
      # Random HTTP method and path from the lists above
      let method = $methods.index(random_int(seed: timestamp_unix_nano()) % $methods.length())
      let path = $paths.index(random_int(seed: timestamp_unix_nano()) % $paths.length())
      # Random duration between 0 and 1 second
      let duration = random_int(seed: timestamp_unix_nano(), max: 1000) / 1000.0
      root = {
        "id": uuid_v4(),
        "specversion": "1.0",
        "type": $event_type,
        "source": $source,
        "subject": $subject,
        "time": $time,
        "data": {
          "method": $method,
          "path": $path,
          "duration_seconds": $duration,
        },
      }
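
Before pointing the generator at a real endpoint, it can help to look at a handful of events locally. The following is a sketch rather than the configuration we run: it assumes the generate input's count field and the stdout output, and the shortened mapping is only a stand-in for the full one above.

```yaml
# Dry run: print a few generated events instead of sending them anywhere.
input:
  generate:
    count: 5       # stop after five events (0 would mean no limit)
    interval: '1s' # slow enough to read along
    mapping: |
      # Stand-in for the full CloudEvents mapping
      root = {"id": uuid_v4(), "specversion": "1.0", "type": "api-calls"}

output:
  stdout: {}
```

Once the printed events look right, the stdout output can be replaced with the real one.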

The next step is sending the data to OpenMeter using the http_client output:

output:
  http_client:
    url: ${OPENMETER_BASE_URL:http://127.0.0.1:8888}/api/v1/events
    verb: POST
    headers:
      Content-Type: application/cloudevents+json
      Authorization: 'Bearer ${OPENMETER_TOKEN:}'
    max_in_flight: 64

Finally, run Redpanda Connect (check the Getting started guide for instructions to install Redpanda Connect):

rpk connect run config.yaml

The above command will tell Redpanda Connect to generate a new event every 50ms and send it to the OpenMeter API.
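
To get a feel for the volume, the interval translates directly into an event rate. The fragment below is a rough sketch of that arithmetic; the alternative values are illustrative, and the rate actually reached also depends on how quickly the output accepts events.

```yaml
input:
  generate:
    # 1000 ms / 50 ms = 20 events per second,
    # or 20 * 86,400 = 1,728,000 events per day.
    interval: '50ms'
    # interval: '1ms' # up to roughly 1,000 events per second
    # interval: ''    # no pause: as fast as the output can keep up
    # mapping: unchanged, same CloudEvents mapping as in the example
```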

Check out the configuration we use on our GitHub for more options.