From c1ecb6c60a1acacf530e226b8043ca93d2fe4a07 Mon Sep 17 00:00:00 2001 From: Unknwon Date: Tue, 30 Aug 2016 15:50:30 -0700 Subject: [PATCH] modules/sync: add UniqueQueue --- models/pull.go | 5 +-- models/webhook.go | 63 ++------------------------------ modules/sync/unique_queue.go | 70 ++++++++++++++++++++++++++++++++++++ 3 files changed, 76 insertions(+), 62 deletions(-) create mode 100644 modules/sync/unique_queue.go diff --git a/models/pull.go b/models/pull.go index 64b347554c..100d4db435 100644 --- a/models/pull.go +++ b/models/pull.go @@ -20,8 +20,11 @@ import ( "github.com/gogits/gogs/modules/log" "github.com/gogits/gogs/modules/process" "github.com/gogits/gogs/modules/setting" + "github.com/gogits/gogs/modules/sync" ) +var PullRequestQueue = sync.NewUniqueQueue(setting.Repository.PullRequestQueueLength) + type PullRequestType int const ( @@ -537,8 +540,6 @@ func (pr *PullRequest) UpdateCols(cols ...string) error { return err } -var PullRequestQueue = NewUniqueQueue(setting.Repository.PullRequestQueueLength) - // UpdatePatch generates and saves a new patch. func (pr *PullRequest) UpdatePatch() (err error) { if err = pr.GetHeadRepo(); err != nil { diff --git a/models/webhook.go b/models/webhook.go index 2db0274115..528dd5e474 100644 --- a/models/webhook.go +++ b/models/webhook.go @@ -10,10 +10,8 @@ import ( "fmt" "io/ioutil" "strings" - "sync" "time" - "github.com/Unknwon/com" "github.com/go-xorm/xorm" gouuid "github.com/satori/go.uuid" @@ -22,8 +20,11 @@ import ( "github.com/gogits/gogs/modules/httplib" "github.com/gogits/gogs/modules/log" "github.com/gogits/gogs/modules/setting" + "github.com/gogits/gogs/modules/sync" ) +var HookQueue = sync.NewUniqueQueue(setting.Webhook.QueueLength) + type HookContentType int const ( @@ -500,64 +501,6 @@ func PrepareWebhooks(repo *Repository, event HookEventType, p api.Payloader) err return nil } -// UniqueQueue represents a queue that guarantees only one instance of same ID is in the line. -type UniqueQueue struct { - lock sync.Mutex - ids map[string]bool - - queue chan string -} - -func (q *UniqueQueue) Queue() <-chan string { - return q.queue -} - -func NewUniqueQueue(queueLength int) *UniqueQueue { - if queueLength <= 0 { - queueLength = 100 - } - - return &UniqueQueue{ - ids: make(map[string]bool), - queue: make(chan string, queueLength), - } -} - -func (q *UniqueQueue) Remove(id interface{}) { - q.lock.Lock() - defer q.lock.Unlock() - delete(q.ids, com.ToStr(id)) -} - -func (q *UniqueQueue) AddFunc(id interface{}, fn func()) { - newid := com.ToStr(id) - - if q.Exist(id) { - return - } - - q.lock.Lock() - q.ids[newid] = true - if fn != nil { - fn() - } - q.lock.Unlock() - q.queue <- newid -} - -func (q *UniqueQueue) Add(id interface{}) { - q.AddFunc(id, nil) -} - -func (q *UniqueQueue) Exist(id interface{}) bool { - q.lock.Lock() - defer q.lock.Unlock() - - return q.ids[com.ToStr(id)] -} - -var HookQueue = NewUniqueQueue(setting.Webhook.QueueLength) - func (t *HookTask) deliver() { t.IsDelivered = true diff --git a/modules/sync/unique_queue.go b/modules/sync/unique_queue.go new file mode 100644 index 0000000000..3f3c1c8661 --- /dev/null +++ b/modules/sync/unique_queue.go @@ -0,0 +1,70 @@ +// Copyright 2016 The Gogs Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package sync + +import ( + "github.com/Unknwon/com" +) + +// UniqueQueue is a queue which guarantees only one instance of same +// identity is in the line. Instances with same identity will be +// discarded if there is already one in the line. +// +// This queue is particularly useful for preventing duplicated task +// of same purpose. +type UniqueQueue struct { + table *StatusTable + queue chan string +} + +// NewUniqueQueue initializes and returns a new UniqueQueue object. +func NewUniqueQueue(queueLength int) *UniqueQueue { + if queueLength <= 0 { + queueLength = 100 + } + + return &UniqueQueue{ + table: NewStatusTable(), + queue: make(chan string, queueLength), + } +} + +// Queue returns channel of queue for retrieving instances. +func (q *UniqueQueue) Queue() <-chan string { + return q.queue +} + +// Exist returns true if there is an instance with given indentity +// exists in the queue. +func (q *UniqueQueue) Exist(id interface{}) bool { + return q.table.IsRunning(com.ToStr(id)) +} + +// AddFunc adds new instance to the queue with a custom runnable function, +// the queue is blocked until the function exits. +func (q *UniqueQueue) AddFunc(id interface{}, fn func()) { + if q.Exist(id) { + return + } + + idStr := com.ToStr(id) + q.table.lock.Lock() + q.table.pool[idStr] = true + if fn != nil { + fn() + } + q.table.lock.Unlock() + q.queue <- idStr +} + +// Add adds new instance to the queue. +func (q *UniqueQueue) Add(id interface{}) { + q.AddFunc(id, nil) +} + +// Remove removes instance from the queue. +func (q *UniqueQueue) Remove(id interface{}) { + q.table.Stop(com.ToStr(id)) +}