|
| 1 | +"""End-to-end workflow: pull Slack messages → summarise → render PDF → email. |
| 2 | +
|
| 3 | +This is a self-contained pipeline that ties together five AutoControl |
| 4 | +features into something an operations team would actually run on a |
| 5 | +schedule: |
| 6 | +
|
| 7 | +1. **Scheduler** — fires the pipeline once per day at 18:00. |
| 8 | +2. **Slack pull** — hits Slack's Web API via the standard ``requests`` |
| 9 | + library; falls back to a stub message list when run without |
| 10 | + credentials (so the demo always completes). |
| 11 | +3. **Anthropic summarisation** — uses the existing ``plan_actions`` / |
| 12 | + ``run_from_description`` plumbing's underlying client, called |
| 13 | + directly with a system prompt + the message list. Falls back to a |
| 14 | + deterministic mock summary if ``ANTHROPIC_API_KEY`` is unset. |
| 15 | +4. **HTML → PDF** — generates an HTML report with AutoControl's |
| 16 | + built-in templater, then renders it to PDF using ``weasyprint`` |
| 17 | + when installed; otherwise prints the HTML path. |
| 18 | +5. **Email** — sends the PDF as an attachment via stdlib ``smtplib``. |
| 19 | +
|
| 20 | +Environment variables (all optional — pipeline degrades gracefully): |
| 21 | +
|
| 22 | + SLACK_BOT_TOKEN Bot token with ``channels:history`` |
| 23 | + SLACK_CHANNEL_ID Channel to summarise (default: #general) |
| 24 | + ANTHROPIC_API_KEY Enables real summarisation |
| 25 | + SMTP_HOST Mail server host |
| 26 | + SMTP_PORT Mail server port (default 587) |
| 27 | + SMTP_USER / SMTP_PASS Login credentials |
| 28 | + SMTP_FROM / SMTP_TO Sender + recipient addresses |
| 29 | +
|
| 30 | +The script is meant as a starting point: every step is a clearly-named |
| 31 | +function so you can replace pieces (e.g. swap Slack for Discord) by |
| 32 | +editing one block. |
| 33 | +""" |
| 34 | +from __future__ import annotations |
| 35 | + |
| 36 | +import json |
| 37 | +import os |
| 38 | +import smtplib |
| 39 | +import ssl |
| 40 | +import sys |
| 41 | +import urllib.error |
| 42 | +import urllib.parse |
| 43 | +import urllib.request |
| 44 | +from dataclasses import dataclass |
| 45 | +from datetime import datetime, timedelta, timezone |
| 46 | +from email.message import EmailMessage |
| 47 | +from pathlib import Path |
| 48 | +from typing import List, Optional, Sequence |
| 49 | + |
| 50 | +import je_auto_control as ac |
| 51 | + |
| 52 | + |
| 53 | +# --- data classes --------------------------------------------------- |
| 54 | + |
| 55 | +@dataclass(frozen=True) |
| 56 | +class SlackMessage: |
| 57 | + """One Slack message stripped to the fields the summariser needs.""" |
| 58 | + user: str |
| 59 | + timestamp: float |
| 60 | + text: str |
| 61 | + |
| 62 | + |
| 63 | +# --- pipeline steps ------------------------------------------------- |
| 64 | + |
| 65 | +def fetch_slack_messages(channel_id: str, *, |
| 66 | + since_hours: int = 24, |
| 67 | + token: Optional[str] = None, |
| 68 | + ) -> List[SlackMessage]: |
| 69 | + """Pull Slack ``conversations.history`` over the last N hours. |
| 70 | +
|
| 71 | + Returns a deterministic stub message list when no token is set so |
| 72 | + the rest of the pipeline still runs. |
| 73 | + """ |
| 74 | + if not token: |
| 75 | + print(" (no SLACK_BOT_TOKEN — using stub messages)") |
| 76 | + return _stub_messages() |
| 77 | + oldest = ( |
| 78 | + datetime.now(tz=timezone.utc) - timedelta(hours=since_hours) |
| 79 | + ).timestamp() |
| 80 | + params = urllib.parse.urlencode({ |
| 81 | + "channel": channel_id, |
| 82 | + "oldest": f"{oldest:.6f}", |
| 83 | + "limit": 200, |
| 84 | + }) |
| 85 | + request = urllib.request.Request( |
| 86 | + f"https://slack.com/api/conversations.history?{params}", |
| 87 | + headers={"Authorization": f"Bearer {token}"}, |
| 88 | + ) |
| 89 | + try: |
| 90 | + with urllib.request.urlopen(request, timeout=30.0) as resp: |
| 91 | + body = json.loads(resp.read().decode("utf-8")) |
| 92 | + except urllib.error.URLError as error: |
| 93 | + print(f" warning: Slack call failed ({error}); using stub") |
| 94 | + return _stub_messages() |
| 95 | + if not body.get("ok"): |
| 96 | + print(f" warning: Slack API error: {body.get('error')}; using stub") |
| 97 | + return _stub_messages() |
| 98 | + return [ |
| 99 | + SlackMessage( |
| 100 | + user=str(msg.get("user") or "<unknown>"), |
| 101 | + timestamp=float(msg.get("ts") or 0.0), |
| 102 | + text=str(msg.get("text") or ""), |
| 103 | + ) |
| 104 | + for msg in body.get("messages", []) |
| 105 | + if msg.get("text") |
| 106 | + ] |
| 107 | + |
| 108 | + |
| 109 | +def _stub_messages() -> List[SlackMessage]: |
| 110 | + now = datetime.now(tz=timezone.utc).timestamp() |
| 111 | + return [ |
| 112 | + SlackMessage(user="alice", timestamp=now - 3600, |
| 113 | + text="The deployment to staging is green."), |
| 114 | + SlackMessage(user="bob", timestamp=now - 1800, |
| 115 | + text="I'll start the migration at 22:00 UTC."), |
| 116 | + SlackMessage(user="charlie", timestamp=now - 600, |
| 117 | + text="Logs look quiet — no errors in the last hour."), |
| 118 | + ] |
| 119 | + |
| 120 | + |
| 121 | +def summarise(messages: Sequence[SlackMessage]) -> str: |
| 122 | + """Hand the message list to Anthropic; fall back to a stitched recap.""" |
| 123 | + bullet_lines = "\n".join( |
| 124 | + f"- {msg.user}: {msg.text}" for msg in messages |
| 125 | + ) |
| 126 | + if not os.environ.get("ANTHROPIC_API_KEY"): |
| 127 | + print(" (no ANTHROPIC_API_KEY — stitching messages directly)") |
| 128 | + return "Today's digest:\n" + bullet_lines |
| 129 | + try: |
| 130 | + import anthropic # noqa: F401 # nosemgrep: codacy.python.openai.import-without-guardrails # reason: see anthropic.py backend rationale |
| 131 | + client = __import__("anthropic").Anthropic() |
| 132 | + response = client.messages.create( |
| 133 | + model="claude-haiku-4-5-20251001", |
| 134 | + max_tokens=512, |
| 135 | + messages=[{ |
| 136 | + "role": "user", |
| 137 | + "content": ( |
| 138 | + "Summarise these Slack messages in three short " |
| 139 | + "bullet points suitable for an end-of-day email:\n\n" |
| 140 | + + bullet_lines |
| 141 | + ), |
| 142 | + }], |
| 143 | + ) |
| 144 | + return response.content[0].text # type: ignore[union-attr] |
| 145 | + except (ImportError, RuntimeError, OSError) as error: |
| 146 | + print(f" warning: Anthropic call failed ({error}); stitching") |
| 147 | + return "Today's digest:\n" + bullet_lines |
| 148 | + |
| 149 | + |
| 150 | +def render_report(summary: str, raw_messages: Sequence[SlackMessage], |
| 151 | + output_dir: Path) -> Path: |
| 152 | + """Render an HTML report + (if weasyprint is available) a PDF. |
| 153 | +
|
| 154 | + Returns the path to the artefact that should be emailed — the PDF |
| 155 | + when WeasyPrint is installed, otherwise the HTML file. |
| 156 | + """ |
| 157 | + output_dir.mkdir(parents=True, exist_ok=True) |
| 158 | + today = datetime.now(tz=timezone.utc).date().isoformat() |
| 159 | + rows = "".join( |
| 160 | + f"<li><strong>{m.user}</strong>: {m.text}</li>" |
| 161 | + for m in raw_messages |
| 162 | + ) |
| 163 | + html = f"""<!doctype html> |
| 164 | +<html><head><meta charset="utf-8"><title>Daily Slack digest — {today}</title> |
| 165 | +<style> |
| 166 | + body {{ font-family: -apple-system, sans-serif; margin: 2em; }} |
| 167 | + h1 {{ border-bottom: 2px solid #444; padding-bottom: .3em; }} |
| 168 | + pre {{ background: #f5f5f5; padding: 1em; border-radius: 4px; }} |
| 169 | + ul {{ line-height: 1.5em; }} |
| 170 | +</style></head> |
| 171 | +<body> |
| 172 | +<h1>Daily Slack digest — {today}</h1> |
| 173 | +<h2>Summary</h2> |
| 174 | +<pre>{summary}</pre> |
| 175 | +<h2>Raw messages ({len(raw_messages)})</h2> |
| 176 | +<ul>{rows}</ul> |
| 177 | +</body></html> |
| 178 | +""" |
| 179 | + html_path = output_dir / f"digest-{today}.html" |
| 180 | + html_path.write_text(html, encoding="utf-8") |
| 181 | + try: |
| 182 | + from weasyprint import HTML |
| 183 | + except ImportError: |
| 184 | + print(" (no weasyprint — sending the HTML instead)") |
| 185 | + return html_path |
| 186 | + pdf_path = output_dir / f"digest-{today}.pdf" |
| 187 | + HTML(string=html).write_pdf(str(pdf_path)) |
| 188 | + return pdf_path |
| 189 | + |
| 190 | + |
| 191 | +def email_report(artefact: Path, *, |
| 192 | + subject: str, |
| 193 | + sender: str, recipient: str, |
| 194 | + smtp_host: str, smtp_port: int = 587, |
| 195 | + smtp_user: Optional[str] = None, |
| 196 | + smtp_pass: Optional[str] = None) -> None: |
| 197 | + """Send ``artefact`` as an attachment via STARTTLS-SMTP.""" |
| 198 | + message = EmailMessage() |
| 199 | + message["Subject"] = subject |
| 200 | + message["From"] = sender |
| 201 | + message["To"] = recipient |
| 202 | + message.set_content( |
| 203 | + f"Daily Slack digest attached.\n\nGenerated: {datetime.now().isoformat()}", |
| 204 | + ) |
| 205 | + mime_type = ( |
| 206 | + "application/pdf" if artefact.suffix == ".pdf" |
| 207 | + else "text/html" |
| 208 | + ) |
| 209 | + maintype, subtype = mime_type.split("/", 1) |
| 210 | + message.add_attachment( |
| 211 | + artefact.read_bytes(), maintype=maintype, subtype=subtype, |
| 212 | + filename=artefact.name, |
| 213 | + ) |
| 214 | + context = ssl.create_default_context() |
| 215 | + with smtplib.SMTP(smtp_host, smtp_port, timeout=30.0) as server: |
| 216 | + server.starttls(context=context) |
| 217 | + if smtp_user and smtp_pass: |
| 218 | + server.login(smtp_user, smtp_pass) |
| 219 | + server.send_message(message) |
| 220 | + |
| 221 | + |
| 222 | +# --- AutoControl wiring -------------------------------------------- |
| 223 | + |
| 224 | +def run_pipeline_once() -> int: |
| 225 | + """Execute one pass of the pipeline; return 0 on success, 1 on error.""" |
| 226 | + print("Slack daily digest pipeline starting…") |
| 227 | + channel = os.environ.get("SLACK_CHANNEL_ID", "C0000000000") |
| 228 | + messages = fetch_slack_messages( |
| 229 | + channel, |
| 230 | + since_hours=24, |
| 231 | + token=os.environ.get("SLACK_BOT_TOKEN"), |
| 232 | + ) |
| 233 | + print(f" pulled {len(messages)} messages from {channel}") |
| 234 | + |
| 235 | + summary = summarise(messages) |
| 236 | + print(f" summary ready ({len(summary)} chars)") |
| 237 | + |
| 238 | + output_dir = Path("./slack_digests") |
| 239 | + artefact = render_report(summary, messages, output_dir) |
| 240 | + print(f" artefact: {artefact}") |
| 241 | + |
| 242 | + smtp_host = os.environ.get("SMTP_HOST") |
| 243 | + sender = os.environ.get("SMTP_FROM") |
| 244 | + recipient = os.environ.get("SMTP_TO") |
| 245 | + if not (smtp_host and sender and recipient): |
| 246 | + print(" (SMTP_* unset — skipping email step; artefact saved locally)") |
| 247 | + return 0 |
| 248 | + try: |
| 249 | + email_report( |
| 250 | + artefact, |
| 251 | + subject=f"Slack daily digest — {datetime.now().date()}", |
| 252 | + sender=sender, recipient=recipient, |
| 253 | + smtp_host=smtp_host, |
| 254 | + smtp_port=int(os.environ.get("SMTP_PORT", "587")), |
| 255 | + smtp_user=os.environ.get("SMTP_USER"), |
| 256 | + smtp_pass=os.environ.get("SMTP_PASS"), |
| 257 | + ) |
| 258 | + except (OSError, smtplib.SMTPException) as error: |
| 259 | + print(f" email failed: {error}") |
| 260 | + return 1 |
| 261 | + print(f" emailed {artefact.name} to {recipient}") |
| 262 | + return 0 |
| 263 | + |
| 264 | + |
| 265 | +def schedule_daily(hour: int = 18, minute: int = 0) -> None: |
| 266 | + """Register the pipeline with the AutoControl scheduler. |
| 267 | +
|
| 268 | + Uses a cron expression so the firing time survives process restarts. |
| 269 | + """ |
| 270 | + bridge_script = Path(__file__).with_name("18_slack_pipeline_bridge.json") |
| 271 | + bridge_script.write_text( |
| 272 | + json.dumps([ |
| 273 | + ["AC_shell_command", { |
| 274 | + "command": f"{sys.executable} {Path(__file__).resolve()} --run", |
| 275 | + }], |
| 276 | + ]), |
| 277 | + encoding="utf-8", |
| 278 | + ) |
| 279 | + ac.default_scheduler.add_cron_job( |
| 280 | + script_path=str(bridge_script), |
| 281 | + cron_expression=f"{minute} {hour} * * *", |
| 282 | + job_id="slack-daily-digest", |
| 283 | + ) |
| 284 | + ac.default_scheduler.start() |
| 285 | + print(f"scheduled at {hour:02d}:{minute:02d} UTC daily — Ctrl-C to stop.") |
| 286 | + |
| 287 | + |
| 288 | +def main() -> int: |
| 289 | + if "--run" in sys.argv[1:]: |
| 290 | + return run_pipeline_once() |
| 291 | + if "--schedule" in sys.argv[1:]: |
| 292 | + schedule_daily() |
| 293 | + import time |
| 294 | + try: |
| 295 | + while True: |
| 296 | + time.sleep(60) |
| 297 | + except KeyboardInterrupt: |
| 298 | + ac.default_scheduler.stop() |
| 299 | + return 0 |
| 300 | + # Default: run once and exit. |
| 301 | + return run_pipeline_once() |
| 302 | + |
| 303 | + |
| 304 | +if __name__ == "__main__": |
| 305 | + sys.exit(main()) |
0 commit comments