Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .vscode/tasks.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
{
"label": "pytest integration",
"type": "shell",
"command": "cd tests/integration; PYTHONPATH=../..: pytest -m sqlite3 -vv",
"command": "cd tests/integration; PYTHONPATH=../..: pytest -m 'not postgres' -vv",
"problemMatcher": []
},
{
"label": "pytest integration last failed",
"type": "shell",
"command": "cd tests/integration; PYTHONPATH=../..: pytest -m sqlite3 -vv --last-failed",
"command": "cd tests/integration; PYTHONPATH=../..: pytest -m 'not postgres' -vv --last-failed",
"problemMatcher": []
},
{
Expand Down
4 changes: 2 additions & 2 deletions lufa/api_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from lufa.auth import ro_token_required, sanitize, token_required, with_json_data
from lufa.decorators import debug_only
from lufa.provider import get_api_repository, get_awx_client, get_database_manager
from lufa.repository.api_repository import LufaKeyError
from lufa.repository.api_repository import JobExport, LufaKeyError

MALFORMED_JSON = {"error": "Malformed json"}

Expand Down Expand Up @@ -243,7 +243,7 @@ def jobs_export(tower_job_id: int):
"tasks": list,
},
)
def import_job(data: dict):
def import_job(data: JobExport):
"""
Imports a job from export data.

Expand Down
2 changes: 1 addition & 1 deletion lufa/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def init_db(self) -> None:

def get_db_now(self) -> str:
cur = self.get_db_connection().cursor()
cur.execute("select datetime('now') as now;")
cur.execute("select strftime('%Y-%m-%d %H:%M', datetime('now')) as now;")
return cur.fetchone()["now"]


Expand Down
96 changes: 70 additions & 26 deletions lufa/repository/api_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
import sqlite3
from abc import ABC, abstractmethod
from typing import Optional, TypeAlias, TypedDict
from typing import Optional, TypeAlias, TypedDict, cast

from psycopg2.errors import ForeignKeyViolation, InvalidDatetimeFormat, InvalidTextRepresentation

Expand All @@ -29,7 +29,11 @@ class Callback(TypedDict):
ansible_host: str
state: JobState
module: str
result_dump: str
result_dump: JSon


class CallbackExport(Callback):
timestamp: TimeStamp


class Task(TypedDict):
Expand All @@ -38,6 +42,12 @@ class Task(TypedDict):
task_name: str


class TaskExport(TypedDict):
ansible_uuid: str
task_name: str
callbacks: list[CallbackExport]


class TowerJobStats(TypedDict):
ansible_host: str
ok: int
Expand All @@ -59,6 +69,41 @@ class JobTemplateComplianceStates(TypedDict):
organisation: str


class FullJob(TypedDict):
tower_job_id: int
tower_job_template_id: int
tower_job_template_name: str
ansible_limit: str
tower_user_name: str
awx_tags: list[str]
extra_vars: JSon
artifacts: JSon
tower_schedule_id: int
tower_schedule_name: str
tower_workflow_job_id: int
tower_workflow_job_name: str
start_time: TimeStamp
end_time: TimeStamp | None
state: JobState


class TowerJobTemplate(TypedDict):
tower_job_template_id: int
tower_job_template_name: str
playbook_path: str
compliance_interval: int
awx_organisation: str
template_infos: str | None


class JobExport(TypedDict):
exported_at: TimeStamp
job: FullJob
job_template: TowerJobTemplate
stats: list[TowerJobStats]
tasks: list[TaskExport]


class ApiRepository(ABC):
@abstractmethod
def get_all_noncompliant_hosts(self) -> dict[str, list[JobTemplateComplianceStates]]:
Expand Down Expand Up @@ -136,12 +181,12 @@ def add_stats(self, tower_job_id: int, stats: list[TowerJobStats]) -> None:
pass

@abstractmethod
def export_job(self, tower_job_id: int) -> dict:
def export_job(self, tower_job_id: int) -> JobExport:
"""Exports complete job data with tasks and callbacks"""
pass

@abstractmethod
def import_job(self, job_data: dict) -> int:
def import_job(self, export_data: JobExport) -> int:
"""Imports a job from a dict
Returns the tower_job_id of the imported job.
"""
Expand Down Expand Up @@ -392,7 +437,7 @@ def update_job(

conn.commit()

def export_job(self, tower_job_id: int) -> dict:
def export_job(self, tower_job_id: int) -> JobExport:
"""Exports complete job data with tasks and callbacks"""
conn = self.db_manager.get_db_connection()
cursor = conn.cursor()
Expand Down Expand Up @@ -486,10 +531,10 @@ def export_job(self, tower_job_id: int) -> dict:
}
)

tasks_with_callbacks = list(tasks_dict.values())
tasks_with_callbacks = cast(list[TaskExport], list(tasks_dict.values()))

# build export structure
export_data = {
export_data: JobExport = {
"exported_at": self.db_manager.get_db_now(),
"job": {
"tower_job_id": job["tower_job_id"],
Expand Down Expand Up @@ -522,7 +567,7 @@ def export_job(self, tower_job_id: int) -> dict:

return export_data

def import_job(self, export_data: dict) -> int:
def import_job(self, export_data: JobExport) -> int:
"""
Imports a job from export data.

