Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
646 changes: 646 additions & 0 deletions spp_change_request_v2/README.rst

Large diffs are not rendered by default.

164 changes: 159 additions & 5 deletions spp_change_request_v2/models/change_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,27 @@ def _compute_is_cr_manager(self):
applied_by_id = fields.Many2one("res.users", readonly=True)
apply_error = fields.Text(readonly=True)

# ══════════════════════════════════════════════════════════════════════════
# DYNAMIC APPROVAL
# ══════════════════════════════════════════════════════════════════════════

selected_field_name = fields.Char(
string="Field Being Modified",
readonly=True,
help="The detail field selected for modification (set when detail is saved). "
"Used by CEL conditions to determine the approval workflow.",
)
selected_field_old_value = fields.Char(
string="Old Value",
readonly=True,
help="Human-readable old value of the selected field (from registrant). Stored for audit trail.",
)
selected_field_new_value = fields.Char(
string="New Value",
readonly=True,
help="Human-readable new value of the selected field (from detail). Stored for audit trail.",
)

# ══════════════════════════════════════════════════════════════════════════
# LOG
# ══════════════════════════════════════════════════════════════════════════
Expand Down Expand Up @@ -596,7 +617,11 @@ def create(self, vals_list):
record._create_audit_event("created", None, "draft")
record._create_log("created")
# Run conflict detection after creation
if hasattr(record, "_run_conflict_checks"):
# Skip for dynamic approval — field_to_modify isn't set yet at create time.
# Checks run when selected_field_name is written (see conflict model's write()).
if hasattr(record, "_run_conflict_checks") and not (
record.request_type_id and record.request_type_id.use_dynamic_approval
):
record._run_conflict_checks()
return records

Expand Down Expand Up @@ -813,17 +838,137 @@ def action_submit_for_approval(self):

def _get_approval_definition(self):
self.ensure_one()
definition = self.request_type_id.approval_definition_id
cr_type = self.request_type_id

# Dynamic approval: evaluate candidates using selected field + values
if cr_type.use_dynamic_approval and cr_type.candidate_definition_ids:
definition = self._resolve_dynamic_approval()
if definition:
return definition

# Fallback to static definition (existing behavior)
definition = cr_type.approval_definition_id
if not definition:
raise UserError(
_(
"No approval workflow configured for request type '%s'. "
"Please configure an approval definition in Change Request > Configuration > CR Types."
"Please configure an approval definition in Change Request > "
"Configuration > CR Types."
)
% self.request_type_id.name
% cr_type.name
)
return definition

def _resolve_dynamic_approval(self):
"""Evaluate candidate definitions against selected field and values.

Iterates candidates in sequence order; first match wins.
Returns spp.approval.definition record, or None if no candidate matches.
"""
self.ensure_one()

if not self.selected_field_name:
return None

extra_context = self._compute_field_values_for_cel()
evaluator = self.env["spp.cel.evaluator"]

for candidate in self.request_type_id.candidate_definition_ids.sorted("sequence"):
if not candidate.use_cel_condition or not candidate.cel_condition:
# No condition = catch-all (matches everything)
return candidate
try:
result = evaluator.evaluate(candidate.cel_condition, self, extra_context)
if result:
return candidate
except Exception:
_logger.warning(
"CEL evaluation failed for candidate definition '%s' on CR %s, skipping",
candidate.name,
self.name,
exc_info=True,
)
continue

return None

def _compute_field_values_for_cel(self):
"""Compute typed old and new values for CEL evaluation.

Returns a dict with old_value and new_value typed according to field type.
"""
self.ensure_one()
field_name = self.selected_field_name
cr_type = self.request_type_id

if not field_name:
return {"old_value": None, "new_value": None}

mapping = cr_type.apply_mapping_ids.filtered(lambda m: m.source_field == field_name)[:1]

detail = self.get_detail()
registrant = self.registrant_id

old_raw = None
new_raw = None

if mapping and registrant:
old_raw = getattr(registrant, mapping.target_field, None)
if detail:
new_raw = getattr(detail, field_name, None)

return {
"old_value": self._normalize_value_for_cel(old_raw, registrant, mapping.target_field if mapping else None),
"new_value": self._normalize_value_for_cel(new_raw, detail, field_name),
}

