diff --git a/cmping.py b/cmping.py index a7e6a76..fb977a7 100644 --- a/cmping.py +++ b/cmping.py @@ -39,6 +39,12 @@ from deltachat_rpc_client import DeltaChat, EventType, Rpc from xdg_base_dirs import xdg_cache_home + +class CMPingError(Exception): + """Raised when cmping encounters a non-recoverable error during probing.""" + pass + + # Spinner characters for progress display SPINNER_CHARS = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"] @@ -192,7 +198,11 @@ def main(): if not args.relay2: args.relay2 = args.relay1 - pinger = perform_ping(args) + try: + pinger = perform_ping(args) + except CMPingError as e: + print(f"\r✗ {e}") + raise SystemExit(1) expected_total = pinger.sent * args.numrecipients raise SystemExit(0 if pinger.received == expected_total else 1) @@ -298,8 +308,7 @@ def setup_accounts(args, sender_maker, receiver_maker): profiles_created += 1 print_progress("Setting up profiles", profiles_created, total_profiles, profiles_created) except Exception as e: - print(f"\r✗ Failed to setup sender profile on {args.relay1}: {e}") - sys.exit(1) + raise CMPingError(f"Failed to setup sender profile on {args.relay1}: {e}") from e # Create receiver accounts receivers = [] @@ -310,8 +319,9 @@ def setup_accounts(args, sender_maker, receiver_maker): profiles_created += 1 print_progress("Setting up profiles", profiles_created, total_profiles, profiles_created) except Exception as e: - print(f"\r✗ Failed to setup receiver profile {i+1} on {args.relay2}: {e}") - sys.exit(1) + raise CMPingError( + f"Failed to setup receiver profile {i+1} on {args.relay2}: {e}" + ) from e # Profile setup complete print_progress("Setting up profiles", done=True) @@ -531,8 +541,9 @@ def wait_online_thread(): wait_thread.join() if online_error: - print(f"\n✗ Timeout or error waiting for profiles to be online: {online_error}") - sys.exit(1) + raise CMPingError( + f"Timeout or error waiting for profiles to be online: {online_error}" + ) from online_error print_progress("Waiting for profiles to be online", done=True) @@ -572,15 +583,23 @@ def wait_online_thread(maker): t.join() if online_errors: - print(f"\n✗ Timeout or error waiting for profiles to be online: {online_errors[0]}") - sys.exit(1) + raise CMPingError( + f"Timeout or error waiting for profiles to be online: {online_errors[0]}" + ) from online_errors[0] print_progress("Waiting for profiles to be online", done=True) -def perform_ping(args): +def perform_ping(args, accounts_dir=None, timeout=None): """Main ping execution function with timing measurements. + Args: + args: Namespace with relay1, relay2, count, interval, verbose, + numrecipients, reset attributes. + accounts_dir: Optional base directory for account storage. + Defaults to $XDG_CACHE_HOME/cmping. Override this to isolate + concurrent probes (each needs its own DB to avoid locking). + Timing Phases: 1. account_setup_time: Time to create and configure all accounts 2. group_join_time: Time for all receivers to join the group @@ -588,8 +607,17 @@ def perform_ping(args): Returns: Pinger: The pinger object with results + Also has account_setup_time, group_join_time, message_time (float, seconds) + and results list of (seq, ms_duration, receiver_idx) tuples. + + Raises: + CMPingError: On account setup or connectivity failures. """ - base_accounts_dir = xdg_cache_home().joinpath("cmping") + if accounts_dir is not None: + from pathlib import Path + base_accounts_dir = Path(accounts_dir) + else: + base_accounts_dir = xdg_cache_home().joinpath("cmping") # Determine unique relays being tested. Using a set to deduplicate when # relay1 == relay2 (same relay testing), so we only create one RPC context. @@ -660,6 +688,8 @@ def perform_ping(args): message_start = time.time() pinger = Pinger(args, sender, group, receivers) + if timeout is not None: + pinger.deadline = time.time() + timeout received = {} # Track current sequence for output formatting current_seq = None @@ -670,6 +700,7 @@ def perform_ping(args): if seq not in received: received[seq] = [] received[seq].append(ms_duration) + pinger.results.append((seq, ms_duration, receiver_idx)) # Track timing for this sequence if seq not in seq_tracking: @@ -753,6 +784,11 @@ def perform_ping(args): recv_rate = pinger.received / message_time print(f"recv rate: {recv_rate:.2f} msg/s") + # Store timing data on pinger + pinger.account_setup_time = account_setup_time + pinger.group_join_time = group_join_time + pinger.message_time = message_time + return pinger finally: # Clean up all RPC contexts @@ -803,9 +839,17 @@ def __init__(self, args, sender, group, receivers): ) ALPHANUMERIC = string.ascii_lowercase + string.digits self.tx = "".join(random.choices(ALPHANUMERIC, k=30)) - t = threading.Thread(target=self.send_pings, daemon=True) self.sent = 0 self.received = 0 + self.results = [] # list of (seq, ms_duration, receiver_idx) + self.account_setup_time = 0.0 + self.group_join_time = 0.0 + self.message_time = 0.0 + # Optional wall-clock deadline for the messaging phase. When set, + # send_pings() stops sending and receive() stops waiting at this time. + # Set externally (e.g. by perform_ping) after setup phases complete. + self.deadline = None + t = threading.Thread(target=self.send_pings, daemon=True) t.start() @property @@ -818,15 +862,21 @@ def send_pings(self): Each message contains: unique_id timestamp sequence_number Flow: Sender -> SMTP relay1 -> IMAP relay2 -> All receivers + + Respects self.deadline: stops sending early when the wall clock passes + the deadline so we don't fire pings we'll never wait for. """ for seq in range(self.args.count): + if self.deadline is not None and time.time() >= self.deadline: + break text = f"{self.tx} {time.time():.4f} {seq:17}" self.group.send_text(text) self.sent += 1 time.sleep(self.args.interval) # we sent all pings, let's wait a bit, then force quit if main didn't finish - time.sleep(60) - os.kill(os.getpid(), signal.SIGINT) + if self.deadline is None: + time.sleep(60) + os.kill(os.getpid(), signal.SIGINT) def receive(self): """Receive ping messages from all receivers. @@ -867,6 +917,8 @@ def receiver_thread(receiver_idx, receiver): threads.append(t) while num_pending > 0: + if self.deadline is not None and time.time() >= self.deadline: + break try: receiver_idx, receiver, event = event_queue.get(timeout=1.0) if event is None: