在Go语言中处理流式数据与错误传递是一个常见的API设计挑战。基于你的需求,我建议使用包装结构体的方案,这是最符合Go语言习惯且易于维护的方式。
以下是具体的实现示例:
type StreamMsg[T any] struct {
Val T
Err error
}
type StreamAPI interface {
// Results 返回一个只读通道,用于接收流式结果
Results() <-chan StreamMsg[string]
}
type myAPI struct {
results chan StreamMsg[string]
}
func NewMyAPI() *myAPI {
return &myAPI{
results: make(chan StreamMsg[string], 10), // 带缓冲的通道
}
}
func (api *myAPI) Results() <-chan StreamMsg[string] {
return api.results
}
func (api *myAPI) StartProcessing() {
go func() {
defer close(api.results)
// 模拟数据处理,可能在任何点失败
data := []string{"result1", "result2", "result3", "result4"}
for i, item := range data {
// 模拟在第二个项目后出现错误
if i == 2 {
api.results <- StreamMsg[string]{
Err: fmt.Errorf("processing failed at item %d", i),
}
// 可以选择继续发送剩余数据或终止
continue
}
api.results <- StreamMsg[string]{
Val: item,
}
}
}()
}
// 消费者使用示例
func main() {
api := NewMyAPI()
api.StartProcessing()
for msg := range api.Results() {
if msg.Err != nil {
fmt.Printf("Error encountered: %v\n", msg.Err)
// 消费者可以在这里决定是否重试或继续
continue
}
fmt.Printf("Received value: %s\n", msg.Val)
}
}
为了处理重试场景,可以扩展这个模式:
type RetryableStreamMsg[T any] struct {
Val T
Err error
Retry func() error // 可选的重试函数
}
func (api *myAPI) ResultsWithRetry() <-chan RetryableStreamMsg[string] {
retryCh := make(chan RetryableStreamMsg[string], 10)
go func() {
defer close(retryCh)
for msg := range api.Results() {
retryMsg := RetryableStreamMsg[string]{
Val: msg.Val,
Err: msg.Err,
}
if msg.Err != nil {
retryMsg.Retry = func() error {
// 实现具体的重试逻辑
fmt.Println("Retrying failed operation...")
return nil
}
}
retryCh <- retryMsg
}
}()
return retryCh
}
这种包装结构体的方法提供了以下优势:
- 类型安全:通过泛型保持类型安全
- 错误处理明确:强制消费者处理错误
- 通道语义完整:保持简单的range循环
- 扩展性强:易于添加额外字段(如重试函数)
- 内存效率:通过适当的通道缓冲减少分配
相比之下,双通道方案会增加复杂性,而前置错误检查则破坏了流式处理的连续性。包装结构体在Go生态系统中是被广泛接受的模式,在标准库和流行框架中都有类似实现。