Skip to content

Feature/dp devpod cli#133

Merged
blooop merged 9 commits into
mainfrom
feature/dp-devpod-cli
Jan 14, 2026
Merged

Feature/dp devpod cli#133
blooop merged 9 commits into
mainfrom
feature/dp-devpod-cli

Conversation

@blooop
Copy link
Copy Markdown
Owner

@blooop blooop commented Jan 14, 2026

Summary by Sourcery

Add a new dp DevPod CLI wrapper with autocompletion support and associated tests.

New Features:

  • Introduce a dp CLI entry point that wraps devpod for workspace management, including fuzzy selection, workspace lifecycle commands, and validation of workspace specs.
  • Add bash completion for the dp CLI with cached workspace and repository-aware suggestions, integrated into the existing completion installation flow.

Enhancements:

  • Register the new dp CLI as a console script in project configuration and include its completion script in the combined shell completions installation.

Documentation:

  • Add specification and implementation plan documents describing the dp DevPod CLI wrapper, its commands, autocomplete behavior, and architecture.

Tests:

  • Add a comprehensive test suite for the dp CLI helper functions, workspace parsing, repository discovery, and workspace listing behavior.

blooop and others added 6 commits January 14, 2026 20:36
Specification for a new CLI tool called dp that wraps devpod
with intuitive autocomplete and fzf fuzzy selection, similar
to the UX patterns established by renv.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Implements the dp command, a streamlined CLI wrapper for devpod:
- Interactive fzf workspace selection when run with no args
- Tab completion for workspace names and flags
- Simple commands: --ls, --stop, --rm, --code, --status
- Supports creating workspaces from git repos or local paths
- Integrates with rockerc completion system

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Change syntax from github.com/owner/repo to owner/repo
- Auto-expand owner/repo[@Branch] to github.com URL for devpod
- Update bash completion for owner/repo format
- Add comprehensive test suite (30 tests)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Handle KeyboardInterrupt gracefully in fzf selector and main
- Validate workspace spec before passing to devpod:
  - Must be existing workspace, owner/repo, or path
  - Unknown names like "blo" now show helpful error
- Add is_path_spec, is_git_spec, validate_workspace_spec functions
- Add 18 new tests (48 total)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Parse git remote URLs to discover owner/repo from workspaces
- Add --repos command to list known repos for bash completion
- Autocomplete now suggests owner/ and owner/repo based on
  git remotes from local workspace directories
- Support git@github.com: and https://github.com/ URL formats
- Add 13 new tests for git URL parsing (61 total)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Cache completion data to ~/.cache/dp/completions.json
- Bash completion reads from cache file directly (instant)
- Add --update-cache and --completion-data commands
- Background cache update after workspace operations
- ~200x faster autocomplete

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@sourcery-ai
Copy link
Copy Markdown
Contributor

sourcery-ai Bot commented Jan 14, 2026

Reviewer's Guide

Introduces a new dp DevPod CLI wrapper with workspace management, fuzzy selection, and shell completion support, wires it into the existing rockerc CLI/completions system, and adds unit tests and specs for its behavior.

Sequence diagram for dp command execution and cache update

sequenceDiagram
    actor User
    participant Shell
    participant dp_cli as dp_main
    participant Devpod as devpod_binary
    participant Cache as completion_cache

    User->>Shell: run dp myworkspace "make test"
    Shell->>dp_cli: invoke main(args)

    dp_cli->>dp_cli: get_workspace_ids()
    dp_cli->>Devpod: devpod list --output json
    Devpod-->>dp_cli: list of workspaces (JSON)
    dp_cli->>dp_cli: validate_workspace_spec(raw_spec, existing_ids)
    alt invalid_spec
        dp_cli-->>User: error "Unknown workspace" (exit 1)
    else valid_spec
        dp_cli->>dp_cli: expand_workspace_spec(raw_spec)
        dp_cli->>Devpod: devpod up workspace
        Devpod-->>dp_cli: status code
        alt up_failed
            dp_cli-->>User: exit with error
        else up_ok
            dp_cli->>Devpod: devpod ssh workspace --command "make test"
            Devpod-->>dp_cli: command exit code
            dp_cli->>dp_cli: update_cache_background()
            dp_cli->>Cache: spawn subprocess dp --update-cache
            activate Cache
            Cache->>Devpod: devpod list --output json
            Devpod-->>Cache: list of workspaces (JSON)
            Cache-->>Cache: discover_repos_from_workspaces()
            Cache-->>Cache: write_completion_cache(data)
            deactivate Cache
            dp_cli-->>User: exit with ssh command code
        end
    end
Loading

Sequence diagram for dp bash completion using cached data

sequenceDiagram
    actor User
    participant Bash as bash_shell
    participant Comp as dp_bash_completion
    participant Cache as completion_cache_file

    User->>Bash: types "dp loft" and presses Tab
    Bash->>Comp: call _dp_completion()

    Comp->>Comp: determine cur, prev, COMP_CWORD
    alt completing_flag
        Comp-->>Bash: compgen over opts
    else positional_completion
        Comp->>Cache: read $HOME/.cache/dp/completions.json
        alt cache_exists
            Cache-->>Comp: JSON {workspaces, repos, owners}
            Comp->>Comp: build completion list
            alt completing_owner_slash
                Comp-->>Bash: known repos owner/repo
            else completing_workspace_or_owner
                Comp-->>Bash: workspace ids and owners/
            end
        else no_cache
            Comp-->>Bash: no completions from cache
        end
    end

    Bash-->>User: shows completion suggestions
Loading

Class diagram for the new dp DevPod CLI Workspace model

classDiagram
    class Workspace {
        +str id
        +str source_type
        +str source
        +str last_used
        +str provider
        +str ide
        +from_json(data Dict_str_Any) Workspace
    }

    class DPModule {
        +get_cache_path() pathlib_Path
        +read_completion_cache() Dict_str_Any
        +write_completion_cache(data Dict_str_Any) void
        +update_completion_cache() Dict_str_Any
        +update_cache_background() void
        +is_path_spec(spec str) bool
        +is_git_spec(spec str) bool
        +expand_workspace_spec(spec str) str
        +validate_workspace_spec(spec str, existing_ids List_str) str
        +parse_owner_repo_from_url(url str) tuple
        +get_git_remote_url(path str) str
        +get_git_branches(path str) List_str
        +discover_repos_from_workspaces(workspaces List_Workspace) Dict_str_List_str
        +get_known_repos() List_str
        +run_devpod(args List_str, capture bool) CompletedProcess
        +list_workspaces() List_Workspace
        +get_workspace_ids() List_str
        +print_workspaces() void
        +fuzzy_select_workspace() str
        +workspace_up(workspace str, ide str, recreate bool, reset bool) CompletedProcess
        +workspace_ssh(workspace str, command str) int
        +workspace_stop(workspace str) int
        +workspace_delete(workspace str) int
        +workspace_status(workspace str) int
        +print_help() void
        +main() int
    }

    DPModule "*" --> "*" Workspace : uses
