Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion src/vorta/borg/borg_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from vorta.i18n import trans_late, translate
from vorta.keyring.abc import VortaKeyring
from vorta.keyring.db import VortaDBKeyring
from vorta.store.models import EventLogModel
from vorta.store.models import EventLogModel, JobModel
from vorta.utils import borg_compat, pretty_bytes

keyring_lock = Lock()
Expand Down Expand Up @@ -234,6 +234,7 @@ def run(self):
profile=self.params.get('profile_id', None),
)
log_entry.save()
self._mark_job_running(log_entry)

# logs: put cmd arguments with special strings in quotation marks
quote_strings = [' ', '*', '?', 're:']
Expand Down Expand Up @@ -341,6 +342,7 @@ def read_async(fd):
log_entry.end_time = dt.now()
with db_lock:
log_entry.save()
self._mark_job_finished(log_entry, result)
self.process_result(result)

self.finished_event(result)
Expand All @@ -350,6 +352,48 @@ def read_async(fd):
def process_result(self, result):
pass

def _get_job_record(self):
job_model_id = self.params.get('job_model_id')
if job_model_id is None:
return None
return JobModel.get_or_none(id=job_model_id)

def _mark_job_running(self, log_entry):
job_record = self._get_job_record()
if job_record is None:
return

now = dt.now()
job_record.event_log = log_entry
job_record.status = JobModel.StatusFieldOptions.RUNNING.value
if job_record.queued_at is None:
job_record.queued_at = now
if job_record.started_at is None:
job_record.started_at = now
job_record.finished_at = None
job_record.skip_reason_code = None
job_record.skip_reason_text = None
job_record.save()

def _mark_job_finished(self, log_entry, result):
job_record = self._get_job_record()
if job_record is None:
return

if result['returncode'] == 0:
status = JobModel.StatusFieldOptions.SUCCESS.value
elif result['returncode'] == 1:
status = JobModel.StatusFieldOptions.WARNING.value
else:
status = JobModel.StatusFieldOptions.FAILED.value

job_record.event_log = log_entry
job_record.status = status
if job_record.started_at is None:
job_record.started_at = log_entry.start_time
job_record.finished_at = log_entry.end_time
job_record.save()

def started_event(self):
self.updated.emit(self.tr('Task started'))

Expand Down
135 changes: 130 additions & 5 deletions src/vorta/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from vorta.borg.prune import BorgPruneJob
from vorta.i18n import translate
from vorta.notifications import VortaNotifications
from vorta.store.models import BackupProfileModel, EventLogModel
from vorta.store.models import BackupProfileModel, EventLogModel, JobModel
from vorta.utils import borg_compat, get_network_status_monitor

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -103,6 +103,99 @@ def tr(self, *args: Any, **kwargs: Any) -> str:
scope = self.__class__.__name__
return translate(scope, *args, **kwargs)

def _pending_scheduled_job_query(self, profile_id: int):
return (
JobModel.select()
.where(
JobModel.profile == profile_id,
JobModel.job_type == 'create',
JobModel.source == JobModel.SourceFieldOptions.SCHEDULED.value,
JobModel.status == JobModel.StatusFieldOptions.PENDING.value,
)
.order_by(JobModel.created_at.desc())
)

def _cancel_pending_jobs(self, profile_id: int, reason_code: str, reason_text: str):
now = dt.now()
(
JobModel.update(
status=JobModel.StatusFieldOptions.CANCELLED.value,
finished_at=now,
skip_reason_code=reason_code,
skip_reason_text=reason_text,
)
.where(
JobModel.profile == profile_id,
JobModel.job_type == 'create',
JobModel.source == JobModel.SourceFieldOptions.SCHEDULED.value,
JobModel.status == JobModel.StatusFieldOptions.PENDING.value,
)
.execute()
)

def _ensure_scheduled_job(self, profile: BackupProfileModel, scheduled_for: dt) -> JobModel:
pending_jobs = list(self._pending_scheduled_job_query(profile.id))
now = dt.now()

if pending_jobs:
job = pending_jobs[0]
job.repo = profile.repo
job.scheduled_for = scheduled_for
job.finished_at = None
job.skip_reason_code = None
job.skip_reason_text = None
job.metadata = {'schedule_mode': profile.schedule_mode}
job.save()
else:
job = JobModel.create(
profile=profile,
repo=profile.repo,
job_type='create',
source=JobModel.SourceFieldOptions.SCHEDULED.value,
status=JobModel.StatusFieldOptions.PENDING.value,
scheduled_for=scheduled_for,
metadata={'schedule_mode': profile.schedule_mode},
)

