Golang中如何控制纯计算任务的大小或执行时间
Golang中如何控制纯计算任务的大小或执行时间
Go 垃圾回收器因抢占式调度失败而挂起
使用命令 GODEBUG=gctrace=2 GOGC=5 ./main 编译并运行以下代码。
package main
import (
"fmt"
"time"
)
const (
Num = 1 << 20
)
func main() {
go func() {
for {
select {
case <-time.After(time.Second):
fmt.Printf("hello world\n")
}
}
}()
vs := make([]int64, Num)
for i := 0; i < 100; i++ {
for i := 0; i < len(vs); i++ {
vs = append(vs[:i], vs[i+1:]...)
i--
}
vs = vs[:Num]
}
}
对于一个纯粹的计算循环,在程序设计时如何设置一个更合理的值?8192 是一个合理的值吗?
更多关于Golang中如何控制纯计算任务的大小或执行时间的实战教程也可以访问 https://www.itying.com/category-94-b0.html
谢谢
我这边也遇到了同样的问题。
我不太明白你在问什么。你是在问如何设置执行超时吗?你如何定义“更合理”?
我的主要问题在于,我不知道如何更好地编写此类代码,以便垃圾回收器能够良好工作。我不希望在循环中添加任何内容来让垃圾回收器终止这个循环,因为那样会导致性能下降。我想要的是让这个循环尽可能小(但过小又会导致性能下降),比如8192,但我不确定这个值多少才算合适。
我认为,与其在代码中引入各种变通方法来规避垃圾回收机制,不如尝试去控制它。SetGCPercent 和 SetMaxHeap 这两个函数可能会有所帮助。如果这些方法不奏效,可以尝试在 golang nuts 邮件列表上提问?Ian Lance Taylor 和其他熟悉垃圾回收实现的人会在那里回答问题。
我认为确实如此,对于一个纯粹的计算任务,需要确定一个更合理的执行周期,否则如果任务阻塞了垃圾回收(GC),将导致内存溢出(OOM)。我目前遇到的问题实际上就是类似这样的任务导致了OOM。因为会发生以下情况:
- 垃圾回收启动
- 垃圾回收等待循环结束
- 服务接收到新请求并处理该请求(此过程会分配内存)
- 内存溢出
我不确定除了在其中调用 runtime.Gosched() 之外,你还有其他选择。你的示例实际上并没有产生任何垃圾,因为它预先分配了切片,并且只将其重新切片为更小的尺寸。我添加了另一个 goroutine,它不断向切片追加内容*,并因内存不足错误而崩溃。如果我添加一个对 runtime.Gosched() 的调用,那么其他 goroutine,包括垃圾收集器,就能够运行。
据我理解,产生大量垃圾的 goroutine 应该停止并花时间清理其中的一些垃圾。我不确定为什么这没有发生,但我还没有机会深入研究。
skillian:
我不确定除了在其中调用
runtime.Gosched()之外,你还有什么其他选择。你的示例实际上并没有产生任何垃圾,因为它预先分配了切片,并且只将其重新切片为更小的尺寸。我添加了另一个 goroutine,它不断向切片追加内容*,然后因内存不足错误而崩溃。如果我添加一个对runtime.Gosched()的调用,那么包括垃圾收集器在内的其他 goroutine 就能够运行了。据我理解,产生大量垃圾的 goroutine 应该停下来,专门花时间清理其中的一些垃圾。我不确定为什么这没有发生,但我还没有机会去深入研究它。
发生这种情况的原因是这样的,在我的项目(一个数据库项目)中,一个事务执行了非常大量的插入操作(然后这些插入数据中的一部分会被刷到 s3,接着这些插入中的一部分会变成 nil),然后在提交时写了一个循环来收缩事务的大小(移除这些 nil),然后这个循环导致了一些 gc 问题。我目前的修复方法是直接移除这个循环。
好的,有了这些信息,我建议你修改移除 nil 值的算法。像这样使用 append 非常适合从切片中移除单个连续范围的元素,但如果你需要扫描切片并移除多个不连续范围的元素,那么反复移动切片元素所花费的时间会随着元素数量渐近增长。如果你改为采用更像下面这样的方法:
// removeZeros 从切片中移除等于其零值的元素(例如,对于 int 移除 0,对于 interface{} 移除 nil 等)。
func removeZeros[T comparable](vs []T) []T {
var zero T
delta := 0
for i := 0; i < len(vs); i++ {
if vs[i] == zero {
delta--
continue
}
vs[i+delta] = vs[i]
}
return vs[:len(vs)+delta]
}
我推测你实际代码中的循环看起来更像这样:
for i := 0; i < len(inserts); i++ {
if inserts[i] == nil {
inserts = append(inserts[:i], inserts[i+1:]...)
i--
}
}
你可以将整个循环改为:
inserts = removeZeros(inserts)
在Go语言中,控制纯计算任务的大小和执行时间是一个重要问题,特别是在避免垃圾回收器因抢占式调度失败而挂起的情况下。以下是几种解决方案:
1. 使用Gosched主动让出CPU
package main
import (
"runtime"
"time"
)
func computeTask() {
const batchSize = 1000 // 每批处理1000个元素
data := make([]int, 1000000)
for i := 0; i < len(data); i++ {
// 执行计算
data[i] = i * i
// 每处理一批数据让出CPU
if i%batchSize == 0 {
runtime.Gosched()
}
}
}
2. 基于时间片的执行控制
package main
import (
"time"
)
func computeWithTimeLimit() {
data := make([]int, 1000000)
start := time.Now()
const timeSlice = 10 * time.Millisecond // 10ms时间片
for i := 0; i < len(data); i++ {
data[i] = complexCalculation(i)
// 检查是否超过时间片
if time.Since(start) > timeSlice {
time.Sleep(1 * time.Millisecond) // 短暂休眠
start = time.Now()
}
}
}
func complexCalculation(n int) int {
// 模拟复杂计算
return n * n
}
3. 使用context控制执行时间
package main
import (
"context"
"time"
)
func computeWithContext(ctx context.Context) error {
data := make([]int, 1000000)
for i := 0; i < len(data); i++ {
select {
case <-ctx.Done():
return ctx.Err() // 上下文被取消
default:
data[i] = i * i
// 每8192次检查一次(8192是一个合理的批次大小)
if i%8192 == 0 {
runtime.Gosched()
}
}
}
return nil
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
go computeWithContext(ctx)
}
4. 工作窃取模式的分批处理
package main
import (
"runtime"
"sync/atomic"
)
func batchCompute() {
const batchSize = 8192 // 8192是一个合理的批次大小
data := make([]int, 1<<20) // 1M元素
var processed int64
// 使用多个goroutine并行处理
for g := 0; g < runtime.GOMAXPROCS(0); g++ {
go func(start int) {
for i := start; i < len(data); i += batchSize {
end := i + batchSize
if end > len(data) {
end = len(data)
}
// 处理一个批次
for j := i; j < end; j++ {
data[j] = j * j
}
atomic.AddInt64(&processed, int64(end-i))
runtime.Gosched() // 处理完一批后让出CPU
}
}(g * batchSize)
}
}
5. 针对原始问题的修复方案
package main
import (
"fmt"
"runtime"
"time"
)
const (
Num = 1 << 20
BatchSize = 8192 // 使用8192作为批次大小
)
func main() {
go func() {
for {
select {
case <-time.After(time.Second):
fmt.Printf("hello world\n")
}
}
}()
vs := make([]int64, Num)
for i := 0; i < 100; i++ {
// 分批处理,避免长时间占用CPU
for start := 0; start < len(vs); start += BatchSize {
end := start + BatchSize
if end > len(vs) {
end = len(vs)
}
// 处理当前批次
for j := start; j < end; j++ {
vs = append(vs[:j], vs[j+1:]...)
j--
}
// 每处理完一批让出CPU
if start%(BatchSize*4) == 0 {
runtime.Gosched()
}
}
vs = vs[:Num]
}
}
关于8192作为批次大小的合理性:
8192(8K)是一个合理的批次大小,原因如下:
- 缓存友好:现代CPU的L1缓存通常为32-64KB,8K的批次可以很好地利用缓存
- 调度平衡:足够大以保持计算效率,足够小以避免长时间阻塞调度器
- 经验值:在实践中,1024-16384的范围通常表现良好
关键原则是:在长时间运行的纯计算循环中,定期插入调度点(runtime.Gosched())或使用分批处理,避免单次计算占用CPU时间过长。这样可以确保垃圾回收器和其他goroutine能够及时获得执行机会。



