golang线程安全通用队列实现插件库queue的使用

Golang线程安全通用队列实现插件库queue的使用

queue包提供了Go中线程安全的通用实现,包括以下几种数据结构:BlockingQueuePriorityQueueCircularQueueLinked Queue

安装

要添加此包作为项目依赖项,请运行:

go get -u github.com/adrianbrad/queue

导入

在项目中导入:

import "github.com/adrianbrad/queue"

使用

Queue接口

// Queue是通用队列接口,定义了所有队列必须实现的方法
type Queue[T comparable] interface {
	// Get 检索并移除队列头部元素
	Get() (T, error)

	// Offer 将元素插入队列尾部
	Offer(T) error

	// Reset 将队列重置为初始状态
	Reset()

	// Contains 如果队列包含该元素则返回true
	Contains(T) bool

	// Peek 检索但不移除队列头部元素
	Peek() (T, error)

	// Size 返回队列中元素数量
	Size() int

	// IsEmpty 如果队列为空则返回true
	IsEmpty() bool

	// Iterator 返回一个将被元素填充的通道
	Iterator() <-chan T

	// Clear 移除队列中所有元素
	Clear() []T
}

阻塞队列(Blocking Queue)

阻塞队列是FIFO有序数据结构。实现了阻塞和非阻塞方法。

package main

import (
	"fmt"
	"github.com/adrianbrad/queue"
)

func main() {
	elems := []int{2, 3}

	blockingQueue := queue.NewBlocking(elems, queue.WithCapacity(3))

	containsTwo := blockingQueue.Contains(2)
	fmt.Println(containsTwo) // true

	size := blockingQueue.Size()
	fmt.Println(size) // 2

	empty := blockingQueue.IsEmpty()
	fmt.Println(empty) // false

	if err := blockingQueue.Offer(1); err != nil {
		// 处理错误
	}

	elem, err := blockingQueue.Get()
	if err != nil {
		// 处理错误
	}

	fmt.Println("elem: ", elem) // elem: 2
}

优先级队列(Priority Queue)

优先级队列中元素的顺序由构造时提供的比较函数决定。

package main

import (
	"fmt"
	"github.com/adrianbrad/queue"
)

func main() {
	elems := []int{2, 3, 4}

	priorityQueue := queue.NewPriority(
		elems, 
		func(elem, otherElem int) bool { return elem < otherElem },
        )

	containsTwo := priorityQueue.Contains(2)
	fmt.Println(containsTwo) // true

	size := priorityQueue.Size()
	fmt.Println(size) // 3

	empty := priorityQueue.IsEmpty()
	fmt.Println(empty) // false

	if err := priorityQueue.Offer(1); err != nil {
		// 处理错误
	}

	elem, err := priorityQueue.Get()
	if err != nil {
		// 处理错误
	}

	fmt.Printf("elem: %d\n", elem) // elem: 1
}

循环队列(Circular Queue)

循环队列是固定大小的FIFO有序数据结构。当队列满时,添加新元素会覆盖最旧的元素。

package main

import (
  "fmt"
  "github.com/adrianbrad/queue"
)

func main() {
  elems := []int{2, 3, 4}

  circularQueue := queue.NewCircular(elems, 3)

  containsTwo := circularQueue.Contains(2)
  fmt.Println(containsTwo) // true

  size := circularQueue.Size()
  fmt.Println(size) // 3

  empty := circularQueue.IsEmpty()
  fmt.Println(empty) // false

  if err := circularQueue.Offer(1); err != nil {
    // 处理错误
  }

  elem, err := circularQueue.Get()
  if err != nil {
    // 处理错误
  }

  fmt.Printf("elem: %d\n", elem) // elem: 1
}

链式队列(Linked Queue)

链式队列实现为单链表,提供O(1)时间复杂度的入队和出队操作。

package main

import (
  "fmt"
  "github.com/adrianbrad/queue"
)

func main() {
  elems := []int{2, 3, 4}

  circularQueue := queue.NewLinked(elems)

  containsTwo := circularQueue.Contains(2)
  fmt.Println(containsTwo) // true

  size := circularQueue.Size()
  fmt.Println(size) // 3

  empty := circularQueue.IsEmpty()
  fmt.Println(empty) // false

  if err := circularQueue.Offer(1); err != nil {
    // 处理错误
  }

  elem, err := circularQueue.Get()
  if err != nil {
    // 处理错误
  }

  fmt.Printf("elem: %d\n", elem) // elem: 2
}

基准测试

2023年10月测试结果:

BenchmarkBlockingQueue/Peek-8           84873882                13.98 ns/op            0 B/op          0 allocs/op
BenchmarkBlockingQueue/Get_Offer-8      27135865                47.00 ns/op           44 B/op          0 allocs/op
BenchmarkBlockingQueue/Offer-8          53750395                25.40 ns/op           43 B/op          0 allocs/op
BenchmarkCircularQueue/Peek-8           86001980                13.76 ns/op            0 B/op          0 allocs/op
BenchmarkCircularQueue/Get_Offer-8      32379159                36.83 ns/op            0 B/op          0 allocs/op
BenchmarkCircularQueue/Offer-8          63956366                18.77 ns/op            0 B/op          0 allocs/op
BenchmarkLinkedQueue/Peek-8             1000000000              0.4179 ns/op           0 B/op          0 allocs/op
BenchmarkLinkedQueue/Get_Offer-8        61257436                18.48 ns/op           16 B/op          1 allocs/op
BenchmarkLinkedQueue/Offer-8            38975062                30.74 ns/op           16 B/op          1 allocs/op
BenchmarkPriorityQueue/Peek-8           86633734                14.02 ns/op            0 B/op          0 allocs/op
BenchmarkPriorityQueue/Get_Offer-8      29347177                39.88 ns/op            0 B/op          0 allocs/op
BenchmarkPriorityQueue/Offer-8          40117958                31.37 ns/op           54 B/op          0 allocs/op

