# Multi-branch matcher for ANSI/VT escape sequences on a raw byte stream.
# Deliberately preserves \r and \n (they are not escape sequences). Every
# branch starts with ESC (\x1b); the classes follow ECMA-35 / ECMA-48:
#   - CSI: \x1b[ + parameter bytes (0x30-0x3F) + intermediate bytes
#     (0x20-0x2F) + one final byte (0x40-0x7E). Covers colors (incl.
#     colon-delimited truecolor), cursor moves, clear-screen, alt-screen
#     toggles \x1b[?1049h/l and cursor-visibility \x1b[?25l/h.
#   - OSC: \x1b] ... terminated by BEL (\x07) or ST (\x1b\\), e.g. window title.
#   - String introducers DCS/SOS/PM/APC: \x1bP, \x1bX, \x1b^, \x1b_ carry a
#     payload terminated by BEL (\x07) or ST (\x1b\\). This branch MUST precede
#     the Fe branch so the whole payload is consumed; otherwise Fe matches just
#     the 2-byte introducer and the payload leaks as visible text.
#   - nF: \x1b + one or more intermediate bytes (0x20-0x2F) + a final byte
#     (0x30-0x7E), e.g. the charset designation \x1b(B that `tput sgr0` emits.
#   - Fe / 2-byte C1: \x1b followed by 0x40-0x5F (incl. ESC M reverse line
#     feed). "[" is excluded because it introduces CSI.
#   - Fp: \x1b followed by 0x30-0x3F, e.g. \x1b7 (DECSC), \x1b8 (DECRC),
#     \x1b= / \x1b> (keypad modes).
#   - Fs: \x1b followed by 0x60-0x7E, e.g. \x1bc (RIS, full reset).
# The 8-bit C1 ST (\x9C) is intentionally NOT matched: on a raw byte stream that
# is later UTF-8 decoded, 0x9C is a valid continuation byte and matching it
# could clip a multibyte character. A lone trailing \x1b is left as-is.
# Known limitation: stripping is stateless and line-oriented (it operates on
# one readline() chunk). An UNTERMINATED string/OSC sequence, or a sequence
# split across a readline() boundary, may leak its payload as visible text.
# This is low-probability for Bob (escape sequences normally arrive intact
# within one line) and fixing it would require stateful cross-line parsing,
# which is intentionally out of scope.
_ANSI_ESCAPE_RE = re.compile(
    rb"\x1B(?:"
    rb"\[[0-?]*[ -/]*[@-~]"  # CSI: params, intermediates, final byte
    rb"|\][^\x07\x1B]*(?:\x07|\x1B\\)"  # OSC ... (BEL or ST terminator)
    rb"|[PX^_][^\x07\x1B]*(?:\x07|\x1B\\)"  # DCS/SOS/PM/APC ... (BEL or ST)
    rb"|[ -/]+[0-~]"  # nF: intermediates + final (e.g. ESC ( B)
    rb"|[@-Z\\-_]"  # Fe / 2-byte C1 (incl. ESC M); "[" is CSI
    rb"|[0-?]"  # Fp: DECSC, DECRC, keypad =/>, other private finals
    rb"|[`-~]"  # Fs: e.g. ESC c (RIS)
    rb")"
)


def strip_ansi(data: bytes) -> bytes:
    r"""Remove ANSI/VT escape sequences from *data*.

    Args:
        data: Raw bytes, typically one line read from a subprocess pipe.

    Returns:
        A copy of *data* with every match of ``_ANSI_ESCAPE_RE`` deleted.
        ``\r``, ``\n``, plain text, UTF-8 multibyte characters and a lone
        ``\x1b`` that starts no recognised sequence are left intact.
    """
    return _ANSI_ESCAPE_RE.sub(b"", data)