Loading

Flowchart for dp CLI argument handling and command routing

flowchart TD
    A["Start dp main()"] --> B{No args or --help/-h}
    B -->|No args| C["Try fuzzy_select_workspace() via iterfzf"]
    C -->|workspace selected| D["workspace_up(selected)"] --> E["workspace_ssh(selected)"] --> Z["Exit"]
    C -->|none selected| H1["print_help() and exit 1"]

    B -->|--help or -h| H["print_help() and exit 0"]

    B -->|Other args| F{First arg}

    F -->|--ls| L1["print_workspaces() and exit"]
    F -->|--repos| L2["print known repos (cache or discovery) and exit"]
    F -->|--update-cache| L3["update_completion_cache() and exit"]
    F -->|--completion-data| L4["print JSON completion data (cache or fresh) and exit"]
    F -->|--install| L5["install_all_completions() and exit"]

    F -->|--stop/--rm/--code/--status/--recreate/--reset| G1["maybe fuzzy select workspace if missing"] --> G2["run corresponding workspace_* or workspace_up+ssh"] --> Z

    F -->|default: workspace spec| W1["raw_spec = args[0], command = args[1..]"]
    W1 --> W2["existing_ids = get_workspace_ids()"]
    W2 --> W3{"validate_workspace_spec(raw_spec, existing_ids)"}
    W3 -->|invalid| W4["log error and exit 1"]
    W3 -->|valid| W5["workspace = expand_workspace_spec(raw_spec)"]
    W5 --> W6["workspace_up(workspace)"]
    W6 -->|nonzero status| W7["exit with error"]
    W6 -->|ok| W8["workspace_ssh(workspace, command)"]
    W8 --> W9["update_cache_background()"] --> Z
Loading

File-Level Changes

Change Details Files
Add a new dp DevPod CLI wrapper with workspace management, git/path specs handling, fuzzy selection, and background completion-cache updates.
  • Implement workspace listing, selection, and lifecycle commands on top of the devpod CLI, including up, ssh, stop, delete, status, recreate, reset, and VS Code open flows.
  • Support multiple workspace spec formats (existing IDs, local paths, owner/repo[@Branch], and URLs) with validation, normalization to github.com/owner/repo where appropriate, and helpful error messages.
  • Introduce a Workspace dataclass plus helpers for parsing devpod JSON, discovering git remotes, extracting owner/repo from URLs, and building cached completion data (workspaces, repos, owners).
  • Add fzf-based interactive selection using iterfzf when dp is run without args, and integrate completion installation via the existing install_all_completions hook.
rockerc/dp.py
Extend shell completion to support the new dp CLI, including cached workspace/owner/repo completion data.
  • Add a dedicated bash completion script that completes flags, workspaces, owners, and known repos using a JSON cache under ~/.cache/dp/completions.json read via jq.
  • Ensure path-like arguments complete as directories and that owner/repo inputs can be extended with @Branch by disabling automatic space insertion.
  • Wire dp completion into the existing combined completion installer so dp completions are available wherever install_all_completions is used.
rockerc/completions/dp.bash
rockerc/completion.py
Register the new dp CLI entry point and add tests and specifications for its behavior.
  • Expose the dp command via a console_scripts entry in pyproject.toml so it can be invoked as a top-level CLI.
  • Provide a written implementation plan and spec describing dp’s architecture, devpod command mapping, and UX expectations for completion and fzf workflows.
  • Add unit tests to cover path/git spec detection, workspace spec validation/expansion, owner/repo parsing, workspace listing and ID extraction, and repo discovery from workspaces.
pyproject.toml
specs/24/dp-devpod-cli/plan.md
specs/24/dp-devpod-cli/spec.md
test/test_dp.py

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - I've found 3 security issues, 6 other issues, and left some high level feedback:

Security issues:

  • Detected subprocess function 'Popen' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
  • Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
  • Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)

General comments:

  • Consider replacing the module-level logging.basicConfig(...) call in rockerc/dp.py with a module-specific logger (e.g. logger = logging.getLogger(__name__)) to avoid altering global logging configuration for applications that import this module.
  • The completion cache is only updated in the main workspace flow (after workspace_ssh); you might want to trigger update_cache_background() after other state-changing operations like --stop, --rm, --recreate, and --reset to keep completions accurate.
  • The bash completion script relies solely on the JSON cache and does not fall back to live data, so first-run or stale-cache scenarios yield no completions; consider adding a fallback to devpod list --output json when the cache is missing or unreadable.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- Consider replacing the module-level `logging.basicConfig(...)` call in `rockerc/dp.py` with a module-specific logger (e.g. `logger = logging.getLogger(__name__)`) to avoid altering global logging configuration for applications that import this module.
- The completion cache is only updated in the main workspace flow (after `workspace_ssh`); you might want to trigger `update_cache_background()` after other state-changing operations like `--stop`, `--rm`, `--recreate`, and `--reset` to keep completions accurate.
- The bash completion script relies solely on the JSON cache and does not fall back to live data, so first-run or stale-cache scenarios yield no completions; consider adding a fallback to `devpod list --output json` when the cache is missing or unreadable.

## Individual Comments

### Comment 1
<location> `rockerc/completions/dp.bash:17-26` </location>
<code_context>
+        return 0
+    fi
+
+    # Cache file location
+    local cache_file="$HOME/.cache/dp/completions.json"
+
+    # Read from cache (fast path)
+    local workspaces=""
+    local known_repos=""
+    local owners=""
+
+    if [[ -f "$cache_file" ]]; then
+        workspaces=$(jq -r '.workspaces[]?' "$cache_file" 2>/dev/null | tr '\n' ' ')
+        known_repos=$(jq -r '.repos[]?' "$cache_file" 2>/dev/null | tr '\n' ' ')
+        owners=$(jq -r '.owners[]?' "$cache_file" 2>/dev/null | tr '\n' ' ')
+    fi
+
</code_context>

<issue_to_address>
**suggestion:** Completion relies entirely on a pre-existing cache file and jq, with no runtime fallback.

If `~/.cache/dp/completions.json` is missing or `jq` isn’t installed, completions degrade to almost nothing with no clear signal to the user. Since the CLI already has `--completion-data` and `--update-cache`, consider a fallback: e.g., populate the cache once per shell session via `dp --completion-data` when the file is absent, and explicitly short‑circuit when `jq` is unavailable. This would make first‑time use and jq‑less environments behave more predictably.
</issue_to_address>

