diff --git a/pkg/restic/backup.go b/pkg/restic/backup.go index ae467551e..4b9561486 100644 --- a/pkg/restic/backup.go +++ b/pkg/restic/backup.go @@ -17,6 +17,7 @@ limitations under the License. package restic import ( + "context" "sync" "time" @@ -197,7 +198,9 @@ func (w *ResticWrapper) InitializeRepository() error { func (w *ResticWrapper) ApplyRetentionPolicies(retentionPolicy api_v1alpha1.RetentionPolicy) (*RepositoryStats, error) { // Cleanup old snapshots according to retention policy - out, err := w.cleanup(retentionPolicy, "") + out, err := w.RunWithRetry(context.Background(), func() ([]byte, error) { + return w.cleanup(retentionPolicy, "") + }) if err != nil { return nil, err } @@ -211,14 +214,18 @@ func (w *ResticWrapper) ApplyRetentionPolicies(retentionPolicy api_v1alpha1.Rete func (w *ResticWrapper) VerifyRepositoryIntegrity() (*RepositoryStats, error) { // Check repository integrity - out, err := w.check() + out, err := w.RunWithRetry(context.Background(), func() ([]byte, error) { + return w.check() + }) if err != nil { return nil, err } // Extract information from output of "check" command integrity := extractCheckInfo(out) // Read repository statics after cleanup - out, err = w.stats("") + out, err = w.RunWithRetry(context.Background(), func() ([]byte, error) { + return w.stats("") + }) if err != nil { return nil, err } diff --git a/pkg/restic/commands.go b/pkg/restic/commands.go index cb317a489..0d54e5fa5 100644 --- a/pkg/restic/commands.go +++ b/pkg/restic/commands.go @@ -491,10 +491,10 @@ func (w *ResticWrapper) run(commands ...Command) ([]byte, error) { } } out, err := w.sh.Output() + klog.Infoln("sh-output:", string(out)) if err != nil { - return nil, formatError(err, errBuff.String()) + return out, formatError(err, errBuff.String()) } - klog.Infoln("sh-output:", string(out)) return out, nil } diff --git a/pkg/restic/config.go b/pkg/restic/config.go index 78fc59aae..037b29e79 100644 --- a/pkg/restic/config.go +++ b/pkg/restic/config.go @@ -38,6 +38,7 @@ const ( type ResticWrapper struct { sh *shell.Session config SetupOptions + *RetryConfig } type Command struct { @@ -112,8 +113,9 @@ type KeyOptions struct { func NewResticWrapper(options SetupOptions) (*ResticWrapper, error) { wrapper := &ResticWrapper{ - sh: shell.NewSession(), - config: options, + sh: shell.NewSession(), + config: options, + RetryConfig: NewRetryConfig(), } err := wrapper.configure() diff --git a/pkg/restic/retry.go b/pkg/restic/retry.go new file mode 100644 index 000000000..9b9e40f85 --- /dev/null +++ b/pkg/restic/retry.go @@ -0,0 +1,95 @@ +/* +Copyright AppsCode Inc. and Contributors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package restic + +import ( + "context" + "fmt" + "strings" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" +) + +const ( + maxRetries = 5 + delay = 10 * time.Second +) + +var retryablePatterns = []string{ + "Connection closed by foreign host", +} + +type RetryConfig struct { + MaxRetries int + Delay time.Duration + ShouldRetry func(error, string) bool +} + +func NewRetryConfig() *RetryConfig { + return &RetryConfig{ + MaxRetries: maxRetries, + Delay: delay, + ShouldRetry: func(err error, output string) bool { + if err == nil { + return false + } + combined := strings.ToLower(err.Error() + " " + output) + klog.Infoln("Combined output: " + combined) + for _, pattern := range retryablePatterns { + if strings.Contains(combined, strings.ToLower(pattern)) { + return true + } + } + return false + }, + } +} + +func (rc *RetryConfig) RunWithRetry(ctx context.Context, execFunc func() ([]byte, error)) ([]byte, error) { + var output []byte + var lastErr error + attempts := 0 + + err := wait.PollUntilContextCancel( + ctx, + rc.Delay, + true, // Run immediately on first call + func(ctx context.Context) (bool, error) { + // Stop if max retries reached + if attempts >= rc.MaxRetries { + return false, fmt.Errorf("max retries reached") + } + output, lastErr = execFunc() + if !rc.ShouldRetry(lastErr, string(output)) { + return true, nil + } + klog.Infoln("Retrying command after error", + "attempt", attempts, + "maxRetries", rc.MaxRetries, + "error", fmt.Sprintf("%s %s", lastErr, string(output))) + attempts++ + return false, nil + }, + ) + if err != nil { + return nil, fmt.Errorf("failed after %d attempts: %w", attempts, lastErr) + } + + return output, lastErr +}