Replaying Google Pub/Sub
Background
Google Pub/Sub uses HTTP-based APIs, which means you can use Speedscale's standard replay capabilities to capture and replay Pub/Sub traffic. Unlike protocol-specific brokers like Kafka or RabbitMQ, Pub/Sub traffic can be replayed directly using Speedscale's standard HTTP replay functionality.
For more information about Google Pub/Sub, see the Google Cloud Pub/Sub documentation.
Prerequisites
- speedctl is installed
- Create a snapshot containing the traffic you need.
Standard Replay Approach
Since Google Pub/Sub uses HTTP-based APIs, you can use Speedscale's standard replay capabilities:
- Capture Production Traffic: Use Speedscale to capture your application's interactions with Google Pub/Sub
- Create a Snapshot: Create a snapshot from the captured traffic
- Run a Replay: Use Speedscale's standard replay functionality to replay the traffic
For detailed instructions on using Speedscale's standard replay capabilities, see the Getting Started Guide.
Alternative: Custom Load Driver Approach
If you need more control over message replay patterns or want to extract and replay messages independently, you can use a custom load driver approach similar to Kafka and RabbitMQ:
Extract the data
Grab your snapshot id and run this command to extract message payloads from Pub/Sub traffic:
speedctl extract data <snapshot-id> --path .http.request.body --path .ts
This will generate a CSV with message data, timestamps, and corresponding RRPair UUIDs.
Create your producer
Create a custom load producer using the Google Cloud Pub/Sub client library for your preferred language. The steps are:
- Read the CSV from the previous step
- Create a Google Pub/Sub publisher client
- Iterate over the CSV
- For each row, extract the message body and optionally the timestamp
- If timing mode is enabled, wait between messages to match the original recording timing
- Publish the message to the Pub/Sub topic
- Close the client when complete
An example script in Go is provided below:
package main
import (
"context"
"encoding/csv"
"flag"
"fmt"
"io"
"os"
"time"
"cloud.google.com/go/pubsub"
)
var (
respectTiming = flag.Bool("respect-timing", false, "Respect original message timing from recording")
csvFile = flag.String("csv", "your_file.csv", "Path to CSV file")
topicID = flag.String("topic", "demo-topic", "Pub/Sub topic ID")
projectID = flag.String("project", "your-project-id", "GCP project ID")
)
func main() {
flag.Parse()
if err := do(); err != nil {
panic(err)
}
}
func do() error {
// Open CSV file
file, err := os.Open(*csvFile)
if err != nil {
return fmt.Errorf("failed to open CSV file: %w", err)
}
defer file.Close()
// Create CSV reader
reader := csv.NewReader(file)
// Skip header row
if _, err := reader.Read(); err != nil {
return fmt.Errorf("failed to read CSV header: %w", err)
}
// Create Pub/Sub client
ctx := context.Background()
client, err := pubsub.NewClient(ctx, *projectID)
if err != nil {
return fmt.Errorf("failed to create Pub/Sub client: %w", err)
}
defer client.Close()
topic := client.Topic(*topicID)
var lastTimestamp time.Time
startTime := time.Now()
// Iterate over CSV rows
for {
row, err := reader.Read()
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("failed to read CSV row: %w", err)
}
// Extract message body from first column
messageBody := []byte(row[0])
// Handle timing if enabled
if *respectTiming && len(row) > 1 {
// Parse timestamp from second column
timestamp, err := time.Parse(time.RFC3339Nano, row[1])
if err != nil {
return fmt.Errorf("failed to parse timestamp %s: %w", row[1], err)
}
// Calculate delay relative to previous message
if !lastTimestamp.IsZero() {
delay := timestamp.Sub(lastTimestamp)
if delay > 0 {
time.Sleep(delay)
}
} else {
// First message - record start time
startTime = time.Now()
}
lastTimestamp = timestamp
}
// Publish message to Pub/Sub
result := topic.Publish(ctx, &pubsub.Message{
Data: messageBody,
})
// Wait for publish to complete
_, err = result.Get(ctx)
if err != nil {
return fmt.Errorf("failed to publish message to Pub/Sub: %w", err)
}
}
if *respectTiming {
elapsed := time.Since(startTime)
fmt.Printf("Replay completed in %s with original timing\n", elapsed)
} else {
fmt.Println("Replay completed at maximum speed")
}
return nil
}
Usage Examples
Send messages as fast as possible (default):
go run main.go --csv your_file.csv --topic demo-topic --project your-project-id
Respect original message timing from the recording:
go run main.go --csv your_file.csv --topic demo-topic --project your-project-id --respect-timing
Make sure to update the project ID, topic name, and ensure you have proper GCP authentication configured. Use the --respect-timing flag to preserve the original message timing patterns from your production traffic, or omit it to send messages as fast as possible for maximum throughput testing.