
Replaying Google Pub/Sub

Background

Google Pub/Sub exposes HTTP-based APIs, so Speedscale's standard capture and replay capabilities work with Pub/Sub traffic. Unlike protocol-specific brokers such as Kafka or RabbitMQ, Pub/Sub traffic can be replayed directly with Speedscale's standard HTTP replay functionality, with no protocol-specific driver required.

For more information about Google Pub/Sub, see the Google Cloud Pub/Sub documentation.

Prerequisites

  1. speedctl is installed
  2. A snapshot containing the traffic you need has been created

Standard Replay Approach

Since Google Pub/Sub uses HTTP-based APIs, you can use Speedscale's standard replay capabilities:

  1. Capture Production Traffic: Use Speedscale to capture your application's interactions with Google Pub/Sub
  2. Create a Snapshot: Create a snapshot from the captured traffic
  3. Run a Replay: Use Speedscale's standard replay functionality to replay the traffic

For detailed instructions on using Speedscale's standard replay capabilities, see the Getting Started Guide.

Alternative: Custom Load Driver Approach

If you need more control over message replay patterns or want to extract and replay messages independently, you can use a custom load driver approach similar to Kafka and RabbitMQ:

Extract the data

Grab your snapshot ID and run this command to extract the message payloads from the Pub/Sub traffic:

speedctl extract data <snapshot-id> --path .http.request.body --path .ts

This will generate a CSV with message data, timestamps, and corresponding RRPair UUIDs.
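
The exact column order and headers depend on the paths you pass and on your speedctl version, so check the header row of the generated file before running the producer. The Go script in the next section assumes the message body is in the first column and the timestamp in the second; a purely illustrative row could look like:

"{""orderId"": 123, ""status"": ""created""}",2024-05-01T12:00:01.000000000Z,3f6c1b2e-8a41-4d2f-9c7e-1a2b3c4d5e6f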

Create your producer

Create a custom load producer using the Google Cloud Pub/Sub client library for your preferred language. The steps are:

  1. Read the CSV from the previous step
  2. Create a Google Pub/Sub publisher client
  3. Iterate over the CSV
  4. For each row, extract the message body and optionally the timestamp
  5. If timing mode is enabled, wait between messages to match the original recording timing
  6. Publish the message to the Pub/Sub topic
  7. Close the client when complete

An example script in Go is provided below:

package main

import (
    "context"
    "encoding/csv"
    "flag"
    "fmt"
    "io"
    "os"
    "time"

    "cloud.google.com/go/pubsub"
)

var (
    respectTiming = flag.Bool("respect-timing", false, "Respect original message timing from recording")
    csvFile       = flag.String("csv", "your_file.csv", "Path to CSV file")
    topicID       = flag.String("topic", "demo-topic", "Pub/Sub topic ID")
    projectID     = flag.String("project", "your-project-id", "GCP project ID")
)

func main() {
    flag.Parse()
    if err := do(); err != nil {
        panic(err)
    }
}

func do() error {
    // Open CSV file
    file, err := os.Open(*csvFile)
    if err != nil {
        return fmt.Errorf("failed to open CSV file: %w", err)
    }
    defer file.Close()

    // Create CSV reader
    reader := csv.NewReader(file)

    // Skip header row
    if _, err := reader.Read(); err != nil {
        return fmt.Errorf("failed to read CSV header: %w", err)
    }

    // Create Pub/Sub client
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, *projectID)
    if err != nil {
        return fmt.Errorf("failed to create Pub/Sub client: %w", err)
    }
    defer client.Close()

    topic := client.Topic(*topicID)

    var lastTimestamp time.Time
    startTime := time.Now()

    // Iterate over CSV rows
    for {
        row, err := reader.Read()
        if err == io.EOF {
            break
        }
        if err != nil {
            return fmt.Errorf("failed to read CSV row: %w", err)
        }

        // Extract message body from first column
        messageBody := []byte(row[0])

        // Handle timing if enabled
        if *respectTiming && len(row) > 1 {
            // Parse timestamp from second column
            timestamp, err := time.Parse(time.RFC3339Nano, row[1])
            if err != nil {
                return fmt.Errorf("failed to parse timestamp %s: %w", row[1], err)
            }

            // Calculate delay relative to previous message
            if !lastTimestamp.IsZero() {
                delay := timestamp.Sub(lastTimestamp)
                if delay > 0 {
                    time.Sleep(delay)
                }
            } else {
                // First message - record start time
                startTime = time.Now()
            }
            lastTimestamp = timestamp
        }

        // Publish message to Pub/Sub
        result := topic.Publish(ctx, &pubsub.Message{
            Data: messageBody,
        })

        // Wait for publish to complete
        _, err = result.Get(ctx)
        if err != nil {
            return fmt.Errorf("failed to publish message to Pub/Sub: %w", err)
        }
    }

    if *respectTiming {
        elapsed := time.Since(startTime)
        fmt.Printf("Replay completed in %s with original timing\n", elapsed)
    } else {
        fmt.Println("Replay completed at maximum speed")
    }

    return nil
}
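
The script above calls result.Get after each publish, so every message is acknowledged before the next one is sent. That keeps ordering and error handling simple but limits throughput. If you are replaying at maximum speed and strict ordering is not a concern, a common variation is to publish asynchronously and wait for all results at the end. The publishAll helper below is a minimal sketch (the function name and structure are illustrative, not part of the Speedscale tooling); it drops into the same file, reuses its imports, and assumes the same CSV layout with the message body in the first column:

// publishAll is a sketch of asynchronous publishing: it queues every row
// without waiting, letting the client batch messages in the background,
// then blocks until all publishes have been acknowledged.
// Assumes the message body is in the first CSV column.
func publishAll(ctx context.Context, topic *pubsub.Topic, reader *csv.Reader) error {
    var results []*pubsub.PublishResult
    for {
        row, err := reader.Read()
        if err == io.EOF {
            break
        }
        if err != nil {
            return fmt.Errorf("failed to read CSV row: %w", err)
        }
        // Publish returns immediately; messages are batched and sent asynchronously.
        results = append(results, topic.Publish(ctx, &pubsub.Message{Data: []byte(row[0])}))
    }
    // Wait for every outstanding publish to complete.
    for _, r := range results {
        if _, err := r.Get(ctx); err != nil {
            return fmt.Errorf("failed to publish message to Pub/Sub: %w", err)
        }
    }
    return nil
}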

Usage Examples

Send messages as fast as possible (default):

go run main.go --csv your_file.csv --topic demo-topic --project your-project-id

Respect original message timing from the recording:

go run main.go --csv your_file.csv --topic demo-topic --project your-project-id --respect-timing

Note

Make sure to update the project ID and topic name, and ensure you have proper GCP authentication configured. Use the --respect-timing flag to preserve the original message timing patterns from your production traffic, or omit it to send messages as fast as possible for maximum throughput testing.
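
For authentication, the Pub/Sub client library uses Application Default Credentials. One common setup for local runs (the key file path below is just a placeholder) is:

export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json
go run main.go --csv your_file.csv --topic demo-topic --project your-project-id

On a developer workstation, gcloud auth application-default login is an alternative that avoids handling key files directly.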