-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathactions_executor.py
More file actions
106 lines (85 loc) · 3.78 KB
/
actions_executor.py
File metadata and controls
106 lines (85 loc) · 3.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
"""Executes triage actions on GitHub notifications."""
from __future__ import annotations

import logging
from collections import Counter
from typing import Any, Dict, List

from github_api_client import GitHubAPIClient
logger = logging.getLogger(__name__)
# Dispatch table: classifier action name -> name of the ActionsExecutor method
# that carries it out. Actions missing from this table are treated as no-ops
# by ActionsExecutor.execute().
_ACTION_HANDLERS: Dict[str, str] = {
    "mute": "_mute",
    "archive": "_archive",
    "review_now": "_flag_review_now",
    "review_later": "_noop",
}
class ActionsExecutor:
    """Executes triage actions returned by the LLM classifier.

    Each call to :meth:`execute` looks up the classification's ``action`` in
    ``_ACTION_HANDLERS``, runs the matching private handler (unless in dry-run
    mode) and records a result dict that feeds :meth:`get_execution_summary`.
    """

    def __init__(self, client: "GitHubAPIClient", dry_run: bool = False):
        """Store the API client and the dry-run flag.

        Args:
            client: Client used for every GitHub call made by the handlers.
            dry_run: When True, :meth:`execute` only logs what it would do
                and never invokes a handler.
        """
        self._client = client
        self._dry_run = dry_run
        # One result dict per execute() call, in call order.
        self._executed: List[Dict[str, Any]] = []

    def execute(
        self, notification: Dict[str, Any], classification: Dict[str, Any]
    ) -> Dict:
        """Execute the appropriate action for a classified notification.

        Args:
            notification: Raw GitHub notification dict; must contain ``"id"``.
            classification: Result from LLMClassifier.classify(). ``"action"``
                defaults to ``"review_later"`` and ``"priority"`` to ``"P2"``
                when absent.

        Returns:
            Dict with keys: thread_id, action, priority, executed, dry_run,
            plus ``error`` (the exception message) when the handler raised.
            In dry-run mode no handler runs but ``executed`` is still True;
            check ``dry_run`` to tell the two cases apart.

        Raises:
            KeyError: If ``notification`` has no ``"id"`` key.
        """
        thread_id = notification["id"]
        action = classification.get("action", "review_later")
        priority = classification.get("priority", "P2")
        result = {
            "thread_id": thread_id,
            "action": action,
            "priority": priority,
            "executed": False,
            "dry_run": self._dry_run,
        }

        if action not in _ACTION_HANDLERS:
            # Surface unexpected classifier output instead of silently
            # treating it as a no-op.
            logger.warning(
                "Unknown action %r for thread %s; treating as no-op",
                action,
                thread_id,
            )
        handler_name = _ACTION_HANDLERS.get(action, "_noop")
        handler = getattr(self, handler_name, self._noop)

        if self._dry_run:
            logger.info("[DRY RUN] Would execute %s on thread %s", action, thread_id)
            result["executed"] = True
        else:
            # Boundary for handler failures: the exception types raised by
            # the client are not visible here, so catch broadly, log, and
            # report the failure in the result rather than propagating.
            try:
                handler(notification, thread_id)
            except Exception as exc:
                logger.error("Action %s failed on thread %s: %s", action, thread_id, exc)
                result["error"] = str(exc)
            else:
                result["executed"] = True
                logger.info(
                    "Executed %s on thread %s (priority=%s)", action, thread_id, priority
                )

        self._executed.append(result)
        return result

    def get_execution_summary(self) -> Dict:
        """Return summary stats for this session.

        Returns:
            ``{"total": int, "by_action": {action: count}}``. Every
            :meth:`execute` call is counted, including dry runs and calls
            whose handler failed.
        """
        by_action = Counter(entry["action"] for entry in self._executed)
        return {"total": len(self._executed), "by_action": dict(by_action)}

    # -----------------------------------------------------------------------
    # Private action handlers
    # -----------------------------------------------------------------------

    def _mute(self, notification: Dict, thread_id: str) -> None:
        """Mute the thread, then mark it read."""
        self._client.mute_thread(thread_id)
        self._client.mark_thread_read(thread_id)

    def _archive(self, notification: Dict, thread_id: str) -> None:
        """Mark the thread read."""
        self._client.mark_thread_read(thread_id)

    def _flag_review_now(self, notification: Dict, thread_id: str) -> None:
        """Best-effort: add the ``P1-review-now`` label to the issue or PR.

        The notification thread itself is not modified (no read or mute call
        is made). Labelling is attempted only for ``PullRequest`` and
        ``Issue`` subjects whose repository ``full_name`` and subject ``url``
        are present. This method never raises: unusable data is skipped and
        a failing label call is logged as a warning.
        """
        # ``or {}`` also covers keys that are present but explicitly None.
        repo = notification.get("repository") or {}
        subject = notification.get("subject") or {}
        if subject.get("type") not in ("PullRequest", "Issue"):
            return

        owner_repo = repo.get("full_name") or ""
        url = subject.get("url")
        if not owner_repo or not url:
            return

        try:
            parts = owner_repo.split("/")
            if len(parts) != 2 or not all(parts):
                raise ValueError(f"unexpected repository name {owner_repo!r}")
            number = self._issue_number_from_url(url)
        except (AttributeError, TypeError, ValueError) as exc:
            logger.debug("Labelling skipped: %s", exc)
            return

        # The label is optional decoration, so an API failure must not fail
        # the whole action; log it visibly instead of hiding it at debug.
        try:
            self._client.add_label(parts[0], parts[1], number, "P1-review-now")
        except Exception as exc:
            logger.warning("Labelling failed on thread %s: %s", thread_id, exc)

    @staticmethod
    def _issue_number_from_url(url: str) -> int:
        """Return the last path segment of *url* as an int.

        Trailing slashes are ignored.

        Raises:
            ValueError: If the last segment is not an integer.
        """
        return int(url.rstrip("/").rsplit("/", 1)[-1])

    def _noop(self, notification: Dict, thread_id: str) -> None:
        """Do nothing; used for ``review_later`` and unrecognised actions."""