Load testing Kafka producers and consumers

2021-09-20 9 min

This content was originally published on k6.io.
30 Jan 2024

Note: The instructions in this tutorial work with xk6-kafka v0.8.0 or below. To use the latest xk6-kafka version, check out the changes on the API documentation and examples.

k6 recently added support for extensions, which extend its capabilities to cover use cases requested by the community. The community has already built plenty of them. k6 extensions are written in Go, and many of them reuse existing Go libraries.

This makes k6 a versatile load testing tool that can test different protocols and adapt to many use cases. This post is the third part of my series of articles on testing various systems with k6.

In this load testing example, we’ll focus on testing Apache Kafka, a powerful event streaming platform that provides the following features:

  • Writing and reading streams of events
  • Storing streams of events for as long as you want to
  • Processing streams of events in parallel retrospectively

It works by having client applications write events to the Kafka server. We call this type of application producers. Client applications that read and process events from the Kafka server are called consumers.
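To make the producer and consumer roles concrete, here is a toy in-memory model in plain JavaScript. It is only an illustration: `TopicSketch`, `append`, and `readFrom` are hypothetical names invented for this sketch and are not part of Kafka or xk6-kafka, and a real Kafka topic adds partitions, replication, and retention on top of this idea.

```javascript
// Toy model of a topic as an append-only log (illustration only).
class TopicSketch {
  constructor() {
    this.log = []; // events are kept in arrival order and never modified
  }

  // A producer writes an event and gets back its offset in the log.
  append(event) {
    this.log.push(event);
    return this.log.length - 1;
  }

  // A consumer reads up to `limit` events starting at its own offset.
  readFrom(offset, limit) {
    return this.log.slice(offset, offset + limit);
  }
}

const topic = new TopicSketch();
topic.append({ key: 'a', value: 'first' });
topic.append({ key: 'b', value: 'second' });
topic.append({ key: 'c', value: 'third' });

// Two consumers track their own offsets, so reading does not remove events.
const fromStart = topic.readFrom(0, 2); // the first two events
const fromLatest = topic.readFrom(2, 10); // only the third event
```

Because reading never deletes anything from the log, a consumer that starts later can still process earlier events, which is what the "retrospectively" in the feature list refers to.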

Kafka itself can handle anywhere from hundreds to millions of events per second on a simple setup. But what if you want to test and observe how your Kafka service behaves before going live?

The xk6-kafka extension provides some convenient functions for interacting with Kafka producers and consumers. It serves as a producer that can send a high volume of messages per second, allowing you to monitor the system under test (SUT) and test how the application will keep up with the load.

xk6-kafka

At the time of this writing, the xk6-kafka extension provides the following APIs:

| Function | Description |
| --- | --- |
| consume(reader, limit) | Consume messages from the Kafka server. |
| createTopic(address, topic) | Create a new topic. |
| listTopics(address) | Return a unique set of topics. |
| produce(writer, messages) | Produce messages to the Kafka server. |
| reader(brokers, topic) | Instantiate a new Reader instance. |
| writer(brokers, topic) | Instantiate a new Writer instance. |

Some of the APIs above accept additional optional parameters for authentication and message compression. Refer to the “More examples” section below for additional information.

Building k6 with the Kafka extension

By default, k6 does not support testing Kafka. Building k6 with the xk6-kafka extension creates a k6 version with the capabilities to test Kafka producers and consumers.

Make sure you have the following installed and ready before you continue:

  • Go (>=1.17)
  • Git

Next, continue the installation by running the following command in your terminal to install the xk6 module:

go install go.k6.io/xk6/cmd/xk6@latest

Once the command finishes successfully, you can start making your own custom k6 binary for Kafka as follows:

xk6 build --with github.com/mostafa/xk6-kafka@latest

It will take some time for the process to create a new k6 binary in your working directory.

Troubleshooting: If you run into issues building k6 with the extension, head over to the GitHub repository and download the pre-compiled binaries instead.

Running Kafka

The recommended approach is to use Docker, as manual installation is quite complicated and prone to errors. You can pull the following image by lensesio from Docker Hub. It contains the complete Kafka setup for development.

docker pull lensesio/fast-data-dev:latest

After that, run the following to start the container in detached mode and follow its logs:

sudo docker run -d --rm --name lenseio -p 2181:2181 -p 3030:3030 \
       -p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092  \
       -e ADV_HOST=127.0.0.1 lensesio/fast-data-dev

sudo docker logs -f -t lenseio

Visit http://localhost:3030 to get into the fast-data-dev environment.

k6 test

Import

Now, let’s create a new JavaScript file called test_script.js in the same directory as your k6 binary. Then, add the following import statements at the top of the file:

import { check } from 'k6';
import { writer, produce, reader, consume, createTopic } from 'k6/x/kafka';

Initialization

Continue by appending the following initialization code:

const bootstrapServers = ['localhost:9092'];
const kafkaTopic = 'xk6_kafka_json_topic';

const producer = writer(bootstrapServers, kafkaTopic);
const consumer = reader(bootstrapServers, kafkaTopic);

The code initializes both the writer and reader instances based on the specified configuration. If your Kafka server uses a different host or port, modify bootstrapServers accordingly.

Next, call the createTopic function to create a new topic. Rest assured that this function will do nothing if the topic already exists.

createTopic(bootstrapServers[0], kafkaTopic);

Let’s create a function that generates a random integer to use as an identifier for each message later on. With the default range of 1 to 1000 the values are not guaranteed to be unique. This step is optional and is not required for load testing.

function getRandomInt(max = 1000) {
  return Math.floor(Math.random() * max + 1);
}
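Random integers between 1 and 1000 will repeat quickly once a test runs thousands of iterations. If you need identifiers that do not collide within a run, one option is to derive them from the VU number and the iteration counter, which k6 exposes as the `__VU` and `__ITER` globals. The sketch below takes both as parameters so that it also runs outside k6; `makeCorrelationId` is a hypothetical helper name, not part of k6 or xk6-kafka.

```javascript
// Hypothetical helper: builds an ID that is unique per VU and iteration
// within a single test run.
function makeCorrelationId(prefix, vu, iteration) {
  return `test-id-${prefix}-${vu}-${iteration}`;
}

// Inside a k6 default function you could call:
//   makeCorrelationId('sql', __VU, __ITER)
const exampleId = makeCorrelationId('sql', 3, 7); // 'test-id-sql-3-7'
```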

Default function

As for the default function, define it as follows:

export default function () {
  const messages = [
    {
      key: JSON.stringify({
        correlationId: 'test-id-sql-' + getRandomInt(),
      }),
      value: JSON.stringify({
        title: 'Load Testing SQL Databases with k6',
        url: 'https://k6.io/blog/load-testing-sql-databases-with-k6/',
        locale: 'en',
      }),
    },
    {
      key: JSON.stringify({
        correlationId: 'test-id-redis-' + getRandomInt(),
      }),
      value: JSON.stringify({
        title: 'Benchmarking Redis with k6',
        url: 'https://k6.io/blog/benchmarking-redis-with-k6/',
        locale: 'en',
      }),
    },
  ];

  const error = produce(producer, messages);
  check(error, {
    'is sent': (err) => err == undefined,
  });
}

The code block above works as follows:

  • Initialize a list of messages
  • Call produce function to publish the messages
  • Check if messages are successfully sent
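If you want to vary the batch size, the message list can be generated rather than written by hand. The following is a minimal sketch that assumes the same key and value shape as the messages above; `buildMessages`, its parameters, and the placeholder titles are hypothetical and introduced only for this example.

```javascript
// Hypothetical helper: returns `count` messages in the { key, value } shape
// used above, with both fields serialized as JSON strings.
function buildMessages(count, prefix = 'batch') {
  const messages = [];
  for (let i = 0; i < count; i++) {
    messages.push({
      key: JSON.stringify({ correlationId: `test-id-${prefix}-${i}` }),
      value: JSON.stringify({
        title: `Message ${i}`, // placeholder payload
        locale: 'en',
      }),
    });
  }
  return messages;
}

const batch = buildMessages(3);
```

In the default function, the generated list could then be passed to produce in place of the hand-written array, for example `produce(producer, buildMessages(10))`.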

Teardown

Finally, create a teardown function that closes the connections once the test is done:

export function teardown(data) {
  producer.close();
  consumer.close();
}

Run the test

Save the file and run the following command on your terminal:

./k6 run --vus 50 --duration 5s test_script.js

You should see the following output:

running (05.0s), 00/50 VUs, 15136 complete and 0 interrupted iterations
default ✓ [======================================] 50 VUs  5s

    ✓ is sent

    █ teardown

    checks.........................: 100.00% ✓ 15136    ✗ 0
    data_received..................: 0 B    0 B/s
    data_sent......................: 0 B    0 B/s
    iteration_duration.............: avg=16.49ms min=31.9µs med=13.52ms max=1.14s p(90)=28.55ms p(95)=36.46ms
    iterations.....................: 15136   3017.4609/s
    kafka.writer.dial.count........: 151    30.102841/s
    kafka.writer.error.count.......: 0      0/s
    kafka.writer.message.bytes.....: 5.2 MB  1.0 MB/s
    kafka.writer.message.count.....: 30272   6034.9218/s
    kafka.writer.rebalance.count...: 0      0/s
    kafka.writer.write.count.......: 30272   6034.9218/s
    vus............................: 5      min=5       max=50
    vus_max........................: 50     min=50      max=50
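A quick way to sanity-check this summary is to relate the writer counters to the iteration count. Each iteration sends two messages, so kafka.writer.message.count should be twice the number of iterations. The snippet below redoes that arithmetic with the numbers copied from the output above; the variable names are only for illustration.

```javascript
// Values copied from the summary above.
const iterations = 15136;
const messagesWritten = 30272;
const iterationsPerSecond = 3017.4609;

// Each iteration produces a list of two messages.
const messagesPerIteration = messagesWritten / iterations;

// Expected message rate, to compare with the reported 6034.9218/s.
const expectedMessageRate = iterationsPerSecond * messagesPerIteration;

console.log(messagesPerIteration, expectedMessageRate.toFixed(4));
```

If the two figures did not line up, that would be a hint that some produce calls failed or that the script sends a different number of messages than intended.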

Scale the load

You can easily scale the load by increasing the number of VUs. For example, the following command uses 500 VUs to load test for a minute:

./k6 run --vus 500 --duration 1m test_script.js

If you are new to k6, check out how to configure the load options in the script or run a stress test with k6.
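As a rough sketch of configuring the load in the script instead of on the command line: k6 reads an exported `options` object, and its `stages` array ramps the number of VUs up and down over time. The helper below only builds that array; `buildStages` and the chosen durations are assumptions made for this example.

```javascript
// Hypothetical helper: ramp up to `peakVus`, hold, then ramp down to zero.
function buildStages(peakVus) {
  return [
    { duration: '30s', target: peakVus }, // ramp up
    { duration: '1m', target: peakVus }, // hold the peak
    { duration: '30s', target: 0 }, // ramp down
  ];
}

// In test_script.js you would export it so that k6 picks it up:
//   export const options = { stages: buildStages(500) };
const stages = buildStages(500);
```

With the options defined in the script, the test can be started with `./k6 run test_script.js`. Flags such as --vus and --duration still take precedence when they are passed on the command line.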

Extend the test

The script above only produces messages to your Kafka server. You can easily extend it into a test that both produces and consumes messages.

Add the following code at the end of the default function, below the check on the produce result:

let result = consume(consumer, 10);
check(result, {
    "10 messages returned": (msgs) => msgs.length == 10,
});

The code will read 10 messages each time. Simply modify the value to something higher if you wish to consume more messages.

The output is as follows when you run it with the same command:

running (05.0s), 00/50 VUs, 9778 complete and 0 interrupted iterations
default ✓ [======================================] 50 VUs  5s

    ✓ is sent
    ✓ 10 messages returned

    █ teardown

    checks.........................: 100.00% ✓ 19556        ✗ 0
    data_received..................: 0 B    0 B/s
    data_sent......................: 0 B    0 B/s
    iteration_duration.............: avg=25.53ms min=41.4µs med=18ms max=1.41s p(90)=37.73ms p(95)=52.37ms
    iterations.....................: 9778   1946.80798/s
    kafka.reader.dial.count........: 50     9.955042/s
    kafka.reader.error.count.......: 0      0/s
    kafka.reader.fetches.count.....: 101    20.109184/s
    kafka.reader.message.bytes.....: 15 MB   2.9 MB/s
    kafka.reader.message.count.....: 97830   19478.034846/s
    kafka.reader.rebalance.count...: 0      0/s
    kafka.reader.timeouts.count....: 46     9.158638/s
    kafka.writer.dial.count........: 152    30.263327/s
    kafka.writer.error.count.......: 0      0/s
    kafka.writer.message.bytes.....: 3.4 MB  669 kB/s
    kafka.writer.message.count.....: 19556   3893.615961/s
    kafka.writer.rebalance.count...: 0      0/s
    kafka.writer.write.count.......: 19556   3893.615961/s
    vus............................: 50     min=50      max=50
    vus_max........................: 50     min=50      max=50
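The reader counters can be related to the iteration count as well. The arithmetic below uses the values from the output above and shows that the number of consumed messages per iteration is close to the limit of 10 passed to consume, while the writer still sends two messages per iteration. The variable names are only for illustration.

```javascript
// Values copied from the summary above.
const iterations = 9778;
const messagesWritten = 19556;
const messagesRead = 97830;

const writtenPerIteration = messagesWritten / iterations;
const readPerIteration = messagesRead / iterations;

console.log(writtenPerIteration, readPerIteration.toFixed(2));
```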

Kafka metrics in k6

By default, k6 has its own built-in metrics that are collected automatically. Apart from that, you can create your own custom metrics. Custom metrics can be categorized into the following types:

  • Counter: A metric that cumulatively sums added values.
  • Gauge: A metric that stores the min, max, and last values added to it.
  • Rate: A metric that tracks the percentage of added values that are non-zero.
  • Trend: A metric that allows for calculating statistics on the added values (min, max, average, and percentiles).
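To make the four metric types concrete, here is a toy model of their semantics in plain JavaScript. These classes are illustrative only and are not k6's implementation; in a k6 script you would import the real `Counter`, `Gauge`, `Rate`, and `Trend` classes from `k6/metrics` instead. The `Sketch` class names, the `percentage` and `avg` getters, and the `percentile` method are assumptions made for this example.

```javascript
// Counter: cumulatively sums added values.
class CounterSketch {
  constructor() { this.total = 0; }
  add(value) { this.total += value; }
}

// Gauge: remembers the min, max, and last values added.
class GaugeSketch {
  constructor() { this.min = Infinity; this.max = -Infinity; this.last = undefined; }
  add(value) {
    this.min = Math.min(this.min, value);
    this.max = Math.max(this.max, value);
    this.last = value;
  }
}

// Rate: tracks the share of added values that are non-zero (or true).
class RateSketch {
  constructor() { this.nonZero = 0; this.count = 0; }
  add(value) {
    this.count += 1;
    if (value) this.nonZero += 1;
  }
  get percentage() {
    return this.count === 0 ? 0 : (this.nonZero / this.count) * 100;
  }
}

// Trend: keeps every value so that statistics can be computed afterwards.
class TrendSketch {
  constructor() { this.values = []; }
  add(value) { this.values.push(value); }
  get avg() {
    return this.values.reduce((sum, v) => sum + v, 0) / this.values.length;
  }
  // Nearest-rank percentile; k6 may compute percentiles differently.
  percentile(p) {
    const sorted = [...this.values].sort((a, b) => a - b);
    const rank = Math.max(1, Math.ceil((p / 100) * sorted.length));
    return sorted[rank - 1];
  }
}

const sent = new CounterSketch();
const success = new RateSketch();
const latency = new TrendSketch();
const batchSize = new GaugeSketch();

for (const ms of [12, 30, 18, 25]) {
  sent.add(1);
  success.add(ms < 28); // true counts as non-zero
  latency.add(ms);
}
for (const size of [2, 10, 5]) {
  batchSize.add(size);
}
```

All the Kafka metrics that xk6-kafka reports in the tables in this article are of the Counter type, so only the cumulative total and the per-second rate appear in the summary.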

In addition to k6 itself, k6 extensions can collect metrics and report them as part of the k6 results output. In this case, xk6-kafka collects separate stats for the reader and the writer.

Reader

Let’s have a look at the metrics meant for the reader.

| Metric | Type | Description |
| --- | --- | --- |
| kafka.reader.dial.count | Counter | Total number of times the reader tries to connect to Kafka. |
| kafka.reader.error.count | Counter | Total number of errors that occurred when reading from Kafka. |
| kafka.reader.fetches.count | Counter | Total number of times the reader fetches batches of messages from Kafka. |
| kafka.reader.message.bytes | Counter | Total bytes consumed. |
| kafka.reader.message.count | Counter | Total number of messages consumed. |
| kafka.reader.rebalance.count | Counter | Total number of rebalances of a topic in a consumer group (deprecated). |
| kafka.reader.timeouts.count | Counter | Total number of timeouts that occurred when reading from Kafka. |

Writer

As for the writer, the metrics are as follows:

| Metric | Type | Description |
| --- | --- | --- |
| kafka.writer.dial.count | Counter | Total number of times the writer tries to connect to Kafka. |
| kafka.writer.error.count | Counter | Total number of errors that occurred when writing to Kafka. |
| kafka.writer.message.bytes | Counter | Total bytes produced. |
| kafka.writer.message.count | Counter | Total number of messages produced. |
| kafka.writer.rebalance.count | Counter | Total number of rebalances of a topic (deprecated). |
| kafka.writer.write.count | Counter | Total number of times the writer writes batches of messages to Kafka. |

There are more available Kafka metrics, and you can find them here. However, the extension does not collect all metrics yet. You can follow this GitHub issue to track the progress of their additions.

More examples

The xk6-kafka repository also provides a few test scripts that work out of the box for new users. Feel free to experiment with them and adapt the code to your own use cases. If you run into issues, report them on GitHub.

Conclusion

In conclusion, load testing Apache Kafka is now a lot easier with k6. k6 provides the foundation to create and scale your load tests, and the xk6-kafka extension brings a convenient API to interact with a Kafka server.

If you wish to find out more about other available k6 extensions, head over to the Explore extensions page.

If you have any questions or are interested in building an extension, join the k6 community on Slack.
