Skip to content

Commit 9387b19

Browse files
BolajiOlajide, eseliger, and courier-new
authored
Cache step results while steps are executing instead of caching all of them at the end (#709)
* feat: cache build steps individually
* feat: add tests for individual step caching
* chore: remove unused method
* Update internal/batches/executor/coordinator.go
  Co-authored-by: Erik Seliger <erikseliger@me.com>
* Update internal/batches/executor/executor.go
  Co-authored-by: Erik Seliger <erikseliger@me.com>
* feat: update callback method in executor
* feat: optimize writeToCache callback
* chore: fix failing tests
* Update internal/batches/executor/run_steps.go
  Co-authored-by: Kelli Rockwell <kelli@sourcegraph.com>
* fix: resolve failing tests

Co-authored-by: Erik Seliger <erikseliger@me.com>
Co-authored-by: Kelli Rockwell <kelli@sourcegraph.com>
1 parent 943ca65 commit 9387b19

5 files changed

Lines changed: 68 additions & 47 deletions

File tree

internal/batches/executor/coordinator.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ type NewCoordinatorOpts struct {
6363
func NewCoordinator(opts NewCoordinatorOpts) *Coordinator {
6464
logManager := log.NewManager(opts.TempDir, opts.KeepLogs)
6565

66+
globalEnv := os.Environ()
67+
6668
exec := newExecutor(newExecutorOpts{
6769
RepoArchiveRegistry: opts.RepoArchiveRegistry,
6870
EnsureImage: opts.EnsureImage,
@@ -72,6 +74,10 @@ func NewCoordinator(opts NewCoordinatorOpts) *Coordinator {
7274
Parallelism: opts.Parallelism,
7375
Timeout: opts.Timeout,
7476
TempDir: opts.TempDir,
77+
WriteStepCacheResult: func(ctx context.Context, stepResult execution.AfterStepResult, task *Task) error {
78+
cacheKey := task.cacheKey(globalEnv)
79+
return writeToCache(ctx, opts.Cache, stepResult, task, cacheKey)
80+
},
7581
})
7682

7783
return &Coordinator{
@@ -221,27 +227,28 @@ func (c *Coordinator) loadCachedStepResults(ctx context.Context, task *Task, glo
221227
return nil
222228
}
223229

224-
func (c *Coordinator) writeCache(ctx context.Context, taskResult taskResult, ui TaskExecutionUI) error {
230+
func writeToCache(ctx context.Context, cache cache.Cache, stepResult execution.AfterStepResult, task *Task, cacheKey *cache.ExecutionKeyWithGlobalEnv) error {
231+
key := cacheKeyForStep(cacheKey, stepResult.StepIndex)
232+
if err := cache.SetStepResult(ctx, key, stepResult); err != nil {
233+
return errors.Wrapf(err, "caching result for step %d in %q", stepResult.StepIndex, task.Repository.Name)
234+
}
235+
236+
return nil
237+
}
238+
239+
func (c *Coordinator) writeExecutionCacheResult(ctx context.Context, taskResult taskResult, ui TaskExecutionUI) error {
225240
// Add to the cache, even if no diff was produced.
226241
globalEnv := os.Environ()
227242
cacheKey := taskResult.task.cacheKey(globalEnv)
228243
if err := c.cache.Set(ctx, cacheKey, taskResult.result); err != nil {
229244
return errors.Wrapf(err, "caching result for %q", taskResult.task.Repository.Name)
230245
}
231246

232-
// Save the per-step results
233-
for _, stepResult := range taskResult.stepResults {
234-
key := cacheKeyForStep(cacheKey, stepResult.StepIndex)
235-
if err := c.cache.SetStepResult(ctx, key, stepResult); err != nil {
236-
return errors.Wrapf(err, "caching result for step %d in %q", stepResult.StepIndex, taskResult.task.Repository.Name)
237-
}
238-
}
239-
240247
return nil
241248
}
242249

243250
func (c *Coordinator) writeCacheAndBuildSpecs(ctx context.Context, batchSpec *batcheslib.BatchSpec, taskResult taskResult, ui TaskExecutionUI) ([]*batcheslib.ChangesetSpec, error) {
244-
c.writeCache(ctx, taskResult, ui)
251+
c.writeExecutionCacheResult(ctx, taskResult, ui)
245252

246253
// If the steps didn't result in any diff, we don't need to create a
247254
// changeset spec that's displayed to the user and sent to the server.
@@ -265,7 +272,7 @@ func (c *Coordinator) Execute(ctx context.Context, tasks []*Task, ui TaskExecuti
265272

266273
// Write results to cache.
267274
for _, taskResult := range results {
268-
if cacheErr := c.writeCache(ctx, taskResult, ui); cacheErr != nil {
275+
if cacheErr := c.writeExecutionCacheResult(ctx, taskResult, ui); cacheErr != nil {
269276
return cacheErr
270277
}
271278
}

internal/batches/executor/coordinator_test.go

Lines changed: 7 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -389,8 +389,8 @@ func TestCoordinator_Execute_StepCaching(t *testing.T) {
389389
execAndEnsure(t, coord, executor, batchSpec, task, assertNoCachedResult(t))
390390
// We now expect the cache to have a single entry: the one for the complete
391391
// task. Per-step results are no longer written together with it.
392-
wantCacheSize := len(task.Steps) + 1
393-
assertCacheSize(t, cache, wantCacheSize)
392+
393+
assertCacheSize(t, cache, 1)
394394

395395
// Reset task
396396
task.CachedResultFound = false
@@ -399,23 +399,21 @@ func TestCoordinator_Execute_StepCaching(t *testing.T) {
399399
task.Steps[1].Run = `echo "two modified"`
400400
// Re-execution should start with the diff produced by steps[0] as the
401401
// start state from which steps[1] is then re-executed.
402-
execAndEnsure(t, coord, executor, batchSpec, task, assertCachedResultForStep(t, 0))
402+
execAndEnsure(t, coord, executor, batchSpec, task, assertNoCachedResult(t))
403403
// Cache now contains the old entry plus another "complete task" entry;
404404
// no per-step entries are added by this code path.
405-
wantCacheSize += 1 + 2
406-
assertCacheSize(t, cache, wantCacheSize)
405+
assertCacheSize(t, cache, 2)
407406

408407
// Reset task
409408
task.CachedResultFound = false
410409

411410
// Change the 3rd step's definition:
412411
task.Steps[2].Run = `echo "three modified"`
413412
// Re-execution should use the diff from steps[1] as start state
414-
execAndEnsure(t, coord, executor, batchSpec, task, assertCachedResultForStep(t, 1))
413+
execAndEnsure(t, coord, executor, batchSpec, task, assertNoCachedResult(t))
415414
// Cache now contains old entries plus another "complete task" entry;
416415
// no per-step entries are added by this code path.
417-
wantCacheSize += 1 + 1
418-
assertCacheSize(t, cache, wantCacheSize)
416+
assertCacheSize(t, cache, 3)
419417

420418
// Reset task
421419
task.CachedResultFound = false
@@ -429,7 +427,7 @@ func TestCoordinator_Execute_StepCaching(t *testing.T) {
429427
// Cache should have the same number of entries: the cached step results should
430428
// have been cleared (the complete-task-result is cleared in another
431429
// code path) and the same amount of cached entries has been added.
432-
assertCacheSize(t, cache, wantCacheSize)
430+
assertCacheSize(t, cache, 3)
433431
}
434432

435433
// execAndEnsure executes the given Task with the given cache and dummyExecutor
@@ -475,24 +473,6 @@ func assertCacheSize(t *testing.T, cache *inMemoryExecutionCache, want int) {
475473
}
476474
}
477475

478-
// assertCachedResultForStep returns a function that can be used as a
479-
// startCallback on dummyExecutor to assert that the first Task has a cached
480-
// result for the given step.
481-
func assertCachedResultForStep(t *testing.T, step int) func(context.Context, []*Task, TaskExecutionUI) {
482-
return func(c context.Context, tasks []*Task, ui TaskExecutionUI) {
483-
t.Helper()
484-
485-
task := tasks[0]
486-
if !task.CachedResultFound {
487-
t.Fatalf("CachedResultFound not set")
488-
}
489-
490-
if have, want := task.CachedResult.StepIndex, step; have != want {
491-
t.Fatalf("CachedResult.Step wrong. have=%d, want=%d", have, want)
492-
}
493-
}
494-
}
495-
496476
// assertNoCachedResult returns a function that can be used as a
497477
// startCallback on dummyExecutor to assert that the first Task has no cached results.
498478
func assertNoCachedResult(t *testing.T) func(context.Context, []*Task, TaskExecutionUI) {

internal/batches/executor/executor.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,10 @@ type newExecutorOpts struct {
5959
Logger log.LogManager
6060

6161
// Config
62-
Parallelism int
63-
Timeout time.Duration
64-
TempDir string
62+
Parallelism int
63+
Timeout time.Duration
64+
TempDir string
65+
WriteStepCacheResult func(ctx context.Context, stepResult execution.AfterStepResult, task *Task) error
6566
}
6667

6768
type executor struct {
@@ -177,7 +178,8 @@ func (x *executor) do(ctx context.Context, task *Task, ui TaskExecutionUI) (err
177178
ensureImage: x.opts.EnsureImage,
178179
tempDir: x.opts.TempDir,
179180

180-
ui: ui.StepsExecutionUI(task),
181+
ui: ui.StepsExecutionUI(task),
182+
writeStepCacheResult: x.opts.WriteStepCacheResult,
181183
}
182184

183185
result, stepResults, err := runSteps(runCtx, opts)

internal/batches/executor/executor_test.go

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"path/filepath"
1111
"runtime"
1212
"strings"
13+
"sync"
1314
"testing"
1415
"time"
1516

@@ -69,6 +70,8 @@ func TestExecutor_Integration(t *testing.T) {
6970

7071
wantFinished int
7172
wantFinishedWithErr int
73+
74+
wantCacheCount int
7275
}{
7376
{
7477
name: "success",
@@ -97,7 +100,8 @@ func TestExecutor_Integration(t *testing.T) {
97100
rootPath: []string{"README.md"},
98101
},
99102
},
100-
wantFinished: 2,
103+
wantFinished: 2,
104+
wantCacheCount: 4,
101105
},
102106
{
103107
name: "empty",
@@ -120,7 +124,8 @@ func TestExecutor_Integration(t *testing.T) {
120124
rootPath: []string{},
121125
},
122126
},
123-
wantFinished: 1,
127+
wantFinished: 1,
128+
wantCacheCount: 1,
124129
},
125130
{
126131
name: "timeout",
@@ -178,7 +183,8 @@ func TestExecutor_Integration(t *testing.T) {
178183
},
179184
},
180185
},
181-
wantFinished: 1,
186+
wantFinished: 1,
187+
wantCacheCount: 5,
182188
},
183189
{
184190
name: "workspaces",
@@ -233,7 +239,8 @@ func TestExecutor_Integration(t *testing.T) {
233239
"a/b": []string{"a/b/hello.txt", "a/b/gitignore-exists", "a/b/gitignore-exists-in-a"},
234240
},
235241
},
236-
wantFinished: 3,
242+
wantFinished: 3,
243+
wantCacheCount: 15,
237244
},
238245
{
239246
name: "step condition",
@@ -268,7 +275,8 @@ func TestExecutor_Integration(t *testing.T) {
268275
"sub/directory/of/repo": []string{"README.md", "hello.txt", "in-path.txt"},
269276
},
270277
},
271-
wantFinished: 2,
278+
wantFinished: 2,
279+
wantCacheCount: 4,
272280
},
273281
{
274282
name: "skips errors",
@@ -300,6 +308,7 @@ func TestExecutor_Integration(t *testing.T) {
300308
wantErrInclude: "execution in github.com/sourcegraph/sourcegraph failed: run: exit 1",
301309
wantFinished: 1,
302310
wantFinishedWithErr: 1,
311+
wantCacheCount: 2,
303312
},
304313
}
305314

@@ -337,6 +346,9 @@ func TestExecutor_Integration(t *testing.T) {
337346
// Temp dir for log files and downloaded archives
338347
testTempDir := t.TempDir()
339348

349+
cacheCount := 0
350+
var cacheLock sync.Mutex
351+
340352
// Setup executor
341353
opts := newExecutorOpts{
342354
Creator: workspace.NewCreator(context.Background(), "bind", testTempDir, testTempDir, images),
@@ -347,6 +359,12 @@ func TestExecutor_Integration(t *testing.T) {
347359
TempDir: testTempDir,
348360
Parallelism: runtime.GOMAXPROCS(0),
349361
Timeout: tc.executorTimeout,
362+
WriteStepCacheResult: func(ctx context.Context, stepResult execution.AfterStepResult, task *Task) error {
363+
cacheLock.Lock()
364+
cacheCount += 1
365+
cacheLock.Unlock()
366+
return nil
367+
},
350368
}
351369

352370
if opts.Timeout == 0 {
@@ -372,6 +390,9 @@ func TestExecutor_Integration(t *testing.T) {
372390
}
373391
}
374392

393+
if tc.wantCacheCount != cacheCount {
394+
t.Errorf("wrong cache count. have=%d want=%d", cacheCount, tc.wantCacheCount)
395+
}
375396
wantResults := 0
376397
resultsFound := map[string]map[string]bool{}
377398
for repo, byPath := range tc.wantFilesChanged {
@@ -695,6 +716,9 @@ func testExecuteTasks(t *testing.T, tasks []*Task, archives ...mock.RepoArchive)
695716
TempDir: testTempDir,
696717
Parallelism: runtime.GOMAXPROCS(0),
697718
Timeout: 30 * time.Second,
719+
WriteStepCacheResult: func(ctx context.Context, stepResult execution.AfterStepResult, task *Task) error {
720+
return nil
721+
},
698722
})
699723

700724
executor.Start(context.Background(), tasks, newDummyTaskExecutionUI())

internal/batches/executor/run_steps.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ type executionOpts struct {
3737
logger log.TaskLogger
3838

3939
ui StepsExecutionUI
40+
41+
writeStepCacheResult func(ctx context.Context, stepResult execution.AfterStepResult, task *Task) error
4042
}
4143

4244
func runSteps(ctx context.Context, opts *executionOpts) (result execution.Result, stepResults []execution.AfterStepResult, err error) {
@@ -186,6 +188,12 @@ func runSteps(ctx context.Context, opts *executionOpts) (result execution.Result
186188
stepResults = append(stepResults, stepResult)
187189
previousStepResult = result
188190

191+
// Cache this step's result right away instead of waiting for all steps to finish.
192+
err = opts.writeStepCacheResult(ctx, stepResult, opts.task)
193+
if err != nil {
194+
return execResult, nil, errors.Wrap(err, "failed to cache stepResult")
195+
}
196+
189197
opts.ui.StepFinished(i+1, stepResult.Diff, result.Files, stepResult.Outputs)
190198
}
191199

0 commit comments

Comments
 (0)