From df98452c0de9d01338f00aa5d85757623523b1fc Mon Sep 17 00:00:00 2001 From: Zettat123 Date: Fri, 27 Dec 2024 06:10:30 +0800 Subject: [PATCH] Improve Actions test (#32883) This PR adds a mock runner to test more actions features. --- tests/integration/actions_job_test.go | 410 +++++++++++++++++++++++ tests/integration/actions_log_test.go | 159 +++++++++ tests/integration/actions_runner_test.go | 157 +++++++++ 3 files changed, 726 insertions(+) create mode 100644 tests/integration/actions_job_test.go create mode 100644 tests/integration/actions_log_test.go create mode 100644 tests/integration/actions_runner_test.go diff --git a/tests/integration/actions_job_test.go b/tests/integration/actions_job_test.go new file mode 100644 index 0000000000..e13277678d --- /dev/null +++ b/tests/integration/actions_job_test.go @@ -0,0 +1,410 @@ +// Copyright 2024 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package integration + +import ( + "encoding/base64" + "fmt" + "net/http" + "net/url" + "testing" + "time" + + actions_model "code.gitea.io/gitea/models/actions" + auth_model "code.gitea.io/gitea/models/auth" + "code.gitea.io/gitea/models/unittest" + user_model "code.gitea.io/gitea/models/user" + api "code.gitea.io/gitea/modules/structs" + + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + "github.com/stretchr/testify/assert" +) + +func TestJobWithNeeds(t *testing.T) { + testCases := []struct { + treePath string + fileContent string + outcomes map[string]*mockTaskOutcome + expectedStatuses map[string]string + }{ + { + treePath: ".gitea/workflows/job-with-needs.yml", + fileContent: `name: job-with-needs +on: + push: + paths: + - '.gitea/workflows/job-with-needs.yml' +jobs: + job1: + runs-on: ubuntu-latest + steps: + - run: echo job1 + job2: + runs-on: ubuntu-latest + needs: [job1] + steps: + - run: echo job2 +`, + outcomes: map[string]*mockTaskOutcome{ + "job1": { + result: runnerv1.Result_RESULT_SUCCESS, + }, + "job2": { + result: runnerv1.Result_RESULT_SUCCESS, + }, + }, + expectedStatuses: map[string]string{ + "job1": actions_model.StatusSuccess.String(), + "job2": actions_model.StatusSuccess.String(), + }, + }, + { + treePath: ".gitea/workflows/job-with-needs-fail.yml", + fileContent: `name: job-with-needs-fail +on: + push: + paths: + - '.gitea/workflows/job-with-needs-fail.yml' +jobs: + job1: + runs-on: ubuntu-latest + steps: + - run: echo job1 + job2: + runs-on: ubuntu-latest + needs: [job1] + steps: + - run: echo job2 +`, + outcomes: map[string]*mockTaskOutcome{ + "job1": { + result: runnerv1.Result_RESULT_FAILURE, + }, + }, + expectedStatuses: map[string]string{ + "job1": actions_model.StatusFailure.String(), + "job2": actions_model.StatusSkipped.String(), + }, + }, + { + treePath: ".gitea/workflows/job-with-needs-fail-if.yml", + fileContent: `name: job-with-needs-fail-if +on: + push: + paths: + - '.gitea/workflows/job-with-needs-fail-if.yml' +jobs: + job1: + runs-on: ubuntu-latest + steps: + - run: echo job1 + job2: + runs-on: ubuntu-latest + if: ${{ always() }} + needs: [job1] + steps: + - run: echo job2 +`, + outcomes: map[string]*mockTaskOutcome{ + "job1": { + result: runnerv1.Result_RESULT_FAILURE, + }, + "job2": { + result: runnerv1.Result_RESULT_SUCCESS, + }, + }, + expectedStatuses: map[string]string{ + "job1": actions_model.StatusFailure.String(), + "job2": actions_model.StatusSuccess.String(), + }, + }, + } + onGiteaRun(t, func(t *testing.T, u *url.URL) { + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + session := loginUser(t, user2.Name) + token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + + apiRepo := createActionsTestRepo(t, token, "actions-jobs-with-needs", false) + runner := newMockRunner() + runner.registerAsRepoRunner(t, user2.Name, apiRepo.Name, "mock-runner", []string{"ubuntu-latest"}) + + for _, tc := range testCases { + t.Run(fmt.Sprintf("test %s", tc.treePath), func(t *testing.T) { + // create the workflow file + opts := getWorkflowCreateFileOptions(user2, apiRepo.DefaultBranch, fmt.Sprintf("create %s", tc.treePath), tc.fileContent) + fileResp := createWorkflowFile(t, token, user2.Name, apiRepo.Name, tc.treePath, opts) + + // fetch and execute task + for i := 0; i < len(tc.outcomes); i++ { + task := runner.fetchTask(t) + jobName := getTaskJobNameByTaskID(t, token, user2.Name, apiRepo.Name, task.Id) + outcome := tc.outcomes[jobName] + assert.NotNil(t, outcome) + runner.execTask(t, task, outcome) + } + + // check result + req := NewRequest(t, "GET", fmt.Sprintf("/api/v1/repos/%s/%s/actions/tasks", user2.Name, apiRepo.Name)). + AddTokenAuth(token) + resp := MakeRequest(t, req, http.StatusOK) + var actionTaskRespAfter api.ActionTaskResponse + DecodeJSON(t, resp, &actionTaskRespAfter) + for _, apiTask := range actionTaskRespAfter.Entries { + if apiTask.HeadSHA != fileResp.Commit.SHA { + continue + } + status := apiTask.Status + assert.Equal(t, status, tc.expectedStatuses[apiTask.Name]) + } + }) + } + + httpContext := NewAPITestContext(t, user2.Name, apiRepo.Name, auth_model.AccessTokenScopeWriteRepository) + doAPIDeleteRepository(httpContext)(t) + }) +} + +func TestJobNeedsMatrix(t *testing.T) { + testCases := []struct { + treePath string + fileContent string + outcomes map[string]*mockTaskOutcome + expectedTaskNeeds map[string]*runnerv1.TaskNeed // jobID => TaskNeed + }{ + { + treePath: ".gitea/workflows/jobs-outputs-with-matrix.yml", + fileContent: `name: jobs-outputs-with-matrix +on: + push: + paths: + - '.gitea/workflows/jobs-outputs-with-matrix.yml' +jobs: + job1: + runs-on: ubuntu-latest + outputs: + output_1: ${{ steps.gen_output.outputs.output_1 }} + output_2: ${{ steps.gen_output.outputs.output_2 }} + output_3: ${{ steps.gen_output.outputs.output_3 }} + strategy: + matrix: + version: [1, 2, 3] + steps: + - name: Generate output + id: gen_output + run: | + version="${{ matrix.version }}" + echo "output_${version}=${version}" >> "$GITHUB_OUTPUT" + job2: + runs-on: ubuntu-latest + needs: [job1] + steps: + - run: echo '${{ toJSON(needs.job1.outputs) }}' +`, + outcomes: map[string]*mockTaskOutcome{ + "job1 (1)": { + result: runnerv1.Result_RESULT_SUCCESS, + outputs: map[string]string{ + "output_1": "1", + "output_2": "", + "output_3": "", + }, + }, + "job1 (2)": { + result: runnerv1.Result_RESULT_SUCCESS, + outputs: map[string]string{ + "output_1": "", + "output_2": "2", + "output_3": "", + }, + }, + "job1 (3)": { + result: runnerv1.Result_RESULT_SUCCESS, + outputs: map[string]string{ + "output_1": "", + "output_2": "", + "output_3": "3", + }, + }, + }, + expectedTaskNeeds: map[string]*runnerv1.TaskNeed{ + "job1": { + Result: runnerv1.Result_RESULT_SUCCESS, + Outputs: map[string]string{ + "output_1": "1", + "output_2": "2", + "output_3": "3", + }, + }, + }, + }, + { + treePath: ".gitea/workflows/jobs-outputs-with-matrix-failure.yml", + fileContent: `name: jobs-outputs-with-matrix-failure +on: + push: + paths: + - '.gitea/workflows/jobs-outputs-with-matrix-failure.yml' +jobs: + job1: + runs-on: ubuntu-latest + outputs: + output_1: ${{ steps.gen_output.outputs.output_1 }} + output_2: ${{ steps.gen_output.outputs.output_2 }} + output_3: ${{ steps.gen_output.outputs.output_3 }} + strategy: + matrix: + version: [1, 2, 3] + steps: + - name: Generate output + id: gen_output + run: | + version="${{ matrix.version }}" + echo "output_${version}=${version}" >> "$GITHUB_OUTPUT" + job2: + runs-on: ubuntu-latest + if: ${{ always() }} + needs: [job1] + steps: + - run: echo '${{ toJSON(needs.job1.outputs) }}' +`, + outcomes: map[string]*mockTaskOutcome{ + "job1 (1)": { + result: runnerv1.Result_RESULT_SUCCESS, + outputs: map[string]string{ + "output_1": "1", + "output_2": "", + "output_3": "", + }, + }, + "job1 (2)": { + result: runnerv1.Result_RESULT_FAILURE, + outputs: map[string]string{ + "output_1": "", + "output_2": "", + "output_3": "", + }, + }, + "job1 (3)": { + result: runnerv1.Result_RESULT_SUCCESS, + outputs: map[string]string{ + "output_1": "", + "output_2": "", + "output_3": "3", + }, + }, + }, + expectedTaskNeeds: map[string]*runnerv1.TaskNeed{ + "job1": { + Result: runnerv1.Result_RESULT_FAILURE, + Outputs: map[string]string{ + "output_1": "1", + "output_2": "", + "output_3": "3", + }, + }, + }, + }, + } + onGiteaRun(t, func(t *testing.T, u *url.URL) { + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + session := loginUser(t, user2.Name) + token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + + apiRepo := createActionsTestRepo(t, token, "actions-jobs-outputs-with-matrix", false) + runner := newMockRunner() + runner.registerAsRepoRunner(t, user2.Name, apiRepo.Name, "mock-runner", []string{"ubuntu-latest"}) + + for _, tc := range testCases { + t.Run(fmt.Sprintf("test %s", tc.treePath), func(t *testing.T) { + opts := getWorkflowCreateFileOptions(user2, apiRepo.DefaultBranch, fmt.Sprintf("create %s", tc.treePath), tc.fileContent) + createWorkflowFile(t, token, user2.Name, apiRepo.Name, tc.treePath, opts) + + for i := 0; i < len(tc.outcomes); i++ { + task := runner.fetchTask(t) + jobName := getTaskJobNameByTaskID(t, token, user2.Name, apiRepo.Name, task.Id) + outcome := tc.outcomes[jobName] + assert.NotNil(t, outcome) + runner.execTask(t, task, outcome) + } + + task := runner.fetchTask(t) + actualTaskNeeds := task.Needs + assert.Len(t, actualTaskNeeds, len(tc.expectedTaskNeeds)) + for jobID, tn := range tc.expectedTaskNeeds { + actualNeed := actualTaskNeeds[jobID] + assert.Equal(t, tn.Result, actualNeed.Result) + assert.Len(t, actualNeed.Outputs, len(tn.Outputs)) + for outputKey, outputValue := range tn.Outputs { + assert.Equal(t, outputValue, actualNeed.Outputs[outputKey]) + } + } + }) + } + + httpContext := NewAPITestContext(t, user2.Name, apiRepo.Name, auth_model.AccessTokenScopeWriteRepository) + doAPIDeleteRepository(httpContext)(t) + }) +} + +func createActionsTestRepo(t *testing.T, authToken, repoName string, isPrivate bool) *api.Repository { + req := NewRequestWithJSON(t, "POST", "/api/v1/user/repos", &api.CreateRepoOption{ + Name: repoName, + Private: isPrivate, + Readme: "Default", + AutoInit: true, + DefaultBranch: "main", + }).AddTokenAuth(authToken) + resp := MakeRequest(t, req, http.StatusCreated) + var apiRepo api.Repository + DecodeJSON(t, resp, &apiRepo) + return &apiRepo +} + +func getWorkflowCreateFileOptions(u *user_model.User, branch, msg, content string) *api.CreateFileOptions { + return &api.CreateFileOptions{ + FileOptions: api.FileOptions{ + BranchName: branch, + Message: msg, + Author: api.Identity{ + Name: u.Name, + Email: u.Email, + }, + Committer: api.Identity{ + Name: u.Name, + Email: u.Email, + }, + Dates: api.CommitDateOptions{ + Author: time.Now(), + Committer: time.Now(), + }, + }, + ContentBase64: base64.StdEncoding.EncodeToString([]byte(content)), + } +} + +func createWorkflowFile(t *testing.T, authToken, ownerName, repoName, treePath string, opts *api.CreateFileOptions) *api.FileResponse { + req := NewRequestWithJSON(t, "POST", fmt.Sprintf("/api/v1/repos/%s/%s/contents/%s", ownerName, repoName, treePath), opts). + AddTokenAuth(authToken) + resp := MakeRequest(t, req, http.StatusCreated) + var fileResponse api.FileResponse + DecodeJSON(t, resp, &fileResponse) + return &fileResponse +} + +// getTaskJobNameByTaskID get the job name of the task by task ID +// there is currently not an API for querying a task by ID so we have to list all the tasks +func getTaskJobNameByTaskID(t *testing.T, authToken, ownerName, repoName string, taskID int64) string { + // FIXME: we may need to query several pages + req := NewRequest(t, "GET", fmt.Sprintf("/api/v1/repos/%s/%s/actions/tasks", ownerName, repoName)). + AddTokenAuth(authToken) + resp := MakeRequest(t, req, http.StatusOK) + var taskRespBefore api.ActionTaskResponse + DecodeJSON(t, resp, &taskRespBefore) + for _, apiTask := range taskRespBefore.Entries { + if apiTask.ID == taskID { + return apiTask.Name + } + } + return "" +} diff --git a/tests/integration/actions_log_test.go b/tests/integration/actions_log_test.go new file mode 100644 index 0000000000..fd055fc4c4 --- /dev/null +++ b/tests/integration/actions_log_test.go @@ -0,0 +1,159 @@ +// Copyright 2024 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package integration + +import ( + "fmt" + "net/http" + "net/url" + "strings" + "testing" + "time" + + auth_model "code.gitea.io/gitea/models/auth" + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/models/unittest" + user_model "code.gitea.io/gitea/models/user" + "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/storage" + "code.gitea.io/gitea/modules/test" + + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func TestDownloadTaskLogs(t *testing.T) { + now := time.Now() + testCases := []struct { + treePath string + fileContent string + outcome *mockTaskOutcome + zstdEnabled bool + }{ + { + treePath: ".gitea/workflows/download-task-logs-zstd.yml", + fileContent: `name: download-task-logs-zstd +on: + push: + paths: + - '.gitea/workflows/download-task-logs-zstd.yml' +jobs: + job1: + runs-on: ubuntu-latest + steps: + - run: echo job1 with zstd enabled +`, + outcome: &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + logRows: []*runnerv1.LogRow{ + { + Time: timestamppb.New(now.Add(1 * time.Second)), + Content: " \U0001F433 docker create image", + }, + { + Time: timestamppb.New(now.Add(2 * time.Second)), + Content: "job1 zstd enabled", + }, + { + Time: timestamppb.New(now.Add(3 * time.Second)), + Content: "\U0001F3C1 Job succeeded", + }, + }, + }, + zstdEnabled: true, + }, + { + treePath: ".gitea/workflows/download-task-logs-no-zstd.yml", + fileContent: `name: download-task-logs-no-zstd +on: + push: + paths: + - '.gitea/workflows/download-task-logs-no-zstd.yml' +jobs: + job1: + runs-on: ubuntu-latest + steps: + - run: echo job1 with zstd disabled +`, + outcome: &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + logRows: []*runnerv1.LogRow{ + { + Time: timestamppb.New(now.Add(4 * time.Second)), + Content: " \U0001F433 docker create image", + }, + { + Time: timestamppb.New(now.Add(5 * time.Second)), + Content: "job1 zstd disabled", + }, + { + Time: timestamppb.New(now.Add(6 * time.Second)), + Content: "\U0001F3C1 Job succeeded", + }, + }, + }, + zstdEnabled: false, + }, + } + onGiteaRun(t, func(t *testing.T, u *url.URL) { + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + session := loginUser(t, user2.Name) + token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + + apiRepo := createActionsTestRepo(t, token, "actions-download-task-logs", false) + repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID}) + runner := newMockRunner() + runner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner", []string{"ubuntu-latest"}) + + for _, tc := range testCases { + t.Run(fmt.Sprintf("test %s", tc.treePath), func(t *testing.T) { + var resetFunc func() + if tc.zstdEnabled { + resetFunc = test.MockVariableValue(&setting.Actions.LogCompression, "zstd") + assert.True(t, setting.Actions.LogCompression.IsZstd()) + } else { + resetFunc = test.MockVariableValue(&setting.Actions.LogCompression, "none") + assert.False(t, setting.Actions.LogCompression.IsZstd()) + } + + // create the workflow file + opts := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", tc.treePath), tc.fileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, tc.treePath, opts) + + // fetch and execute task + task := runner.fetchTask(t) + runner.execTask(t, task, tc.outcome) + + // check whether the log file exists + logFileName := fmt.Sprintf("%s/%02x/%d.log", repo.FullName(), task.Id%256, task.Id) + if setting.Actions.LogCompression.IsZstd() { + logFileName += ".zst" + } + _, err := storage.Actions.Stat(logFileName) + assert.NoError(t, err) + + // download task logs and check content + runIndex := task.Context.GetFields()["run_number"].GetStringValue() + req := NewRequest(t, "GET", fmt.Sprintf("/%s/%s/actions/runs/%s/jobs/0/logs", user2.Name, repo.Name, runIndex)). + AddTokenAuth(token) + resp := MakeRequest(t, req, http.StatusOK) + logTextLines := strings.Split(strings.TrimSpace(resp.Body.String()), "\n") + assert.Len(t, logTextLines, len(tc.outcome.logRows)) + for idx, lr := range tc.outcome.logRows { + assert.Equal( + t, + fmt.Sprintf("%s %s", lr.Time.AsTime().Format("2006-01-02T15:04:05.0000000Z07:00"), lr.Content), + logTextLines[idx], + ) + } + + resetFunc() + }) + } + + httpContext := NewAPITestContext(t, user2.Name, repo.Name, auth_model.AccessTokenScopeWriteRepository) + doAPIDeleteRepository(httpContext)(t) + }) +} diff --git a/tests/integration/actions_runner_test.go b/tests/integration/actions_runner_test.go new file mode 100644 index 0000000000..355ea1705e --- /dev/null +++ b/tests/integration/actions_runner_test.go @@ -0,0 +1,157 @@ +// Copyright 2024 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package integration + +import ( + "context" + "fmt" + "net/http" + "testing" + "time" + + auth_model "code.gitea.io/gitea/models/auth" + "code.gitea.io/gitea/modules/setting" + + pingv1 "code.gitea.io/actions-proto-go/ping/v1" + "code.gitea.io/actions-proto-go/ping/v1/pingv1connect" + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + "code.gitea.io/actions-proto-go/runner/v1/runnerv1connect" + "connectrpc.com/connect" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/types/known/timestamppb" +) + +type mockRunner struct { + client *mockRunnerClient +} + +type mockRunnerClient struct { + pingServiceClient pingv1connect.PingServiceClient + runnerServiceClient runnerv1connect.RunnerServiceClient +} + +func newMockRunner() *mockRunner { + client := newMockRunnerClient("", "") + return &mockRunner{client: client} +} + +func newMockRunnerClient(uuid, token string) *mockRunnerClient { + baseURL := fmt.Sprintf("%sapi/actions", setting.AppURL) + + opt := connect.WithInterceptors(connect.UnaryInterceptorFunc(func(next connect.UnaryFunc) connect.UnaryFunc { + return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) { + if uuid != "" { + req.Header().Set("x-runner-uuid", uuid) + } + if token != "" { + req.Header().Set("x-runner-token", token) + } + return next(ctx, req) + } + })) + + client := &mockRunnerClient{ + pingServiceClient: pingv1connect.NewPingServiceClient(http.DefaultClient, baseURL, opt), + runnerServiceClient: runnerv1connect.NewRunnerServiceClient(http.DefaultClient, baseURL, opt), + } + + return client +} + +func (r *mockRunner) doPing(t *testing.T) { + resp, err := r.client.pingServiceClient.Ping(context.Background(), connect.NewRequest(&pingv1.PingRequest{ + Data: "mock-runner", + })) + assert.NoError(t, err) + assert.Equal(t, "Hello, mock-runner!", resp.Msg.Data) +} + +func (r *mockRunner) doRegister(t *testing.T, name, token string, labels []string) { + r.doPing(t) + resp, err := r.client.runnerServiceClient.Register(context.Background(), connect.NewRequest(&runnerv1.RegisterRequest{ + Name: name, + Token: token, + Version: "mock-runner-version", + Labels: labels, + })) + assert.NoError(t, err) + r.client = newMockRunnerClient(resp.Msg.Runner.Uuid, resp.Msg.Runner.Token) +} + +func (r *mockRunner) registerAsRepoRunner(t *testing.T, ownerName, repoName, runnerName string, labels []string) { + session := loginUser(t, ownerName) + token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository) + req := NewRequest(t, "GET", fmt.Sprintf("/api/v1/repos/%s/%s/actions/runners/registration-token", ownerName, repoName)).AddTokenAuth(token) + resp := MakeRequest(t, req, http.StatusOK) + var registrationToken struct { + Token string `json:"token"` + } + DecodeJSON(t, resp, ®istrationToken) + r.doRegister(t, runnerName, registrationToken.Token, labels) +} + +func (r *mockRunner) fetchTask(t *testing.T, timeout ...time.Duration) *runnerv1.Task { + fetchTimeout := 10 * time.Second + if len(timeout) > 0 { + fetchTimeout = timeout[0] + } + ddl := time.Now().Add(fetchTimeout) + var task *runnerv1.Task + for time.Now().Before(ddl) { + resp, err := r.client.runnerServiceClient.FetchTask(context.Background(), connect.NewRequest(&runnerv1.FetchTaskRequest{ + TasksVersion: 0, + })) + assert.NoError(t, err) + if resp.Msg.Task != nil { + task = resp.Msg.Task + break + } + time.Sleep(time.Second) + } + assert.NotNil(t, task, "failed to fetch a task") + return task +} + +type mockTaskOutcome struct { + result runnerv1.Result + outputs map[string]string + logRows []*runnerv1.LogRow + execTime time.Duration +} + +func (r *mockRunner) execTask(t *testing.T, task *runnerv1.Task, outcome *mockTaskOutcome) { + for idx, lr := range outcome.logRows { + resp, err := r.client.runnerServiceClient.UpdateLog(context.Background(), connect.NewRequest(&runnerv1.UpdateLogRequest{ + TaskId: task.Id, + Index: int64(idx), + Rows: []*runnerv1.LogRow{lr}, + NoMore: idx == len(outcome.logRows)-1, + })) + assert.NoError(t, err) + assert.EqualValues(t, idx+1, resp.Msg.AckIndex) + } + sentOutputKeys := make([]string, 0, len(outcome.outputs)) + for outputKey, outputValue := range outcome.outputs { + resp, err := r.client.runnerServiceClient.UpdateTask(context.Background(), connect.NewRequest(&runnerv1.UpdateTaskRequest{ + State: &runnerv1.TaskState{ + Id: task.Id, + Result: runnerv1.Result_RESULT_UNSPECIFIED, + }, + Outputs: map[string]string{outputKey: outputValue}, + })) + assert.NoError(t, err) + sentOutputKeys = append(sentOutputKeys, outputKey) + assert.ElementsMatch(t, sentOutputKeys, resp.Msg.SentOutputs) + } + time.Sleep(outcome.execTime) + resp, err := r.client.runnerServiceClient.UpdateTask(context.Background(), connect.NewRequest(&runnerv1.UpdateTaskRequest{ + State: &runnerv1.TaskState{ + Id: task.Id, + Result: outcome.result, + StoppedAt: timestamppb.Now(), + }, + })) + assert.NoError(t, err) + assert.Equal(t, outcome.result, resp.Msg.State.Result) +}