stream: If True, stream to sys.stdout/stderr. If False, only collect. prefix: Optional prefix for each line (e.g., "[bob:researcher]"). + sanitize: If True, strip ANSI/VT escape sequences from the bytes written to + the terminal (both stdout and stderr). The returned buffers stay raw. Returns: (stdout_bytes, stderr_bytes) tuple with all collected output. @@ -84,6 +134,7 @@ async def stream_subprocess( stdout_buf, stream=stream, prefix=prefix_bytes, + sanitize=sanitize, ), tee_stream( proc.stderr, @@ -91,6 +142,7 @@ async def stream_subprocess( stderr_buf, stream=stream, prefix=prefix_bytes, + sanitize=sanitize, ), ) diff --git a/factory/runners/bob.py b/factory/runners/bob.py index 981da5a..614fa9f 100644 --- a/factory/runners/bob.py +++ b/factory/runners/bob.py @@ -254,7 +254,10 @@ async def headless( env=env, ) stdout_bytes, stderr_bytes = await asyncio.wait_for( - stream_subprocess(proc, stream=stream, prefix=prefix), + # sanitize=True: Bob is a TUI emitting cursor/clear/alt-screen + # escapes — strip them from the live terminal write (the captured + # buffer stays raw). See issue #379. 
class _ScriptedLineReader:
    """Stand-in for a stream reader: returns queued lines, then b"" (EOF)."""

    def __init__(self, lines: list[bytes]) -> None:
        self._lines = iter(lines)

    async def readline(self) -> bytes:
        # b"" is the EOF signal that makes tee_stream leave its read loop.
        return next(self._lines, b"")


class TestAnsiSanitization:
    """Tests for strip_ansi + sanitize on the live-terminal write path (issue #379)."""

    def test_strip_ansi_removes_csi_color_and_cursor(self) -> None:
        """CSI color/cursor/clear sequences are removed; text survives."""
        from factory.runners._stream import strip_ansi

        assert strip_ansi(b"\x1b[1;36mhi\x1b[0m") == b"hi"
        # colon-delimited truecolor SGR (covered by [0-?] param class)
        assert strip_ansi(b"\x1b[38:2:255:0:0mred\x1b[0m") == b"red"
        # clear-screen + cursor-home leaves nothing
        assert strip_ansi(b"\x1b[2J\x1b[H") == b""

    def test_strip_ansi_removes_alt_screen_and_cursor_toggle(self) -> None:
        """DEC private alt-screen / cursor-visibility toggles (the issue's culprits)."""
        from factory.runners._stream import strip_ansi

        assert strip_ansi(b"\x1b[?1049h") == b""
        assert strip_ansi(b"\x1b[?1049l") == b""
        assert strip_ansi(b"\x1b[?25l") == b""
        assert strip_ansi(b"\x1b[?25h") == b""

    def test_strip_ansi_removes_osc_window_title(self) -> None:
        """OSC sequences (BEL- and ST-terminated) are removed; trailing text survives."""
        from factory.runners._stream import strip_ansi

        # BEL-terminated
        assert strip_ansi(b"\x1b]0;title\x07rest") == b"rest"
        # ST (ESC \\)-terminated
        assert strip_ansi(b"\x1b]0;title\x1b\\rest") == b"rest"

    def test_strip_ansi_removes_string_sequences(self) -> None:
        """DCS/SOS/PM/APC introducer + ST-terminated payload are fully removed."""
        from factory.runners._stream import strip_ansi

        assert strip_ansi(b"\x1bP1$r0m\x1b\\after") == b"after"  # DCS
        assert strip_ansi(b"\x1b_payload\x1b\\after") == b"after"  # APC
        assert strip_ansi(b"\x1b^foo\x1b\\after") == b"after"  # PM
        assert strip_ansi(b"\x1bXsos\x1b\\after") == b"after"  # SOS

    def test_strip_ansi_removes_decsc_decrc_ri(self) -> None:
        """Fp save/restore cursor and Fe reverse-line-feed are removed."""
        from factory.runners._stream import strip_ansi

        assert strip_ansi(b"\x1b7save\x1b8") == b"save"  # DECSC / DECRC
        assert strip_ansi(b"\x1bMup") == b"up"  # RI (reverse line feed)

    def test_strip_ansi_preserves_plaintext_and_newlines(self) -> None:
        r"""Plain text, \r, \n and UTF-8 multibyte content are left intact."""
        from factory.runners._stream import strip_ansi

        assert strip_ansi(b"plain text\n") == b"plain text\n"
        assert strip_ansi(b"a\rb\n") == b"a\rb\n"
        # UTF-8 multibyte must not be clipped (guards the \x9C omission)
        utf8 = "café — 日本語".encode()
        assert strip_ansi(utf8) == utf8

    async def test_tee_stream_sanitize_strips_dest_keeps_buffer_raw(self) -> None:
        """sanitize=True strips dest writes but the buffer keeps the raw line."""
        from io import BytesIO

        from factory.runners._stream import tee_stream

        reader = _ScriptedLineReader([b"\x1b[2J\x1b[Hhello\n"])
        dest = BytesIO()
        buffer: list[bytes] = []

        await tee_stream(reader, dest, buffer, stream=True, sanitize=True)  # type: ignore[arg-type]

        assert dest.getvalue() == b"hello\n"
        assert buffer == [b"\x1b[2J\x1b[Hhello\n"]  # raw, never sanitized

    async def test_tee_stream_sanitize_skips_redraw_only_lines(self) -> None:
        """sanitize=True skips empty-after-strip lines so prefixes don't flood."""
        from io import BytesIO

        from factory.runners._stream import tee_stream

        reader = _ScriptedLineReader([b"\x1b[32mok\n", b"\x1b[2J\x1b[H\n"])
        dest = BytesIO()
        buffer: list[bytes] = []

        await tee_stream(
            reader,  # type: ignore[arg-type]
            dest,
            buffer,
            stream=True,
            prefix=b"[bob] ",
            sanitize=True,
        )

        # Only the real line reaches dest (with prefix); redraw-only line dropped
        assert dest.getvalue() == b"[bob] ok\n"
        # Buffer keeps BOTH lines raw
        assert buffer == [b"\x1b[32mok\n", b"\x1b[2J\x1b[H\n"]

    async def test_tee_stream_sanitize_preserves_genuine_blank_line(self) -> None:
        """sanitize=True preserves a genuine blank line (no escapes).

        Only redraw-only lines (empty *because* escapes were stripped) are
        dropped.
        """
        from io import BytesIO

        from factory.runners._stream import tee_stream

        reader = _ScriptedLineReader([b"hello\n", b"\n", b"world\n"])
        dest = BytesIO()
        buffer: list[bytes] = []

        await tee_stream(reader, dest, buffer, stream=True, sanitize=True)  # type: ignore[arg-type]

        # The bare blank line is unchanged by strip_ansi, so out == line and it is
        # NOT dropped — all three lines reach dest.
        assert dest.getvalue() == b"hello\n\nworld\n"
        # Buffer keeps all three lines raw.
        assert buffer == [b"hello\n", b"\n", b"world\n"]

    async def test_tee_stream_sanitize_false_byte_identical(self) -> None:
        """sanitize=False (default) writes the raw bytes unchanged."""
        from io import BytesIO

        from factory.runners._stream import tee_stream

        raw = b"\x1b[2J\x1b[Hhello\n"
        reader = _ScriptedLineReader([raw])
        dest = BytesIO()
        buffer: list[bytes] = []

        await tee_stream(reader, dest, buffer, stream=True)  # type: ignore[arg-type]

        assert dest.getvalue() == raw
        assert buffer == [raw]

    async def test_stream_subprocess_threads_sanitize_to_both(self) -> None:
        """stream_subprocess threads sanitize=True to BOTH tee_stream calls."""
        from factory.runners._stream import stream_subprocess

        class MockProc:
            """Process double exposing only what stream_subprocess is given here."""

            def __init__(self) -> None:
                self.stdout = _ScriptedLineReader([b"out\n"])
                self.stderr = _ScriptedLineReader([b"err\n"])

            async def wait(self) -> int:
                return 0

        proc = MockProc()

        with patch(
            "factory.runners._stream.tee_stream", new_callable=AsyncMock
        ) as mock_tee:
            await stream_subprocess(proc, stream=False, sanitize=True)  # type: ignore[arg-type]

        # One tee_stream call per pipe (stdout + stderr), each with sanitize=True.
        assert mock_tee.call_count == 2
        for call in mock_tee.call_args_list:
            assert call.kwargs["sanitize"] is True

    async def test_bob_runner_passes_sanitize_true(
        self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """BobRunner.headless() passes sanitize=True to stream_subprocess."""
        monkeypatch.delenv("FACTORY_BOB_DRY_RUN", raising=False)
        monkeypatch.delenv("FACTORY_RUNNER_QUIET", raising=False)
        monkeypatch.setenv("BOBSHELL_API_KEY", "test-key")

        (tmp_path / ".factory").mkdir()

        import factory.runners.bob as bob_module

        bob_module._auth_checked = False
        try:
            runner = BobRunner()

            with patch("factory.runners.bob.should_stream", return_value=True):
                with patch(
                    "factory.runners.bob.stream_subprocess", new_callable=AsyncMock
                ) as mock_stream:
                    mock_stream.return_value = (b"output\n", b"")

                    with patch(
                        "asyncio.create_subprocess_exec", new_callable=AsyncMock
                    ) as mock_exec:
                        mock_proc = AsyncMock()
                        mock_proc.returncode = 0
                        mock_exec.return_value = mock_proc

                        await runner.headless(
                            prompt="Test",
                            task="Test",
                            cwd=tmp_path,
                            role="builder",
                        )

                    mock_stream.assert_called_once()
                    assert mock_stream.call_args.kwargs["sanitize"] is True
        finally:
            # Reset the module-level flag even when an assertion above fails,
            # so a failure here cannot leak state into later tests.
            bob_module._auth_checked = False

    async def test_claude_runner_does_not_sanitize(
        self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """ClaudeRunner.headless() does not sanitize (default False)."""
        monkeypatch.delenv("FACTORY_RUNNER_QUIET", raising=False)

        runner = ClaudeRunner()

        with patch("factory.runners.claude.should_stream", return_value=True):
            with patch(
                "factory.runners.claude.stream_subprocess", new_callable=AsyncMock
            ) as mock_stream:
                mock_stream.return_value = (b"output\n", b"")

                with patch(
                    "asyncio.create_subprocess_exec", new_callable=AsyncMock
                ) as mock_exec:
                    mock_proc = AsyncMock()
                    mock_proc.returncode = 0
                    mock_exec.return_value = mock_proc

                    await runner.headless(
                        prompt="Test",
                        task="Test",
                        cwd=tmp_path,
                        role="researcher",
                    )

                mock_stream.assert_called_once()
                # Either the kwarg is omitted or it is explicitly False.
                assert mock_stream.call_args.kwargs.get("sanitize", False) is False