From e765b01b715f3f6dcd0d1deab2bc930195410891 Mon Sep 17 00:00:00 2001 From: qintao <670026955@qq.com> Date: Thu, 30 Apr 2026 11:24:40 +0800 Subject: [PATCH] [issues/766] fix: model synchronization status staying at lfs_start all the time --- mirror/lfssyncer/lfs.go | 52 ++++++++++++++++++---------- mirror/manager/lfs_worker_manager.go | 16 +++++++++ 2 files changed, 49 insertions(+), 19 deletions(-) diff --git a/mirror/lfssyncer/lfs.go b/mirror/lfssyncer/lfs.go index 321defe5a..7f190c0fb 100644 --- a/mirror/lfssyncer/lfs.go +++ b/mirror/lfssyncer/lfs.go @@ -149,33 +149,40 @@ func (w *LfsSyncWorker) ID() int { return w.id } +func (w *LfsSyncWorker) updateMirrorTaskFailed(mt *database.MirrorTask, mtFSM database.MirrorTaskWithFSM, msg string, err error) { + slog.Error(msg, + slog.Int("workerID", w.id), + slog.Any("error", err), + ) + mt.ErrorMessage = fmt.Sprintf("%s: %v", msg, err) + mt.Status = types.MirrorTaskStatus(mtFSM.Current()) + _, updateErr := w.mirrorTaskStore.Update(w.ctx, *mt) + if updateErr != nil { + slog.Error("fail to update mirror task", + slog.Int("workerID", w.id), + slog.Any("error", updateErr), + ) + } +} + func (w *LfsSyncWorker) Run(mt *database.MirrorTask) { var action string + mtFSM := database.NewMirrorTaskWithFSM(mt) mirror, err := w.mirrorStore.FindByID(w.ctx, mt.MirrorID) if err != nil { - slog.Error( - "fail to get mirror", - slog.Int("workerID", w.id), - slog.Any("error", err), - ) - mt.ErrorMessage = "mirror not found" - mt.Status = types.MirrorLfsSyncFailed - _, updateErr := w.mirrorTaskStore.Update(w.ctx, *mt) - if updateErr != nil { - slog.Error("fail to update mirror task", - slog.Int("workerID", w.id), - slog.Any("error", updateErr), - ) + // Update task status when mirror query fails + if mtFSM.SubmitEvent(context.Background(), database.MirrorFail) { + w.updateMirrorTaskFailed(mt, mtFSM, "fail to get mirror", err) } return } _, err = w.repoStore.FindById(w.ctx, mirror.RepositoryID) if err != nil { - slog.Error("fail to get repo", - slog.Int("workerID", w.id), - slog.Any("error", err), - ) + // Update task status even when repo query fails + if mtFSM.SubmitEvent(context.Background(), database.MirrorFail) { + w.updateMirrorTaskFailed(mt, mtFSM, "fail to get repo", err) + } return } @@ -185,6 +192,10 @@ func (w *LfsSyncWorker) Run(mt *database.MirrorTask) { slog.Int("workerId", w.id), slog.Any("error", err), ) + // update task status even when repoFilter check fails + if mtFSM.SubmitEvent(context.Background(), database.MirrorFail) { + w.updateMirrorTaskFailed(mt, mtFSM, "fail to check if repo should sync", err) + } return } @@ -204,6 +215,11 @@ func (w *LfsSyncWorker) Run(mt *database.MirrorTask) { mt.Priority = types.LowMirrorPriority mt.Mirror.Priority = types.LowMirrorPriority + // Even when the repo does not need to be synchronized, + //the task status should be updated correctly to 'completed' + if mtFSM.SubmitEvent(context.Background(), database.MirrorSuccess) { + mt.Status = types.MirrorTaskStatus(mtFSM.Current()) + } _, err = w.mirrorTaskStore.Update(w.ctx, *mt) if err != nil { slog.Error("fail to update mirror task", @@ -243,8 +259,6 @@ func (w *LfsSyncWorker) Run(mt *database.MirrorTask) { action = database.MirrorSuccess mt.Progress = 100 } - - mtFSM := database.NewMirrorTaskWithFSM(mt) // Can not use w.ctx cause it could be canceled ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 5*time.Second) diff --git a/mirror/manager/lfs_worker_manager.go b/mirror/manager/lfs_worker_manager.go index 3b5240738..8b1b5ea6b 100644 --- a/mirror/manager/lfs_worker_manager.go +++ b/mirror/manager/lfs_worker_manager.go @@ -148,6 +148,13 @@ func (m *Manager) Start() { } func (m *Manager) startWorker(id int, mt *database.MirrorTask) { + defer func() { + r := recover() + if r != nil { + slog.Error("start worker recovered from panic", slog.Any("panic", r)) + m.conChan <- id + } + }() lfsSyncWorker, err := mirror.NewLFSSyncWorker(m.config, id) if err != nil { slog.Error("failed to create lfs sync worker", slog.Any("error", err)) @@ -156,6 +163,7 @@ func (m *Manager) startWorker(id int, mt *database.MirrorTask) { ctx := context.Background() ctx, cancel := context.WithCancel(ctx) + defer cancel() lfsSyncWorker.SetContext(ctx) m.mu.Lock() @@ -175,7 +183,15 @@ func (m *Manager) startWorker(id int, mt *database.MirrorTask) { } m.mu.Unlock() + // Ensure to clean up from the workers map before the function returns + defer func() { + m.mu.Lock() + delete(m.workers, id) + m.mu.Unlock() + }() + lfsSyncWorker.Run(mt) + m.conChan <- id }