mirror of
https://github.com/go-gitea/gitea
synced 2025-07-22 18:28:37 +00:00
Add Unique Queue infrastructure and move TestPullRequests to this (#9856)
* Upgrade levelqueue to version 0.2.0 This adds functionality for Unique Queues * Add UniqueQueue interface and functions to create them * Add UniqueQueue implementations * Move TestPullRequests over to use UniqueQueue * Reduce code duplication * Add bytefifos * Ensure invalid types are logged * Fix close race in PersistableChannelQueue Shutdown
This commit is contained in:
61
modules/queue/bytefifo.go
Normal file
61
modules/queue/bytefifo.go
Normal file
@@ -0,0 +1,61 @@
|
||||
// Copyright 2020 The Gitea Authors. All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package queue
|
||||
|
||||
// ByteFIFO defines a FIFO that takes a byte array
|
||||
type ByteFIFO interface {
|
||||
// Len returns the length of the fifo
|
||||
Len() int64
|
||||
// PushFunc pushes data to the end of the fifo and calls the callback if it is added
|
||||
PushFunc(data []byte, fn func() error) error
|
||||
// Pop pops data from the start of the fifo
|
||||
Pop() ([]byte, error)
|
||||
// Close this fifo
|
||||
Close() error
|
||||
}
|
||||
|
||||
// UniqueByteFIFO defines a FIFO that Uniques its contents
|
||||
type UniqueByteFIFO interface {
|
||||
ByteFIFO
|
||||
// Has returns whether the fifo contains this data
|
||||
Has(data []byte) (bool, error)
|
||||
}
|
||||
|
||||
var _ (ByteFIFO) = &DummyByteFIFO{}
|
||||
|
||||
// DummyByteFIFO represents a dummy fifo
|
||||
type DummyByteFIFO struct{}
|
||||
|
||||
// PushFunc returns nil
|
||||
func (*DummyByteFIFO) PushFunc(data []byte, fn func() error) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Pop returns nil
|
||||
func (*DummyByteFIFO) Pop() ([]byte, error) {
|
||||
return []byte{}, nil
|
||||
}
|
||||
|
||||
// Close returns nil
|
||||
func (*DummyByteFIFO) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Len is always 0
|
||||
func (*DummyByteFIFO) Len() int64 {
|
||||
return 0
|
||||
}
|
||||
|
||||
var _ (UniqueByteFIFO) = &DummyUniqueByteFIFO{}
|
||||
|
||||
// DummyUniqueByteFIFO represents a dummy unique fifo
|
||||
type DummyUniqueByteFIFO struct {
|
||||
DummyByteFIFO
|
||||
}
|
||||
|
||||
// Has always returns false
|
||||
func (*DummyUniqueByteFIFO) Has([]byte) (bool, error) {
|
||||
return false, nil
|
||||
}
|
@@ -74,25 +74,35 @@ type DummyQueue struct {
|
||||
}
|
||||
|
||||
// Run does nothing
|
||||
func (b *DummyQueue) Run(_, _ func(context.Context, func())) {}
|
||||
func (*DummyQueue) Run(_, _ func(context.Context, func())) {}
|
||||
|
||||
// Push fakes a push of data to the queue
|
||||
func (b *DummyQueue) Push(Data) error {
|
||||
func (*DummyQueue) Push(Data) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// PushFunc fakes a push of data to the queue with a function. The function is never run.
|
||||
func (*DummyQueue) PushFunc(Data, func() error) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Has always returns false as this queue never does anything
|
||||
func (*DummyQueue) Has(Data) (bool, error) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Flush always returns nil
|
||||
func (b *DummyQueue) Flush(time.Duration) error {
|
||||
func (*DummyQueue) Flush(time.Duration) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// FlushWithContext always returns nil
|
||||
func (b *DummyQueue) FlushWithContext(context.Context) error {
|
||||
func (*DummyQueue) FlushWithContext(context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsEmpty asserts that the queue is empty
|
||||
func (b *DummyQueue) IsEmpty() bool {
|
||||
func (*DummyQueue) IsEmpty() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
|
227
modules/queue/queue_bytefifo.go
Normal file
227
modules/queue/queue_bytefifo.go
Normal file
@@ -0,0 +1,227 @@
|
||||
// Copyright 2020 The Gitea Authors. All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"code.gitea.io/gitea/modules/log"
|
||||
)
|
||||
|
||||
// ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue
|
||||
type ByteFIFOQueueConfiguration struct {
|
||||
WorkerPoolConfiguration
|
||||
Workers int
|
||||
Name string
|
||||
}
|
||||
|
||||
var _ (Queue) = &ByteFIFOQueue{}
|
||||
|
||||
// ByteFIFOQueue is a Queue formed from a ByteFIFO and WorkerPool
|
||||
type ByteFIFOQueue struct {
|
||||
*WorkerPool
|
||||
byteFIFO ByteFIFO
|
||||
typ Type
|
||||
closed chan struct{}
|
||||
terminated chan struct{}
|
||||
exemplar interface{}
|
||||
workers int
|
||||
name string
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
// NewByteFIFOQueue creates a new ByteFIFOQueue
|
||||
func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOQueue, error) {
|
||||
configInterface, err := toConfig(ByteFIFOQueueConfiguration{}, cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
config := configInterface.(ByteFIFOQueueConfiguration)
|
||||
|
||||
return &ByteFIFOQueue{
|
||||
WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
|
||||
byteFIFO: byteFIFO,
|
||||
typ: typ,
|
||||
closed: make(chan struct{}),
|
||||
terminated: make(chan struct{}),
|
||||
exemplar: exemplar,
|
||||
workers: config.Workers,
|
||||
name: config.Name,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Name returns the name of this queue
|
||||
func (q *ByteFIFOQueue) Name() string {
|
||||
return q.name
|
||||
}
|
||||
|
||||
// Push pushes data to the fifo
|
||||
func (q *ByteFIFOQueue) Push(data Data) error {
|
||||
return q.PushFunc(data, nil)
|
||||
}
|
||||
|
||||
// PushFunc pushes data to the fifo
|
||||
func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error {
|
||||
if !assignableTo(data, q.exemplar) {
|
||||
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
|
||||
}
|
||||
bs, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return q.byteFIFO.PushFunc(bs, fn)
|
||||
}
|
||||
|
||||
// IsEmpty checks if the queue is empty
|
||||
func (q *ByteFIFOQueue) IsEmpty() bool {
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
if !q.WorkerPool.IsEmpty() {
|
||||
return false
|
||||
}
|
||||
return q.byteFIFO.Len() == 0
|
||||
}
|
||||
|
||||
// Run runs the bytefifo queue
|
||||
func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
|
||||
atShutdown(context.Background(), q.Shutdown)
|
||||
atTerminate(context.Background(), q.Terminate)
|
||||
log.Debug("%s: %s Starting", q.typ, q.name)
|
||||
|
||||
go func() {
|
||||
_ = q.AddWorkers(q.workers, 0)
|
||||
}()
|
||||
|
||||
go q.readToChan()
|
||||
|
||||
log.Trace("%s: %s Waiting til closed", q.typ, q.name)
|
||||
<-q.closed
|
||||
log.Trace("%s: %s Waiting til done", q.typ, q.name)
|
||||
q.Wait()
|
||||
|
||||
log.Trace("%s: %s Waiting til cleaned", q.typ, q.name)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
atTerminate(ctx, cancel)
|
||||
q.CleanUp(ctx)
|
||||
cancel()
|
||||
}
|
||||
|
||||
func (q *ByteFIFOQueue) readToChan() {
|
||||
for {
|
||||
select {
|
||||
case <-q.closed:
|
||||
// tell the pool to shutdown.
|
||||
q.cancel()
|
||||
return
|
||||
default:
|
||||
q.lock.Lock()
|
||||
bs, err := q.byteFIFO.Pop()
|
||||
if err != nil {
|
||||
q.lock.Unlock()
|
||||
log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err)
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
continue
|
||||
}
|
||||
|
||||
if len(bs) == 0 {
|
||||
q.lock.Unlock()
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
continue
|
||||
}
|
||||
|
||||
data, err := unmarshalAs(bs, q.exemplar)
|
||||
if err != nil {
|
||||
log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err)
|
||||
q.lock.Unlock()
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
continue
|
||||
}
|
||||
|
||||
log.Trace("%s %s: Task found: %#v", q.typ, q.name, data)
|
||||
q.WorkerPool.Push(data)
|
||||
q.lock.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown processing from this queue
|
||||
func (q *ByteFIFOQueue) Shutdown() {
|
||||
log.Trace("%s: %s Shutting down", q.typ, q.name)
|
||||
q.lock.Lock()
|
||||
select {
|
||||
case <-q.closed:
|
||||
default:
|
||||
close(q.closed)
|
||||
}
|
||||
q.lock.Unlock()
|
||||
log.Debug("%s: %s Shutdown", q.typ, q.name)
|
||||
}
|
||||
|
||||
// Terminate this queue and close the queue
|
||||
func (q *ByteFIFOQueue) Terminate() {
|
||||
log.Trace("%s: %s Terminating", q.typ, q.name)
|
||||
q.Shutdown()
|
||||
q.lock.Lock()
|
||||
select {
|
||||
case <-q.terminated:
|
||||
q.lock.Unlock()
|
||||
return
|
||||
default:
|
||||
}
|
||||
close(q.terminated)
|
||||
q.lock.Unlock()
|
||||
if log.IsDebug() {
|
||||
log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len())
|
||||
}
|
||||
if err := q.byteFIFO.Close(); err != nil {
|
||||
log.Error("Error whilst closing internal byte fifo in %s: %s: %v", q.typ, q.name, err)
|
||||
}
|
||||
log.Debug("%s: %s Terminated", q.typ, q.name)
|
||||
}
|
||||
|
||||
var _ (UniqueQueue) = &ByteFIFOUniqueQueue{}
|
||||
|
||||
// ByteFIFOUniqueQueue represents a UniqueQueue formed from a UniqueByteFifo
|
||||
type ByteFIFOUniqueQueue struct {
|
||||
ByteFIFOQueue
|
||||
}
|
||||
|
||||
// NewByteFIFOUniqueQueue creates a new ByteFIFOUniqueQueue
|
||||
func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOUniqueQueue, error) {
|
||||
configInterface, err := toConfig(ByteFIFOQueueConfiguration{}, cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
config := configInterface.(ByteFIFOQueueConfiguration)
|
||||
|
||||
return &ByteFIFOUniqueQueue{
|
||||
ByteFIFOQueue: ByteFIFOQueue{
|
||||
WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
|
||||
byteFIFO: byteFIFO,
|
||||
typ: typ,
|
||||
closed: make(chan struct{}),
|
||||
terminated: make(chan struct{}),
|
||||
exemplar: exemplar,
|
||||
workers: config.Workers,
|
||||
name: config.Name,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Has checks if the provided data is in the queue
|
||||
func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) {
|
||||
if !assignableTo(data, q.exemplar) {
|
||||
return false, fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
|
||||
}
|
||||
bs, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return q.byteFIFO.(UniqueByteFIFO).Has(bs)
|
||||
}
|
@@ -53,31 +53,31 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro
|
||||
}
|
||||
|
||||
// Run starts to run the queue
|
||||
func (c *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
|
||||
func (q *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
|
||||
atShutdown(context.Background(), func() {
|
||||
log.Warn("ChannelQueue: %s is not shutdownable!", c.name)
|
||||
log.Warn("ChannelQueue: %s is not shutdownable!", q.name)
|
||||
})
|
||||
atTerminate(context.Background(), func() {
|
||||
log.Warn("ChannelQueue: %s is not terminatable!", c.name)
|
||||
log.Warn("ChannelQueue: %s is not terminatable!", q.name)
|
||||
})
|
||||
log.Debug("ChannelQueue: %s Starting", c.name)
|
||||
log.Debug("ChannelQueue: %s Starting", q.name)
|
||||
go func() {
|
||||
_ = c.AddWorkers(c.workers, 0)
|
||||
_ = q.AddWorkers(q.workers, 0)
|
||||
}()
|
||||
}
|
||||
|
||||
// Push will push data into the queue
|
||||
func (c *ChannelQueue) Push(data Data) error {
|
||||
if !assignableTo(data, c.exemplar) {
|
||||
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, c.exemplar, c.name)
|
||||
func (q *ChannelQueue) Push(data Data) error {
|
||||
if !assignableTo(data, q.exemplar) {
|
||||
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
|
||||
}
|
||||
c.WorkerPool.Push(data)
|
||||
q.WorkerPool.Push(data)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Name returns the name of this queue
|
||||
func (c *ChannelQueue) Name() string {
|
||||
return c.name
|
||||
func (q *ChannelQueue) Name() string {
|
||||
return q.name
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
@@ -5,15 +5,6 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"code.gitea.io/gitea/modules/log"
|
||||
|
||||
"gitea.com/lunny/levelqueue"
|
||||
)
|
||||
|
||||
@@ -22,22 +13,13 @@ const LevelQueueType Type = "level"
|
||||
|
||||
// LevelQueueConfiguration is the configuration for a LevelQueue
|
||||
type LevelQueueConfiguration struct {
|
||||
WorkerPoolConfiguration
|
||||
ByteFIFOQueueConfiguration
|
||||
DataDir string
|
||||
Workers int
|
||||
Name string
|
||||
}
|
||||
|
||||
// LevelQueue implements a disk library queue
|
||||
type LevelQueue struct {
|
||||
*WorkerPool
|
||||
queue *levelqueue.Queue
|
||||
closed chan struct{}
|
||||
terminated chan struct{}
|
||||
lock sync.Mutex
|
||||
exemplar interface{}
|
||||
workers int
|
||||
name string
|
||||
*ByteFIFOQueue
|
||||
}
|
||||
|
||||
// NewLevelQueue creates a ledis local queue
|
||||
@@ -48,149 +30,69 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
|
||||
}
|
||||
config := configInterface.(LevelQueueConfiguration)
|
||||
|
||||
internal, err := levelqueue.Open(config.DataDir)
|
||||
byteFIFO, err := NewLevelQueueByteFIFO(config.DataDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
byteFIFOQueue, err := NewByteFIFOQueue(LevelQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
queue := &LevelQueue{
|
||||
WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
|
||||
queue: internal,
|
||||
exemplar: exemplar,
|
||||
closed: make(chan struct{}),
|
||||
terminated: make(chan struct{}),
|
||||
workers: config.Workers,
|
||||
name: config.Name,
|
||||
ByteFIFOQueue: byteFIFOQueue,
|
||||
}
|
||||
queue.qid = GetManager().Add(queue, LevelQueueType, config, exemplar)
|
||||
return queue, nil
|
||||
}
|
||||
|
||||
// Run starts to run the queue
|
||||
func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
|
||||
atShutdown(context.Background(), l.Shutdown)
|
||||
atTerminate(context.Background(), l.Terminate)
|
||||
log.Debug("LevelQueue: %s Starting", l.name)
|
||||
|
||||
go func() {
|
||||
_ = l.AddWorkers(l.workers, 0)
|
||||
}()
|
||||
|
||||
go l.readToChan()
|
||||
|
||||
log.Trace("LevelQueue: %s Waiting til closed", l.name)
|
||||
<-l.closed
|
||||
|
||||
log.Trace("LevelQueue: %s Waiting til done", l.name)
|
||||
l.Wait()
|
||||
|
||||
log.Trace("LevelQueue: %s Waiting til cleaned", l.name)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
atTerminate(ctx, cancel)
|
||||
l.CleanUp(ctx)
|
||||
cancel()
|
||||
log.Trace("LevelQueue: %s Cleaned", l.name)
|
||||
var _ (ByteFIFO) = &LevelQueueByteFIFO{}
|
||||
|
||||
// LevelQueueByteFIFO represents a ByteFIFO formed from a LevelQueue
|
||||
type LevelQueueByteFIFO struct {
|
||||
internal *levelqueue.Queue
|
||||
}
|
||||
|
||||
func (l *LevelQueue) readToChan() {
|
||||
for {
|
||||
select {
|
||||
case <-l.closed:
|
||||
// tell the pool to shutdown.
|
||||
l.cancel()
|
||||
return
|
||||
default:
|
||||
atomic.AddInt64(&l.numInQueue, 1)
|
||||
bs, err := l.queue.RPop()
|
||||
if err != nil {
|
||||
if err != levelqueue.ErrNotFound {
|
||||
log.Error("LevelQueue: %s Error on RPop: %v", l.name, err)
|
||||
}
|
||||
atomic.AddInt64(&l.numInQueue, -1)
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
continue
|
||||
}
|
||||
|
||||
if len(bs) == 0 {
|
||||
atomic.AddInt64(&l.numInQueue, -1)
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
continue
|
||||
}
|
||||
|
||||
data, err := unmarshalAs(bs, l.exemplar)
|
||||
if err != nil {
|
||||
log.Error("LevelQueue: %s Failed to unmarshal with error: %v", l.name, err)
|
||||
atomic.AddInt64(&l.numInQueue, -1)
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
continue
|
||||
}
|
||||
|
||||
log.Trace("LevelQueue %s: Task found: %#v", l.name, data)
|
||||
l.WorkerPool.Push(data)
|
||||
atomic.AddInt64(&l.numInQueue, -1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Push will push the indexer data to queue
|
||||
func (l *LevelQueue) Push(data Data) error {
|
||||
if !assignableTo(data, l.exemplar) {
|
||||
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, l.exemplar, l.name)
|
||||
}
|
||||
bs, err := json.Marshal(data)
|
||||
// NewLevelQueueByteFIFO creates a ByteFIFO formed from a LevelQueue
|
||||
func NewLevelQueueByteFIFO(dataDir string) (*LevelQueueByteFIFO, error) {
|
||||
internal, err := levelqueue.Open(dataDir)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
return l.queue.LPush(bs)
|
||||
|
||||
return &LevelQueueByteFIFO{
|
||||
internal: internal,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// IsEmpty checks whether the queue is empty
|
||||
func (l *LevelQueue) IsEmpty() bool {
|
||||
if !l.WorkerPool.IsEmpty() {
|
||||
return false
|
||||
}
|
||||
return l.queue.Len() == 0
|
||||
}
|
||||
|
||||
// Shutdown this queue and stop processing
|
||||
func (l *LevelQueue) Shutdown() {
|
||||
l.lock.Lock()
|
||||
defer l.lock.Unlock()
|
||||
log.Trace("LevelQueue: %s Shutting down", l.name)
|
||||
select {
|
||||
case <-l.closed:
|
||||
default:
|
||||
close(l.closed)
|
||||
}
|
||||
log.Debug("LevelQueue: %s Shutdown", l.name)
|
||||
}
|
||||
|
||||
// Terminate this queue and close the queue
|
||||
func (l *LevelQueue) Terminate() {
|
||||
log.Trace("LevelQueue: %s Terminating", l.name)
|
||||
l.Shutdown()
|
||||
l.lock.Lock()
|
||||
select {
|
||||
case <-l.terminated:
|
||||
l.lock.Unlock()
|
||||
default:
|
||||
close(l.terminated)
|
||||
l.lock.Unlock()
|
||||
if log.IsDebug() {
|
||||
log.Debug("LevelQueue: %s Closing with %d tasks left in queue", l.name, l.queue.Len())
|
||||
// PushFunc will push data into the fifo
|
||||
func (fifo *LevelQueueByteFIFO) PushFunc(data []byte, fn func() error) error {
|
||||
if fn != nil {
|
||||
if err := fn(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" {
|
||||
log.Error("Error whilst closing internal queue in %s: %v", l.name, err)
|
||||
}
|
||||
|
||||
}
|
||||
log.Debug("LevelQueue: %s Terminated", l.name)
|
||||
return fifo.internal.LPush(data)
|
||||
}
|
||||
|
||||
// Name returns the name of this queue
|
||||
func (l *LevelQueue) Name() string {
|
||||
return l.name
|
||||
// Pop pops data from the start of the fifo
|
||||
func (fifo *LevelQueueByteFIFO) Pop() ([]byte, error) {
|
||||
data, err := fifo.internal.RPop()
|
||||
if err != nil && err != levelqueue.ErrNotFound {
|
||||
return nil, err
|
||||
}
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// Close this fifo
|
||||
func (fifo *LevelQueueByteFIFO) Close() error {
|
||||
return fifo.internal.Close()
|
||||
}
|
||||
|
||||
// Len returns the length of the fifo
|
||||
func (fifo *LevelQueueByteFIFO) Len() int64 {
|
||||
return fifo.internal.Len()
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
@@ -69,17 +69,19 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
|
||||
|
||||
// the level backend only needs temporary workers to catch up with the previously dropped work
|
||||
levelCfg := LevelQueueConfiguration{
|
||||
WorkerPoolConfiguration: WorkerPoolConfiguration{
|
||||
QueueLength: config.QueueLength,
|
||||
BatchLength: config.BatchLength,
|
||||
BlockTimeout: 1 * time.Second,
|
||||
BoostTimeout: 5 * time.Minute,
|
||||
BoostWorkers: 5,
|
||||
MaxWorkers: 6,
|
||||
ByteFIFOQueueConfiguration: ByteFIFOQueueConfiguration{
|
||||
WorkerPoolConfiguration: WorkerPoolConfiguration{
|
||||
QueueLength: config.QueueLength,
|
||||
BatchLength: config.BatchLength,
|
||||
BlockTimeout: 1 * time.Second,
|
||||
BoostTimeout: 5 * time.Minute,
|
||||
BoostWorkers: 5,
|
||||
MaxWorkers: 6,
|
||||
},
|
||||
Workers: 1,
|
||||
Name: config.Name + "-level",
|
||||
},
|
||||
DataDir: config.DataDir,
|
||||
Workers: 1,
|
||||
Name: config.Name + "-level",
|
||||
}
|
||||
|
||||
levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar)
|
||||
@@ -116,67 +118,67 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
|
||||
}
|
||||
|
||||
// Name returns the name of this queue
|
||||
func (p *PersistableChannelQueue) Name() string {
|
||||
return p.delayedStarter.name
|
||||
func (q *PersistableChannelQueue) Name() string {
|
||||
return q.delayedStarter.name
|
||||
}
|
||||
|
||||
// Push will push the indexer data to queue
|
||||
func (p *PersistableChannelQueue) Push(data Data) error {
|
||||
func (q *PersistableChannelQueue) Push(data Data) error {
|
||||
select {
|
||||
case <-p.closed:
|
||||
return p.internal.Push(data)
|
||||
case <-q.closed:
|
||||
return q.internal.Push(data)
|
||||
default:
|
||||
return p.channelQueue.Push(data)
|
||||
return q.channelQueue.Push(data)
|
||||
}
|
||||
}
|
||||
|
||||
// Run starts to run the queue
|
||||
func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
|
||||
log.Debug("PersistableChannelQueue: %s Starting", p.delayedStarter.name)
|
||||
func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
|
||||
log.Debug("PersistableChannelQueue: %s Starting", q.delayedStarter.name)
|
||||
|
||||
p.lock.Lock()
|
||||
if p.internal == nil {
|
||||
err := p.setInternal(atShutdown, p.channelQueue.handle, p.channelQueue.exemplar)
|
||||
p.lock.Unlock()
|
||||
q.lock.Lock()
|
||||
if q.internal == nil {
|
||||
err := q.setInternal(atShutdown, q.channelQueue.handle, q.channelQueue.exemplar)
|
||||
q.lock.Unlock()
|
||||
if err != nil {
|
||||
log.Fatal("Unable to create internal queue for %s Error: %v", p.Name(), err)
|
||||
log.Fatal("Unable to create internal queue for %s Error: %v", q.Name(), err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
p.lock.Unlock()
|
||||
q.lock.Unlock()
|
||||
}
|
||||
atShutdown(context.Background(), p.Shutdown)
|
||||
atTerminate(context.Background(), p.Terminate)
|
||||
atShutdown(context.Background(), q.Shutdown)
|
||||
atTerminate(context.Background(), q.Terminate)
|
||||
|
||||
// Just run the level queue - we shut it down later
|
||||
go p.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})
|
||||
go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})
|
||||
|
||||
go func() {
|
||||
_ = p.channelQueue.AddWorkers(p.channelQueue.workers, 0)
|
||||
_ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0)
|
||||
}()
|
||||
|
||||
log.Trace("PersistableChannelQueue: %s Waiting til closed", p.delayedStarter.name)
|
||||
<-p.closed
|
||||
log.Trace("PersistableChannelQueue: %s Cancelling pools", p.delayedStarter.name)
|
||||
p.channelQueue.cancel()
|
||||
p.internal.(*LevelQueue).cancel()
|
||||
log.Trace("PersistableChannelQueue: %s Waiting til done", p.delayedStarter.name)
|
||||
p.channelQueue.Wait()
|
||||
p.internal.(*LevelQueue).Wait()
|
||||
log.Trace("PersistableChannelQueue: %s Waiting til closed", q.delayedStarter.name)
|
||||
<-q.closed
|
||||
log.Trace("PersistableChannelQueue: %s Cancelling pools", q.delayedStarter.name)
|
||||
q.channelQueue.cancel()
|
||||
q.internal.(*LevelQueue).cancel()
|
||||
log.Trace("PersistableChannelQueue: %s Waiting til done", q.delayedStarter.name)
|
||||
q.channelQueue.Wait()
|
||||
q.internal.(*LevelQueue).Wait()
|
||||
// Redirect all remaining data in the chan to the internal channel
|
||||
go func() {
|
||||
log.Trace("PersistableChannelQueue: %s Redirecting remaining data", p.delayedStarter.name)
|
||||
for data := range p.channelQueue.dataChan {
|
||||
_ = p.internal.Push(data)
|
||||
atomic.AddInt64(&p.channelQueue.numInQueue, -1)
|
||||
log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
|
||||
for data := range q.channelQueue.dataChan {
|
||||
_ = q.internal.Push(data)
|
||||
atomic.AddInt64(&q.channelQueue.numInQueue, -1)
|
||||
}
|
||||
log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", p.delayedStarter.name)
|
||||
log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
|
||||
}()
|
||||
log.Trace("PersistableChannelQueue: %s Done main loop", p.delayedStarter.name)
|
||||
log.Trace("PersistableChannelQueue: %s Done main loop", q.delayedStarter.name)
|
||||
}
|
||||
|
||||
// Flush flushes the queue and blocks till the queue is empty
|
||||
func (p *PersistableChannelQueue) Flush(timeout time.Duration) error {
|
||||
func (q *PersistableChannelQueue) Flush(timeout time.Duration) error {
|
||||
var ctx context.Context
|
||||
var cancel context.CancelFunc
|
||||
if timeout > 0 {
|
||||
@@ -185,24 +187,24 @@ func (p *PersistableChannelQueue) Flush(timeout time.Duration) error {
|
||||
ctx, cancel = context.WithCancel(context.Background())
|
||||
}
|
||||
defer cancel()
|
||||
return p.FlushWithContext(ctx)
|
||||
return q.FlushWithContext(ctx)
|
||||
}
|
||||
|
||||
// FlushWithContext flushes the queue and blocks till the queue is empty
|
||||
func (p *PersistableChannelQueue) FlushWithContext(ctx context.Context) error {
|
||||
func (q *PersistableChannelQueue) FlushWithContext(ctx context.Context) error {
|
||||
errChan := make(chan error, 1)
|
||||
go func() {
|
||||
errChan <- p.channelQueue.FlushWithContext(ctx)
|
||||
errChan <- q.channelQueue.FlushWithContext(ctx)
|
||||
}()
|
||||
go func() {
|
||||
p.lock.Lock()
|
||||
if p.internal == nil {
|
||||
p.lock.Unlock()
|
||||
errChan <- fmt.Errorf("not ready to flush internal queue %s yet", p.Name())
|
||||
q.lock.Lock()
|
||||
if q.internal == nil {
|
||||
q.lock.Unlock()
|
||||
errChan <- fmt.Errorf("not ready to flush internal queue %s yet", q.Name())
|
||||
return
|
||||
}
|
||||
p.lock.Unlock()
|
||||
errChan <- p.internal.FlushWithContext(ctx)
|
||||
q.lock.Unlock()
|
||||
errChan <- q.internal.FlushWithContext(ctx)
|
||||
}()
|
||||
err1 := <-errChan
|
||||
err2 := <-errChan
|
||||
@@ -214,44 +216,44 @@ func (p *PersistableChannelQueue) FlushWithContext(ctx context.Context) error {
|
||||
}
|
||||
|
||||
// IsEmpty checks if a queue is empty
|
||||
func (p *PersistableChannelQueue) IsEmpty() bool {
|
||||
if !p.channelQueue.IsEmpty() {
|
||||
func (q *PersistableChannelQueue) IsEmpty() bool {
|
||||
if !q.channelQueue.IsEmpty() {
|
||||
return false
|
||||
}
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
if p.internal == nil {
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
if q.internal == nil {
|
||||
return false
|
||||
}
|
||||
return p.internal.IsEmpty()
|
||||
return q.internal.IsEmpty()
|
||||
}
|
||||
|
||||
// Shutdown processing this queue
|
||||
func (p *PersistableChannelQueue) Shutdown() {
|
||||
log.Trace("PersistableChannelQueue: %s Shutting down", p.delayedStarter.name)
|
||||
func (q *PersistableChannelQueue) Shutdown() {
|
||||
log.Trace("PersistableChannelQueue: %s Shutting down", q.delayedStarter.name)
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
select {
|
||||
case <-p.closed:
|
||||
case <-q.closed:
|
||||
default:
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
if p.internal != nil {
|
||||
p.internal.(*LevelQueue).Shutdown()
|
||||
if q.internal != nil {
|
||||
q.internal.(*LevelQueue).Shutdown()
|
||||
}
|
||||
close(p.closed)
|
||||
close(q.closed)
|
||||
log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)
|
||||
}
|
||||
log.Debug("PersistableChannelQueue: %s Shutdown", p.delayedStarter.name)
|
||||
}
|
||||
|
||||
// Terminate this queue and close the queue
|
||||
func (p *PersistableChannelQueue) Terminate() {
|
||||
log.Trace("PersistableChannelQueue: %s Terminating", p.delayedStarter.name)
|
||||
p.Shutdown()
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
if p.internal != nil {
|
||||
p.internal.(*LevelQueue).Terminate()
|
||||
func (q *PersistableChannelQueue) Terminate() {
|
||||
log.Trace("PersistableChannelQueue: %s Terminating", q.delayedStarter.name)
|
||||
q.Shutdown()
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
if q.internal != nil {
|
||||
q.internal.(*LevelQueue).Terminate()
|
||||
}
|
||||
log.Debug("PersistableChannelQueue: %s Terminated", p.delayedStarter.name)
|
||||
log.Debug("PersistableChannelQueue: %s Terminated", q.delayedStarter.name)
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
@@ -34,16 +34,18 @@ func TestLevelQueue(t *testing.T) {
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
queue, err := NewLevelQueue(handle, LevelQueueConfiguration{
|
||||
WorkerPoolConfiguration: WorkerPoolConfiguration{
|
||||
QueueLength: 20,
|
||||
BatchLength: 2,
|
||||
BlockTimeout: 1 * time.Second,
|
||||
BoostTimeout: 5 * time.Minute,
|
||||
BoostWorkers: 5,
|
||||
MaxWorkers: 10,
|
||||
ByteFIFOQueueConfiguration: ByteFIFOQueueConfiguration{
|
||||
WorkerPoolConfiguration: WorkerPoolConfiguration{
|
||||
QueueLength: 20,
|
||||
BatchLength: 2,
|
||||
BlockTimeout: 1 * time.Second,
|
||||
BoostTimeout: 5 * time.Minute,
|
||||
BoostWorkers: 5,
|
||||
MaxWorkers: 10,
|
||||
},
|
||||
Workers: 1,
|
||||
},
|
||||
DataDir: tmpDir,
|
||||
Workers: 1,
|
||||
}, &testData{})
|
||||
assert.NoError(t, err)
|
||||
|
||||
@@ -105,16 +107,18 @@ func TestLevelQueue(t *testing.T) {
|
||||
WrappedQueueConfiguration{
|
||||
Underlying: LevelQueueType,
|
||||
Config: LevelQueueConfiguration{
|
||||
WorkerPoolConfiguration: WorkerPoolConfiguration{
|
||||
QueueLength: 20,
|
||||
BatchLength: 2,
|
||||
BlockTimeout: 1 * time.Second,
|
||||
BoostTimeout: 5 * time.Minute,
|
||||
BoostWorkers: 5,
|
||||
MaxWorkers: 10,
|
||||
ByteFIFOQueueConfiguration: ByteFIFOQueueConfiguration{
|
||||
WorkerPoolConfiguration: WorkerPoolConfiguration{
|
||||
QueueLength: 20,
|
||||
BatchLength: 2,
|
||||
BlockTimeout: 1 * time.Second,
|
||||
BoostTimeout: 5 * time.Minute,
|
||||
BoostWorkers: 5,
|
||||
MaxWorkers: 10,
|
||||
},
|
||||
Workers: 1,
|
||||
},
|
||||
DataDir: tmpDir,
|
||||
Workers: 1,
|
||||
},
|
||||
}, &testData{})
|
||||
assert.NoError(t, err)
|
||||
|
@@ -5,14 +5,8 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"code.gitea.io/gitea/modules/log"
|
||||
|
||||
@@ -22,37 +16,15 @@ import (
|
||||
// RedisQueueType is the type for redis queue
|
||||
const RedisQueueType Type = "redis"
|
||||
|
||||
type redisClient interface {
|
||||
RPush(key string, args ...interface{}) *redis.IntCmd
|
||||
LPop(key string) *redis.StringCmd
|
||||
LLen(key string) *redis.IntCmd
|
||||
Ping() *redis.StatusCmd
|
||||
Close() error
|
||||
// RedisQueueConfiguration is the configuration for the redis queue
|
||||
type RedisQueueConfiguration struct {
|
||||
ByteFIFOQueueConfiguration
|
||||
RedisByteFIFOConfiguration
|
||||
}
|
||||
|
||||
// RedisQueue redis queue
|
||||
type RedisQueue struct {
|
||||
*WorkerPool
|
||||
client redisClient
|
||||
queueName string
|
||||
closed chan struct{}
|
||||
terminated chan struct{}
|
||||
exemplar interface{}
|
||||
workers int
|
||||
name string
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
// RedisQueueConfiguration is the configuration for the redis queue
|
||||
type RedisQueueConfiguration struct {
|
||||
WorkerPoolConfiguration
|
||||
Network string
|
||||
Addresses string
|
||||
Password string
|
||||
DBIndex int
|
||||
QueueName string
|
||||
Workers int
|
||||
Name string
|
||||
*ByteFIFOQueue
|
||||
}
|
||||
|
||||
// NewRedisQueue creates single redis or cluster redis queue
|
||||
@@ -63,163 +35,111 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
|
||||
}
|
||||
config := configInterface.(RedisQueueConfiguration)
|
||||
|
||||
dbs := strings.Split(config.Addresses, ",")
|
||||
|
||||
var queue = &RedisQueue{
|
||||
WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
|
||||
queueName: config.QueueName,
|
||||
exemplar: exemplar,
|
||||
closed: make(chan struct{}),
|
||||
terminated: make(chan struct{}),
|
||||
workers: config.Workers,
|
||||
name: config.Name,
|
||||
byteFIFO, err := NewRedisByteFIFO(config.RedisByteFIFOConfiguration)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
byteFIFOQueue, err := NewByteFIFOQueue(RedisQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
queue := &RedisQueue{
|
||||
ByteFIFOQueue: byteFIFOQueue,
|
||||
}
|
||||
|
||||
queue.qid = GetManager().Add(queue, RedisQueueType, config, exemplar)
|
||||
|
||||
return queue, nil
|
||||
}
|
||||
|
||||
type redisClient interface {
|
||||
RPush(key string, args ...interface{}) *redis.IntCmd
|
||||
LPop(key string) *redis.StringCmd
|
||||
LLen(key string) *redis.IntCmd
|
||||
SAdd(key string, members ...interface{}) *redis.IntCmd
|
||||
SRem(key string, members ...interface{}) *redis.IntCmd
|
||||
SIsMember(key string, member interface{}) *redis.BoolCmd
|
||||
Ping() *redis.StatusCmd
|
||||
Close() error
|
||||
}
|
||||
|
||||
var _ (ByteFIFO) = &RedisByteFIFO{}
|
||||
|
||||
// RedisByteFIFO represents a ByteFIFO formed from a redisClient
|
||||
type RedisByteFIFO struct {
|
||||
client redisClient
|
||||
queueName string
|
||||
}
|
||||
|
||||
// RedisByteFIFOConfiguration is the configuration for the RedisByteFIFO
|
||||
type RedisByteFIFOConfiguration struct {
|
||||
Network string
|
||||
Addresses string
|
||||
Password string
|
||||
DBIndex int
|
||||
QueueName string
|
||||
}
|
||||
|
||||
// NewRedisByteFIFO creates a ByteFIFO formed from a redisClient
|
||||
func NewRedisByteFIFO(config RedisByteFIFOConfiguration) (*RedisByteFIFO, error) {
|
||||
fifo := &RedisByteFIFO{
|
||||
queueName: config.QueueName,
|
||||
}
|
||||
dbs := strings.Split(config.Addresses, ",")
|
||||
if len(dbs) == 0 {
|
||||
return nil, errors.New("no redis host specified")
|
||||
} else if len(dbs) == 1 {
|
||||
queue.client = redis.NewClient(&redis.Options{
|
||||
fifo.client = redis.NewClient(&redis.Options{
|
||||
Network: config.Network,
|
||||
Addr: strings.TrimSpace(dbs[0]), // use default Addr
|
||||
Password: config.Password, // no password set
|
||||
DB: config.DBIndex, // use default DB
|
||||
})
|
||||
} else {
|
||||
queue.client = redis.NewClusterClient(&redis.ClusterOptions{
|
||||
fifo.client = redis.NewClusterClient(&redis.ClusterOptions{
|
||||
Addrs: dbs,
|
||||
})
|
||||
}
|
||||
if err := queue.client.Ping().Err(); err != nil {
|
||||
if err := fifo.client.Ping().Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
queue.qid = GetManager().Add(queue, RedisQueueType, config, exemplar)
|
||||
|
||||
return queue, nil
|
||||
return fifo, nil
|
||||
}
|
||||
|
||||
// Run runs the redis queue
|
||||
func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
|
||||
atShutdown(context.Background(), r.Shutdown)
|
||||
atTerminate(context.Background(), r.Terminate)
|
||||
log.Debug("RedisQueue: %s Starting", r.name)
|
||||
|
||||
go func() {
|
||||
_ = r.AddWorkers(r.workers, 0)
|
||||
}()
|
||||
|
||||
go r.readToChan()
|
||||
|
||||
log.Trace("RedisQueue: %s Waiting til closed", r.name)
|
||||
<-r.closed
|
||||
log.Trace("RedisQueue: %s Waiting til done", r.name)
|
||||
r.Wait()
|
||||
|
||||
log.Trace("RedisQueue: %s Waiting til cleaned", r.name)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
atTerminate(ctx, cancel)
|
||||
r.CleanUp(ctx)
|
||||
cancel()
|
||||
}
|
||||
|
||||
func (r *RedisQueue) readToChan() {
|
||||
for {
|
||||
select {
|
||||
case <-r.closed:
|
||||
// tell the pool to shutdown
|
||||
r.cancel()
|
||||
return
|
||||
default:
|
||||
atomic.AddInt64(&r.numInQueue, 1)
|
||||
bs, err := r.client.LPop(r.queueName).Bytes()
|
||||
if err != nil && err != redis.Nil {
|
||||
log.Error("RedisQueue: %s Error on LPop: %v", r.name, err)
|
||||
atomic.AddInt64(&r.numInQueue, -1)
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
continue
|
||||
}
|
||||
|
||||
if len(bs) == 0 {
|
||||
atomic.AddInt64(&r.numInQueue, -1)
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
continue
|
||||
}
|
||||
|
||||
data, err := unmarshalAs(bs, r.exemplar)
|
||||
if err != nil {
|
||||
log.Error("RedisQueue: %s Error on Unmarshal: %v", r.name, err)
|
||||
atomic.AddInt64(&r.numInQueue, -1)
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
continue
|
||||
}
|
||||
|
||||
log.Trace("RedisQueue: %s Task found: %#v", r.name, data)
|
||||
r.WorkerPool.Push(data)
|
||||
atomic.AddInt64(&r.numInQueue, -1)
|
||||
// PushFunc pushes data to the end of the fifo and calls the callback if it is added
|
||||
func (fifo *RedisByteFIFO) PushFunc(data []byte, fn func() error) error {
|
||||
if fn != nil {
|
||||
if err := fn(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return fifo.client.RPush(fifo.queueName, data).Err()
|
||||
}
|
||||
|
||||
// Push implements Queue
|
||||
func (r *RedisQueue) Push(data Data) error {
|
||||
if !assignableTo(data, r.exemplar) {
|
||||
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, r.exemplar, r.name)
|
||||
// Pop pops data from the start of the fifo
|
||||
func (fifo *RedisByteFIFO) Pop() ([]byte, error) {
|
||||
data, err := fifo.client.LPop(fifo.queueName).Bytes()
|
||||
if err != nil && err == redis.Nil {
|
||||
return data, nil
|
||||
}
|
||||
bs, err := json.Marshal(data)
|
||||
return data, err
|
||||
}
|
||||
|
||||
// Close this fifo
|
||||
func (fifo *RedisByteFIFO) Close() error {
|
||||
return fifo.client.Close()
|
||||
}
|
||||
|
||||
// Len returns the length of the fifo
|
||||
func (fifo *RedisByteFIFO) Len() int64 {
|
||||
val, err := fifo.client.LLen(fifo.queueName).Result()
|
||||
if err != nil {
|
||||
return err
|
||||
log.Error("Error whilst getting length of redis queue %s: Error: %v", fifo.queueName, err)
|
||||
return -1
|
||||
}
|
||||
return r.client.RPush(r.queueName, bs).Err()
|
||||
}
|
||||
|
||||
// IsEmpty checks if the queue is empty
|
||||
func (r *RedisQueue) IsEmpty() bool {
|
||||
if !r.WorkerPool.IsEmpty() {
|
||||
return false
|
||||
}
|
||||
length, err := r.client.LLen(r.queueName).Result()
|
||||
if err != nil {
|
||||
log.Error("Error whilst getting queue length for %s: Error: %v", r.name, err)
|
||||
return false
|
||||
}
|
||||
return length == 0
|
||||
}
|
||||
|
||||
// Shutdown processing from this queue
|
||||
func (r *RedisQueue) Shutdown() {
|
||||
log.Trace("RedisQueue: %s Shutting down", r.name)
|
||||
r.lock.Lock()
|
||||
select {
|
||||
case <-r.closed:
|
||||
default:
|
||||
close(r.closed)
|
||||
}
|
||||
r.lock.Unlock()
|
||||
log.Debug("RedisQueue: %s Shutdown", r.name)
|
||||
}
|
||||
|
||||
// Terminate this queue and close the queue
|
||||
func (r *RedisQueue) Terminate() {
|
||||
log.Trace("RedisQueue: %s Terminating", r.name)
|
||||
r.Shutdown()
|
||||
r.lock.Lock()
|
||||
select {
|
||||
case <-r.terminated:
|
||||
r.lock.Unlock()
|
||||
default:
|
||||
close(r.terminated)
|
||||
r.lock.Unlock()
|
||||
if log.IsDebug() {
|
||||
log.Debug("RedisQueue: %s Closing with %d tasks left in queue", r.name, r.client.LLen(r.queueName))
|
||||
}
|
||||
if err := r.client.Close(); err != nil {
|
||||
log.Error("Error whilst closing internal redis client in %s: %v", r.name, err)
|
||||
}
|
||||
}
|
||||
log.Debug("RedisQueue: %s Terminated", r.name)
|
||||
}
|
||||
|
||||
// Name returns the name of this queue
|
||||
func (r *RedisQueue) Name() string {
|
||||
return r.name
|
||||
return val
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
@@ -7,6 +7,7 @@ package queue
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"code.gitea.io/gitea/modules/log"
|
||||
"code.gitea.io/gitea/modules/setting"
|
||||
@@ -36,6 +37,7 @@ func getQueueSettings(name string) (setting.QueueSettings, []byte) {
|
||||
opts["Password"] = q.Password
|
||||
opts["DBIndex"] = q.DBIndex
|
||||
opts["QueueName"] = q.QueueName
|
||||
opts["SetName"] = q.SetName
|
||||
opts["Workers"] = q.Workers
|
||||
opts["MaxWorkers"] = q.MaxWorkers
|
||||
opts["BlockTimeout"] = q.BlockTimeout
|
||||
@@ -81,3 +83,41 @@ func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue {
|
||||
}
|
||||
return returnable
|
||||
}
|
||||
|
||||
// CreateUniqueQueue for name with provided handler and exemplar
|
||||
func CreateUniqueQueue(name string, handle HandlerFunc, exemplar interface{}) UniqueQueue {
|
||||
q, cfg := getQueueSettings(name)
|
||||
if len(cfg) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(q.Type) > 0 && q.Type != "dummy" && !strings.HasPrefix(q.Type, "unique-") {
|
||||
q.Type = "unique-" + q.Type
|
||||
}
|
||||
|
||||
typ, err := validType(q.Type)
|
||||
if err != nil || typ == PersistableChannelQueueType {
|
||||
typ = PersistableChannelUniqueQueueType
|
||||
if err != nil {
|
||||
log.Error("Invalid type %s provided for queue named %s defaulting to %s", q.Type, name, string(typ))
|
||||
}
|
||||
}
|
||||
|
||||
returnable, err := NewQueue(typ, handle, cfg, exemplar)
|
||||
if q.WrapIfNecessary && err != nil {
|
||||
log.Warn("Unable to create unique queue for %s: %v", name, err)
|
||||
log.Warn("Attempting to create wrapped queue")
|
||||
returnable, err = NewQueue(WrappedUniqueQueueType, handle, WrappedUniqueQueueConfiguration{
|
||||
Underlying: typ,
|
||||
Timeout: q.Timeout,
|
||||
MaxAttempts: q.MaxAttempts,
|
||||
Config: cfg,
|
||||
QueueLength: q.Length,
|
||||
}, exemplar)
|
||||
}
|
||||
if err != nil {
|
||||
log.Error("Unable to create unique queue for %s: %v", name, err)
|
||||
return nil
|
||||
}
|
||||
return returnable.(UniqueQueue)
|
||||
}
|
||||
|
29
modules/queue/unique_queue.go
Normal file
29
modules/queue/unique_queue.go
Normal file
@@ -0,0 +1,29 @@
|
||||
// Copyright 2020 The Gitea Authors. All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// UniqueQueue defines a queue which guarantees only one instance of same
|
||||
// data is in the queue. Instances with same identity will be
|
||||
// discarded if there is already one in the line.
|
||||
//
|
||||
// This queue is particularly useful for preventing duplicated task
|
||||
// of same purpose - please note that this does not guarantee that a particular
|
||||
// task cannot be processed twice or more at the same time. Uniqueness is
|
||||
// only guaranteed whilst the task is waiting in the queue.
|
||||
//
|
||||
// Users of this queue should be careful to push only the identifier of the
|
||||
// data
|
||||
type UniqueQueue interface {
|
||||
Queue
|
||||
PushFunc(Data, func() error) error
|
||||
Has(Data) (bool, error)
|
||||
}
|
||||
|
||||
// ErrAlreadyInQueue is returned when trying to push data to the queue that is already in the queue
|
||||
var ErrAlreadyInQueue = fmt.Errorf("already in queue")
|
132
modules/queue/unique_queue_channel.go
Normal file
132
modules/queue/unique_queue_channel.go
Normal file
@@ -0,0 +1,132 @@
|
||||
// Copyright 2020 The Gitea Authors. All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"code.gitea.io/gitea/modules/log"
|
||||
)
|
||||
|
||||
// ChannelUniqueQueueType is the type for channel queue
|
||||
const ChannelUniqueQueueType Type = "unique-channel"
|
||||
|
||||
// ChannelUniqueQueueConfiguration is the configuration for a ChannelUniqueQueue
|
||||
type ChannelUniqueQueueConfiguration ChannelQueueConfiguration
|
||||
|
||||
// ChannelUniqueQueue implements UniqueQueue
|
||||
//
|
||||
// It is basically a thin wrapper around a WorkerPool but keeps a store of
|
||||
// what has been pushed within a table.
|
||||
//
|
||||
// Please note that this Queue does not guarantee that a particular
|
||||
// task cannot be processed twice or more at the same time. Uniqueness is
|
||||
// only guaranteed whilst the task is waiting in the queue.
|
||||
type ChannelUniqueQueue struct {
|
||||
*WorkerPool
|
||||
lock sync.Mutex
|
||||
table map[Data]bool
|
||||
exemplar interface{}
|
||||
workers int
|
||||
name string
|
||||
}
|
||||
|
||||
// NewChannelUniqueQueue create a memory channel queue
|
||||
func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
|
||||
configInterface, err := toConfig(ChannelUniqueQueueConfiguration{}, cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
config := configInterface.(ChannelUniqueQueueConfiguration)
|
||||
if config.BatchLength == 0 {
|
||||
config.BatchLength = 1
|
||||
}
|
||||
queue := &ChannelUniqueQueue{
|
||||
table: map[Data]bool{},
|
||||
exemplar: exemplar,
|
||||
workers: config.Workers,
|
||||
name: config.Name,
|
||||
}
|
||||
queue.WorkerPool = NewWorkerPool(func(data ...Data) {
|
||||
for _, datum := range data {
|
||||
queue.lock.Lock()
|
||||
delete(queue.table, datum)
|
||||
queue.lock.Unlock()
|
||||
handle(datum)
|
||||
}
|
||||
}, config.WorkerPoolConfiguration)
|
||||
|
||||
queue.qid = GetManager().Add(queue, ChannelUniqueQueueType, config, exemplar)
|
||||
return queue, nil
|
||||
}
|
||||
|
||||
// Run starts to run the queue
|
||||
func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
|
||||
atShutdown(context.Background(), func() {
|
||||
log.Warn("ChannelUniqueQueue: %s is not shutdownable!", q.name)
|
||||
})
|
||||
atTerminate(context.Background(), func() {
|
||||
log.Warn("ChannelUniqueQueue: %s is not terminatable!", q.name)
|
||||
})
|
||||
log.Debug("ChannelUniqueQueue: %s Starting", q.name)
|
||||
go func() {
|
||||
_ = q.AddWorkers(q.workers, 0)
|
||||
}()
|
||||
}
|
||||
|
||||
// Push will push data into the queue if the data is not already in the queue
|
||||
func (q *ChannelUniqueQueue) Push(data Data) error {
|
||||
return q.PushFunc(data, nil)
|
||||
}
|
||||
|
||||
// PushFunc will push data into the queue
|
||||
func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
|
||||
if !assignableTo(data, q.exemplar) {
|
||||
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
|
||||
}
|
||||
q.lock.Lock()
|
||||
locked := true
|
||||
defer func() {
|
||||
if locked {
|
||||
q.lock.Unlock()
|
||||
}
|
||||
}()
|
||||
if _, ok := q.table[data]; ok {
|
||||
return ErrAlreadyInQueue
|
||||
}
|
||||
// FIXME: We probably need to implement some sort of limit here
|
||||
// If the downstream queue blocks this table will grow without limit
|
||||
q.table[data] = true
|
||||
if fn != nil {
|
||||
err := fn()
|
||||
if err != nil {
|
||||
delete(q.table, data)
|
||||
return err
|
||||
}
|
||||
}
|
||||
locked = false
|
||||
q.lock.Unlock()
|
||||
q.WorkerPool.Push(data)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Has checks if the data is in the queue
|
||||
func (q *ChannelUniqueQueue) Has(data Data) (bool, error) {
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
_, has := q.table[data]
|
||||
return has, nil
|
||||
}
|
||||
|
||||
// Name returns the name of this queue
|
||||
func (q *ChannelUniqueQueue) Name() string {
|
||||
return q.name
|
||||
}
|
||||
|
||||
func init() {
|
||||
queuesMap[ChannelUniqueQueueType] = NewChannelUniqueQueue
|
||||
}
|
104
modules/queue/unique_queue_disk.go
Normal file
104
modules/queue/unique_queue_disk.go
Normal file
@@ -0,0 +1,104 @@
|
||||
// Copyright 2019 The Gitea Authors. All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"gitea.com/lunny/levelqueue"
|
||||
)
|
||||
|
||||
// LevelUniqueQueueType is the type for level queue
|
||||
const LevelUniqueQueueType Type = "unique-level"
|
||||
|
||||
// LevelUniqueQueueConfiguration is the configuration for a LevelUniqueQueue
|
||||
type LevelUniqueQueueConfiguration struct {
|
||||
ByteFIFOQueueConfiguration
|
||||
DataDir string
|
||||
}
|
||||
|
||||
// LevelUniqueQueue implements a disk library queue
|
||||
type LevelUniqueQueue struct {
|
||||
*ByteFIFOUniqueQueue
|
||||
}
|
||||
|
||||
// NewLevelUniqueQueue creates a ledis local queue
|
||||
//
|
||||
// Please note that this Queue does not guarantee that a particular
|
||||
// task cannot be processed twice or more at the same time. Uniqueness is
|
||||
// only guaranteed whilst the task is waiting in the queue.
|
||||
func NewLevelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
|
||||
configInterface, err := toConfig(LevelUniqueQueueConfiguration{}, cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
config := configInterface.(LevelUniqueQueueConfiguration)
|
||||
|
||||
byteFIFO, err := NewLevelUniqueQueueByteFIFO(config.DataDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
byteFIFOQueue, err := NewByteFIFOUniqueQueue(LevelUniqueQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
queue := &LevelUniqueQueue{
|
||||
ByteFIFOUniqueQueue: byteFIFOQueue,
|
||||
}
|
||||
queue.qid = GetManager().Add(queue, LevelUniqueQueueType, config, exemplar)
|
||||
return queue, nil
|
||||
}
|
||||
|
||||
var _ (UniqueByteFIFO) = &LevelUniqueQueueByteFIFO{}
|
||||
|
||||
// LevelUniqueQueueByteFIFO represents a ByteFIFO formed from a LevelUniqueQueue
|
||||
type LevelUniqueQueueByteFIFO struct {
|
||||
internal *levelqueue.UniqueQueue
|
||||
}
|
||||
|
||||
// NewLevelUniqueQueueByteFIFO creates a new ByteFIFO formed from a LevelUniqueQueue
|
||||
func NewLevelUniqueQueueByteFIFO(dataDir string) (*LevelUniqueQueueByteFIFO, error) {
|
||||
internal, err := levelqueue.OpenUnique(dataDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &LevelUniqueQueueByteFIFO{
|
||||
internal: internal,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// PushFunc pushes data to the end of the fifo and calls the callback if it is added
|
||||
func (fifo *LevelUniqueQueueByteFIFO) PushFunc(data []byte, fn func() error) error {
|
||||
return fifo.internal.LPushFunc(data, fn)
|
||||
}
|
||||
|
||||
// Pop pops data from the start of the fifo
|
||||
func (fifo *LevelUniqueQueueByteFIFO) Pop() ([]byte, error) {
|
||||
data, err := fifo.internal.RPop()
|
||||
if err != nil && err != levelqueue.ErrNotFound {
|
||||
return nil, err
|
||||
}
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// Len returns the length of the fifo
|
||||
func (fifo *LevelUniqueQueueByteFIFO) Len() int64 {
|
||||
return fifo.internal.Len()
|
||||
}
|
||||
|
||||
// Has returns whether the fifo contains this data
|
||||
func (fifo *LevelUniqueQueueByteFIFO) Has(data []byte) (bool, error) {
|
||||
return fifo.internal.Has(data)
|
||||
}
|
||||
|
||||
// Close this fifo
|
||||
func (fifo *LevelUniqueQueueByteFIFO) Close() error {
|
||||
return fifo.internal.Close()
|
||||
}
|
||||
|
||||
func init() {
|
||||
queuesMap[LevelUniqueQueueType] = NewLevelUniqueQueue
|
||||
}
|
241
modules/queue/unique_queue_disk_channel.go
Normal file
241
modules/queue/unique_queue_disk_channel.go
Normal file
@@ -0,0 +1,241 @@
|
||||
// Copyright 2020 The Gitea Authors. All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"code.gitea.io/gitea/modules/log"
|
||||
)
|
||||
|
||||
// PersistableChannelUniqueQueueType is the type for persistable queue
|
||||
const PersistableChannelUniqueQueueType Type = "unique-persistable-channel"
|
||||
|
||||
// PersistableChannelUniqueQueueConfiguration is the configuration for a PersistableChannelUniqueQueue
|
||||
type PersistableChannelUniqueQueueConfiguration struct {
|
||||
Name string
|
||||
DataDir string
|
||||
BatchLength int
|
||||
QueueLength int
|
||||
Timeout time.Duration
|
||||
MaxAttempts int
|
||||
Workers int
|
||||
MaxWorkers int
|
||||
BlockTimeout time.Duration
|
||||
BoostTimeout time.Duration
|
||||
BoostWorkers int
|
||||
}
|
||||
|
||||
// PersistableChannelUniqueQueue wraps a channel queue and level queue together
|
||||
//
|
||||
// Please note that this Queue does not guarantee that a particular
|
||||
// task cannot be processed twice or more at the same time. Uniqueness is
|
||||
// only guaranteed whilst the task is waiting in the queue.
|
||||
type PersistableChannelUniqueQueue struct {
|
||||
*ChannelUniqueQueue
|
||||
delayedStarter
|
||||
lock sync.Mutex
|
||||
closed chan struct{}
|
||||
}
|
||||
|
||||
// NewPersistableChannelUniqueQueue creates a wrapped batched channel queue with persistable level queue backend when shutting down
|
||||
// This differs from a wrapped queue in that the persistent queue is only used to persist at shutdown/terminate
|
||||
func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
|
||||
configInterface, err := toConfig(PersistableChannelUniqueQueueConfiguration{}, cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
config := configInterface.(PersistableChannelUniqueQueueConfiguration)
|
||||
|
||||
channelUniqueQueue, err := NewChannelUniqueQueue(handle, ChannelUniqueQueueConfiguration{
|
||||
WorkerPoolConfiguration: WorkerPoolConfiguration{
|
||||
QueueLength: config.QueueLength,
|
||||
BatchLength: config.BatchLength,
|
||||
BlockTimeout: config.BlockTimeout,
|
||||
BoostTimeout: config.BoostTimeout,
|
||||
BoostWorkers: config.BoostWorkers,
|
||||
MaxWorkers: config.MaxWorkers,
|
||||
},
|
||||
Workers: config.Workers,
|
||||
Name: config.Name + "-channel",
|
||||
}, exemplar)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// the level backend only needs temporary workers to catch up with the previously dropped work
|
||||
levelCfg := LevelUniqueQueueConfiguration{
|
||||
ByteFIFOQueueConfiguration: ByteFIFOQueueConfiguration{
|
||||
WorkerPoolConfiguration: WorkerPoolConfiguration{
|
||||
QueueLength: config.QueueLength,
|
||||
BatchLength: config.BatchLength,
|
||||
BlockTimeout: 0,
|
||||
BoostTimeout: 0,
|
||||
BoostWorkers: 0,
|
||||
MaxWorkers: 1,
|
||||
},
|
||||
Workers: 1,
|
||||
Name: config.Name + "-level",
|
||||
},
|
||||
DataDir: config.DataDir,
|
||||
}
|
||||
|
||||
queue := &PersistableChannelUniqueQueue{
|
||||
ChannelUniqueQueue: channelUniqueQueue.(*ChannelUniqueQueue),
|
||||
closed: make(chan struct{}),
|
||||
}
|
||||
|
||||
levelQueue, err := NewLevelUniqueQueue(func(data ...Data) {
|
||||
for _, datum := range data {
|
||||
err := queue.Push(datum)
|
||||
if err != nil && err != ErrAlreadyInQueue {
|
||||
log.Error("Unable push to channelled queue: %v", err)
|
||||
}
|
||||
}
|
||||
}, levelCfg, exemplar)
|
||||
if err == nil {
|
||||
queue.delayedStarter = delayedStarter{
|
||||
internal: levelQueue.(*LevelUniqueQueue),
|
||||
name: config.Name,
|
||||
}
|
||||
|
||||
_ = GetManager().Add(queue, PersistableChannelUniqueQueueType, config, exemplar)
|
||||
return queue, nil
|
||||
}
|
||||
if IsErrInvalidConfiguration(err) {
|
||||
// Retrying ain't gonna make this any better...
|
||||
return nil, ErrInvalidConfiguration{cfg: cfg}
|
||||
}
|
||||
|
||||
queue.delayedStarter = delayedStarter{
|
||||
cfg: levelCfg,
|
||||
underlying: LevelUniqueQueueType,
|
||||
timeout: config.Timeout,
|
||||
maxAttempts: config.MaxAttempts,
|
||||
name: config.Name,
|
||||
}
|
||||
_ = GetManager().Add(queue, PersistableChannelUniqueQueueType, config, exemplar)
|
||||
return queue, nil
|
||||
}
|
||||
|
||||
// Name returns the name of this queue
|
||||
func (q *PersistableChannelUniqueQueue) Name() string {
|
||||
return q.delayedStarter.name
|
||||
}
|
||||
|
||||
// Push will push the indexer data to queue
|
||||
func (q *PersistableChannelUniqueQueue) Push(data Data) error {
|
||||
return q.PushFunc(data, nil)
|
||||
}
|
||||
|
||||
// PushFunc will push the indexer data to queue
|
||||
func (q *PersistableChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
|
||||
select {
|
||||
case <-q.closed:
|
||||
return q.internal.(UniqueQueue).PushFunc(data, fn)
|
||||
default:
|
||||
return q.ChannelUniqueQueue.PushFunc(data, fn)
|
||||
}
|
||||
}
|
||||
|
||||
// Has will test if the queue has the data
|
||||
func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) {
|
||||
// This is more difficult...
|
||||
has, err := q.ChannelUniqueQueue.Has(data)
|
||||
if err != nil || has {
|
||||
return has, err
|
||||
}
|
||||
return q.internal.(UniqueQueue).Has(data)
|
||||
}
|
||||
|
||||
// Run starts to run the queue
|
||||
func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
|
||||
log.Debug("PersistableChannelUniqueQueue: %s Starting", q.delayedStarter.name)
|
||||
|
||||
q.lock.Lock()
|
||||
if q.internal == nil {
|
||||
err := q.setInternal(atShutdown, func(data ...Data) {
|
||||
for _, datum := range data {
|
||||
err := q.Push(datum)
|
||||
if err != nil && err != ErrAlreadyInQueue {
|
||||
log.Error("Unable push to channelled queue: %v", err)
|
||||
}
|
||||
}
|
||||
}, q.exemplar)
|
||||
q.lock.Unlock()
|
||||
if err != nil {
|
||||
log.Fatal("Unable to create internal queue for %s Error: %v", q.Name(), err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
q.lock.Unlock()
|
||||
}
|
||||
atShutdown(context.Background(), q.Shutdown)
|
||||
atTerminate(context.Background(), q.Terminate)
|
||||
|
||||
// Just run the level queue - we shut it down later
|
||||
go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})
|
||||
|
||||
go func() {
|
||||
_ = q.ChannelUniqueQueue.AddWorkers(q.workers, 0)
|
||||
}()
|
||||
|
||||
log.Trace("PersistableChannelUniqueQueue: %s Waiting til closed", q.delayedStarter.name)
|
||||
<-q.closed
|
||||
log.Trace("PersistableChannelUniqueQueue: %s Cancelling pools", q.delayedStarter.name)
|
||||
q.internal.(*LevelUniqueQueue).cancel()
|
||||
q.ChannelUniqueQueue.cancel()
|
||||
log.Trace("PersistableChannelUniqueQueue: %s Waiting til done", q.delayedStarter.name)
|
||||
q.ChannelUniqueQueue.Wait()
|
||||
q.internal.(*LevelUniqueQueue).Wait()
|
||||
// Redirect all remaining data in the chan to the internal channel
|
||||
go func() {
|
||||
log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
|
||||
for data := range q.ChannelUniqueQueue.dataChan {
|
||||
_ = q.internal.Push(data)
|
||||
}
|
||||
log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
|
||||
}()
|
||||
log.Trace("PersistableChannelUniqueQueue: %s Done main loop", q.delayedStarter.name)
|
||||
}
|
||||
|
||||
// Flush flushes the queue
|
||||
func (q *PersistableChannelUniqueQueue) Flush(timeout time.Duration) error {
|
||||
return q.ChannelUniqueQueue.Flush(timeout)
|
||||
}
|
||||
|
||||
// Shutdown processing this queue
|
||||
func (q *PersistableChannelUniqueQueue) Shutdown() {
|
||||
log.Trace("PersistableChannelUniqueQueue: %s Shutting down", q.delayedStarter.name)
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
select {
|
||||
case <-q.closed:
|
||||
default:
|
||||
if q.internal != nil {
|
||||
q.internal.(*LevelUniqueQueue).Shutdown()
|
||||
}
|
||||
close(q.closed)
|
||||
}
|
||||
log.Debug("PersistableChannelUniqueQueue: %s Shutdown", q.delayedStarter.name)
|
||||
}
|
||||
|
||||
// Terminate this queue and close the queue
|
||||
func (q *PersistableChannelUniqueQueue) Terminate() {
|
||||
log.Trace("PersistableChannelUniqueQueue: %s Terminating", q.delayedStarter.name)
|
||||
q.Shutdown()
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
if q.internal != nil {
|
||||
q.internal.(*LevelUniqueQueue).Terminate()
|
||||
}
|
||||
log.Debug("PersistableChannelUniqueQueue: %s Terminated", q.delayedStarter.name)
|
||||
}
|
||||
|
||||
func init() {
|
||||
queuesMap[PersistableChannelUniqueQueueType] = NewPersistableChannelUniqueQueue
|
||||
}
|
124
modules/queue/unique_queue_redis.go
Normal file
124
modules/queue/unique_queue_redis.go
Normal file
@@ -0,0 +1,124 @@
|
||||
// Copyright 2019 The Gitea Authors. All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package queue
|
||||
|
||||
// RedisUniqueQueueType is the type for redis queue
|
||||
const RedisUniqueQueueType Type = "unique-redis"
|
||||
|
||||
// RedisUniqueQueue redis queue
|
||||
type RedisUniqueQueue struct {
|
||||
*ByteFIFOUniqueQueue
|
||||
}
|
||||
|
||||
// RedisUniqueQueueConfiguration is the configuration for the redis queue
|
||||
type RedisUniqueQueueConfiguration struct {
|
||||
ByteFIFOQueueConfiguration
|
||||
RedisUniqueByteFIFOConfiguration
|
||||
}
|
||||
|
||||
// NewRedisUniqueQueue creates single redis or cluster redis queue.
|
||||
//
|
||||
// Please note that this Queue does not guarantee that a particular
|
||||
// task cannot be processed twice or more at the same time. Uniqueness is
|
||||
// only guaranteed whilst the task is waiting in the queue.
|
||||
func NewRedisUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
|
||||
configInterface, err := toConfig(RedisUniqueQueueConfiguration{}, cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
config := configInterface.(RedisUniqueQueueConfiguration)
|
||||
|
||||
byteFIFO, err := NewRedisUniqueByteFIFO(config.RedisUniqueByteFIFOConfiguration)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(byteFIFO.setName) == 0 {
|
||||
byteFIFO.setName = byteFIFO.queueName + "_unique"
|
||||
}
|
||||
|
||||
byteFIFOQueue, err := NewByteFIFOUniqueQueue(RedisUniqueQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
queue := &RedisUniqueQueue{
|
||||
ByteFIFOUniqueQueue: byteFIFOQueue,
|
||||
}
|
||||
|
||||
queue.qid = GetManager().Add(queue, RedisUniqueQueueType, config, exemplar)
|
||||
|
||||
return queue, nil
|
||||
}
|
||||
|
||||
var _ (UniqueByteFIFO) = &RedisUniqueByteFIFO{}
|
||||
|
||||
// RedisUniqueByteFIFO represents a UniqueByteFIFO formed from a redisClient
|
||||
type RedisUniqueByteFIFO struct {
|
||||
RedisByteFIFO
|
||||
setName string
|
||||
}
|
||||
|
||||
// RedisUniqueByteFIFOConfiguration is the configuration for the RedisUniqueByteFIFO
|
||||
type RedisUniqueByteFIFOConfiguration struct {
|
||||
RedisByteFIFOConfiguration
|
||||
SetName string
|
||||
}
|
||||
|
||||
// NewRedisUniqueByteFIFO creates a UniqueByteFIFO formed from a redisClient
|
||||
func NewRedisUniqueByteFIFO(config RedisUniqueByteFIFOConfiguration) (*RedisUniqueByteFIFO, error) {
|
||||
internal, err := NewRedisByteFIFO(config.RedisByteFIFOConfiguration)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fifo := &RedisUniqueByteFIFO{
|
||||
RedisByteFIFO: *internal,
|
||||
setName: config.SetName,
|
||||
}
|
||||
|
||||
return fifo, nil
|
||||
}
|
||||
|
||||
// PushFunc pushes data to the end of the fifo and calls the callback if it is added
|
||||
func (fifo *RedisUniqueByteFIFO) PushFunc(data []byte, fn func() error) error {
|
||||
added, err := fifo.client.SAdd(fifo.setName, data).Result()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if added == 0 {
|
||||
return ErrAlreadyInQueue
|
||||
}
|
||||
if fn != nil {
|
||||
if err := fn(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return fifo.client.RPush(fifo.queueName, data).Err()
|
||||
}
|
||||
|
||||
// Pop pops data from the start of the fifo
|
||||
func (fifo *RedisUniqueByteFIFO) Pop() ([]byte, error) {
|
||||
data, err := fifo.client.LPop(fifo.queueName).Bytes()
|
||||
if err != nil {
|
||||
return data, err
|
||||
}
|
||||
|
||||
if len(data) == 0 {
|
||||
return data, nil
|
||||
}
|
||||
|
||||
err = fifo.client.SRem(fifo.setName, data).Err()
|
||||
return data, err
|
||||
}
|
||||
|
||||
// Has returns whether the fifo contains this data
|
||||
func (fifo *RedisUniqueByteFIFO) Has(data []byte) (bool, error) {
|
||||
return fifo.client.SIsMember(fifo.setName, data).Result()
|
||||
}
|
||||
|
||||
func init() {
|
||||
queuesMap[RedisUniqueQueueType] = NewRedisUniqueQueue
|
||||
}
|
172
modules/queue/unique_queue_wrapped.go
Normal file
172
modules/queue/unique_queue_wrapped.go
Normal file
@@ -0,0 +1,172 @@
|
||||
// Copyright 2020 The Gitea Authors. All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// WrappedUniqueQueueType is the type for a wrapped delayed starting queue
|
||||
const WrappedUniqueQueueType Type = "unique-wrapped"
|
||||
|
||||
// WrappedUniqueQueueConfiguration is the configuration for a WrappedUniqueQueue
|
||||
type WrappedUniqueQueueConfiguration struct {
|
||||
Underlying Type
|
||||
Timeout time.Duration
|
||||
MaxAttempts int
|
||||
Config interface{}
|
||||
QueueLength int
|
||||
Name string
|
||||
}
|
||||
|
||||
// WrappedUniqueQueue wraps a delayed starting unique queue
|
||||
type WrappedUniqueQueue struct {
|
||||
*WrappedQueue
|
||||
table map[Data]bool
|
||||
tlock sync.Mutex
|
||||
ready bool
|
||||
}
|
||||
|
||||
// NewWrappedUniqueQueue will attempt to create a unique queue of the provided type,
|
||||
// but if there is a problem creating this queue it will instead create
|
||||
// a WrappedUniqueQueue with delayed startup of the queue instead and a
|
||||
// channel which will be redirected to the queue
|
||||
//
|
||||
// Please note that this Queue does not guarantee that a particular
|
||||
// task cannot be processed twice or more at the same time. Uniqueness is
|
||||
// only guaranteed whilst the task is waiting in the queue.
|
||||
func NewWrappedUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
|
||||
configInterface, err := toConfig(WrappedUniqueQueueConfiguration{}, cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
config := configInterface.(WrappedUniqueQueueConfiguration)
|
||||
|
||||
queue, err := NewQueue(config.Underlying, handle, config.Config, exemplar)
|
||||
if err == nil {
|
||||
// Just return the queue there is no need to wrap
|
||||
return queue, nil
|
||||
}
|
||||
if IsErrInvalidConfiguration(err) {
|
||||
// Retrying ain't gonna make this any better...
|
||||
return nil, ErrInvalidConfiguration{cfg: cfg}
|
||||
}
|
||||
|
||||
wrapped := &WrappedUniqueQueue{
|
||||
WrappedQueue: &WrappedQueue{
|
||||
channel: make(chan Data, config.QueueLength),
|
||||
exemplar: exemplar,
|
||||
delayedStarter: delayedStarter{
|
||||
cfg: config.Config,
|
||||
underlying: config.Underlying,
|
||||
timeout: config.Timeout,
|
||||
maxAttempts: config.MaxAttempts,
|
||||
name: config.Name,
|
||||
},
|
||||
},
|
||||
table: map[Data]bool{},
|
||||
}
|
||||
|
||||
// wrapped.handle is passed to the delayedStarting internal queue and is run to handle
|
||||
// data passed to
|
||||
wrapped.handle = func(data ...Data) {
|
||||
for _, datum := range data {
|
||||
wrapped.tlock.Lock()
|
||||
if !wrapped.ready {
|
||||
delete(wrapped.table, data)
|
||||
// If our table is empty all of the requests we have buffered between the
|
||||
// wrapper queue starting and the internal queue starting have been handled.
|
||||
// We can stop buffering requests in our local table and just pass Push
|
||||
// direct to the internal queue
|
||||
if len(wrapped.table) == 0 {
|
||||
wrapped.ready = true
|
||||
}
|
||||
}
|
||||
wrapped.tlock.Unlock()
|
||||
handle(datum)
|
||||
}
|
||||
}
|
||||
_ = GetManager().Add(queue, WrappedUniqueQueueType, config, exemplar)
|
||||
return wrapped, nil
|
||||
}
|
||||
|
||||
// Push will push the data to the internal channel checking it against the exemplar
|
||||
func (q *WrappedUniqueQueue) Push(data Data) error {
|
||||
return q.PushFunc(data, nil)
|
||||
}
|
||||
|
||||
// PushFunc will push the data to the internal channel checking it against the exemplar
|
||||
func (q *WrappedUniqueQueue) PushFunc(data Data, fn func() error) error {
|
||||
if !assignableTo(data, q.exemplar) {
|
||||
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
|
||||
}
|
||||
|
||||
q.tlock.Lock()
|
||||
if q.ready {
|
||||
// ready means our table is empty and all of the requests we have buffered between the
|
||||
// wrapper queue starting and the internal queue starting have been handled.
|
||||
// We can stop buffering requests in our local table and just pass Push
|
||||
// direct to the internal queue
|
||||
q.tlock.Unlock()
|
||||
return q.internal.(UniqueQueue).PushFunc(data, fn)
|
||||
}
|
||||
|
||||
locked := true
|
||||
defer func() {
|
||||
if locked {
|
||||
q.tlock.Unlock()
|
||||
}
|
||||
}()
|
||||
if _, ok := q.table[data]; ok {
|
||||
return ErrAlreadyInQueue
|
||||
}
|
||||
// FIXME: We probably need to implement some sort of limit here
|
||||
// If the downstream queue blocks this table will grow without limit
|
||||
q.table[data] = true
|
||||
if fn != nil {
|
||||
err := fn()
|
||||
if err != nil {
|
||||
delete(q.table, data)
|
||||
return err
|
||||
}
|
||||
}
|
||||
locked = false
|
||||
q.tlock.Unlock()
|
||||
|
||||
q.channel <- data
|
||||
return nil
|
||||
}
|
||||
|
||||
// Has checks if the data is in the queue
|
||||
func (q *WrappedUniqueQueue) Has(data Data) (bool, error) {
|
||||
q.tlock.Lock()
|
||||
defer q.tlock.Unlock()
|
||||
if q.ready {
|
||||
return q.internal.(UniqueQueue).Has(data)
|
||||
}
|
||||
_, has := q.table[data]
|
||||
return has, nil
|
||||
}
|
||||
|
||||
// IsEmpty checks whether the queue is empty
|
||||
func (q *WrappedUniqueQueue) IsEmpty() bool {
|
||||
q.tlock.Lock()
|
||||
if len(q.table) > 0 {
|
||||
q.tlock.Unlock()
|
||||
return false
|
||||
}
|
||||
if q.ready {
|
||||
q.tlock.Unlock()
|
||||
return q.internal.IsEmpty()
|
||||
}
|
||||
q.tlock.Unlock()
|
||||
return false
|
||||
}
|
||||
|
||||
func init() {
|
||||
queuesMap[WrappedUniqueQueueType] = NewWrappedUniqueQueue
|
||||
}
|
Reference in New Issue
Block a user