Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 66 additions & 14 deletions cmping.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@
from deltachat_rpc_client import DeltaChat, EventType, Rpc
from xdg_base_dirs import xdg_cache_home


class CMPingError(Exception):
"""Raised when cmping encounters a non-recoverable error during probing."""
pass


# Spinner characters for progress display
SPINNER_CHARS = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]

Expand Down Expand Up @@ -192,7 +198,11 @@ def main():
if not args.relay2:
args.relay2 = args.relay1

pinger = perform_ping(args)
try:
pinger = perform_ping(args)
except CMPingError as e:
print(f"\r✗ {e}")
raise SystemExit(1)
expected_total = pinger.sent * args.numrecipients
raise SystemExit(0 if pinger.received == expected_total else 1)

Expand Down Expand Up @@ -298,8 +308,7 @@ def setup_accounts(args, sender_maker, receiver_maker):
profiles_created += 1
print_progress("Setting up profiles", profiles_created, total_profiles, profiles_created)
except Exception as e:
print(f"\r✗ Failed to setup sender profile on {args.relay1}: {e}")
sys.exit(1)
raise CMPingError(f"Failed to setup sender profile on {args.relay1}: {e}") from e

# Create receiver accounts
receivers = []
Expand All @@ -310,8 +319,9 @@ def setup_accounts(args, sender_maker, receiver_maker):
profiles_created += 1
print_progress("Setting up profiles", profiles_created, total_profiles, profiles_created)
except Exception as e:
print(f"\r✗ Failed to setup receiver profile {i+1} on {args.relay2}: {e}")
sys.exit(1)
raise CMPingError(
f"Failed to setup receiver profile {i+1} on {args.relay2}: {e}"
) from e

# Profile setup complete
print_progress("Setting up profiles", done=True)
Expand Down Expand Up @@ -531,8 +541,9 @@ def wait_online_thread():
wait_thread.join()

if online_error:
print(f"\n✗ Timeout or error waiting for profiles to be online: {online_error}")
sys.exit(1)
raise CMPingError(
f"Timeout or error waiting for profiles to be online: {online_error}"
) from online_error

print_progress("Waiting for profiles to be online", done=True)

Expand Down Expand Up @@ -572,24 +583,41 @@ def wait_online_thread(maker):
t.join()

if online_errors:
print(f"\n✗ Timeout or error waiting for profiles to be online: {online_errors[0]}")
sys.exit(1)
raise CMPingError(
f"Timeout or error waiting for profiles to be online: {online_errors[0]}"
) from online_errors[0]

print_progress("Waiting for profiles to be online", done=True)


def perform_ping(args):
def perform_ping(args, accounts_dir=None, timeout=None):
"""Main ping execution function with timing measurements.

Args:
args: Namespace with relay1, relay2, count, interval, verbose,
numrecipients, reset attributes.
accounts_dir: Optional base directory for account storage.
Defaults to $XDG_CACHE_HOME/cmping. Override this to isolate
concurrent probes (each needs its own DB to avoid locking).

Timing Phases:
1. account_setup_time: Time to create and configure all accounts
2. group_join_time: Time for all receivers to join the group
3. message_time: Time to send and receive all ping messages

Returns:
Pinger: The pinger object with results
Also has account_setup_time, group_join_time, message_time (float, seconds)
and results list of (seq, ms_duration, receiver_idx) tuples.

Raises:
CMPingError: On account setup or connectivity failures.
"""
base_accounts_dir = xdg_cache_home().joinpath("cmping")
if accounts_dir is not None:
from pathlib import Path
base_accounts_dir = Path(accounts_dir)
else:
base_accounts_dir = xdg_cache_home().joinpath("cmping")

# Determine unique relays being tested. Using a set to deduplicate when
# relay1 == relay2 (same relay testing), so we only create one RPC context.
Expand Down Expand Up @@ -660,6 +688,8 @@ def perform_ping(args):
message_start = time.time()

pinger = Pinger(args, sender, group, receivers)
if timeout is not None:
pinger.deadline = time.time() + timeout
received = {}
# Track current sequence for output formatting
current_seq = None
Expand All @@ -670,6 +700,7 @@ def perform_ping(args):
if seq not in received:
received[seq] = []
received[seq].append(ms_duration)
pinger.results.append((seq, ms_duration, receiver_idx))

# Track timing for this sequence
if seq not in seq_tracking:
Expand Down Expand Up @@ -753,6 +784,11 @@ def perform_ping(args):
recv_rate = pinger.received / message_time
print(f"recv rate: {recv_rate:.2f} msg/s")

# Store timing data on pinger
pinger.account_setup_time = account_setup_time
pinger.group_join_time = group_join_time
pinger.message_time = message_time

return pinger
finally:
# Clean up all RPC contexts
Expand Down Expand Up @@ -803,9 +839,17 @@ def __init__(self, args, sender, group, receivers):
)
ALPHANUMERIC = string.ascii_lowercase + string.digits
self.tx = "".join(random.choices(ALPHANUMERIC, k=30))
t = threading.Thread(target=self.send_pings, daemon=True)
self.sent = 0
self.received = 0
self.results = [] # list of (seq, ms_duration, receiver_idx)
self.account_setup_time = 0.0
self.group_join_time = 0.0
self.message_time = 0.0
# Optional wall-clock deadline for the messaging phase. When set,
# send_pings() stops sending and receive() stops waiting at this time.
# Set externally (e.g. by perform_ping) after setup phases complete.
self.deadline = None
t = threading.Thread(target=self.send_pings, daemon=True)
t.start()

@property
Expand All @@ -818,15 +862,21 @@ def send_pings(self):

Each message contains: unique_id timestamp sequence_number
Flow: Sender -> SMTP relay1 -> IMAP relay2 -> All receivers

Respects self.deadline: stops sending early when the wall clock passes
the deadline so we don't fire pings we'll never wait for.
"""
for seq in range(self.args.count):
if self.deadline is not None and time.time() >= self.deadline:
break
text = f"{self.tx} {time.time():.4f} {seq:17}"
self.group.send_text(text)
self.sent += 1
time.sleep(self.args.interval)
# we sent all pings, let's wait a bit, then force quit if main didn't finish
time.sleep(60)
os.kill(os.getpid(), signal.SIGINT)
if self.deadline is None:
time.sleep(60)
os.kill(os.getpid(), signal.SIGINT)

def receive(self):
"""Receive ping messages from all receivers.
Expand Down Expand Up @@ -867,6 +917,8 @@ def receiver_thread(receiver_idx, receiver):
threads.append(t)

while num_pending > 0:
if self.deadline is not None and time.time() >= self.deadline:
break
try:
receiver_idx, receiver, event = event_queue.get(timeout=1.0)
if event is None:
Expand Down