golang高性能并发FIFO队列插件库goconcurrentqueue的使用

golang高性能并发FIFO队列插件库goconcurrentqueue的使用

goconcurrentqueue是一个提供并发安全队列的Go语言库,它实现了多种并发安全的队列结构。

安装

执行以下命令安装:

go get github.com/enriquebris/goconcurrentqueue

队列类型

FIFO队列

FIFO是一个并发安全、自动扩容的队列。

优点:

  • 可以无限添加元素
  • 提供额外方法获取和移除元素

缺点:

  • 性能略低于FixedFIFO

FixedFIFO队列

FixedFIFO是固定容量的并发安全队列。

优点:

  • 在并发场景下性能至少是FIFO的2倍

缺点:

  • 有固定容量限制

使用示例

FIFO队列基本使用

package main

import (
	"fmt"

	"github.com/enriquebris/goconcurrentqueue"
)

type AnyStruct struct {
	Field1 string
	Field2 int
}

func main() {
	queue := goconcurrentqueue.NewFIFO()

	// 入队三种不同类型元素
	queue.Enqueue("any string value")
	queue.Enqueue(5)
	queue.Enqueue(AnyStruct{Field1: "hello world", Field2: 15})

	// 输出队列长度: 3
	fmt.Printf("queue's length: %v\n", queue.GetLen())

	// 出队
	item, err := queue.Dequeue()
	if err != nil {
		fmt.Println(err)
		return
	}

	// 输出出队元素: "any string value"
	fmt.Printf("dequeued item: %v\n", item)

	// 输出队列长度: 2
	fmt.Printf("queue's length: %v\n", queue.GetLen())
}

等待元素入队

package main

import (
	"fmt"
	"time"

	"github.com/enriquebris/goconcurrentqueue"
)

func main() {
	var (
		fifo = goconcurrentqueue.NewFIFO()
		done = make(chan struct{})
	)

	go func() {
		fmt.Println("1 - Waiting for next enqueued element")
		value, _ := fifo.DequeueOrWaitForNextElement()
		fmt.Printf("2 - Dequeued element: %v\n", value)

		done <- struct{}{}
	}()

	fmt.Println("3 - Go to sleep for 3 seconds")
	time.Sleep(3 * time.Second)

	fmt.Println("4 - Enqueue element")
	fifo.Enqueue(100)

	<-done
}

带超时的等待元素入队

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/enriquebris/goconcurrentqueue"
)

func main() {
	var (
		fifo          = goconcurrentqueue.NewFIFO()
		ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
	)
	defer cancel()

	fmt.Println("1 - Waiting for next enqueued element")
	_, err := fifo.DequeueOrWaitForNextElementContext(ctx)
    
	if err != nil {
		fmt.Printf("2 - Failed waiting for new element: %v\n", err)
		return
	}
}

依赖倒置原则使用队列

package main

import (
	"fmt"

	"github.com/enriquebris/goconcurrentqueue"
)

func main() {
	var (
		queue          goconcurrentqueue.Queue
		dummyCondition = true
	)

	// 根据条件选择队列实现
	if dummyCondition {
		queue = goconcurrentqueue.NewFIFO()
	} else {
		queue = goconcurrentqueue.NewFixedFIFO(10)
	}

	fmt.Printf("queue's length: %v\n", queue.GetLen())
	workWithQueue(queue)
	fmt.Printf("queue's length: %v\n", queue.GetLen())
}

// 使用Queue接口进行工作
func workWithQueue(queue goconcurrentqueue.Queue) error {
	// 入队一个元素
	if err := queue.Enqueue("test value"); err != nil {
		return err
	}

	return nil
}

性能对比

在2012款MacBook Pro(i7 2.3GHz, 16GB内存)上使用Go 1.12测试:

入队操作性能

入队性能对比

出队操作性能

出队性能对比

从图中可以看出FixedFIFO在并发场景下性能优势明显。


更多关于golang高性能并发FIFO队列插件库goconcurrentqueue的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang高性能并发FIFO队列插件库goconcurrentqueue的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


GoConcurrentQueue - 高性能并发FIFO队列插件库使用指南

