Skip to main content

Replaying Kafka

Background

In this guide we will show you how to replay Kafka traffic using data captured by Speedscale. The network level modelling of Kafka does not match most people's mental model which leads to confusion and an undesirable replay scenario. Most people view their app interacting with Kafka like this.

For more information about Apache Kafka, see the official Kafka documentation.

when the reality looks more like this

This is why when observed in Speedscale, you see something like this that has a lot of internal Kafka calls along with the most important Produce and Fetch calls.

Kafka Traffic

This sort of outbound traffic would typically be part of a mock when running a replay. In the case of Kafka however, we can simplify the replay process by simply turning Fetch calls made by our app into Produce calls.

Prerequisites

  1. speedctl is installed
  2. Create a snapshot containing the traffic you need.

The simplest way to replay Kafka traffic is using the replay wizard in the Speedscale UI. The wizard provides a guided experience for configuring and running replays directly against your Kafka brokers in-cluster. Speedscale automatically handles Kafka message production during replay, eliminating the need for manual setup or scripting.

For detailed instructions, see the replay wizard guide.

note

The latency reported on the report in the UI is the time to produce messages to the broker, not the full latency for downstream applications to consume them.

Replay with proxymock

The proxymock CLI tool can be used to replay recorded Kafka traffic directly into a Kafka broker using the proxymock replay command.

Pull a Snapshot

First, download the snapshot from Speedscale cloud:

proxymock cloud pull snapshot <snapshot-id>

This will download the snapshot traffic to individual local files.

Replay Traffic

proxymock replay --in <directory> --test-against <kafka-broker>

For example:

proxymock replay --in ./proxymock/snapshot-abc123 --test-against localhost:9092

Output

When replaying Kafka traffic, proxymock displays a Kafka-specific latency table:

LATENCY / THROUGHPUT
+-----------------+---------+-------+-------------+-------+-------+-------+-------+--------+-----------------+------------+
| TOPIC | API | COUNT | AVG LATENCY | P50 | P90 | P95 | P99 | FAILED | RESULT MISMATCH | PER-SECOND |
+-----------------+---------+-------+-------------+-------+-------+-------+-------+--------+-----------------+------------+
| dev-fulfillment | Produce | 3 | 94.33 ms | 12 ms | 12 ms | 12 ms | 12 ms | 0% | 0% | 10.28 |
+-----------------+---------+-------+-------------+-------+-------+-------+-------+--------+-----------------+------------+
| TOTAL | | 3 | 94.33 ms | 12 ms | 12 ms | 12 ms | 12 ms | 0% | 0% | 10.28 |
+-----------------+---------+-------+-------------+-------+-------+-------+-------+--------+-----------------+------------+

The output shows:

  • Topic: The Kafka topic messages were produced to
  • API: The Kafka API operation (e.g., Produce)
  • COUNT: Number of messages sent
  • Latency metrics: Average, P50, P90, P95, and P99 latencies for producing messages
  • FAILED: Percentage of failed produce operations
  • RESULT MISMATCH: Percentage of unexpected results
  • PER-SECOND: Throughput in messages per second
note

The latency reported is the time to produce messages to the broker, not the full latency for downstream applications to consume them.


Manual Replay (Alternative)

If you need more control over the replay process, you can extract the data from Speedscale and create a custom producer script.

Extract the data

Grab your snapshot id and run this command. This will extract the message data and timestamps from Kafka fetch commands, which are deeply nested in Speedscale's RRPair format.

speedctl extract data <snapshot-id> --path kafka.response.FetchResponse.topics.0.partitions.0.records.records.0.valueString --path .ts --filter='(command IS  "Fetch")'

This will generate a csv that looks something like this, with the message data, timestamp, and the corresponding RRPair UUID (not needed in this case)

kafka.response.FetchResponse.topics.0.partitions.0.records.records.0.valueString,.ts,RRPair UUID
"message1",2024-01-15T10:30:45.123Z,44f7a2cc-2045-4fb6-9635-3da8aa7fa909
"message2",2024-01-15T10:30:46.456Z,58f7a2cc-1135-4fa6-3433-ada5aa2fa161
tip

This example uses a basic case where single topic and a single record per Fetch call is assumed. For more advanced use cases see speedctl extract data --help

Create your producer

Next up, using the language and LLM of your choice, create a small load producer to send these messages to your Kafka broker. The steps here are

  1. Read the CSV from our previous step.
  2. Create a Kafka producer/client.
  3. Iterate over the CSV.
  4. For each row in the CSV, extract the message body and optionally the timestamp.
  5. If timing mode is enabled, wait between messages to match the original recording timing.
  6. Send the message to Kafka.
  7. Wait for the producer flush to complete.

Example scripts in multiple languages are provided below.

package main

import (
"encoding/csv"
"flag"
"fmt"
"io"
"os"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
)

var (
respectTiming = flag.Bool("respect-timing", false, "Respect original message timing from recording")
csvFile = flag.String("csv", "out.csv", "Path to CSV file")
kafkaTopic = flag.String("topic", "demo-topic", "Kafka topic name")
brokers = flag.String("brokers", "localhost:9092", "Kafka broker addresses")
)

func main() {
flag.Parse()
if err := do(); err != nil {
panic(err)
}
}

func do() error {
// Open CSV file
file, err := os.Open(*csvFile)
if err != nil {
return fmt.Errorf("failed to open CSV file: %w", err)
}
defer file.Close()

// Create CSV reader
reader := csv.NewReader(file)

// Skip header row
if _, err := reader.Read(); err != nil {
return fmt.Errorf("failed to read CSV header: %w", err)
}

// Create Kafka producer
producer, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": *brokers,
})
if err != nil {
return fmt.Errorf("failed to create Kafka producer: %w", err)
}
defer producer.Close()

var lastTimestamp time.Time
startTime := time.Now()

// Iterate over CSV rows
for {
row, err := reader.Read()
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("failed to read CSV row: %w", err)
}

// Extract message body from first column
messageBody := row[0]

// Handle timing if enabled
if *respectTiming && len(row) > 1 {
// Parse timestamp from second column
timestamp, err := time.Parse(time.RFC3339Nano, row[1])
if err != nil {
return fmt.Errorf("failed to parse timestamp %s: %w", row[1], err)
}

// Calculate delay relative to previous message
if !lastTimestamp.IsZero() {
delay := timestamp.Sub(lastTimestamp)
if delay > 0 {
time.Sleep(delay)
}
} else {
// First message - record start time
startTime = time.Now()
}
lastTimestamp = timestamp
}

// Create Kafka message
msg := &kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: *kafkaTopic,
Partition: kafka.PartitionAny,
},
Value: []byte(messageBody),
}

// Produce message to Kafka
if err := producer.Produce(msg, nil); err != nil {
return fmt.Errorf("failed to produce message to Kafka: %w", err)
}
}

// Wait for all messages to be delivered
producer.Flush(15000) // 15 second timeout

if *respectTiming {
elapsed := time.Since(startTime)
fmt.Printf("Replay completed in %s with original timing\n", elapsed)
} else {
fmt.Println("Replay completed at maximum speed")
}

return nil
}

Usage Examples

Send messages as fast as possible (default):

go run main.go --csv out.csv --topic my-topic --brokers localhost:9092

Respect original message timing from the recording:

go run main.go --csv out.csv --topic my-topic --brokers localhost:9092 --respect-timing