Load testing Kafka producers and consumers
Note: The instructions in this tutorial work with xk6-kafka v0.8.0 or below. To use the latest xk6-kafka version, check out the changes on the API documentation and examples.
Recently, k6 began supporting extensions, which extend its capabilities to cover other use cases required by the community. The community has already built plenty of extensions. k6 extensions are written in Go, and many of them reuse existing Go libraries.
This makes k6 a versatile load testing tool for testing different protocols and adapting to new use cases. This post is the third part of my series of articles testing various systems using k6:
In this load testing example, we’ll focus on testing Apache Kafka, a powerful event streaming platform that provides the following features:
- Writing and reading streams of events
- Storing streams of events for as long as you want to
- Processing streams of events as they occur or retrospectively
It works by having client applications write events to the Kafka server; these applications are called producers. Client applications that read and process events from the Kafka server are called consumers.
Kafka itself can seamlessly handle from hundreds to millions of events per second on a simple setup. But what if you want to test and observe how your Kafka service behaves before going live?
The xk6-kafka extension provides some convenient functions for interacting with Kafka producers and consumers. It serves as a producer that can send a high volume of messages per second, allowing you to monitor the system under test (SUT) and test how the application will keep up with the load.
xk6-kafka
At the time of this writing, the xk6-kafka extension provides the following APIs:
| Function | Description |
|---|---|
| `consume(reader, limit)` | Consume messages from the Kafka server. |
| `createTopic(address, topic)` | Create a new topic. |
| `listTopics(address)` | Return a unique set of topics. |
| `produce(writer, messages)` | Produce messages to the Kafka server. |
| `reader(brokers, topic)` | Instantiate a new Reader. |
| `writer(brokers, topic)` | Instantiate a new Writer. |
Some of these APIs also accept optional parameters for authentication and message compression. Refer to the “More examples” section below for details.
Building k6 with the Kafka extension
By default, k6 does not support testing Kafka. Building k6 with the xk6-kafka extension creates a k6 version with the capabilities to test Kafka producers and consumers.
Make sure you have the following installed and ready before you continue:
- Go (>= 1.17)
- Git
Next, continue the installation by running the following command in your terminal to install the xk6 module:
go install go.k6.io/xk6/cmd/xk6@latest
Once the command finishes successfully, you can start making your own custom k6 binary for Kafka as follows:
xk6 build --with github.com/mostafa/xk6-kafka@latest
It will take some time for the process to create a new k6 binary in your working directory.
Troubleshooting: If you run into issues installing the xk6 extension, head over to the GitHub repository and download the pre-compiled binaries instead.
Running Kafka
The recommended approach is to use Docker, as manual installation is quite complicated and prone to errors. You can pull the following image by lensesio from Docker Hub. It contains the complete Kafka setup for development.
docker pull lensesio/fast-data-dev:latest
After that, run the following command to start the container in detached mode:
sudo docker run -d --rm --name lenseio -p 2181:2181 -p 3030:3030 \
-p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092 \
-e ADV_HOST=127.0.0.1 lensesio/fast-data-dev
Follow the container logs to confirm that the setup is ready:
sudo docker logs -f -t lenseio
Visit http://localhost:3030 to get into the fast-data-dev environment.
k6 test
Import
Now, let’s create a new JavaScript file called test_script.js in the same directory as your k6 binary. Then, add the following import statements at the top of the file:
import { check } from 'k6';
import { writer, produce, reader, consume, createTopic } from 'k6/x/kafka';
Initialization
Continue by appending the following initialization code:
const bootstrapServers = ['localhost:9092'];
const kafkaTopic = 'xk6_kafka_json_topic';
const producer = writer(bootstrapServers, kafkaTopic);
const consumer = reader(bootstrapServers, kafkaTopic);
The code initializes both the writer and reader instances based on the specified configuration. If your Kafka server uses a different IP/host address or port, modify them accordingly.
Next, call the createTopic function to create a new topic. This function does nothing if the topic already exists.
createTopic(bootstrapServers[0], kafkaTopic);
Let’s create a function that generates a random integer to serve as a unique identifier for each message later on. Note that this is optional, not a requirement for load testing.
function getRandomInt(max = 1000) {
return Math.floor(Math.random() * max + 1);
}
Default function
As for the default function, define it as follows:
export default function () {
const messages = [
{
key: JSON.stringify({
correlationId: 'test-id-sql-' + getRandomInt(),
}),
value: JSON.stringify({
title: 'Load Testing SQL Databases with k6',
url: 'https://k6.io/blog/load-testing-sql-databases-with-k6/',
locale: 'en',
}),
},
{
key: JSON.stringify({
correlationId: 'test-id-redis-' + getRandomInt(),
}),
value: JSON.stringify({
title: 'Benchmarking Redis with k6',
url: 'https://k6.io/blog/benchmarking-redis-with-k6/',
locale: 'en',
}),
},
];
const error = produce(producer, messages);
check(error, {
'is sent': (err) => err == undefined,
});
}
The code block above works as follows:
- Initialize a list of messages
- Call produce function to publish the messages
- Check if messages are successfully sent
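Each message’s key embeds a unique correlationId, which lets you trace a consumed payload back to the iteration that produced it. Here is a plain-JavaScript sketch (runnable outside k6, using a hypothetical buildMessage helper) showing how the key round-trips through JSON:

```javascript
// Hypothetical helper mirroring how the test above constructs each message:
// the key carries a unique correlationId, the value carries the payload.
function buildMessage(prefix, id, payload) {
  return {
    key: JSON.stringify({ correlationId: prefix + id }),
    value: JSON.stringify(payload),
  };
}

const msg = buildMessage('test-id-sql-', 42, {
  title: 'Load Testing SQL Databases with k6',
  locale: 'en',
});

// The key parses back to the original correlationId.
console.log(JSON.parse(msg.key).correlationId); // test-id-sql-42
```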
Teardown
Once you are done, create a teardown function that closes the connections:
export function teardown(data) {
producer.close();
consumer.close();
}
Run the test
Save the file and run the following command on your terminal:
./k6 run --vus 50 --duration 5s test_script.js
You should see the following output:
running (05.0s), 00/50 VUs, 15136 complete and 0 interrupted iterations
default ✓ [======================================] 50 VUs 5s
✓ is sent
█ teardown
checks.........................: 100.00% ✓ 15136 ✗ 0
data_received..................: 0 B 0 B/s
data_sent......................: 0 B 0 B/s
iteration_duration.............: avg=16.49ms min=31.9µs med=13.52ms max=1.14s p(90)=28.55ms p(95)=36.46ms
iterations.....................: 15136 3017.4609/s
kafka.writer.dial.count........: 151 30.102841/s
kafka.writer.error.count.......: 0 0/s
kafka.writer.message.bytes.....: 5.2 MB 1.0 MB/s
kafka.writer.message.count.....: 30272 6034.9218/s
kafka.writer.rebalance.count...: 0 0/s
kafka.writer.write.count.......: 30272 6034.9218/s
vus............................: 5 min=5 max=50
vus_max........................: 50 min=50 max=50
Scale the load
You can easily scale the load by increasing the number of VUs. For example, the following command uses 500 VUs to load test for a minute:
./k6 run --vus 500 --duration 1m test_script.js
If you are new to k6, check out how to configure the load options in the script or run a stress test with k6.
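Alternatively, you can declare the load profile in the script itself instead of passing flags. A sketch using k6’s standard options object, which the k6 binary picks up automatically (the stage durations and targets below are arbitrary examples):

```javascript
export const options = {
  stages: [
    { duration: '30s', target: 100 }, // ramp up to 100 VUs over 30s
    { duration: '1m', target: 500 }, // then ramp up to 500 VUs over 1m
    { duration: '30s', target: 0 }, // finally ramp down to 0
  ],
};
```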
Extend the test
The script above is all about producing messages to your Kafka server. In fact, you can easily modify the code into a test that produces and consumes messages.
Simply add the following code to the end of the default function, after the produce check:
let result = consume(consumer, 10);
check(result, {
'10 messages returned': (msgs) => msgs.length == 10,
});
The code reads 10 messages on each iteration. Simply increase the value if you wish to consume more messages per iteration.
The output is as follows when you run it with the same command:
running (05.0s), 00/50 VUs, 9778 complete and 0 interrupted iterations
default ✓ [======================================] 50 VUs 5s
✓ is sent
✓ 10 messages returned
█ teardown
checks.........................: 100.00% ✓ 19556 ✗ 0
data_received..................: 0 B 0 B/s
data_sent......................: 0 B 0 B/s
iteration_duration.............: avg=25.53ms min=41.4µs med=18ms max=1.41s p(90)=37.73ms p(95)=52.37ms
iterations.....................: 9778 1946.80798/s
kafka.reader.dial.count........: 50 9.955042/s
kafka.reader.error.count.......: 0 0/s
kafka.reader.fetches.count.....: 101 20.109184/s
kafka.reader.message.bytes.....: 15 MB 2.9 MB/s
kafka.reader.message.count.....: 97830 19478.034846/s
kafka.reader.rebalance.count...: 0 0/s
kafka.reader.timeouts.count....: 46 9.158638/s
kafka.writer.dial.count........: 152 30.263327/s
kafka.writer.error.count.......: 0 0/s
kafka.writer.message.bytes.....: 3.4 MB 669 kB/s
kafka.writer.message.count.....: 19556 3893.615961/s
kafka.writer.rebalance.count...: 0 0/s
kafka.writer.write.count.......: 19556 3893.615961/s
vus............................: 50 min=50 max=50
vus_max........................: 50 min=50 max=50
Kafka metrics in k6
By default, k6 has its own built-in metrics that are collected automatically. Apart from that, you can create your own custom metrics. Custom metrics can be categorized into the following types:
- Counter: A metric that cumulatively sums added values.
- Gauge: A metric that stores the min, max, and last values added to it.
- Rate: A metric that tracks the percentage of added values that are non-zero.
- Trend: A metric that allows for calculating statistics on the added values (min, max, average, and percentiles).
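To make the Trend type concrete, here is a plain-JavaScript illustration (not the k6 API) of the statistics a Trend computes over the values added to it:

```javascript
// Summarize a set of samples the way a Trend metric does.
function trendStats(values) {
  const sorted = [...values].sort((a, b) => a - b);
  const avg = sorted.reduce((sum, v) => sum + v, 0) / sorted.length;
  // Nearest-rank percentile (k6's exact interpolation may differ slightly).
  const pct = (p) => sorted[Math.max(0, Math.ceil((p / 100) * sorted.length) - 1)];
  return { min: sorted[0], max: sorted[sorted.length - 1], avg, p90: pct(90) };
}

console.log(trendStats([12, 25, 17, 31, 22]));
// { min: 12, max: 31, avg: 21.4, p90: 31 }
```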
Besides k6’s built-in and custom metrics, k6 extensions can collect their own metrics and report them as part of the k6 results output. In this case, xk6-kafka collects individual stats for both the reader and the writer.
Reader
Let’s have a look at the metrics meant for the reader.
| Metric | Type | Description |
|---|---|---|
| `kafka.reader.dial.count` | Counter | Total number of times the reader tries to connect to Kafka. |
| `kafka.reader.error.count` | Counter | Total number of errors that occurred when reading from Kafka. |
| `kafka.reader.fetches.count` | Counter | Total number of times the reader fetches batches of messages from Kafka. |
| `kafka.reader.message.bytes` | Counter | Total bytes consumed. |
| `kafka.reader.message.count` | Counter | Total number of messages consumed. |
| `kafka.reader.rebalance.count` | Counter | Total number of rebalances of a topic in a consumer group (deprecated). |
| `kafka.reader.timeouts.count` | Counter | Total number of timeouts that occurred when reading from Kafka. |
Writer
As for the writer, the metrics are as follows:
| Metric | Type | Description |
|---|---|---|
| `kafka.writer.dial.count` | Counter | Total number of times the writer tries to connect to Kafka. |
| `kafka.writer.error.count` | Counter | Total number of errors that occurred when writing to Kafka. |
| `kafka.writer.message.bytes` | Counter | Total bytes produced. |
| `kafka.writer.message.count` | Counter | Total number of messages produced. |
| `kafka.writer.rebalance.count` | Counter | Total number of rebalances of a topic (deprecated). |
| `kafka.writer.write.count` | Counter | Total number of times the writer writes batches of messages to Kafka. |
There are more available Kafka metrics, and you can find them here. However, the extension does not collect all metrics yet. You can follow this GitHub issue to track the progress of their additions.
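These extension metrics can also drive pass/fail criteria for a run. A sketch using k6’s standard thresholds option (assuming the metric names listed above) that fails the test if any Kafka read or write errors occur:

```javascript
export const options = {
  thresholds: {
    // Exit with a non-zero code if the writer or reader reports any errors.
    'kafka.writer.error.count': ['count == 0'],
    'kafka.reader.error.count': ['count == 0'],
  },
};
```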
More examples
The xk6-kafka repository also provides a few test scripts that work out of the box for new users. Feel free to experiment with them and modify the code based on your own use cases. If you bump into issues, report them on GitHub.
Conclusion
In conclusion, load testing Apache Kafka is now a lot easier with k6. k6 provides the foundation to create and scale your load tests, and the xk6-kafka extension brings a convenient API to interact with a Kafka server.
If you wish to find out more about other available k6 extensions, simply head over to the Explore extensions page.
If you have any questions or are interested in building an extension, join the k6 community on Slack.
Grafana Cloud is the easiest way to get started with Grafana k6 and performance testing. We have a generous forever-free tier and plans for every use case. Sign up for free now!