diff --git a/README.md b/README.md index 6b697dd..f8ec474 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ mutation replay ## Usage ``` -mutation play [--verbose] [--exclude=]... [--only-deadcode-detection] [--include=]... [--sampling=] [--randomly-seed=] [--max-workers=] [ ...] [-- PYTEST-COMMAND ...] +mutation play [--verbose] [--exclude=]... [--only-deadcode-detection] [--without-exception-injection] [--include=]... [--sampling=] [--randomly-seed=] [--max-workers=] [ ...] [-- PYTEST-COMMAND ...] mutation replay [--verbose] [--max-workers=] mutation list mutation show MUTATION @@ -171,6 +171,38 @@ if True: +
InjectException — replace expressions with the exception they raise + +Replace expressions that have well-known failure modes with a `raise` of the exception they can produce. This targets error-handling paths that pass on the happy path but silently break when the environment misbehaves. + +The contracts are intentionally narrow — stdlib only, no inference: + +| Expression | Injected mutation | +|---|---| +| `d[key]` (string key) | `raise KeyError(key)` | +| `lst[i]` (integer index) | `raise IndexError(i)` | +| `d[k]` (ambiguous) | both `raise KeyError(k)` and `raise IndexError(k)` | +| `int(x)`, `float(x)` | `raise ValueError(x)` | +| `open(path)` | `raise FileNotFoundError(path)` | +| `next(it)` | `raise StopIteration` | +| `x / y`, `x // y`, `x % y` | `raise ZeroDivisionError` | +| `obj.attr` | `raise AttributeError('attr')` | +| `for x in iterable` | `raise StopIteration` | + +Mutations are skipped when the expression is already inside a `try/except` that handles the relevant exception, and never injected inside `except` blocks. + +```python +# before +value = data[key] + +# after +raise KeyError(key) +``` + +Use `--without-exception-injection` to skip all `InjectException` mutations when error-handling paths are intentionally untested or produce too much noise. + +
+
MutateAssignment — replace assignment values with None Replace the right-hand side of a plain assignment with `None`, verifying that the assigned value is not silently ignored. diff --git a/foobar/ex.py b/foobar/ex.py index 7737bd7..09a2c8e 100644 --- a/foobar/ex.py +++ b/foobar/ex.py @@ -3,5 +3,6 @@ def decrement_by_two(a): """docstring for testing false-positive mutation (function)""" abc = 42 + ijk = 42 / a return a - 2 diff --git a/mutation.py b/mutation.py index b7d5299..9063f31 100755 --- a/mutation.py +++ b/mutation.py @@ -2,7 +2,7 @@ """Mutation. Usage: - mutation play [--verbose] [--exclude=]... [--only-deadcode-detection] [--include=]... [--sampling=] [--randomly-seed=] [--max-workers=] [ ...] [-- PYTEST-COMMAND ...] + mutation play [--verbose] [--exclude=]... [--only-deadcode-detection] [--without-exception-injection] [--include=]... [--sampling=] [--randomly-seed=] [--max-workers=] [ ...] [-- PYTEST-COMMAND ...] mutation replay [--verbose] [--max-workers=] mutation list mutation show MUTATION @@ -24,6 +24,8 @@ (default: current Unix timestamp) --only-deadcode-detection Only apply dead-code detection mutations (StatementDrop, DefinitionDrop). + --without-exception-injection Skip all InjectException mutations (useful when error-handling + paths are intentionally untested or produce too much noise). --max-workers= Number of parallel workers (default: cpu_count - 1) --verbose Show more information. -h --help Show this screen. @@ -1092,6 +1094,150 @@ def mutate(self, node, index, tree): yield tree_copy, node_copy +class InjectException(metaclass=Mutation): + """Replace expressions with the exception they can raise, targeting error-handling paths that are commonly forgotten.""" + + _DICT_HINTS = frozenset(["dict", "map", "table", "cache", "store", "config", "registry", "lookup"]) + _LIST_HINTS = frozenset(["list", "array", "arr", "seq", "items", "elements"]) + + def predicate(self, node): + if isinstance(node, (ast.For, ast.AsyncFor)): + return True + if isinstance(node, ast.Subscript) and not isinstance(node.slice, ast.Slice): + return True + if (isinstance(node, ast.Call) + and isinstance(node.func, ast.Name) + and node.func.id in ("int", "float", "open", "next")): + return True + if isinstance(node, ast.BinOp) and isinstance(node.op, (ast.Div, ast.FloorDiv, ast.Mod)): + return True + if isinstance(node, ast.Attribute): + return True + return False + + def _type_specs(self, node): + """Return list of (exc_name, args_fn) where args_fn(node_copy) -> list of AST nodes.""" + if isinstance(node, (ast.For, ast.AsyncFor)): + return [("StopIteration", lambda n: [])] + if isinstance(node, ast.Subscript): + return self._subscript_specs(node) + if isinstance(node, ast.Call) and isinstance(node.func, ast.Name): + fn = node.func.id + if fn in ("int", "float"): + return [("ValueError", lambda n: n.args[:1] if n.args else [])] + if fn == "open": + return [("FileNotFoundError", lambda n: n.args[:1] if n.args else [])] + if fn == "next": + return [("StopIteration", lambda n: [])] + if isinstance(node, ast.BinOp): + return [("ZeroDivisionError", lambda n: [])] + if isinstance(node, ast.Attribute): + return [("AttributeError", lambda n: [ast.Constant( + value=n.attr, lineno=n.lineno, col_offset=n.col_offset, + )])] + return [] + + def _subscript_specs(self, node): + """Heuristic: KeyError for dict-like, IndexError for list-like, both if ambiguous.""" + slice_ = node.slice + value = node.value + # String key → definitely KeyError + if isinstance(slice_, ast.Constant) and isinstance(slice_.value, str): + return [("KeyError", lambda n: [n.slice])] + # Integer key → more likely IndexError, but could be dict + if isinstance(slice_, ast.Constant) and isinstance(slice_.value, int): + return [("IndexError", lambda n: [n.slice])] + # Check variable name for hints + name = None + if isinstance(value, ast.Name): + name = value.id.lower() + elif isinstance(value, ast.Attribute): + name = value.attr.lower() + if name: + if any(h in name for h in self._DICT_HINTS): + return [("KeyError", lambda n: [n.slice])] + if any(h in name for h in self._LIST_HINTS): + return [("IndexError", lambda n: [n.slice])] + # Ambiguous — generate both + return [ + ("KeyError", lambda n: [n.slice]), + ("IndexError", lambda n: [n.slice]), + ] + + def _build_parent_map(self, tree): + parent_map = {} + for n in ast.walk(tree): + for child in ast.iter_child_nodes(n): + parent_map[id(child)] = n + return parent_map + + def _find_enclosing_stmt(self, parent_map, node): + """Return (stmt, parent, field, idx) for the innermost statement in a body list.""" + current = node + while id(current) in parent_map: + parent = parent_map[id(current)] + if isinstance(current, ast.stmt): + for field, value in ast.iter_fields(parent): + if isinstance(value, list): + for i, item in enumerate(value): + if item is current: + return current, parent, field, i + current = parent + return None, None, None, None + + def _is_guarded(self, parent_map, node, exc_name): + """Return True if node is inside an except block or a try that handles exc_name.""" + current = node + while id(current) in parent_map: + parent = parent_map[id(current)] + if isinstance(parent, ast.ExceptHandler): + return True # never inject inside except blocks + if isinstance(parent, ast.Try) and any(current is s for s in parent.body): + for handler in parent.handlers: + if handler.type is None: + return True # bare except catches everything + names = [] + if isinstance(handler.type, ast.Name): + names = [handler.type.id] + elif isinstance(handler.type, ast.Tuple): + names = [e.id for e in handler.type.elts if isinstance(e, ast.Name)] + if exc_name in names or "Exception" in names or "BaseException" in names: + return True + current = parent + return False + + def _make_raise(self, exc_name, args, lineno, col_offset): + if args: + exc = ast.Call( + func=ast.Name(id=exc_name, ctx=ast.Load(), lineno=lineno, col_offset=col_offset), + args=args, + keywords=[], + lineno=lineno, + col_offset=col_offset, + ) + else: + exc = ast.Name(id=exc_name, ctx=ast.Load(), lineno=lineno, col_offset=col_offset) + return ast.Raise(exc=exc, cause=None, lineno=lineno, col_offset=col_offset) + + def mutate(self, node, index, tree): + specs = self._type_specs(node) + for exc_name, args_fn in specs: + tree_copy, node_copy = copy_tree_at(tree, index) + parent_map = self._build_parent_map(tree_copy) + if self._is_guarded(parent_map, node_copy, exc_name): + continue + lineno = getattr(node_copy, "lineno", 1) + col_offset = getattr(node_copy, "col_offset", 0) + stmt, parent, field, idx = self._find_enclosing_stmt(parent_map, node_copy) + if stmt is None or idx is None: + continue + exc_args = args_fn(node_copy) + raise_node = self._make_raise(exc_name, exc_args, lineno, col_offset) + getattr(parent, field)[idx] = raise_node + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + + def diff(source, target, filename=""): lines = unified_diff( source.split("\n"), target.split("\n"), filename, filename, lineterm="" @@ -1195,20 +1341,27 @@ def install_module_loader(uid): patched = patch(diff, ast.unparse(ast.parse(source))) - components = path[:-3].split("/") + # Derive the importable module name by finding which sys.path entry + # contains this file. For src/ layouts (e.g. src/mypkg/__init__.py) + # the editable install adds src/ to sys.path, so the module name is + # "mypkg" not "src.mypkg". We resolve every sys.path entry and pick + # the one that gives the *shortest* (most specific) module name. + path_obj = Path(path).resolve() + if path_obj.name == "__init__.py": + module_file = path_obj.parent # package dir: strip __init__.py + else: + module_file = path_obj.with_suffix("") # regular module: strip .py - while components: - for pythonpath in sys.path: - filepath = os.path.join(pythonpath, "/".join(components)) - filepath += ".py" - ok = os.path.exists(filepath) - if ok: - module_path = ".".join(components) - break - else: - components.pop() + module_path = None + for pythonpath in sys.path: + base = Path(pythonpath).resolve() if pythonpath else Path(".").resolve() + try: + rel = module_file.relative_to(base) + except ValueError: continue - break + candidate = ".".join(rel.parts) + if module_path is None or len(candidate) < len(module_path): + module_path = candidate if module_path is None: raise Exception("sys.path oops!") @@ -1229,7 +1382,10 @@ def pytest_configure(config): def pytest_addoption(parser, pluginmanager): - parser.addoption("--mutation", dest="mutation", type=str) + try: + parser.addoption("--mutation", dest="mutation", type=str) + except ValueError: + pass # already registered (e.g. conftest.py + -p mutation both active) def for_each_par_map(loop, pool, inc, proc, items): @@ -1490,6 +1646,10 @@ def mutation_all(x): return True +def mutation_without_inject_exception(x): + return not isinstance(x, InjectException) + + async def play_create_mutations(loop, root, db, max_workers, arguments): # Go through all files, and produce mutations, take into account # include pattern, and exclude patterns. Also, exclude what has @@ -1506,8 +1666,11 @@ async def play_create_mutations(loop, root, db, max_workers, arguments): # setup coverage support coverage = coverage_read(root) only_dead_code = arguments["--only-deadcode-detection"] + without_inject = arguments.get("--without-exception-injection", False) if only_dead_code: mutation_predicate = mutation_only_deadcode + elif without_inject: + mutation_predicate = mutation_without_inject_exception else: mutation_predicate = mutation_all