### Comment 2
<location> `test/test_dp.py:1` </location>
<code_context>
+"""Tests for dp (DevPod CLI Wrapper) functionality."""
+
+import json
</code_context>

<issue_to_address>
**suggestion (testing):** Add tests for completion cache helpers and related CLI flags

The cache-related helpers in `dp.py` (`get_cache_path`, `read_completion_cache`, `write_completion_cache`, `update_completion_cache`, `update_cache_background`) and CLI flags (`--repos`, `--completion-data`, `--update-cache`) are currently untested. Please add tests that:

- Exercise `read_completion_cache`/`write_completion_cache` with patched `get_cache_path`/filesystem for:
  - Missing cache file
  - Corrupt JSON (returning `None` without raising)
- Verify `update_completion_cache` builds the expected JSON shape (`workspaces`, `repos`, `owners`) from mocked `list_workspaces`/`discover_repos_from_workspaces` and calls `write_completion_cache`.
- Check `update_cache_background` invokes `subprocess.Popen` with the expected arguments.
- For the CLI:
  - `dp --repos` prefers the cache when present, falling back to `get_known_repos` otherwise.
  - `dp --completion-data` returns cached data when available, and regenerates+persists when missing.

This will help prevent regressions in completion behavior and ensure safe handling of missing/corrupt cache files.

Suggested implementation:

```python
"""Tests for dp (DevPod CLI Wrapper) functionality."""

import json
import os
import subprocess
from pathlib import Path
from unittest.mock import patch, MagicMock

import pytest

from rockerc.dp import (
    expand_workspace_spec,
    is_path_spec,
    is_git_spec,
    validate_workspace_spec,
    parse_owner_repo_from_url,
    discover_repos_from_workspaces,
    get_known_repos,
    get_cache_path,
    read_completion_cache,
    write_completion_cache,
    update_completion_cache,
    update_cache_background,
    list_workspaces,
    main,

```

```python
    get_known_repos,
    get_cache_path,
    read_completion_cache,
    write_completion_cache,
    update_completion_cache,
    update_cache_background,
    list_workspaces,
    main,
)

# ---------------------------------------------------------------------------
# Completion cache helper tests
# ---------------------------------------------------------------------------


def test_read_completion_cache_missing_file(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """read_completion_cache should return None when the cache file is missing."""
    cache_path = tmp_path / "dp_completion_cache.json"

    monkeypatch.setattr("rockerc.dp.get_cache_path", lambda: cache_path)

    result = read_completion_cache()
    assert result is None


def test_read_completion_cache_corrupt_json(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """read_completion_cache should return None (not raise) on corrupt JSON."""
    cache_path = tmp_path / "dp_completion_cache.json"
    cache_path.write_text("this is not json", encoding="utf-8")

    monkeypatch.setattr("rockerc.dp.get_cache_path", lambda: cache_path)

    result = read_completion_cache()
    assert result is None


def test_write_and_read_completion_cache_roundtrip(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """write_completion_cache should persist data that read_completion_cache can read back."""
    cache_path = tmp_path / "dp_completion_cache.json"
    monkeypatch.setattr("rockerc.dp.get_cache_path", lambda: cache_path)

    cache_data = {
        "workspaces": ["ws1", "ws2"],
        "repos": ["owner1/repo1", "owner2/repo2"],
        "owners": ["owner1", "owner2"],
    }

    write_completion_cache(cache_data)
    assert cache_path.exists()

    loaded = read_completion_cache()
    assert loaded == cache_data


def test_update_completion_cache_builds_expected_structure_and_calls_write(monkeypatch: pytest.MonkeyPatch) -> None:
    """update_completion_cache should build the expected JSON shape from workspaces and repos."""
    mock_workspaces = ["ws-a", "ws-b"]
    mock_repos = ["alice/repo1", "bob/repo2"]

    list_workspaces_mock = MagicMock(return_value=mock_workspaces)
    discover_repos_mock = MagicMock(return_value=mock_repos)
    write_cache_mock = MagicMock()

    monkeypatch.setattr("rockerc.dp.list_workspaces", list_workspaces_mock)
    monkeypatch.setattr("rockerc.dp.discover_repos_from_workspaces", discover_repos_mock)
    monkeypatch.setattr("rockerc.dp.write_completion_cache", write_cache_mock)

    result = update_completion_cache()

    # Ensure helpers are called with the expected data
    list_workspaces_mock.assert_called_once()
    discover_repos_mock.assert_called_once_with(mock_workspaces)
    write_cache_mock.assert_called_once()

    # Check JSON shape
    written_data = write_cache_mock.call_args.args[0]
    assert isinstance(written_data, dict)
    assert set(written_data.keys()) == {"workspaces", "repos", "owners"}

    assert written_data["workspaces"] == mock_workspaces
    assert written_data["repos"] == mock_repos
    # Owners are derived from "owner/repo" strings
    assert sorted(written_data["owners"]) == sorted(["alice", "bob"])

    # update_completion_cache should typically return the cache data it wrote
    assert result == written_data


def test_update_cache_background_invokes_popen_with_update_cache_flag(monkeypatch: pytest.MonkeyPatch) -> None:
    """update_cache_background should spawn a background process with --update-cache."""
    popen_mock = MagicMock()
    monkeypatch.setattr("rockerc.dp.subprocess.Popen", popen_mock)

    update_cache_background()

    popen_mock.assert_called_once()
    popen_args, popen_kwargs = popen_mock.call_args

    # The first positional arg should be the argv list
    argv_list = popen_args[0]
    assert isinstance(argv_list, (list, tuple))
    # Ensure the update-cache flag is present in the spawned command
    assert any("--update-cache" in str(arg) for arg in argv_list)

    # Background processes usually detach stdio; we just ensure Popen is called
    # with shell disabled by default (defensive, adjust if implementation differs).
    assert popen_kwargs.get("shell", False) is False


# ---------------------------------------------------------------------------
# CLI completion-related flag tests
# ---------------------------------------------------------------------------


def test_cli_repos_prefers_cache_over_known_repos(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture) -> None:
    """`dp --repos` should use cached repos when available and not call get_known_repos."""
    cache_data = {"repos": ["alice/repo1", "bob/repo2"], "workspaces": [], "owners": []}

    read_cache_mock = MagicMock(return_value=cache_data)
    get_known_repos_mock = MagicMock()

    monkeypatch.setattr("rockerc.dp.read_completion_cache", read_cache_mock)
    monkeypatch.setattr("rockerc.dp.get_known_repos", get_known_repos_mock)

    # Invoke CLI with --repos; main() is expected to write repo list to stdout
    main(["--repos"])
    out = capsys.readouterr().out

    # Cached repos should be present in the output
    for repo in cache_data["repos"]:
        assert repo in out

    # Fallback should not be used when cache exists
    get_known_repos_mock.assert_not_called()


def test_cli_repos_falls_back_to_known_repos_when_no_cache(
    monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture
) -> None:
    """`dp --repos` should fall back to get_known_repos when cache is missing/invalid."""
    monkeypatch.setattr("rockerc.dp.read_completion_cache", lambda: None)

    known_repos = ["carol/repo3", "dave/repo4"]
    get_known_repos_mock = MagicMock(return_value=known_repos)
    monkeypatch.setattr("rockerc.dp.get_known_repos", get_known_repos_mock)

    main(["--repos"])
    out = capsys.readouterr().out

    for repo in known_repos:
        assert repo in out

    get_known_repos_mock.assert_called_once()


def test_cli_completion_data_uses_cached_data_when_available(
    monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture
) -> None:
    """`dp --completion-data` should return cached JSON blob when available."""
    cache_data = {
        "workspaces": ["ws1"],
        "repos": ["alice/repo1"],
        "owners": ["alice"],
    }

    read_cache_mock = MagicMock(return_value=cache_data)
    update_cache_mock = MagicMock()
    monkeypatch.setattr("rockerc.dp.read_completion_cache", read_cache_mock)
    monkeypatch.setattr("rockerc.dp.update_completion_cache", update_cache_mock)

    main(["--completion-data"])
    out = capsys.readouterr().out.strip()

    # Should emit the cached JSON without regenerating
    assert json.loads(out) == cache_data
    update_cache_mock.assert_not_called()


def test_cli_completion_data_regenerates_and_persists_when_cache_missing(
    monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture
) -> None:
    """`dp --completion-data` should regenerate+persist completion data when cache is missing."""
    monkeypatch.setattr("rockerc.dp.read_completion_cache", lambda: None)

    regenerated_data = {
        "workspaces": ["ws-x"],
        "repos": ["eve/repo5"],
        "owners": ["eve"],
    }

    update_cache_mock = MagicMock(return_value=regenerated_data)
    monkeypatch.setattr("rockerc.dp.update_completion_cache", update_cache_mock)

    main(["--completion-data"])
    out = capsys.readouterr().out.strip()

    # Should emit the regenerated JSON and persist via update_completion_cache
    assert json.loads(out) == regenerated_data
    update_cache_mock.assert_called_once()

```

