Golang中测量吞吐量和延迟分布的代码实现

Golang中测量吞吐量和延迟分布的代码实现 我是Go语言的初学者,而且Go是我的第一门编程语言。我正在通过Udemy学习课程。

我正尝试为项目中的gRPC服务设置负载测试。但我无法确定实现以下功能所需的代码/语法:

  • 测量以请求/秒为单位的吞吐量
  • 使用可配置的区间大小(例如1毫秒)测量延迟分布
  • 从分布直方图中推导出90%和99%百分位数,这只需要精确到区间大小,但应采用保守估计(向上取整)

以下是我的部分代码:

		start := time.Now() // Capturing the start time
		r, err := c.GetItem(ctx, &pb.GetItemRequest{Id: "NAME"}) // Making a gRPC request
		elapsed := time.Since(start) // Capturing the latency
	if err != nil {
			return fmt.Errorf("Couldn't fetch a response :%r", err)
		} else {
			log.Printf("Response: %s", r)
			log.Printf("GetItem took %s", elapsed)
		}
	}

更多关于Golang中测量吞吐量和延迟分布的代码实现的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于Golang中测量吞吐量和延迟分布的代码实现的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


以下是实现gRPC服务负载测试中测量吞吐量和延迟分布的完整代码示例:

package main

import (
	"context"
	"fmt"
	"log"
	"math"
	"sort"
	"sync"
	"time"

	pb "your/proto/package" // 替换为实际的proto包路径
)

type Metrics struct {
	mu              sync.RWMutex
	totalRequests   int64
	totalDuration   time.Duration
	latencies       []time.Duration
	bucketSize      time.Duration
	latencyBuckets  map[int64]int64 // 桶索引 -> 计数
}

func NewMetrics(bucketSize time.Duration) *Metrics {
	return &Metrics{
		bucketSize:     bucketSize,
		latencyBuckets: make(map[int64]int64),
		latencies:      make([]time.Duration, 0),
	}
}

func (m *Metrics) RecordLatency(latency time.Duration) {
	m.mu.Lock()
	defer m.mu.Unlock()
	
	m.totalRequests++
	m.totalDuration += latency
	m.latencies = append(m.latencies, latency)
	
	// 将延迟分配到对应的桶中
	bucketIndex := int64(latency / m.bucketSize)
	m.latencyBuckets[bucketIndex]++
}

func (m *Metrics) Throughput() float64 {
	m.mu.RLock()
	defer m.mu.RUnlock()
	
	if m.totalDuration == 0 {
		return 0
	}
	return float64(m.totalRequests) / m.totalDuration.Seconds()
}

func (m *Metrics) Percentile(percentile float64) time.Duration {
	m.mu.RLock()
	defer m.mu.RUnlock()
	
	if len(m.latencies) == 0 {
		return 0
	}
	
	// 对延迟进行排序
	latencies := make([]time.Duration, len(m.latencies))
	copy(latencies, m.latencies)
	sort.Slice(latencies, func(i, j int) bool {
		return latencies[i] < latencies[j]
	})
	
	// 计算百分位位置(保守估计:向上取整)
	index := int(math.Ceil(float64(len(latencies)) * percentile / 100)) - 1
	if index < 0 {
		index = 0
	}
	if index >= len(latencies) {
		index = len(latencies) - 1
	}
	
	return latencies[index]
}

func (m *Metrics) PercentileFromBuckets(percentile float64) time.Duration {
	m.mu.RLock()
	defer m.mu.RUnlock()
	
	if len(m.latencyBuckets) == 0 {
		return 0
	}
	
	// 从桶分布计算百分位数
	total := int64(0)
	for _, count := range m.latencyBuckets {
		total += count
	}
	
	targetCount := int64(math.Ceil(float64(total) * percentile / 100))
	
	// 获取排序后的桶索引
	bucketIndices := make([]int64, 0, len(m.latencyBuckets))
	for idx := range m.latencyBuckets {
		bucketIndices = append(bucketIndices, idx)
	}
	sort.Slice(bucketIndices, func(i, j int) bool {
		return bucketIndices[i] < bucketIndices[j]
	})
	
	// 找到目标百分位对应的桶
	currentCount := int64(0)
	for _, idx := range bucketIndices {
		currentCount += m.latencyBuckets[idx]
		if currentCount >= targetCount {
			// 保守估计:使用桶的上边界
			return time.Duration(idx+1) * m.bucketSize
		}
	}
	
	// 如果所有桶都遍历完,返回最大桶的上边界
	maxIdx := bucketIndices[len(bucketIndices)-1]
	return time.Duration(maxIdx+1) * m.bucketSize
}

