Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 33 additions & 19 deletions mirror/lfssyncer/lfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,33 +149,40 @@ func (w *LfsSyncWorker) ID() int {
return w.id
}

func (w *LfsSyncWorker) updateMirrorTaskFailed(mt *database.MirrorTask, mtFSM database.MirrorTaskWithFSM, msg string, err error) {
slog.Error(msg,
slog.Int("workerID", w.id),
slog.Any("error", err),
)
mt.ErrorMessage = fmt.Sprintf("%s: %v", msg, err)
mt.Status = types.MirrorTaskStatus(mtFSM.Current())
_, updateErr := w.mirrorTaskStore.Update(w.ctx, *mt)
if updateErr != nil {
slog.Error("fail to update mirror task",
slog.Int("workerID", w.id),
slog.Any("error", updateErr),
)
}
}

func (w *LfsSyncWorker) Run(mt *database.MirrorTask) {
var action string
mtFSM := database.NewMirrorTaskWithFSM(mt)
mirror, err := w.mirrorStore.FindByID(w.ctx, mt.MirrorID)
if err != nil {
slog.Error(
"fail to get mirror",
slog.Int("workerID", w.id),
slog.Any("error", err),
)
mt.ErrorMessage = "mirror not found"
mt.Status = types.MirrorLfsSyncFailed
_, updateErr := w.mirrorTaskStore.Update(w.ctx, *mt)
if updateErr != nil {
slog.Error("fail to update mirror task",
slog.Int("workerID", w.id),
slog.Any("error", updateErr),
)
// Update task status when mirror query fails
if mtFSM.SubmitEvent(context.Background(), database.MirrorFail) {
w.updateMirrorTaskFailed(mt, mtFSM, "fail to get mirror", err)
}
return
}

_, err = w.repoStore.FindById(w.ctx, mirror.RepositoryID)
if err != nil {
slog.Error("fail to get repo",
slog.Int("workerID", w.id),
slog.Any("error", err),
)
// Update task status even when repo query fails
if mtFSM.SubmitEvent(context.Background(), database.MirrorFail) {
w.updateMirrorTaskFailed(mt, mtFSM, "fail to get repo", err)
}
return
}

Expand All @@ -185,6 +192,10 @@ func (w *LfsSyncWorker) Run(mt *database.MirrorTask) {
slog.Int("workerId", w.id),
slog.Any("error", err),
)
// update task status even when repoFilter check fails
if mtFSM.SubmitEvent(context.Background(), database.MirrorFail) {
w.updateMirrorTaskFailed(mt, mtFSM, "fail to check if repo should sync", err)
}
return
}

Expand All @@ -204,6 +215,11 @@ func (w *LfsSyncWorker) Run(mt *database.MirrorTask) {

mt.Priority = types.LowMirrorPriority
mt.Mirror.Priority = types.LowMirrorPriority
// Even when the repo does not need to be synchronized,
Comment thread
shaoyou520 marked this conversation as resolved.
//the task status should be updated correctly to 'completed'
if mtFSM.SubmitEvent(context.Background(), database.MirrorSuccess) {
mt.Status = types.MirrorTaskStatus(mtFSM.Current())
}
_, err = w.mirrorTaskStore.Update(w.ctx, *mt)
if err != nil {
slog.Error("fail to update mirror task",
Expand Down Expand Up @@ -243,8 +259,6 @@ func (w *LfsSyncWorker) Run(mt *database.MirrorTask) {
action = database.MirrorSuccess
mt.Progress = 100
}

mtFSM := database.NewMirrorTaskWithFSM(mt)
// Can not use w.ctx cause it could be canceled
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
Expand Down
16 changes: 16 additions & 0 deletions mirror/manager/lfs_worker_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,13 @@ func (m *Manager) Start() {
}

func (m *Manager) startWorker(id int, mt *database.MirrorTask) {
defer func() {
r := recover()
if r != nil {
slog.Error("start worker recovered from panic", slog.Any("panic", r))
m.conChan <- id
}
}()
lfsSyncWorker, err := mirror.NewLFSSyncWorker(m.config, id)
if err != nil {
slog.Error("failed to create lfs sync worker", slog.Any("error", err))
Expand All @@ -156,6 +163,7 @@ func (m *Manager) startWorker(id int, mt *database.MirrorTask) {

ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
lfsSyncWorker.SetContext(ctx)

m.mu.Lock()
Expand All @@ -175,7 +183,15 @@ func (m *Manager) startWorker(id int, mt *database.MirrorTask) {
}
m.mu.Unlock()

// Ensure to clean up from the workers map before the function returns
defer func() {
Comment thread
shaoyou520 marked this conversation as resolved.
m.mu.Lock()
delete(m.workers, id)
m.mu.Unlock()
}()

lfsSyncWorker.Run(mt)

m.conChan <- id
}

Expand Down
Loading