These edits assume the following about `rockerc.dp`:

1. The cache helpers are available with these signatures:
   - `get_cache_path() -> pathlib.Path | str`
   - `read_completion_cache() -> dict | None`
   - `write_completion_cache(data: dict) -> None`
   - `update_completion_cache() -> dict`
   - `update_cache_background() -> None`
   - `list_workspaces() -> list[str]`

2. The CLI entry point is a function `main(argv: list[str] | None = None)`, which:
   - Handles `["--repos"]` by writing repo identifiers to stdout.
   - Handles `["--completion-data"]` by writing a JSON blob to stdout.
   - Uses `read_completion_cache()` first for both flags, falling back to `get_known_repos()` for `--repos`, and to `update_completion_cache()` for `--completion-data`.

3. `update_cache_background` uses `subprocess.Popen` from the `subprocess` module imported in `rockerc.dp`.

If the actual API differs, please adjust:

- The imported names: for example, if the CLI entry function is named `cli` or `dp_main` instead of `main`, update both the import list and test calls accordingly.
- The module paths in `monkeypatch.setattr` calls (e.g., if helpers live in a submodule such as `rockerc.dp.cache`, update those strings).
- The way CLI output is formatted; the tests currently assume that:
  - `--repos` prints each repo identifier somewhere in stdout (any format is OK as long as the strings appear).
  - `--completion-data` prints a JSON document representing the cache data (a single JSON object per invocation).

Finally, if `update_completion_cache` does not return the cache dict, either:
- Adjust the test to assert on the argument passed to `write_completion_cache` only, or
- Update the implementation so it returns the dict it writes, which makes the function easier to test and reuse.
</issue_to_address>

### Comment 3
<location> `test/test_dp.py:354-363` </location>
<code_context>
+class TestParseOwnerRepoFromUrl:
</code_context>

<issue_to_address>
**suggestion (testing):** Add coverage for `get_git_remote_url` and `get_git_branches` helpers

The higher-level tests are strong, but the underlying git helpers that wrap `subprocess.run` are untested, which makes regressions in repo/branch discovery more likely.

Please add tests (mocking `subprocess.run`) for:
- `get_git_remote_url`: returns trimmed stdout on success, and `None` on non-zero exit codes or `OSError`/`SubprocessError`.
- `get_git_branches`: parses realistic `git branch -r` output, filters out `origin/HEAD -> origin/main`, and only strips the `origin/` prefix (including cases like `origin/feature/my-branch`), plus failure/exception paths.

This will solidify the contract of these helpers and protect completion behavior from subtle changes in git output handling.

Suggested implementation:

```python
from subprocess import SubprocessError
from unittest.mock import patch

```

