diff --git a/custom/conf/app.example.ini b/custom/conf/app.example.ini index bdc42480e4..1753ed2330 100644 --- a/custom/conf/app.example.ini +++ b/custom/conf/app.example.ini @@ -769,10 +769,10 @@ PATH = ;; Global limit of repositories per user, applied at creation time. -1 means no limit ;MAX_CREATION_LIMIT = -1 ;; -;; Mirror sync queue length, increase if mirror syncing starts hanging +;; Mirror sync queue length, increase if mirror syncing starts hanging (DEPRECATED: please use [queue.mirror] LENGTH instead) ;MIRROR_QUEUE_LENGTH = 1000 ;; -;; Patch test queue length, increase if pull request patch testing starts hanging +;; Patch test queue length, increase if pull request patch testing starts hanging (DEPRECATED: please use [queue.pr_patch_checker] LENGTH instead) ;PULL_REQUEST_QUEUE_LENGTH = 1000 ;; ;; Preferred Licenses to place at the top of the List diff --git a/docs/content/doc/advanced/config-cheat-sheet.en-us.md b/docs/content/doc/advanced/config-cheat-sheet.en-us.md index 251f6bd51a..91c62dbec3 100644 --- a/docs/content/doc/advanced/config-cheat-sheet.en-us.md +++ b/docs/content/doc/advanced/config-cheat-sheet.en-us.md @@ -54,10 +54,10 @@ Values containing `#` or `;` must be quoted using `` ` `` or `"""`. - `DEFAULT_PUSH_CREATE_PRIVATE`: **true**: Default private when creating a new repository with push-to-create. - `MAX_CREATION_LIMIT`: **-1**: Global maximum creation limit of repositories per user, `-1` means no limit. -- `PULL_REQUEST_QUEUE_LENGTH`: **1000**: Length of pull request patch test queue, make it +- `PULL_REQUEST_QUEUE_LENGTH`: **1000**: Length of pull request patch test queue, make it. **DEPRECATED** use `LENGTH` in `[queue.pr_patch_checker]`. as large as possible. Use caution when editing this value. - `MIRROR_QUEUE_LENGTH`: **1000**: Patch test queue length, increase if pull request patch - testing starts hanging. + testing starts hanging. **DEPRECATED** use `LENGTH` in `[queue.mirror]`. - `PREFERRED_LICENSES`: **Apache License 2.0,MIT License**: Preferred Licenses to place at the top of the list. Name must match file name in options/license or custom/options/license. - `DISABLE_HTTP_GIT`: **false**: Disable the ability to interact with repositories over the @@ -382,6 +382,8 @@ relation to port exhaustion. ## Queue (`queue` and `queue.*`) +Configuration at `[queue]` will set defaults for queues with overrides for individual queues at `[queue.*]`. (However see below.) + - `TYPE`: **persistable-channel**: General queue type, currently support: `persistable-channel` (uses a LevelDB internally), `channel`, `level`, `redis`, `dummy` - `DATADIR`: **queues/**: Base DataDir for storing persistent and level queues. `DATADIR` for individual queues can be set in `queue.name` sections but will default to `DATADIR/`**`common`**. (Previously each queue would default to `DATADIR/`**`name`**.) - `LENGTH`: **20**: Maximal queue size before channel queues block @@ -400,6 +402,37 @@ relation to port exhaustion. - `BOOST_TIMEOUT`: **5m**: Boost workers will timeout after this long. - `BOOST_WORKERS`: **1** (v1.14 and before: **5**): This many workers will be added to the worker pool if there is a boost. +Gitea creates the following non-unique queues: + +- `code_indexer` +- `issue_indexer` +- `notification-service` +- `task` +- `mail` +- `push_update` + +And the following unique queues: + +- `repo_stats_update` +- `repo-archive` +- `mirror` +- `pr_patch_checker` + +Certain queues have defaults that override the defaults set in `[queue]` (this occurs mostly to support older configuration): + +- `[queue.issue_indexer]` + - `TYPE` this will default to `[queue]` `TYPE` if it is set but if not it will appropriately convert `[indexer]` `ISSUE_INDEXER_QUEUE_TYPE` if that is set. + - `LENGTH` will default to `[indexer]` `UPDATE_BUFFER_LEN` if that is set. + - `BATCH_LENGTH` will default to `[indexer]` `ISSUE_INDEXER_QUEUE_BATCH_NUMBER` if that is set. + - `DATADIR` will default to `[indexer]` `ISSUE_INDEXER_QUEUE_DIR` if that is set. + - `CONN_STR` will default to `[indexer]` `ISSUE_INDEXER_QUEUE_CONN_STR` if that is set. +- `[queue.mailer]` + - `LENGTH` will default to **100** or whatever `[mailer]` `SEND_BUFFER_LEN` is. +- `[queue.pr_patch_checker]` + - `LENGTH` will default to **1000** or whatever `[repository]` `PULL_REQUEST_QUEUE_LENGTH` is. +- `[queue.mirror]` + - `LENGTH` will default to **1000** or whatever `[repository]` `MIRROR_QUEUE_LENGTH` is. + ## Admin (`admin`) - `DEFAULT_EMAIL_NOTIFICATIONS`: **enabled**: Default configuration for email notifications for users (user configurable). Options: enabled, onmention, disabled @@ -588,7 +621,7 @@ Define allowed algorithms and their minimum key length (use -1 to disable a type command or full path). - `SENDMAIL_ARGS`: **_empty_**: Specify any extra sendmail arguments. - `SENDMAIL_TIMEOUT`: **5m**: default timeout for sending email through sendmail -- `SEND_BUFFER_LEN`: **100**: Buffer length of mailing queue. +- `SEND_BUFFER_LEN`: **100**: Buffer length of mailing queue. **DEPRECATED** use `LENGTH` in `[queue.mailer]` ## Cache (`cache`) diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go index 5bec67c4d3..f617595c04 100644 --- a/modules/queue/unique_queue_channel.go +++ b/modules/queue/unique_queue_channel.go @@ -9,6 +9,7 @@ import ( "fmt" "sync" + "code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/log" ) @@ -29,7 +30,7 @@ type ChannelUniqueQueueConfiguration ChannelQueueConfiguration type ChannelUniqueQueue struct { *WorkerPool lock sync.Mutex - table map[Data]bool + table map[string]bool shutdownCtx context.Context shutdownCtxCancel context.CancelFunc terminateCtx context.Context @@ -54,7 +55,7 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) queue := &ChannelUniqueQueue{ - table: map[Data]bool{}, + table: map[string]bool{}, shutdownCtx: shutdownCtx, shutdownCtxCancel: shutdownCtxCancel, terminateCtx: terminateCtx, @@ -65,9 +66,13 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue } queue.WorkerPool = NewWorkerPool(func(data ...Data) { for _, datum := range data { + // No error is possible here because PushFunc ensures that this can be marshalled + bs, _ := json.Marshal(datum) + queue.lock.Lock() - delete(queue.table, datum) + delete(queue.table, string(bs)) queue.lock.Unlock() + handle(datum) } }, config.WorkerPoolConfiguration) @@ -94,6 +99,11 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error { if !assignableTo(data, q.exemplar) { return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name) } + + bs, err := json.Marshal(data) + if err != nil { + return err + } q.lock.Lock() locked := true defer func() { @@ -101,16 +111,16 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error { q.lock.Unlock() } }() - if _, ok := q.table[data]; ok { + if _, ok := q.table[string(bs)]; ok { return ErrAlreadyInQueue } // FIXME: We probably need to implement some sort of limit here // If the downstream queue blocks this table will grow without limit - q.table[data] = true + q.table[string(bs)] = true if fn != nil { err := fn() if err != nil { - delete(q.table, data) + delete(q.table, string(bs)) return err } } @@ -122,9 +132,14 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error { // Has checks if the data is in the queue func (q *ChannelUniqueQueue) Has(data Data) (bool, error) { + bs, err := json.Marshal(data) + if err != nil { + return false, err + } + q.lock.Lock() defer q.lock.Unlock() - _, has := q.table[data] + _, has := q.table[string(bs)] return has, nil } diff --git a/modules/setting/mailer.go b/modules/setting/mailer.go index a2228e938b..d2fac440ac 100644 --- a/modules/setting/mailer.go +++ b/modules/setting/mailer.go @@ -16,7 +16,6 @@ import ( // Mailer represents mail service. type Mailer struct { // Mailer - QueueLength int Name string From string FromName string @@ -54,7 +53,6 @@ func newMailService() { } MailService = &Mailer{ - QueueLength: sec.Key("SEND_BUFFER_LEN").MustInt(100), Name: sec.Key("NAME").MustString(AppName), SendAsPlainText: sec.Key("SEND_AS_PLAIN_TEXT").MustBool(false), MailerType: sec.Key("MAILER_TYPE").In("", []string{"smtp", "sendmail", "dummy"}), diff --git a/modules/setting/queue.go b/modules/setting/queue.go index 76b7dc1faf..1668cc63a3 100644 --- a/modules/setting/queue.go +++ b/modules/setting/queue.go @@ -5,11 +5,12 @@ package setting import ( - "fmt" "path/filepath" + "strconv" "time" "code.gitea.io/gitea/modules/log" + ini "gopkg.in/ini.v1" ) // QueueSettings represent the settings for a queue from the ini @@ -106,11 +107,8 @@ func NewQueueService() { // Now handle the old issue_indexer configuration section := Cfg.Section("queue.issue_indexer") - sectionMap := map[string]bool{} - for _, key := range section.Keys() { - sectionMap[key.Name()] = true - } - if _, ok := sectionMap["TYPE"]; !ok && defaultType == "" { + directlySet := toDirectlySetKeysMap(section) + if !directlySet["TYPE"] && defaultType == "" { switch Indexer.IssueQueueType { case LevelQueueType: _, _ = section.NewKey("TYPE", "level") @@ -125,37 +123,53 @@ func NewQueueService() { Indexer.IssueQueueType) } } - if _, ok := sectionMap["LENGTH"]; !ok && Indexer.UpdateQueueLength != 0 { - _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Indexer.UpdateQueueLength)) + if !directlySet["LENGTH"] && Indexer.UpdateQueueLength != 0 { + _, _ = section.NewKey("LENGTH", strconv.Itoa(Indexer.UpdateQueueLength)) } - if _, ok := sectionMap["BATCH_LENGTH"]; !ok && Indexer.IssueQueueBatchNumber != 0 { - _, _ = section.NewKey("BATCH_LENGTH", fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber)) + if !directlySet["BATCH_LENGTH"] && Indexer.IssueQueueBatchNumber != 0 { + _, _ = section.NewKey("BATCH_LENGTH", strconv.Itoa(Indexer.IssueQueueBatchNumber)) } - if _, ok := sectionMap["DATADIR"]; !ok && Indexer.IssueQueueDir != "" { + if !directlySet["DATADIR"] && Indexer.IssueQueueDir != "" { _, _ = section.NewKey("DATADIR", Indexer.IssueQueueDir) } - if _, ok := sectionMap["CONN_STR"]; !ok && Indexer.IssueQueueConnStr != "" { + if !directlySet["CONN_STR"] && Indexer.IssueQueueConnStr != "" { _, _ = section.NewKey("CONN_STR", Indexer.IssueQueueConnStr) } // Handle the old mailer configuration - section = Cfg.Section("queue.mailer") - sectionMap = map[string]bool{} - for _, key := range section.Keys() { - sectionMap[key.Name()] = true - } - if _, ok := sectionMap["LENGTH"]; !ok { - _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100))) - } + handleOldLengthConfiguration("mailer", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100)) // Handle the old test pull requests configuration // Please note this will be a unique queue - section = Cfg.Section("queue.pr_patch_checker") - sectionMap = map[string]bool{} + handleOldLengthConfiguration("pr_patch_checker", Cfg.Section("repository").Key("PULL_REQUEST_QUEUE_LENGTH").MustInt(1000)) + + // Handle the old mirror queue configuration + // Please note this will be a unique queue + handleOldLengthConfiguration("mirror", Cfg.Section("repository").Key("MIRROR_QUEUE_LENGTH").MustInt(1000)) +} + +// handleOldLengthConfiguration allows fallback to older configuration. `[queue.name]` `LENGTH` will override this configuration, but +// if that is left unset then we should fallback to the older configuration. (Except where the new length woul be <=0) +func handleOldLengthConfiguration(queueName string, value int) { + // Don't override with 0 + if value <= 0 { + return + } + + section := Cfg.Section("queue." + queueName) + directlySet := toDirectlySetKeysMap(section) + if !directlySet["LENGTH"] { + _, _ = section.NewKey("LENGTH", strconv.Itoa(value)) + } +} + +// toDirectlySetKeysMap returns a bool map of keys directly set by this section +// Note: we cannot use section.HasKey(...) as that will immediately set the Key if a parent section has the Key +// but this section does not. +func toDirectlySetKeysMap(section *ini.Section) map[string]bool { + sectionMap := map[string]bool{} for _, key := range section.Keys() { sectionMap[key.Name()] = true } - if _, ok := sectionMap["LENGTH"]; !ok { - _, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Repository.PullRequestQueueLength)) - } + return sectionMap } diff --git a/modules/setting/repository.go b/modules/setting/repository.go index de57eb9140..0791602efb 100644 --- a/modules/setting/repository.go +++ b/modules/setting/repository.go @@ -29,8 +29,6 @@ var ( DefaultPrivate string DefaultPushCreatePrivate bool MaxCreationLimit int - MirrorQueueLength int - PullRequestQueueLength int PreferredLicenses []string DisableHTTPGit bool AccessControlAllowOrigin string @@ -142,8 +140,6 @@ var ( DefaultPrivate: RepoCreatingLastUserVisibility, DefaultPushCreatePrivate: true, MaxCreationLimit: -1, - MirrorQueueLength: 1000, - PullRequestQueueLength: 1000, PreferredLicenses: []string{"Apache License 2.0", "MIT License"}, DisableHTTPGit: false, AccessControlAllowOrigin: "", diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go index 7a3e37d993..eb37639bef 100644 --- a/services/mirror/mirror.go +++ b/services/mirror/mirror.go @@ -7,18 +7,43 @@ package mirror import ( "context" "fmt" - "strconv" - "strings" "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" - "code.gitea.io/gitea/modules/sync" ) -// mirrorQueue holds an UniqueQueue object of the mirror -var mirrorQueue = sync.NewUniqueQueue(setting.Repository.MirrorQueueLength) +var mirrorQueue queue.UniqueQueue + +// SyncType type of sync request +type SyncType int + +const ( + // PullMirrorType for pull mirrors + PullMirrorType SyncType = iota + // PushMirrorType for push mirrors + PushMirrorType +) + +// SyncRequest for the mirror queue +type SyncRequest struct { + Type SyncType + RepoID int64 +} + +// doMirrorSync causes this request to mirror itself +func doMirrorSync(ctx context.Context, req *SyncRequest) { + switch req.Type { + case PushMirrorType: + _ = SyncPushMirror(ctx, req.RepoID) + case PullMirrorType: + _ = SyncPullMirror(ctx, req.RepoID) + default: + log.Error("Unknown Request type in queue: %v for RepoID[%d]", req.Type, req.RepoID) + } +} // Update checks and updates mirror repositories. func Update(ctx context.Context) error { @@ -29,19 +54,25 @@ func Update(ctx context.Context) error { log.Trace("Doing: Update") handler := func(idx int, bean interface{}) error { - var item string + var item SyncRequest if m, ok := bean.(*models.Mirror); ok { if m.Repo == nil { log.Error("Disconnected mirror found: %d", m.ID) return nil } - item = fmt.Sprintf("pull %d", m.RepoID) + item = SyncRequest{ + Type: PullMirrorType, + RepoID: m.RepoID, + } } else if m, ok := bean.(*models.PushMirror); ok { if m.Repo == nil { log.Error("Disconnected push-mirror found: %d", m.ID) return nil } - item = fmt.Sprintf("push %d", m.ID) + item = SyncRequest{ + Type: PushMirrorType, + RepoID: m.RepoID, + } } else { log.Error("Unknown bean: %v", bean) return nil @@ -51,8 +82,7 @@ func Update(ctx context.Context) error { case <-ctx.Done(): return fmt.Errorf("Aborted") default: - mirrorQueue.Add(item) - return nil + return mirrorQueue.Push(&item) } } @@ -68,26 +98,10 @@ func Update(ctx context.Context) error { return nil } -// syncMirrors checks and syncs mirrors. -// FIXME: graceful: this should be a persistable queue -func syncMirrors(ctx context.Context) { - // Start listening on new sync requests. - for { - select { - case <-ctx.Done(): - mirrorQueue.Close() - return - case item := <-mirrorQueue.Queue(): - id, _ := strconv.ParseInt(item[5:], 10, 64) - if strings.HasPrefix(item, "pull") { - _ = SyncPullMirror(ctx, id) - } else if strings.HasPrefix(item, "push") { - _ = SyncPushMirror(ctx, id) - } else { - log.Error("Unknown item in queue: %v", item) - } - mirrorQueue.Remove(item) - } +func queueHandle(data ...queue.Data) { + for _, datum := range data { + req := datum.(*SyncRequest) + doMirrorSync(graceful.GetManager().ShutdownContext(), req) } } @@ -96,7 +110,9 @@ func InitSyncMirrors() { if !setting.Mirror.Enabled { return } - go graceful.GetManager().RunWithShutdownContext(syncMirrors) + mirrorQueue = queue.CreateUniqueQueue("mirror", queueHandle, new(SyncRequest)) + + go graceful.GetManager().RunWithShutdownFns(mirrorQueue.Run) } // StartToMirror adds repoID to mirror queue @@ -104,7 +120,15 @@ func StartToMirror(repoID int64) { if !setting.Mirror.Enabled { return } - go mirrorQueue.Add(fmt.Sprintf("pull %d", repoID)) + go func() { + err := mirrorQueue.Push(&SyncRequest{ + Type: PullMirrorType, + RepoID: repoID, + }) + if err != nil { + log.Error("Unable to push sync request for to the queue for push mirror repo[%d]: Error: %v", repoID, err) + } + }() } // AddPushMirrorToQueue adds the push mirror to the queue @@ -112,5 +136,13 @@ func AddPushMirrorToQueue(mirrorID int64) { if !setting.Mirror.Enabled { return } - go mirrorQueue.Add(fmt.Sprintf("push %d", mirrorID)) + go func() { + err := mirrorQueue.Push(&SyncRequest{ + Type: PushMirrorType, + RepoID: mirrorID, + }) + if err != nil { + log.Error("Unable to push sync request to the queue for pull mirror repo[%d]: Error: %v", mirrorID, err) + } + }() }