def _normalize_value_for_cel(self, value, record, field_name):
"""Normalize an Odoo field value for use in CEL expressions."""
if value is None or value is False:
if record and field_name and field_name in record._fields:
field = record._fields[field_name]
if field.type == "boolean":
return False
if field.type in ("integer", "float", "monetary"):
return 0
return None

if record and field_name and field_name in record._fields:
field = record._fields[field_name]
if field.type in ("char", "text", "selection", "html"):
return value or ""
if field.type in ("integer", "float", "monetary"):
return value or 0
if field.type == "boolean":
return bool(value)
if field.type in ("date", "datetime"):
return value
if field.type == "many2one":
# IDs are exposed for internal CEL evaluation only, not for external APIs.
result = {
"id": value.id if value else 0,
"name": value.display_name if value else "",
}
# Vocabulary models: expose machine-readable code for stable CEL matching
if value and "code" in value._fields:
result["code"] = value.code or ""
# Hierarchical vocabularies: expose parent category
if value and "parent_id" in value._fields and value.parent_id:
parent = value.parent_id
result["parent"] = {
"id": parent.id,
"name": parent.display_name,
"code": parent.code if "code" in parent._fields else "",
}
return result
if field.type in ("one2many", "many2many"):
return {
"ids": value.ids if value else [],
"count": len(value) if value else 0,
}

return value

def _on_approve(self):
super()._on_approve()
# Signal ORM that approval_state changed (set via raw SQL in _do_approve)
Expand All @@ -840,10 +985,19 @@ def _on_reject(self, reason):
self._create_log("rejected", notes=reason)

def _check_can_submit(self):
"""Override to allow resubmission from revision state."""
"""Override to allow resubmission and validate dynamic approval field selection."""
self.ensure_one()
if self.approval_state not in ("draft", "revision"):
raise UserError(_("Only draft or revision-requested records can be submitted for approval."))
cr_type = self.request_type_id
if cr_type.use_dynamic_approval and not self.selected_field_name:
raise ValidationError(
_(
"Please select a field to modify on the detail form before "
"submitting for approval. This CR type requires a single "
"field selection for dynamic approval routing."
)
)

def _on_submit(self):
# Run conflict checks before submission
Expand Down
11 changes: 8 additions & 3 deletions spp_change_request_v2/models/change_request_conflict.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@ def create(self, vals_list):
records = super().create(vals_list)

for record in records:
# Run conflict checks if enabled for this CR type
# Run conflict checks if enabled for this CR type.
# Skip for dynamic approval — field_to_modify isn't set yet.
# Checks run when selected_field_name is set (see write() trigger).
if record.request_type_id and (
record.request_type_id.enable_conflict_detection or record.request_type_id.enable_duplicate_detection
):
if record.request_type_id.use_dynamic_approval:
continue
try:
record._run_conflict_checks()
except Exception as e:
Expand All @@ -47,8 +51,9 @@ def write(self, vals):
"""Re-run conflict detection if relevant fields change."""
result = super().write(vals)