```python
class TestParseOwnerRepoFromUrl:
    """Tests for parse_owner_repo_from_url function."""

    def test_parse_ssh_url(self):
        """Test parsing git@github.com:owner/repo.git URL."""
        result = parse_owner_repo_from_url("git@github.com:blooop/python_template.git")
        assert result == ("blooop", "python_template")

    def test_parse_ssh_url_no_git_suffix(self):
        """Test parsing git@github.com:owner/repo URL without .git."""
        result = parse_owner_repo_from_url("git@github.com:blooop/rockerc")


class TestGitHelpers:
    """Tests for git helper functions that wrap subprocess.run."""

    @patch("rockerc.dp.subprocess.run")
    def test_get_git_remote_url_success(self, mock_run):
        """get_git_remote_url returns trimmed stdout on success."""
        from rockerc.dp import get_git_remote_url

        mock_run.return_value = mock_run.return_value = type(
            "CompletedProcess",
            (),
            {"returncode": 0, "stdout": "git@github.com:blooop/rockerc.git\n", "stderr": ""},
        )()

        result = get_git_remote_url()

        assert result == "git@github.com:blooop/rockerc.git"
        mock_run.assert_called_once()

    @patch("rockerc.dp.subprocess.run")
    def test_get_git_remote_url_non_zero_exit(self, mock_run):
        """get_git_remote_url returns None on non-zero exit codes."""
        from rockerc.dp import get_git_remote_url

        mock_run.return_value = type(
            "CompletedProcess",
            (),
            {"returncode": 1, "stdout": "git@github.com:blooop/rockerc.git\n", "stderr": "fatal: error"},
        )()

        result = get_git_remote_url()

        assert result is None
        mock_run.assert_called_once()

    @patch("rockerc.dp.subprocess.run")
    def test_get_git_remote_url_oserror(self, mock_run):
        """get_git_remote_url returns None on OSError."""
        from rockerc.dp import get_git_remote_url

        mock_run.side_effect = OSError("git not installed")

        result = get_git_remote_url()

        assert result is None
        mock_run.assert_called_once()

    @patch("rockerc.dp.subprocess.run")
    def test_get_git_remote_url_subprocess_error(self, mock_run):
        """get_git_remote_url returns None on SubprocessError."""
        from rockerc.dp import get_git_remote_url

        mock_run.side_effect = SubprocessError("subprocess failed")

        result = get_git_remote_url()

        assert result is None
        mock_run.assert_called_once()

    @patch("rockerc.dp.subprocess.run")
    def test_get_git_branches_success(self, mock_run):
        """get_git_branches parses realistic git branch -r output."""
        from rockerc.dp import get_git_branches

        mock_run.return_value = type(
            "CompletedProcess",
            (),
            {
                "returncode": 0,
                "stdout": (
                    "  origin/HEAD -> origin/main\n"
                    "  origin/main\n"
                    "  origin/feature/my-branch\n"
                    "  upstream/other\n"
                ),
                "stderr": "",
            },
        )()

        branches = get_git_branches()

        # origin/HEAD line filtered out, origin/ prefix stripped, other remotes preserved
        assert branches == ["main", "feature/my-branch", "upstream/other"]
        mock_run.assert_called_once()

    @patch("rockerc.dp.subprocess.run")
    def test_get_git_branches_non_zero_exit(self, mock_run):
        """get_git_branches returns empty list on non-zero exit codes."""
        from rockerc.dp import get_git_branches

        mock_run.return_value = type(
            "CompletedProcess",
            (),
            {"returncode": 1, "stdout": "", "stderr": "fatal: error"},
        )()

        branches = get_git_branches()

        assert branches == []
        mock_run.assert_called_once()

    @patch("rockerc.dp.subprocess.run")
    def test_get_git_branches_oserror(self, mock_run):
        """get_git_branches returns empty list on OSError."""
        from rockerc.dp import get_git_branches

        mock_run.side_effect = OSError("git not installed")

        branches = get_git_branches()

        assert branches == []
        mock_run.assert_called_once()

    @patch("rockerc.dp.subprocess.run")
    def test_get_git_branches_subprocess_error(self, mock_run):
        """get_git_branches returns empty list on SubprocessError."""
        from rockerc.dp import get_git_branches

        mock_run.side_effect = SubprocessError("subprocess failed")

        branches = get_git_branches()

        assert branches == []
        mock_run.assert_called_once()

```

If `get_git_remote_url` or `get_git_branches` currently return something other than `None`/`[]` in failure cases, you will need to align their implementations with these tests (or adjust the expectations in the tests accordingly). The tests assume:
- `get_git_remote_url()` -> `str | None`
- `get_git_branches()` -> `list[str]`, returning `[]` on any failure.
</issue_to_address>

### Comment 4
<location> `test/test_dp.py:263-272` </location>
<code_context>
+class TestListWorkspaces:
</code_context>

<issue_to_address>
**suggestion (testing):** Consider adding integration-style tests around `main()` argument handling and workspace validation

Unit coverage of helpers like `is_path_spec`, `is_git_spec`, and `validate_workspace_spec` is solid, but the composed CLI flow in `main()` isn’t exercised.

Consider adding tests that patch `sys.argv` and the key integration points (`rockerc.dp.workspace_up`, `rockerc.dp.workspace_ssh`, `rockerc.dp.get_workspace_ids`, `rockerc.dp.update_cache_background`) to cover cases such as:

- Invalid workspace spec: e.g. `['dp', 'unknown']` with `get_workspace_ids` not containing `unknown` should return `1` and not call `workspace_up`.
- Valid git spec: e.g. `['dp', 'owner/repo']` validates and calls `workspace_up`/`workspace_ssh` with `github.com/owner/repo`.
- Command execution: e.g. `['dp', 'myws', 'echo', 'hi']` results in `workspace_ssh('myws', 'echo hi')`.
- Flag handling: `--ls`, `--stop`, `--rm`, `--status`, `--code`, `--recreate`, `--reset`, `--install` each dispatch to the correct helper and propagate exit codes.
- `update_cache_background` is only invoked for the default workspace path after successful operations.

This would verify the CLI wiring and reduce regressions in argument routing and validation.

Suggested implementation:

