diff --git a/go.mod b/go.mod index 968c0663e0..1996d29a84 100644 --- a/go.mod +++ b/go.mod @@ -90,6 +90,7 @@ require ( github.com/prometheus/client_golang v1.16.0 github.com/quasoft/websspi v1.1.2 github.com/redis/go-redis/v9 v9.0.5 + github.com/robfig/cron/v3 v3.0.1 github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 github.com/sassoftware/go-rpmutils v0.2.0 github.com/sergi/go-diff v1.3.1 @@ -254,7 +255,6 @@ require ( github.com/rhysd/actionlint v1.6.25 // indirect github.com/rivo/uniseg v0.4.4 // indirect github.com/robfig/cron v1.2.0 // indirect - github.com/robfig/cron/v3 v3.0.1 // indirect github.com/rogpeppe/go-internal v1.11.0 // indirect github.com/rs/xid v1.5.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect diff --git a/models/actions/schedule.go b/models/actions/schedule.go new file mode 100644 index 0000000000..b0bc40dadc --- /dev/null +++ b/models/actions/schedule.go @@ -0,0 +1,120 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "time" + + "code.gitea.io/gitea/models/db" + repo_model "code.gitea.io/gitea/models/repo" + user_model "code.gitea.io/gitea/models/user" + "code.gitea.io/gitea/modules/timeutil" + webhook_module "code.gitea.io/gitea/modules/webhook" + + "github.com/robfig/cron/v3" +) + +// ActionSchedule represents a schedule of a workflow file +type ActionSchedule struct { + ID int64 + Title string + Specs []string + RepoID int64 `xorm:"index"` + Repo *repo_model.Repository `xorm:"-"` + OwnerID int64 `xorm:"index"` + WorkflowID string + TriggerUserID int64 + TriggerUser *user_model.User `xorm:"-"` + Ref string + CommitSHA string + Event webhook_module.HookEventType + EventPayload string `xorm:"LONGTEXT"` + Content []byte + Created timeutil.TimeStamp `xorm:"created"` + Updated timeutil.TimeStamp `xorm:"updated"` +} + +func init() { + db.RegisterModel(new(ActionSchedule)) +} + +// GetSchedulesMapByIDs returns the schedules by given id slice. +func GetSchedulesMapByIDs(ids []int64) (map[int64]*ActionSchedule, error) { + schedules := make(map[int64]*ActionSchedule, len(ids)) + return schedules, db.GetEngine(db.DefaultContext).In("id", ids).Find(&schedules) +} + +// GetReposMapByIDs returns the repos by given id slice. +func GetReposMapByIDs(ids []int64) (map[int64]*repo_model.Repository, error) { + repos := make(map[int64]*repo_model.Repository, len(ids)) + return repos, db.GetEngine(db.DefaultContext).In("id", ids).Find(&repos) +} + +var cronParser = cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor) + +// CreateScheduleTask creates new schedule task. +func CreateScheduleTask(ctx context.Context, rows []*ActionSchedule) error { + // Return early if there are no rows to insert + if len(rows) == 0 { + return nil + } + + // Begin transaction + ctx, committer, err := db.TxContext(ctx) + if err != nil { + return err + } + defer committer.Close() + + // Loop through each schedule row + for _, row := range rows { + // Create new schedule row + if err = db.Insert(ctx, row); err != nil { + return err + } + + // Loop through each schedule spec and create a new spec row + now := time.Now() + + for _, spec := range row.Specs { + // Parse the spec and check for errors + schedule, err := cronParser.Parse(spec) + if err != nil { + continue // skip to the next spec if there's an error + } + + // Insert the new schedule spec row + if err = db.Insert(ctx, &ActionScheduleSpec{ + RepoID: row.RepoID, + ScheduleID: row.ID, + Spec: spec, + Next: timeutil.TimeStamp(schedule.Next(now).Unix()), + }); err != nil { + return err + } + } + } + + // Commit transaction + return committer.Commit() +} + +func DeleteScheduleTaskByRepo(ctx context.Context, id int64) error { + ctx, committer, err := db.TxContext(ctx) + if err != nil { + return err + } + defer committer.Close() + + if _, err := db.GetEngine(ctx).Delete(&ActionSchedule{RepoID: id}); err != nil { + return err + } + + if _, err := db.GetEngine(ctx).Delete(&ActionScheduleSpec{RepoID: id}); err != nil { + return err + } + + return committer.Commit() +} diff --git a/models/actions/schedule_list.go b/models/actions/schedule_list.go new file mode 100644 index 0000000000..e873c05ec3 --- /dev/null +++ b/models/actions/schedule_list.go @@ -0,0 +1,94 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + + "code.gitea.io/gitea/models/db" + repo_model "code.gitea.io/gitea/models/repo" + user_model "code.gitea.io/gitea/models/user" + "code.gitea.io/gitea/modules/container" + + "xorm.io/builder" +) + +type ScheduleList []*ActionSchedule + +// GetUserIDs returns a slice of user's id +func (schedules ScheduleList) GetUserIDs() []int64 { + ids := make(container.Set[int64], len(schedules)) + for _, schedule := range schedules { + ids.Add(schedule.TriggerUserID) + } + return ids.Values() +} + +func (schedules ScheduleList) GetRepoIDs() []int64 { + ids := make(container.Set[int64], len(schedules)) + for _, schedule := range schedules { + ids.Add(schedule.RepoID) + } + return ids.Values() +} + +func (schedules ScheduleList) LoadTriggerUser(ctx context.Context) error { + userIDs := schedules.GetUserIDs() + users := make(map[int64]*user_model.User, len(userIDs)) + if err := db.GetEngine(ctx).In("id", userIDs).Find(&users); err != nil { + return err + } + for _, schedule := range schedules { + if schedule.TriggerUserID == user_model.ActionsUserID { + schedule.TriggerUser = user_model.NewActionsUser() + } else { + schedule.TriggerUser = users[schedule.TriggerUserID] + } + } + return nil +} + +func (schedules ScheduleList) LoadRepos() error { + repoIDs := schedules.GetRepoIDs() + repos, err := repo_model.GetRepositoriesMapByIDs(repoIDs) + if err != nil { + return err + } + for _, schedule := range schedules { + schedule.Repo = repos[schedule.RepoID] + } + return nil +} + +type FindScheduleOptions struct { + db.ListOptions + RepoID int64 + OwnerID int64 +} + +func (opts FindScheduleOptions) toConds() builder.Cond { + cond := builder.NewCond() + if opts.RepoID > 0 { + cond = cond.And(builder.Eq{"repo_id": opts.RepoID}) + } + if opts.OwnerID > 0 { + cond = cond.And(builder.Eq{"owner_id": opts.OwnerID}) + } + + return cond +} + +func FindSchedules(ctx context.Context, opts FindScheduleOptions) (ScheduleList, int64, error) { + e := db.GetEngine(ctx).Where(opts.toConds()) + if !opts.ListAll && opts.PageSize > 0 && opts.Page >= 1 { + e.Limit(opts.PageSize, (opts.Page-1)*opts.PageSize) + } + var schedules ScheduleList + total, err := e.Desc("id").FindAndCount(&schedules) + return schedules, total, err +} + +func CountSchedules(ctx context.Context, opts FindScheduleOptions) (int64, error) { + return db.GetEngine(ctx).Where(opts.toConds()).Count(new(ActionSchedule)) +} diff --git a/models/actions/schedule_spec.go b/models/actions/schedule_spec.go new file mode 100644 index 0000000000..91240459a0 --- /dev/null +++ b/models/actions/schedule_spec.go @@ -0,0 +1,50 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + + "code.gitea.io/gitea/models/db" + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/modules/timeutil" + + "github.com/robfig/cron/v3" +) + +// ActionScheduleSpec represents a schedule spec of a workflow file +type ActionScheduleSpec struct { + ID int64 + RepoID int64 `xorm:"index"` + Repo *repo_model.Repository `xorm:"-"` + ScheduleID int64 `xorm:"index"` + Schedule *ActionSchedule `xorm:"-"` + + // Next time the job will run, or the zero time if Cron has not been + // started or this entry's schedule is unsatisfiable + Next timeutil.TimeStamp `xorm:"index"` + // Prev is the last time this job was run, or the zero time if never. + Prev timeutil.TimeStamp + Spec string + + Created timeutil.TimeStamp `xorm:"created"` + Updated timeutil.TimeStamp `xorm:"updated"` +} + +func (s *ActionScheduleSpec) Parse() (cron.Schedule, error) { + return cronParser.Parse(s.Spec) +} + +func init() { + db.RegisterModel(new(ActionScheduleSpec)) +} + +func UpdateScheduleSpec(ctx context.Context, spec *ActionScheduleSpec, cols ...string) error { + sess := db.GetEngine(ctx).ID(spec.ID) + if len(cols) > 0 { + sess.Cols(cols...) + } + _, err := sess.Update(spec) + return err +} diff --git a/models/actions/schedule_spec_list.go b/models/actions/schedule_spec_list.go new file mode 100644 index 0000000000..d379490b4e --- /dev/null +++ b/models/actions/schedule_spec_list.go @@ -0,0 +1,106 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + + "code.gitea.io/gitea/models/db" + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/modules/container" + + "xorm.io/builder" +) + +type SpecList []*ActionScheduleSpec + +func (specs SpecList) GetScheduleIDs() []int64 { + ids := make(container.Set[int64], len(specs)) + for _, spec := range specs { + ids.Add(spec.ScheduleID) + } + return ids.Values() +} + +func (specs SpecList) LoadSchedules() error { + scheduleIDs := specs.GetScheduleIDs() + schedules, err := GetSchedulesMapByIDs(scheduleIDs) + if err != nil { + return err + } + for _, spec := range specs { + spec.Schedule = schedules[spec.ScheduleID] + } + + repoIDs := specs.GetRepoIDs() + repos, err := GetReposMapByIDs(repoIDs) + if err != nil { + return err + } + for _, spec := range specs { + spec.Repo = repos[spec.RepoID] + } + + return nil +} + +func (specs SpecList) GetRepoIDs() []int64 { + ids := make(container.Set[int64], len(specs)) + for _, spec := range specs { + ids.Add(spec.RepoID) + } + return ids.Values() +} + +func (specs SpecList) LoadRepos() error { + repoIDs := specs.GetRepoIDs() + repos, err := repo_model.GetRepositoriesMapByIDs(repoIDs) + if err != nil { + return err + } + for _, spec := range specs { + spec.Repo = repos[spec.RepoID] + } + return nil +} + +type FindSpecOptions struct { + db.ListOptions + RepoID int64 + Next int64 +} + +func (opts FindSpecOptions) toConds() builder.Cond { + cond := builder.NewCond() + if opts.RepoID > 0 { + cond = cond.And(builder.Eq{"repo_id": opts.RepoID}) + } + + if opts.Next > 0 { + cond = cond.And(builder.Lte{"next": opts.Next}) + } + + return cond +} + +func FindSpecs(ctx context.Context, opts FindSpecOptions) (SpecList, int64, error) { + e := db.GetEngine(ctx).Where(opts.toConds()) + if opts.PageSize > 0 && opts.Page >= 1 { + e.Limit(opts.PageSize, (opts.Page-1)*opts.PageSize) + } + var specs SpecList + total, err := e.Desc("id").FindAndCount(&specs) + if err != nil { + return nil, 0, err + } + + if err := specs.LoadSchedules(); err != nil { + return nil, 0, err + } + return specs, total, nil +} + +func CountSpecs(ctx context.Context, opts FindSpecOptions) (int64, error) { + return db.GetEngine(ctx).Where(opts.toConds()).Count(new(ActionScheduleSpec)) +} diff --git a/models/migrations/migrations.go b/models/migrations/migrations.go index 87c597b573..9f4acda236 100644 --- a/models/migrations/migrations.go +++ b/models/migrations/migrations.go @@ -526,6 +526,8 @@ var migrations = []Migration{ NewMigration("Allow archiving labels", v1_21.AddArchivedUnixColumInLabelTable), // v272 -> v273 NewMigration("Add Version to ActionRun table", v1_21.AddVersionToActionRunTable), + // v273 -> v274 + NewMigration("Add Action Schedule Table", v1_21.AddActionScheduleTable), } // GetCurrentDBVersion returns the current db version diff --git a/models/migrations/v1_21/v273.go b/models/migrations/v1_21/v273.go new file mode 100644 index 0000000000..61c79f4a76 --- /dev/null +++ b/models/migrations/v1_21/v273.go @@ -0,0 +1,45 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package v1_21 //nolint +import ( + "code.gitea.io/gitea/modules/timeutil" + + "xorm.io/xorm" +) + +func AddActionScheduleTable(x *xorm.Engine) error { + type ActionSchedule struct { + ID int64 + Title string + Specs []string + RepoID int64 `xorm:"index"` + OwnerID int64 `xorm:"index"` + WorkflowID string + TriggerUserID int64 + Ref string + CommitSHA string + Event string + EventPayload string `xorm:"LONGTEXT"` + Content []byte + Created timeutil.TimeStamp `xorm:"created"` + Updated timeutil.TimeStamp `xorm:"updated"` + } + + type ActionScheduleSpec struct { + ID int64 + RepoID int64 `xorm:"index"` + ScheduleID int64 `xorm:"index"` + Spec string + Next timeutil.TimeStamp `xorm:"index"` + Prev timeutil.TimeStamp + + Created timeutil.TimeStamp `xorm:"created"` + Updated timeutil.TimeStamp `xorm:"updated"` + } + + return x.Sync( + new(ActionSchedule), + new(ActionScheduleSpec), + ) +} diff --git a/models/repo.go b/models/repo.go index 7579d2ad73..74a88d4c48 100644 --- a/models/repo.go +++ b/models/repo.go @@ -170,6 +170,8 @@ func DeleteRepository(doer *user_model.User, uid, repoID int64) error { &actions_model.ActionRunJob{RepoID: repoID}, &actions_model.ActionRun{RepoID: repoID}, &actions_model.ActionRunner{RepoID: repoID}, + &actions_model.ActionScheduleSpec{RepoID: repoID}, + &actions_model.ActionSchedule{RepoID: repoID}, &actions_model.ActionArtifact{RepoID: repoID}, ); err != nil { return fmt.Errorf("deleteBeans: %w", err) diff --git a/modules/actions/workflows.go b/modules/actions/workflows.go index de340a74ec..408fdb8f8e 100644 --- a/modules/actions/workflows.go +++ b/modules/actions/workflows.go @@ -95,18 +95,25 @@ func GetEventsFromContent(content []byte) ([]*jobparser.Event, error) { return events, nil } -func DetectWorkflows(gitRepo *git.Repository, commit *git.Commit, triggedEvent webhook_module.HookEventType, payload api.Payloader) ([]*DetectedWorkflow, error) { +func DetectWorkflows( + gitRepo *git.Repository, + commit *git.Commit, + triggedEvent webhook_module.HookEventType, + payload api.Payloader, +) ([]*DetectedWorkflow, []*DetectedWorkflow, error) { entries, err := ListWorkflows(commit) if err != nil { - return nil, err + return nil, nil, err } workflows := make([]*DetectedWorkflow, 0, len(entries)) + schedules := make([]*DetectedWorkflow, 0, len(entries)) for _, entry := range entries { content, err := GetContentFromEntry(entry) if err != nil { - return nil, err + return nil, nil, err } + events, err := GetEventsFromContent(content) if err != nil { log.Warn("ignore invalid workflow %q: %v", entry.Name(), err) @@ -114,6 +121,14 @@ func DetectWorkflows(gitRepo *git.Repository, commit *git.Commit, triggedEvent w } for _, evt := range events { log.Trace("detect workflow %q for event %#v matching %q", entry.Name(), evt, triggedEvent) + if evt.IsSchedule() { + dwf := &DetectedWorkflow{ + EntryName: entry.Name(), + TriggerEvent: evt.Name, + Content: content, + } + schedules = append(schedules, dwf) + } if detectMatched(gitRepo, commit, triggedEvent, payload, evt) { dwf := &DetectedWorkflow{ EntryName: entry.Name(), @@ -125,7 +140,7 @@ func DetectWorkflows(gitRepo *git.Repository, commit *git.Commit, triggedEvent w } } - return workflows, nil + return workflows, schedules, nil } func detectMatched(gitRepo *git.Repository, commit *git.Commit, triggedEvent webhook_module.HookEventType, payload api.Payloader, evt *jobparser.Event) bool { diff --git a/options/locale/locale_en-US.ini b/options/locale/locale_en-US.ini index 587a2b14bc..f08d2b7eae 100644 --- a/options/locale/locale_en-US.ini +++ b/options/locale/locale_en-US.ini @@ -2756,6 +2756,7 @@ dashboard.gc_lfs = Garbage collect LFS meta objects dashboard.stop_zombie_tasks = Stop zombie tasks dashboard.stop_endless_tasks = Stop endless tasks dashboard.cancel_abandoned_jobs = Cancel abandoned jobs +dashboard.start_schedule_tasks = Start schedule tasks dashboard.sync_branch.started = Branches Sync started dashboard.rebuild_issue_indexer = Rebuild issue indexer diff --git a/services/actions/notifier_helper.go b/services/actions/notifier_helper.go index 75c99ff19c..ff00e48c64 100644 --- a/services/actions/notifier_helper.go +++ b/services/actions/notifier_helper.go @@ -4,6 +4,7 @@ package actions import ( + "bytes" "context" "fmt" "strings" @@ -24,6 +25,7 @@ import ( "code.gitea.io/gitea/services/convert" "github.com/nektos/act/pkg/jobparser" + "github.com/nektos/act/pkg/model" ) var methodCtxKey struct{} @@ -143,15 +145,15 @@ func notify(ctx context.Context, input *notifyInput) error { } var detectedWorkflows []*actions_module.DetectedWorkflow - workflows, err := actions_module.DetectWorkflows(gitRepo, commit, input.Event, input.Payload) + actionsConfig := input.Repo.MustGetUnit(ctx, unit_model.TypeActions).ActionsConfig() + workflows, schedules, err := actions_module.DetectWorkflows(gitRepo, commit, input.Event, input.Payload) if err != nil { return fmt.Errorf("DetectWorkflows: %w", err) } + if len(workflows) == 0 { log.Trace("repo %s with commit %s couldn't find workflows", input.Repo.RepoPath(), commit.ID) } else { - actionsConfig := input.Repo.MustGetUnit(ctx, unit_model.TypeActions).ActionsConfig() - for _, wf := range workflows { if actionsConfig.IsWorkflowDisabled(wf.EntryName) { log.Trace("repo %s has disable workflows %s", input.Repo.RepoPath(), wf.EntryName) @@ -171,7 +173,7 @@ func notify(ctx context.Context, input *notifyInput) error { if err != nil { return fmt.Errorf("gitRepo.GetCommit: %w", err) } - baseWorkflows, err := actions_module.DetectWorkflows(gitRepo, baseCommit, input.Event, input.Payload) + baseWorkflows, _, err := actions_module.DetectWorkflows(gitRepo, baseCommit, input.Event, input.Payload) if err != nil { return fmt.Errorf("DetectWorkflows: %w", err) } @@ -186,7 +188,22 @@ func notify(ctx context.Context, input *notifyInput) error { } } + if err := handleSchedules(ctx, schedules, commit, input); err != nil { + return err + } + + return handleWorkflows(ctx, detectedWorkflows, commit, input, ref) +} + +func handleWorkflows( + ctx context.Context, + detectedWorkflows []*actions_module.DetectedWorkflow, + commit *git.Commit, + input *notifyInput, + ref string, +) error { if len(detectedWorkflows) == 0 { + log.Trace("repo %s with commit %s couldn't find workflows", input.Repo.RepoPath(), commit.ID) return nil } @@ -350,3 +367,86 @@ func ifNeedApproval(ctx context.Context, run *actions_model.ActionRun, repo *rep log.Trace("need approval because it's the first time user %d triggered actions", user.ID) return true, nil } + +func handleSchedules( + ctx context.Context, + detectedWorkflows []*actions_module.DetectedWorkflow, + commit *git.Commit, + input *notifyInput, +) error { + if len(detectedWorkflows) == 0 { + log.Trace("repo %s with commit %s couldn't find schedules", input.Repo.RepoPath(), commit.ID) + return nil + } + + branch, err := commit.GetBranchName() + if err != nil { + return err + } + if branch != input.Repo.DefaultBranch { + log.Trace("commit branch is not default branch in repo") + return nil + } + + rows, _, err := actions_model.FindSchedules(ctx, actions_model.FindScheduleOptions{RepoID: input.Repo.ID}) + if err != nil { + log.Error("FindCrons: %v", err) + return err + } + + if len(rows) > 0 { + if err := actions_model.DeleteScheduleTaskByRepo(ctx, input.Repo.ID); err != nil { + log.Error("DeleteCronTaskByRepo: %v", err) + } + } + + p, err := json.Marshal(input.Payload) + if err != nil { + return fmt.Errorf("json.Marshal: %w", err) + } + + crons := make([]*actions_model.ActionSchedule, 0, len(detectedWorkflows)) + for _, dwf := range detectedWorkflows { + // Check cron job condition. Only working in default branch + workflow, err := model.ReadWorkflow(bytes.NewReader(dwf.Content)) + if err != nil { + log.Error("ReadWorkflow: %v", err) + continue + } + schedules := workflow.OnSchedule() + if len(schedules) == 0 { + log.Warn("no schedule event") + continue + } + + run := &actions_model.ActionSchedule{ + Title: strings.SplitN(commit.CommitMessage, "\n", 2)[0], + RepoID: input.Repo.ID, + OwnerID: input.Repo.OwnerID, + WorkflowID: dwf.EntryName, + TriggerUserID: input.Doer.ID, + Ref: input.Ref, + CommitSHA: commit.ID.String(), + Event: input.Event, + EventPayload: string(p), + Specs: schedules, + Content: dwf.Content, + } + + // cancel running jobs if the event is push + if run.Event == webhook_module.HookEventPush { + // cancel running jobs of the same workflow + if err := actions_model.CancelRunningJobs( + ctx, + run.RepoID, + run.Ref, + run.WorkflowID, + ); err != nil { + log.Error("CancelRunningJobs: %v", err) + } + } + crons = append(crons, run) + } + + return actions_model.CreateScheduleTask(ctx, crons) +} diff --git a/services/actions/schedule_tasks.go b/services/actions/schedule_tasks.go new file mode 100644 index 0000000000..87131e0aab --- /dev/null +++ b/services/actions/schedule_tasks.go @@ -0,0 +1,135 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "fmt" + "time" + + actions_model "code.gitea.io/gitea/models/actions" + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/timeutil" + webhook_module "code.gitea.io/gitea/modules/webhook" + + "github.com/nektos/act/pkg/jobparser" +) + +// StartScheduleTasks start the task +func StartScheduleTasks(ctx context.Context) error { + return startTasks(ctx) +} + +// startTasks retrieves specifications in pages, creates a schedule task for each specification, +// and updates the specification's next run time and previous run time. +// The function returns an error if there's an issue with finding or updating the specifications. +func startTasks(ctx context.Context) error { + // Set the page size + pageSize := 50 + + // Retrieve specs in pages until all specs have been retrieved + now := time.Now() + for page := 1; ; page++ { + // Retrieve the specs for the current page + specs, _, err := actions_model.FindSpecs(ctx, actions_model.FindSpecOptions{ + ListOptions: db.ListOptions{ + Page: page, + PageSize: pageSize, + }, + Next: now.Unix(), + }) + if err != nil { + return fmt.Errorf("find specs: %w", err) + } + + // Loop through each spec and create a schedule task for it + for _, row := range specs { + // cancel running jobs if the event is push + if row.Schedule.Event == webhook_module.HookEventPush { + // cancel running jobs of the same workflow + if err := actions_model.CancelRunningJobs( + ctx, + row.RepoID, + row.Schedule.Ref, + row.Schedule.WorkflowID, + ); err != nil { + log.Error("CancelRunningJobs: %v", err) + } + } + + if err := CreateScheduleTask(ctx, row.Schedule); err != nil { + log.Error("CreateScheduleTask: %v", err) + return err + } + + // Parse the spec + schedule, err := row.Parse() + if err != nil { + log.Error("Parse: %v", err) + return err + } + + // Update the spec's next run time and previous run time + row.Prev = row.Next + row.Next = timeutil.TimeStamp(schedule.Next(now.Add(1 * time.Minute)).Unix()) + if err := actions_model.UpdateScheduleSpec(ctx, row, "prev", "next"); err != nil { + log.Error("UpdateScheduleSpec: %v", err) + return err + } + } + + // Stop if all specs have been retrieved + if len(specs) < pageSize { + break + } + } + + return nil +} + +// CreateScheduleTask creates a scheduled task from a cron action schedule. +// It creates an action run based on the schedule, inserts it into the database, and creates commit statuses for each job. +func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule) error { + // Create a new action run based on the schedule + run := &actions_model.ActionRun{ + Title: cron.Title, + RepoID: cron.RepoID, + OwnerID: cron.OwnerID, + WorkflowID: cron.WorkflowID, + TriggerUserID: cron.TriggerUserID, + Ref: cron.Ref, + CommitSHA: cron.CommitSHA, + Event: cron.Event, + EventPayload: cron.EventPayload, + Status: actions_model.StatusWaiting, + } + + // Parse the workflow specification from the cron schedule + workflows, err := jobparser.Parse(cron.Content) + if err != nil { + return err + } + + // Insert the action run and its associated jobs into the database + if err := actions_model.InsertRun(ctx, run, workflows); err != nil { + return err + } + + // Retrieve the jobs for the newly created action run + jobs, _, err := actions_model.FindRunJobs(ctx, actions_model.FindRunJobOptions{RunID: run.ID}) + if err != nil { + return err + } + + // Create commit statuses for each job + for _, job := range jobs { + if err := createCommitStatus(ctx, job); err != nil { + return err + } + } + + // Return nil if no errors occurred + return nil +} diff --git a/services/cron/tasks_actions.go b/services/cron/tasks_actions.go index 30e8749a5e..0875792503 100644 --- a/services/cron/tasks_actions.go +++ b/services/cron/tasks_actions.go @@ -18,6 +18,7 @@ func initActionsTasks() { registerStopZombieTasks() registerStopEndlessTasks() registerCancelAbandonedJobs() + registerScheduleTasks() } func registerStopZombieTasks() { @@ -49,3 +50,16 @@ func registerCancelAbandonedJobs() { return actions_service.CancelAbandonedJobs(ctx) }) } + +// registerScheduleTasks registers a scheduled task that runs every minute to start any due schedule tasks. +func registerScheduleTasks() { + // Register the task with a unique name, enabled status, and schedule for every minute. + RegisterTaskFatal("start_schedule_tasks", &BaseConfig{ + Enabled: true, + RunAtStart: false, + Schedule: "@every 1m", + }, func(ctx context.Context, _ *user_model.User, cfg Config) error { + // Call the function to start schedule tasks and pass the context. + return actions_service.StartScheduleTasks(ctx) + }) +}