Golang中测量吞吐量和延迟分布的代码实现
Golang中测量吞吐量和延迟分布的代码实现 我是Go语言的初学者,而且Go是我的第一门编程语言。我正在通过Udemy学习课程。
我正尝试为项目中的gRPC服务设置负载测试。但我无法确定实现以下功能所需的代码/语法:
- 测量以请求/秒为单位的吞吐量
- 使用可配置的区间大小(例如1毫秒)测量延迟分布
- 从分布直方图中推导出90%和99%百分位数,这只需要精确到区间大小,但应采用保守估计(向上取整)
以下是我的部分代码:
start := time.Now() // Capturing the start time
r, err := c.GetItem(ctx, &pb.GetItemRequest{Id: "NAME"}) // Making a gRPC request
elapsed := time.Since(start) // Capturing the latency
if err != nil {
return fmt.Errorf("Couldn't fetch a response :%r", err)
} else {
log.Printf("Response: %s", r)
log.Printf("GetItem took %s", elapsed)
}
}
更多关于Golang中测量吞吐量和延迟分布的代码实现的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于Golang中测量吞吐量和延迟分布的代码实现的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
以下是实现gRPC服务负载测试中测量吞吐量和延迟分布的完整代码示例:
package main
import (
"context"
"fmt"
"log"
"math"
"sort"
"sync"
"time"
pb "your/proto/package" // 替换为实际的proto包路径
)
type Metrics struct {
mu sync.RWMutex
totalRequests int64
totalDuration time.Duration
latencies []time.Duration
bucketSize time.Duration
latencyBuckets map[int64]int64 // 桶索引 -> 计数
}
func NewMetrics(bucketSize time.Duration) *Metrics {
return &Metrics{
bucketSize: bucketSize,
latencyBuckets: make(map[int64]int64),
latencies: make([]time.Duration, 0),
}
}
func (m *Metrics) RecordLatency(latency time.Duration) {
m.mu.Lock()
defer m.mu.Unlock()
m.totalRequests++
m.totalDuration += latency
m.latencies = append(m.latencies, latency)
// 将延迟分配到对应的桶中
bucketIndex := int64(latency / m.bucketSize)
m.latencyBuckets[bucketIndex]++
}
func (m *Metrics) Throughput() float64 {
m.mu.RLock()
defer m.mu.RUnlock()
if m.totalDuration == 0 {
return 0
}
return float64(m.totalRequests) / m.totalDuration.Seconds()
}
func (m *Metrics) Percentile(percentile float64) time.Duration {
m.mu.RLock()
defer m.mu.RUnlock()
if len(m.latencies) == 0 {
return 0
}
// 对延迟进行排序
latencies := make([]time.Duration, len(m.latencies))
copy(latencies, m.latencies)
sort.Slice(latencies, func(i, j int) bool {
return latencies[i] < latencies[j]
})
// 计算百分位位置(保守估计:向上取整)
index := int(math.Ceil(float64(len(latencies)) * percentile / 100)) - 1
if index < 0 {
index = 0
}
if index >= len(latencies) {
index = len(latencies) - 1
}
return latencies[index]
}
func (m *Metrics) PercentileFromBuckets(percentile float64) time.Duration {
m.mu.RLock()
defer m.mu.RUnlock()
if len(m.latencyBuckets) == 0 {
return 0
}
// 从桶分布计算百分位数
total := int64(0)
for _, count := range m.latencyBuckets {
total += count
}
targetCount := int64(math.Ceil(float64(total) * percentile / 100))
// 获取排序后的桶索引
bucketIndices := make([]int64, 0, len(m.latencyBuckets))
for idx := range m.latencyBuckets {
bucketIndices = append(bucketIndices, idx)
}
sort.Slice(bucketIndices, func(i, j int) bool {
return bucketIndices[i] < bucketIndices[j]
})
// 找到目标百分位对应的桶
currentCount := int64(0)
for _, idx := range bucketIndices {
currentCount += m.latencyBuckets[idx]
if currentCount >= targetCount {
// 保守估计:使用桶的上边界
return time.Duration(idx+1) * m.bucketSize
}
}
// 如果所有桶都遍历完,返回最大桶的上边界
maxIdx := bucketIndices[len(bucketIndices)-1]
return time.Duration(maxIdx+1) * m.bucketSize
}
func (m *Metrics) PrintMetrics() {
m.mu.RLock()
defer m.mu.RUnlock()
fmt.Printf("总请求数: %d\n", m.totalRequests)
fmt.Printf("吞吐量: %.2f 请求/秒\n", m.Throughput())
fmt.Printf("P90延迟 (精确): %v\n", m.Percentile(90))
fmt.Printf("P99延迟 (精确): %v\n", m.Percentile(99))
fmt.Printf("P90延迟 (桶估计): %v\n", m.PercentileFromBuckets(90))
fmt.Printf("P99延迟 (桶估计): %v\n", m.PercentileFromBuckets(99))
// 打印延迟分布直方图
fmt.Println("\n延迟分布直方图:")
bucketIndices := make([]int64, 0, len(m.latencyBuckets))
for idx := range m.latencyBuckets {
bucketIndices = append(bucketIndices, idx)
}
sort.Slice(bucketIndices, func(i, j int) bool {
return bucketIndices[i] < bucketIndices[j]
})
for _, idx := range bucketIndices {
lowerBound := time.Duration(idx) * m.bucketSize
upperBound := time.Duration(idx+1) * m.bucketSize
count := m.latencyBuckets[idx]
percentage := float64(count) / float64(m.totalRequests) * 100
fmt.Printf("[%v - %v): %d 请求 (%.1f%%)\n",
lowerBound, upperBound, count, percentage)
}
}
// 负载测试函数
func RunLoadTest(ctx context.Context, c pb.YourServiceClient, numRequests int, concurrency int) {
metrics := NewMetrics(1 * time.Millisecond) // 1毫秒桶大小
var wg sync.WaitGroup
requestsPerWorker := numRequests / concurrency
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for j := 0; j < requestsPerWorker; j++ {
start := time.Now()
r, err := c.GetItem(ctx, &pb.GetItemRequest{Id: "NAME"})
elapsed := time.Since(start)
if err != nil {
log.Printf("Worker %d 请求失败: %v", workerID, err)
} else {
log.Printf("Worker %d 响应: %s, 延迟: %v", workerID, r, elapsed)
}
metrics.RecordLatency(elapsed)
}
}(i)
}
wg.Wait()
metrics.PrintMetrics()
}
// 使用示例
func main() {
// 初始化gRPC客户端
// conn, err := grpc.Dial(...)
// c := pb.NewYourServiceClient(conn)
ctx := context.Background()
// 运行负载测试:1000个请求,10个并发
// RunLoadTest(ctx, c, 1000, 10)
}
这个实现提供了:
- 吞吐量测量:通过
Throughput()方法计算请求/秒 - 延迟分布:使用可配置的桶大小(示例中为1毫秒)记录延迟分布
- 百分位数计算:
Percentile():基于排序的精确计算PercentileFromBuckets():基于桶分布的保守估计(向上取整)
- 并发安全:使用互斥锁保护共享数据
- 详细统计输出:包括直方图分布和关键指标
使用方法:
// 创建指标收集器,桶大小为1毫秒
metrics := NewMetrics(1 * time.Millisecond)
// 记录每次请求的延迟
metrics.RecordLatency(elapsed)
// 获取吞吐量
throughput := metrics.Throughput()
// 获取百分位数
p90 := metrics.PercentileFromBuckets(90) // 基于桶的保守估计
p99 := metrics.PercentileFromBuckets(99)