```python
        assert ws.ide == ""


class TestMainCli:
    """Integration-style tests for `main()` CLI wiring and workspace validation."""

    @patch("rockerc.dp.update_cache_background")
    @patch("rockerc.dp.workspace_up")
    @patch("rockerc.dp.get_workspace_ids")
    def test_invalid_workspace_spec_returns_error_and_does_not_start(
        self,
        mock_get_workspace_ids,
        mock_workspace_up,
        mock_update_cache,
        monkeypatch,
    ):
        """`dp unknown` should exit 1 and not start a workspace when spec is invalid."""
        # Arrange: only "known" workspace exists
        mock_get_workspace_ids.return_value = ["known"]

        # Simulate CLI invocation: dp unknown
        import sys
        from rockerc.dp import main

        monkeypatch.setattr(sys, "argv", ["dp", "unknown"], raising=False)

        # Act
        exit_code = main()

        # Assert
        assert exit_code == 1
        mock_workspace_up.assert_not_called()
        mock_update_cache.assert_not_called()

    @patch("rockerc.dp.update_cache_background")
    @patch("rockerc.dp.workspace_ssh")
    @patch("rockerc.dp.workspace_up")
    def test_valid_git_spec_starts_and_ssh_with_normalized_repo(
        self,
        mock_workspace_up,
        mock_workspace_ssh,
        mock_update_cache,
        monkeypatch,
    ):
        """`dp owner/repo` should normalize to github.com/owner/repo and start/ssh."""
        import sys
        from rockerc.dp import main

        monkeypatch.setattr(sys, "argv", ["dp", "owner/repo"], raising=False)

        exit_code = main()

        assert exit_code == 0

        # Expect git repo normalized to github.com/owner/repo
        expected_spec = "github.com/owner/repo"
        mock_workspace_up.assert_called_once_with(expected_spec, ANY)
        mock_workspace_ssh.assert_called_once()
        mock_update_cache.assert_not_called()

    @patch("rockerc.dp.update_cache_background")
    @patch("rockerc.dp.workspace_ssh")
    @patch("rockerc.dp.workspace_up")
    def test_command_execution_for_workspace_uses_workspace_ssh(
        self,
        mock_workspace_up,
        mock_workspace_ssh,
        mock_update_cache,
        monkeypatch,
    ):
        """`dp myws echo hi` should call workspace_ssh('myws', 'echo hi')."""
        import sys
        from rockerc.dp import main

        monkeypatch.setattr(sys, "argv", ["dp", "myws", "echo", "hi"], raising=False)

        exit_code = main()

        assert exit_code == 0
        # workspace_up may or may not be called depending on implementation; don't assert it
        mock_workspace_ssh.assert_called_once()
        ws_id, cmd = mock_workspace_ssh.call_args.args
        assert ws_id == "myws"
        assert cmd == "echo hi"
        mock_update_cache.assert_not_called()

    def test_flag_handling_dispatches_and_propagates_exit_codes(self, monkeypatch):
        """Flag-style invocations should dispatch to correct helpers and propagate exit codes."""
        import sys
        import rockerc.dp as dp
        from rockerc.dp import main

        flag_to_helper = {
            "--ls": "list_workspaces",
            "--stop": "workspace_stop",
            "--rm": "workspace_rm",
            "--status": "workspace_status",
            "--code": "workspace_code",
            "--recreate": "workspace_recreate",
            "--reset": "workspace_reset",
            "--install": "workspace_install",
        }

        for flag, helper_name in flag_to_helper.items():
            # Arrange
            mock_helper = MagicMock(return_value=42)
            monkeypatch.setattr(dp, helper_name, mock_helper, raising=False)
            monkeypatch.setattr(sys, "argv", ["dp", flag], raising=False)

            # Act
            exit_code = main()

            # Assert: exit code propagated and helper invoked once
            assert exit_code == 42
            mock_helper.assert_called_once()

    @patch("rockerc.dp.update_cache_background")
    @patch("rockerc.dp.workspace_ssh")
    @patch("rockerc.dp.workspace_up")
    def test_update_cache_background_only_for_default_workspace(
        self,
        mock_workspace_up,
        mock_workspace_ssh,
        mock_update_cache,
        monkeypatch,
    ):
        """
        Cache update should only be triggered for the default workspace path after success.

        We treat `dp` with no explicit workspace spec as the "default" invocation and assert that
        update_cache_background is called only in that case.
        """
        import sys
        from rockerc.dp import main

        # Case 1: default invocation (no workspace argument)
        monkeypatch.setattr(sys, "argv", ["dp"], raising=False)
        mock_update_cache.reset_mock()

        exit_code_default = main()
        assert exit_code_default == 0
        mock_update_cache.assert_called_once()

        # Case 2: explicit git workspace, should not trigger cache update
        mock_update_cache.reset_mock()
        monkeypatch.setattr(sys, "argv", ["dp", "owner/repo"], raising=False)

        exit_code_git = main()
        assert exit_code_git == 0
        mock_update_cache.assert_not_called()


class TestListWorkspaces:
    """Tests for list_workspaces function."""

    @patch("rockerc.dp.run_devpod")
    def test_list_workspaces_success(self, mock_run):
        """Test successful workspace listing."""
        mock_result = MagicMock()
        mock_result.returncode = 0
        mock_result.stdout = json.dumps(
            [
                {

```

1. Ensure `MagicMock` and `ANY` are imported at the top of `test/test_dp.py`:
   - `from unittest.mock import MagicMock, patch, ANY`
2. Ensure `pytest` is available so the `monkeypatch` fixture works; if not already used in this file, import `pytest` or configure it appropriately.
3. Ensure `main` is importable from `rockerc.dp` (`from rockerc.dp import main`). If `main` lives elsewhere (e.g., `rockerc.cli`), adjust the imports and patch targets accordingly.
4. The helper names in `flag_to_helper` (`list_workspaces`, `workspace_stop`, `workspace_rm`, `workspace_status`, `workspace_code`, `workspace_recreate`, `workspace_reset`, `workspace_install`) are inferred from the comment and may differ from your actual implementation. Update the mapping to match your real helper function names so that the flag-dispatch test aligns with the CLI implementation.
5. The “default workspace” behavior in `test_update_cache_background_only_for_default_workspace` assumes that invoking `dp` with no additional arguments corresponds to the default workspace path and that successful completion returns `0`. If your CLI uses a different convention for the default workspace, adjust the argv setup and expectations accordingly.
</issue_to_address>

### Comment 5
<location> `test/test_dp.py:166-175` </location>
<code_context>
+class TestOwnerRepoPattern:
</code_context>

<issue_to_address>
**suggestion (testing):** Add a few negative regex cases to lock down OWNER_REPO_PATTERN edge behavior

You’ve got good positive coverage and a few negative examples already. Since this regex is central to deciding when to expand/validate workspace specs, consider adding explicit negative cases to document constraints, such as:

- Trailing slashes or missing repo (e.g. `owner/`, `/owner/repo`).
- Illegal characters or spaces in owner/repo.
- URL-shaped inputs (e.g. `github.com/owner/repo`) to show these are handled elsewhere.

These will clarify what is intentionally rejected and help prevent future unintended changes to the pattern’s scope.
</issue_to_address>

### Comment 6
<location> `rockerc/dp.py:450` </location>
<code_context>
+    print(help_text)
+
+
+def main() -> int:
+    """Main entry point for dp CLI."""
+    args = sys.argv[1:]
</code_context>

<issue_to_address>
**issue (complexity):** Consider extracting shared helpers for workspace resolution, cache handling, and spec classification to remove repeated logic and make the CLI flow easier to follow.

You can reduce complexity in a few focused spots without changing behavior.

### 1) Centralize the `--stop/--rm/--status/--code/--recreate/--reset` workspace resolution

The repeated pattern:

- check `len(args) < 2`
- maybe call `fuzzy_select_workspace()`
- error with almost identical message
- call a workspace handler

can be hidden behind a tiny `resolve_workspace` helper and a simple dispatcher.

```python
def resolve_workspace(args: List[str], usage: str) -> Optional[str]:
    if len(args) >= 2:
        return args[1]
    workspace = fuzzy_select_workspace()
    if not workspace:
        logging.error(usage)
        return None
    return workspace
```

Then in `main()` you can replace several branches, e.g.:

```python
if args[0] == "--stop":
    workspace = resolve_workspace(args, "Usage: dp --stop <workspace>")
    if not workspace:
        return 1
    return workspace_stop(workspace)

if args[0] == "--rm":
    workspace = resolve_workspace(args, "Usage: dp --rm <workspace>")
    if not workspace:
        return 1
    return workspace_delete(workspace)
```

Or go one step further with a small table-driven dispatch to avoid repeating the verb mapping:

