Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
FROM python:3-slim-bullseye

RUN apt-get update && apt-get install -y htop iotop iftop net-tools sysstat procps coreutils grep sed gawk curl wget && rm -rf /var/lib/apt/lists/*
RUN apt-get update && apt-get install -y --no-install-recommends iputils-ping traceroute mtr-tiny curl dnsutils iproute2 htop iotop iftop net-tools sysstat procps coreutils grep sed gawk wget && rm -rf /var/lib/apt/lists/*

WORKDIR /app

COPY cli/requirements.txt .

RUN pip install --no-cache-dir -r requirements.txt
RUN pip install --no-cache-dir pytest

COPY cli /app/cli

ENTRYPOINT ["python", "cli/app.py"]

CMD [""]
119 changes: 103 additions & 16 deletions cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
help="Surge - A DevOps CLI Tool For System Monitoring and Production Reliability"
)


def run_cmd(cmd: str) -> str:
"""
Helper function to abstract lengthy subprocess command implementation :D
Expand Down Expand Up @@ -141,26 +140,114 @@ def monitor(
f"Size: {size} | Used: {used} | Available: {available} | Usage: {percent}"
)


@app.command()
@app.command("network")
def network(
url: Annotated[str, typer.Argument(help="URL to test network/API metrics")],
requests: Annotated[
int, typer.Option("-n", "--count", help="Number of requests to send")
] = 5,
url: Annotated[str | None, typer.Option("-u", "--url", help="HTTP URL to test (curl)", show_default=False)] = None,
host: Annotated[str | None, typer.Option("-h", "--host", help="Host/IP for ping and traceroute", show_default=False)] = None,
domain: Annotated[str | None, typer.Option("-d", "--domain", help="Domain for DNS lookup", show_default=False)] = None,
requests: Annotated[int, typer.Option("-n", "--count", help="Number of ICMP echo requests")] = 5,
dtype: Annotated[str, typer.Option("-t", "--type", help="DNS record type (A, AAAA, MX, TXT, etc.)")] = "A",
sockets: Annotated[bool, typer.Option("--sockets", help="Show socket information (ss)")] = False,
no_trace: Annotated[bool, typer.Option("--no-trace", help="Skip traceroute/mtr when --host is set")] = False,
):
"""
Run basic network/API tests with a number of requests.
Flexible network diagnostics: run only what you request.
Provide one or more of: --host, --url, --domain, --sockets.
"""

if len(url.strip()) == 0:
raise typer.BadParameter("URL must be provided")
if requests <= 0:
raise typer.BadParameter("Request count must be a positive integer")

print(f"Testing network connection to {url} with {requests} requests.")
# TODO: Add http requests or use curl through subprocess

def header(title: str):
print(f"\n[bold]{title}[/bold]")
print("-" * len(title))

def warn(msg: str):
print(f"[warn] {msg}")

def normalize_url(u: str) -> str:
return u if u.startswith(("http://", "https://")) else f"http://{u}"

def curl_brief(u: str) -> str:
fmt = "HTTP %{http_code} | total %{time_total}s | connect %{time_connect}s | ttfb %{time_starttransfer}s\n"
return run_cmd(f'curl -s -o /dev/null -w "{fmt}" {u}')

def summarize_ping(out: str) -> str:
lines = out.splitlines()
sent = loss = avg = None
for ln in lines:
if "packets transmitted" in ln and "packet loss" in ln:
parts = ln.replace(",", "").split()
try:
sent = int(parts[0])
loss = parts[6]
except Exception:
pass
if "rtt min/avg/max" in ln or "round-trip min/avg/max" in ln:
try:
avg = ln.split("=")[1].split("/")[1].strip()
except Exception:
pass
bits = []

if sent is not None:
bits.append(f"sent={sent}")
if loss is not None:
bits.append(f"loss={loss}")
if avg is not None:
bits.append(f"avg_rtt_ms={avg}")

return " | ".join(bits) if bits else (out.strip()[:200] if out else "")

def summarize_trace(out: str, max_lines: int = 12) -> str:
lines = [line for line in out.splitlines() if line.strip()]
if len(lines) <= max_lines:
return out
return "\n".join(lines[:6] + ["..."] + lines[-6:])

# --- validate empty values FIRST ---
if host is not None and not str(host).strip():
warn("--host was provided but empty")
raise typer.Exit(code=2)
if url is not None and not str(url).strip():
warn("--url was provided but empty")
raise typer.Exit(code=2)
if domain is not None and not str(domain).strip():
warn("--domain was provided but empty")
raise typer.Exit(code=2)

# --- then require at least one section ---
if not any([host, url, domain, sockets]):
warn("Nothing to do. Provide at least one of: --host, --url, --domain, --sockets")
raise typer.Exit(code=1)

# ---- ping / traceroute (or mtr -r fallback) ----
if host:
header("Ping")
ping_out = run_cmd(f"ping -c {requests} {host}")
print(summarize_ping(ping_out) if ping_out else "[warn] ping not available or produced no output")

if not no_trace:
header("Traceroute")
trace_out = run_cmd(f"traceroute {host}") or run_cmd(f"mtr -r {host}")
print(summarize_trace(trace_out) if trace_out else "[warn] traceroute/mtr not available or produced no output")

# ---- http (curl) ----
if url:
header("HTTP (curl)")
u = normalize_url(url)
print(curl_brief(u))
headers = run_cmd(f"curl -s -I {u}")
print(headers.strip() if headers else "[warn] curl not available or produced no output")

# ---- dns ----
if domain:
header("DNS")
dns_out = run_cmd(f"dig +short {domain} {dtype}") or run_cmd(f"nslookup -type={dtype} {domain}")
print(dns_out.strip() if dns_out else "[warn] dig/nslookup not available or produced no output")

# ---- sockets (ss) ----
if sockets:
header("Sockets (ss)")
ss_out = run_cmd("ss -tulwn")
print(ss_out or "[warn] ss not available or produced no output")

if __name__ == "__main__":
app()
32 changes: 0 additions & 32 deletions tests/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,35 +144,3 @@ def test_monitor_empty_flag(self, runner):
assert result.exit_code != 0
assert isinstance(result.exception, SystemExit)
assert result.exit_code == 2

def test_network_command_prints_message(self, runner):
result = runner.invoke(
app_mod.app, ["network", "https://example.com", "--count", "2"]
)
assert result.exit_code == 0
assert (
"Testing network connection to https://example.com with 2 requests."
in result.stdout
)

def test_network_command_with_defaults(self, runner):
result = runner.invoke(app_mod.app, ["network", "https://example.com"])
assert result.exit_code == 0
assert (
"Testing network connection to https://example.com with 5 requests."
in result.stdout
)

def test_network_missing_url_errors(self, runner):
result = runner.invoke(app_mod.app, ["network"])
assert result.exit_code != 0
assert isinstance(result.exception, SystemExit)
assert result.exit_code == 2

def test_network_invalid_count_errors(self, runner):
result = runner.invoke(
app_mod.app, ["network", "https://example.com", "--count", "-3"]
)
assert result.exit_code != 0
assert isinstance(result.exception, SystemExit)
assert result.exit_code == 2
72 changes: 72 additions & 0 deletions tests/test_network.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import click
from typer.testing import CliRunner

import cli.app as appmod
runner = CliRunner()

def run_cmd_spy_factory():
calls = []
def _spy(cmd: str):
calls.append(cmd)
if cmd.startswith("ping "):
return "5 packets transmitted, 5 received, 0% packet loss\nrtt min/avg/max/mdev = 10/11/12/0.3 ms"
if cmd.startswith("traceroute "):
return "traceroute to host\n1 a\n2 b\n3 c"
if cmd.startswith("mtr -r "):
return "Start: mtr report\n1. a\n2. b"
if cmd.startswith("curl -s -o /dev/null"):
return "HTTP 200 | total 0.123s | connect 0.010s | ttfb 0.050s\n"
if cmd.startswith("curl -s -I "):
return "HTTP/1.1 200 OK\nServer: test\n"
if cmd.startswith("dig +short "):
return "93.184.216.34\n"
if cmd.startswith("nslookup "):
return "Server: 8.8.8.8\nName: example.com\nAddress: 93.184.216.34\n"
if cmd.startswith("ss -tulwn"):
return "Netid State Local Address:Port Peer Address:Port\n"
return ""
_spy.calls = calls
return _spy

def test_network_runs_all_sections(monkeypatch, capsys):
spy = run_cmd_spy_factory()
monkeypatch.setattr(appmod, "run_cmd", spy)
appmod.network(url="http://example.com", host="1.1.1.1", domain="example.com", sockets=True)
out = capsys.readouterr().out
assert "Ping" in out
assert "Traceroute" in out
assert "HTTP (curl)" in out
assert "DNS" in out
assert "Sockets (ss)" in out
assert "HTTP 200 | total" in out

def test_empty_flags_fail_fast(monkeypatch):
spy = run_cmd_spy_factory()
monkeypatch.setattr(appmod, "run_cmd", spy)
# url vacío debe fallar con exit_code 2 (click.exceptions.Exit)
try:
appmod.network(url="", host=None, domain=None, sockets=False)
assert False, "Expected click.exceptions.Exit"
except click.exceptions.Exit as e:
assert e.exit_code == 2

def test_traceroute_then_mtr_fallback(monkeypatch, capsys):
def run_cmd_fake(cmd: str):
if cmd.startswith("ping "):
return "5 packets transmitted, 5 received, 0% packet loss\nrtt min/avg/max/mdev = 10/11/12/0.3 ms"
if cmd.startswith("traceroute "):
return "" # force fallback
if cmd.startswith("mtr -r "):
return "Start: mtr report\n1. a\n2. b"
return ""
monkeypatch.setattr(appmod, "run_cmd", run_cmd_fake)
appmod.network(host="1.1.1.1")
out = capsys.readouterr().out
assert "mtr report" in out

def test_cli_invocation_smoke(monkeypatch):
spy = run_cmd_spy_factory()
monkeypatch.setattr(appmod, "run_cmd", spy)
result = runner.invoke(appmod.app, ["network", "-u", "http://example.com", "-h", "1.1.1.1", "-d", "example.com", "--sockets"])
assert result.exit_code == 0
assert "HTTP (curl)" in result.stdout