// Copyright 2019 Gitea. All rights reserved. // SPDX-License-Identifier: MIT package task import ( "fmt" admin_model "code.gitea.io/gitea/models/admin" repo_model "code.gitea.io/gitea/models/repo" user_model "code.gitea.io/gitea/models/user" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/log" base "code.gitea.io/gitea/modules/migration" "code.gitea.io/gitea/modules/queue" repo_module "code.gitea.io/gitea/modules/repository" "code.gitea.io/gitea/modules/secret" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/structs" "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/util" ) // taskQueue is a global queue of tasks var taskQueue *queue.WorkerPoolQueue[*admin_model.Task] // Run a task func Run(t *admin_model.Task) error { switch t.Type { case structs.TaskTypeMigrateRepo: return runMigrateTask(t) default: return fmt.Errorf("Unknown task type: %d", t.Type) } } // Init will start the service to get all unfinished tasks and run them func Init() error { taskQueue = queue.CreateSimpleQueue("task", handler) if taskQueue == nil { return fmt.Errorf("Unable to create Task Queue") } go graceful.GetManager().RunWithShutdownFns(taskQueue.Run) return nil } func handler(items ...*admin_model.Task) []*admin_model.Task { for _, task := range items { if err := Run(task); err != nil { log.Error("Run task failed: %v", err) } } return nil } // MigrateRepository add migration repository to task func MigrateRepository(doer, u *user_model.User, opts base.MigrateOptions) error { task, err := CreateMigrateTask(doer, u, opts) if err != nil { return err } return taskQueue.Push(task) } // CreateMigrateTask creates a migrate task func CreateMigrateTask(doer, u *user_model.User, opts base.MigrateOptions) (*admin_model.Task, error) { // encrypt credentials for persistence var err error opts.CloneAddrEncrypted, err = secret.EncryptSecret(setting.SecretKey, opts.CloneAddr) if err != nil { return nil, err } opts.CloneAddr = util.SanitizeCredentialURLs(opts.CloneAddr) opts.AuthPasswordEncrypted, err = secret.EncryptSecret(setting.SecretKey, opts.AuthPassword) if err != nil { return nil, err } opts.AuthPassword = "" opts.AuthTokenEncrypted, err = secret.EncryptSecret(setting.SecretKey, opts.AuthToken) if err != nil { return nil, err } opts.AuthToken = "" bs, err := json.Marshal(&opts) if err != nil { return nil, err } task := &admin_model.Task{ DoerID: doer.ID, OwnerID: u.ID, Type: structs.TaskTypeMigrateRepo, Status: structs.TaskStatusQueued, PayloadContent: string(bs), } if err := admin_model.CreateTask(task); err != nil { return nil, err } repo, err := repo_module.CreateRepository(doer, u, repo_module.CreateRepoOptions{ Name: opts.RepoName, Description: opts.Description, OriginalURL: opts.OriginalURL, GitServiceType: opts.GitServiceType, IsPrivate: opts.Private, IsMirror: opts.Mirror, Status: repo_model.RepositoryBeingMigrated, }) if err != nil { task.EndTime = timeutil.TimeStampNow() task.Status = structs.TaskStatusFailed err2 := task.UpdateCols("end_time", "status") if err2 != nil { log.Error("UpdateCols Failed: %v", err2.Error()) } return nil, err } task.RepoID = repo.ID if err = task.UpdateCols("repo_id"); err != nil { return nil, err } return task, nil }