diff --git a/README.md b/README.md index 93a27ce..ab52562 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,8 @@ AdLoop exists because managing Google Ads alongside your code is a mess. These a - **"My landing page gets paid traffic but nobody converts."** AdLoop joins your ad final URLs with GA4 page-level data. See which pages get clicks but no conversions, which have high bounce rates, and which ones are orphaned from any ad campaign. +- **"Are conversions even being tagged on every page?"** AdLoop reads your live Google Tag Manager container, joins it against the events in your codebase and the events firing in GA4, and tells you exactly which conversions are being captured, which tags are paused, which page-scope filters are too narrow, and which codebase events have no tag at all — the kind of three-way audit GTM Preview can't give you in a single view. + - **"I don't know if my EU consent setup is causing data gaps."** In Europe, 30-70% of users reject analytics cookies. AdLoop accounts for this automatically — it won't diagnose a normal GDPR consent gap as broken tracking. ## Built From Real Usage @@ -42,7 +44,7 @@ Every tool exists because of an actual problem hit while running real Google Ads The best features come from real workflows. If you're using AdLoop and find yourself wishing it could do something it can't, **open an issue describing your situation** — not just "add feature X" but "I was trying to do Y and couldn't because Z." The context matters more than the request. -## All 43 Tools +## All 55 Tools > **Quick start:** `pip install adloop` or `git clone https://github.com/kLOsk/adloop.git && cd adloop && uv sync && uv run adloop init` @@ -98,6 +100,27 @@ These tools call both APIs internally and return unified results with auto-gener | `validate_tracking` | Compare event names found in your codebase against what GA4 actually records. Returns matched, missing, and unexpected events with diagnostics. | | `generate_tracking_code` | Generate ready-to-paste GA4 gtag JavaScript for any event, with recommended parameters for well-known events (sign_up, purchase, etc.) and optional trigger wrappers. | +### Google Tag Manager Tools + +These tools read the live GTM container and join it with the codebase + GA4 to find tracking gaps that pure GA4 inspection can't catch — page-scoped triggers, paused tags, dynamic event names, brittle CSS selectors, and codebase events with no tag wired up at all. + +| Tool | What It Does | +|------|-------------| +| `audit_event_coverage` | **The flagship.** Three-way join: codebase events ↔ GTM tags ↔ GA4 actual fires. For each event name in `expected_events`, returns one of 10 statuses (`ok`, `no_tag_no_fire`, `tag_paused`, `tag_active_but_not_firing`, `gtm_only_firing`, `ga4_only`, etc.) plus auto-generated insights for the gaps. | +| `list_gtm_accounts` | Discover accessible GTM accounts | +| `list_gtm_containers` | List containers under an account — returns numeric `container_id` (needed by other tools), public `GTM-XXXXXXX` ID, and usage context (web/iOS/Android/server) | +| `list_gtm_tags` | Every tag in the live container with parsed event names and resolved firing/blocking trigger names | +| `get_gtm_tag` | Full raw config for a single tag — every parameter, firing/blocking triggers with filter conditions, priority, pause status, sampling | +| `list_gtm_triggers` | Every trigger with filter conditions parsed to readable text (e.g. `{{Page Path}} contains service-promotions`, `{{Form ID}} NOT contains wf-form-...`). Renders the `negate` flag explicitly. | +| `get_gtm_trigger` | Full trigger config + reverse lookup of every tag that uses it. Includes parsed `element_visibility` block (selector, on-screen ratio, firing frequency) for elementVisibility triggers and `group_member_trigger_ids` for triggerGroup types | +| `list_gtm_variables` | Custom variables (data layer, constants, JS) plus enabled built-in variables | +| `list_gtm_workspaces` | List drafts (workspaces) under a container — workspace IDs are needed by `get_gtm_workspace_diff` | +| `get_gtm_workspace_diff` | Drafted-but-not-published changes — common cause of "I edited a tag but nothing happened". Returns `is_clean: true` when nothing is pending. | +| `list_gtm_versions` | Publish history with version IDs and entity counts. Use to correlate a metric drop with a recent publish. | +| `get_gtm_version` | Full metadata + tag/trigger names for a single historical container version | + +> **Setup for GTM tools** — Enable the **Tag Manager API v2** in your GCP project, then add your AdLoop credentials' email (the OAuth user, or the service account email if using a service account) as a **Read** user on the GTM container under Admin → User Management. AdLoop will pick up access on the next call — no token refresh needed for service accounts. + ### Planning Tools | Tool | What It Does | @@ -323,6 +346,7 @@ Ask your AI assistant things like: - *"Draft a new responsive search ad for my main campaign."* - *"Which landing pages get paid traffic but don't convert?"* - *"Is my tracking set up correctly? Compare my codebase events against GA4."* +- *"Audit my Google Tag Manager container — which conversions are being captured and where are the gaps?"* - *"What keywords should I target for [product]? Find ideas and estimate the budget."* - *"How much budget would I need for these keywords in Germany?"* - *"Create a new search campaign for [product feature] with a €20/day budget."* @@ -349,11 +373,11 @@ All configuration lives in `~/.adloop/config.yaml`. See [`config.yaml.example`]( ``` src/adloop/ ├── __init__.py # Entry point — routes 'adloop init' to wizard, otherwise starts MCP server -├── server.py # FastMCP server — 43 tool registrations with safety annotations +├── server.py # FastMCP server — 55 tool registrations with safety annotations ├── config.py # Config loader (~/.adloop/config.yaml) -├── auth.py # OAuth 2.0 flow (bundled + custom credentials, headless fallback) + service accounts +├── auth.py # OAuth 2.0 flow (bundled + custom credentials, headless fallback) + service accounts; GA4 / Ads / GTM scope handling ├── cli.py # Interactive 'adloop init' setup wizard -├── crossref.py # Cross-reference tools (GA4 + Ads combined analysis) +├── crossref.py # Cross-reference tools (GA4 + Ads + GTM combined analysis) ├── tracking.py # Tracking validation + code generation tools ├── ga4/ │ ├── client.py # GA4 Data + Admin API clients @@ -366,6 +390,9 @@ src/adloop/ │ ├── pmax.py # Performance Max tools — campaign/asset group performance, asset labels, top combinations │ ├── write.py # Draft campaign, RSA, keywords; pause, enable, remove, confirm │ └── forecast.py # Budget estimation + keyword discovery via Keyword Planner API +├── gtm/ +│ ├── client.py # Google Tag Manager API v2 client +│ └── read.py # Live container fetching, tag/trigger/variable parsing, workspace diff, version history └── safety/ ├── guards.py # Budget caps, bid limits, blocked operations, Broad Match safety ├── preview.py # Change plans and previews @@ -390,6 +417,7 @@ What's been shipped and what's next: - ~~Bundled OAuth credentials~~ ✓ — no Google Cloud project required, auto-discovery of GA4/Ads accounts (currently capped at 100 users pending Google verification — use [Advanced Setup](#advanced-setup-custom-google-cloud-project) in the meantime) - ~~Headless server support~~ ✓ — manual URL copy-paste flow for servers without a browser - ~~Behavioral eval suites~~ ✓ — 28 prompt-and-expectation tests covering read, write, tracking, and planning workflows +- ~~Google Tag Manager integration~~ ✓ — read tools for tags, triggers, variables, workspaces, and version history, plus the `audit_event_coverage` three-way join across codebase events, GTM tags, and GA4 actual fires - **Community launch** — HN, Indie Hackers, r/cursor, Twitter - **Video walkthrough** diff --git a/pyproject.toml b/pyproject.toml index f750049..83096cd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,6 +14,7 @@ dependencies = [ "google-ads>=29.0.0", "google-analytics-data>=0.20.0", "google-analytics-admin>=0.27.0", + "google-api-python-client>=2.100.0", "google-auth-oauthlib>=1.0.0", "pyyaml>=6.0", ] diff --git a/src/adloop/ads/write.py b/src/adloop/ads/write.py index a1847a1..f5b0726 100644 --- a/src/adloop/ads/write.py +++ b/src/adloop/ads/write.py @@ -1909,6 +1909,33 @@ def _execute_plan(config: AdLoopConfig, plan: object) -> dict: """Dispatch to the right Google Ads mutate call based on plan.operation.""" from adloop.ads.client import get_ads_client, normalize_customer_id + # GTM operations don't need a Google Ads client and don't use customer_id. + # Route them to GTM apply handlers that fetch their own GTM client. + if plan.operation in ( + "create_gtm_tag", "create_gtm_trigger", "publish_gtm_workspace", + "update_gtm_tag", "update_gtm_trigger", + "delete_gtm_tag", "delete_gtm_trigger", + ): + from adloop.gtm.write import ( + _apply_create_gtm_tag, + _apply_create_gtm_trigger, + _apply_publish_gtm_workspace, + _apply_update_gtm_tag, + _apply_update_gtm_trigger, + _apply_delete_gtm_tag, + _apply_delete_gtm_trigger, + ) + gtm_dispatch = { + "create_gtm_tag": _apply_create_gtm_tag, + "create_gtm_trigger": _apply_create_gtm_trigger, + "publish_gtm_workspace": _apply_publish_gtm_workspace, + "update_gtm_tag": _apply_update_gtm_tag, + "update_gtm_trigger": _apply_update_gtm_trigger, + "delete_gtm_tag": _apply_delete_gtm_tag, + "delete_gtm_trigger": _apply_delete_gtm_trigger, + } + return gtm_dispatch[plan.operation](None, "", plan.changes) + client = get_ads_client(config) cid = normalize_customer_id(plan.customer_id) diff --git a/src/adloop/auth.py b/src/adloop/auth.py index 3eb8f9d..529bbfc 100644 --- a/src/adloop/auth.py +++ b/src/adloop/auth.py @@ -18,6 +18,10 @@ "https://www.googleapis.com/auth/analytics.readonly", "https://www.googleapis.com/auth/analytics.edit", "https://www.googleapis.com/auth/adwords", + "https://www.googleapis.com/auth/tagmanager.readonly", + "https://www.googleapis.com/auth/tagmanager.edit.containers", + "https://www.googleapis.com/auth/tagmanager.edit.containerversions", + "https://www.googleapis.com/auth/tagmanager.publish", ] _GA4_SCOPES = [ @@ -29,6 +33,13 @@ "https://www.googleapis.com/auth/adwords", ] +_GTM_SCOPES = [ + "https://www.googleapis.com/auth/tagmanager.readonly", + "https://www.googleapis.com/auth/tagmanager.edit.containers", + "https://www.googleapis.com/auth/tagmanager.edit.containerversions", + "https://www.googleapis.com/auth/tagmanager.publish", +] + def _get_credentials_path(config: AdLoopConfig) -> Path | None: """Resolve OAuth client credentials using a priority chain. @@ -110,6 +121,32 @@ def get_ads_credentials(config: AdLoopConfig) -> Credentials: return credentials +def get_gtm_credentials(config: AdLoopConfig) -> Credentials: + """Return authenticated credentials for Google Tag Manager API.""" + creds_path = _get_credentials_path(config) + + if creds_path is not None: + import json + + with open(creds_path) as f: + creds_info = json.load(f) + + if creds_info.get("type") == "service_account": + from google.oauth2 import service_account + + return service_account.Credentials.from_service_account_file( + str(creds_path), + scopes=_GTM_SCOPES, + ) + + return _oauth_flow(config, creds_path) + + import google.auth + + credentials, _ = google.auth.default(scopes=_GTM_SCOPES) + return credentials + + def _oauth_flow( config: AdLoopConfig, creds_path: Path | None = None ) -> Credentials: diff --git a/src/adloop/crossref.py b/src/adloop/crossref.py index 916edee..25effe0 100644 --- a/src/adloop/crossref.py +++ b/src/adloop/crossref.py @@ -511,3 +511,253 @@ def attribution_check( "insights": insights, "date_range": {"start": start, "end": end}, } + + +# --------------------------------------------------------------------------- +# Tool 4: audit_event_coverage — three-way join across codebase, GTM, and GA4 +# --------------------------------------------------------------------------- + +# GA4 events that fire automatically (Enhanced Measurement) and don't need +# either a GTM tag or a codebase gtag/dataLayer call to appear in GA4. +_GA4_AUTO_EVENTS = { + "page_view", + "session_start", + "first_visit", + "user_engagement", + "scroll", + "click", + "form_start", + "form_submit", + "video_start", + "video_progress", + "video_complete", + "file_download", + "view_search_results", +} + + +def audit_event_coverage( + config: AdLoopConfig, + *, + expected_events: list[str], + gtm_account_id: str, + gtm_container_id: str, + property_id: str = "", + date_range_start: str = "", + date_range_end: str = "", +) -> dict: + """Three-way audit: codebase events ↔ GTM tags ↔ GA4 actual fires. + + Joins (a) event names extracted from codebase gtag/dataLayer calls, + (b) GA4 event tags in the LIVE GTM container, and (c) actual GA4 event + counts for the date range. Surfaces every gap: codebase events with no + tag, tags that are paused, tags configured but never firing, GTM tags + firing events not in the codebase, and GA4 events with no matching tag. + """ + from adloop.ga4.tracking import get_tracking_events as _get_events + from adloop.gtm.read import GA4_EVENT_TAG, get_live_container + + start, end = _default_date_range(date_range_start, date_range_end) + + container = get_live_container( + config, account_id=gtm_account_id, container_id=gtm_container_id + ) + + ga4 = _get_events( + config, + property_id=property_id, + date_range_start=start, + date_range_end=end, + ) + if "error" in ga4: + return { + "error": f"GA4 fetch failed: {ga4['error']}", + "container": { + "account_id": container["account_id"], + "container_id": container["container_id"], + "tag_count": len(container["tags"]), + }, + } + + ga4_counts: dict[str, int] = {} + for row in ga4.get("rows", []): + try: + ga4_counts[row["eventName"]] = int(row.get("eventCount", 0)) + except (KeyError, ValueError, TypeError): + continue + + gtm_by_event: dict[str, list[dict]] = {} + dynamic_event_tags: list[dict] = [] + custom_html_tags: list[dict] = [] + other_tags_by_type: dict[str, int] = {} + + for tag in container["tags"]: + ttype = tag["type"] + if ttype == GA4_EVENT_TAG: + ev = tag["event_name"] + if ev is None: + continue + if ev.startswith("{{") and ev.endswith("}}"): + dynamic_event_tags.append({ + "name": tag["name"], + "tag_id": tag["tag_id"], + "event_variable": ev, + "paused": tag["paused"], + }) + continue + gtm_by_event.setdefault(ev, []).append({ + "name": tag["name"], + "tag_id": tag["tag_id"], + "paused": tag["paused"], + "firing_triggers": tag["firing_triggers"], + }) + elif ttype == "html": + custom_html_tags.append({ + "name": tag["name"], + "tag_id": tag["tag_id"], + "paused": tag["paused"], + "firing_triggers": tag["firing_triggers"], + }) + else: + other_tags_by_type[ttype] = other_tags_by_type.get(ttype, 0) + 1 + + expected_set = set(expected_events) + all_events = expected_set | set(gtm_by_event.keys()) | set(ga4_counts.keys()) + + matrix = [] + for event in sorted(all_events): + in_codebase = event in expected_set + gtm_tags = gtm_by_event.get(event, []) + in_gtm = bool(gtm_tags) + any_active_tag = any(not t["paused"] for t in gtm_tags) + ga4_count = ga4_counts.get(event, 0) + ga4_fires = ga4_count > 0 + is_auto = event in _GA4_AUTO_EVENTS + + if in_codebase and in_gtm and any_active_tag and ga4_fires: + status = "ok" + elif in_codebase and not in_gtm and ga4_fires and is_auto: + status = "ok_auto_collected" + elif in_codebase and not in_gtm and ga4_fires and not is_auto: + status = "ga4_fires_no_tag" + elif in_codebase and in_gtm and not any_active_tag: + status = "tag_paused" + elif in_codebase and in_gtm and any_active_tag and not ga4_fires: + status = "tag_active_but_not_firing" + elif in_codebase and not in_gtm and not ga4_fires: + status = "no_tag_no_fire" + elif not in_codebase and in_gtm and any_active_tag and ga4_fires: + status = "gtm_only_firing" + elif not in_codebase and in_gtm and not ga4_fires: + status = "gtm_only_not_firing" + elif not in_codebase and not in_gtm and ga4_fires and is_auto: + status = "auto_event_only" + elif not in_codebase and not in_gtm and ga4_fires: + status = "ga4_only" + else: + status = "unknown" + + matrix.append({ + "event_name": event, + "in_codebase": in_codebase, + "in_gtm": in_gtm, + "gtm_tag_count": len(gtm_tags), + "any_active_tag": any_active_tag, + "gtm_tag_names": [t["name"] for t in gtm_tags], + "ga4_count": ga4_count, + "is_auto_event": is_auto, + "status": status, + }) + + insights: list[str] = [] + + def _names(items, k=5): + return ", ".join(i["event_name"] for i in items[:k]) + + no_tag = [m for m in matrix if m["status"] == "no_tag_no_fire"] + if no_tag: + suffix = f" (showing first 5 of {len(no_tag)})" if len(no_tag) > 5 else "" + insights.append( + f"{len(no_tag)} codebase event(s) have NO GTM tag and NEVER fired in GA4: " + f"{_names(no_tag)}{suffix} — most likely real coverage gaps." + ) + + tag_paused = [m for m in matrix if m["status"] == "tag_paused"] + if tag_paused: + insights.append( + f"{len(tag_paused)} event(s) have a GTM tag but it is PAUSED: " + f"{_names(tag_paused, 10)} — un-pause or delete." + ) + + tag_no_fire = [m for m in matrix if m["status"] == "tag_active_but_not_firing"] + if tag_no_fire: + insights.append( + f"{len(tag_no_fire)} event(s) have an ACTIVE GTM tag but never fired in GA4: " + f"{_names(tag_no_fire, 10)} — check trigger conditions, page-load timing, " + f"or whether the underlying user action is happening at all." + ) + + gtm_only_firing = [m for m in matrix if m["status"] == "gtm_only_firing"] + if gtm_only_firing: + insights.append( + f"{len(gtm_only_firing)} GA4 event(s) fire from a GTM tag but are NOT in the " + f"codebase: {_names(gtm_only_firing, 5)} — likely auto-event listeners " + f"(GTM-managed); verify these aren't stale." + ) + + ga4_only = [m for m in matrix if m["status"] == "ga4_only"] + if ga4_only: + insights.append( + f"{len(ga4_only)} GA4 event(s) fire but have neither a GTM tag nor a codebase " + f"reference: {_names(ga4_only, 5)} — likely from another tag manager, a " + f"third-party SDK, or a gtag call grep missed." + ) + + ga4_fires_no_tag = [m for m in matrix if m["status"] == "ga4_fires_no_tag"] + if ga4_fires_no_tag: + insights.append( + f"{len(ga4_fires_no_tag)} codebase event(s) fire in GA4 but have no GTM tag: " + f"{_names(ga4_fires_no_tag, 5)} — may be reaching GA4 via gtag.js directly " + f"(no GTM in path) or via Custom HTML tag." + ) + + if dynamic_event_tags: + active = [t for t in dynamic_event_tags if not t["paused"]] + insights.append( + f"{len(dynamic_event_tags)} GTM tag(s) use a DYNAMIC event name " + f"(variable like {{{{Event}}}}): " + f"{', '.join(t['name'] for t in dynamic_event_tags[:5])}" + f"{' — none active' if not active else ''} — manual review required, " + f"the audit cannot resolve their event names." + ) + + if custom_html_tags: + active_html = [t for t in custom_html_tags if not t["paused"]] + if active_html: + insights.append( + f"{len(active_html)} active Custom HTML tag(s) in container — these may " + f"send events the audit cannot see: " + f"{', '.join(t['name'] for t in active_html[:5])}" + ) + + return { + "container": { + "account_id": container["account_id"], + "container_id": container["container_id"], + "container_version_id": container["container_version_id"], + "container_version_name": container["container_version_name"], + "tag_count": len(container["tags"]), + "trigger_count": container["trigger_count"], + "variable_count": container["variable_count"], + "ga4_event_tag_count": sum(len(v) for v in gtm_by_event.values()), + "dynamic_event_tag_count": len(dynamic_event_tags), + "custom_html_tag_count": len(custom_html_tags), + "other_tag_types": other_tags_by_type, + }, + "codebase_events": expected_events, + "matrix": matrix, + "dynamic_event_tags": dynamic_event_tags, + "custom_html_tags": custom_html_tags, + "insights": insights, + "date_range": {"start": start, "end": end}, + } diff --git a/src/adloop/gtm/__init__.py b/src/adloop/gtm/__init__.py new file mode 100644 index 0000000..84a9c54 --- /dev/null +++ b/src/adloop/gtm/__init__.py @@ -0,0 +1 @@ +"""Google Tag Manager integration.""" diff --git a/src/adloop/gtm/client.py b/src/adloop/gtm/client.py new file mode 100644 index 0000000..7b48553 --- /dev/null +++ b/src/adloop/gtm/client.py @@ -0,0 +1,20 @@ +"""GTM API client wrapper — Google Tag Manager API v2.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from adloop.config import AdLoopConfig + + +def get_gtm_client(config: AdLoopConfig): + """Return an authenticated Google Tag Manager API v2 client.""" + from googleapiclient.discovery import build + + from adloop.auth import get_gtm_credentials + + credentials = get_gtm_credentials(config) + return build( + "tagmanager", "v2", credentials=credentials, cache_discovery=False + ) diff --git a/src/adloop/gtm/read.py b/src/adloop/gtm/read.py new file mode 100644 index 0000000..a3b78c3 --- /dev/null +++ b/src/adloop/gtm/read.py @@ -0,0 +1,648 @@ +"""GTM read helpers — fetch the live (published) container version and parse tags.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from adloop.config import AdLoopConfig + + +GA4_EVENT_TAG = "gaawe" +GA4_CONFIG_TAG = "googtag" +ADS_CONVERSION_TAG = "awct" +ADS_CONVERSION_LINKER = "gclidw" +ADS_REMARKETING_TAG = "sp" +CUSTOM_HTML = "html" + + +# Built-in trigger IDs are >= 2147479553. They aren't returned in the +# container's trigger[] list, but tags reference them by ID. Names are +# stable per GTM docs. +_BUILT_IN_TRIGGERS = { + "2147479553": ("All Pages", "pageview"), + "2147479572": ("Consent Initialization - All Pages", "consentInit"), + "2147479573": ("Initialization - All Pages", "init"), +} + + +def _resolve_trigger(trigger_by_id: dict, tid: str) -> dict: + """Resolve a trigger ID to a {id, name, type} dict, handling built-ins.""" + if tid in trigger_by_id: + t = trigger_by_id[tid] + return {"id": tid, "name": t.get("name"), "type": t.get("type")} + if tid in _BUILT_IN_TRIGGERS: + name, ttype = _BUILT_IN_TRIGGERS[tid] + return {"id": tid, "name": f"(built-in) {name}", "type": ttype} + return {"id": tid, "name": "(unknown — possibly built-in)", "type": None} + + +def _params_dict(tag: dict) -> dict: + """Flatten a tag's parameter list to a {key: value} dict for simple lookups.""" + out = {} + for p in tag.get("parameter", []): + key = p.get("key") + if key is None: + continue + if "value" in p: + out[key] = p["value"] + elif "list" in p: + out[key] = p["list"] + elif "map" in p: + out[key] = p["map"] + return out + + +def _summarize_filter(filter_obj: dict) -> str: + """Render a single GTM trigger filter as 'variable [NOT] OP value'. + + GTM stores negation as a `negate: "true"` boolean parameter alongside + arg0/arg1, NOT as a separate operator. Surface it explicitly because + a missed negate flag inverts the meaning of the trigger. + """ + op = filter_obj.get("type", "?") + parameter_map = {p.get("key"): p for p in filter_obj.get("parameter", [])} + arg0 = parameter_map.get("arg0", {}).get("value", "?") + arg1 = parameter_map.get("arg1", {}).get("value", "?") + negate_param = parameter_map.get("negate", {}) + is_negated = str(negate_param.get("value", "")).lower() == "true" + prefix = "NOT " if is_negated else "" + return f"{arg0} {prefix}{op} {arg1}" + + +def _trigger_group_member_ids(trigger: dict) -> list[str]: + """Extract child trigger IDs from a triggerGroup's parameters. + + Stored as parameter `triggerIds` of type `list` containing items of type + `triggerReference` whose value is the child trigger_id. + """ + members: list[str] = [] + for p in trigger.get("parameter", []): + if p.get("key") != "triggerIds": + continue + for item in p.get("list", []): + v = item.get("value") + if v: + members.append(str(v)) + return members + + +def _element_visibility_summary(trigger: dict) -> dict: + """Extract selector + timing config from an elementVisibility trigger. + + Most actionable fields: selectorType (id vs cssSelector), the selector + itself, and firingFrequency (oncePerEvent/oncePerElement/many) — these + determine which DOM element the trigger watches. + """ + params = {} + for p in trigger.get("parameter", []): + key = p.get("key") + if key: + params[key] = p.get("value") + + selector_type = params.get("selectorType") + if str(selector_type).upper() == "ID": + selector = params.get("elementId") + else: + selector = params.get("elementSelector") + + return { + "selector_type": selector_type, + "selector": selector, + "firing_frequency": params.get("firingFrequency"), + "on_screen_ratio": params.get("onScreenRatio"), + "use_dom_change_listener": params.get("useDomChangeListener"), + "use_on_screen_duration": params.get("useOnScreenDuration"), + } + + +def _parse_trigger(trigger: dict) -> dict: + """Normalize a trigger to its key fields plus human-readable filter list. + + Adds type-specific fields when relevant: triggerGroup member IDs, + elementVisibility selector + timing. + """ + out = { + "trigger_id": trigger.get("triggerId"), + "name": trigger.get("name"), + "type": trigger.get("type"), + "filters": [_summarize_filter(f) for f in trigger.get("filter", [])], + "auto_event_filters": [ + _summarize_filter(f) + for group in trigger.get("autoEventFilter", []) + for f in group.get("filter", []) + ], + "custom_event_filters": [ + _summarize_filter(f) for f in trigger.get("customEventFilter", []) + ], + "wait_for_tags": trigger.get("waitForTags", {}).get("value") + if isinstance(trigger.get("waitForTags"), dict) + else None, + "check_validation": trigger.get("checkValidation", {}).get("value") + if isinstance(trigger.get("checkValidation"), dict) + else None, + } + + if trigger.get("type") == "triggerGroup": + out["group_member_trigger_ids"] = _trigger_group_member_ids(trigger) + + if trigger.get("type") == "elementVisibility": + out["element_visibility"] = _element_visibility_summary(trigger) + + return out + + +def _parse_variable(variable: dict) -> dict: + """Normalize a custom variable to its key fields.""" + params = _params_dict(variable) + return { + "variable_id": variable.get("variableId"), + "name": variable.get("name"), + "type": variable.get("type"), + "parameters": params, + "format_value": variable.get("formatValue"), + } + + +def list_accounts(config: AdLoopConfig) -> dict: + """List all GTM accounts the service account / OAuth user can read.""" + from adloop.gtm.client import get_gtm_client + + client = get_gtm_client(config) + resp = client.accounts().list().execute() + accounts = [] + for acct in resp.get("account", []): + accounts.append({ + "account_id": acct.get("accountId"), + "name": acct.get("name"), + "path": acct.get("path"), + }) + return {"accounts": accounts, "count": len(accounts)} + + +def list_containers(config: AdLoopConfig, *, account_id: str) -> dict: + """List all containers under a GTM account.""" + from adloop.gtm.client import get_gtm_client + + client = get_gtm_client(config) + parent = f"accounts/{account_id}" + resp = client.accounts().containers().list(parent=parent).execute() + containers = [] + for c in resp.get("container", []): + containers.append({ + "container_id": c.get("containerId"), + "public_id": c.get("publicId"), + "name": c.get("name"), + "usage_context": c.get("usageContext", []), + "path": c.get("path"), + }) + return {"account_id": account_id, "containers": containers, "count": len(containers)} + + +def get_live_container( + config: AdLoopConfig, + *, + account_id: str, + container_id: str, +) -> dict: + """Fetch the LIVE (published) container version with parsed tags + triggers. + + Returns a normalized dict — each tag has its event name extracted (for GA4 + event tags), firing triggers resolved to names + types, and pause status + surfaced. Custom HTML tags are flagged separately because their event + semantics can't be inferred without parsing the JS body. + """ + from adloop.gtm.client import get_gtm_client + + client = get_gtm_client(config) + parent = f"accounts/{account_id}/containers/{container_id}" + + live = ( + client.accounts() + .containers() + .versions() + .live(parent=parent) + .execute() + ) + + tags = live.get("tag", []) + triggers = live.get("trigger", []) + variables = live.get("variable", []) + + trigger_by_id = {t.get("triggerId"): t for t in triggers} + + parsed_tags = [] + for tag in tags: + params = _params_dict(tag) + firing_trigger_ids = tag.get("firingTriggerId", []) + firing_triggers = [_resolve_trigger(trigger_by_id, tid) for tid in firing_trigger_ids] + + event_name = None + if tag.get("type") == GA4_EVENT_TAG: + ev = params.get("eventName") + if isinstance(ev, str): + event_name = ev + + parsed_tags.append({ + "tag_id": tag.get("tagId"), + "name": tag.get("name"), + "type": tag.get("type"), + "event_name": event_name, + "paused": tag.get("paused", False), + "firing_triggers": firing_triggers, + "blocking_triggers": tag.get("blockingTriggerId", []), + "parameters": params, + }) + + return { + "account_id": account_id, + "container_id": container_id, + "container_version_id": live.get("containerVersionId"), + "container_version_name": live.get("name"), + "fingerprint": live.get("fingerprint"), + "tags": parsed_tags, + "trigger_count": len(triggers), + "variable_count": len(variables), + } + + +# --------------------------------------------------------------------------- +# Per-resource read helpers — operate on the LIVE container by default +# --------------------------------------------------------------------------- + + +def _fetch_live(client, account_id: str, container_id: str) -> dict: + parent = f"accounts/{account_id}/containers/{container_id}" + return ( + client.accounts() + .containers() + .versions() + .live(parent=parent) + .execute() + ) + + +def list_tags( + config: AdLoopConfig, + *, + account_id: str, + container_id: str, +) -> dict: + """List every tag in the LIVE container with parsed event names + triggers.""" + container = get_live_container( + config, account_id=account_id, container_id=container_id + ) + return { + "account_id": account_id, + "container_id": container_id, + "container_version_id": container["container_version_id"], + "tags": container["tags"], + "count": len(container["tags"]), + } + + +def get_tag( + config: AdLoopConfig, + *, + account_id: str, + container_id: str, + tag_id: str, +) -> dict: + """Return the full RAW config for a single tag from the live container. + + Includes every parameter, firing/blocking trigger references, priority, + pause status, and tag-specific settings (sampling, monitoring, etc.). + Use after audit_event_coverage flags a tag for inspection. + """ + from adloop.gtm.client import get_gtm_client + + client = get_gtm_client(config) + live = _fetch_live(client, account_id, container_id) + triggers_by_id = {t.get("triggerId"): t for t in live.get("trigger", [])} + + for tag in live.get("tag", []): + if str(tag.get("tagId")) == str(tag_id): + params = _params_dict(tag) + return { + "tag_id": tag.get("tagId"), + "name": tag.get("name"), + "type": tag.get("type"), + "paused": tag.get("paused", False), + "priority": tag.get("priority"), + "tag_firing_option": tag.get("tagFiringOption"), + "monitoring_metadata": tag.get("monitoringMetadata"), + "live_only": tag.get("liveOnly"), + "parameters": params, + "firing_triggers": [ + { + **_resolve_trigger(triggers_by_id, tid), + "filters": [ + _summarize_filter(f) + for f in triggers_by_id.get(tid, {}).get("filter", []) + ], + } + for tid in tag.get("firingTriggerId", []) + ], + "blocking_triggers": [ + _resolve_trigger(triggers_by_id, tid) + for tid in tag.get("blockingTriggerId", []) + ], + "raw": tag, + } + + return { + "error": f"Tag {tag_id} not found in live container {container_id}", + "available_tag_ids": [t.get("tagId") for t in live.get("tag", [])], + } + + +def list_triggers( + config: AdLoopConfig, + *, + account_id: str, + container_id: str, +) -> dict: + """List every trigger in the LIVE container with filters parsed to text.""" + from adloop.gtm.client import get_gtm_client + + client = get_gtm_client(config) + live = _fetch_live(client, account_id, container_id) + triggers = [_parse_trigger(t) for t in live.get("trigger", [])] + return { + "account_id": account_id, + "container_id": container_id, + "container_version_id": live.get("containerVersionId"), + "triggers": triggers, + "count": len(triggers), + } + + +def get_trigger( + config: AdLoopConfig, + *, + account_id: str, + container_id: str, + trigger_id: str, +) -> dict: + """Return the full RAW config for a single trigger from the live container.""" + from adloop.gtm.client import get_gtm_client + + client = get_gtm_client(config) + live = _fetch_live(client, account_id, container_id) + + for trigger in live.get("trigger", []): + if str(trigger.get("triggerId")) == str(trigger_id): + parsed = _parse_trigger(trigger) + parsed["raw"] = trigger + tags_using = [ + {"tag_id": t.get("tagId"), "name": t.get("name")} + for t in live.get("tag", []) + if str(trigger_id) in [str(x) for x in t.get("firingTriggerId", [])] + ] + parsed["used_by_tags"] = tags_using + return parsed + + return { + "error": f"Trigger {trigger_id} not found in live container {container_id}", + "available_trigger_ids": [t.get("triggerId") for t in live.get("trigger", [])], + } + + +def list_variables( + config: AdLoopConfig, + *, + account_id: str, + container_id: str, +) -> dict: + """List custom variables in the LIVE container plus enabled built-in variables. + + Custom variables come from the live container version. Built-in variables + (Page URL, Click Element, etc.) come from a separate API endpoint and are + listed under `built_in`. + """ + from adloop.gtm.client import get_gtm_client + + client = get_gtm_client(config) + live = _fetch_live(client, account_id, container_id) + + custom = [_parse_variable(v) for v in live.get("variable", [])] + + built_in: list[dict] = [] + workspaces = ( + client.accounts() + .containers() + .workspaces() + .list(parent=f"accounts/{account_id}/containers/{container_id}") + .execute() + .get("workspace", []) + ) + if workspaces: + wid = workspaces[0].get("workspaceId") + try: + biv = ( + client.accounts() + .containers() + .workspaces() + .built_in_variables() + .list( + parent=f"accounts/{account_id}/containers/{container_id}/workspaces/{wid}" + ) + .execute() + .get("builtInVariable", []) + ) + built_in = [ + {"name": v.get("name"), "type": v.get("type")} for v in biv + ] + except Exception: + built_in = [] + + return { + "account_id": account_id, + "container_id": container_id, + "container_version_id": live.get("containerVersionId"), + "custom_variables": custom, + "custom_count": len(custom), + "built_in": built_in, + "built_in_count": len(built_in), + } + + +def list_workspaces( + config: AdLoopConfig, + *, + account_id: str, + container_id: str, +) -> dict: + """List workspaces (drafts) under a container. + + Most containers have a single Default Workspace. Multiple workspaces appear + when the team uses parallel drafts. Workspace IDs are needed for diff + + future write operations. + """ + from adloop.gtm.client import get_gtm_client + + client = get_gtm_client(config) + parent = f"accounts/{account_id}/containers/{container_id}" + resp = ( + client.accounts() + .containers() + .workspaces() + .list(parent=parent) + .execute() + ) + workspaces = [] + for w in resp.get("workspace", []): + workspaces.append({ + "workspace_id": w.get("workspaceId"), + "name": w.get("name"), + "description": w.get("description"), + "path": w.get("path"), + }) + return { + "account_id": account_id, + "container_id": container_id, + "workspaces": workspaces, + "count": len(workspaces), + } + + +def get_workspace_diff( + config: AdLoopConfig, + *, + account_id: str, + container_id: str, + workspace_id: str, +) -> dict: + """Show drafted-but-not-published changes in a workspace. + + Calls workspaces.getStatus, which returns the list of entities (tags, + triggers, variables) that have been added, modified, or deleted relative + to the live published version. Common cause of "I edited a tag in GTM + but nothing happened" — the workspace was never published. + """ + from adloop.gtm.client import get_gtm_client + + client = get_gtm_client(config) + path = ( + f"accounts/{account_id}/containers/{container_id}/workspaces/{workspace_id}" + ) + status = ( + client.accounts() + .containers() + .workspaces() + .getStatus(path=path) + .execute() + ) + + changes = status.get("workspaceChange", []) + summary: dict[str, int] = {} + parsed_changes = [] + for change in changes: + change_status = change.get("changeStatus", "unknown") + summary[change_status] = summary.get(change_status, 0) + 1 + + for kind in ("tag", "trigger", "variable", "folder", "client", "transformation", "zone"): + if kind in change: + entity = change[kind] + parsed_changes.append({ + "change_status": change_status, + "entity_kind": kind, + "entity_id": entity.get(f"{kind}Id"), + "name": entity.get("name"), + "type": entity.get("type"), + }) + + return { + "account_id": account_id, + "container_id": container_id, + "workspace_id": workspace_id, + "merge_conflict": status.get("mergeConflict", []), + "change_count": len(changes), + "change_summary_by_status": summary, + "changes": parsed_changes, + "is_clean": len(changes) == 0 and not status.get("mergeConflict"), + } + + +def list_versions( + config: AdLoopConfig, + *, + account_id: str, + container_id: str, + page_size: int = 50, +) -> dict: + """List published version history (newest first) with author + notes. + + Use this to correlate a metric drop with a recent publish — fetch the + last few versions, look at created/updated timestamps and notes, and + cross-reference with the date the conversion / session drop began. + """ + from adloop.gtm.client import get_gtm_client + + client = get_gtm_client(config) + parent = f"accounts/{account_id}/containers/{container_id}" + resp = ( + client.accounts() + .containers() + .version_headers() + .list(parent=parent) + .execute() + ) + headers = resp.get("containerVersionHeader", [])[:page_size] + versions = [] + for v in headers: + versions.append({ + "container_version_id": v.get("containerVersionId"), + "name": v.get("name"), + "deleted": v.get("deleted", False), + "num_tags": v.get("numTags"), + "num_triggers": v.get("numTriggers"), + "num_variables": v.get("numVariables"), + "num_macros": v.get("numMacros"), + "num_rules": v.get("numRules"), + }) + return { + "account_id": account_id, + "container_id": container_id, + "versions": versions, + "count": len(versions), + "note": ( + "Version headers do not include createdAt/author. Call " + "get_gtm_version on a specific version_id for full metadata." + ), + } + + +def get_version( + config: AdLoopConfig, + *, + account_id: str, + container_id: str, + container_version_id: str, +) -> dict: + """Get full metadata + content for a single container version. + + Includes created/updated timestamps, fingerprint, full tag/trigger/variable + lists at that point in time. Useful for correlating a metric drop with + what changed in a specific publish. + """ + from adloop.gtm.client import get_gtm_client + + client = get_gtm_client(config) + path = ( + f"accounts/{account_id}/containers/{container_id}/versions/" + f"{container_version_id}" + ) + v = client.accounts().containers().versions().get(path=path).execute() + return { + "container_version_id": v.get("containerVersionId"), + "name": v.get("name"), + "description": v.get("description"), + "fingerprint": v.get("fingerprint"), + "deleted": v.get("deleted", False), + "tag_count": len(v.get("tag", [])), + "trigger_count": len(v.get("trigger", [])), + "variable_count": len(v.get("variable", [])), + "tag_names": [t.get("name") for t in v.get("tag", [])], + "trigger_names": [t.get("name") for t in v.get("trigger", [])], + } diff --git a/src/adloop/gtm/write.py b/src/adloop/gtm/write.py new file mode 100644 index 0000000..6b94258 --- /dev/null +++ b/src/adloop/gtm/write.py @@ -0,0 +1,909 @@ +"""GTM API write tools — Google Tag Manager API v2. + +All write operations follow the AdLoop safety pattern: + 1. draft_gtm_* → creates a ChangePlan, stores it, returns plan_id + 2. confirm_and_apply(plan_id) → executes via the GTM API + +Supported tag types (tag_type): + googtag — Google Tag (gtag.js config) — use for AW-... and G-... + awct — Google Ads Conversion Tracking + gclidw — Google Ads Conversion Linker + html — Custom HTML + gaawe — GA4 Event tag + +Supported trigger types (trigger_type): + pageview, dom_ready, window_loaded + click — clicks on any element + linkClick — clicks on links only + formSubmission — form submits + customEvent — dataLayer.push({event: name}) + +Workspaces: leave workspace_id empty to auto-pick the Default Workspace. +""" +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from adloop.config import AdLoopConfig + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +_VALID_TAG_TYPES = { + # Google-platform native templates + "googtag", "gclidw", + "awct", # Google Ads Conversion Tracking (page-load conversions) + "awcc", # Google Ads Calls from Website Conversion (GFN swap) + "awcr", # Google Ads Remarketing + "awud", # Google Ads User-Provided Data + "ua", # Universal Analytics (legacy) + "gaawe", # GA4 Event + "gaawc", # GA4 Configuration (legacy alias) + "fls", "flc", # Floodlight Sales / Counter + + # Verbose long-form aliases — same templates, different names + "google_analytics_4_event", + "google_ads_conversion_tracking", + "google_ads_calls_from_website", + "google_ads_remarketing", + "google_ads_user_provided_data", + + # Generic / open-ended + "html", # Custom HTML + "img", # Custom Image +} + +_VALID_TRIGGER_TYPES = { + "pageview", "dom_ready", "window_loaded", + "click", "linkClick", "formSubmission", + "customEvent", "elementVisibility", + "scroll_depth", "youtube_video", "history_change", + "timer", "javascript_error", +} + +# How GTM stores key/value parameter structures +def _param(key: str, value, type_: str = "TEMPLATE") -> dict: + """Build a GTM parameter dict. + + GTM parameter shape: + {"type": "TEMPLATE"|"BOOLEAN"|"INTEGER"|"LIST"|"MAP", "key": ..., "value": ...} + """ + out = {"type": type_, "key": key} + if type_ == "BOOLEAN": + out["value"] = "true" if bool(value) else "false" + elif type_ == "INTEGER": + out["value"] = str(int(value)) + elif type_ == "LIST": + out["list"] = value + elif type_ == "MAP": + out["map"] = value + else: + out["value"] = str(value) + return out + + +def _resolve_workspace(client, account_id: str, container_id: str, + workspace_id: str = "") -> str: + """Resolve workspace_id, defaulting to the Default Workspace.""" + if workspace_id: + return workspace_id + parent = f"accounts/{account_id}/containers/{container_id}" + resp = ( + client.accounts() + .containers() + .workspaces() + .list(parent=parent) + .execute() + ) + for w in resp.get("workspace", []): + if w.get("name") == "Default Workspace": + return w["workspaceId"] + # Fall back to first workspace + workspaces = resp.get("workspace", []) + if not workspaces: + raise ValueError( + f"No workspaces found under {parent}. Container may be misconfigured." + ) + return workspaces[0]["workspaceId"] + + +# --------------------------------------------------------------------------- +# draft_gtm_tag +# --------------------------------------------------------------------------- + +def draft_gtm_tag( + config: AdLoopConfig, + *, + account_id: str, + container_id: str, + workspace_id: str = "", + name: str, + tag_type: str, + parameters: list[dict] | None = None, + firing_trigger_ids: list[str] | None = None, + blocking_trigger_ids: list[str] | None = None, + paused: bool = False, + notes: str = "", +) -> dict: + """Draft a new GTM tag in a workspace — returns a PREVIEW. + + name: human-readable tag name shown in GTM UI + tag_type: one of: googtag, awct, gclidw, html, gaawe (see module docstring) + parameters: list of {type, key, value} dicts. Common patterns: + - Google Tag (googtag): + [{"type": "TEMPLATE", "key": "tagId", "value": "AW-11437481610"}] + - Google Ads Conversion (awct): + [{"type": "TEMPLATE", "key": "conversionId", "value": "11437481610"}, + {"type": "TEMPLATE", "key": "conversionLabel", "value": "_qxp..."}, + {"type": "TEMPLATE", "key": "conversionValue", "value": "250"}, + {"type": "TEMPLATE", "key": "conversionCurrency", "value": "USD"}] + - Custom HTML (html): + [{"type": "TEMPLATE", "key": "html", "value": ""}, + {"type": "BOOLEAN", "key": "supportDocumentWrite", "value": "false"}] + - Conversion Linker (gclidw): no parameters required + - GA4 Event (gaawe): + [{"type": "TEMPLATE", "key": "eventName", "value": "form_submit"}, + {"type": "TEMPLATE", "key": "measurementIdOverride", "value": "G-..."}] + firing_trigger_ids: list of trigger IDs (string) that fire this tag. + For init-on-all-pages, use ["2147479573"] (built-in). + blocking_trigger_ids: optional list of trigger IDs that block the tag. + paused: pause the tag on creation (default False). + notes: optional notes shown in GTM UI. + + Call confirm_and_apply with the returned plan_id to execute. + """ + from adloop.safety.guards import SafetyViolation, check_blocked_operation + from adloop.safety.preview import ChangePlan, store_plan + + try: + check_blocked_operation("create_gtm_tag", config.safety) + except SafetyViolation as e: + return {"error": str(e)} + + errors = [] + if not name or len(name) > 200: + errors.append("name is required (1-200 chars)") + if tag_type not in _VALID_TAG_TYPES: + errors.append( + f"tag_type '{tag_type}' invalid; valid: {sorted(_VALID_TAG_TYPES)}" + ) + if errors: + return {"error": "Validation failed", "details": errors} + + plan = ChangePlan( + operation="create_gtm_tag", + entity_type="gtm_tag", + entity_id=container_id, + customer_id="", # GTM doesn't use customer_id + changes={ + "account_id": account_id, + "container_id": container_id, + "workspace_id": workspace_id, + "name": name, + "type": tag_type, + "parameters": parameters or [], + "firing_trigger_ids": firing_trigger_ids or [], + "blocking_trigger_ids": blocking_trigger_ids or [], + "paused": bool(paused), + "notes": notes, + }, + ) + store_plan(plan) + return plan.to_preview() + + +def draft_gtm_trigger( + config: AdLoopConfig, + *, + account_id: str, + container_id: str, + workspace_id: str = "", + name: str, + trigger_type: str, + filters: list[dict] | None = None, + custom_event_filters: list[dict] | None = None, + auto_event_filters: list[dict] | None = None, + custom_event_name: str = "", + parameters: list[dict] | None = None, + notes: str = "", +) -> dict: + """Draft a new GTM trigger in a workspace — returns a PREVIEW. + + trigger_type: pageview | click | linkClick | formSubmission | customEvent | ... + filters: list of GTM filter dicts {parameter: [...], type: "EQUALS"|"CONTAINS"|...}. + Used for "fire on these conditions" — e.g. {{Click URL}} contains "tel:" + custom_event_filters: same shape, applied as additional conditions on + custom event triggers. + custom_event_name: required when trigger_type=customEvent (the dataLayer + event name to listen for). + + Common shapes: + Click on any tel: link: + trigger_type="click", filters=[{ + "type": "CONTAINS", + "parameter": [ + {"type": "TEMPLATE", "key": "arg0", "value": "{{Click URL}}"}, + {"type": "TEMPLATE", "key": "arg1", "value": "tel:"}, + ], + }] + + Form submit on contact page: + trigger_type="formSubmission", filters=[{ + "type": "CONTAINS", + "parameter": [ + {"type": "TEMPLATE", "key": "arg0", "value": "{{Page Path}}"}, + {"type": "TEMPLATE", "key": "arg1", "value": "/contacts"}, + ], + }] + + Call confirm_and_apply with the returned plan_id to execute. + """ + from adloop.safety.guards import SafetyViolation, check_blocked_operation + from adloop.safety.preview import ChangePlan, store_plan + + try: + check_blocked_operation("create_gtm_trigger", config.safety) + except SafetyViolation as e: + return {"error": str(e)} + + errors = [] + if not name or len(name) > 200: + errors.append("name is required (1-200 chars)") + if trigger_type not in _VALID_TRIGGER_TYPES: + errors.append( + f"trigger_type '{trigger_type}' invalid; valid: " + f"{sorted(_VALID_TRIGGER_TYPES)}" + ) + if trigger_type == "customEvent" and not custom_event_name: + errors.append("custom_event_name is required when trigger_type=customEvent") + if errors: + return {"error": "Validation failed", "details": errors} + + plan_changes = { + "account_id": account_id, + "container_id": container_id, + "workspace_id": workspace_id, + "name": name, + "type": trigger_type, + "filters": filters or [], + "custom_event_filters": custom_event_filters or [], + "auto_event_filters": auto_event_filters or [], + "parameters": parameters or [], + "notes": notes, + } + if custom_event_name: + plan_changes["custom_event_name"] = custom_event_name + + plan = ChangePlan( + operation="create_gtm_trigger", + entity_type="gtm_trigger", + entity_id=container_id, + customer_id="", + changes=plan_changes, + ) + store_plan(plan) + return plan.to_preview() + + +# --------------------------------------------------------------------------- +# update + delete for tags + triggers +# --------------------------------------------------------------------------- + + +def draft_update_gtm_tag( + config: AdLoopConfig, + *, + account_id: str, + container_id: str, + tag_id: str, + workspace_id: str = "", + name: str = "", + parameters: list[dict] | None = None, + firing_trigger_ids: list[str] | None = None, + blocking_trigger_ids: list[str] | None = None, + paused: bool | None = None, + notes: str = "", + tag_type: str = "", +) -> dict: + """Draft an UPDATE to an existing GTM tag — returns a PREVIEW. + + Only the fields you pass non-empty/non-None will be modified. Pass + ``parameters`` to fully replace the parameter list (GTM doesn't support + partial parameter updates — read the existing tag first if you need to + merge). + + tag_type: only set if you want to change the tag type, which is + unusual (most GTM tags can't change type — delete + create instead). + + Call confirm_and_apply with the returned plan_id to execute. + """ + from adloop.safety.guards import SafetyViolation, check_blocked_operation + from adloop.safety.preview import ChangePlan, store_plan + + try: + check_blocked_operation("update_gtm_tag", config.safety) + except SafetyViolation as e: + return {"error": str(e)} + + if not tag_id: + return {"error": "tag_id is required"} + + if tag_type and tag_type not in _VALID_TAG_TYPES: + return { + "error": "Validation failed", + "details": [ + f"tag_type '{tag_type}' invalid; valid: " + f"{sorted(_VALID_TAG_TYPES)}" + ], + } + + changes: dict = { + "account_id": account_id, + "container_id": container_id, + "workspace_id": workspace_id, + "tag_id": str(tag_id), + } + if name: + changes["name"] = name + if tag_type: + changes["type"] = tag_type + if parameters is not None: + changes["parameters"] = parameters + if firing_trigger_ids is not None: + changes["firing_trigger_ids"] = firing_trigger_ids + if blocking_trigger_ids is not None: + changes["blocking_trigger_ids"] = blocking_trigger_ids + if paused is not None: + changes["paused"] = bool(paused) + if notes: + changes["notes"] = notes + + if len(changes) <= 4: # only the routing fields + return {"error": "No fields to update"} + + plan = ChangePlan( + operation="update_gtm_tag", + entity_type="gtm_tag", + entity_id=str(tag_id), + customer_id="", + changes=changes, + ) + store_plan(plan) + return plan.to_preview() + + +def draft_update_gtm_trigger( + config: AdLoopConfig, + *, + account_id: str, + container_id: str, + trigger_id: str, + workspace_id: str = "", + name: str = "", + filters: list[dict] | None = None, + custom_event_filters: list[dict] | None = None, + auto_event_filters: list[dict] | None = None, + parameters: list[dict] | None = None, + notes: str = "", +) -> dict: + """Draft an UPDATE to an existing GTM trigger — returns a PREVIEW. + + Trigger type cannot be changed (delete + create instead). + + Call confirm_and_apply with the returned plan_id to execute. + """ + from adloop.safety.guards import SafetyViolation, check_blocked_operation + from adloop.safety.preview import ChangePlan, store_plan + + try: + check_blocked_operation("update_gtm_trigger", config.safety) + except SafetyViolation as e: + return {"error": str(e)} + + if not trigger_id: + return {"error": "trigger_id is required"} + + changes: dict = { + "account_id": account_id, + "container_id": container_id, + "workspace_id": workspace_id, + "trigger_id": str(trigger_id), + } + if name: + changes["name"] = name + if filters is not None: + changes["filters"] = filters + if custom_event_filters is not None: + changes["custom_event_filters"] = custom_event_filters + if auto_event_filters is not None: + changes["auto_event_filters"] = auto_event_filters + if parameters is not None: + changes["parameters"] = parameters + if notes: + changes["notes"] = notes + + if len(changes) <= 4: + return {"error": "No fields to update"} + + plan = ChangePlan( + operation="update_gtm_trigger", + entity_type="gtm_trigger", + entity_id=str(trigger_id), + customer_id="", + changes=changes, + ) + store_plan(plan) + return plan.to_preview() + + +def draft_delete_gtm_tag( + config: AdLoopConfig, + *, + account_id: str, + container_id: str, + tag_id: str, + workspace_id: str = "", +) -> dict: + """Draft a deletion of a GTM tag — returns a PREVIEW. + + Deleting a tag also deletes its firing-trigger references. Triggers + themselves are not deleted; use draft_delete_gtm_trigger for that. + + Call confirm_and_apply with the returned plan_id to execute. + """ + from adloop.safety.guards import SafetyViolation, check_blocked_operation + from adloop.safety.preview import ChangePlan, store_plan + + try: + check_blocked_operation("delete_gtm_tag", config.safety) + except SafetyViolation as e: + return {"error": str(e)} + + if not tag_id: + return {"error": "tag_id is required"} + + plan = ChangePlan( + operation="delete_gtm_tag", + entity_type="gtm_tag", + entity_id=str(tag_id), + customer_id="", + changes={ + "account_id": account_id, + "container_id": container_id, + "workspace_id": workspace_id, + "tag_id": str(tag_id), + }, + ) + store_plan(plan) + preview = plan.to_preview() + preview["warnings"] = [ + "Deleting a GTM tag is irreversible. Publish the workspace to make " + "the deletion live; until then it stays as a workspace draft." + ] + return preview + + +def draft_delete_gtm_trigger( + config: AdLoopConfig, + *, + account_id: str, + container_id: str, + trigger_id: str, + workspace_id: str = "", +) -> dict: + """Draft a deletion of a GTM trigger — returns a PREVIEW. + + GTM blocks the deletion if the trigger is referenced by any tag + (firing or blocking). Remove or update those tags first. + + Call confirm_and_apply with the returned plan_id to execute. + """ + from adloop.safety.guards import SafetyViolation, check_blocked_operation + from adloop.safety.preview import ChangePlan, store_plan + + try: + check_blocked_operation("delete_gtm_trigger", config.safety) + except SafetyViolation as e: + return {"error": str(e)} + + if not trigger_id: + return {"error": "trigger_id is required"} + + plan = ChangePlan( + operation="delete_gtm_trigger", + entity_type="gtm_trigger", + entity_id=str(trigger_id), + customer_id="", + changes={ + "account_id": account_id, + "container_id": container_id, + "workspace_id": workspace_id, + "trigger_id": str(trigger_id), + }, + ) + store_plan(plan) + preview = plan.to_preview() + preview["warnings"] = [ + "Deleting a GTM trigger is irreversible. GTM rejects the deletion " + "if any tag references this trigger — delete or unhook those tags " + "first." + ] + return preview + + +def publish_gtm_workspace( + config: AdLoopConfig, + *, + account_id: str, + container_id: str, + workspace_id: str = "", + version_name: str = "", + version_notes: str = "", +) -> dict: + """Draft a publish of the workspace — returns a PREVIEW. + + Publishing creates a new version from the workspace and sets it live. + Until you call confirm_and_apply, no live change happens — the tags and + triggers you've drafted into the workspace stay as drafts. + + version_name: optional friendly name for the version + version_notes: optional release notes + + Call confirm_and_apply with the returned plan_id to execute. + """ + from adloop.safety.guards import SafetyViolation, check_blocked_operation + from adloop.safety.preview import ChangePlan, store_plan + + try: + check_blocked_operation("publish_gtm_workspace", config.safety) + except SafetyViolation as e: + return {"error": str(e)} + + plan = ChangePlan( + operation="publish_gtm_workspace", + entity_type="gtm_container_version", + entity_id=container_id, + customer_id="", + changes={ + "account_id": account_id, + "container_id": container_id, + "workspace_id": workspace_id, + "version_name": version_name or "AdLoop publish", + "version_notes": version_notes, + }, + ) + store_plan(plan) + preview = plan.to_preview() + preview["warnings"] = [ + "Publishing this workspace will set its drafted changes LIVE on the " + "container. All visitors with the GTM snippet installed will start " + "receiving the new tags + triggers within minutes of publish." + ] + return preview + + +# --------------------------------------------------------------------------- +# Apply handlers — invoked via _execute_plan in ads/write.py +# --------------------------------------------------------------------------- + +def _apply_create_gtm_tag(_unused_client, _unused_cid, changes: dict) -> dict: + """POST a new tag to GTM workspace.""" + from adloop.config import load_config + from adloop.gtm.client import get_gtm_client + + cfg = load_config() + client = get_gtm_client(cfg) + ws_id = _resolve_workspace( + client, changes["account_id"], changes["container_id"], + changes.get("workspace_id", "") + ) + parent = ( + f"accounts/{changes['account_id']}" + f"/containers/{changes['container_id']}" + f"/workspaces/{ws_id}" + ) + body = { + "name": changes["name"], + "type": changes["type"], + "parameter": changes.get("parameters", []) or [], + "firingTriggerId": changes.get("firing_trigger_ids", []) or [], + "blockingTriggerId": changes.get("blocking_trigger_ids", []) or [], + "paused": changes.get("paused", False), + } + if changes.get("notes"): + body["notes"] = changes["notes"] + + resp = ( + client.accounts() + .containers() + .workspaces() + .tags() + .create(parent=parent, body=body) + .execute() + ) + return { + "tag_id": resp.get("tagId"), + "name": resp.get("name"), + "type": resp.get("type"), + "workspace_id": ws_id, + "path": resp.get("path"), + } + + +def _apply_create_gtm_trigger(_unused_client, _unused_cid, changes: dict) -> dict: + """POST a new trigger to GTM workspace.""" + from adloop.config import load_config + from adloop.gtm.client import get_gtm_client + + cfg = load_config() + client = get_gtm_client(cfg) + ws_id = _resolve_workspace( + client, changes["account_id"], changes["container_id"], + changes.get("workspace_id", "") + ) + parent = ( + f"accounts/{changes['account_id']}" + f"/containers/{changes['container_id']}" + f"/workspaces/{ws_id}" + ) + body = { + "name": changes["name"], + "type": changes["type"], + } + if changes.get("filters"): + body["filter"] = changes["filters"] + if changes.get("custom_event_filters"): + body["customEventFilter"] = changes["custom_event_filters"] + if changes.get("auto_event_filters"): + body["autoEventFilter"] = changes["auto_event_filters"] + if changes.get("parameters"): + body["parameter"] = changes["parameters"] + if changes.get("notes"): + body["notes"] = changes["notes"] + if changes.get("custom_event_name"): + # Custom event triggers use a customEventFilter on {{Event}} + body.setdefault("customEventFilter", []).append({ + "type": "EQUALS", + "parameter": [ + {"type": "TEMPLATE", "key": "arg0", "value": "{{_event}}"}, + {"type": "TEMPLATE", "key": "arg1", "value": changes["custom_event_name"]}, + ], + }) + + resp = ( + client.accounts() + .containers() + .workspaces() + .triggers() + .create(parent=parent, body=body) + .execute() + ) + return { + "trigger_id": resp.get("triggerId"), + "name": resp.get("name"), + "type": resp.get("type"), + "workspace_id": ws_id, + "path": resp.get("path"), + } + + +def _apply_publish_gtm_workspace(_unused_client, _unused_cid, changes: dict) -> dict: + """Create a version from the workspace and publish it.""" + from adloop.config import load_config + from adloop.gtm.client import get_gtm_client + + cfg = load_config() + client = get_gtm_client(cfg) + ws_id = _resolve_workspace( + client, changes["account_id"], changes["container_id"], + changes.get("workspace_id", "") + ) + ws_path = ( + f"accounts/{changes['account_id']}" + f"/containers/{changes['container_id']}" + f"/workspaces/{ws_id}" + ) + body = {} + if changes.get("version_name"): + body["name"] = changes["version_name"] + if changes.get("version_notes"): + body["notes"] = changes["version_notes"] + + # Step 1: create the version (compiles the workspace into a versioned snapshot) + create_resp = ( + client.accounts() + .containers() + .workspaces() + .create_version(path=ws_path, body=body) + .execute() + ) + if create_resp.get("compilerError"): + return { + "error": "GTM compiler errors", + "details": create_resp.get("compilerError"), + } + version = create_resp.get("containerVersion") or {} + version_path = version.get("path") + if not version_path: + return { + "error": "GTM did not return a version path", + "details": create_resp, + } + + # Step 2: publish the version + publish_resp = ( + client.accounts() + .containers() + .versions() + .publish(path=version_path) + .execute() + ) + return { + "version_id": version.get("containerVersionId"), + "version_path": version_path, + "compiler_warnings": create_resp.get("compilerError", []), + "published": True, + "publish_response": publish_resp, + } + + +def _apply_update_gtm_tag(_unused_client, _unused_cid, changes: dict) -> dict: + """Update an existing GTM tag in the workspace. + + Reads the existing tag first to preserve fingerprint + any unspecified + fields, then writes the merged version back. + """ + from adloop.config import load_config + from adloop.gtm.client import get_gtm_client + + cfg = load_config() + client = get_gtm_client(cfg) + ws_id = _resolve_workspace( + client, changes["account_id"], changes["container_id"], + changes.get("workspace_id", "") + ) + tag_path = ( + f"accounts/{changes['account_id']}" + f"/containers/{changes['container_id']}" + f"/workspaces/{ws_id}/tags/{changes['tag_id']}" + ) + tags_api = client.accounts().containers().workspaces().tags() + existing = tags_api.get(path=tag_path).execute() + + body = { + "name": changes.get("name", existing.get("name")), + "type": changes.get("type", existing.get("type")), + "parameter": ( + changes["parameters"] if "parameters" in changes + else existing.get("parameter", []) + ), + "firingTriggerId": ( + changes["firing_trigger_ids"] + if "firing_trigger_ids" in changes + else existing.get("firingTriggerId", []) + ), + "blockingTriggerId": ( + changes["blocking_trigger_ids"] + if "blocking_trigger_ids" in changes + else existing.get("blockingTriggerId", []) + ), + "paused": ( + changes["paused"] if "paused" in changes + else existing.get("paused", False) + ), + "fingerprint": existing.get("fingerprint"), + } + if "notes" in changes: + body["notes"] = changes["notes"] + elif existing.get("notes"): + body["notes"] = existing["notes"] + + resp = tags_api.update(path=tag_path, body=body).execute() + return { + "tag_id": resp.get("tagId"), + "name": resp.get("name"), + "type": resp.get("type"), + "workspace_id": ws_id, + "path": resp.get("path"), + } + + +def _apply_update_gtm_trigger(_unused_client, _unused_cid, changes: dict) -> dict: + """Update an existing GTM trigger in the workspace.""" + from adloop.config import load_config + from adloop.gtm.client import get_gtm_client + + cfg = load_config() + client = get_gtm_client(cfg) + ws_id = _resolve_workspace( + client, changes["account_id"], changes["container_id"], + changes.get("workspace_id", "") + ) + trig_path = ( + f"accounts/{changes['account_id']}" + f"/containers/{changes['container_id']}" + f"/workspaces/{ws_id}/triggers/{changes['trigger_id']}" + ) + triggers_api = client.accounts().containers().workspaces().triggers() + existing = triggers_api.get(path=trig_path).execute() + + body = { + "name": changes.get("name", existing.get("name")), + "type": existing.get("type"), # immutable + "fingerprint": existing.get("fingerprint"), + } + # Preserve fields the caller didn't ask to change. + for src_key, body_key in ( + ("filters", "filter"), + ("custom_event_filters", "customEventFilter"), + ("auto_event_filters", "autoEventFilter"), + ("parameters", "parameter"), + ): + if src_key in changes: + body[body_key] = changes[src_key] + elif body_key in existing: + body[body_key] = existing[body_key] + + for keep in ("waitForTags", "checkValidation", "waitForTagsTimeout", + "uniqueTriggerId", "parentFolderId"): + if keep in existing: + body[keep] = existing[keep] + + if "notes" in changes: + body["notes"] = changes["notes"] + elif existing.get("notes"): + body["notes"] = existing["notes"] + + resp = triggers_api.update(path=trig_path, body=body).execute() + return { + "trigger_id": resp.get("triggerId"), + "name": resp.get("name"), + "type": resp.get("type"), + "workspace_id": ws_id, + "path": resp.get("path"), + } + + +def _apply_delete_gtm_tag(_unused_client, _unused_cid, changes: dict) -> dict: + """Delete a GTM tag from the workspace.""" + from adloop.config import load_config + from adloop.gtm.client import get_gtm_client + + cfg = load_config() + client = get_gtm_client(cfg) + ws_id = _resolve_workspace( + client, changes["account_id"], changes["container_id"], + changes.get("workspace_id", "") + ) + tag_path = ( + f"accounts/{changes['account_id']}" + f"/containers/{changes['container_id']}" + f"/workspaces/{ws_id}/tags/{changes['tag_id']}" + ) + tags_api = client.accounts().containers().workspaces().tags() + tags_api.delete(path=tag_path).execute() + return {"deleted_tag_id": changes["tag_id"], "workspace_id": ws_id} + + +def _apply_delete_gtm_trigger(_unused_client, _unused_cid, changes: dict) -> dict: + """Delete a GTM trigger from the workspace.""" + from adloop.config import load_config + from adloop.gtm.client import get_gtm_client + + cfg = load_config() + client = get_gtm_client(cfg) + ws_id = _resolve_workspace( + client, changes["account_id"], changes["container_id"], + changes.get("workspace_id", "") + ) + trig_path = ( + f"accounts/{changes['account_id']}" + f"/containers/{changes['container_id']}" + f"/workspaces/{ws_id}/triggers/{changes['trigger_id']}" + ) + triggers_api = client.accounts().containers().workspaces().triggers() + triggers_api.delete(path=trig_path).execute() + return {"deleted_trigger_id": changes["trigger_id"], "workspace_id": ws_id} diff --git a/src/adloop/server.py b/src/adloop/server.py index fb9035a..09125b5 100644 --- a/src/adloop/server.py +++ b/src/adloop/server.py @@ -713,6 +713,256 @@ def attribution_check( ) +@mcp.tool(annotations=_READONLY) +@_safe +def audit_event_coverage( + expected_events: list[str], + gtm_account_id: str, + gtm_container_id: str, + property_id: str = "", + date_range_start: str = "", + date_range_end: str = "", +) -> dict: + """Three-way audit: codebase events ↔ GTM tags ↔ GA4 actual fires. + + First, search the user's codebase for gtag('event', ...) and + dataLayer.push({event: ...}) calls and extract every distinct event name. + Pass that list as `expected_events`. The tool fetches the LIVE GTM + container, joins it against GA4 event counts for the date range, and + returns a per-event matrix with one of these statuses: + ok — tag active and event firing + ok_auto_collected — GA4 Enhanced Measurement event, no tag needed + no_tag_no_fire — codebase event, no GTM tag, never fires + tag_paused — GTM tag exists but is paused + tag_active_but_not_firing — tag is active but no GA4 hits + gtm_only_firing — GA4 event from a tag, not in codebase + gtm_only_not_firing — tag exists, not in codebase, no fires + ga4_only — fires in GA4, no tag, no codebase ref + ga4_fires_no_tag — codebase event firing without a GTM tag + auto_event_only — Enhanced Measurement event with no codebase ref + + Also surfaces dynamic-event tags ({{Event}} variables) and Custom HTML + tags that the audit cannot interpret automatically. + + GTM IDs come from Tag Manager UI → Admin → Container Settings. + Date format: "YYYY-MM-DD". Empty = last 30 days. + """ + from adloop.crossref import audit_event_coverage as _impl + + return _impl( + _config, + expected_events=expected_events, + gtm_account_id=gtm_account_id, + gtm_container_id=gtm_container_id, + property_id=property_id or _config.ga4.property_id, + date_range_start=date_range_start, + date_range_end=date_range_end, + ) + + +@mcp.tool(annotations=_READONLY) +@_safe +def list_gtm_accounts() -> dict: + """List all GTM accounts the AdLoop service account / OAuth user can read. + + Use this for first-time discovery before calling audit_event_coverage — + you need the account_id from here. If this returns an empty list, the + service account hasn't been added to any GTM container with at least + Read permission. + """ + from adloop.gtm.read import list_accounts as _impl + + return _impl(_config) + + +@mcp.tool(annotations=_READONLY) +@_safe +def list_gtm_containers(gtm_account_id: str) -> dict: + """List all containers under a GTM account. + + Returns container_id (the numeric ID needed by audit_event_coverage), + public_id (the GTM-XXXXXXX string shown in the UI), name, and usage + context (web / iOS / Android / amp / server). + """ + from adloop.gtm.read import list_containers as _impl + + return _impl(_config, account_id=gtm_account_id) + + +@mcp.tool(annotations=_READONLY) +@_safe +def list_gtm_tags(gtm_account_id: str, gtm_container_id: str) -> dict: + """List every tag in the LIVE GTM container. + + Each tag includes type, status, parsed parameters, the GA4 event name + (for GA4 event tags), and resolved firing/blocking trigger names. + Use after audit_event_coverage to inspect specific tags. + """ + from adloop.gtm.read import list_tags as _impl + + return _impl( + _config, account_id=gtm_account_id, container_id=gtm_container_id + ) + + +@mcp.tool(annotations=_READONLY) +@_safe +def get_gtm_tag( + gtm_account_id: str, gtm_container_id: str, tag_id: str +) -> dict: + """Get the full RAW configuration for a single GTM tag. + + Includes every parameter, firing/blocking triggers (with their filter + conditions resolved to text), priority, pause status, sampling, and + monitoring metadata. Use to inspect a tag flagged by audit_event_coverage. + """ + from adloop.gtm.read import get_tag as _impl + + return _impl( + _config, + account_id=gtm_account_id, + container_id=gtm_container_id, + tag_id=tag_id, + ) + + +@mcp.tool(annotations=_READONLY) +@_safe +def list_gtm_triggers(gtm_account_id: str, gtm_container_id: str) -> dict: + """List every trigger in the LIVE GTM container. + + Each trigger has its filter conditions parsed to readable text + (e.g. "{{Page Path}} matches RegExp ^/service-promotions/"). Use to + diagnose why a tag fires or doesn't fire on specific pages. + """ + from adloop.gtm.read import list_triggers as _impl + + return _impl( + _config, account_id=gtm_account_id, container_id=gtm_container_id + ) + + +@mcp.tool(annotations=_READONLY) +@_safe +def get_gtm_trigger( + gtm_account_id: str, gtm_container_id: str, trigger_id: str +) -> dict: + """Get the full RAW configuration for a single GTM trigger. + + Includes filters, auto-event filters, custom-event filters, validation + settings, and a list of every tag that uses this trigger. Use to + diagnose why a tag with a specific trigger ID does or doesn't fire. + """ + from adloop.gtm.read import get_trigger as _impl + + return _impl( + _config, + account_id=gtm_account_id, + container_id=gtm_container_id, + trigger_id=trigger_id, + ) + + +@mcp.tool(annotations=_READONLY) +@_safe +def list_gtm_variables(gtm_account_id: str, gtm_container_id: str) -> dict: + """List GTM variables — both custom and enabled built-in. + + Custom variables come from the live container. Built-in variables + (Page URL, Click Element, Form ID, etc.) come from the workspace's + enabled-built-ins list. Variables matter because triggers reference + them — if a trigger uses {{Form ID}} but Form ID isn't enabled, the + trigger never matches. + """ + from adloop.gtm.read import list_variables as _impl + + return _impl( + _config, account_id=gtm_account_id, container_id=gtm_container_id + ) + + +@mcp.tool(annotations=_READONLY) +@_safe +def list_gtm_workspaces(gtm_account_id: str, gtm_container_id: str) -> dict: + """List workspaces (drafts) under a GTM container. + + Workspace IDs are needed for `get_gtm_workspace_diff`. Most containers + have a single Default Workspace; multiple workspaces appear when the + team uses parallel drafts. + """ + from adloop.gtm.read import list_workspaces as _impl + + return _impl( + _config, account_id=gtm_account_id, container_id=gtm_container_id + ) + + +@mcp.tool(annotations=_READONLY) +@_safe +def get_gtm_workspace_diff( + gtm_account_id: str, gtm_container_id: str, workspace_id: str +) -> dict: + """Show drafted-but-not-published changes in a GTM workspace. + + Returns the list of entities (tags, triggers, variables) added, + modified, or deleted relative to the live published version, plus + any merge conflicts. Common cause of "I edited a tag but nothing + happened" — the workspace was never published. is_clean=true means + no pending changes and no conflicts. + """ + from adloop.gtm.read import get_workspace_diff as _impl + + return _impl( + _config, + account_id=gtm_account_id, + container_id=gtm_container_id, + workspace_id=workspace_id, + ) + + +@mcp.tool(annotations=_READONLY) +@_safe +def list_gtm_versions( + gtm_account_id: str, gtm_container_id: str, page_size: int = 50 +) -> dict: + """List published GTM version history (newest first). + + Version headers include version_id, name, and entity counts. Use to + correlate a metric drop with a recent publish: fetch versions, find + one with timestamps near the drop date, then call get_gtm_version + for full content + author info. + """ + from adloop.gtm.read import list_versions as _impl + + return _impl( + _config, + account_id=gtm_account_id, + container_id=gtm_container_id, + page_size=page_size, + ) + + +@mcp.tool(annotations=_READONLY) +@_safe +def get_gtm_version( + gtm_account_id: str, gtm_container_id: str, container_version_id: str +) -> dict: + """Get full metadata + entity counts for a single GTM container version. + + Returns name, description, fingerprint, and lists of tag/trigger/ + variable names at that point in time. Use after list_gtm_versions + when correlating a metric drop with a specific publish. + """ + from adloop.gtm.read import get_version as _impl + + return _impl( + _config, + account_id=gtm_account_id, + container_id=gtm_container_id, + container_version_id=container_version_id, + ) + + @mcp.tool(annotations=_READONLY) @_safe def run_gaql( @@ -1430,3 +1680,299 @@ def discover_keywords( except ImportError: # _debug_tools.py is intentionally absent in released builds. pass +# ---------------------------------------------------------------------------# GTM Write Tools# --------------------------------------------------------------------------- + +@mcp.tool(annotations=_WRITE) +@_safe +def draft_gtm_tag( + gtm_account_id: str, + gtm_container_id: str, + name: str, + tag_type: str, + parameters: list[dict] | None = None, + firing_trigger_ids: list[str] | None = None, + blocking_trigger_ids: list[str] | None = None, + paused: bool = False, + notes: str = "", + workspace_id: str = "", +) -> dict: + """Draft a new GTM tag in the Default Workspace — returns a PREVIEW. + + Common tag_type values: + googtag — Google Tag (gtag.js config). Required for both AW-... and G-... + awct — Google Ads Conversion Tracking + gclidw — Conversion Linker (no parameters required) + html — Custom HTML + gaawe — GA4 Event tag + + parameters: list of GTM parameter dicts. Common shapes: + + Google Tag config (Google Ads): + [{"type": "TEMPLATE", "key": "tagId", "value": "AW-11437481610"}] + + Google Ads Conversion (with phone_conversion_number for GFN): + [{"type": "TEMPLATE", "key": "conversionId", "value": "11437481610"}, + {"type": "TEMPLATE", "key": "conversionLabel", "value": "_qxp..."}, + {"type": "TEMPLATE", "key": "conversionValue", "value": "250"}, + {"type": "TEMPLATE", "key": "conversionCurrency", "value": "USD"}, + {"type": "TEMPLATE", "key": "phone_conversion_number", + "value": "(916) 460-9257"}] + + Custom HTML (e.g. GFN snippet): + [{"type": "TEMPLATE", "key": "html", + "value": ""}, + {"type": "BOOLEAN", "key": "supportDocumentWrite", "value": "false"}] + + firing_trigger_ids: list of trigger ID strings. Use ["2147479573"] for + the built-in "All Pages — Initialization" trigger. + + Call confirm_and_apply with the returned plan_id to execute. + """ + from adloop.gtm.write import draft_gtm_tag as _impl + + return _impl( + _config, + account_id=gtm_account_id, + container_id=gtm_container_id, + workspace_id=workspace_id, + name=name, + tag_type=tag_type, + parameters=parameters, + firing_trigger_ids=firing_trigger_ids, + blocking_trigger_ids=blocking_trigger_ids, + paused=paused, + notes=notes, + ) + +@mcp.tool(annotations=_WRITE) +@_safe +def draft_gtm_trigger( + gtm_account_id: str, + gtm_container_id: str, + name: str, + trigger_type: str, + filters: list[dict] | None = None, + custom_event_filters: list[dict] | None = None, + auto_event_filters: list[dict] | None = None, + custom_event_name: str = "", + parameters: list[dict] | None = None, + notes: str = "", + workspace_id: str = "", +) -> dict: + """Draft a new GTM trigger in the Default Workspace — returns a PREVIEW. + + Common trigger_type values: + pageview, dom_ready, window_loaded + click, linkClick + formSubmission + customEvent (requires custom_event_name) + + filters: GTM filter shape — list of dicts: + [{"type": "EQUALS"|"CONTAINS"|"STARTS_WITH"|"REGEX"|"GREATER"|..., + "parameter": [ + {"type": "TEMPLATE", "key": "arg0", "value": "{{Variable}}"}, + {"type": "TEMPLATE", "key": "arg1", "value": "expected"} + ]}] + + Common patterns: + + Click on tel: links anywhere on site: + trigger_type="linkClick", filters=[{ + "type": "STARTS_WITH", + "parameter": [ + {"type": "TEMPLATE", "key": "arg0", "value": "{{Click URL}}"}, + {"type": "TEMPLATE", "key": "arg1", "value": "tel:"} + ] + }] + + Form submit on /contacts: + trigger_type="formSubmission", filters=[{ + "type": "CONTAINS", + "parameter": [ + {"type": "TEMPLATE", "key": "arg0", "value": "{{Page Path}}"}, + {"type": "TEMPLATE", "key": "arg1", "value": "/contacts"} + ] + }] + + Call confirm_and_apply with the returned plan_id to execute. + """ + from adloop.gtm.write import draft_gtm_trigger as _impl + + return _impl( + _config, + account_id=gtm_account_id, + container_id=gtm_container_id, + workspace_id=workspace_id, + name=name, + trigger_type=trigger_type, + filters=filters, + custom_event_filters=custom_event_filters, + auto_event_filters=auto_event_filters, + custom_event_name=custom_event_name, + parameters=parameters, + notes=notes, + ) + +@mcp.tool(annotations=_DESTRUCTIVE) +@_safe +def publish_gtm_workspace( + gtm_account_id: str, + gtm_container_id: str, + workspace_id: str = "", + version_name: str = "", + version_notes: str = "", +) -> dict: + """Publish the workspace's drafted changes — returns a PREVIEW (irreversible apply). + + Publishing creates a new container version from the workspace and sets it + LIVE. Until you call confirm_and_apply with dry_run=false, no live change + happens. + + Once published, all visitors with the GTM snippet on their pages will + start firing the new tags within minutes. There's no rollback to the + workspace state — to revert, you'd publish a previous version (use + list_gtm_versions to find one). + + workspace_id: leave empty to use the Default Workspace. + version_name: optional friendly name for the version. + version_notes: optional release notes. + + Call confirm_and_apply with the returned plan_id to execute. + """ + from adloop.gtm.write import publish_gtm_workspace as _impl + + return _impl( + _config, + account_id=gtm_account_id, + container_id=gtm_container_id, + workspace_id=workspace_id, + version_name=version_name, + version_notes=version_notes, + ) + +@mcp.tool(annotations=_WRITE) +@_safe +def draft_update_gtm_tag( + gtm_account_id: str, + gtm_container_id: str, + tag_id: str, + name: str = "", + parameters: list[dict] | None = None, + firing_trigger_ids: list[str] | None = None, + blocking_trigger_ids: list[str] | None = None, + paused: bool | None = None, + notes: str = "", + tag_type: str = "", + workspace_id: str = "", +) -> dict: + """Draft a partial UPDATE to an existing GTM tag — returns a PREVIEW. + + Pass only the fields you want to change. Empty/None means "keep current". + + Common use cases: + - Rename a tag: pass `name` + - Pause/unpause: pass `paused=True/False` + - Replace parameters: pass full `parameters` list (no partial merge) + - Re-route firing triggers: pass `firing_trigger_ids` + + Note: tag type cannot usually be changed; if you need to switch (e.g. + `html` → `awcc`), delete the old and create a new one. + + Call confirm_and_apply with the returned plan_id to execute. + """ + from adloop.gtm.write import draft_update_gtm_tag as _impl + + return _impl( + _config, + account_id=gtm_account_id, + container_id=gtm_container_id, + workspace_id=workspace_id, + tag_id=tag_id, + name=name, + parameters=parameters, + firing_trigger_ids=firing_trigger_ids, + blocking_trigger_ids=blocking_trigger_ids, + paused=paused, + notes=notes, + tag_type=tag_type, + ) + +@mcp.tool(annotations=_WRITE) +@_safe +def draft_update_gtm_trigger( + gtm_account_id: str, + gtm_container_id: str, + trigger_id: str, + name: str = "", + filters: list[dict] | None = None, + custom_event_filters: list[dict] | None = None, + auto_event_filters: list[dict] | None = None, + parameters: list[dict] | None = None, + notes: str = "", + workspace_id: str = "", +) -> dict: + """Draft a partial UPDATE to an existing GTM trigger — returns a PREVIEW. + + Trigger type cannot be changed (delete + create instead). + + Call confirm_and_apply with the returned plan_id to execute. + """ + from adloop.gtm.write import draft_update_gtm_trigger as _impl + + return _impl( + _config, + account_id=gtm_account_id, + container_id=gtm_container_id, + workspace_id=workspace_id, + trigger_id=trigger_id, + name=name, + filters=filters, + custom_event_filters=custom_event_filters, + auto_event_filters=auto_event_filters, + parameters=parameters, + notes=notes, + ) + +@mcp.tool(annotations=_DESTRUCTIVE) +@_safe +def draft_delete_gtm_tag( + gtm_account_id: str, + gtm_container_id: str, + tag_id: str, + workspace_id: str = "", +) -> dict: + """Draft a deletion of a GTM tag — returns a PREVIEW (irreversible). + + Publish the workspace to make the deletion live. + """ + from adloop.gtm.write import draft_delete_gtm_tag as _impl + + return _impl( + _config, + account_id=gtm_account_id, + container_id=gtm_container_id, + workspace_id=workspace_id, + tag_id=tag_id, + ) + +@mcp.tool(annotations=_DESTRUCTIVE) +@_safe +def draft_delete_gtm_trigger( + gtm_account_id: str, + gtm_container_id: str, + trigger_id: str, + workspace_id: str = "", +) -> dict: + """Draft a deletion of a GTM trigger — returns a PREVIEW (irreversible). + + GTM blocks the deletion if any tag references this trigger. + """ + from adloop.gtm.write import draft_delete_gtm_trigger as _impl + + return _impl( + _config, + account_id=gtm_account_id, + container_id=gtm_container_id, + workspace_id=workspace_id, + trigger_id=trigger_id, + ) diff --git a/tests/test_gtm.py b/tests/test_gtm.py new file mode 100644 index 0000000..2bfa1d6 --- /dev/null +++ b/tests/test_gtm.py @@ -0,0 +1,644 @@ +"""Tests for the Google Tag Manager integration — parsers + audit_event_coverage.""" + +from __future__ import annotations + +from unittest.mock import patch + +import pytest + +from adloop.crossref import audit_event_coverage +from adloop.gtm.read import ( + _BUILT_IN_TRIGGERS, + GA4_EVENT_TAG, + _element_visibility_summary, + _params_dict, + _parse_trigger, + _parse_variable, + _resolve_trigger, + _summarize_filter, + _trigger_group_member_ids, +) + + +# --------------------------------------------------------------------------- +# _params_dict — flatten parameter[] arrays into a {key: value} dict +# --------------------------------------------------------------------------- + + +class TestParamsDict: + def test_value_only_param(self): + tag = {"parameter": [{"type": "template", "key": "tagId", "value": "G-XXX"}]} + assert _params_dict(tag) == {"tagId": "G-XXX"} + + def test_list_param(self): + tag = { + "parameter": [ + {"key": "ids", "type": "list", "list": [{"value": "a"}, {"value": "b"}]} + ] + } + result = _params_dict(tag) + assert result["ids"] == [{"value": "a"}, {"value": "b"}] + + def test_map_param(self): + tag = { + "parameter": [ + {"key": "settings", "type": "map", "map": [{"key": "k", "value": "v"}]} + ] + } + result = _params_dict(tag) + assert result["settings"] == [{"key": "k", "value": "v"}] + + def test_skips_keyless_params(self): + tag = {"parameter": [{"value": "orphan"}, {"key": "good", "value": "ok"}]} + assert _params_dict(tag) == {"good": "ok"} + + def test_empty_parameter_list(self): + assert _params_dict({"parameter": []}) == {} + + def test_no_parameter_key(self): + assert _params_dict({}) == {} + + +# --------------------------------------------------------------------------- +# _summarize_filter — render variable [NOT] OP value, including negate flag +# --------------------------------------------------------------------------- + + +class TestSummarizeFilter: + def test_basic_contains(self): + f = { + "type": "contains", + "parameter": [ + {"key": "arg0", "value": "{{Page Path}}"}, + {"key": "arg1", "value": "service-promotions"}, + ], + } + assert _summarize_filter(f) == "{{Page Path}} contains service-promotions" + + def test_negate_true_renders_NOT(self): + f = { + "type": "contains", + "parameter": [ + {"key": "arg0", "value": "{{Form ID}}"}, + {"key": "arg1", "value": "newsletter"}, + {"key": "negate", "value": "true"}, + ], + } + assert _summarize_filter(f) == "{{Form ID}} NOT contains newsletter" + + def test_negate_false_no_prefix(self): + f = { + "type": "equals", + "parameter": [ + {"key": "arg0", "value": "{{Event}}"}, + {"key": "arg1", "value": "click"}, + {"key": "negate", "value": "false"}, + ], + } + assert _summarize_filter(f) == "{{Event}} equals click" + + def test_arbitrary_op_preserved(self): + f = { + "type": "matchRegex", + "parameter": [ + {"key": "arg0", "value": "{{Page URL}}"}, + {"key": "arg1", "value": "^https://"}, + ], + } + assert _summarize_filter(f) == "{{Page URL}} matchRegex ^https://" + + def test_missing_args_render_question_mark(self): + f = {"type": "contains", "parameter": []} + assert _summarize_filter(f) == "? contains ?" + + def test_missing_type_renders_question_mark(self): + f = { + "parameter": [ + {"key": "arg0", "value": "{{X}}"}, + {"key": "arg1", "value": "y"}, + ] + } + assert _summarize_filter(f) == "{{X}} ? y" + + +# --------------------------------------------------------------------------- +# _resolve_trigger — built-in IDs (>= 2147479553) get readable names +# --------------------------------------------------------------------------- + + +class TestResolveTrigger: + def test_custom_trigger_in_dict(self): + by_id = {"42": {"name": "My Trigger", "type": "click"}} + assert _resolve_trigger(by_id, "42") == { + "id": "42", + "name": "My Trigger", + "type": "click", + } + + def test_built_in_all_pages(self): + result = _resolve_trigger({}, "2147479553") + assert result["id"] == "2147479553" + assert "All Pages" in result["name"] + assert result["name"].startswith("(built-in)") + assert result["type"] == "pageview" + + def test_built_in_initialization(self): + result = _resolve_trigger({}, "2147479573") + assert "Initialization" in result["name"] + assert result["type"] == "init" + + def test_built_in_consent(self): + result = _resolve_trigger({}, "2147479572") + assert "Consent" in result["name"] + assert result["type"] == "consentInit" + + def test_unknown_built_in_id(self): + result = _resolve_trigger({}, "9999999999") + assert result["id"] == "9999999999" + assert "unknown" in result["name"].lower() + assert result["type"] is None + + def test_built_in_dict_complete(self): + # Sanity: every entry in _BUILT_IN_TRIGGERS resolves cleanly + for tid in _BUILT_IN_TRIGGERS: + result = _resolve_trigger({}, tid) + assert result["name"].startswith("(built-in)") + assert result["type"] is not None + + +# --------------------------------------------------------------------------- +# _trigger_group_member_ids — extract triggerIds list from a triggerGroup +# --------------------------------------------------------------------------- + + +class TestTriggerGroupMemberIds: + def test_extracts_member_ids(self): + trigger = { + "type": "triggerGroup", + "parameter": [ + { + "key": "triggerIds", + "type": "list", + "list": [ + {"type": "triggerReference", "value": "9"}, + {"type": "triggerReference", "value": "21"}, + ], + } + ], + } + assert _trigger_group_member_ids(trigger) == ["9", "21"] + + def test_empty_when_no_triggerIds_param(self): + trigger = {"type": "triggerGroup", "parameter": []} + assert _trigger_group_member_ids(trigger) == [] + + def test_empty_when_list_is_empty(self): + trigger = { + "type": "triggerGroup", + "parameter": [{"key": "triggerIds", "type": "list", "list": []}], + } + assert _trigger_group_member_ids(trigger) == [] + + def test_skips_items_without_value(self): + trigger = { + "type": "triggerGroup", + "parameter": [ + { + "key": "triggerIds", + "type": "list", + "list": [ + {"type": "triggerReference", "value": "1"}, + {"type": "triggerReference"}, # missing value + ], + } + ], + } + assert _trigger_group_member_ids(trigger) == ["1"] + + +# --------------------------------------------------------------------------- +# _element_visibility_summary — selector + timing for elementVisibility triggers +# --------------------------------------------------------------------------- + + +class TestElementVisibilitySummary: + def test_id_selector_uppercase(self): + # GTM returns selectorType="ID" (uppercase) — the regression case + trigger = { + "type": "elementVisibility", + "parameter": [ + {"key": "selectorType", "value": "ID"}, + {"key": "elementId", "value": "form-success"}, + {"key": "firingFrequency", "value": "ONCE"}, + {"key": "onScreenRatio", "value": "10"}, + ], + } + result = _element_visibility_summary(trigger) + assert result["selector_type"] == "ID" + assert result["selector"] == "form-success" + assert result["firing_frequency"] == "ONCE" + assert result["on_screen_ratio"] == "10" + + def test_id_selector_lowercase(self): + # Defensive: case-insensitive match + trigger = { + "type": "elementVisibility", + "parameter": [ + {"key": "selectorType", "value": "id"}, + {"key": "elementId", "value": "x"}, + ], + } + result = _element_visibility_summary(trigger) + assert result["selector"] == "x" + + def test_css_selector(self): + trigger = { + "type": "elementVisibility", + "parameter": [ + {"key": "selectorType", "value": "CSS"}, + {"key": "elementSelector", "value": "#root .success"}, + {"key": "useDomChangeListener", "value": "true"}, + ], + } + result = _element_visibility_summary(trigger) + assert result["selector_type"] == "CSS" + assert result["selector"] == "#root .success" + assert result["use_dom_change_listener"] == "true" + + def test_missing_fields_return_none(self): + result = _element_visibility_summary({"parameter": []}) + # selectorType is None → falls through to elementSelector lookup → also None + assert result["selector"] is None + assert result["selector_type"] is None + assert result["firing_frequency"] is None + + +# --------------------------------------------------------------------------- +# _parse_trigger — type-specific dispatch +# --------------------------------------------------------------------------- + + +class TestParseTrigger: + def test_basic_trigger_no_extras(self): + trigger = { + "triggerId": "5", + "name": "Click Trigger", + "type": "click", + "filter": [], + } + result = _parse_trigger(trigger) + assert result["trigger_id"] == "5" + assert result["name"] == "Click Trigger" + assert result["type"] == "click" + assert "group_member_trigger_ids" not in result + assert "element_visibility" not in result + + def test_trigger_group_adds_member_ids(self): + trigger = { + "triggerId": "10", + "name": "Group", + "type": "triggerGroup", + "filter": [], + "parameter": [ + { + "key": "triggerIds", + "type": "list", + "list": [{"value": "1"}, {"value": "2"}], + } + ], + } + result = _parse_trigger(trigger) + assert result["group_member_trigger_ids"] == ["1", "2"] + + def test_element_visibility_adds_block(self): + trigger = { + "triggerId": "7", + "name": "Visibility", + "type": "elementVisibility", + "filter": [], + "parameter": [ + {"key": "selectorType", "value": "ID"}, + {"key": "elementId", "value": "thanks"}, + ], + } + result = _parse_trigger(trigger) + assert "element_visibility" in result + assert result["element_visibility"]["selector"] == "thanks" + + def test_filters_parsed_to_text(self): + trigger = { + "triggerId": "1", + "name": "X", + "type": "click", + "filter": [ + { + "type": "contains", + "parameter": [ + {"key": "arg0", "value": "{{Page Path}}"}, + {"key": "arg1", "value": "/x"}, + ], + } + ], + } + result = _parse_trigger(trigger) + assert result["filters"] == ["{{Page Path}} contains /x"] + + def test_wait_for_tags_extracted_from_dict(self): + trigger = { + "triggerId": "1", + "name": "X", + "type": "click", + "waitForTags": {"value": "true"}, + } + result = _parse_trigger(trigger) + assert result["wait_for_tags"] == "true" + + +# --------------------------------------------------------------------------- +# _parse_variable +# --------------------------------------------------------------------------- + + +class TestParseVariable: + def test_basic_variable(self): + variable = { + "variableId": "14", + "name": "DLV - promo", + "type": "v", + "parameter": [{"key": "name", "value": "promo_name"}], + "formatValue": {}, + } + result = _parse_variable(variable) + assert result["variable_id"] == "14" + assert result["name"] == "DLV - promo" + assert result["type"] == "v" + assert result["parameters"] == {"name": "promo_name"} + + +# --------------------------------------------------------------------------- +# audit_event_coverage — status determination + insights +# --------------------------------------------------------------------------- + + +def _container(tags=None): + """Helper: build the dict shape `get_live_container` returns.""" + return { + "account_id": "A", + "container_id": "C", + "container_version_id": "1", + "container_version_name": None, + "fingerprint": "f", + "tags": tags or [], + "trigger_count": 0, + "variable_count": 0, + } + + +def _ga4_response(events: dict[str, int]): + """Helper: build the dict shape `get_tracking_events` returns.""" + return { + "rows": [{"eventName": k, "eventCount": str(v)} for k, v in events.items()], + } + + +def _ga4_event_tag(name: str, event_name: str, paused: bool = False): + """Helper: build a parsed GA4 event tag.""" + return { + "tag_id": name, + "name": name, + "type": GA4_EVENT_TAG, + "event_name": event_name, + "paused": paused, + "firing_triggers": [], + "blocking_triggers": [], + "parameters": {}, + } + + +@pytest.fixture +def patch_gtm_and_ga4(): + """Patch the two external calls that audit_event_coverage makes.""" + + def _patch(container_dict, ga4_dict): + return ( + patch("adloop.gtm.read.get_live_container", return_value=container_dict), + patch("adloop.ga4.tracking.get_tracking_events", return_value=ga4_dict), + ) + + return _patch + + +class TestAuditEventCoverageStatuses: + """Each test forces one specific status code into the matrix.""" + + def _run(self, container, ga4, expected_events): + with ( + patch("adloop.gtm.read.get_live_container", return_value=container), + patch( + "adloop.ga4.tracking.get_tracking_events", return_value=ga4 + ), + ): + return audit_event_coverage( + config=None, + expected_events=expected_events, + gtm_account_id="A", + gtm_container_id="C", + date_range_start="2026-04-01", + date_range_end="2026-04-30", + ) + + def _status_for(self, result, event_name): + for row in result["matrix"]: + if row["event_name"] == event_name: + return row["status"] + raise AssertionError(f"event {event_name} not in matrix") + + def test_ok_status(self): + # codebase + active tag + ga4 fires + c = _container([_ga4_event_tag("T", "purchase")]) + g = _ga4_response({"purchase": 5}) + result = self._run(c, g, ["purchase"]) + assert self._status_for(result, "purchase") == "ok" + + def test_no_tag_no_fire(self): + # codebase event, no tag, no ga4 + result = self._run(_container([]), _ga4_response({}), ["my_custom_event"]) + assert self._status_for(result, "my_custom_event") == "no_tag_no_fire" + + def test_tag_paused(self): + # codebase + tag exists + paused (no ga4 fires either) + c = _container([_ga4_event_tag("T", "lead", paused=True)]) + result = self._run(c, _ga4_response({}), ["lead"]) + assert self._status_for(result, "lead") == "tag_paused" + + def test_tag_active_but_not_firing(self): + # codebase + active tag + ga4 reports zero + c = _container([_ga4_event_tag("T", "signup")]) + result = self._run(c, _ga4_response({}), ["signup"]) + assert self._status_for(result, "signup") == "tag_active_but_not_firing" + + def test_ok_auto_collected(self): + # codebase event matches a GA4 auto event, no tag, ga4 fires + result = self._run( + _container([]), + _ga4_response({"scroll": 100}), + ["scroll"], + ) + assert self._status_for(result, "scroll") == "ok_auto_collected" + + def test_ga4_fires_no_tag(self): + # codebase event fires in GA4 but no tag, NOT auto event + result = self._run( + _container([]), + _ga4_response({"my_custom": 3}), + ["my_custom"], + ) + assert self._status_for(result, "my_custom") == "ga4_fires_no_tag" + + def test_gtm_only_firing(self): + # tag exists + active + fires + NOT in codebase + c = _container([_ga4_event_tag("T", "newsletter_signup")]) + g = _ga4_response({"newsletter_signup": 7}) + result = self._run(c, g, []) + assert self._status_for(result, "newsletter_signup") == "gtm_only_firing" + + def test_gtm_only_not_firing(self): + # tag exists + NOT in codebase + no ga4 fires + c = _container([_ga4_event_tag("T", "stale_event")]) + result = self._run(c, _ga4_response({}), []) + assert self._status_for(result, "stale_event") == "gtm_only_not_firing" + + def test_auto_event_only(self): + # auto event fires + no tag + not in codebase + result = self._run( + _container([]), + _ga4_response({"page_view": 100}), + [], + ) + assert self._status_for(result, "page_view") == "auto_event_only" + + def test_ga4_only_non_auto(self): + # ga4 fires + no tag + not in codebase + not auto + result = self._run( + _container([]), + _ga4_response({"third_party_event": 4}), + [], + ) + assert self._status_for(result, "third_party_event") == "ga4_only" + + +class TestAuditEventCoverageInsights: + def _run(self, container, ga4, expected_events): + with ( + patch("adloop.gtm.read.get_live_container", return_value=container), + patch("adloop.ga4.tracking.get_tracking_events", return_value=ga4), + ): + return audit_event_coverage( + config=None, + expected_events=expected_events, + gtm_account_id="A", + gtm_container_id="C", + date_range_start="2026-04-01", + date_range_end="2026-04-30", + ) + + def test_no_tag_no_fire_generates_insight(self): + result = self._run(_container([]), _ga4_response({}), ["missing_event"]) + assert any("NO GTM tag" in s for s in result["insights"]) + assert any("missing_event" in s for s in result["insights"]) + + def test_paused_tag_generates_insight(self): + c = _container([_ga4_event_tag("T", "x", paused=True)]) + result = self._run(c, _ga4_response({}), ["x"]) + assert any("PAUSED" in s for s in result["insights"]) + + def test_dynamic_event_tag_generates_insight(self): + c = _container([_ga4_event_tag("T", "{{Event}}")]) + result = self._run(c, _ga4_response({}), []) + assert any("DYNAMIC" in s for s in result["insights"]) + # Dynamic event tags should not appear in the matrix as real events + assert all(row["event_name"] != "{{Event}}" for row in result["matrix"]) + assert len(result["dynamic_event_tags"]) == 1 + + def test_custom_html_tag_generates_insight(self): + # Custom HTML tag in the container + html_tag = { + "tag_id": "5", + "name": "FB Pixel", + "type": "html", + "event_name": None, + "paused": False, + "firing_triggers": [], + "blocking_triggers": [], + "parameters": {"html": ""}, + } + c = _container([html_tag]) + result = self._run(c, _ga4_response({}), []) + assert any("Custom HTML" in s for s in result["insights"]) + assert len(result["custom_html_tags"]) == 1 + + +class TestAuditEventCoverageMatrixShape: + def _run(self, container, ga4, expected_events): + with ( + patch("adloop.gtm.read.get_live_container", return_value=container), + patch("adloop.ga4.tracking.get_tracking_events", return_value=ga4), + ): + return audit_event_coverage( + config=None, + expected_events=expected_events, + gtm_account_id="A", + gtm_container_id="C", + date_range_start="2026-04-01", + date_range_end="2026-04-30", + ) + + def test_returns_required_fields(self): + result = self._run(_container([]), _ga4_response({}), []) + assert "container" in result + assert "matrix" in result + assert "insights" in result + assert "date_range" in result + assert result["date_range"] == {"start": "2026-04-01", "end": "2026-04-30"} + + def test_container_summary_has_tag_type_breakdown(self): + # Mixed tag types should be tallied in other_tag_types + misc_tag = { + "tag_id": "9", + "name": "Linker", + "type": "gclidw", + "event_name": None, + "paused": False, + "firing_triggers": [], + "blocking_triggers": [], + "parameters": {}, + } + c = _container([_ga4_event_tag("T", "x"), misc_tag]) + result = self._run(c, _ga4_response({}), []) + assert result["container"]["ga4_event_tag_count"] == 1 + assert result["container"]["other_tag_types"]["gclidw"] == 1 + + def test_ga4_error_short_circuits(self): + with ( + patch("adloop.gtm.read.get_live_container", return_value=_container([])), + patch( + "adloop.ga4.tracking.get_tracking_events", + return_value={"error": "GA4 unauthorized"}, + ), + ): + result = audit_event_coverage( + config=None, + expected_events=["x"], + gtm_account_id="A", + gtm_container_id="C", + ) + assert "error" in result + assert "GA4" in result["error"] + + def test_matrix_sorted_alphabetically(self): + # Multiple events should come back in sorted order + result = self._run( + _container([]), _ga4_response({"zzz": 1, "aaa": 1}), ["mmm"] + ) + names = [row["event_name"] for row in result["matrix"]] + assert names == sorted(names) diff --git a/tests/test_gtm_write.py b/tests/test_gtm_write.py new file mode 100644 index 0000000..247d59f --- /dev/null +++ b/tests/test_gtm_write.py @@ -0,0 +1,362 @@ +"""Tests for GTM write tools — create / update / delete tags + triggers + publish.""" +from __future__ import annotations + +import pytest + +from adloop.config import AdLoopConfig, AdsConfig, SafetyConfig +from adloop.safety import preview as preview_store + + +@pytest.fixture(autouse=True) +def clear_pending_plans(): + preview_store._pending_plans.clear() + yield + preview_store._pending_plans.clear() + + +@pytest.fixture +def config() -> AdLoopConfig: + return AdLoopConfig( + ads=AdsConfig(customer_id="123-456-7890"), + safety=SafetyConfig(require_dry_run=True), + ) + + +# --------------------------------------------------------------------------- +# draft_gtm_tag — creation validation +# --------------------------------------------------------------------------- + + +class TestDraftGtmTag: + def test_invalid_tag_type_rejected(self, config): + from adloop.gtm.write import draft_gtm_tag + + result = draft_gtm_tag( + config, + account_id="6228172353", + container_id="183580785", + name="Bad Tag", + tag_type="not_a_real_type", + ) + assert result["error"] == "Validation failed" + + def test_known_tag_types_accepted(self, config): + """awcc, awud, etc. that the validator now recognizes as valid.""" + from adloop.gtm.write import draft_gtm_tag + + for tag_type in ("googtag", "gclidw", "html", "gaawe", "awcc", "awud"): + result = draft_gtm_tag( + config, + account_id="6228172353", + container_id="183580785", + name=f"test-{tag_type}", + tag_type=tag_type, + ) + assert "error" not in result, f"{tag_type} rejected: {result}" + + def test_long_name_rejected(self, config): + from adloop.gtm.write import draft_gtm_tag + + result = draft_gtm_tag( + config, + account_id="6228172353", + container_id="183580785", + name="X" * 201, + tag_type="html", + ) + assert result["error"] == "Validation failed" + + def test_persists_parameters_and_triggers(self, config): + from adloop.gtm.write import draft_gtm_tag + + result = draft_gtm_tag( + config, + account_id="6228172353", + container_id="183580785", + name="GADS - Config - All Pages", + tag_type="googtag", + parameters=[{"type": "TEMPLATE", "key": "tagId", + "value": "AW-11437481610"}], + firing_trigger_ids=["2147479573"], + ) + plan = preview_store._pending_plans[result["plan_id"]] + assert plan.changes["name"] == "GADS - Config - All Pages" + assert plan.changes["type"] == "googtag" + assert plan.changes["parameters"][0]["value"] == "AW-11437481610" + assert plan.changes["firing_trigger_ids"] == ["2147479573"] + + +# --------------------------------------------------------------------------- +# draft_gtm_trigger +# --------------------------------------------------------------------------- + + +class TestDraftGtmTrigger: + def test_invalid_trigger_type_rejected(self, config): + from adloop.gtm.write import draft_gtm_trigger + + result = draft_gtm_trigger( + config, + account_id="6228172353", + container_id="183580785", + name="Bad Trigger", + trigger_type="madeup_event", + ) + assert result["error"] == "Validation failed" + + def test_custom_event_requires_name(self, config): + from adloop.gtm.write import draft_gtm_trigger + + result = draft_gtm_trigger( + config, + account_id="6228172353", + container_id="183580785", + name="Custom Event Trigger", + trigger_type="customEvent", + ) + assert result["error"] == "Validation failed" + assert any("custom_event_name" in d for d in result["details"]) + + def test_link_click_with_filter(self, config): + from adloop.gtm.write import draft_gtm_trigger + + result = draft_gtm_trigger( + config, + account_id="6228172353", + container_id="183580785", + name="TRG - Click to Call - All Pages", + trigger_type="linkClick", + filters=[{ + "type": "STARTS_WITH", + "parameter": [ + {"type": "TEMPLATE", "key": "arg0", + "value": "{{Click URL}}"}, + {"type": "TEMPLATE", "key": "arg1", "value": "tel:"}, + ], + }], + ) + plan = preview_store._pending_plans[result["plan_id"]] + assert plan.changes["type"] == "linkClick" + assert plan.changes["filters"][0]["type"] == "STARTS_WITH" + + +# --------------------------------------------------------------------------- +# draft_update_gtm_tag — partial update +# --------------------------------------------------------------------------- + + +class TestDraftUpdateGtmTag: + def test_tag_id_required(self, config): + from adloop.gtm.write import draft_update_gtm_tag + + result = draft_update_gtm_tag( + config, + account_id="6228172353", + container_id="183580785", + tag_id="", + name="x", + ) + assert "tag_id is required" in result["error"] + + def test_no_fields_to_update_rejected(self, config): + from adloop.gtm.write import draft_update_gtm_tag + + result = draft_update_gtm_tag( + config, + account_id="6228172353", + container_id="183580785", + tag_id="17", + ) + assert "No fields to update" in result["error"] + + def test_invalid_tag_type_rejected(self, config): + from adloop.gtm.write import draft_update_gtm_tag + + result = draft_update_gtm_tag( + config, + account_id="6228172353", + container_id="183580785", + tag_id="17", + tag_type="not_a_real_type", + ) + assert result["error"] == "Validation failed" + + def test_partial_update_persists_only_passed_fields(self, config): + from adloop.gtm.write import draft_update_gtm_tag + + result = draft_update_gtm_tag( + config, + account_id="6228172353", + container_id="183580785", + tag_id="17", + name="GADS - Event - Qualified Call - All Pages", + paused=False, + ) + plan = preview_store._pending_plans[result["plan_id"]] + assert plan.changes["name"] == "GADS - Event - Qualified Call - All Pages" + assert plan.changes["paused"] is False + assert "parameters" not in plan.changes + assert "firing_trigger_ids" not in plan.changes + + +# --------------------------------------------------------------------------- +# draft_update_gtm_trigger — partial update +# --------------------------------------------------------------------------- + + +class TestDraftUpdateGtmTrigger: + def test_trigger_id_required(self, config): + from adloop.gtm.write import draft_update_gtm_trigger + + result = draft_update_gtm_trigger( + config, + account_id="6228172353", + container_id="183580785", + trigger_id="", + name="x", + ) + assert "trigger_id is required" in result["error"] + + def test_no_fields_to_update_rejected(self, config): + from adloop.gtm.write import draft_update_gtm_trigger + + result = draft_update_gtm_trigger( + config, + account_id="6228172353", + container_id="183580785", + trigger_id="14", + ) + assert "No fields to update" in result["error"] + + def test_rename_only(self, config): + from adloop.gtm.write import draft_update_gtm_trigger + + result = draft_update_gtm_trigger( + config, + account_id="6228172353", + container_id="183580785", + trigger_id="14", + name="TRG - Submit Form - Contacts", + ) + plan = preview_store._pending_plans[result["plan_id"]] + assert plan.changes["name"] == "TRG - Submit Form - Contacts" + assert "filters" not in plan.changes + + +# --------------------------------------------------------------------------- +# draft_delete_gtm_tag / draft_delete_gtm_trigger +# --------------------------------------------------------------------------- + + +class TestDraftDeleteGtm: + def test_delete_tag_emits_warning(self, config): + from adloop.gtm.write import draft_delete_gtm_tag + + result = draft_delete_gtm_tag( + config, + account_id="6228172353", + container_id="183580785", + tag_id="17", + ) + assert "warnings" in result + assert any("irreversible" in w.lower() for w in result["warnings"]) + plan = preview_store._pending_plans[result["plan_id"]] + assert plan.operation == "delete_gtm_tag" + assert plan.entity_id == "17" + + def test_delete_tag_id_required(self, config): + from adloop.gtm.write import draft_delete_gtm_tag + + result = draft_delete_gtm_tag( + config, + account_id="6228172353", + container_id="183580785", + tag_id="", + ) + assert "tag_id is required" in result["error"] + + def test_delete_trigger_emits_warning(self, config): + from adloop.gtm.write import draft_delete_gtm_trigger + + result = draft_delete_gtm_trigger( + config, + account_id="6228172353", + container_id="183580785", + trigger_id="14", + ) + assert "warnings" in result + assert any("irreversible" in w.lower() for w in result["warnings"]) + + def test_delete_trigger_id_required(self, config): + from adloop.gtm.write import draft_delete_gtm_trigger + + result = draft_delete_gtm_trigger( + config, + account_id="6228172353", + container_id="183580785", + trigger_id="", + ) + assert "trigger_id is required" in result["error"] + + +# --------------------------------------------------------------------------- +# publish_gtm_workspace +# --------------------------------------------------------------------------- + + +class TestPublishGtmWorkspace: + def test_publish_emits_irreversible_warning(self, config): + from adloop.gtm.write import publish_gtm_workspace + + result = publish_gtm_workspace( + config, + account_id="6228172353", + container_id="183580785", + version_name="Test publish", + ) + assert "warnings" in result + assert any("LIVE" in w for w in result["warnings"]) + plan = preview_store._pending_plans[result["plan_id"]] + assert plan.operation == "publish_gtm_workspace" + + +# --------------------------------------------------------------------------- +# MCP registration of all 7 GTM-write tools + dispatch wiring +# --------------------------------------------------------------------------- + + +class TestGtmWriteMCPRegistration: + @pytest.fixture(scope="class") + def tools_by_name(self): + import asyncio + from adloop.server import mcp + + async def _list(): + return await mcp.list_tools() + + tools = asyncio.run(_list()) + return {t.name: t for t in tools} + + def test_seven_gtm_write_tools_registered(self, tools_by_name): + for name in ( + "draft_gtm_tag", + "draft_gtm_trigger", + "publish_gtm_workspace", + "draft_update_gtm_tag", + "draft_update_gtm_trigger", + "draft_delete_gtm_tag", + "draft_delete_gtm_trigger", + ): + assert name in tools_by_name, f"{name} not registered" + + def test_dispatch_routes_all_gtm_ops(self): + import inspect + from adloop.ads import write + + src = inspect.getsource(write._execute_plan) + for op in ( + "create_gtm_tag", "create_gtm_trigger", "publish_gtm_workspace", + "update_gtm_tag", "update_gtm_trigger", + "delete_gtm_tag", "delete_gtm_trigger", + ): + assert f'"{op}"' in src, f"dispatch missing {op}"