```python
WORKSPACE_FLAG_COMMANDS = {
    "--stop":  ("Usage: dp --stop <workspace>",  workspace_stop),
    "--rm":    ("Usage: dp --rm <workspace>",    workspace_delete),
    "--status":("Usage: dp --status <workspace>",workspace_status),
}

if args[0] in WORKSPACE_FLAG_COMMANDS:
    usage, handler = WORKSPACE_FLAG_COMMANDS[args[0]]
    workspace = resolve_workspace(args, usage)
    if not workspace:
        return 1
    return handler(workspace)
```

You can still keep the more complex `--code/--recreate/--reset` flows as separate branches.

### 2) Remove duplication in cache access (`--repos` vs `--completion-data`)

Both `--repos` and `--completion-data` implement “read cache or build” logic. A small helper keeps this in one place:

```python
def get_or_build_completion_cache() -> Dict[str, Any]:
    cache = read_completion_cache()
    if cache is not None:
        return cache
    return update_completion_cache()
```

Then:

```python
if args[0] == "--repos":
    data = get_or_build_completion_cache()
    for repo in data.get("repos", []):
        print(repo)
    return 0

if args[0] == "--completion-data":
    data = get_or_build_completion_cache()
    print(json.dumps(data))
    return 0
```

This keeps cache behavior consistent and makes future changes to caching (e.g. schema changes, invalidation) localized.

### 3) Generic helper for simple workspace verbs

`workspace_stop`, `workspace_delete`, and `workspace_status` are very similar wrappers around `run_devpod`. A generic helper can reduce noise while keeping the more complex ones (`workspace_up`, `workspace_ssh`) explicit:

```python
def run_workspace_command(subcommand: str, workspace: str) -> int:
    result = run_devpod([subcommand, workspace])
    return result.returncode

def workspace_stop(workspace: str) -> int:
    return run_workspace_command("stop", workspace)

def workspace_delete(workspace: str) -> int:
    return run_workspace_command("delete", workspace)

def workspace_status(workspace: str) -> int:
    return run_workspace_command("status", workspace)
```

This keeps existing call sites unchanged while making the intent and pattern clearer.

### 4) Unify workspace spec classification

`validate_workspace_spec` and `expand_workspace_spec` both partially re-derive whether a spec is an existing id, path, git spec, etc. A tiny classifier reduces this conceptual duplication:

```python
from enum import Enum, auto

class SpecKind(Enum):
    EXISTING = auto()
    PATH = auto()
    GIT = auto()
    OTHER = auto()

def classify_spec(spec: str, existing_ids: List[str]) -> SpecKind:
    if spec in existing_ids:
        return SpecKind.EXISTING
    if is_path_spec(spec):
        return SpecKind.PATH
    if is_git_spec(spec):
        return SpecKind.GIT
    return SpecKind.OTHER
```

Then:

```python
def validate_workspace_spec(spec: str, existing_ids: List[str]) -> Optional[str]:
    kind = classify_spec(spec, existing_ids)
    if kind in (SpecKind.EXISTING, SpecKind.PATH, SpecKind.GIT):
        return None
    return (
        f"Unknown workspace '{spec}'. Use 'dp --ls' to list workspaces, "
        "or specify owner/repo or ./path"
    )

def expand_workspace_spec(spec: str, existing_ids: List[str]) -> str:
    kind = classify_spec(spec, existing_ids)
    if kind == SpecKind.PATH:
        return spec
    if kind == SpecKind.GIT:
        if "://" in spec or spec.startswith("github.com/") or spec.startswith("gitlab.com/"):
            return spec
        if OWNER_REPO_PATTERN.match(spec):
            return f"github.com/{spec}"
    # EXISTING or OTHER: devpod expects the raw id/name
    return spec
```

You’d pass `existing_ids` you’re already computing in `main()` into `expand_workspace_spec`:

```python
workspace = expand_workspace_spec(raw_spec, existing_ids)
```

These small extra abstractions should keep all current behavior but make `main()` and the workspace handling easier to scan and extend.
</issue_to_address>

### Comment 7
<location> `rockerc/dp.py:94-99` </location>
<code_context>
        subprocess.Popen(
            [sys.executable, "-m", "rockerc.dp", "--update-cache"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            start_new_session=True,
        )
</code_context>

<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'Popen' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

*Source: opengrep*
</issue_to_address>

### Comment 8
<location> `rockerc/dp.py:295` </location>
<code_context>
        return subprocess.run(cmd, capture_output=True, text=True, check=False)
</code_context>

<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

*Source: opengrep*
</issue_to_address>

### Comment 9
<location> `rockerc/dp.py:296` </location>
<code_context>
    return subprocess.run(cmd, check=False)
</code_context>

<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

*Source: opengrep*
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +17 to +26
# Cache file location
local cache_file="$HOME/.cache/dp/completions.json"

# Read from cache (fast path)
local workspaces=""
local known_repos=""
local owners=""

if [[ -f "$cache_file" ]]; then
workspaces=$(jq -r '.workspaces[]?' "$cache_file" 2>/dev/null | tr '\n' ' ')
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Completion relies entirely on a pre-existing cache file and jq, with no runtime fallback.

If ~/.cache/dp/completions.json is missing or jq isn’t installed, completions degrade to almost nothing with no clear signal to the user. Since the CLI already has --completion-data and --update-cache, consider a fallback: e.g., populate the cache once per shell session via dp --completion-data when the file is absent, and explicitly short‑circuit when jq is unavailable. This would make first‑time use and jq‑less environments behave more predictably.

Comment thread test/test_dp.py
Comment on lines +166 to +175
class TestOwnerRepoPattern:
"""Tests for the OWNER_REPO_PATTERN regex."""

def test_matches_simple(self):
"""Test simple owner/repo matches."""
assert OWNER_REPO_PATTERN.match("owner/repo")

def test_matches_with_dashes(self):
"""Test owner/repo with dashes matches."""
assert OWNER_REPO_PATTERN.match("loft-sh/devpod")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Add a few negative regex cases to lock down OWNER_REPO_PATTERN edge behavior

You’ve got good positive coverage and a few negative examples already. Since this regex is central to deciding when to expand/validate workspace specs, consider adding explicit negative cases to document constraints, such as:

  • Trailing slashes or missing repo (e.g. owner/, /owner/repo).
  • Illegal characters or spaces in owner/repo.
  • URL-shaped inputs (e.g. github.com/owner/repo) to show these are handled elsewhere.

These will clarify what is intentionally rejected and help prevent future unintended changes to the pattern’s scope.

Comment thread rockerc/dp.py
print(help_text)


def main() -> int:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider extracting shared helpers for workspace resolution, cache handling, and spec classification to remove repeated logic and make the CLI flow easier to follow.

You can reduce complexity in a few focused spots without changing behavior.

1) Centralize the --stop/--rm/--status/--code/--recreate/--reset workspace resolution