func (m *Metrics) PrintMetrics() {
	m.mu.RLock()
	defer m.mu.RUnlock()
	
	fmt.Printf("总请求数: %d\n", m.totalRequests)
	fmt.Printf("吞吐量: %.2f 请求/秒\n", m.Throughput())
	fmt.Printf("P90延迟 (精确): %v\n", m.Percentile(90))
	fmt.Printf("P99延迟 (精确): %v\n", m.Percentile(99))
	fmt.Printf("P90延迟 (桶估计): %v\n", m.PercentileFromBuckets(90))
	fmt.Printf("P99延迟 (桶估计): %v\n", m.PercentileFromBuckets(99))
	
	// 打印延迟分布直方图
	fmt.Println("\n延迟分布直方图:")
	bucketIndices := make([]int64, 0, len(m.latencyBuckets))
	for idx := range m.latencyBuckets {
		bucketIndices = append(bucketIndices, idx)
	}
	sort.Slice(bucketIndices, func(i, j int) bool {
		return bucketIndices[i] < bucketIndices[j]
	})
	
	for _, idx := range bucketIndices {
		lowerBound := time.Duration(idx) * m.bucketSize
		upperBound := time.Duration(idx+1) * m.bucketSize
		count := m.latencyBuckets[idx]
		percentage := float64(count) / float64(m.totalRequests) * 100
		fmt.Printf("[%v - %v): %d 请求 (%.1f%%)\n", 
			lowerBound, upperBound, count, percentage)
	}
}

// 负载测试函数
func RunLoadTest(ctx context.Context, c pb.YourServiceClient, numRequests int, concurrency int) {
	metrics := NewMetrics(1 * time.Millisecond) // 1毫秒桶大小
	var wg sync.WaitGroup
	requestsPerWorker := numRequests / concurrency
	
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			
			for j := 0; j < requestsPerWorker; j++ {
				start := time.Now()
				r, err := c.GetItem(ctx, &pb.GetItemRequest{Id: "NAME"})
				elapsed := time.Since(start)
				
				if err != nil {
					log.Printf("Worker %d 请求失败: %v", workerID, err)
				} else {
					log.Printf("Worker %d 响应: %s, 延迟: %v", workerID, r, elapsed)
				}
				
				metrics.RecordLatency(elapsed)
			}
		}(i)
	}
	
	wg.Wait()
	metrics.PrintMetrics()
}

// 使用示例
func main() {
	// 初始化gRPC客户端
	// conn, err := grpc.Dial(...)
	// c := pb.NewYourServiceClient(conn)
	
	ctx := context.Background()
	
	// 运行负载测试:1000个请求,10个并发
	// RunLoadTest(ctx, c, 1000, 10)
}

这个实现提供了:

  1. 吞吐量测量:通过Throughput()方法计算请求/秒
  2. 延迟分布:使用可配置的桶大小(示例中为1毫秒)记录延迟分布
  3. 百分位数计算
    • Percentile():基于排序的精确计算
    • PercentileFromBuckets():基于桶分布的保守估计(向上取整)
  4. 并发安全:使用互斥锁保护共享数据
  5. 详细统计输出:包括直方图分布和关键指标

使用方法:

// 创建指标收集器,桶大小为1毫秒
metrics := NewMetrics(1 * time.Millisecond)

// 记录每次请求的延迟
metrics.RecordLatency(elapsed)

// 获取吞吐量
throughput := metrics.Throughput()

// 获取百分位数
p90 := metrics.PercentileFromBuckets(90) // 基于桶的保守估计
p99 := metrics.PercentileFromBuckets(99)
回到顶部