-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmodels.py
More file actions
105 lines (77 loc) · 3.87 KB
/
models.py
File metadata and controls
105 lines (77 loc) · 3.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
from datetime import datetime, timedelta, timezone
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
class Scan(db.Model):
"""Represents a single nmap scan execution.
Baseline scans serve as the reference point for change detection.
Only one scan per target should be marked as the baseline at a time;
enforcing that uniqueness is left to the application layer.
"""
__tablename__ = "scans"
id = db.Column(db.Integer, primary_key=True)
target = db.Column(db.String(255), nullable=False)
flags = db.Column(db.String(500), nullable=False, default="")
started_at = db.Column(
db.DateTime, nullable=False, default=lambda: datetime.now(timezone.utc)
)
completed_at = db.Column(db.DateTime, nullable=True)
xml_file_path = db.Column(db.String(500), nullable=True)
status = db.Column(
db.String(20), nullable=False, default="pending"
) # pending, running, completed, failed
# Change-detection fields
is_baseline = db.Column(db.Boolean, nullable=False, default=False)
label = db.Column(db.String(100), nullable=True) # e.g. "post-patch baseline"
hosts = db.relationship("Host", backref="scan", cascade="all, delete-orphan")
def __repr__(self) -> str:
baseline_flag = " [baseline]" if self.is_baseline else ""
return f"<Scan {self.id} target={self.target} status={self.status}{baseline_flag}>"
class Host(db.Model):
"""A host discovered during a scan."""
__tablename__ = "hosts"
id = db.Column(db.Integer, primary_key=True)
scan_id = db.Column(db.Integer, db.ForeignKey("scans.id"), nullable=False)
address = db.Column(db.String(45), nullable=False) # IPv4 or IPv6
hostname = db.Column(db.String(255), nullable=True)
status = db.Column(db.String(10), nullable=False, default="up")
ports = db.relationship("Port", backref="host", cascade="all, delete-orphan")
def __repr__(self) -> str:
return f"<Host {self.address} status={self.status}>"
class Port(db.Model):
"""A port found on a host during a scan."""
__tablename__ = "ports"
id = db.Column(db.Integer, primary_key=True)
host_id = db.Column(db.Integer, db.ForeignKey("hosts.id"), nullable=False)
port_number = db.Column(db.Integer, nullable=False)
protocol = db.Column(db.String(10), nullable=False, default="tcp")
state = db.Column(db.String(20), nullable=False, default="open")
service_name = db.Column(db.String(100), nullable=True)
service_version = db.Column(db.String(200), nullable=True)
def __repr__(self) -> str:
return f"<Port {self.port_number}/{self.protocol} state={self.state}>"
class Schedule(db.Model):
"""A recurring scan schedule.
When enabled, the scheduler will run a scan against the configured target
every interval_minutes minutes. next_run_at is updated after each execution.
"""
__tablename__ = "schedules"
id = db.Column(db.Integer, primary_key=True)
target = db.Column(db.String(255), nullable=False)
flags = db.Column(db.String(500), nullable=False, default="")
interval_minutes = db.Column(db.Integer, nullable=False, default=60)
enabled = db.Column(db.Boolean, nullable=False, default=True)
created_at = db.Column(
db.DateTime, nullable=False, default=lambda: datetime.now(timezone.utc)
)
last_run_at = db.Column(db.DateTime, nullable=True)
next_run_at = db.Column(
db.DateTime,
nullable=False,
default=lambda: datetime.now(timezone.utc), # run at next scheduler tick
)
# Optional URL to POST a JSON change summary to after each scan.
# Leave blank to disable notifications for this schedule.
webhook_url = db.Column(db.String(500), nullable=True)
def __repr__(self) -> str:
status = "enabled" if self.enabled else "disabled"
return f"<Schedule {self.id} target={self.target} every={self.interval_minutes}m {status}>"