for duplicate in pending_jobs[1:]:
duplicate.status = JobModel.StatusFieldOptions.CANCELLED.value
duplicate.finished_at = now
duplicate.skip_reason_code = 'superseded'
duplicate.skip_reason_text = self.tr('Superseded by a newer scheduled backup.')
duplicate.save()

return job

def _mark_job_queued(self, job_id: int):
now = dt.now()
(
JobModel.update(
status=JobModel.StatusFieldOptions.QUEUED.value,
queued_at=now,
finished_at=None,
skip_reason_code=None,
skip_reason_text=None,
)
.where(JobModel.id == job_id)
.execute()
)

def _mark_job_skipped(self, job_id: int | None, reason_code: str, reason_text: str):
if job_id is None:
return

now = dt.now()
(
JobModel.update(
status=JobModel.StatusFieldOptions.SKIPPED.value,
finished_at=now,
skip_reason_code=reason_code,
skip_reason_text=reason_text,
)
.where(JobModel.id == job_id)
.execute()
)

def pause(self, profile_id: int, until: dt | None = None) -> None:
"""
Call a timeout for scheduling of a given profile.
Expand Down Expand Up @@ -145,6 +238,7 @@ def pause(self, profile_id: int, until: dt | None = None) -> None:

# remove existing schedule
self.remove_job(profile_id)
self._cancel_pending_jobs(profile_id, 'paused', self.tr('Scheduling paused.'))

# setting timer for reschedule is not possible if called
# from a non-QThread - it won't fail but won't work
Expand Down Expand Up @@ -214,9 +308,9 @@ def set_timer_for_profile(self, profile_id: int) -> None:
next suitable backup time.
"""
profile = BackupProfileModel.get_or_none(id=profile_id)
logger.debug('Profile: %s, %d %d', str(profile), profile.schedule_fixed_hour, profile.schedule_fixed_minute)
if profile is None: # profile doesn't exist any more.
return
logger.debug('Profile: %s, %d %d', str(profile), profile.schedule_fixed_hour, profile.schedule_fixed_minute)

with self.lock: # Acquire lock
self.remove_job(profile_id) # reset schedule
Expand All @@ -240,12 +334,14 @@ def set_timer_for_profile(self, profile_id: int) -> None:
'Nothing scheduled for profile %s because of unset repo.',
profile_id,
)
self._cancel_pending_jobs(profile_id, 'missing_repo', self.tr('No repository configured.'))
# Emit signal so that e.g. the GUI can react to the new schedule
self.schedule_changed.emit()
return

if profile.schedule_mode == 'off':
logger.debug('Scheduler for profile %s is disabled.', profile_id)
self._cancel_pending_jobs(profile_id, 'schedule_disabled', self.tr('Schedule disabled.'))
# Emit signal so that e.g. the GUI can react to the new schedule
self.schedule_changed.emit()
return
Expand Down Expand Up @@ -284,6 +380,11 @@ def set_timer_for_profile(self, profile_id: int) -> None:
+ "because it would be the first backup "
+ "for this profile."
)
self._cancel_pending_jobs(
profile_id,
'awaiting_first_backup',
self.tr('Run a manual backup before scheduling starts.'),
)
self.timers[profile_id] = {'type': ScheduleStatusType.NO_PREVIOUS_BACKUP}
# Emit signal so that e.g. the GUI can react to the new schedule
self.schedule_changed.emit()
Expand Down Expand Up @@ -316,20 +417,27 @@ def set_timer_for_profile(self, profile_id: int) -> None:
# handle missing of a scheduled time
if next_time <= dt.now():
if profile.schedule_make_up_missed and (self._net_up or not needs_network):
scheduled_job = self._ensure_scheduled_job(profile, next_time)
self.lock.release()
try:
logger.debug(
'Catching up by running job for %s (%s)',
profile.name,
profile_id,
)
self.create_backup(profile_id)
self.create_backup(profile_id, scheduled_job.id)
finally:
self.lock.acquire() # with-statement will try to release

