Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 95 additions & 61 deletions api/demo_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@

from __future__ import annotations

import json
import os
import threading
import time
from datetime import UTC, datetime
from pathlib import Path
from typing import Any

from . import storage
Expand Down Expand Up @@ -49,6 +53,9 @@

DEMO_WALLET = "nodescope_demo"
DEMO_AMOUNT = 0.001 # BTC sent in the demo tx
MEMPOOL_DETECT_TIMEOUT_SECONDS = 6.0
ZMQ_EVENT_TIMEOUT_SECONDS = 8.0
DETECT_POLL_INTERVAL_SECONDS = 0.25


def _now() -> str:
Expand Down Expand Up @@ -131,6 +138,72 @@ def _get_step_data(step_id: str) -> dict[str, Any]:
return dict(_state["steps"][step_id]["data"])


def _iter_event_store() -> list[dict[str, Any]]:
"""Read current NDJSON monitor events from newest files first."""
log_dir = Path(os.environ.get("NODESCOPE_LOG_DIR", "logs"))
if not log_dir.is_dir():
return []

events: list[dict[str, Any]] = []
for f in sorted(log_dir.glob("*.ndjson"), reverse=True):
try:
for line in f.read_text(encoding="utf-8").splitlines():
try:
ev = json.loads(line)
except json.JSONDecodeError:
continue
if isinstance(ev, dict):
events.append(ev)
except OSError:
continue
return events


def _find_zmq_event(event_name: str, data_key: str, expected_value: str) -> dict[str, Any] | None:
for ev in _iter_event_store():
if ev.get("event") != event_name:
continue
data = ev.get("data") if isinstance(ev.get("data"), dict) else {}
if data.get(data_key) == expected_value:
return ev
return None


def _wait_for_zmq_event(
event_name: str,
data_key: str,
expected_value: str,
*,
timeout: float = ZMQ_EVENT_TIMEOUT_SECONDS,
) -> dict[str, Any] | None:
deadline = time.monotonic() + timeout
while True:
ev = _find_zmq_event(event_name, data_key, expected_value)
if ev is not None:
return ev
if time.monotonic() >= deadline:
return None
time.sleep(DETECT_POLL_INTERVAL_SECONDS)


def _wait_for_mempool_entry(
rpc: RPCClient,
txid: str,
*,
timeout: float = MEMPOOL_DETECT_TIMEOUT_SECONDS,
) -> dict[str, Any]:
deadline = time.monotonic() + timeout
last_error: RPCError | None = None
while True:
try:
return rpc.getmempoolentry(txid)
except RPCError as exc:
last_error = exc
if time.monotonic() >= deadline:
raise last_error from exc
time.sleep(DETECT_POLL_INTERVAL_SECONDS)


def run_step(step_id: str) -> dict[str, Any]:
"""Execute a single step and return the updated step dict."""
if step_id not in STEP_IDS:
Expand Down Expand Up @@ -408,7 +481,7 @@ def _step_detect_mempool_entry() -> None:
return
try:
rpc = _wallet_rpc()
entry = rpc.getmempoolentry(txid)
entry = _wait_for_mempool_entry(rpc, txid)
fee = entry.get("fees", {}).get("base")
vsize = entry.get("vsize")
fee_rate = round(fee / vsize * 1e8, 2) if fee and vsize else None
Expand All @@ -428,7 +501,7 @@ def _step_detect_mempool_entry() -> None:
_set_step(
"detect_mempool_entry",
"error",
"Transaction not found in mempool.",
"Transaction not found in mempool after waiting briefly.",
error=str(exc),
)

Expand All @@ -450,39 +523,19 @@ def _step_detect_zmq_rawtx() -> None:
)
return
try:
import os
from pathlib import Path

log_dir = Path(os.environ.get("NODESCOPE_LOG_DIR", "logs"))
found = False
if log_dir.is_dir():
import json as _json
ev = _wait_for_zmq_event("zmq_rawtx", "txid", txid)

for f in sorted(log_dir.glob("*.ndjson"), reverse=True):
try:
for line in f.read_text().splitlines():
try:
ev = _json.loads(line)
if (
ev.get("event") == "zmq_rawtx"
and ev.get("data", {}).get("txid") == txid
):
found = True
break
except _json.JSONDecodeError:
continue
except OSError:
continue
if found:
break

if found:
if ev is not None:
_set_step(
"detect_zmq_rawtx",
"success",
f"ZMQ rawtx event found for TXID {txid[:12]}…",
technical={"txid": txid, "source": "ndjson_event_store"},
data={"rawtx_seen": True},
technical={
"txid": txid,
"source": "ndjson_event_store",
"event_timestamp": ev.get("ts"),
},
data={"rawtx_seen": True, "rawtx_event_ts": ev.get("ts")},
)
else:
_set_step(
Expand All @@ -491,11 +544,11 @@ def _step_detect_zmq_rawtx() -> None:
"TX broadcast confirmed via RPC. ZMQ rawtx event may not yet be in store.",
technical={
"txid": txid,
"note": "event store lookup returned no match yet — ZMQ monitor processes asynchronously",
"note": f"event store lookup returned no match after {ZMQ_EVENT_TIMEOUT_SECONDS:.0f}s — ZMQ monitor processes asynchronously",
},
data={
"rawtx_seen": False,
"zmq_note": "async — may appear in store after a short delay",
"zmq_note": f"async — no matching event after {ZMQ_EVENT_TIMEOUT_SECONDS:.0f}s",
},
)
except Exception as exc:
Expand Down Expand Up @@ -584,38 +637,19 @@ def _step_detect_zmq_rawblock() -> None:
)
return
try:
import json as _json
import os
from pathlib import Path

