From 1dd6d43d03e8c8528c12a363d03f3b82f77dddc9 Mon Sep 17 00:00:00 2001 From: Kevin Van Brunt Date: Thu, 19 Feb 2026 17:10:24 -0500 Subject: [PATCH 01/22] Removed terminal_utils.py. Added documentation for completion.py. --- .github/CODEOWNERS | 4 +- CHANGELOG.md | 1 + cmd2/completion.py | 2 +- docs/api/completion.md | 3 ++ docs/api/index.md | 2 +- docs/api/terminal_utils.md | 3 -- docs/features/generating_output.md | 1 - docs/upgrades.md | 15 +----- mkdocs.yml | 2 +- tests/test_terminal_utils.py | 81 ------------------------------ 10 files changed, 10 insertions(+), 104 deletions(-) create mode 100644 docs/api/completion.md delete mode 100644 docs/api/terminal_utils.md delete mode 100644 tests/test_terminal_utils.py diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 423c242ef..0c39d4248 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -31,7 +31,8 @@ cmd2/argparse_*.py @kmvanbrunt @anselor cmd2/clipboard.py @tleonhardt cmd2/cmd2.py @tleonhardt @kmvanbrunt cmd2/colors.py @tleonhardt @kmvanbrunt -cmd2/command_definition.py @anselor +cmd2/command_definition.py @anselor @kmvanbrunt +cmd2/completion.py @kmvanbrunt cmd2/constants.py @tleonhardt @kmvanbrunt cmd2/decorators.py @kmvanbrunt @anselor cmd2/exceptions.py @kmvanbrunt @anselor @@ -43,7 +44,6 @@ cmd2/py_bridge.py @kmvanbrunt cmd2/rich_utils.py @kmvanbrunt cmd2/string_utils.py @kmvanbrunt cmd2/styles.py @tleonhardt @kmvanbrunt -cmd2/terminal_utils.py @kmvanbrunt cmd2/transcript.py @tleonhardt cmd2/utils.py @tleonhardt @kmvanbrunt diff --git a/CHANGELOG.md b/CHANGELOG.md index 1091bd69f..d46507fc1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,6 +48,7 @@ shell, and the option for a persistent bottom bar that can display realtime stat `Statement.redirect_to`. - Changed `StatementParser.parse_command_only()` to return a `PartialStatement` object. - Renamed `Macro.arg_list` to `Macro.args`. + - Removed `terminal_utils.py` since `prompt-toolkit` provides this functionality. - Enhancements - New `cmd2.Cmd` parameters - **auto_suggest**: (boolean) if `True`, provide fish shell style auto-suggestions. These diff --git a/cmd2/completion.py b/cmd2/completion.py index d6e1afe93..f729fc65c 100644 --- a/cmd2/completion.py +++ b/cmd2/completion.py @@ -1,4 +1,4 @@ -"""Provides classes and functions related to completion.""" +"""Provides classes and functions related to command-line completion.""" import re import sys diff --git a/docs/api/completion.md b/docs/api/completion.md new file mode 100644 index 000000000..7cd7b6111 --- /dev/null +++ b/docs/api/completion.md @@ -0,0 +1,3 @@ +# cmd2.completion + +::: cmd2.completion diff --git a/docs/api/index.md b/docs/api/index.md index 47eaf259c..ac7f74b3d 100644 --- a/docs/api/index.md +++ b/docs/api/index.md @@ -18,6 +18,7 @@ incremented according to the [Semantic Version Specification](https://semver.org - [cmd2.colors](./colors.md) - StrEnum of all color names supported by the Rich library - [cmd2.command_definition](./command_definition.md) - supports the definition of commands in separate classes to be composed into cmd2.Cmd +- [cmd2.completion](./completion.md) - classes and functions related to command-line completion - [cmd2.constants](./constants.md) - constants used in `cmd2` - [cmd2.decorators](./decorators.md) - decorators for `cmd2` commands - [cmd2.exceptions](./exceptions.md) - custom `cmd2` exceptions @@ -30,6 +31,5 @@ incremented according to the [Semantic Version Specification](https://semver.org - [cmd2.rich_utils](./rich_utils.md) - common utilities to support Rich in cmd2 applications - [cmd2.string_utils](./string_utils.md) - string utility functions - [cmd2.styles](./styles.md) - cmd2-specific Rich styles and a StrEnum of their corresponding names -- [cmd2.terminal_utils](./terminal_utils.md) - support for terminal control escape sequences - [cmd2.transcript](./transcript.md) - functions and classes for running and validating transcripts - [cmd2.utils](./utils.md) - various utility classes and functions diff --git a/docs/api/terminal_utils.md b/docs/api/terminal_utils.md deleted file mode 100644 index 919f36dd5..000000000 --- a/docs/api/terminal_utils.md +++ /dev/null @@ -1,3 +0,0 @@ -# cmd2.terminal_utils - -::: cmd2.terminal_utils diff --git a/docs/features/generating_output.md b/docs/features/generating_output.md index da685208b..0f9c83092 100644 --- a/docs/features/generating_output.md +++ b/docs/features/generating_output.md @@ -121,7 +121,6 @@ following sections: - [cmd2.colors][] - [cmd2.rich_utils][] - [cmd2.string_utils][] -- [cmd2.terminal_utils][] The [color.py](https://github.com/python-cmd2/cmd2/blob/main/examples/color.py) example demonstrates all colors available to your `cmd2` application. diff --git a/docs/upgrades.md b/docs/upgrades.md index 9bdb83cd1..f664e3745 100644 --- a/docs/upgrades.md +++ b/docs/upgrades.md @@ -63,8 +63,7 @@ The major things users should be aware of when upgrading to 3.x are detailed in #### ansi The functionality within the `cmd2.ansi` module has either been removed or changed to be based on -`rich` and moved to one of the new modules: [cmd2.string_utils][], [cmd2.styles][], or -[cmd2.terminal_utils][]. +`rich` and moved to one of the new modules: [cmd2.string_utils][] or [cmd2.styles][]. To ease the migration path from `cmd2` 2.x to 3.x, we have created the `cmd2-ansi` module which is a backport of the `cmd2.ansi` module present in `cmd2` 2.7.0 in a standalone fashion. Relevant links: @@ -147,18 +146,6 @@ This includes functions for styling, aligning, and quoting/un-quoting text. See [getting_started.py](https://github.com/python-cmd2/cmd2/blob/main/examples/getting_started.py) example for a demonstration of how to use the common [cmd2.string_utils.stylize][] function. -#### terminal_utils - -Support for terminal control escape sequences for things like setting the window title and -asynchronous alerts has been moved from `cmd2.ansi` to the new [cmd2.terminal_utils][] module. - -This isn't really intended to be used by end users, but is used by higher-level functionality that -is intended to be used by end users such as [cmd2.Cmd.set_window_title][] and -[cmd2.Cmd.async_alert][]. - -See [async_printing.py](https://github.com/python-cmd2/cmd2/blob/main/examples/async_printing.py) -for an example of how to use this functionality in a `cmd2` application. - ### Argparse HelpFormatter classes `cmd2` now has 5 different Argparse HelpFormatter classes, all of which are based on the diff --git a/mkdocs.yml b/mkdocs.yml index 5d970c9b1..dd57faf49 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -200,6 +200,7 @@ nav: - api/clipboard.md - api/colors.md - api/command_definition.md + - api/completion.md - api/constants.md - api/decorators.md - api/exceptions.md @@ -211,7 +212,6 @@ nav: - api/rich_utils.md - api/string_utils.md - api/styles.md - - api/terminal_utils.md - api/transcript.md - api/utils.md - Version Upgrades: diff --git a/tests/test_terminal_utils.py b/tests/test_terminal_utils.py deleted file mode 100644 index c7d8a22f3..000000000 --- a/tests/test_terminal_utils.py +++ /dev/null @@ -1,81 +0,0 @@ -"""Unit testing for cmd2/terminal_utils.py module""" - -import pytest - -from cmd2 import ( - Color, -) -from cmd2 import string_utils as su -from cmd2 import terminal_utils as tu - - -def test_set_title() -> None: - title = "Hello, world!" - assert tu.set_title_str(title) == tu.OSC + '2;' + title + tu.BEL - - -@pytest.mark.parametrize( - ('cols', 'prompt', 'line', 'cursor', 'msg', 'expected'), - [ - ( - 127, - '(Cmd) ', - 'help his', - 12, - su.stylize('Hello World!', style=Color.MAGENTA), - '\x1b[2K\r\x1b[35mHello World!\x1b[0m', - ), - (127, '\n(Cmd) ', 'help ', 5, 'foo', '\x1b[2K\x1b[1A\x1b[2K\rfoo'), - ( - 10, - '(Cmd) ', - 'help history of the american republic', - 4, - 'boo', - '\x1b[3B\x1b[2K\x1b[1A\x1b[2K\x1b[1A\x1b[2K\x1b[1A\x1b[2K\x1b[1A\x1b[2K\rboo', - ), - ], -) -def test_async_alert_str(cols, prompt, line, cursor, msg, expected) -> None: - alert_str = tu.async_alert_str(terminal_columns=cols, prompt=prompt, line=line, cursor_offset=cursor, alert_msg=msg) - assert alert_str == expected - - -def test_clear_screen() -> None: - clear_type = 2 - assert tu.clear_screen_str(clear_type) == f"{tu.CSI}{clear_type}J" - - clear_type = -1 - expected_err = "clear_type must in an integer from 0 to 3" - with pytest.raises(ValueError, match=expected_err): - tu.clear_screen_str(clear_type) - - clear_type = 4 - with pytest.raises(ValueError, match=expected_err): - tu.clear_screen_str(clear_type) - - -def test_clear_line() -> None: - clear_type = 2 - assert tu.clear_line_str(clear_type) == f"{tu.CSI}{clear_type}K" - - clear_type = -1 - expected_err = "clear_type must in an integer from 0 to 2" - with pytest.raises(ValueError, match=expected_err): - tu.clear_line_str(clear_type) - - clear_type = 3 - with pytest.raises(ValueError, match=expected_err): - tu.clear_line_str(clear_type) - - -def test_cursor() -> None: - count = 1 - assert tu.Cursor.UP(count) == f"{tu.CSI}{count}A" - assert tu.Cursor.DOWN(count) == f"{tu.CSI}{count}B" - assert tu.Cursor.FORWARD(count) == f"{tu.CSI}{count}C" - assert tu.Cursor.BACK(count) == f"{tu.CSI}{count}D" - - x = 4 - y = 5 - assert tu.Cursor.SET_POS(x, y) == f"{tu.CSI}{y};{x}H" From 47834559a9493699b0288a5177d373a90ea0bac0 Mon Sep 17 00:00:00 2001 From: Kevin Van Brunt Date: Thu, 19 Feb 2026 22:37:43 -0500 Subject: [PATCH 02/22] Restored upgrade documentation. --- docs/upgrades.md | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/docs/upgrades.md b/docs/upgrades.md index f664e3745..9bdb83cd1 100644 --- a/docs/upgrades.md +++ b/docs/upgrades.md @@ -63,7 +63,8 @@ The major things users should be aware of when upgrading to 3.x are detailed in #### ansi The functionality within the `cmd2.ansi` module has either been removed or changed to be based on -`rich` and moved to one of the new modules: [cmd2.string_utils][] or [cmd2.styles][]. +`rich` and moved to one of the new modules: [cmd2.string_utils][], [cmd2.styles][], or +[cmd2.terminal_utils][]. To ease the migration path from `cmd2` 2.x to 3.x, we have created the `cmd2-ansi` module which is a backport of the `cmd2.ansi` module present in `cmd2` 2.7.0 in a standalone fashion. Relevant links: @@ -146,6 +147,18 @@ This includes functions for styling, aligning, and quoting/un-quoting text. See [getting_started.py](https://github.com/python-cmd2/cmd2/blob/main/examples/getting_started.py) example for a demonstration of how to use the common [cmd2.string_utils.stylize][] function. +#### terminal_utils + +Support for terminal control escape sequences for things like setting the window title and +asynchronous alerts has been moved from `cmd2.ansi` to the new [cmd2.terminal_utils][] module. + +This isn't really intended to be used by end users, but is used by higher-level functionality that +is intended to be used by end users such as [cmd2.Cmd.set_window_title][] and +[cmd2.Cmd.async_alert][]. + +See [async_printing.py](https://github.com/python-cmd2/cmd2/blob/main/examples/async_printing.py) +for an example of how to use this functionality in a `cmd2` application. + ### Argparse HelpFormatter classes `cmd2` now has 5 different Argparse HelpFormatter classes, all of which are based on the From 0f647bb486e892904869ab3cfad43c3623de61ca Mon Sep 17 00:00:00 2001 From: Kevin Van Brunt Date: Fri, 20 Feb 2026 10:19:55 -0500 Subject: [PATCH 03/22] Deleted terminal_utils.py. --- cmd2/terminal_utils.py | 144 ----------------------------------------- 1 file changed, 144 deletions(-) delete mode 100644 cmd2/terminal_utils.py diff --git a/cmd2/terminal_utils.py b/cmd2/terminal_utils.py deleted file mode 100644 index 4a5a2cddd..000000000 --- a/cmd2/terminal_utils.py +++ /dev/null @@ -1,144 +0,0 @@ -r"""Support for terminal control escape sequences. - -These are used for things like setting the window title and asynchronous alerts. -""" - -from . import string_utils as su - -####################################################### -# Common ANSI escape sequence constants -####################################################### -ESC = '\x1b' -CSI = f'{ESC}[' -OSC = f'{ESC}]' -BEL = '\a' - - -#################################################################################### -# Utility functions which create various ANSI sequences -#################################################################################### -def set_title_str(title: str) -> str: - """Generate a string that, when printed, sets a terminal's window title. - - :param title: new title for the window - :return: the set title string - """ - return f"{OSC}2;{title}{BEL}" - - -def clear_screen_str(clear_type: int = 2) -> str: - """Generate a string that, when printed, clears a terminal screen based on value of clear_type. - - :param clear_type: integer which specifies how to clear the screen (Defaults to 2) - Possible values: - 0 - clear from cursor to end of screen - 1 - clear from cursor to beginning of the screen - 2 - clear entire screen - 3 - clear entire screen and delete all lines saved in the scrollback buffer - :return: the clear screen string - :raises ValueError: if clear_type is not a valid value - """ - if 0 <= clear_type <= 3: - return f"{CSI}{clear_type}J" - raise ValueError("clear_type must in an integer from 0 to 3") - - -def clear_line_str(clear_type: int = 2) -> str: - """Generate a string that, when printed, clears a line based on value of clear_type. - - :param clear_type: integer which specifies how to clear the line (Defaults to 2) - Possible values: - 0 - clear from cursor to the end of the line - 1 - clear from cursor to beginning of the line - 2 - clear entire line - :return: the clear line string - :raises ValueError: if clear_type is not a valid value - """ - if 0 <= clear_type <= 2: - return f"{CSI}{clear_type}K" - raise ValueError("clear_type must in an integer from 0 to 2") - - -#################################################################################### -# Implementations intended for direct use (do NOT use outside of cmd2) -#################################################################################### -class Cursor: - """Create ANSI sequences to alter the cursor position.""" - - @staticmethod - def UP(count: int = 1) -> str: # noqa: N802 - """Move the cursor up a specified amount of lines (Defaults to 1).""" - return f"{CSI}{count}A" - - @staticmethod - def DOWN(count: int = 1) -> str: # noqa: N802 - """Move the cursor down a specified amount of lines (Defaults to 1).""" - return f"{CSI}{count}B" - - @staticmethod - def FORWARD(count: int = 1) -> str: # noqa: N802 - """Move the cursor forward a specified amount of lines (Defaults to 1).""" - return f"{CSI}{count}C" - - @staticmethod - def BACK(count: int = 1) -> str: # noqa: N802 - """Move the cursor back a specified amount of lines (Defaults to 1).""" - return f"{CSI}{count}D" - - @staticmethod - def SET_POS(x: int, y: int) -> str: # noqa: N802 - """Set the cursor position to coordinates which are 1-based.""" - return f"{CSI}{y};{x}H" - - -def async_alert_str(*, terminal_columns: int, prompt: str, line: str, cursor_offset: int, alert_msg: str) -> str: - """Calculate the desired string, including ANSI escape codes, for displaying an asynchronous alert message. - - :param terminal_columns: terminal width (number of columns) - :param prompt: current onscreen prompt - :param line: current contents of the prompt-toolkit line buffer - :param cursor_offset: the offset of the current cursor position within line - :param alert_msg: the message to display to the user - :return: the correct string so that the alert message appears to the user to be printed above the current line. - """ - # Split the prompt lines since it can contain newline characters. - prompt_lines = prompt.splitlines() or [''] - - # Calculate how many terminal lines are taken up by all prompt lines except for the last one. - # That will be included in the input lines calculations since that is where the cursor is. - num_prompt_terminal_lines = 0 - for prompt_line in prompt_lines[:-1]: - prompt_line_width = su.str_width(prompt_line) - num_prompt_terminal_lines += int(prompt_line_width / terminal_columns) + 1 - - # Now calculate how many terminal lines are take up by the input - last_prompt_line = prompt_lines[-1] - last_prompt_line_width = su.str_width(last_prompt_line) - - input_width = last_prompt_line_width + su.str_width(line) - - num_input_terminal_lines = int(input_width / terminal_columns) + 1 - - # Get the cursor's offset from the beginning of the first input line - cursor_input_offset = last_prompt_line_width + cursor_offset - - # Calculate what input line the cursor is on - cursor_input_line = int(cursor_input_offset / terminal_columns) + 1 - - # Create a string that when printed will clear all input lines and display the alert - terminal_str = '' - - # Move the cursor down to the last input line - if cursor_input_line != num_input_terminal_lines: - terminal_str += Cursor.DOWN(num_input_terminal_lines - cursor_input_line) - - # Clear each line from the bottom up so that the cursor ends up on the first prompt line - total_lines = num_prompt_terminal_lines + num_input_terminal_lines - terminal_str += (clear_line_str() + Cursor.UP(1)) * (total_lines - 1) - - # Clear the first prompt line - terminal_str += clear_line_str() - - # Move the cursor to the beginning of the first prompt line and print the alert - terminal_str += '\r' + alert_msg - return terminal_str From 00412f4cb9b6d5d585819bd47034a39e092ac6a9 Mon Sep 17 00:00:00 2001 From: Kevin Van Brunt Date: Mon, 23 Feb 2026 20:43:27 -0500 Subject: [PATCH 04/22] Moved logic specific to the main command line from Cmd.read_input() to Cmd._read_command_line(). No longer setting Cmd.session.input to a pipe since that causes warnings from prompt-toolkit. For non-interactive sessions (stdin is a pipe), we use stdin.readline(). --- cmd2/__init__.py | 2 - cmd2/cmd2.py | 357 ++++++++++++------------- cmd2/constants.py | 1 + cmd2/pt_utils.py | 3 +- cmd2/utils.py | 19 -- examples/read_input.py | 21 +- tests/test_cmd2.py | 572 ++++++++++++++--------------------------- 7 files changed, 364 insertions(+), 611 deletions(-) diff --git a/cmd2/__init__.py b/cmd2/__init__.py index a87303daa..d36aa1461 100644 --- a/cmd2/__init__.py +++ b/cmd2/__init__.py @@ -52,7 +52,6 @@ from .string_utils import stylize from .styles import Cmd2Style from .utils import ( - CompletionMode, CustomCompletionSettings, Settable, categorize, @@ -103,7 +102,6 @@ "Cmd2Style", # Utilities 'categorize', - 'CompletionMode', 'CustomCompletionSettings', 'Settable', 'set_default_str_sort_key', diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index 10b8bebd7..3e08c3622 100644 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -60,6 +60,7 @@ ) import rich.box +from prompt_toolkit.application import get_app from rich.console import ( Group, RenderableType, @@ -150,9 +151,9 @@ from prompt_toolkit.completion import Completer, DummyCompleter from prompt_toolkit.formatted_text import ANSI, FormattedText from prompt_toolkit.history import InMemoryHistory -from prompt_toolkit.input import DummyInput +from prompt_toolkit.input import DummyInput, create_input from prompt_toolkit.key_binding import KeyBindings -from prompt_toolkit.output import DummyOutput +from prompt_toolkit.output import DummyOutput, create_output from prompt_toolkit.patch_stdout import patch_stdout from prompt_toolkit.shortcuts import CompleteStyle, PromptSession, set_title @@ -371,7 +372,6 @@ def __init__( # Configure a few defaults self.prompt = Cmd.DEFAULT_PROMPT self.intro = intro - self.use_rawinput = True # What to use for standard input if stdin is not None: @@ -387,19 +387,6 @@ def __init__( # Key used for completion self.completekey = completekey - key_bindings = None - if self.completekey != self.DEFAULT_COMPLETEKEY: - # Configure prompt_toolkit `KeyBindings` with the custom key for completion - key_bindings = KeyBindings() - - @key_bindings.add(self.completekey) - def _(event: Any) -> None: # pragma: no cover - """Trigger completion.""" - b = event.current_buffer - if b.complete_state: - b.complete_next() - else: - b.start_completion(select_first=False) # Attributes which should NOT be dynamically settable via the set command at runtime self.default_to_shell = False # Attempt to run unrecognized commands as shell commands @@ -440,7 +427,7 @@ def _(event: Any) -> None: # pragma: no cover self.self_in_py = False # Commands to exclude from the help menu and completion - self.hidden_commands = ['eof', '_relative_run_script'] + self.hidden_commands = [constants.EOF, '_relative_run_script'] # Initialize history from a persistent history file (if present) self.persistent_history_file = '' @@ -457,38 +444,10 @@ def _(event: Any) -> None: # pragma: no cover if auto_suggest: self.auto_suggest = AutoSuggestFromHistory() - try: - self.session: PromptSession[str] = PromptSession( - auto_suggest=self.auto_suggest, - bottom_toolbar=self.get_bottom_toolbar if self.bottom_toolbar else None, - complete_in_thread=True, - complete_style=CompleteStyle.MULTI_COLUMN, - complete_while_typing=False, - completer=self.completer, - history=self.history_adapter, - key_bindings=key_bindings, - lexer=self.lexer, - ) - except (NoConsoleScreenBufferError, AttributeError, ValueError): - # Fallback to dummy input/output if PromptSession initialization fails. - # This can happen in some CI environments (like GitHub Actions on Windows) - # where isatty() is True but there is no real console. - self.session = PromptSession( - auto_suggest=self.auto_suggest, - bottom_toolbar=self.get_bottom_toolbar if self.bottom_toolbar else None, - complete_in_thread=True, - complete_style=CompleteStyle.MULTI_COLUMN, - complete_while_typing=False, - completer=self.completer, - history=self.history_adapter, - input=DummyInput(), - key_bindings=key_bindings, - lexer=self.lexer, - output=DummyOutput(), - ) + self.session = self._build_session() # Commands to exclude from the history command - self.exclude_from_history = ['eof', 'history'] + self.exclude_from_history = [constants.EOF, 'history'] # Dictionary of macro names and their values self.macros: dict[str, Macro] = {} @@ -611,11 +570,6 @@ def _(event: Any) -> None: # pragma: no cover # This determines the value returned by cmdloop() when exiting the application self.exit_code = 0 - # This flag is set to True when the prompt is displayed and the application is waiting for user input. - # It is used by async_alert() to determine if it is safe to alert the user. - self._in_prompt = False - self._in_prompt_lock = threading.Lock() - # Commands disabled during specific application states # Key: Command name | Value: DisabledCommand object self.disabled_commands: dict[str, DisabledCommand] = {} @@ -655,6 +609,61 @@ def _(event: Any) -> None: # pragma: no cover # the current command being executed self.current_command: Statement | None = None + def _build_session(self) -> PromptSession[str]: + """Construct the primary PromptSession for the cmd2 application. + + Builds an interactive session if stdin is a TTY. Otherwise, uses + dummy drivers to support non-interactive streams like pipes or files. + """ + key_bindings = None + if self.completekey != self.DEFAULT_COMPLETEKEY: + # Configure prompt_toolkit `KeyBindings` with the custom key for completion + key_bindings = KeyBindings() + + @key_bindings.add(self.completekey) + def _(event: Any) -> None: # pragma: no cover + """Trigger completion.""" + b = event.current_buffer + if b.complete_state: + b.complete_next() + else: + b.start_completion(select_first=False) + + # Base configuration + kwargs: dict[str, Any] = { + "auto_suggest": self.auto_suggest, + "bottom_toolbar": self.get_bottom_toolbar if self.bottom_toolbar else None, + "complete_style": CompleteStyle.MULTI_COLUMN, + "complete_in_thread": True, + "complete_while_typing": False, + "completer": self.completer, + "history": self.history_adapter, + "key_bindings": key_bindings, + "lexer": self.lexer, + "rprompt": self.get_rprompt, + } + + if self.stdin.isatty(): + try: + kwargs["input"] = create_input(stdin=self.stdin) + kwargs["output"] = create_output(stdout=self.stdout) + return PromptSession(**kwargs) + + except (NoConsoleScreenBufferError, AttributeError, ValueError): + # Fallback to dummy input/output if PromptSession initialization fails. + # This can happen in some CI environments (like GitHub Actions on Windows) + # where isatty() is True but there is no real console. + pass + + # Fallback to dummy drivers for non-interactive environments. + kwargs.update( + { + "input": DummyInput(), + "output": DummyOutput(), + } + ) + return PromptSession(**kwargs) + def find_commandsets(self, commandset_type: type[CommandSet], *, subclass_match: bool = False) -> list[CommandSet]: """Find all CommandSets that match the provided CommandSet type. @@ -1226,10 +1235,6 @@ def allow_style(self, new_val: ru.AllowStyle) -> None: """Setter property needed to support do_set when it updates allow_style.""" ru.ALLOW_STYLE = new_val - def _completion_supported(self) -> bool: - """Return whether completion is supported.""" - return self.use_rawinput and bool(self.completekey) - @property def visible_prompt(self) -> str: """Read-only property to get the visible prompt with any ANSI style sequences stripped. @@ -2847,7 +2852,7 @@ def _complete_statement(self, line: str) -> Statement: # Get next line of this command nextline = self._read_command_line(self.continuation_prompt) - if nextline == 'eof': + if nextline == constants.EOF: # they entered either a blank line, or we hit an EOF # for some other reason. Turn the literal 'eof' # into a blank line, which serves as a command @@ -3185,36 +3190,85 @@ def completedefault(self, *_ignored: Sequence[str]) -> Completions: def _suggest_similar_command(self, command: str) -> str | None: return suggest_similar(command, self.get_visible_commands()) + def _read_raw_input( + self, + prompt: Callable[[], ANSI | str] | ANSI | str, + session: PromptSession[str], + completer: Completer, + **prompt_kwargs: Any, # optional keyword args for session.prompt() + ) -> str: + """Read input from either an interactive terminal session or a redirected stream.""" + # _build_session() sets session.input to a DummyInput when not in a TTY. + if not isinstance(session.input, DummyInput): + return session.prompt(prompt, completer=completer, **prompt_kwargs) # type: ignore[arg-type] + + # We're not at a terminal, so we're likely reading from a file or a pipe. + # We wait for a line of data before we print anything. + line = self.stdin.readline() + + # If the stream is empty, we've reached the end of the input. + if not line: + return constants.EOF + + # If echo is on, we want the output to look like a session transcript. + # Print the prompt and the command before the results. + if self.echo: + prompt_obj = prompt() if callable(prompt) else prompt + prompt_str = prompt_obj.value if isinstance(prompt_obj, ANSI) else prompt_obj + end = "" if line.endswith('\n') else "\n" + + self.poutput(f'{prompt_str}{line}', end=end) + + return line.rstrip('\r\n') + + def _resolve_completer( + self, + preserve_quotes: bool = False, + choices: Iterable[Any] | None = None, + choices_provider: ChoicesProviderUnbound | None = None, + completer: CompleterUnbound | None = None, + parser: argparse.ArgumentParser | None = None, + ) -> Completer: + """Determine the appropriate completer based on provided arguments.""" + if not any((parser, choices, choices_provider, completer)): + return DummyCompleter() + + if parser and any((choices, choices_provider, completer)): + err_msg = "None of the following parameters can be used alongside a parser:\nchoices, choices_provider, completer" + raise ValueError(err_msg) + + if parser is None: + parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(add_help=False) + parser.add_argument( + 'arg', + suppress_tab_hint=True, + choices=choices, + choices_provider=choices_provider, + completer=completer, + ) + + settings = utils.CustomCompletionSettings(parser, preserve_quotes=preserve_quotes) + return Cmd2Completer(self, custom_settings=settings) + def read_input( self, prompt: str = '', *, - history: Iterable[str] | None = None, - completion_mode: utils.CompletionMode = utils.CompletionMode.NONE, + history: Sequence[str] | None = None, preserve_quotes: bool = False, choices: Iterable[Any] | None = None, choices_provider: ChoicesProviderUnbound | None = None, completer: CompleterUnbound | None = None, parser: argparse.ArgumentParser | None = None, ) -> str: - """Read input from appropriate stdin value. + """Read a line of input with optional completion and history. - Also supports completion and up-arrow history while input is being entered. - - :param prompt: prompt to display to user - :param history: optional Iterable of strings to use for up-arrow history. If completion_mode is - CompletionMode.COMMANDS and this is None, then cmd2's command list history will - be used. The passed in history will not be edited. It is the caller's responsibility - to add the returned input to history if desired. Defaults to None. - :param completion_mode: tells what type of completion to support. Completion only works when - self.use_rawinput is True and sys.stdin is a terminal. Defaults to - CompletionMode.NONE. - The following optional settings apply when completion_mode is CompletionMode.CUSTOM: :param preserve_quotes: if True, then quoted tokens will keep their quotes when processed by ArgparseCompleter. This is helpful in cases when you're completing flag-like tokens (e.g. -o, --option) and you don't want them to be treated as argparse flags when quoted. Set this to True if you plan on passing the string to argparse with the tokens still quoted. + A maximum of one of these should be provided: :param choices: iterable of accepted values for single argument :param choices_provider: function that provides choices for single argument @@ -3223,135 +3277,49 @@ def read_input( :return: the line read from stdin with all trailing new lines removed :raises Exception: any exceptions raised by prompt() """ - with self._in_prompt_lock: - self._in_prompt = True - try: - if self.use_rawinput and self.stdin.isatty(): - # Determine completer - completer_to_use: Completer - if completion_mode == utils.CompletionMode.NONE: - completer_to_use = DummyCompleter() - - # No up-arrow history when CompletionMode.NONE and history is None - if history is None: - history = [] - elif completion_mode == utils.CompletionMode.COMMANDS: - completer_to_use = self.completer - else: - # Custom completion - if parser is None: - parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(add_help=False) - parser.add_argument( - 'arg', - suppress_tab_hint=True, - choices=choices, - choices_provider=choices_provider, - completer=completer, - ) - custom_settings = utils.CustomCompletionSettings(parser, preserve_quotes=preserve_quotes) - completer_to_use = Cmd2Completer(self, custom_settings=custom_settings) - - # Use dynamic prompt if the prompt matches self.prompt - def get_prompt() -> ANSI | str: - return ANSI(self.prompt) - - prompt_to_use: Callable[[], ANSI | str] | ANSI | str = ANSI(prompt) - if prompt == self.prompt: - prompt_to_use = get_prompt - - with patch_stdout(): - if history is not None: - # If custom history is provided, we use the prompt() shortcut - # which can take a history object. - history_to_use = InMemoryHistory() - for item in history: - history_to_use.append_string(item) - - temp_session1: PromptSession[str] = PromptSession( - complete_style=self.session.complete_style, - complete_while_typing=self.session.complete_while_typing, - history=history_to_use, - input=self.session.input, - lexer=self.lexer, - output=self.session.output, - ) - - return temp_session1.prompt( - prompt_to_use, - bottom_toolbar=self.get_bottom_toolbar if self.bottom_toolbar else None, - completer=completer_to_use, - lexer=self.lexer, - pre_run=self.pre_prompt, - rprompt=self.get_rprompt, - ) - - # history is None - return self.session.prompt( - prompt_to_use, - bottom_toolbar=self.get_bottom_toolbar if self.bottom_toolbar else None, - completer=completer_to_use, - lexer=self.lexer, - pre_run=self.pre_prompt, - rprompt=self.get_rprompt, - ) - - # Otherwise read from self.stdin - elif self.stdin.isatty(): - # on a tty, print the prompt first, then read the line - temp_session2: PromptSession[str] = PromptSession( - input=self.session.input, - output=self.session.output, - lexer=self.lexer, - complete_style=self.session.complete_style, - complete_while_typing=self.session.complete_while_typing, - ) - line = temp_session2.prompt( - prompt, - bottom_toolbar=self.get_bottom_toolbar if self.bottom_toolbar else None, - pre_run=self.pre_prompt, - rprompt=self.get_rprompt, - ) - if len(line) == 0: - raise EOFError - return line.rstrip('\n') - else: - # not a tty, just read the line - temp_session3: PromptSession[str] = PromptSession( - complete_style=self.session.complete_style, - complete_while_typing=self.session.complete_while_typing, - input=self.session.input, - lexer=self.lexer, - output=self.session.output, - ) - line = temp_session3.prompt( - bottom_toolbar=self.get_bottom_toolbar if self.bottom_toolbar else None, - pre_run=self.pre_prompt, - rprompt=self.get_rprompt, - ) - if len(line) == 0: - raise EOFError - line = line.rstrip('\n') - - if self.echo: - self.poutput(f'{prompt}{line}') + completer_to_use = self._resolve_completer( + preserve_quotes=preserve_quotes, + choices=choices, + choices_provider=choices_provider, + completer=completer, + parser=parser, + ) - return line + temp_session: PromptSession[str] = PromptSession( + complete_style=self.session.complete_style, + complete_while_typing=self.session.complete_while_typing, + history=InMemoryHistory(history) if history is not None else InMemoryHistory(), + input=self.session.input, + output=self.session.output, + ) - finally: - with self._in_prompt_lock: - self._in_prompt = False + return self._read_raw_input(prompt, temp_session, completer_to_use) def _read_command_line(self, prompt: str) -> str: - """Read command line from appropriate stdin. + """Read the next command line from the input stream. :param prompt: prompt to display to user - :return: command line text of 'eof' if an EOFError was caught + :return: command line text or 'eof' if an EOFError was caught :raises Exception: whatever exceptions are raised by input() except for EOFError """ try: - return self.read_input(prompt, completion_mode=utils.CompletionMode.COMMANDS) + # Use dynamic prompt if the prompt matches self.prompt + def get_prompt() -> ANSI | str: + return ANSI(self.prompt) + + prompt_to_use: Callable[[], ANSI | str] | ANSI | str = ANSI(prompt) + if prompt == self.prompt: + prompt_to_use = get_prompt + + with patch_stdout(): + return self._read_raw_input( + prompt=prompt_to_use, + session=self.session, + completer=self.completer, + pre_run=self.pre_prompt, + ) except EOFError: - return 'eof' + return constants.EOF def _cmdloop(self) -> None: """Repeatedly issue a prompt, accept input, parse it, and dispatch to apporpriate commands. @@ -5232,11 +5200,8 @@ def async_alert(self, alert_msg: str, new_prompt: str | None = None) -> None: :raises RuntimeError: if called from the main thread. :raises RuntimeError: if main thread is not currently at the prompt. """ - # Check if prompt is currently displayed and waiting for user input - with self._in_prompt_lock: - if not self._in_prompt or not self.session.app.is_running: - raise RuntimeError("Main thread is not at the prompt") + # Check if prompt is currently displayed and waiting for user input def _alert() -> None: if new_prompt is not None: self.prompt = new_prompt @@ -5248,11 +5213,11 @@ def _alert() -> None: if hasattr(self, 'session'): # Invalidate to force prompt update - self.session.app.invalidate() + get_app().invalidate() # Schedule the alert to run on the main thread's event loop try: - self.session.app.loop.call_soon_threadsafe(_alert) # type: ignore[union-attr] + get_app().loop.call_soon_threadsafe(_alert) # type: ignore[union-attr] except AttributeError: # Fallback if loop is not accessible (e.g. prompt not running or session not initialized) # This shouldn't happen if _in_prompt is True, unless prompt exited concurrently. diff --git a/cmd2/constants.py b/cmd2/constants.py index 75c60662c..41aba11f9 100644 --- a/cmd2/constants.py +++ b/cmd2/constants.py @@ -4,6 +4,7 @@ # nothing here should be considered part of the public API of this module INFINITY = float('inf') +EOF = 'eof' # Used for command parsing, output redirection, completion, and word breaks. Do not change. QUOTES = ['"', "'"] diff --git a/cmd2/pt_utils.py b/cmd2/pt_utils.py index a79afa14d..2adde87db 100644 --- a/cmd2/pt_utils.py +++ b/cmd2/pt_utils.py @@ -11,6 +11,7 @@ ) from prompt_toolkit import print_formatted_text +from prompt_toolkit.application import get_app from prompt_toolkit.completion import ( Completer, Completion, @@ -95,7 +96,7 @@ def get_completions(self, document: Document, _complete_event: object) -> Iterab # and returning early, we trigger a new completion cycle where the quote # is already present, allowing for proper common prefix calculation. if completions._add_opening_quote and search_text_length > 0: - buffer = self.cmd_app.session.app.current_buffer + buffer = get_app().current_buffer buffer.cursor_left(search_text_length) buffer.insert_text(completions._quote_char) diff --git a/cmd2/utils.py b/cmd2/utils.py index d698b4eb7..8d314d741 100644 --- a/cmd2/utils.py +++ b/cmd2/utils.py @@ -17,7 +17,6 @@ MutableSequence, ) from difflib import SequenceMatcher -from enum import Enum from typing import ( TYPE_CHECKING, Any, @@ -733,24 +732,6 @@ def get_defining_class(meth: Callable[..., Any]) -> type[Any] | None: return cast(type, getattr(meth, '__objclass__', None)) # handle special descriptor objects -class CompletionMode(Enum): - """Enum for what type of completion to perform in cmd2.Cmd.read_input().""" - - # Completion will be disabled during read_input() call - # Use of custom up-arrow history supported - NONE = 1 - - # read_input() will complete cmd2 commands and their arguments - # cmd2's command line history will be used for up arrow if history is not provided. - # Otherwise use of custom up-arrow history supported. - COMMANDS = 2 - - # read_input() will complete based on one of its following parameters: - # choices, choices_provider, completer, parser - # Use of custom up-arrow history supported - CUSTOM = 3 - - class CustomCompletionSettings: """Used by cmd2.Cmd.complete() to complete strings other than command arguments.""" diff --git a/examples/read_input.py b/examples/read_input.py index 408617705..24286110f 100755 --- a/examples/read_input.py +++ b/examples/read_input.py @@ -32,13 +32,6 @@ def do_basic_with_history(self, _) -> None: else: self.custom_history.append(input_str) - @cmd2.with_category(EXAMPLE_COMMANDS) - def do_commands(self, _) -> None: - """Call read_input the same way cmd2 prompt does to read commands.""" - self.poutput("Tab completing and up-arrow history configured for commands") - with contextlib.suppress(EOFError): - self.read_input("> ", completion_mode=cmd2.CompletionMode.COMMANDS) - @cmd2.with_category(EXAMPLE_COMMANDS) def do_custom_choices(self, _) -> None: """Call read_input to use custom history and choices.""" @@ -47,7 +40,6 @@ def do_custom_choices(self, _) -> None: input_str = self.read_input( "> ", history=self.custom_history, - completion_mode=cmd2.CompletionMode.CUSTOM, choices=['choice_1', 'choice_2', 'choice_3'], ) except EOFError: @@ -55,9 +47,9 @@ def do_custom_choices(self, _) -> None: else: self.custom_history.append(input_str) - def choices_provider(self) -> list[str]: + def choices_provider(self) -> cmd2.Choices: """Example choices provider function.""" - return ["from_provider_1", "from_provider_2", "from_provider_3"] + return cmd2.Choices.from_values(["from_provider_1", "from_provider_2", "from_provider_3"]) @cmd2.with_category(EXAMPLE_COMMANDS) def do_custom_choices_provider(self, _) -> None: @@ -67,7 +59,6 @@ def do_custom_choices_provider(self, _) -> None: input_str = self.read_input( "> ", history=self.custom_history, - completion_mode=cmd2.CompletionMode.CUSTOM, choices_provider=ReadInputApp.choices_provider, ) except EOFError: @@ -80,9 +71,7 @@ def do_custom_completer(self, _) -> None: """Call read_input to use custom history and completer function.""" self.poutput("Tab completing paths and using custom history") try: - input_str = self.read_input( - "> ", history=self.custom_history, completion_mode=cmd2.CompletionMode.CUSTOM, completer=cmd2.Cmd.path_complete - ) + input_str = self.read_input("> ", history=self.custom_history, completer=cmd2.Cmd.path_complete) self.custom_history.append(input_str) except EOFError: pass @@ -99,9 +88,7 @@ def do_custom_parser(self, _) -> None: self.poutput(parser.format_usage()) try: - input_str = self.read_input( - "> ", history=self.custom_history, completion_mode=cmd2.CompletionMode.CUSTOM, parser=parser - ) + input_str = self.read_input("> ", history=self.custom_history, parser=parser) except EOFError: pass else: diff --git a/tests/test_cmd2.py b/tests/test_cmd2.py index f9ed0a5fa..428a48f21 100644 --- a/tests/test_cmd2.py +++ b/tests/test_cmd2.py @@ -13,6 +13,10 @@ import pytest from prompt_toolkit.auto_suggest import AutoSuggestFromHistory +from prompt_toolkit.completion import DummyCompleter +from prompt_toolkit.input import DummyInput, create_pipe_input +from prompt_toolkit.output import DummyOutput +from prompt_toolkit.shortcuts import PromptSession from rich.text import Text import cmd2 @@ -389,7 +393,6 @@ def test_run_script_with_binary_file(base_app, request) -> None: def test_run_script_with_python_file(base_app, request, monkeypatch) -> None: - # Mock out the read_input call so we don't actually wait for a user's response on stdin read_input_mock = mock.MagicMock(name='read_input', return_value='2') monkeypatch.setattr("cmd2.Cmd.read_input", read_input_mock) @@ -1015,8 +1018,6 @@ def test_base_cmdloop_with_startup_commands() -> None: with mock.patch.object(sys, 'argv', testargs): app = create_outsim_app() - app.use_rawinput = True - # Run the command loop with custom intro app.cmdloop(intro=intro) @@ -1030,12 +1031,10 @@ def test_base_cmdloop_without_startup_commands(monkeypatch) -> None: with mock.patch.object(sys, 'argv', testargs): app = create_outsim_app() - app.use_rawinput = True app.intro = 'Hello World, this is an intro ...' - # Mock out the read_input call so we don't actually wait for a user's response on stdin - read_input_mock = mock.MagicMock(name='read_input', return_value='quit') - monkeypatch.setattr("cmd2.Cmd.read_input", read_input_mock) + read_command_mock = mock.MagicMock(name='_read_command_line', return_value='quit') + monkeypatch.setattr("cmd2.Cmd._read_command_line", read_command_mock) expected = app.intro + '\n' @@ -1045,27 +1044,6 @@ def test_base_cmdloop_without_startup_commands(monkeypatch) -> None: assert out == expected -def test_cmdloop_without_rawinput(monkeypatch) -> None: - # Need to patch sys.argv so cmd2 doesn't think it was called with arguments equal to the py.test args - testargs = ["prog"] - with mock.patch.object(sys, 'argv', testargs): - app = create_outsim_app() - - app.use_rawinput = False - app.echo = False - app.intro = 'Hello World, this is an intro ...' - - # Mock out the read_input call so we don't actually wait for a user's response on stdin - read_input_mock = mock.MagicMock(name='read_input', return_value='quit') - monkeypatch.setattr("cmd2.Cmd.read_input", read_input_mock) - - expected = app.intro + '\n' - - app.cmdloop() - out = app.stdout.getvalue() - assert out == expected - - def test_cmdfinalizations_runs(base_app, monkeypatch) -> None: """Make sure _run_cmdfinalization_hooks is run after each command.""" with ( @@ -1202,10 +1180,9 @@ def say_app(): def test_ctrl_c_at_prompt(say_app, monkeypatch) -> None: - # Mock out the read_input call so we don't actually wait for a user's response on stdin - read_input_mock = mock.MagicMock(name='read_input') - read_input_mock.side_effect = ['say hello', KeyboardInterrupt(), 'say goodbye', 'eof'] - monkeypatch.setattr("cmd2.Cmd.read_input", read_input_mock) + read_command_mock = mock.MagicMock(name='_read_command_line') + read_command_mock.side_effect = ['say hello', KeyboardInterrupt(), 'say goodbye', constants.EOF] + monkeypatch.setattr("cmd2.Cmd._read_command_line", read_command_mock) say_app.cmdloop() @@ -1502,7 +1479,6 @@ def select_app(): def test_select_options(select_app, monkeypatch) -> None: - # Mock out the read_input call so we don't actually wait for a user's response on stdin read_input_mock = mock.MagicMock(name='read_input', return_value='2') monkeypatch.setattr("cmd2.Cmd.read_input", read_input_mock) @@ -1524,7 +1500,6 @@ def test_select_options(select_app, monkeypatch) -> None: def test_select_invalid_option_too_big(select_app, monkeypatch) -> None: - # Mock out the input call so we don't actually wait for a user's response on stdin read_input_mock = mock.MagicMock(name='read_input') # If side_effect is an iterable then each call to the mock will return the next value from the iterable. @@ -1553,7 +1528,6 @@ def test_select_invalid_option_too_big(select_app, monkeypatch) -> None: def test_select_invalid_option_too_small(select_app, monkeypatch) -> None: - # Mock out the input call so we don't actually wait for a user's response on stdin read_input_mock = mock.MagicMock(name='read_input') # If side_effect is an iterable then each call to the mock will return the next value from the iterable. @@ -1582,7 +1556,6 @@ def test_select_invalid_option_too_small(select_app, monkeypatch) -> None: def test_select_list_of_strings(select_app, monkeypatch) -> None: - # Mock out the input call so we don't actually wait for a user's response on stdin read_input_mock = mock.MagicMock(name='read_input', return_value='2') monkeypatch.setattr("cmd2.Cmd.read_input", read_input_mock) @@ -1603,7 +1576,6 @@ def test_select_list_of_strings(select_app, monkeypatch) -> None: def test_select_list_of_tuples(select_app, monkeypatch) -> None: - # Mock out the input call so we don't actually wait for a user's response on stdin read_input_mock = mock.MagicMock(name='read_input', return_value='2') monkeypatch.setattr("cmd2.Cmd.read_input", read_input_mock) @@ -1624,7 +1596,6 @@ def test_select_list_of_tuples(select_app, monkeypatch) -> None: def test_select_uneven_list_of_tuples(select_app, monkeypatch) -> None: - # Mock out the input call so we don't actually wait for a user's response on stdin read_input_mock = mock.MagicMock(name='read_input', return_value='2') monkeypatch.setattr("cmd2.Cmd.read_input", read_input_mock) @@ -1653,7 +1624,6 @@ def test_select_uneven_list_of_tuples(select_app, monkeypatch) -> None: ], ) def test_select_return_type(select_app, monkeypatch, selection, type_str) -> None: - # Mock out the input call so we don't actually wait for a user's response on stdin read_input_mock = mock.MagicMock(name='read_input', return_value=selection) monkeypatch.setattr("cmd2.Cmd.read_input", read_input_mock) @@ -1756,10 +1726,8 @@ def test_multiline_complete_empty_statement_raises_exception(multiline_app) -> N def test_multiline_complete_statement_without_terminator(multiline_app, monkeypatch) -> None: - # Mock out the input call so we don't actually wait for a user's response - # on stdin when it looks for more input - read_input_mock = mock.MagicMock(name='read_input', return_value='\n') - monkeypatch.setattr("cmd2.Cmd.read_input", read_input_mock) + read_command_mock = mock.MagicMock(name='_read_command_line', return_value='\n') + monkeypatch.setattr("cmd2.Cmd._read_command_line", read_command_mock) command = 'orate' args = 'hello world' @@ -1771,10 +1739,8 @@ def test_multiline_complete_statement_without_terminator(multiline_app, monkeypa def test_multiline_complete_statement_with_unclosed_quotes(multiline_app, monkeypatch) -> None: - # Mock out the input call so we don't actually wait for a user's response - # on stdin when it looks for more input - read_input_mock = mock.MagicMock(name='read_input', side_effect=['quotes', '" now closed;']) - monkeypatch.setattr("cmd2.Cmd.read_input", read_input_mock) + read_command_mock = mock.MagicMock(name='_read_command_line', side_effect=['quotes', '" now closed;']) + monkeypatch.setattr("cmd2.Cmd._read_command_line", read_command_mock) line = 'orate hi "partially open' statement = multiline_app._complete_statement(line) @@ -1786,11 +1752,8 @@ def test_multiline_complete_statement_with_unclosed_quotes(multiline_app, monkey def test_multiline_input_line_to_statement(multiline_app, monkeypatch) -> None: # Verify _input_line_to_statement saves the fully entered input line for multiline commands - - # Mock out the input call so we don't actually wait for a user's response - # on stdin when it looks for more input - read_input_mock = mock.MagicMock(name='read_input', side_effect=['person', '\n']) - monkeypatch.setattr("cmd2.Cmd.read_input", read_input_mock) + read_command_mock = mock.MagicMock(name='_read_command_line', side_effect=['person', '\n']) + monkeypatch.setattr("cmd2.Cmd._read_command_line", read_command_mock) line = 'orate hi' statement = multiline_app._input_line_to_statement(line) @@ -1802,8 +1765,8 @@ def test_multiline_input_line_to_statement(multiline_app, monkeypatch) -> None: def test_multiline_history_added(multiline_app, monkeypatch) -> None: # Test that multiline commands are added to history as a single item - read_input_mock = mock.MagicMock(name='read_input', side_effect=['person', '\n']) - monkeypatch.setattr("cmd2.Cmd.read_input", read_input_mock) + read_command_mock = mock.MagicMock(name='_read_command_line', side_effect=['person', '\n']) + monkeypatch.setattr("cmd2.Cmd._read_command_line", read_command_mock) multiline_app.history.clear() @@ -1816,8 +1779,8 @@ def test_multiline_history_added(multiline_app, monkeypatch) -> None: def test_multiline_history_with_quotes(multiline_app, monkeypatch) -> None: # Test combined multiline command with quotes is added to history correctly - read_input_mock = mock.MagicMock(name='read_input', side_effect=[' and spaces ', ' "', ' in', 'quotes.', ';']) - monkeypatch.setattr("cmd2.Cmd.read_input", read_input_mock) + read_command_mock = mock.MagicMock(name='_read_command_line', side_effect=[' and spaces ', ' "', ' in', 'quotes.', ';']) + monkeypatch.setattr("cmd2.Cmd._read_command_line", read_command_mock) multiline_app.history.clear() @@ -1840,9 +1803,8 @@ def test_multiline_complete_statement_eof(multiline_app, monkeypatch): poutput_mock = mock.MagicMock(name='poutput') monkeypatch.setattr(multiline_app, 'poutput', poutput_mock) - # Mock out the read_input call so we return EOFError - read_input_mock = mock.MagicMock(name='read_input', side_effect=EOFError) - monkeypatch.setattr("cmd2.Cmd.read_input", read_input_mock) + read_raw_mock = mock.MagicMock(name='_read_raw_input', side_effect=EOFError) + monkeypatch.setattr("cmd2.Cmd._read_raw_input", read_raw_mock) command = 'orate' args = 'hello world' @@ -1938,130 +1900,125 @@ def test_echo(capsys) -> None: assert out.startswith(f'{app.prompt}{commands[0]}\nUsage: history') -def test_read_input_rawinput_true(capsys, monkeypatch) -> None: - prompt_str = 'the_prompt' - input_str = 'some input' +def test_read_raw_input_tty(base_app: cmd2.Cmd) -> None: + with create_pipe_input() as pipe_input: + base_app.session = PromptSession( + input=pipe_input, + output=DummyOutput(), + history=base_app.session.history, + completer=base_app.session.completer, + ) + pipe_input.send_text("foo\n") + + result = base_app._read_raw_input("prompt> ", base_app.session, DummyCompleter()) + assert result == "foo" - app = cmd2.Cmd() - app.use_rawinput = True - # Mock PromptSession.prompt (used when isatty=False) - # Also mock patch_stdout to prevent it from attempting to access the Windows console buffer in a Windows test environment - with ( - mock.patch('cmd2.cmd2.PromptSession.prompt', return_value=input_str), - mock.patch('cmd2.cmd2.patch_stdout'), - ): - # isatty is True - with mock.patch('sys.stdin.isatty', mock.MagicMock(name='isatty', return_value=True)): - line = app.read_input(prompt_str) - assert line == input_str - - # Run custom history code - custom_history = ['cmd1', 'cmd2'] - line = app.read_input(prompt_str, history=custom_history, completion_mode=cmd2.CompletionMode.NONE) - assert line == input_str - - # Run all completion modes - line = app.read_input(prompt_str, completion_mode=cmd2.CompletionMode.NONE) - assert line == input_str - - line = app.read_input(prompt_str, completion_mode=cmd2.CompletionMode.COMMANDS) - assert line == input_str - - # custom choices - custom_choices = ['choice1', 'choice2'] - line = app.read_input(prompt_str, completion_mode=cmd2.CompletionMode.CUSTOM, choices=custom_choices) - assert line == input_str - - # custom choices_provider - line = app.read_input( - prompt_str, completion_mode=cmd2.CompletionMode.CUSTOM, choices_provider=cmd2.Cmd.get_all_commands - ) - assert line == input_str - - # custom completer - line = app.read_input(prompt_str, completion_mode=cmd2.CompletionMode.CUSTOM, completer=cmd2.Cmd.path_complete) - assert line == input_str - - # custom parser - line = app.read_input(prompt_str, completion_mode=cmd2.CompletionMode.CUSTOM, parser=cmd2.Cmd2ArgumentParser()) - assert line == input_str - - # isatty is False - with mock.patch('sys.stdin.isatty', mock.MagicMock(name='isatty', return_value=False)): - # echo True - app.echo = True - line = app.read_input(prompt_str) - out, _err = capsys.readouterr() - assert line == input_str - assert out == f"{prompt_str}{input_str}\n" - - # echo False - app.echo = False - line = app.read_input(prompt_str) - out, _err = capsys.readouterr() - assert line == input_str - assert not out - - -def test_read_input_rawinput_false(capsys, monkeypatch) -> None: - prompt_str = 'the_prompt' - input_str = 'some input' - - def make_app(isatty: bool, empty_input: bool = False): - """Make a cmd2 app with a custom stdin""" - app_input_str = '' if empty_input else input_str - - fakein = io.StringIO(f'{app_input_str}') - fakein.isatty = mock.MagicMock(name='isatty', return_value=isatty) - - new_app = cmd2.Cmd(stdin=fakein) - new_app.use_rawinput = False - return new_app - - def mock_pt_prompt(message='', **kwargs): - # Emulate prompt printing for isatty=True case - if message: - print(message, end='') - return input_str - - # isatty True - app = make_app(isatty=True) - with mock.patch('cmd2.cmd2.PromptSession.prompt', side_effect=mock_pt_prompt): - line = app.read_input(prompt_str) - out, _err = capsys.readouterr() - assert line == input_str - assert out == prompt_str +def test_read_raw_input_pipe() -> None: + app = cmd2.Cmd(stdin=io.StringIO("input from pipe\n")) + result = app._read_raw_input("prompt> ", app.session, DummyCompleter()) + assert result == "input from pipe" - # isatty True, empty input - app = make_app(isatty=True, empty_input=True) - with mock.patch('cmd2.cmd2.PromptSession.prompt', return_value=''), pytest.raises(EOFError): - app.read_input(prompt_str) - out, _err = capsys.readouterr() - # isatty is False, echo is True - app = make_app(isatty=False) +def test_read_raw_input_pipe_echo(capsys) -> None: + app = cmd2.Cmd(stdin=io.StringIO("input from pipe\n")) app.echo = True - with mock.patch('cmd2.cmd2.PromptSession.prompt', return_value=input_str): - line = app.read_input(prompt_str) - out, _err = capsys.readouterr() - assert line == input_str - assert out == f"{prompt_str}{input_str}\n" - - # isatty is False, echo is False - app = make_app(isatty=False) - app.echo = False - with mock.patch('cmd2.cmd2.PromptSession.prompt', return_value=input_str): - line = app.read_input(prompt_str) - out, _err = capsys.readouterr() - assert line == input_str - assert not out + result = app._read_raw_input("prompt> ", app.session, DummyCompleter()) + assert result == "input from pipe" - # isatty is False, empty input - app = make_app(isatty=False, empty_input=True) - with mock.patch('cmd2.cmd2.PromptSession.prompt', return_value=''), pytest.raises(EOFError): - app.read_input(prompt_str) - out, _err = capsys.readouterr() + captured = capsys.readouterr() + assert "prompt> input from pipe" in captured.out + + +def test_read_raw_input_eof() -> None: + app = cmd2.Cmd(stdin=io.StringIO("")) + result = app._read_raw_input("prompt> ", app.session, DummyCompleter()) + assert result == constants.EOF + + +def test_resolve_completer_none(base_app: cmd2.Cmd) -> None: + completer = base_app._resolve_completer() + assert isinstance(completer, DummyCompleter) + + +def test_resolve_completer_with_choices(base_app: cmd2.Cmd) -> None: + from cmd2.pt_utils import Cmd2Completer + + choices = ['apple', 'banana', 'cherry'] + completer = base_app._resolve_completer(choices=choices) + assert isinstance(completer, Cmd2Completer) + + # Verify contents + settings = completer.custom_settings + assert settings is not None + + action = settings.parser._actions[-1] + assert action.choices == choices + assert not settings.preserve_quotes + + +def test_resolve_completer_with_choices_provider(base_app: cmd2.Cmd) -> None: + from cmd2.pt_utils import Cmd2Completer + + mock_provider = mock.MagicMock() + completer = base_app._resolve_completer(choices_provider=mock_provider) + assert isinstance(completer, Cmd2Completer) + + # Verify contents + settings = completer.custom_settings + assert settings is not None + + action = settings.parser._actions[-1] + assert action.get_choices_callable().choices_provider == mock_provider + assert not settings.preserve_quotes + + +def test_resolve_completer_with_completer(base_app: cmd2.Cmd) -> None: + """Verify that providing choices creates a Cmd2Completer with a generated parser.""" + from cmd2.pt_utils import Cmd2Completer + + mock_completer = mock.MagicMock() + completer = base_app._resolve_completer(completer=mock_completer) + assert isinstance(completer, Cmd2Completer) + + # Verify contents + settings = completer.custom_settings + assert settings is not None + + action = settings.parser._actions[-1] + assert action.get_choices_callable().completer == mock_completer + assert not settings.preserve_quotes + + +def test_resolve_completer_with_parser(base_app: cmd2.Cmd) -> None: + from cmd2.pt_utils import Cmd2Completer + + mock_parser = mock.MagicMock() + completer = base_app._resolve_completer(parser=mock_parser) + assert isinstance(completer, Cmd2Completer) + + # Verify contents + settings = completer.custom_settings + assert settings is not None + + assert settings.parser == mock_parser + assert not settings.preserve_quotes + + +def test_resolve_completer_with_bad_input(base_app: cmd2.Cmd) -> None: + mock_provider = mock.MagicMock() + mock_completer = mock.MagicMock() + mock_parser = mock.MagicMock() + + with pytest.raises(ValueError) as excinfo: # noqa: PT011 + base_app._resolve_completer( + choices=[], + choices_provider=mock_provider, + completer=mock_completer, + parser=mock_parser, + ) + + assert "None of the following parameters can be used alongside a parser" in str(excinfo.value) def test_custom_stdout() -> None: @@ -2080,11 +2037,11 @@ def test_custom_stdout() -> None: def test_read_command_line_eof(base_app, monkeypatch) -> None: - read_input_mock = mock.MagicMock(name='read_input', side_effect=EOFError) - monkeypatch.setattr("cmd2.Cmd.read_input", read_input_mock) + read_raw_mock = mock.MagicMock(name='_read_raw_input', side_effect=EOFError) + monkeypatch.setattr("cmd2.Cmd._read_raw_input", read_raw_mock) line = base_app._read_command_line("Prompt> ") - assert line == 'eof' + assert line == constants.EOF def test_poutput_string(outsim_app) -> None: @@ -2354,28 +2311,6 @@ def test_get_settable_choices(base_app: cmd2.Cmd) -> None: assert ru.rich_text_to_string(cur_choice.table_row[1]) == cur_settable.description -def test_completion_supported(base_app) -> None: - # use_rawinput is True and completekey is non-empty -> True - base_app.use_rawinput = True - base_app.completekey = 'tab' - assert base_app._completion_supported() is True - - # use_rawinput is False and completekey is non-empty -> False - base_app.use_rawinput = False - base_app.completekey = 'tab' - assert base_app._completion_supported() is False - - # use_rawinput is True and completekey is empty -> False - base_app.use_rawinput = True - base_app.completekey = '' - assert base_app._completion_supported() is False - - # use_rawinput is False and completekey is empty -> False - base_app.use_rawinput = False - base_app.completekey = '' - assert base_app._completion_supported() is False - - def test_alias_no_subcommand(base_app) -> None: _out, err = run_cmd(base_app, 'alias') assert "Usage: alias [-h]" in err[0] @@ -3044,7 +2979,7 @@ def test_get_all_commands(base_app) -> None: '_relative_run_script', 'alias', 'edit', - 'eof', + constants.EOF, 'help', 'history', 'ipy', @@ -3124,11 +3059,9 @@ def exit_code_repl(): def test_exit_code_default(exit_code_repl, monkeypatch) -> None: app = exit_code_repl - app.use_rawinput = True - # Mock out the input call so we don't actually wait for a user's response on stdin - read_input_mock = mock.MagicMock(name='read_input', return_value='exit') - monkeypatch.setattr("cmd2.Cmd.read_input", read_input_mock) + read_command_mock = mock.MagicMock(name='_read_command_line', return_value='exit') + monkeypatch.setattr("cmd2.Cmd._read_command_line", read_command_mock) expected = 'exiting with code: 0\n' @@ -3140,11 +3073,9 @@ def test_exit_code_default(exit_code_repl, monkeypatch) -> None: def test_exit_code_nonzero(exit_code_repl, monkeypatch) -> None: app = exit_code_repl - app.use_rawinput = True - # Mock out the input call so we don't actually wait for a user's response on stdin - read_input_mock = mock.MagicMock(name='read_input', return_value='exit 23') - monkeypatch.setattr("cmd2.Cmd.read_input", read_input_mock) + read_input_mock = mock.MagicMock(name='_read_command_line', return_value='exit 23') + monkeypatch.setattr("cmd2.Cmd._read_command_line", read_input_mock) expected = 'exiting with code: 23\n' @@ -3567,8 +3498,7 @@ def test_custom_completekey(): assert app.completekey == '?' -def test_prompt_session_init_exception(monkeypatch): - from prompt_toolkit.shortcuts import PromptSession +def test_build_session_exception(monkeypatch): # Mock PromptSession to raise ValueError on first call, then succeed valid_session_mock = mock.MagicMock(spec=PromptSession) @@ -3576,10 +3506,8 @@ def test_prompt_session_init_exception(monkeypatch): monkeypatch.setattr("cmd2.cmd2.PromptSession", mock_session) cmd2.Cmd() - # Check that fallback to DummyInput/Output happened - from prompt_toolkit.input import DummyInput - from prompt_toolkit.output import DummyOutput + # Check that fallback to DummyInput/Output happened assert mock_session.call_count == 2 # Check args of second call call_args = mock_session.call_args_list[1] @@ -3588,16 +3516,21 @@ def test_prompt_session_init_exception(monkeypatch): assert isinstance(kwargs['output'], DummyOutput) +@pytest.mark.skipif( + not sys.platform.startswith('win'), + reason="This tests how app.pager is set when running on Windows.", +) def test_pager_on_windows(monkeypatch): - monkeypatch.setattr("sys.platform", "win32") app = cmd2.Cmd() assert app.pager == 'more' assert app.pager_chop == 'more' +@pytest.mark.skipif( + not sys.platform.startswith('win'), + reason="This tests how Cmd._complete_users() behaves on Windows.", +) def test_path_complete_users_windows(monkeypatch, base_app): - monkeypatch.setattr("sys.platform", "win32") - # Mock os.path.expanduser and isdir monkeypatch.setattr("os.path.expanduser", lambda p: '/home/user' if p == '~user' else p) monkeypatch.setattr("os.path.isdir", lambda p: p == '/home/user') @@ -3609,68 +3542,6 @@ def test_path_complete_users_windows(monkeypatch, base_app): assert expected in matches -def test_async_alert_success(base_app): - import threading - - success = [] - - # Mock loop and app - mock_loop = mock.MagicMock() - mock_app = mock.MagicMock() - mock_app.loop = mock_loop - # Mocking base_app.session which is a PromptSession. - # PromptSession does not expose .app directly in types but it has .app at runtime. - # However in tests base_app.session might be PromptSession(input=DummyInput(), ...) - base_app.session.app = mock_app - - # Pretend we are at the prompt - base_app._in_prompt = True - - def run_alert(): - base_app.async_alert("Alert Message", new_prompt="(New) ") - success.append(True) - - t = threading.Thread(target=run_alert) - t.start() - t.join() - - assert success - - # Verify callback scheduled - mock_loop.call_soon_threadsafe.assert_called_once() - - # Verify functionality of the callback - callback = mock_loop.call_soon_threadsafe.call_args[0][0] - - with mock.patch('builtins.print') as mock_print: - callback() - mock_print.assert_called_with("Alert Message") - assert base_app.prompt == "(New) " - mock_app.invalidate.assert_called_once() - - -def test_async_alert_not_at_prompt(base_app): - import threading - - # Ensure we are NOT at prompt - base_app._in_prompt = False - - exceptions = [] - - def run_alert(): - try: - base_app.async_alert("fail") - except RuntimeError as e: - exceptions.append(e) - - t = threading.Thread(target=run_alert) - t.start() - t.join() - - assert len(exceptions) == 1 - assert "Main thread is not at the prompt" in str(exceptions[0]) - - def test_get_bottom_toolbar(base_app, monkeypatch): # Test default (disabled) assert base_app.get_bottom_toolbar() is None @@ -3701,9 +3572,9 @@ def test_get_rprompt(base_app): def test_multiline_complete_statement_keyboard_interrupt(multiline_app, monkeypatch): - # Mock read_input to raise KeyboardInterrupt - read_input_mock = mock.MagicMock(name='read_input', side_effect=KeyboardInterrupt) - monkeypatch.setattr("cmd2.Cmd.read_input", read_input_mock) + # Mock _read_command_line to raise KeyboardInterrupt + read_command_mock = mock.MagicMock(name='_read_command_line', side_effect=KeyboardInterrupt) + monkeypatch.setattr("cmd2.Cmd._read_command_line", read_command_mock) # Mock poutput to verify ^C is printed poutput_mock = mock.MagicMock(name='poutput') @@ -3715,9 +3586,7 @@ def test_multiline_complete_statement_keyboard_interrupt(multiline_app, monkeypa poutput_mock.assert_called_with('^C') -def test_prompt_session_init_no_console_error(monkeypatch): - from prompt_toolkit.shortcuts import PromptSession - +def test_build_session_no_console_error(monkeypatch): from cmd2.cmd2 import NoConsoleScreenBufferError # Mock PromptSession to raise NoConsoleScreenBufferError on first call, then succeed @@ -3728,10 +3597,8 @@ def test_prompt_session_init_no_console_error(monkeypatch): cmd2.Cmd() # Check that fallback to DummyInput/Output happened - from prompt_toolkit.input import DummyInput - from prompt_toolkit.output import DummyOutput - assert mock_session.call_count == 2 + # Check args of second call call_args = mock_session.call_args_list[1] kwargs = call_args[1] @@ -3747,65 +3614,52 @@ def test_no_console_screen_buffer_error_dummy(): assert isinstance(err, Exception) -def test_read_input_dynamic_prompt(base_app, monkeypatch): - """Test that read_input uses a dynamic prompt when provided prompt matches app.prompt""" - input_str = 'some input' - base_app.use_rawinput = True +def test_read_command_line_dynamic_prompt(base_app: cmd2.Cmd) -> None: + """Test that _read_command_line uses a dynamic prompt when provided prompt matches app.prompt""" - # Mock PromptSession.prompt - # Also mock patch_stdout to prevent it from attempting to access the Windows console buffer in a Windows test environment - with ( - mock.patch('cmd2.cmd2.PromptSession.prompt', return_value=input_str) as mock_prompt, - mock.patch('cmd2.cmd2.patch_stdout'), - mock.patch('sys.stdin.isatty', mock.MagicMock(name='isatty', return_value=True)), - ): - # Call with exact app prompt - line = base_app.read_input(base_app.prompt) - assert line == input_str + # Set input to something other than DummyInput so _read_raw_input() will go down the TTY route. + mock_session = mock.MagicMock() + mock_session.input = mock.MagicMock() + base_app.session = mock_session + base_app._read_command_line(base_app.prompt) - # Check that mock_prompt was called with a callable for the prompt - # args[0] should be the prompt_to_use - args, _ = mock_prompt.call_args - prompt_arg = args[0] - assert callable(prompt_arg) + # Check that mock_prompt was called with a callable for the prompt + # args[0] should be the prompt_to_use + args, _ = mock_session.prompt.call_args + prompt_arg = args[0] + assert callable(prompt_arg) - # Verify the callable returns the expected ANSI formatted prompt - from prompt_toolkit.formatted_text import ANSI + # Verify the callable returns the expected ANSI formatted prompt + from prompt_toolkit.formatted_text import ANSI - result = prompt_arg() - assert isinstance(result, ANSI) - assert result.value == ANSI(base_app.prompt).value + result = prompt_arg() + assert isinstance(result, ANSI) + assert result.value == ANSI(base_app.prompt).value -def test_read_input_dynamic_prompt_with_history(base_app, monkeypatch): - """Test that read_input uses a dynamic prompt when provided prompt matches app.prompt and history is provided""" - input_str = 'some input' - base_app.use_rawinput = True - custom_history = ['cmd1', 'cmd2'] +def test_read_input_history_isolation(base_app: cmd2.Cmd) -> None: + local_history = ["secret_command", "another_command"] - # Mock PromptSession.prompt - # Also mock patch_stdout to prevent it from attempting to access the Windows console buffer in a Windows test environment - with ( - mock.patch('cmd2.cmd2.PromptSession.prompt', return_value=input_str) as mock_prompt, - mock.patch('cmd2.cmd2.patch_stdout'), - mock.patch('sys.stdin.isatty', mock.MagicMock(name='isatty', return_value=True)), - ): - # Call with exact app prompt and history - line = base_app.read_input(base_app.prompt, history=custom_history) - assert line == input_str + # Mock _read_raw_input to prevent actual blocking + # We want to inspect the session object passed to it + with mock.patch.object(base_app, '_read_raw_input') as mock_raw: + mock_raw.return_value = "user_input" + + base_app.read_input("prompt> ", history=local_history) - # Check that mock_prompt was called with a callable for the prompt - # args[0] should be the prompt_to_use - args, _ = mock_prompt.call_args - prompt_arg = args[0] - assert callable(prompt_arg) + # Inspect the session used in the call + args, _ = mock_raw.call_args + passed_session = args[1] - # Verify the callable returns the expected ANSI formatted prompt - from prompt_toolkit.formatted_text import ANSI + # Verify the session's history is an InMemoryHistory containing our list + loaded_history = list(passed_session.history.load_history_strings()) + assert "secret_command" in loaded_history + assert "another_command" in loaded_history - result = prompt_arg() - assert isinstance(result, ANSI) - assert result.value == ANSI(base_app.prompt).value + # Verify the main app session was not touched + # This is the crucial check for isolation + main_history = base_app.session.history.get_strings() + assert "secret_command" not in main_history @pytest.mark.skipif( @@ -3816,11 +3670,7 @@ def test_pre_prompt_running_loop(base_app): # Test that pre_prompt runs with a running event loop. import asyncio - from prompt_toolkit.input import create_pipe_input - from prompt_toolkit.output import DummyOutput - from prompt_toolkit.shortcuts import PromptSession - - # Setup pipe input to feed data to prompt_toolkit + # Set up pipe input to feed data to prompt_toolkit with create_pipe_input() as pipe_input: # Create a new session with our pipe input because the input property is read-only base_app.session = PromptSession( @@ -3844,11 +3694,8 @@ def my_pre_prompt(): # Feed input to exit prompt immediately pipe_input.send_text("foo\n") - # Enable raw input and mock isatty to ensure self.session.prompt is used - base_app.use_rawinput = True - with mock.patch('sys.stdin.isatty', return_value=True): - # patch_stdout is used in this branch. It should work with DummyOutput/PipeInput. - base_app.read_input("prompt> ") + # Ensure self.session.prompt is used + base_app._read_command_line("prompt> ") assert loop_check['running'] @@ -3872,33 +3719,6 @@ def test_get_bottom_toolbar_narrow_terminal(base_app, monkeypatch): assert toolbar[1] == ('', ' ') -def test_async_alert_loop_not_available(base_app): - import threading - - # Mock app but without loop attribute - mock_app = mock.MagicMock(spec=['is_running', 'invalidate']) - mock_app.is_running = True - base_app.session.app = mock_app - - # Pretend we are at the prompt - base_app._in_prompt = True - - exceptions = [] - - def run_alert(): - try: - base_app.async_alert("fail") - except RuntimeError as e: - exceptions.append(e) - - t = threading.Thread(target=run_alert) - t.start() - t.join() - - assert len(exceptions) == 1 - assert "Event loop not available" in str(exceptions[0]) - - def test_auto_suggest_true(): """Test that auto_suggest=True initializes AutoSuggestFromHistory.""" app = cmd2.Cmd(auto_suggest=True) From 6c36faf06cee310362026ccb60577369708c292c Mon Sep 17 00:00:00 2001 From: Todd Leonhardt Date: Mon, 23 Feb 2026 21:24:23 -0500 Subject: [PATCH 05/22] Try setting shell to cmd.exe on Windows but bash on Linux or Mac for GitHub Actions --- .github/workflows/tests.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 5866b0281..26b726fcd 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -18,7 +18,9 @@ jobs: runs-on: ${{ matrix.os }} defaults: run: - shell: bash + # Use a conditional expression to set the default shell + # 'cmd' on Windows, 'bash' on other platforms (Linux, macOS) + shell: ${{ runner.os == 'Windows' && 'cmd' || 'bash' }} steps: - name: Check out uses: actions/checkout@v6.0.2 From 7a52b454b1ffab384d11203fcf47a231e5a05221 Mon Sep 17 00:00:00 2001 From: Todd Leonhardt Date: Mon, 23 Feb 2026 21:28:43 -0500 Subject: [PATCH 06/22] Try to fix GitHub Actions --- .github/workflows/tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 26b726fcd..96341438a 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -20,7 +20,7 @@ jobs: run: # Use a conditional expression to set the default shell # 'cmd' on Windows, 'bash' on other platforms (Linux, macOS) - shell: ${{ runner.os == 'Windows' && 'cmd' || 'bash' }} + shell: ${{ matrix.os == 'windows-latest' && 'cmd' || 'bash' }} steps: - name: Check out uses: actions/checkout@v6.0.2 From 0ce93bf8edaed36de5a2ff4f4e6534f731e0efa0 Mon Sep 17 00:00:00 2001 From: Todd Leonhardt Date: Mon, 23 Feb 2026 21:36:03 -0500 Subject: [PATCH 07/22] Try running windows tests using winpty --- .github/workflows/tests.yml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 96341438a..66db67b11 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -38,6 +38,13 @@ jobs: run: uv sync --all-extras --dev - name: Run tests + if: runner.os == 'Windows' + run: + uv run winpty python -Xutf8 -m pytest --cov --cov-config=pyproject.toml --cov-report=xml + tests + + - name: Run tests + if: runner.os != 'Windows' run: uv run python -Xutf8 -m pytest --cov --cov-config=pyproject.toml --cov-report=xml tests - name: Upload test results to Codecov From 76f13b48248f6bf461dc29989a192f8a40caef91 Mon Sep 17 00:00:00 2001 From: Todd Leonhardt Date: Mon, 23 Feb 2026 21:43:19 -0500 Subject: [PATCH 08/22] Try to fix running windows tests using winpty --- .github/workflows/tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 66db67b11..2baa5fda3 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -40,7 +40,7 @@ jobs: - name: Run tests if: runner.os == 'Windows' run: - uv run winpty python -Xutf8 -m pytest --cov --cov-config=pyproject.toml --cov-report=xml + winpty uv run python -Xutf8 -m pytest --cov --cov-config=pyproject.toml --cov-report=xml tests - name: Run tests From afdafda44afd80ab7e06f92615c6b4c355fc505e Mon Sep 17 00:00:00 2001 From: Todd Leonhardt Date: Mon, 23 Feb 2026 21:53:19 -0500 Subject: [PATCH 09/22] Try using powershell on Windows tests --- .github/workflows/tests.yml | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 2baa5fda3..eb813fdfc 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -18,9 +18,7 @@ jobs: runs-on: ${{ matrix.os }} defaults: run: - # Use a conditional expression to set the default shell - # 'cmd' on Windows, 'bash' on other platforms (Linux, macOS) - shell: ${{ matrix.os == 'windows-latest' && 'cmd' || 'bash' }} + shell: bash steps: - name: Check out uses: actions/checkout@v6.0.2 @@ -39,9 +37,8 @@ jobs: - name: Run tests if: runner.os == 'Windows' - run: - winpty uv run python -Xutf8 -m pytest --cov --cov-config=pyproject.toml --cov-report=xml - tests + shell: powershell + run: uv run python -Xutf8 -m pytest --cov --cov-config=pyproject.toml --cov-report=xml tests - name: Run tests if: runner.os != 'Windows' From 73a6d9ce0458c14844d976b0c8b4ea30f5229542 Mon Sep 17 00:00:00 2001 From: Todd Leonhardt Date: Mon, 23 Feb 2026 21:56:16 -0500 Subject: [PATCH 10/22] Try setting windows test shell to cmd --- .github/workflows/tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index eb813fdfc..462d0edc1 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -37,7 +37,7 @@ jobs: - name: Run tests if: runner.os == 'Windows' - shell: powershell + shell: cmd run: uv run python -Xutf8 -m pytest --cov --cov-config=pyproject.toml --cov-report=xml tests - name: Run tests From db34ca97ee7c48ccc6d7a55380520579b75f4730 Mon Sep 17 00:00:00 2001 From: Kevin Van Brunt Date: Mon, 23 Feb 2026 22:09:29 -0500 Subject: [PATCH 11/22] Moved call to patch_stdout() right before creating the prompt. --- cmd2/cmd2.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index 3e08c3622..6f075866a 100644 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -3200,7 +3200,8 @@ def _read_raw_input( """Read input from either an interactive terminal session or a redirected stream.""" # _build_session() sets session.input to a DummyInput when not in a TTY. if not isinstance(session.input, DummyInput): - return session.prompt(prompt, completer=completer, **prompt_kwargs) # type: ignore[arg-type] + with patch_stdout(): + return session.prompt(prompt, completer=completer, **prompt_kwargs) # type: ignore[arg-type] # We're not at a terminal, so we're likely reading from a file or a pipe. # We wait for a line of data before we print anything. @@ -3311,13 +3312,12 @@ def get_prompt() -> ANSI | str: if prompt == self.prompt: prompt_to_use = get_prompt - with patch_stdout(): - return self._read_raw_input( - prompt=prompt_to_use, - session=self.session, - completer=self.completer, - pre_run=self.pre_prompt, - ) + return self._read_raw_input( + prompt=prompt_to_use, + session=self.session, + completer=self.completer, + pre_run=self.pre_prompt, + ) except EOFError: return constants.EOF From 24dfb2bb9693c13ee256c2971979574d1f1f726f Mon Sep 17 00:00:00 2001 From: Todd Leonhardt Date: Mon, 23 Feb 2026 22:13:55 -0500 Subject: [PATCH 12/22] Put tests.yml back to the way it was originally --- .github/workflows/tests.yml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 462d0edc1..5866b0281 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -36,12 +36,6 @@ jobs: run: uv sync --all-extras --dev - name: Run tests - if: runner.os == 'Windows' - shell: cmd - run: uv run python -Xutf8 -m pytest --cov --cov-config=pyproject.toml --cov-report=xml tests - - - name: Run tests - if: runner.os != 'Windows' run: uv run python -Xutf8 -m pytest --cov --cov-config=pyproject.toml --cov-report=xml tests - name: Upload test results to Codecov From 38b4e62d12aa492fd6417ad166ad3cc03ae03a7c Mon Sep 17 00:00:00 2001 From: Kevin Van Brunt Date: Mon, 23 Feb 2026 23:54:55 -0500 Subject: [PATCH 13/22] Only wrap self.stdin and self.stdout if they differ from sys equivalents. --- cmd2/cmd2.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index 6f075866a..db70e8b91 100644 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -645,8 +645,10 @@ def _(event: Any) -> None: # pragma: no cover if self.stdin.isatty(): try: - kwargs["input"] = create_input(stdin=self.stdin) - kwargs["output"] = create_output(stdout=self.stdout) + if self.stdin != sys.stdin: + kwargs["input"] = create_input(stdin=self.stdin) + if self.stdout != sys.stdout: + kwargs["output"] = create_output(stdout=self.stdout) return PromptSession(**kwargs) except (NoConsoleScreenBufferError, AttributeError, ValueError): From a39b929fc58871995cb4718479373f175693a31f Mon Sep 17 00:00:00 2001 From: Kevin Van Brunt Date: Tue, 24 Feb 2026 00:13:59 -0500 Subject: [PATCH 14/22] Fixing tests on Windows. --- tests/test_cmd2.py | 43 +++++++++++++++++++++++++------------------ 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/tests/test_cmd2.py b/tests/test_cmd2.py index 428a48f21..8031a030f 100644 --- a/tests/test_cmd2.py +++ b/tests/test_cmd2.py @@ -1900,6 +1900,10 @@ def test_echo(capsys) -> None: assert out.startswith(f'{app.prompt}{commands[0]}\nUsage: history') +@pytest.mark.skipif( + sys.platform.startswith('win'), + reason="Don't have a real Windows console with how we are currently running tests in GitHub Actions", +) def test_read_raw_input_tty(base_app: cmd2.Cmd) -> None: with create_pipe_input() as pipe_input: base_app.session = PromptSession( @@ -3617,24 +3621,27 @@ def test_no_console_screen_buffer_error_dummy(): def test_read_command_line_dynamic_prompt(base_app: cmd2.Cmd) -> None: """Test that _read_command_line uses a dynamic prompt when provided prompt matches app.prompt""" - # Set input to something other than DummyInput so _read_raw_input() will go down the TTY route. - mock_session = mock.MagicMock() - mock_session.input = mock.MagicMock() - base_app.session = mock_session - base_app._read_command_line(base_app.prompt) - - # Check that mock_prompt was called with a callable for the prompt - # args[0] should be the prompt_to_use - args, _ = mock_session.prompt.call_args - prompt_arg = args[0] - assert callable(prompt_arg) - - # Verify the callable returns the expected ANSI formatted prompt - from prompt_toolkit.formatted_text import ANSI - - result = prompt_arg() - assert isinstance(result, ANSI) - assert result.value == ANSI(base_app.prompt).value + # Mock patch_stdout to prevent it from attempting to access the Windows + # console buffer in a Windows test environment. + with mock.patch('prompt_toolkit.patch_stdout.patch_stdout', return_value=mock.MagicMock()): + # Set input to something other than DummyInput so _read_raw_input() + # will go down the TTY route. + mock_session = mock.MagicMock() + mock_session.input = mock.MagicMock() + base_app.session = mock_session + base_app._read_command_line(base_app.prompt) + + # Check that mock_prompt was called with a callable for the prompt + args, _ = mock_session.prompt.call_args + prompt_arg = args[0] + assert callable(prompt_arg) + + # Verify the callable returns the expected ANSI formatted prompt + from prompt_toolkit.formatted_text import ANSI + + result = prompt_arg() + assert isinstance(result, ANSI) + assert result.value == ANSI(base_app.prompt).value def test_read_input_history_isolation(base_app: cmd2.Cmd) -> None: From 11cdbda241b02ae458f1b93a5a0dd4bc35bf6be0 Mon Sep 17 00:00:00 2001 From: Kevin Van Brunt Date: Tue, 24 Feb 2026 00:21:52 -0500 Subject: [PATCH 15/22] Fixed mock patch. --- tests/test_cmd2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_cmd2.py b/tests/test_cmd2.py index 8031a030f..66356ae90 100644 --- a/tests/test_cmd2.py +++ b/tests/test_cmd2.py @@ -3623,7 +3623,7 @@ def test_read_command_line_dynamic_prompt(base_app: cmd2.Cmd) -> None: # Mock patch_stdout to prevent it from attempting to access the Windows # console buffer in a Windows test environment. - with mock.patch('prompt_toolkit.patch_stdout.patch_stdout', return_value=mock.MagicMock()): + with mock.patch('cmd2.cmd2.patch_stdout'): # Set input to something other than DummyInput so _read_raw_input() # will go down the TTY route. mock_session = mock.MagicMock() From db30ea3e45a79502f4aeeeb9cf0464c11d7de2fe Mon Sep 17 00:00:00 2001 From: Kevin Van Brunt Date: Tue, 24 Feb 2026 01:34:44 -0500 Subject: [PATCH 16/22] Added tests. --- cmd2/cmd2.py | 37 +++++++++++++++++++++++++++---------- tests/test_cmd2.py | 42 ++++++++++++++++++++++++++++++++++++++---- 2 files changed, 65 insertions(+), 14 deletions(-) diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index db70e8b91..6f5c7f05b 100644 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -444,7 +444,7 @@ def __init__( if auto_suggest: self.auto_suggest = AutoSuggestFromHistory() - self.session = self._build_session() + self.session = self._init_session() # Commands to exclude from the history command self.exclude_from_history = [constants.EOF, 'history'] @@ -609,8 +609,8 @@ def __init__( # the current command being executed self.current_command: Statement | None = None - def _build_session(self) -> PromptSession[str]: - """Construct the primary PromptSession for the cmd2 application. + def _init_session(self) -> PromptSession[str]: + """Initialize and return the core PromptSession for the application. Builds an interactive session if stdin is a TTY. Otherwise, uses dummy drivers to support non-interactive streams like pipes or files. @@ -3197,13 +3197,25 @@ def _read_raw_input( prompt: Callable[[], ANSI | str] | ANSI | str, session: PromptSession[str], completer: Completer, - **prompt_kwargs: Any, # optional keyword args for session.prompt() + **prompt_kwargs: Any, ) -> str: - """Read input from either an interactive terminal session or a redirected stream.""" - # _build_session() sets session.input to a DummyInput when not in a TTY. + """Execute the low-level input read from either a terminal or a redirected stream. + + If the session is interactive (TTY), it uses `prompt_toolkit` to render a + rich UI with completion and `patch_stdout` protection. If non-interactive + (Pipe/File), it performs a direct line read from `stdin`. + + :param prompt: the prompt text or a callable that returns the prompt. + :param session: the PromptSession instance to use for reading. + :param completer: the completer to use for this specific input. + :param prompt_kwargs: additional arguments passed directly to session.prompt(). + :return: the stripped input string. + :raises EOFError: if the input stream is closed or the user signals EOF (e.g., Ctrl+D) + """ + # Check if the session is configured for interactive terminal use. if not isinstance(session.input, DummyInput): with patch_stdout(): - return session.prompt(prompt, completer=completer, **prompt_kwargs) # type: ignore[arg-type] + return session.prompt(prompt, completer=completer, **prompt_kwargs) # We're not at a terminal, so we're likely reading from a file or a pipe. # We wait for a line of data before we print anything. @@ -3211,7 +3223,7 @@ def _read_raw_input( # If the stream is empty, we've reached the end of the input. if not line: - return constants.EOF + raise EOFError # If echo is on, we want the output to look like a session transcript. # Print the prompt and the command before the results. @@ -3266,6 +3278,10 @@ def read_input( ) -> str: """Read a line of input with optional completion and history. + :param prompt: prompt to display to user + :param history: optional Sequence of strings to use for up-arrow history. The passed in history + will not be edited. It is the caller's responsibility to add the returned input + to history if desired. Defaults to None. :param preserve_quotes: if True, then quoted tokens will keep their quotes when processed by ArgparseCompleter. This is helpful in cases when you're completing flag-like tokens (e.g. -o, --option) and you don't want them to be @@ -3278,7 +3294,8 @@ def read_input( :param completer: completion function that provides choices for single argument :param parser: an argument parser which supports the completion of multiple arguments :return: the line read from stdin with all trailing new lines removed - :raises Exception: any exceptions raised by prompt() + :raises EOFError: if the input stream is closed or the user signals EOF (e.g., Ctrl+D) + :raises Exception: any other exceptions raised by prompt() """ completer_to_use = self._resolve_completer( preserve_quotes=preserve_quotes, @@ -3303,7 +3320,7 @@ def _read_command_line(self, prompt: str) -> str: :param prompt: prompt to display to user :return: command line text or 'eof' if an EOFError was caught - :raises Exception: whatever exceptions are raised by input() except for EOFError + :raises Exception: any exceptions raised by prompt(), except EOFError """ try: # Use dynamic prompt if the prompt matches self.prompt diff --git a/tests/test_cmd2.py b/tests/test_cmd2.py index 66356ae90..dc8ca3aff 100644 --- a/tests/test_cmd2.py +++ b/tests/test_cmd2.py @@ -1936,8 +1936,8 @@ def test_read_raw_input_pipe_echo(capsys) -> None: def test_read_raw_input_eof() -> None: app = cmd2.Cmd(stdin=io.StringIO("")) - result = app._read_raw_input("prompt> ", app.session, DummyCompleter()) - assert result == constants.EOF + with pytest.raises(EOFError): + app._read_raw_input("prompt> ", app.session, DummyCompleter()) def test_resolve_completer_none(base_app: cmd2.Cmd) -> None: @@ -3502,7 +3502,7 @@ def test_custom_completekey(): assert app.completekey == '?' -def test_build_session_exception(monkeypatch): +def test_init_session_exception(monkeypatch): # Mock PromptSession to raise ValueError on first call, then succeed valid_session_mock = mock.MagicMock(spec=PromptSession) @@ -3590,7 +3590,7 @@ def test_multiline_complete_statement_keyboard_interrupt(multiline_app, monkeypa poutput_mock.assert_called_with('^C') -def test_build_session_no_console_error(monkeypatch): +def test_init_session_no_console_error(monkeypatch): from cmd2.cmd2 import NoConsoleScreenBufferError # Mock PromptSession to raise NoConsoleScreenBufferError on first call, then succeed @@ -3610,6 +3610,40 @@ def test_build_session_no_console_error(monkeypatch): assert isinstance(kwargs['output'], DummyOutput) +def test_init_session_with_custom_tty() -> None: + # Create a mock stdin with says it's a TTY + custom_stdin = mock.MagicMock(spec=io.TextIOWrapper) + custom_stdin.isatty.return_value = True + assert custom_stdin is not sys.stdin + + # Create a mock stdout which is not sys.stdout + custom_stdout = mock.MagicMock(spec=io.TextIOWrapper) + assert custom_stdout is not sys.stdout + + # Check if the streams were wrapped + with ( + mock.patch('cmd2.cmd2.create_input') as mock_create_input, + mock.patch('cmd2.cmd2.create_output') as mock_create_output, + ): + app = cmd2.Cmd() + app.stdin = custom_stdin + app.stdout = custom_stdout + app._init_session() + + mock_create_input.assert_called_once_with(stdin=custom_stdin) + mock_create_output.assert_called_once_with(stdout=custom_stdout) + + +def test_init_session_non_interactive() -> None: + # Set up a mock for a non-TTY stream (like a pipe) + mock_stdin = mock.MagicMock(spec=io.TextIOWrapper) + mock_stdin.isatty.return_value = False + + app = cmd2.Cmd(stdin=mock_stdin) + assert isinstance(app.session.input, DummyInput) + assert isinstance(app.session.output, DummyOutput) + + def test_no_console_screen_buffer_error_dummy(): from cmd2.cmd2 import NoConsoleScreenBufferError From c488828375d458f94e58c57fd1b3cfe217a1c7ce Mon Sep 17 00:00:00 2001 From: Kevin Van Brunt Date: Tue, 24 Feb 2026 01:48:28 -0500 Subject: [PATCH 17/22] Removed obsolete test code. --- tests/test_pt_utils.py | 20 +++----------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/tests/test_pt_utils.py b/tests/test_pt_utils.py index 99d2f990f..0d0feb443 100644 --- a/tests/test_pt_utils.py +++ b/tests/test_pt_utils.py @@ -5,7 +5,6 @@ from unittest.mock import Mock import pytest -from prompt_toolkit.buffer import Buffer from prompt_toolkit.document import Document from prompt_toolkit.formatted_text import ( ANSI, @@ -23,21 +22,9 @@ from cmd2.parsing import Statement -class MockSession: - """Simulates a prompt_toolkit PromptSession.""" - - def __init__(self): - # Contains the CLI text and cursor position - self.buffer = Buffer() - - # Mock the app structure: session -> app -> current_buffer - self.app = Mock() - self.app.current_buffer = self.buffer - - # Mock for cmd2.Cmd class MockCmd: - def __init__(self): + def __init__(self) -> None: # Return empty completions by default self.complete = Mock(return_value=cmd2.Completions()) @@ -50,14 +37,13 @@ def __init__(self): self.aliases = {} self.macros = {} self.all_commands = [] - self.session = MockSession() - def get_all_commands(self): + def get_all_commands(self) -> list[str]: return self.all_commands @pytest.fixture -def mock_cmd_app(): +def mock_cmd_app() -> MockCmd: return MockCmd() From df1db8acbc7f80890ae4466a799663b2119b8bfb Mon Sep 17 00:00:00 2001 From: Kevin Van Brunt Date: Tue, 24 Feb 2026 10:09:29 -0500 Subject: [PATCH 18/22] Added interactive pipe mode. --- cmd2/cmd2.py | 25 +++++++++++++++++++------ tests/test_cmd2.py | 32 +++++++++++++++++++++++++++----- 2 files changed, 46 insertions(+), 11 deletions(-) diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index 6f5c7f05b..0a79dbe7c 100644 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -392,6 +392,12 @@ def __init__( self.default_to_shell = False # Attempt to run unrecognized commands as shell commands self.allow_redirection = allow_redirection # Security setting to prevent redirection of stdout + # If True, cmd2 treats redirected input (pipes/files) as an interactive session. + # It will display the prompt before reading each line to synchronize with + # automation tools (like Pexpect) and will skip echoing the input to prevent + # duplicate prompts in the output. + self.interactive_pipe = False + # Attributes which ARE dynamically settable via the set command at runtime self.always_show_hint = False self.debug = False @@ -3218,18 +3224,25 @@ def _read_raw_input( return session.prompt(prompt, completer=completer, **prompt_kwargs) # We're not at a terminal, so we're likely reading from a file or a pipe. - # We wait for a line of data before we print anything. + prompt_obj = prompt() if callable(prompt) else prompt + prompt_str = prompt_obj.value if isinstance(prompt_obj, ANSI) else prompt_obj + + # If this is an interactive pipe, then display the prompt first + if self.interactive_pipe: + self.poutput(prompt_str, end='') + self.stdout.flush() + + # Wait for the next line of input line = self.stdin.readline() # If the stream is empty, we've reached the end of the input. if not line: raise EOFError - # If echo is on, we want the output to look like a session transcript. - # Print the prompt and the command before the results. - if self.echo: - prompt_obj = prompt() if callable(prompt) else prompt - prompt_str = prompt_obj.value if isinstance(prompt_obj, ANSI) else prompt_obj + # If not interactive and echo is on, we want the output to simulate a + # live session. Print the prompt and the command so they appear in the + # output stream before the results. + if not self.interactive_pipe and self.echo: end = "" if line.endswith('\n') else "\n" self.poutput(f'{prompt_str}{line}', end=end) diff --git a/tests/test_cmd2.py b/tests/test_cmd2.py index dc8ca3aff..f1b692af9 100644 --- a/tests/test_cmd2.py +++ b/tests/test_cmd2.py @@ -1918,20 +1918,42 @@ def test_read_raw_input_tty(base_app: cmd2.Cmd) -> None: assert result == "foo" -def test_read_raw_input_pipe() -> None: +def test_read_raw_input_interactive_pipe(capsys) -> None: + prompt = "prompt> " app = cmd2.Cmd(stdin=io.StringIO("input from pipe\n")) - result = app._read_raw_input("prompt> ", app.session, DummyCompleter()) + app.interactive_pipe = True + result = app._read_raw_input(prompt, app.session, DummyCompleter()) assert result == "input from pipe" + # In interactive mode, _read_raw_input() prints the prompt. + captured = capsys.readouterr() + assert captured.out == prompt + + +def test_read_raw_input_non_interactive_pipe_echo_off(capsys) -> None: + prompt = "prompt> " + app = cmd2.Cmd(stdin=io.StringIO("input from pipe\n")) + app.interactive_pipe = False + app.echo = False + result = app._read_raw_input(prompt, app.session, DummyCompleter()) + assert result == "input from pipe" + + # When not echoing in non-interactive mode, _read_raw_input() prints nothing. + captured = capsys.readouterr() + assert not captured.out + -def test_read_raw_input_pipe_echo(capsys) -> None: +def test_read_raw_input_non_interactive_pipe_echo_on(capsys) -> None: + prompt = "prompt> " app = cmd2.Cmd(stdin=io.StringIO("input from pipe\n")) + app.interactive_pipe = False app.echo = True - result = app._read_raw_input("prompt> ", app.session, DummyCompleter()) + result = app._read_raw_input(prompt, app.session, DummyCompleter()) assert result == "input from pipe" + # When echoing in non-interactive mode, _read_raw_input() prints the prompt and input text. captured = capsys.readouterr() - assert "prompt> input from pipe" in captured.out + assert f"{prompt}input from pipe\n" == captured.out def test_read_raw_input_eof() -> None: From 484930cb8c99b95a3a9fd15cd682f36f1518b03a Mon Sep 17 00:00:00 2001 From: Kevin Van Brunt Date: Tue, 24 Feb 2026 11:01:47 -0500 Subject: [PATCH 19/22] Removed need for constants.EOF. --- cmd2/cmd2.py | 70 +++++++++++++++++++++------------------------- cmd2/constants.py | 1 - tests/test_cmd2.py | 29 ++++++++++++------- 3 files changed, 51 insertions(+), 49 deletions(-) diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index 0a79dbe7c..da64d615d 100644 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -433,7 +433,7 @@ def __init__( self.self_in_py = False # Commands to exclude from the help menu and completion - self.hidden_commands = [constants.EOF, '_relative_run_script'] + self.hidden_commands = ['_eof', '_relative_run_script'] # Initialize history from a persistent history file (if present) self.persistent_history_file = '' @@ -453,7 +453,7 @@ def __init__( self.session = self._init_session() # Commands to exclude from the history command - self.exclude_from_history = [constants.EOF, 'history'] + self.exclude_from_history = ['_eof', 'history'] # Dictionary of macro names and their values self.macros: dict[str, Macro] = {} @@ -2821,11 +2821,6 @@ def runcmds_plus_hooks( def _complete_statement(self, line: str) -> Statement: """Keep accepting lines of input until the command is complete. - There is some pretty hacky code here to handle some quirks of - self._read_command_line(). It returns a literal 'eof' if the input - pipe runs out. We can't refactor it because we need to retain - backwards compatibility with the standard library version of cmd. - :param line: the line being parsed :return: the completed Statement :raises Cmd2ShlexError: if a shlex error occurs (e.g. No closing quotation) @@ -2859,12 +2854,10 @@ def _complete_statement(self, line: str) -> Statement: self._multiline_in_progress = line + '\n' # Get next line of this command - nextline = self._read_command_line(self.continuation_prompt) - if nextline == constants.EOF: - # they entered either a blank line, or we hit an EOF - # for some other reason. Turn the literal 'eof' - # into a blank line, which serves as a command - # terminator + try: + nextline = self._read_command_line(self.continuation_prompt) + except EOFError: + # Add a blank line, which serves as a command terminator. nextline = '\n' self.poutput(nextline) @@ -3332,26 +3325,25 @@ def _read_command_line(self, prompt: str) -> str: """Read the next command line from the input stream. :param prompt: prompt to display to user - :return: command line text or 'eof' if an EOFError was caught - :raises Exception: any exceptions raised by prompt(), except EOFError + :return: the line read from stdin with all trailing new lines removed + :raises EOFError: if the input stream is closed or the user signals EOF (e.g., Ctrl+D) + :raises Exception: any other exceptions raised by prompt() """ - try: - # Use dynamic prompt if the prompt matches self.prompt - def get_prompt() -> ANSI | str: - return ANSI(self.prompt) - - prompt_to_use: Callable[[], ANSI | str] | ANSI | str = ANSI(prompt) - if prompt == self.prompt: - prompt_to_use = get_prompt - - return self._read_raw_input( - prompt=prompt_to_use, - session=self.session, - completer=self.completer, - pre_run=self.pre_prompt, - ) - except EOFError: - return constants.EOF + + # Use dynamic prompt if the prompt matches self.prompt + def get_prompt() -> ANSI | str: + return ANSI(self.prompt) + + prompt_to_use: Callable[[], ANSI | str] | ANSI | str = ANSI(prompt) + if prompt == self.prompt: + prompt_to_use = get_prompt + + return self._read_raw_input( + prompt=prompt_to_use, + session=self.session, + completer=self.completer, + pre_run=self.pre_prompt, + ) def _cmdloop(self) -> None: """Repeatedly issue a prompt, accept input, parse it, and dispatch to apporpriate commands. @@ -3373,6 +3365,8 @@ def _cmdloop(self) -> None: except KeyboardInterrupt: self.poutput('^C') line = '' + except EOFError: + line = "_eof" # Run the command along with all associated pre and post hooks stop = self.onecmd_plus_hooks(line) @@ -4193,17 +4187,17 @@ def do_shortcuts(self, _: argparse.Namespace) -> None: self.last_result = True @staticmethod - def _build_eof_parser() -> Cmd2ArgumentParser: - eof_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description="Called when Ctrl-D is pressed.") - eof_parser.epilog = eof_parser.create_text_group( + def _build__eof_parser() -> Cmd2ArgumentParser: + _eof_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description="Called when Ctrl-D is pressed.") + _eof_parser.epilog = _eof_parser.create_text_group( "Note", "This command is for internal use and is not intended to be called from the command line.", ) - return eof_parser + return _eof_parser - @with_argparser(_build_eof_parser) - def do_eof(self, _: argparse.Namespace) -> bool | None: + @with_argparser(_build__eof_parser) + def do__eof(self, _: argparse.Namespace) -> bool | None: """Quit with no arguments, called when Ctrl-D is pressed. This can be overridden if quit should be called differently. diff --git a/cmd2/constants.py b/cmd2/constants.py index 41aba11f9..75c60662c 100644 --- a/cmd2/constants.py +++ b/cmd2/constants.py @@ -4,7 +4,6 @@ # nothing here should be considered part of the public API of this module INFINITY = float('inf') -EOF = 'eof' # Used for command parsing, output redirection, completion, and word breaks. Do not change. QUOTES = ['"', "'"] diff --git a/tests/test_cmd2.py b/tests/test_cmd2.py index f1b692af9..f335c204f 100644 --- a/tests/test_cmd2.py +++ b/tests/test_cmd2.py @@ -1181,14 +1181,14 @@ def say_app(): def test_ctrl_c_at_prompt(say_app, monkeypatch) -> None: read_command_mock = mock.MagicMock(name='_read_command_line') - read_command_mock.side_effect = ['say hello', KeyboardInterrupt(), 'say goodbye', constants.EOF] + read_command_mock.side_effect = ['say hello', KeyboardInterrupt(), 'say goodbye', 'quit'] monkeypatch.setattr("cmd2.Cmd._read_command_line", read_command_mock) say_app.cmdloop() # And verify the expected output to stdout out = say_app.stdout.getvalue() - assert out == 'hello\n^C\ngoodbye\n\n' + assert out == 'hello\n^C\ngoodbye\n' class ShellApp(cmd2.Cmd): @@ -1877,14 +1877,13 @@ def test_is_text_file_bad_input(base_app) -> None: utils.is_text_file('.') -def test_eof(base_app) -> None: - # Only thing to verify is that it returns True - assert base_app.do_eof('') - assert base_app.last_result is True +def test__eof(base_app) -> None: + base_app.do_quit = mock.MagicMock(return_value=True) + assert base_app.do__eof('') + base_app.do_quit.assert_called_once_with('') def test_quit(base_app) -> None: - # Only thing to verify is that it returns True assert base_app.do_quit('') assert base_app.last_result is True @@ -2063,11 +2062,21 @@ def test_custom_stdout() -> None: def test_read_command_line_eof(base_app, monkeypatch) -> None: + """Test that _read_command_line passes up EOFErrors.""" + read_raw_mock = mock.MagicMock(name='_read_raw_input', side_effect=EOFError) + monkeypatch.setattr("cmd2.Cmd._read_raw_input", read_raw_mock) + + with pytest.raises(EOFError): + base_app._read_command_line("Prompt> ") + + +def test_read_input_eof(base_app, monkeypatch) -> None: + """Test that read_input passes up EOFErrors.""" read_raw_mock = mock.MagicMock(name='_read_raw_input', side_effect=EOFError) monkeypatch.setattr("cmd2.Cmd._read_raw_input", read_raw_mock) - line = base_app._read_command_line("Prompt> ") - assert line == constants.EOF + with pytest.raises(EOFError): + base_app.read_input("Prompt> ") def test_poutput_string(outsim_app) -> None: @@ -3002,10 +3011,10 @@ def test_get_all_commands(base_app) -> None: # Verify that the base app has the expected commands commands = base_app.get_all_commands() expected_commands = [ + '_eof', '_relative_run_script', 'alias', 'edit', - constants.EOF, 'help', 'history', 'ipy', From 9edd09146bda56190c309ad562a29fb63ad437a7 Mon Sep 17 00:00:00 2001 From: Kevin Van Brunt Date: Tue, 24 Feb 2026 11:17:04 -0500 Subject: [PATCH 20/22] Added Ctrl-d test. --- tests/test_cmd2.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/test_cmd2.py b/tests/test_cmd2.py index f335c204f..f13e8c53a 100644 --- a/tests/test_cmd2.py +++ b/tests/test_cmd2.py @@ -1191,6 +1191,18 @@ def test_ctrl_c_at_prompt(say_app, monkeypatch) -> None: assert out == 'hello\n^C\ngoodbye\n' +def test_ctrl_d_at_prompt(say_app, monkeypatch) -> None: + read_command_mock = mock.MagicMock(name='_read_command_line') + read_command_mock.side_effect = ['say hello', EOFError()] + monkeypatch.setattr("cmd2.Cmd._read_command_line", read_command_mock) + + say_app.cmdloop() + + # And verify the expected output to stdout + out = say_app.stdout.getvalue() + assert out == 'hello\n\n' + + class ShellApp(cmd2.Cmd): def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) From 712d55d4b38cbda772a03a51436541dfde04739c Mon Sep 17 00:00:00 2001 From: Kevin Van Brunt Date: Tue, 24 Feb 2026 11:22:28 -0500 Subject: [PATCH 21/22] Renamed a parser builder function to reflect name of the command. --- cmd2/cmd2.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index da64d615d..5e7bfe19b 100644 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -5172,12 +5172,12 @@ def do_run_script(self, args: argparse.Namespace) -> bool | None: self._script_dir.pop() @classmethod - def _build_relative_run_script_parser(cls) -> Cmd2ArgumentParser: - relative_run_script_parser = cls._build_base_run_script_parser() + def _build__relative_run_script_parser(cls) -> Cmd2ArgumentParser: + _relative_run_script_parser = cls._build_base_run_script_parser() # Append to existing description - relative_run_script_parser.description = Group( - cast(Group, relative_run_script_parser.description), + _relative_run_script_parser.description = Group( + cast(Group, _relative_run_script_parser.description), "\n", ( "If this is called from within an already-running script, the filename will be " @@ -5185,14 +5185,14 @@ def _build_relative_run_script_parser(cls) -> Cmd2ArgumentParser: ), ) - relative_run_script_parser.epilog = relative_run_script_parser.create_text_group( + _relative_run_script_parser.epilog = _relative_run_script_parser.create_text_group( "Note", "This command is intended to be used from within a text script.", ) - return relative_run_script_parser + return _relative_run_script_parser - @with_argparser(_build_relative_run_script_parser) + @with_argparser(_build__relative_run_script_parser) def do__relative_run_script(self, args: argparse.Namespace) -> bool | None: """Run text script. From a300f100e0623aab0c49b20df0352e3356c2b044 Mon Sep 17 00:00:00 2001 From: Kevin Van Brunt Date: Tue, 24 Feb 2026 11:31:07 -0500 Subject: [PATCH 22/22] Added documentation. --- docs/upgrades.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/upgrades.md b/docs/upgrades.md index 9bdb83cd1..a89e248f2 100644 --- a/docs/upgrades.md +++ b/docs/upgrades.md @@ -46,6 +46,10 @@ See the example for a demonstration of how to implement a background thread that refreshes the toolbar periodically. +### Deleted Modules + +Removed `rl_utils.py` and `terminal_utils.py` since `prompt-toolkit` provides this functionality. + ## Upgrading to cmd2 3.x from 2.x For details about all of the changes in the 3.0.0 release, please refer to