return # create_backup will lead to a call to this method
elif profile.schedule_make_up_missed and not self._net_up and needs_network:
logger.debug('Skipping catchup %s (%s), the network is not available', profile.name, profile.id)
skipped_job = self._ensure_scheduled_job(profile, next_time)
self._mark_job_skipped(
skipped_job.id,
'network_unavailable',
self.tr('Skipped scheduled backup because the network is unavailable.'),
)

# calculate next time from now
if profile.schedule_mode == 'interval':
Expand All @@ -354,14 +462,15 @@ def set_timer_for_profile(self, profile_id: int) -> None:

# start QTimer
timer_ms = (next_time - dt.now()).total_seconds() * 1000
scheduled_job = self._ensure_scheduled_job(profile, next_time)

if timer_ms < 2**31 - 1:
logger.debug('Scheduling next run for %s', next_time)

timer = QTimer()
timer.setSingleShot(True)
timer.setInterval(int(timer_ms))
timer.timeout.connect(lambda: self.create_backup(profile_id))
timer.timeout.connect(lambda: self.create_backup(profile_id, scheduled_job.id))
timer.start()

self.timers[profile_id] = {
Expand Down Expand Up @@ -421,17 +530,30 @@ def next_job_for_profile(self, profile_id: int) -> ScheduleStatus:
return ScheduleStatus(ScheduleStatusType.UNSCHEDULED)
return ScheduleStatus(job['type'], time=job.get('dt')) # type: ignore[arg-type]

def create_backup(self, profile_id: int) -> None:
def create_backup(self, profile_id: int, job_id: int | None = None) -> None:
notifier = VortaNotifications.pick()
profile = BackupProfileModel.get_or_none(id=profile_id)

if profile is None:
logger.info('Profile not found. Maybe deleted?')
return

if profile.repo is None:
logger.info('Profile %s has no repository configured.', profile_id)
self._mark_job_skipped(job_id, 'missing_repo', self.tr('No repository configured.'))
return

if job_id is None:
job_id = self._ensure_scheduled_job(profile, dt.now()).id

# Skip if a job for this profile (repo) is already in progress
if self.app.jobs_manager.is_worker_running(site=profile.repo.id):
logger.debug('A job for repo %s is already active.', profile.repo.id)
self._mark_job_skipped(
job_id,
'repo_busy',
self.tr('Skipped scheduled backup because another repository job is already running.'),
)
self.pause(profile_id)
return

Expand All @@ -445,13 +567,16 @@ def create_backup(self, profile_id: int) -> None:
msg = BorgCreateJob.prepare(profile)
if msg['ok']:
logger.info('Preparation for backup successful.')
self._mark_job_queued(job_id)
msg['category'] = 'scheduled'
msg['job_model_id'] = job_id
job = BorgCreateJob(msg['cmd'], msg, profile.repo.id)
job.result.connect(self.notify)
self.app.jobs_manager.add_job(job)
else:
logger.error('Conditions for backup not met. Aborting.')
logger.error(msg['message'])
self._mark_job_skipped(job_id, 'preparation_failed', translate('messages', msg['message']))
notifier.deliver(
self.tr('Vorta Backup'),
translate('messages', msg['message']),
Expand Down
4 changes: 3 additions & 1 deletion src/vorta/store/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
BackupProfileModel,
EventLogModel,
ExclusionModel,
JobModel,
RepoModel,
RepoPassword,
SchemaVersion,
Expand All @@ -27,7 +28,7 @@
)
from .settings import get_misc_settings

SCHEMA_VERSION = 23
SCHEMA_VERSION = 24


@signals.post_save(sender=SettingsModel)
Expand Down Expand Up @@ -57,6 +58,7 @@ def init_db(con: pw.SqliteDatabase | None = None) -> None:
ArchiveModel,
WifiSettingModel,
EventLogModel,
JobModel,
SchemaVersion,
ExclusionModel,
]
Expand Down
3 changes: 3 additions & 0 deletions src/vorta/store/migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,9 @@ def run_migrations(current_schema: SchemaVersion, db_connection: pw.SqliteDataba
),
)

if current_schema.version < 24:
_apply_schema_update(current_schema, 24)


def _apply_schema_update(current_schema: SchemaVersion, version_after: int, *operations: Operation) -> None:
with DB.atomic():
Expand Down
Loading
Loading