From 7117c7774ae1c1003c95d3df8dfaa5bd7270165d Mon Sep 17 00:00:00 2001 From: zeripath Date: Sun, 17 Oct 2021 12:43:25 +0100 Subject: [PATCH] Make the Mirror Queue a queue (#17326) Convert the old mirror syncing queue to the more modern queue format. Fix a bug in the from the repo-archive queue PR - the assumption was made that uniqueness could be enforced with by checking equality in a map in channel unique queues - however this only works for primitive types - which was the initial intention but is an imperfect. This is fixed by marshalling the data and placing the martialled data in the unique map instead. The documentation is also updated to add information about the deprecated configuration values. Signed-off-by: Andrew Thornton --- custom/conf/app.example.ini | 4 +- .../doc/advanced/config-cheat-sheet.en-us.md | 39 +++++++- modules/queue/unique_queue_channel.go | 29 ++++-- modules/setting/mailer.go | 2 - modules/setting/queue.go | 64 +++++++----- modules/setting/repository.go | 4 - services/mirror/mirror.go | 98 ++++++++++++------- 7 files changed, 164 insertions(+), 76 deletions(-) diff --git a/custom/conf/app.example.ini b/custom/conf/app.example.ini index bdc42480e4..1753ed2330 100644 --- a/custom/conf/app.example.ini +++ b/custom/conf/app.example.ini @@ -769,10 +769,10 @@ PATH = ;; Global limit of repositories per user, applied at creation time. -1 means no limit ;MAX_CREATION_LIMIT = -1 ;; -;; Mirror sync queue length, increase if mirror syncing starts hanging +;; Mirror sync queue length, increase if mirror syncing starts hanging (DEPRECATED: please use [queue.mirror] LENGTH instead) ;MIRROR_QUEUE_LENGTH = 1000 ;; -;; Patch test queue length, increase if pull request patch testing starts hanging +;; Patch test queue length, increase if pull request patch testing starts hanging (DEPRECATED: please use [queue.pr_patch_checker] LENGTH instead) ;PULL_REQUEST_QUEUE_LENGTH = 1000 ;; ;; Preferred Licenses to place at the top of the List diff --git a/docs/content/doc/advanced/config-cheat-sheet.en-us.md b/docs/content/doc/advanced/config-cheat-sheet.en-us.md index 251f6bd51a..91c62dbec3 100644 --- a/docs/content/doc/advanced/config-cheat-sheet.en-us.md +++ b/docs/content/doc/advanced/config-cheat-sheet.en-us.md @@ -54,10 +54,10 @@ Values containing `#` or `;` must be quoted using `` ` `` or `"""`. - `DEFAULT_PUSH_CREATE_PRIVATE`: **true**: Default private when creating a new repository with push-to-create. - `MAX_CREATION_LIMIT`: **-1**: Global maximum creation limit of repositories per user, `-1` means no limit. -- `PULL_REQUEST_QUEUE_LENGTH`: **1000**: Length of pull request patch test queue, make it +- `PULL_REQUEST_QUEUE_LENGTH`: **1000**: Length of pull request patch test queue, make it. **DEPRECATED** use `LENGTH` in `[queue.pr_patch_checker]`. as large as possible. Use caution when editing this value. - `MIRROR_QUEUE_LENGTH`: **1000**: Patch test queue length, increase if pull request patch - testing starts hanging. + testing starts hanging. **DEPRECATED** use `LENGTH` in `[queue.mirror]`. - `PREFERRED_LICENSES`: **Apache License 2.0,MIT License**: Preferred Licenses to place at the top of the list. Name must match file name in options/license or custom/options/license. - `DISABLE_HTTP_GIT`: **false**: Disable the ability to interact with repositories over the @@ -382,6 +382,8 @@ relation to port exhaustion. ## Queue (`queue` and `queue.*`) +Configuration at `[queue]` will set defaults for queues with overrides for individual queues at `[queue.*]`. (However see below.) + - `TYPE`: **persistable-channel**: General queue type, currently support: `persistable-channel` (uses a LevelDB internally), `channel`, `level`, `redis`, `dummy` - `DATADIR`: **queues/**: Base DataDir for storing persistent and level queues. `DATADIR` for individual queues can be set in `queue.name` sections but will default to `DATADIR/`**`common`**. (Previously each queue would default to `DATADIR/`**`name`**.) - `LENGTH`: **20**: Maximal queue size before channel queues block @@ -400,6 +402,37 @@ relation to port exhaustion. - `BOOST_TIMEOUT`: **5m**: Boost workers will timeout after this long. - `BOOST_WORKERS`: **1** (v1.14 and before: **5**): This many workers will be added to the worker pool if there is a boost. +Gitea creates the following non-unique queues: + +- `code_indexer` +- `issue_indexer` +- `notification-service` +- `task` +- `mail` +- `push_update` + +And the following unique queues: + +- `repo_stats_update` +- `repo-archive` +- `mirror` +- `pr_patch_checker` + +Certain queues have defaults that override the defaults set in `[queue]` (this occurs mostly to support older configuration): + +- `[queue.issue_indexer]` + - `TYPE` this will default to `[queue]` `TYPE` if it is set but if not it will appropriately convert `[indexer]` `ISSUE_INDEXER_QUEUE_TYPE` if that is set. + - `LENGTH` will default to `[indexer]` `UPDATE_BUFFER_LEN` if that is set. + - `BATCH_LENGTH` will default to `[indexer]` `ISSUE_INDEXER_QUEUE_BATCH_NUMBER` if that is set. + - `DATADIR` will default to `[indexer]` `ISSUE_INDEXER_QUEUE_DIR` if that is set. + - `CONN_STR` will default to `[indexer]` `ISSUE_INDEXER_QUEUE_CONN_STR` if that is set. +- `[queue.mailer]` + - `LENGTH` will default to **100** or whatever `[mailer]` `SEND_BUFFER_LEN` is. +- `[queue.pr_patch_checker]` + - `LENGTH` will default to **1000** or whatever `[repository]` `PULL_REQUEST_QUEUE_LENGTH` is. +- `[queue.mirror]` + - `LENGTH` will default to **1000** or whatever `[repository]` `MIRROR_QUEUE_LENGTH` is. + ## Admin (`admin`) - `DEFAULT_EMAIL_NOTIFICATIONS`: **enabled**: Default configuration for email notifications for users (user configurable). Options: enabled, onmention, disabled @@ -588,7 +621,7 @@ Define allowed algorithms and their minimum key length (use -1 to disable a type command or full path). - `SENDMAIL_ARGS`: **_empty_**: Specify any extra sendmail arguments. - `SENDMAIL_TIMEOUT`: **5m**: default timeout for sending email through sendmail -- `SEND_BUFFER_LEN`: **100**: Buffer length of mailing queue. +- `SEND_BUFFER_LEN`: **100**: Buffer length of mailing queue. **DEPRECATED** use `LENGTH` in `[queue.mailer]` ## Cache (`cache`) diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go index 5bec67c4d3..f617595c04 100644 --- a/modules/queue/unique_queue_channel.go +++ b/modules/queue/unique_queue_channel.go @@ -9,6 +9,7 @@ import ( "fmt" "sync" + "code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/log" ) @@ -29,7 +30,7 @@ type ChannelUniqueQueueConfiguration ChannelQueueConfiguration type ChannelUniqueQueue struct { *WorkerPool lock sync.Mutex - table map[Data]bool + table map[string]bool shutdownCtx context.Context shutdownCtxCancel context.CancelFunc terminateCtx context.Context @@ -54,7 +55,7 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) queue := &ChannelUniqueQueue{ - table: map[Data]bool{}, + table: map[string]bool{}, shutdownCtx: shutdownCtx, shutdownCtxCancel: shutdownCtxCancel, terminateCtx: terminateCtx, @@ -65,9 +66,13 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue } queue.WorkerPool = NewWorkerPool(func(data ...Data) { for _, datum := range data { + // No error is possible here because PushFunc ensures that this can be marshalled + bs, _ := json.Marshal(datum) + queue.lock.Lock() - delete(queue.table, datum) + delete(queue.table, string(bs)) queue.lock.Unlock() + handle(datum) } }, config.WorkerPoolConfiguration) @@ -94,6 +99,11 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error { if !assignableTo(data, q.exemplar) { return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name) } + + bs, err := json.Marshal(data) + if err != nil { + return err + } q.lock.Lock() locked := true defer func() { @@ -101,16 +111,16 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error { q.lock.Unlock() } }() - if _, ok := q.table[data]; ok { + if _, ok := q.table[string(bs)]; ok { return ErrAlreadyInQueue } // FIXME: We probably need to implement some sort of limit here // If the downstream queue blocks this table will grow without limit - q.table[data] = true + q.table[string(bs)] = true if fn != nil { err := fn() if err != nil { - delete(q.table, data) + delete(q.table, string(bs)) return err } } @@ -122,9 +132,14 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error { // Has checks if the data is in the queue func (q *ChannelUniqueQueue) Has(data Data) (bool, error) { + bs, err := json.Marshal(data) + if err != nil { + return false, err + } + q.lock.Lock() defer q.lock.Unlock() - _, has := q.table[data] + _, has := q.table[string(bs)] return has, nil } diff --git a/modules/setting/mailer.go b/modules/setting/mailer.go index a2228e938b..d2fac440ac 100644 --- a/modules/setting/mailer.go +++ b/modules/setting/mailer.go @@ -16,7 +16,6 @@ import ( // Mailer represents mail service. type Mailer struct { // Mailer - QueueLength int Name string From string FromName string @@ -54,7 +53,6 @@ func newMailService() { } MailService = &Mailer{ - QueueLength: sec.Key("SEND_BUFFER_LEN").MustInt(100), Name: sec.Key("NAME").MustString(AppName), SendAsPlainText: sec.Key("SEND_AS_PLAIN_TEXT").MustBool(false), MailerType: sec.Key("MAILER_TYPE").In("", []string{"smtp", "sendmail", "dummy"}), diff --git a/modules/setting/queue.go b/modules/setting/queue.go index 76b7dc1faf..1668cc63a3 100644 --- a/modules/setting/queue.go +++ b/modules/setting/queue.go @@ -5,11 +5,12 @@ package setting import ( - "fmt" "path/filepath" + "strconv" "time" "code.gitea.io/gitea/modules/log" + ini "gopkg.in/ini.v1" ) // QueueSettings represent the settings for a queue from the ini @@ -106,11 +107,8 @@ func NewQueueService() { // Now handle the old issue_indexer configuration section := Cfg.Section("queue.issue_indexer") - sectionMap := map[string]bool{} - for _, key := range section.Keys() { - sectionMap[key.Name()] = true - } - if _, ok := sectionMap["TYPE"]; !ok && defaultType == "" { + directlySet := toDirectlySetKeysMap(section) + if !directlySet["TYPE"] && defaultType == "" { switch Indexer.IssueQueueType { case LevelQueueType: _, _ = section.NewKey("TYPE", "level") @@ -125,37 +123,53 @@ func NewQueueService() { Indexer.IssueQueueType) } } - if _, ok := sectionMap["LENGTH"]; !ok && Indexer.UpdateQueueLength != 0 { - _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Indexer.UpdateQueueLength)) + if !directlySet["LENGTH"] && Indexer.UpdateQueueLength != 0 { + _, _ = section.NewKey("LENGTH", strconv.Itoa(Indexer.UpdateQueueLength)) } - if _, ok := sectionMap["BATCH_LENGTH"]; !ok && Indexer.IssueQueueBatchNumber != 0 { - _, _ = section.NewKey("BATCH_LENGTH", fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber)) + if !directlySet["BATCH_LENGTH"] && Indexer.IssueQueueBatchNumber != 0 { + _, _ = section.NewKey("BATCH_LENGTH", strconv.Itoa(Indexer.IssueQueueBatchNumber)) } - if _, ok := sectionMap["DATADIR"]; !ok && Indexer.IssueQueueDir != "" { + if !directlySet["DATADIR"] && Indexer.IssueQueueDir != "" { _, _ = section.NewKey("DATADIR", Indexer.IssueQueueDir) } - if _, ok := sectionMap["CONN_STR"]; !ok && Indexer.IssueQueueConnStr != "" { + if !directlySet["CONN_STR"] && Indexer.IssueQueueConnStr != "" { _, _ = section.NewKey("CONN_STR", Indexer.IssueQueueConnStr) } // Handle the old mailer configuration - section = Cfg.Section("queue.mailer") - sectionMap = map[string]bool{} - for _, key := range section.Keys() { - sectionMap[key.Name()] = true - } - if _, ok := sectionMap["LENGTH"]; !ok { - _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100))) - } + handleOldLengthConfiguration("mailer", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100)) // Handle the old test pull requests configuration // Please note this will be a unique queue - section = Cfg.Section("queue.pr_patch_checker") - sectionMap = map[string]bool{} + handleOldLengthConfiguration("pr_patch_checker", Cfg.Section("repository").Key("PULL_REQUEST_QUEUE_LENGTH").MustInt(1000)) + + // Handle the old mirror queue configuration + // Please note this will be a unique queue + handleOldLengthConfiguration("mirror", Cfg.Section("repository").Key("MIRROR_QUEUE_LENGTH").MustInt(1000)) +} + +// handleOldLengthConfiguration allows fallback to older configuration. `[queue.name]` `LENGTH` will override this configuration, but +// if that is left unset then we should fallback to the older configuration. (Except where the new length woul be <=0) +func handleOldLengthConfiguration(queueName string, value int) { + // Don't override with 0 + if value <= 0 { + return + } + + section := Cfg.Section("queue." + queueName) + directlySet := toDirectlySetKeysMap(section) + if !directlySet["LENGTH"] { + _, _ = section.NewKey("LENGTH", strconv.Itoa(value)) + } +} + +// toDirectlySetKeysMap returns a bool map of keys directly set by this section +// Note: we cannot use section.HasKey(...) as that will immediately set the Key if a parent section has the Key +// but this section does not. +func toDirectlySetKeysMap(section *ini.Section) map[string]bool { + sectionMap := map[string]bool{} for _, key := range section.Keys() { sectionMap[key.Name()] = true } - if _, ok := sectionMap["LENGTH"]; !ok { - _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Repository.PullRequestQueueLength)) - } + return sectionMap } diff --git a/modules/setting/repository.go b/modules/setting/repository.go index de57eb9140..0791602efb 100644 --- a/modules/setting/repository.go +++ b/modules/setting/repository.go @@ -29,8 +29,6 @@ var ( DefaultPrivate string DefaultPushCreatePrivate bool MaxCreationLimit int - MirrorQueueLength int - PullRequestQueueLength int PreferredLicenses []string DisableHTTPGit bool AccessControlAllowOrigin string @@ -142,8 +140,6 @@ var ( DefaultPrivate: RepoCreatingLastUserVisibility, DefaultPushCreatePrivate: true, MaxCreationLimit: -1, - MirrorQueueLength: 1000, - PullRequestQueueLength: 1000, PreferredLicenses: []string{"Apache License 2.0", "MIT License"}, DisableHTTPGit: false, AccessControlAllowOrigin: "", diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go index 7a3e37d993..eb37639bef 100644 --- a/services/mirror/mirror.go +++ b/services/mirror/mirror.go @@ -7,18 +7,43 @@ package mirror import ( "context" "fmt" - "strconv" - "strings" "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" - "code.gitea.io/gitea/modules/sync" ) -// mirrorQueue holds an UniqueQueue object of the mirror -var mirrorQueue = sync.NewUniqueQueue(setting.Repository.MirrorQueueLength) +var mirrorQueue queue.UniqueQueue + +// SyncType type of sync request +type SyncType int + +const ( + // PullMirrorType for pull mirrors + PullMirrorType SyncType = iota + // PushMirrorType for push mirrors + PushMirrorType +) + +// SyncRequest for the mirror queue +type SyncRequest struct { + Type SyncType + RepoID int64 +} + +// doMirrorSync causes this request to mirror itself +func doMirrorSync(ctx context.Context, req *SyncRequest) { + switch req.Type { + case PushMirrorType: + _ = SyncPushMirror(ctx, req.RepoID) + case PullMirrorType: + _ = SyncPullMirror(ctx, req.RepoID) + default: + log.Error("Unknown Request type in queue: %v for RepoID[%d]", req.Type, req.RepoID) + } +} // Update checks and updates mirror repositories. func Update(ctx context.Context) error { @@ -29,19 +54,25 @@ func Update(ctx context.Context) error { log.Trace("Doing: Update") handler := func(idx int, bean interface{}) error { - var item string + var item SyncRequest if m, ok := bean.(*models.Mirror); ok { if m.Repo == nil { log.Error("Disconnected mirror found: %d", m.ID) return nil } - item = fmt.Sprintf("pull %d", m.RepoID) + item = SyncRequest{ + Type: PullMirrorType, + RepoID: m.RepoID, + } } else if m, ok := bean.(*models.PushMirror); ok { if m.Repo == nil { log.Error("Disconnected push-mirror found: %d", m.ID) return nil } - item = fmt.Sprintf("push %d", m.ID) + item = SyncRequest{ + Type: PushMirrorType, + RepoID: m.RepoID, + } } else { log.Error("Unknown bean: %v", bean) return nil @@ -51,8 +82,7 @@ func Update(ctx context.Context) error { case <-ctx.Done(): return fmt.Errorf("Aborted") default: - mirrorQueue.Add(item) - return nil + return mirrorQueue.Push(&item) } } @@ -68,26 +98,10 @@ func Update(ctx context.Context) error { return nil } -// syncMirrors checks and syncs mirrors. -// FIXME: graceful: this should be a persistable queue -func syncMirrors(ctx context.Context) { - // Start listening on new sync requests. - for { - select { - case <-ctx.Done(): - mirrorQueue.Close() - return - case item := <-mirrorQueue.Queue(): - id, _ := strconv.ParseInt(item[5:], 10, 64) - if strings.HasPrefix(item, "pull") { - _ = SyncPullMirror(ctx, id) - } else if strings.HasPrefix(item, "push") { - _ = SyncPushMirror(ctx, id) - } else { - log.Error("Unknown item in queue: %v", item) - } - mirrorQueue.Remove(item) - } +func queueHandle(data ...queue.Data) { + for _, datum := range data { + req := datum.(*SyncRequest) + doMirrorSync(graceful.GetManager().ShutdownContext(), req) } } @@ -96,7 +110,9 @@ func InitSyncMirrors() { if !setting.Mirror.Enabled { return } - go graceful.GetManager().RunWithShutdownContext(syncMirrors) + mirrorQueue = queue.CreateUniqueQueue("mirror", queueHandle, new(SyncRequest)) + + go graceful.GetManager().RunWithShutdownFns(mirrorQueue.Run) } // StartToMirror adds repoID to mirror queue @@ -104,7 +120,15 @@ func StartToMirror(repoID int64) { if !setting.Mirror.Enabled { return } - go mirrorQueue.Add(fmt.Sprintf("pull %d", repoID)) + go func() { + err := mirrorQueue.Push(&SyncRequest{ + Type: PullMirrorType, + RepoID: repoID, + }) + if err != nil { + log.Error("Unable to push sync request for to the queue for push mirror repo[%d]: Error: %v", repoID, err) + } + }() } // AddPushMirrorToQueue adds the push mirror to the queue @@ -112,5 +136,13 @@ func AddPushMirrorToQueue(mirrorID int64) { if !setting.Mirror.Enabled { return } - go mirrorQueue.Add(fmt.Sprintf("push %d", mirrorID)) + go func() { + err := mirrorQueue.Push(&SyncRequest{ + Type: PushMirrorType, + RepoID: mirrorID, + }) + if err != nil { + log.Error("Unable to push sync request to the queue for pull mirror repo[%d]: Error: %v", mirrorID, err) + } + }() }