Expand All @@ -548,7 +593,7 @@ def import_job(self, export_data: dict) -> int:

try:
# insert/update job_template
template_infos_value = template_data.get("template_infos", {})
template_infos_value = cast(str, template_data.get("template_infos", {}))
if template_infos_value is not None:
template_infos_json = json.dumps(template_infos_value)
else:
Expand Down Expand Up @@ -675,7 +720,7 @@ def import_job(self, export_data: dict) -> int:

# insert callbacks for this task
for callback in task.get("callbacks", []):
result_dump = json.dumps(callback["result_dump"])
result_dump = callback["result_dump"]

cursor.execute(
"""
Expand Down Expand Up @@ -959,7 +1004,7 @@ def update_job(
raise LufaKeyError("tower_job_id", tower_job_id) from ex
db_conn.commit()

def export_job(self, tower_job_id: int) -> dict:
def export_job(self, tower_job_id: int) -> JobExport:
conn = self.db_manager.get_db_connection()
cursor = conn.cursor()

Expand Down Expand Up @@ -1029,10 +1074,10 @@ def export_job(self, tower_job_id: int) -> dict:
(tower_job_id,),
)

tasks_with_callbacks = cursor.fetchall()
tasks_with_callbacks: list[TaskExport] = cursor.fetchall()

# build export structure
export_data = {
export_data: JobExport = {
"exported_at": self.db_manager.get_db_now(),
"job": {
"tower_job_id": job["tower_job_id"],
Expand All @@ -1041,8 +1086,8 @@ def export_job(self, tower_job_id: int) -> dict:
"ansible_limit": job["ansible_limit"],
"tower_user_name": job["tower_user_name"],
"awx_tags": job["awx_tags"],
"extra_vars": job["extra_vars"],
"artifacts": job["artifacts"],
"extra_vars": json.dumps(job["extra_vars"]),
"artifacts": json.dumps(job["artifacts"]),
"tower_schedule_id": job["tower_schedule_id"],
"tower_schedule_name": job["tower_schedule_name"],
"tower_workflow_job_id": job["tower_workflow_job_id"],
Expand All @@ -1064,14 +1109,18 @@ def export_job(self, tower_job_id: int) -> dict:
{
"ansible_uuid": task["ansible_uuid"],
"task_name": task["task_name"],
"callbacks": task["callbacks"] if task["callbacks"] else [],
"callbacks": (
[{**cb, "result_dump": json.dumps(cb["result_dump"])} for cb in task["callbacks"]]
if task["callbacks"]
else []
),
}
for task in tasks_with_callbacks
],
}
return export_data

def import_job(self, export_data: dict) -> int:
def import_job(self, export_data: JobExport) -> int:
"""Imports a job from a dict
Returns the tower_job_id of the imported job.
"""
Expand Down Expand Up @@ -1121,14 +1170,9 @@ def import_job(self, export_data: dict) -> int:
)

# insert job
# ensure extra_vars and artifacts are strings
extra_vars = job_data.get("extra_vars", "{}")
if isinstance(extra_vars, dict):
extra_vars = json.dumps(extra_vars)

artifacts = job_data.get("artifacts", "{}")
if isinstance(artifacts, dict):
artifacts = json.dumps(artifacts)
# ensure extra_vars and artifacts are dicts
extra_vars = cast(dict, job_data.get("extra_vars", {}))
artifacts = cast(dict, job_data.get("artifacts", {}))

cursor.execute(
"""
Expand Down Expand Up @@ -1212,7 +1256,7 @@ def import_job(self, export_data: dict) -> int:

# insert callbacks for this task
for callback in task.get("callbacks", []):
result_dump = json.dumps(callback["result_dump"])
result_dump = callback["result_dump"]
cursor.execute(
"""
INSERT INTO task_callbacks (task_ansible_uuid,
Expand Down
Loading
Loading