Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 54 additions & 2 deletions factory/runners/_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,46 @@
from __future__ import annotations

import asyncio
import re
import sys
from typing import BinaryIO

# Robust multi-branch matcher for ANSI/VT escape sequences. Deliberately
# preserves \r and \n (they are not ANSI escapes). Covers five classes:
# - CSI: \x1b[ <params 0x30-0x3F> <intermediates 0x20-0x2F> <final 0x40-0x7E>
# (colors incl. colon-delimited truecolor, cursor moves, clear-screen,
# alt-screen toggles \x1b[?1049h/l, cursor-visibility \x1b[?25l/h)
# - OSC: \x1b] ... terminated by BEL (\x07) or ST (\x1b\\) — e.g. window title
# - String-introducer DCS/SOS/PM/APC: \x1bP, \x1bX, \x1b^, \x1b_ carry a payload
# terminated by BEL (\x07) or ST (\x1b\\). This branch MUST precede the Fe/C1
# branch so the whole payload is consumed — otherwise Fe greedily matches just
# the 2-byte introducer and the payload leaks as visible text.
# - Fe / 2-byte C1: \x1b followed by 0x40-0x5F (incl. ESC M reverse line feed)
# - Fp: \x1b7 (DECSC), \x1b8 (DECRC), \x1b= / \x1b> (keypad modes)
# The 8-bit C1 ST (\x9C) is intentionally NOT matched: on a raw byte stream that
# is later UTF-8 decoded, 0x9C is a valid continuation byte and matching it could
# clip a multibyte character. A lone trailing \x1b is left as-is.
# Known limitation: stripping is stateless and line-oriented (operates on one
# readline() chunk). An UNTERMINATED string/OSC sequence, or a sequence split
# across a readline() boundary, may leak its payload as visible text. This is
# low-probability for Bob (escape sequences normally arrive intact within one
# line) and fixing it would require stateful cross-line parsing — intentionally
# out of scope.
_ANSI_ESCAPE_RE = re.compile(
rb"\x1B(?:"
rb"\[[0-?]*[ -/]*[@-~]" # CSI ... <final>
rb"|\][^\x07\x1B]*(?:\x07|\x1B\\)" # OSC ... (BEL or ST terminator)
rb"|[PX^_][^\x07\x1B]*(?:\x07|\x1B\\)" # DCS/SOS/PM/APC ... (BEL or ST terminator)
rb"|[@-Z\\-_]" # 2-byte C1 / Fe (incl. ESC M)
rb"|[78=>]" # Fp: DECSC, DECRC, keypad =/>
rb")"
)


def strip_ansi(data: bytes) -> bytes:
r"""Remove ANSI/VT escape sequences. Leaves \r, \n and plain text intact."""
return _ANSI_ESCAPE_RE.sub(b"", data)


def should_stream() -> bool:
"""Determine if we should stream subprocess output to the terminal.
Expand All @@ -31,6 +68,7 @@ async def tee_stream(
*,
stream: bool = True,
prefix: bytes | None = None,
sanitize: bool = False,
) -> None:
"""Read from an async stream, optionally tee to a destination, and collect in buffer.

Expand All @@ -40,16 +78,25 @@ async def tee_stream(
buffer: List to collect all bytes read.
stream: If True, write to dest as data arrives. If False, only buffer.
prefix: Optional prefix to prepend to each line (e.g., b"[bob:researcher] ").
sanitize: If True, strip ANSI/VT escape sequences from the bytes written to
dest. The buffer always receives the raw line, never sanitized. Lines
that contained ONLY escape sequences (empty after stripping, modulo
\\r/\\n) are skipped entirely, including the prefix, so redraw-only TUI
frames do not flood the terminal with bare prefixes. Genuine blank
lines (no escapes) are preserved.
"""
while True:
line = await src.readline()
if not line:
break
buffer.append(line)
buffer.append(line) # ALWAYS raw — the captured buffer is never sanitized
if stream:
out = strip_ansi(line) if sanitize else line
if sanitize and out != line and not out.strip(b"\r\n"):
continue # drop redraw-only lines (avoids empty prefixed lines)
if prefix:
dest.write(prefix)
dest.write(line)
dest.write(out)
dest.flush()


Expand All @@ -58,13 +105,16 @@ async def stream_subprocess(
*,
stream: bool = True,
prefix: str | None = None,
sanitize: bool = False,
) -> tuple[bytes, bytes]:
"""Stream subprocess stdout/stderr to the terminal while collecting output.

Args:
proc: The subprocess with PIPE for stdout and stderr.
stream: If True, stream to sys.stdout/stderr. If False, only collect.
prefix: Optional prefix for each line (e.g., "[bob:researcher]").
sanitize: If True, strip ANSI/VT escape sequences from the bytes written to
the terminal (both stdout and stderr). The returned buffers stay raw.

Returns:
(stdout_bytes, stderr_bytes) tuple with all collected output.
Expand All @@ -84,13 +134,15 @@ async def stream_subprocess(
stdout_buf,
stream=stream,
prefix=prefix_bytes,
sanitize=sanitize,
),
tee_stream(
proc.stderr,
sys.stderr.buffer,
stderr_buf,
stream=stream,
prefix=prefix_bytes,
sanitize=sanitize,
),
)

Expand Down
5 changes: 4 additions & 1 deletion factory/runners/bob.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,10 @@ async def headless(
env=env,
)
stdout_bytes, stderr_bytes = await asyncio.wait_for(
stream_subprocess(proc, stream=stream, prefix=prefix),
# sanitize=True: Bob is a TUI emitting cursor/clear/alt-screen
# escapes — strip them from the live terminal write (the captured
# buffer stays raw). See issue #379.
stream_subprocess(proc, stream=stream, prefix=prefix, sanitize=True),
timeout=timeout,
)
except asyncio.TimeoutError:
Expand Down
33 changes: 33 additions & 0 deletions tests/test_codex_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,39 @@ async def test_uses_streaming_prefix(
assert call_kwargs["stream"] is True
assert call_kwargs["prefix"] == "[codex:builder]"

async def test_codex_runner_does_not_sanitize(
self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
"""CodexRunner.headless() does not sanitize (default False) — issue #379."""
monkeypatch.setenv("CODEX_API_KEY", "test-key")
monkeypatch.delenv("FACTORY_CODEX_DRY_RUN", raising=False)
monkeypatch.delenv("FACTORY_RUNNER_QUIET", raising=False)

runner = CodexRunner()

with patch("factory.runners.codex.should_stream", return_value=True):
with patch(
"factory.runners.codex.stream_subprocess", new_callable=AsyncMock
) as mock_stream:
mock_stream.return_value = (b"output\n", b"")

with patch(
"asyncio.create_subprocess_exec", new_callable=AsyncMock
) as mock_exec:
mock_proc = AsyncMock()
mock_proc.returncode = 0
mock_exec.return_value = mock_proc

await runner.headless(
prompt="Test",
task="Test",
cwd=tmp_path,
role="builder",
)

mock_stream.assert_called_once()
assert mock_stream.call_args.kwargs.get("sanitize", False) is False


class TestCodexInteractive:
def test_interactive_run_builds_correct_command(
Expand Down
Loading
Loading