GoConcurrentQueue 是一个专门为 Go 语言设计的高性能并发安全 FIFO 队列库,它提供了简单易用的 API 和出色的并发性能。

安装

go get github.com/enriquebris/goconcurrentqueue

基本使用

创建队列

package main

import (
	"fmt"
	"github.com/enriquebris/goconcurrentqueue"
)

func main() {
	// 创建一个新的FIFO队列
	queue := goconcurrentqueue.NewFIFO()
	
	// 添加元素到队列
	queue.Enqueue("item1")
	queue.Enqueue(2)
	queue.Enqueue(3.14)
	
	// 获取队列长度
	fmt.Println("Queue length:", queue.GetLen()) // 输出: Queue length: 3
	
	// 从队列中取出元素
	item, err := queue.Dequeue()
	if err != nil {
		fmt.Println("Dequeue error:", err)
		return
	}
	fmt.Println("Dequeued item:", item) // 输出: Dequeued item: item1
}

高级特性

并发安全操作

package main

import (
	"fmt"
	"sync"
	"time"
	"github.com/enriquebris/goconcurrentqueue"
)

func producer(queue *goconcurrentqueue.FIFO, id int, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 0; i < 5; i++ {
		item := fmt.Sprintf("producer-%d-item-%d", id, i)
		queue.Enqueue(item)
		fmt.Printf("Producer %d enqueued: %s\n", id, item)
		time.Sleep(time.Millisecond * 100)
	}
}

func consumer(queue *goconcurrentqueue.FIFO, id int, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 0; i < 5; i++ {
		item, err := queue.Dequeue()
		if err != nil {
			fmt.Printf("Consumer %d error: %v\n", id, err)
			continue
		}
		fmt.Printf("Consumer %d dequeued: %v\n", id, item)
		time.Sleep(time.Millisecond * 150)
	}
}

func main() {
	queue := goconcurrentqueue.NewFIFO()
	var wg sync.WaitGroup

	// 启动3个生产者协程
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go producer(queue, i, &wg)
	}

	// 启动2个消费者协程
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go consumer(queue, i, &wg)
	}

	wg.Wait()
	fmt.Println("Final queue length:", queue.GetLen())
}

阻塞式获取元素

package main

import (
	"fmt"
	"time"
	"github.com/enriquebris/goconcurrentqueue"
)

func main() {
	queue := goconcurrentqueue.NewFIFO()

	// 启动一个消费者协程
	go func() {
		// 阻塞等待直到有元素可用
		item, err := queue.DequeueOrWaitForNextElement()
		if err != nil {
			fmt.Println("Error waiting for element:", err)
			return
		}
		fmt.Println("Received item after waiting:", item)
	}()

	// 主协程稍后添加元素
	time.Sleep(2 * time.Second)
	queue.Enqueue("delayed item")

	// 给消费者时间处理
	time.Sleep(time.Second)
}

带超时的阻塞获取

package main

import (
	"fmt"
	"time"
	"github.com/enriquebris/goconcurrentqueue"
)

func main() {
	queue := goconcurrentqueue.NewFIFO()

	// 尝试获取元素,最多等待1秒
	item, err := queue.DequeueOrWaitForNextElementWithTimeout(time.Second)
	if err != nil {
		if err == goconcurrentqueue.ErrEmptyQueue {
			fmt.Println("Queue is empty after waiting")
		} else {
			fmt.Println("Error:", err)
		}
		return
	}
	fmt.Println("Received item:", item)
}

性能优化建议

  1. 批量操作:对于大量小项,考虑批量入队/出队以减少锁竞争
  2. 适当队列大小:根据场景选择合适队列容量
  3. 避免长时间持有锁:在队列操作中尽量减少耗时操作

与其他队列实现的比较

GoConcurrentQueue 相比标准库的 channel 和 sync.Mutex 保护的 slice 有以下优势:

  • 更丰富的 API(阻塞获取、超时等)
  • 动态容量(不像 channel 需要预先指定大小)
  • 专门优化的并发性能

总结

GoConcurrentQueue 是一个功能强大且易于使用的并发安全队列实现,特别适合需要高性能 FIFO 队列的 Go 并发程序。它提供了丰富的特性集,包括阻塞获取、超时控制等,同时保持了优异的性能表现。

回到顶部