The repeated pattern:

  • check len(args) < 2
  • maybe call fuzzy_select_workspace()
  • error with almost identical message
  • call a workspace handler

can be hidden behind a tiny resolve_workspace helper and a simple dispatcher.

def resolve_workspace(args: List[str], usage: str) -> Optional[str]:
    if len(args) >= 2:
        return args[1]
    workspace = fuzzy_select_workspace()
    if not workspace:
        logging.error(usage)
        return None
    return workspace

Then in main() you can replace several branches, e.g.:

if args[0] == "--stop":
    workspace = resolve_workspace(args, "Usage: dp --stop <workspace>")
    if not workspace:
        return 1
    return workspace_stop(workspace)

if args[0] == "--rm":
    workspace = resolve_workspace(args, "Usage: dp --rm <workspace>")
    if not workspace:
        return 1
    return workspace_delete(workspace)

Or go one step further with a small table-driven dispatch to avoid repeating the verb mapping:

WORKSPACE_FLAG_COMMANDS = {
    "--stop":  ("Usage: dp --stop <workspace>",  workspace_stop),
    "--rm":    ("Usage: dp --rm <workspace>",    workspace_delete),
    "--status":("Usage: dp --status <workspace>",workspace_status),
}

if args[0] in WORKSPACE_FLAG_COMMANDS:
    usage, handler = WORKSPACE_FLAG_COMMANDS[args[0]]
    workspace = resolve_workspace(args, usage)
    if not workspace:
        return 1
    return handler(workspace)

You can still keep the more complex --code/--recreate/--reset flows as separate branches.

2) Remove duplication in cache access (--repos vs --completion-data)

Both --repos and --completion-data implement “read cache or build” logic. A small helper keeps this in one place:

def get_or_build_completion_cache() -> Dict[str, Any]:
    cache = read_completion_cache()
    if cache is not None:
        return cache
    return update_completion_cache()

Then:

if args[0] == "--repos":
    data = get_or_build_completion_cache()
    for repo in data.get("repos", []):
        print(repo)
    return 0

if args[0] == "--completion-data":
    data = get_or_build_completion_cache()
    print(json.dumps(data))
    return 0

This keeps cache behavior consistent and makes future changes to caching (e.g. schema changes, invalidation) localized.

3) Generic helper for simple workspace verbs

workspace_stop, workspace_delete, and workspace_status are very similar wrappers around run_devpod. A generic helper can reduce noise while keeping the more complex ones (workspace_up, workspace_ssh) explicit:

def run_workspace_command(subcommand: str, workspace: str) -> int:
    result = run_devpod([subcommand, workspace])
    return result.returncode

def workspace_stop(workspace: str) -> int:
    return run_workspace_command("stop", workspace)

def workspace_delete(workspace: str) -> int:
    return run_workspace_command("delete", workspace)

def workspace_status(workspace: str) -> int:
    return run_workspace_command("status", workspace)

This keeps existing call sites unchanged while making the intent and pattern clearer.

4) Unify workspace spec classification

validate_workspace_spec and expand_workspace_spec both partially re-derive whether a spec is an existing id, path, git spec, etc. A tiny classifier reduces this conceptual duplication:

from enum import Enum, auto

class SpecKind(Enum):
    EXISTING = auto()
    PATH = auto()
    GIT = auto()
    OTHER = auto()

def classify_spec(spec: str, existing_ids: List[str]) -> SpecKind:
    if spec in existing_ids:
        return SpecKind.EXISTING
    if is_path_spec(spec):
        return SpecKind.PATH
    if is_git_spec(spec):
        return SpecKind.GIT
    return SpecKind.OTHER

Then:

def validate_workspace_spec(spec: str, existing_ids: List[str]) -> Optional[str]:
    kind = classify_spec(spec, existing_ids)
    if kind in (SpecKind.EXISTING, SpecKind.PATH, SpecKind.GIT):
        return None
    return (
        f"Unknown workspace '{spec}'. Use 'dp --ls' to list workspaces, "
        "or specify owner/repo or ./path"
    )

def expand_workspace_spec(spec: str, existing_ids: List[str]) -> str:
    kind = classify_spec(spec, existing_ids)
    if kind == SpecKind.PATH:
        return spec
    if kind == SpecKind.GIT:
        if "://" in spec or spec.startswith("github.com/") or spec.startswith("gitlab.com/"):
            return spec
        if OWNER_REPO_PATTERN.match(spec):
            return f"github.com/{spec}"
    # EXISTING or OTHER: devpod expects the raw id/name
    return spec

You’d pass existing_ids you’re already computing in main() into expand_workspace_spec:

workspace = expand_workspace_spec(raw_spec, existing_ids)

These small extra abstractions should keep all current behavior but make main() and the workspace handling easier to scan and extend.

- Add pylint disable comment for consider-using-with on subprocess.Popen
  (intentional fire-and-forget background process)
- Add too-many-return-statements to pylint disable list (CLI dispatch
  functions naturally have many return paths)
- Remove unused mock_remote parameter from test_get_known_repos

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New security issues found

Comment thread rockerc/dp.py
cmd = ["devpod"] + args
logging.debug("Running: %s", " ".join(cmd))
if capture:
return subprocess.run(cmd, capture_output=True, text=True, check=False)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

Source: opengrep

Comment thread rockerc/dp.py
logging.debug("Running: %s", " ".join(cmd))
if capture:
return subprocess.run(cmd, capture_output=True, text=True, check=False)
return subprocess.run(cmd, check=False)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

Source: opengrep

@codecov
Copy link
Copy Markdown

codecov Bot commented Jan 14, 2026

Codecov Report

❌ Patch coverage is 35.92814% with 214 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.04%. Comparing base (6ce2101) to head (c5c7abb).
⚠️ Report is 10 commits behind head on main.

Files with missing lines Patch % Lines
rockerc/dp.py 35.92% 214 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main     #133      +/-   ##
==========================================
- Coverage   69.05%   63.04%   -6.02%     
==========================================
  Files           8        9       +1     
  Lines        1506     1840     +334     
==========================================
+ Hits         1040     1160     +120     
- Misses        466      680     +214     
Files with missing lines Coverage Δ
rockerc/completion.py 89.39% <ø> (ø)
rockerc/dp.py 35.92% <35.92%> (ø)

Impacted file tree graph

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@blooop blooop merged commit f7d4fd3 into main Jan 14, 2026
7 of 9 checks passed
@blooop blooop deleted the feature/dp-devpod-cli branch January 14, 2026 22:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant