Golang新手求代码评审

Golang新手求代码评审 我编写了以下代码,旨在减少围绕 Go 协程/通道的一些样板代码,并使其在某种程度上更类似于其他语言使用 async/await 的方式。

有几个问题:

  1. 这在 Go 中是一种不好的做法吗?为什么?
  2. 如果可行,我是否应该做些不同的处理?
  3. 是否有更完整的解决方案可用(我检查过一些包,但不确定我的包搜索能力是否足够找到)
package task

import (
	"context"
	"fmt"
)

type TaskStatus int

const (
	New TaskStatus = iota
	Running
	Completed
)

var taskStatus = [3]string{"New", "Running", "Completed"}

type taskFunc[T interface{}] func(ctx context.Context) (T, error)
type voidTaskFunc func(ctx context.Context) error

type Task[T interface{}] interface {
	Status() TaskStatus
	Result() (T, error)
	Await(ctx ...context.Context) (T, error)
}

type task[T interface{}] struct {
	result T
	err    error
	c      chan struct{}
	status TaskStatus
	fn     taskFunc[T]
}

type TaskPanicError struct {
	reason interface{}
}

func (e TaskPanicError) Error() string {
	return fmt.Sprintf("Task panicked: %v", e.reason)
}

func NewTask[T interface{}](fn taskFunc[T]) *task[T] {
	t := new(task[T])
	t.c = make(chan struct{})
	t.fn = fn

	return t
}

func (t *task[T]) Status() TaskStatus {
	return t.status
}

func (t *task[T]) Result() (T, error) {
	switch t.status {
	case Completed:
		return t.result, t.err
	default:
		return t.result, fmt.Errorf("Task status is %s", taskStatus[t.status])
	}
}

func (t *task[T]) Await(ctx ...context.Context) (T, error) {
	switch t.status {
	case Completed:
		return t.result, t.err
	case Running:
		return t.result, fmt.Errorf("Task already running, can only be started once")
	}

	defer func() {
		t.status = Completed
	}()

	var _ctx context.Context
	if len(ctx) == 0 {
		_ctx = context.Background()
	} else {
		_ctx = ctx[0]
	}

	go t.start(_ctx)

	select {
	case <-_ctx.Done():
		return t.result, _ctx.Err()
	case <-t.c:
		return t.result, t.err
	}
}

func (t *task[T]) start(ctx context.Context) {
	defer func() {
		if r := recover(); r != nil {
			t.err = TaskPanicError{reason: r}
		}

		t.status = Completed
		t.c <- struct{}{}
		close(t.c)
	}()

	if ctx.Err() != nil {
		t.err = ctx.Err()
		t.c <- struct{}{}
		return
	}

	t.status = Running
	t.result, t.err = t.fn(ctx)
}

更多关于Golang新手求代码评审的实战教程也可以访问 https://www.itying.com/category-94-b0.html

3 回复

最大的问题是对变量的并发访问。 如果多个 goroutine 访问同一个变量,并且其中至少有一个在写入,那么你需要同步访问。可以通过使用原子访问器、互斥锁或通道来实现。 我认为最符合 Go 语言习惯的做法是使用通道。带有超时的 Context 实现就是一个很好的例子。

就我个人而言,我可能会大幅简化接口。创建一个任务但不启动它似乎有些多余,它完全可以保持为一个普通的函数。因此,可以将 API 更改为只有一个方法:StartNewTask(context, func),并且一个任务只有两种状态:运行中和已完成(附带一个结果,该结果可能是一个错误)。

我通常首先关注实用性,然后从那里倒推。人们需要任务/等待方法做什么?通常是为了实现类似“运行一组任务,每次最多并行执行 4 个,并在所有任务完成后将所有结果作为数组返回”这样的功能。 你可以尝试找出这些用例,然后从那里倒推,以找到满足这些需求的正确结构,而不是从一个没有明确目的的结构开始。

更多关于Golang新手求代码评审的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


就我所见,你试图创建一个通用接口来运行简单的函数。这是一个很好的实践,但在我看来,主要是为了学习算法,因为这是一个非常理想的场景,当你需要运行类似 func(ctx context.Context) (any, error) 这样的函数时。你的例子非常好。尽管如此,我想提一些建议。

  1. Await 函数中,只使用简单的 context.Context 作为参数,让用户自己传递 background 或 TODO,减少假设。

  2. 为你的自定义状态添加 stringer。这是一个非常简单且有用的工具,它可以让你减少因使用额外的字符串变量切片而可能产生的错误。

type TaskStatus int

//go:generate stringer -type=TaskStatus
const (
	New TaskStatus = iota
	Running
	Completed
	Error
)

每次你为状态添加、移除或交换 iota 的位置时,简单的生成命令将为你节省大量时间。这将允许你在 fmt 中简单地使用 t.status 和 %s。

  1. 我认为让 ResultAwait 返回相同的结果和错误有点多余。我会根据它们的目的进行拆分。让 Await 只返回与任务相关的问题。

  2. 我会在你的结构体的读写数据上添加一些锁,因为可能存在对正在更改中的不同数据的并发访问。

下面的代码也不理想。这只是我的个人看法,说明我会如何在这个特定示例中进行调整。

package task

import (
	"context"
	"fmt"
	"sync"
)

type TaskStatus int

//go:generate stringer -type=TaskStatus
const (
	New TaskStatus = iota
	Running
	Completed
	Error
)

type taskFunc[T any] func(ctx context.Context) (T, error)
type voidTaskFunc func(ctx context.Context) error