log_dir = Path(os.environ.get("NODESCOPE_LOG_DIR", "logs"))
found = False
if log_dir.is_dir():
for f in sorted(log_dir.glob("*.ndjson"), reverse=True):
try:
for line in f.read_text().splitlines():
try:
ev = _json.loads(line)
if (
ev.get("event") == "zmq_rawblock"
and ev.get("data", {}).get("hash") == block_hash
):
found = True
break
except _json.JSONDecodeError:
continue
except OSError:
continue
if found:
break
ev = _wait_for_zmq_event("zmq_rawblock", "hash", block_hash)

if found:
if ev is not None:
_set_step(
"detect_zmq_rawblock",
"success",
f"ZMQ rawblock event found for block {block_hash[:12]}…",
technical={"block_hash": block_hash, "source": "ndjson_event_store"},
data={"rawblock_seen": True},
technical={
"block_hash": block_hash,
"source": "ndjson_event_store",
"event_timestamp": ev.get("ts"),
},
data={"rawblock_seen": True, "rawblock_event_ts": ev.get("ts")},
)
else:
_set_step(
Expand All @@ -624,11 +658,11 @@ def _step_detect_zmq_rawblock() -> None:
"Block confirmed via RPC. ZMQ rawblock event may not yet be in store.",
technical={
"block_hash": block_hash,
"note": "ZMQ monitor processes asynchronously",
"note": f"no matching rawblock event after {ZMQ_EVENT_TIMEOUT_SECONDS:.0f}s — ZMQ monitor processes asynchronously",
},
data={
"rawblock_seen": False,
"zmq_note": "async — may appear in store after a short delay",
"zmq_note": f"async — no matching event after {ZMQ_EVENT_TIMEOUT_SECONDS:.0f}s",
},
)
except Exception as exc:
Expand Down
23 changes: 13 additions & 10 deletions frontend/src/professional-theme.css
Original file line number Diff line number Diff line change
Expand Up @@ -347,14 +347,17 @@ body::after {
}

.lab-visual {
display: grid;
grid-template-columns: minmax(0, 1fr) minmax(0, 0.88fr);
gap: 16px;
align-content: start;
min-height: 380px;
}

.lab-window {
position: absolute;
top: 28px;
right: 18px;
width: min(560px, 100%);
position: relative;
grid-column: 1 / -1;
width: 100%;
overflow: hidden;
border: 1px solid rgba(84, 199, 236, 0.32);
border-radius: 8px;
Expand Down Expand Up @@ -400,8 +403,8 @@ body::after {
}

.lab-code-card {
position: absolute;
width: 270px;
width: 100%;
min-width: 0;
padding: 16px;
border: 1px solid rgba(247, 147, 26, 0.34);
border-radius: 8px;
Expand All @@ -419,20 +422,20 @@ body::after {
}

.lab-code-card code {
display: block;
color: var(--accent-bright);
font-family: var(--font-mono);
font-size: 12px;
line-height: 1.5;
overflow-wrap: anywhere;
}

.lab-code-card-a {
left: 10px;
top: 172px;
grid-column: 1;
}

.lab-code-card-b {
right: 0;
bottom: 18px;
grid-column: 2;
border-color: rgba(84, 199, 236, 0.34);
}

Expand Down
41 changes: 41 additions & 0 deletions tests/test_demo.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
from __future__ import annotations

import os
import tempfile
import unittest
from pathlib import Path
from unittest import mock

from api import demo_service
from api.rpc import RPCError

ROOT = Path(__file__).resolve().parents[1]

Expand Down Expand Up @@ -29,5 +35,40 @@ def test_demo_static_assets_exist_and_reference_api_features(self) -> None:
self.assertIn(".stream-chip", css)


class DemoDetectionTests(unittest.TestCase):
def test_find_zmq_event_reads_matching_ndjson_event(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
event_log = Path(tmp) / "monitor.ndjson"
event_log.write_text(
'{"event":"zmq_rawblock","data":{"hash":"block-1"},"ts":"2026-05-09T00:00:00+00:00"}\n',
encoding="utf-8",
)

with mock.patch.dict(os.environ, {"NODESCOPE_LOG_DIR": tmp}):
ev = demo_service._find_zmq_event("zmq_rawblock", "hash", "block-1")

self.assertIsNotNone(ev)
self.assertEqual(ev["data"]["hash"], "block-1")

def test_wait_for_mempool_entry_retries_transient_rpc_miss(self) -> None:
class FakeRPC:
def __init__(self) -> None:
self.calls = 0

def getmempoolentry(self, txid: str) -> dict:
self.calls += 1
if self.calls == 1:
raise RPCError("not in mempool yet")
return {"txid": txid, "vsize": 141, "fees": {"base": 0.00000141}}

rpc = FakeRPC()

with mock.patch.object(demo_service, "DETECT_POLL_INTERVAL_SECONDS", 0):
entry = demo_service._wait_for_mempool_entry(rpc, "tx-demo", timeout=1)

self.assertEqual(entry["txid"], "tx-demo")
self.assertEqual(rpc.calls, 2)


if __name__ == "__main__":
unittest.main()
Loading