# Re-check conflicts if registrant or type changed
if "registrant_id" in vals or "request_type_id" in vals:
# Re-check conflicts if registrant, type, or selected field changed
trigger_fields = {"registrant_id", "request_type_id", "selected_field_name"}
if trigger_fields & set(vals):
for record in self:
if (
record.request_type_id
Expand Down
84 changes: 84 additions & 0 deletions spp_change_request_v2/models/change_request_detail_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,44 @@ def _compute_is_cr_manager(self):
stage = fields.Selection(
related="change_request_id.stage",
)
use_dynamic_approval = fields.Boolean(
related="change_request_id.request_type_id.use_dynamic_approval",
)
field_to_modify = fields.Selection(
selection="_get_field_to_modify_selection",
string="Field to Modify",
help="Select which field to update in this change request",
)

@api.model
def _get_field_to_modify_selection(self):
"""Return available field options for field-level change requests.

Override in concrete detail models to provide the list of modifiable fields.
Returns a list of (value, label) tuples, e.g.:
[("poverty_status_id", "Poverty Status"), ("set_group_id", "Set Group")]
"""
return []

def write(self, vals):
result = super().write(vals)
if "field_to_modify" in vals:
for rec in self:
if rec.change_request_id:
rec._sync_field_to_modify()
else:
for rec in self:
if rec.field_to_modify and rec.field_to_modify in vals and rec.change_request_id:
rec._sync_field_to_modify()
return result
Comment on lines +75 to +85

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The write method can be simplified to improve readability and avoid iterating over self in two separate blocks. The current implementation correctly handles syncing when field_to_modify or its target field's value changes, but the logic can be combined into a single loop for better maintainability.

Suggested change
def write(self, vals):
result = super().write(vals)
if "field_to_modify" in vals:
for rec in self:
if rec.change_request_id:
rec._sync_field_to_modify()
else:
for rec in self:
if rec.field_to_modify and rec.field_to_modify in vals and rec.change_request_id:
rec._sync_field_to_modify()
return result
def write(self, vals):
result = super().write(vals)
for rec in self:
if not rec.change_request_id:
continue
# Sync if the field selector is changed, or if the value of the selected field is changed.
if "field_to_modify" in vals or (rec.field_to_modify and rec.field_to_modify in vals):
rec._sync_field_to_modify()
return result


@api.model_create_multi
def create(self, vals_list):
records = super().create(vals_list)
for rec in records:
if rec.field_to_modify and rec.change_request_id:
rec._sync_field_to_modify()
return records

def action_proceed_to_cr(self):
"""Navigate to the parent Change Request form if there are proposed changes."""
Expand Down Expand Up @@ -120,6 +158,52 @@ def action_request_revision(self):
self.ensure_one()
return self.change_request_id.action_request_revision()

# ══════════════════════════════════════════════════════════════════════════
# DYNAMIC APPROVAL SYNC
# ══════════════════════════════════════════════════════════════════════════

def _sync_field_to_modify(self):
"""Sync field_to_modify and its old/new values to the parent CR."""
self.ensure_one()
cr = self.change_request_id
if not cr:
return
# Only sync for dynamic-approval CR types
if not cr.request_type_id.use_dynamic_approval:
return

field_name = self.field_to_modify
cr_vals = {
"selected_field_name": field_name,
"selected_field_old_value": False,
"selected_field_new_value": False,
}

if field_name:
mapping = cr.request_type_id.apply_mapping_ids.filtered(lambda m: m.source_field == field_name)[:1]

if mapping:
registrant = cr.registrant_id
old_raw = getattr(registrant, mapping.target_field, None)
cr_vals["selected_field_old_value"] = self._format_value_for_display(old_raw)

new_raw = getattr(self, field_name, None)
cr_vals["selected_field_new_value"] = self._format_value_for_display(new_raw)

cr.write(cr_vals)

def _format_value_for_display(self, value):
"""Format a field value as a human-readable string for audit display."""
# Boolean check MUST come before the falsy check,
# otherwise False displays as "" instead of "No"
if isinstance(value, bool):
return _("Yes") if value else _("No")
if value is None or value is False:
return ""
if hasattr(value, "display_name"):
return value.display_name or ""
return str(value)

# ══════════════════════════════════════════════════════════════════════════
# PREFILL FROM REGISTRANT
# ══════════════════════════════════════════════════════════════════════════
Expand Down
15 changes: 15 additions & 0 deletions spp_change_request_v2/models/change_request_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,21 @@ def _onchange_available_document_ids(self):
string="Approval Workflow",
)
auto_approve_from_event = fields.Boolean(default=False)
use_dynamic_approval = fields.Boolean(
string="Dynamic Approval",
default=False,
help="When enabled, user selects a single field to modify per CR. "
"The selected field determines which approval workflow applies.",
)
candidate_definition_ids = fields.Many2many(
"spp.approval.definition",
"cr_type_candidate_definition_rel",
"type_id",
"definition_id",
string="Candidate Approval Definitions",
help="Evaluated in sequence order; first matching CEL condition wins. "
"If none match, the default Approval Workflow is used.",
)

# ══════════════════════════════════════════════════════════════════════════
# CONFLICT DETECTION
Expand Down
Loading
Loading