mirror of
				https://github.com/go-gitea/gitea
				synced 2025-09-28 03:28:13 +00:00 
			
		
		
		
	chore: move job emitter to service
This commit is contained in:
		| @@ -11,18 +11,11 @@ import ( | ||||
|  | ||||
| 	"code.gitea.io/gitea/models/webhook" | ||||
| 	"code.gitea.io/gitea/modules/git" | ||||
| 	"code.gitea.io/gitea/modules/graceful" | ||||
| 	"code.gitea.io/gitea/modules/log" | ||||
| 	"code.gitea.io/gitea/modules/queue" | ||||
|  | ||||
| 	"github.com/nektos/act/pkg/model" | ||||
| ) | ||||
|  | ||||
| func Init() { | ||||
| 	jobEmitterQueue = queue.CreateUniqueQueue("bots_ready_job", jobEmitterQueueHandle, new(jobUpdate)) | ||||
| 	go graceful.GetManager().RunWithShutdownFns(jobEmitterQueue.Run) | ||||
| } | ||||
|  | ||||
| func ListWorkflows(commit *git.Commit) (git.Entries, error) { | ||||
| 	tree, err := commit.SubTree(".gitea/workflows") | ||||
| 	if _, ok := err.(git.ErrNotExist); ok { | ||||
|   | ||||
| @@ -1,141 +0,0 @@ | ||||
| // Copyright 2022 The Gitea Authors. All rights reserved. | ||||
| // Use of this source code is governed by a MIT-style | ||||
| // license that can be found in the LICENSE file. | ||||
|  | ||||
| package bots | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
|  | ||||
| 	bots_model "code.gitea.io/gitea/models/bots" | ||||
| 	"code.gitea.io/gitea/models/db" | ||||
| 	"code.gitea.io/gitea/modules/graceful" | ||||
| 	"code.gitea.io/gitea/modules/queue" | ||||
|  | ||||
| 	"xorm.io/builder" | ||||
| ) | ||||
|  | ||||
| var jobEmitterQueue queue.UniqueQueue | ||||
|  | ||||
| type jobUpdate struct { | ||||
| 	RunID int64 | ||||
| } | ||||
|  | ||||
| func EmitJobsIfReady(runID int64) error { | ||||
| 	err := jobEmitterQueue.Push(&jobUpdate{ | ||||
| 		RunID: runID, | ||||
| 	}) | ||||
| 	if errors.Is(err, queue.ErrAlreadyInQueue) { | ||||
| 		return nil | ||||
| 	} | ||||
| 	return err | ||||
| } | ||||
|  | ||||
| func jobEmitterQueueHandle(data ...queue.Data) []queue.Data { | ||||
| 	ctx := graceful.GetManager().ShutdownContext() | ||||
| 	var ret []queue.Data | ||||
| 	for _, d := range data { | ||||
| 		update := d.(*jobUpdate) | ||||
| 		if err := checkJobsOfRun(ctx, update.RunID); err != nil { | ||||
| 			ret = append(ret, d) | ||||
| 		} | ||||
| 	} | ||||
| 	return ret | ||||
| } | ||||
|  | ||||
| func checkJobsOfRun(ctx context.Context, runID int64) error { | ||||
| 	return db.WithTx(ctx, func(ctx context.Context) error { | ||||
| 		jobs, _, err := bots_model.FindRunJobs(ctx, bots_model.FindRunJobOptions{RunID: runID}) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		idToJobs := make(map[string][]*bots_model.RunJob, len(jobs)) | ||||
| 		for _, job := range jobs { | ||||
| 			idToJobs[job.JobID] = append(idToJobs[job.JobID], job) | ||||
| 		} | ||||
|  | ||||
| 		updates := newJobStatusResolver(jobs).Resolve() | ||||
| 		for _, job := range jobs { | ||||
| 			if status, ok := updates[job.ID]; ok { | ||||
| 				job.Status = status | ||||
| 				if n, err := bots_model.UpdateRunJob(ctx, job, builder.Eq{"status": bots_model.StatusBlocked}, "status"); err != nil { | ||||
| 					return err | ||||
| 				} else if n != 1 { | ||||
| 					return fmt.Errorf("no affected for updating blocked job %v", job.ID) | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 		return nil | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| type jobStatusResolver struct { | ||||
| 	statuses map[int64]bots_model.Status | ||||
| 	needs    map[int64][]int64 | ||||
| } | ||||
|  | ||||
| func newJobStatusResolver(jobs bots_model.RunJobList) *jobStatusResolver { | ||||
| 	idToJobs := make(map[string][]*bots_model.RunJob, len(jobs)) | ||||
| 	for _, job := range jobs { | ||||
| 		idToJobs[job.JobID] = append(idToJobs[job.JobID], job) | ||||
| 	} | ||||
|  | ||||
| 	statuses := make(map[int64]bots_model.Status, len(jobs)) | ||||
| 	needs := make(map[int64][]int64, len(jobs)) | ||||
| 	for _, job := range jobs { | ||||
| 		statuses[job.ID] = job.Status | ||||
| 		for _, need := range job.Needs { | ||||
| 			for _, v := range idToJobs[need] { | ||||
| 				needs[job.ID] = append(needs[job.ID], v.ID) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	return &jobStatusResolver{ | ||||
| 		statuses: statuses, | ||||
| 		needs:    needs, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (r *jobStatusResolver) Resolve() map[int64]bots_model.Status { | ||||
| 	ret := map[int64]bots_model.Status{} | ||||
| 	for i := 0; i < len(r.statuses); i++ { | ||||
| 		updated := r.resolve() | ||||
| 		if len(updated) == 0 { | ||||
| 			return ret | ||||
| 		} | ||||
| 		for k, v := range updated { | ||||
| 			ret[k] = v | ||||
| 			r.statuses[k] = v | ||||
| 		} | ||||
| 	} | ||||
| 	return ret | ||||
| } | ||||
|  | ||||
| func (r *jobStatusResolver) resolve() map[int64]bots_model.Status { | ||||
| 	ret := map[int64]bots_model.Status{} | ||||
| 	for id, status := range r.statuses { | ||||
| 		if status != bots_model.StatusBlocked { | ||||
| 			continue | ||||
| 		} | ||||
| 		allDone, allSucceed := true, true | ||||
| 		for _, need := range r.needs[id] { | ||||
| 			needStatus := r.statuses[need] | ||||
| 			if !needStatus.IsDone() { | ||||
| 				allDone = false | ||||
| 			} | ||||
| 			if needStatus.In(bots_model.StatusFailure, bots_model.StatusCancelled, bots_model.StatusSkipped) { | ||||
| 				allSucceed = false | ||||
| 			} | ||||
| 		} | ||||
| 		if allDone { | ||||
| 			if allSucceed { | ||||
| 				ret[id] = bots_model.StatusWaiting | ||||
| 			} else { | ||||
| 				ret[id] = bots_model.StatusSkipped | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	return ret | ||||
| } | ||||
| @@ -1,81 +0,0 @@ | ||||
| // Copyright 2022 The Gitea Authors. All rights reserved. | ||||
| // Use of this source code is governed by a MIT-style | ||||
| // license that can be found in the LICENSE file. | ||||
|  | ||||
| package bots | ||||
|  | ||||
| import ( | ||||
| 	"testing" | ||||
|  | ||||
| 	bots_model "code.gitea.io/gitea/models/bots" | ||||
|  | ||||
| 	"github.com/stretchr/testify/assert" | ||||
| ) | ||||
|  | ||||
| func Test_jobStatusResolver_Resolve(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		name string | ||||
| 		jobs bots_model.RunJobList | ||||
| 		want map[int64]bots_model.Status | ||||
| 	}{ | ||||
| 		{ | ||||
| 			name: "no blocked", | ||||
| 			jobs: bots_model.RunJobList{ | ||||
| 				{ID: 1, JobID: "1", Status: bots_model.StatusWaiting, Needs: []string{}}, | ||||
| 				{ID: 2, JobID: "2", Status: bots_model.StatusWaiting, Needs: []string{}}, | ||||
| 				{ID: 3, JobID: "3", Status: bots_model.StatusWaiting, Needs: []string{}}, | ||||
| 			}, | ||||
| 			want: map[int64]bots_model.Status{}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "single blocked", | ||||
| 			jobs: bots_model.RunJobList{ | ||||
| 				{ID: 1, JobID: "1", Status: bots_model.StatusSuccess, Needs: []string{}}, | ||||
| 				{ID: 2, JobID: "2", Status: bots_model.StatusBlocked, Needs: []string{"1"}}, | ||||
| 				{ID: 3, JobID: "3", Status: bots_model.StatusWaiting, Needs: []string{}}, | ||||
| 			}, | ||||
| 			want: map[int64]bots_model.Status{ | ||||
| 				2: bots_model.StatusWaiting, | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "multiple blocked", | ||||
| 			jobs: bots_model.RunJobList{ | ||||
| 				{ID: 1, JobID: "1", Status: bots_model.StatusSuccess, Needs: []string{}}, | ||||
| 				{ID: 2, JobID: "2", Status: bots_model.StatusBlocked, Needs: []string{"1"}}, | ||||
| 				{ID: 3, JobID: "3", Status: bots_model.StatusBlocked, Needs: []string{"1"}}, | ||||
| 			}, | ||||
| 			want: map[int64]bots_model.Status{ | ||||
| 				2: bots_model.StatusWaiting, | ||||
| 				3: bots_model.StatusWaiting, | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "chain blocked", | ||||
| 			jobs: bots_model.RunJobList{ | ||||
| 				{ID: 1, JobID: "1", Status: bots_model.StatusFailure, Needs: []string{}}, | ||||
| 				{ID: 2, JobID: "2", Status: bots_model.StatusBlocked, Needs: []string{"1"}}, | ||||
| 				{ID: 3, JobID: "3", Status: bots_model.StatusBlocked, Needs: []string{"2"}}, | ||||
| 			}, | ||||
| 			want: map[int64]bots_model.Status{ | ||||
| 				2: bots_model.StatusSkipped, | ||||
| 				3: bots_model.StatusSkipped, | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "loop need", | ||||
| 			jobs: bots_model.RunJobList{ | ||||
| 				{ID: 1, JobID: "1", Status: bots_model.StatusBlocked, Needs: []string{"3"}}, | ||||
| 				{ID: 2, JobID: "2", Status: bots_model.StatusBlocked, Needs: []string{"1"}}, | ||||
| 				{ID: 3, JobID: "3", Status: bots_model.StatusBlocked, Needs: []string{"2"}}, | ||||
| 			}, | ||||
| 			want: map[int64]bots_model.Status{}, | ||||
| 		}, | ||||
| 	} | ||||
| 	for _, tt := range tests { | ||||
| 		t.Run(tt.name, func(t *testing.T) { | ||||
| 			r := newJobStatusResolver(tt.jobs) | ||||
| 			assert.Equal(t, tt.want, r.Resolve()) | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
		Reference in New Issue
	
	Block a user