type Task[T any] interface {
	Status() TaskStatus
	Result() (T, error)
	Await(ctx ...context.Context) (T, error)
}

type task[T any] struct {
	sync.RWMutex

	result T
	err    error
	c      chan struct{}
	status TaskStatus
	fn     taskFunc[T]
}

type TaskPanicError struct {
	reason any
}

func (e TaskPanicError) Error() string {
	return fmt.Sprintf("Task panicked: %v", e.reason)
}

func NewTask[T any](fn taskFunc[T]) *task[T] {
	t := new(task[T])
	t.c = make(chan struct{})
	t.fn = fn

	return t
}

func (t *task[T]) Status() TaskStatus {
	t.RLock()
	defer t.RUnlock()

	return t.status
}

func (t *task[T]) Result() (T, error) {
	t.RLock()
	defer t.RUnlock()

	switch t.status {
	case Completed:
		return t.result, t.err
	default:
		return t.result, fmt.Errorf("Task status is %s", t.status)
	}
}

func (t *task[T]) Await(ctx context.Context) error {
	switch t.Status() {
	case Completed:
		return t.err
	case Running:
		return fmt.Errorf("Task already running, can only be started once")
	}

	t.Lock()
	defer t.Unlock()

	t.status = Running
	go t.start(ctx)

	select {
	case <-ctx.Done():
		t.status = Error
		return ctx.Err()
	case <-t.c:
		t.status = Completed
		return t.err
	}
}

func (t *task[T]) start(ctx context.Context) {
	defer func() {
		if r := recover(); r != nil {
			t.err = TaskPanicError{reason: r}
		}

		close(t.c)
	}()

	t.result, t.err = t.fn(ctx)
}

这是一个很好的尝试,展示了Go语言中异步编程的思考。以下是对你代码的专业评审:

代码分析

优点:

  1. 类型安全:使用了泛型,可以处理任意类型的返回值
  2. 上下文支持:正确集成了context,支持取消操作
  3. 错误处理:考虑了panic恢复和错误传播

主要问题:

  1. 竞态条件status字段存在数据竞争
  2. 多次调用Await:如果多次调用Await,会启动多个goroutine
  3. 内存泄漏风险:通道可能永远不会被消费

改进版本

package task

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

type TaskStatus int32

const (
	New TaskStatus = iota
	Running
	Completed
)

type Task[T any] interface {
	Status() TaskStatus
	Result() (T, error)
	Await(ctx context.Context) (T, error)
}

type task[T any] struct {
	result T
	err    error
	done   chan struct{}
	status atomic.Int32
	once   sync.Once
	fn     func(ctx context.Context) (T, error)
}

func NewTask[T any](fn func(ctx context.Context) (T, error)) *task[T] {
	return &task[T]{
		done: make(chan struct{}),
		fn:   fn,
	}
}

func (t *task[T]) Status() TaskStatus {
	return TaskStatus(t.status.Load())
}

func (t *task[T]) Result() (T, error) {
	if t.Status() != Completed {
		var zero T
		return zero, fmt.Errorf("task not completed")
	}
	return t.result, t.err
}

func (t *task[T]) Await(ctx context.Context) (T, error) {
	// 确保只执行一次
	t.once.Do(func() {
		go t.execute(ctx)
	})

	select {
	case <-ctx.Done():
		var zero T
		return zero, ctx.Err()
	case <-t.done:
		return t.result, t.err
	}
}

func (t *task[T]) execute(ctx context.Context) {
	defer func() {
		if r := recover(); r != nil {
			t.err = fmt.Errorf("task panicked: %v", r)
		}
		t.status.Store(int32(Completed))
		close(t.done)
	}()

	t.status.Store(int32(Running))
	t.result, t.err = t.fn(ctx)
}

使用示例

func main() {
	// 创建任务
	task1 := NewTask(func(ctx context.Context) (int, error) {
		time.Sleep(1 * time.Second)
		return 42, nil
	})

	task2 := NewTask(func(ctx context.Context) (string, error) {
		time.Sleep(500 * time.Millisecond)
		return "hello", nil
	})

	// 并发执行
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	var wg sync.WaitGroup
	wg.Add(2)

	go func() {
		defer wg.Done()
		result, err := task1.Await(ctx)
		if err != nil {
			fmt.Printf("Task1 error: %v\n", err)
		} else {
			fmt.Printf("Task1 result: %d\n", result)
		}
	}()

	go func() {
		defer wg.Done()
		result, err := task2.Await(ctx)
		if err != nil {
			fmt.Printf("Task2 error: %v\n", err)
		} else {
			fmt.Printf("Task2 result: %s\n", result)
		}
	}()

	wg.Wait()
}

现有解决方案

确实有更成熟的解决方案:

  1. golang.org/x/sync/errgroup - 官方并发原语
var g errgroup.Group
g.Go(func() error { return doWork1() })
g.Go(func() error { return doWork2() })
if err := g.Wait(); err != nil {
    // 处理错误
}
  1. github.com/sourcegraph/conc - 更高级的并发工具
pool := conc.NewWaitGroup()
pool.Go(func() { doWork1() })
pool.Go(func() { doWork2() })
pool.Wait()
  1. Future/Promise模式实现
// 例如:github.com/chebyrash/promise
p := promise.New(func(resolve func(int), reject func(error)) {
    // 异步操作
    resolve(42)
})

result, err := p.Await()

你的实现思路是正确的,但Go社区更倾向于使用更简单、更明确的并发模式。对于生产环境,建议使用经过充分测试的现有库。

回到顶部