更多关于golang线程安全通用队列实现插件库queue的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang线程安全通用队列实现插件库queue的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang线程安全通用队列实现

在Go语言中,实现线程安全的队列有多种方式。下面我将介绍几种常见的线程安全队列实现方法,并提供相应的示例代码。

1. 使用标准库的sync包实现基础队列

package main

import (
	"sync"
)

// SafeQueue 线程安全队列
type SafeQueue[T any] struct {
	items []T
	lock  sync.Mutex
}

// NewSafeQueue 创建新的安全队列
func NewSafeQueue[T any]() *SafeQueue[T] {
	return &SafeQueue[T]{
		items: make([]T, 0),
	}
}

// Enqueue 入队
func (q *SafeQueue[T]) Enqueue(item T) {
	q.lock.Lock()
	defer q.lock.Unlock()
	q.items = append(q.items, item)
}

// Dequeue 出队
func (q *SafeQueue[T]) Dequeue() (T, bool) {
	q.lock.Lock()
	defer q.lock.Unlock()
	
	if len(q.items) == 0 {
		var zero T
		return zero, false
	}
	
	item := q.items[0]
	q.items = q.items[1:]
	return item, true
}

// Size 获取队列大小
func (q *SafeQueue[T]) Size() int {
	q.lock.Lock()
	defer q.lock.Unlock()
	return len(q.items)
}

// IsEmpty 检查队列是否为空
func (q *SafeQueue[T]) IsEmpty() bool {
	q.lock.Lock()
	defer q.lock.Unlock()
	return len(q.items) == 0
}

2. 使用channel实现队列

Go的channel本身就是线程安全的,可以用来实现队列:

package main

// ChannelQueue 基于channel的队列
type ChannelQueue[T any] struct {
	items chan T
}

// NewChannelQueue 创建新的channel队列
func NewChannelQueue[T any](capacity int) *ChannelQueue[T] {
	return &ChannelQueue[T]{
		items: make(chan T, capacity),
	}
}

// Enqueue 入队
func (q *ChannelQueue[T]) Enqueue(item T) bool {
	select {
	case q.items <- item:
		return true
	default:
		return false // 队列已满
	}
}

// Dequeue 出队
func (q *ChannelQueue[T]) Dequeue() (T, bool) {
	select {
	case item := <-q.items:
		return item, true
	default:
		var zero T
		return zero, false // 队列为空
	}
}

3. 使用第三方库

有几个优秀的第三方队列实现库:

3.1 使用github.com/eapache/queue

package main

import (
	"github.com/eapache/queue"
	"sync"
)

// ConcurrentQueue 并发队列
type ConcurrentQueue[T any] struct {
	q   *queue.Queue
	mu  sync.Mutex
}

// NewConcurrentQueue 创建新的并发队列
func NewConcurrentQueue[T any]() *ConcurrentQueue[T] {
	return &ConcurrentQueue[T]{
		q: queue.New(),
	}
}

// Enqueue 入队
func (cq *ConcurrentQueue[T]) Enqueue(item T) {
	cq.mu.Lock()
	defer cq.mu.Unlock()
	cq.q.Add(item)
}

// Dequeue 出队
func (cq *ConcurrentQueue[T]) Dequeue() (T, bool) {
	cq.mu.Lock()
	defer cq.mu.Unlock()
	
	if cq.q.Length() == 0 {
		var zero T
		return zero, false
	}
	
	return cq.q.Remove().(T), true
}

3.2 使用github.com/gammazero/deque

package main

import (
	"github.com/gammazero/deque"
	"sync"
)

// DequeWrapper 双端队列包装器
type DequeWrapper[T any] struct {
	dq deque.Deque[T]
	mu sync.Mutex
}

// NewDequeWrapper 创建新的双端队列
func NewDequeWrapper[T any]() *DequeWrapper[T] {
	return &DequeWrapper[T]{}
}

// PushBack 从尾部插入
func (d *DequeWrapper[T]) PushBack(item T) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.dq.PushBack(item)
}

// PopFront 从头部取出
func (d *DequeWrapper[T]) PopFront() (T, bool) {
	d.mu.Lock()
	defer d.mu.Unlock()
	
	if d.dq.Len() == 0 {
		var zero T
		return zero, false
	}
	return d.dq.PopFront(), true
}

4. 性能比较与选择建议

  1. 标准库实现:简单直接,适合大多数场景,但频繁的锁操作可能影响性能
  2. Channel实现:天然线程安全,适合生产者-消费者模式,但有容量限制
  3. 第三方库
    • eapache/queue:高效环形缓冲区实现
    • gammazero/deque:高性能双端队列

选择建议:

  • 简单场景:使用标准库实现
  • 生产者-消费者模式:使用channel
  • 高性能需求:选择第三方库

5. 使用示例

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	// 使用标准库实现的队列
	q := NewSafeQueue[int]()
	
	var wg sync.WaitGroup
	wg.Add(2)
	
	// 生产者
	go func() {
		defer wg.Done()
		for i := 0; i < 10; i++ {
			q.Enqueue(i)
			time.Sleep(100 * time.Millisecond)
		}
	}()
	
	// 消费者
	go func() {
		defer wg.Done()
		for i := 0; i < 10; i++ {
			if item, ok := q.Dequeue(); ok {
				fmt.Println("Dequeued:", item)
			}
			time.Sleep(150 * time.Millisecond)
		}
	}()
	
	wg.Wait()
	fmt.Println("Queue size:", q.Size())
}

以上代码提供了几种不同的线程安全队列实现方式,您可以根据具体需求选择合适的实现。

回到顶部