diff --git a/barcodeforge/auspice_tree_to_table.py b/barcodeforge/auspice_tree_to_table.py index 9336074..5e94966 100644 --- a/barcodeforge/auspice_tree_to_table.py +++ b/barcodeforge/auspice_tree_to_table.py @@ -4,10 +4,9 @@ import pandas as pd import Bio.Phylo from augur.utils import annotate_parents_for_tree -from rich.console import Console -import click # For click.Abort +import click -from .utils import STYLES # For consistent console messages +from .utils import print_error, print_warning, print_success def json_to_tree(json_dict, root=True, parent_cumulative_branch_length=None): @@ -128,7 +127,6 @@ def process_auspice_json( output_tree_path: str | None, include_internal_nodes: bool, attributes: list[str] | None, - console: Console, ): """ Converts an Auspice JSON tree to other formats (Newick, metadata table). @@ -138,14 +136,10 @@ def process_auspice_json( with open(tree_json_path, "r", encoding="utf-8") as fh: tree_json_data = json.load(fh) except FileNotFoundError: - console.print( - f"[{STYLES['error']}]Error: Tree JSON file not found at '{tree_json_path}'[/{STYLES['error']}]" - ) + print_error(f"Tree JSON file not found at '{tree_json_path}'") raise click.Abort() except json.JSONDecodeError: - console.print( - f"[{STYLES['error']}]Error: Could not decode JSON from '{tree_json_path}'[/{STYLES['error']}]" - ) + print_error(f"Could not decode JSON from '{tree_json_path}'") raise click.Abort() tree = json_to_tree(tree_json_data) @@ -158,13 +152,9 @@ def process_auspice_json( output_tree_path, "newick", ) - console.print( - f"[{STYLES['success']}]Tree successfully written to '{output_tree_path}'[/{STYLES['success']}]" - ) + print_success(f"Tree successfully written to '{output_tree_path}'") except Exception as e: - console.print( - f"[{STYLES['error']}]Error writing Newick tree to '{output_tree_path}': {e}[/{STYLES['error']}]" - ) + print_error(f"Error writing Newick tree to '{output_tree_path}': {e}") raise click.Abort() if output_metadata_path: @@ -181,10 +171,10 @@ def process_auspice_json( attrs_set.update(getattr(tree.root, "branch_attrs", {}).keys()) if not attrs_set: - console.print( - f"[{STYLES['warning']}]Warning: Could not auto-detect any attributes from the tree root. " + print_warning( + "Could not auto-detect any attributes from the tree root. " "The metadata output might be sparse or only contain 'name'. " - f"Consider using --attributes to specify columns.[/{STYLES['warning']}]" + "Consider using --attributes to specify columns." ) attributes_to_export = sorted(list(attrs_set)) @@ -254,18 +244,14 @@ def process_auspice_json( header=True, index=False, ) - console.print( - f"[{STYLES['success']}]Metadata successfully written to '{output_metadata_path}'[/{STYLES['success']}]" - ) + print_success(f"Metadata successfully written to '{output_metadata_path}'") except Exception as e: - console.print( - f"[{STYLES['error']}]Error writing metadata to '{output_metadata_path}': {e}[/{STYLES['error']}]" - ) + print_error(f"Error writing metadata to '{output_metadata_path}': {e}") raise click.Abort() if not output_tree_path and not output_metadata_path: # This case should be handled by the CLI command to require at least one output. # However, a warning here is fine if called directly. - console.print( - f"[{STYLES['warning']}]No output requested. Use --output-tree and/or --output-metadata.[/{STYLES['warning']}]" + print_warning( + "No output requested. Use --output-tree and/or --output-metadata." ) diff --git a/barcodeforge/cli.py b/barcodeforge/cli.py index bbe542a..81e60d3 100644 --- a/barcodeforge/cli.py +++ b/barcodeforge/cli.py @@ -1,20 +1,20 @@ import os import rich_click as click -from rich.console import Console from .format_tree import convert_nexus_to_newick from .utils import ( resolve_tree_format, run_subprocess_command, - STYLES, -) # Import new utils + print_error, + print_success, + print_info, + print_debug, +) from .ref_muts import process_and_reroot_lineages from .generate_barcodes import create_barcodes_from_lineage_paths from .plot_barcode import create_barcode_plot from .auspice_tree_to_table import process_auspice_json from . import __version__ -console = Console() - @click.group() @click.version_option(__version__, message="%(prog)s version %(version)s") @@ -25,7 +25,7 @@ def cli(ctx, debug): ctx.ensure_object(dict) ctx.obj["DEBUG"] = debug if debug: - console.print(f"[{STYLES['debug']}]Debug mode is ON[/{STYLES['debug']}]") + print_debug("Debug mode is ON") @cli.command() @@ -97,19 +97,13 @@ def barcode( """Process barcode data, including VCF generation, tree formatting, USHER placement, matUtils annotation, and matUtils extraction.""" is_debug = ctx.obj.get("DEBUG", False) # More robust way to get debug status - console.print( - f"[bold {STYLES['info']}]Reference Genome:[/bold {STYLES['info']}]\t{reference_genome}" - ) - console.print( - f"[bold {STYLES['info']}]Alignment:[/bold {STYLES['info']}]\t\t{alignment}" - ) - console.print(f"[bold {STYLES['info']}]Tree:[/bold {STYLES['info']}]\t\t\t{tree}") - console.print( - f"[bold {STYLES['info']}]Lineages:[/bold {STYLES['info']}]\t\t{lineages}" - ) + print_info(f"Reference Genome:\t{reference_genome}", bold=True) + print_info(f"Alignment:\t\t{alignment}", bold=True) + print_info(f"Tree:\t\t\t{tree}", bold=True) + print_info(f"Lineages:\t\t{lineages}", bold=True) # Use utility function to resolve tree format - resolved_tree_format = resolve_tree_format(tree, tree_format, console, is_debug) + resolved_tree_format = resolve_tree_format(tree, tree_format, is_debug) # Directory for intermediate files intermediate_dir = "barcodeforge_workdir" @@ -120,7 +114,6 @@ def barcode( fatovcf_cmd = ["faToVcf", alignment, fatovcf_output_vcf] run_subprocess_command( fatovcf_cmd, - console, is_debug, success_message=f"Successfully created VCF file: {fatovcf_output_vcf}", error_message_prefix="Error running faToVcf", @@ -131,34 +124,33 @@ def barcode( if resolved_tree_format == "nexus": if is_debug: - console.print( - f"[{STYLES['info']}]Converting Nexus tree ({tree}) to Newick format at {output_converted_tree_path}...[/{STYLES['info']}]" + print_info( + f"Converting Nexus tree ({tree}) to Newick format at {output_converted_tree_path}..." ) convert_nexus_to_newick( input_file=tree, output_file=output_converted_tree_path, input_format="nexus", ) - console.print( - f"[{STYLES['success']}]Converted tree saved to {output_converted_tree_path}[/{STYLES['success']}]" - ) + print_success(f"Converted tree saved to {output_converted_tree_path}") elif resolved_tree_format == "newick": if is_debug: - console.print( - f"[{STYLES['info']}]Processing Newick tree ({tree}) to {output_converted_tree_path}...[/{STYLES['info']}]" + print_info( + f"Processing Newick tree ({tree}) to {output_converted_tree_path}..." ) convert_nexus_to_newick( input_file=tree, output_file=output_converted_tree_path, input_format="newick", ) - console.print( - f"[{STYLES['success']}]Processed tree saved to {output_converted_tree_path} (if conversion/reformatting occurred)[/{STYLES['success']}]" + print_success( + f"Processed tree saved to {output_converted_tree_path} (if conversion/reformatting occurred)" ) else: - raise ValueError( - f"Unsupported tree format: {resolved_tree_format}. Expected 'newick' or 'nexus'." + print_error( + f"Error: Unsupported tree format: {resolved_tree_format}. Expected 'newick' or 'nexus'." ) + raise click.Abort() # Run usher command usher_cmd = ["usher"] @@ -179,7 +171,6 @@ def barcode( run_subprocess_command( usher_cmd, - console, is_debug, success_message=f"Successfully ran USHER. Output protobuf: {usher_output_pb}", error_message_prefix="Error running USHER", @@ -208,9 +199,8 @@ def barcode( run_subprocess_command( matutils_annotate_cmd, - console, is_debug, - success_message=f"Successfully ran matUtils annotate. Output: annotated_tree.pb", + success_message="Successfully ran matUtils annotate. Output: annotated_tree.pb", error_message_prefix="Error running matUtils annotate", ) @@ -236,9 +226,8 @@ def barcode( run_subprocess_command( matutils_extract_cmd, - console, is_debug, - success_message=f"Successfully ran matUtils extract. Outputs: lineagePaths.txt, samplePaths.txt, auspice_tree.json", + success_message="Successfully ran matUtils extract. Outputs: lineagePaths.txt, samplePaths.txt, auspice_tree.json", error_message_prefix="Error running matUtils extract", ) @@ -259,14 +248,10 @@ def barcode( # Determine base name for output files clean_prefix = prefix.strip() if clean_prefix: - console.print( - f"[{STYLES['info']}]Using prefix '{clean_prefix}' for lineage names in barcodes...[/{STYLES['info']}]" - ) + print_info(f"Using prefix '{clean_prefix}' for lineage names in barcodes...") base_name = f"{clean_prefix}-barcode" else: - console.print( - f"[{STYLES['info']}]No prefix provided for lineage names in barcodes.[/{STYLES['info']}]" - ) + print_info("No prefix provided for lineage names in barcodes.") base_name = "barcode" csv_path = f"{base_name}.csv" @@ -285,8 +270,8 @@ def barcode( output_file_path=plot_output_path, ) - console.print( - f"[{STYLES['success']}]Generated barcodes are saved to '{csv_path}' and plot saved to '{plot_output_path}'[/{STYLES['success']}]" + print_success( + f"Generated barcodes are saved to '{csv_path}' and plot saved to '{plot_output_path}'" ) @@ -334,16 +319,13 @@ def extract_auspice_data( Source: https://gist.github.com/huddlej/5d7bd023d3807c698bd18c706974f2db """ is_debug = ctx.obj.get("DEBUG", False) # More robust way to get debug status - console.print( - f"[bold {STYLES['info']}]Processing Auspice JSON file:[/bold {STYLES['info']}]\t{auspice_json_path}" - ) + print_info(f"Processing Auspice JSON file:\t{auspice_json_path}", bold=True) process_auspice_json( tree_json_path=auspice_json_path, output_metadata_path=output_metadata_path, output_tree_path=output_tree_path, include_internal_nodes=include_internal_nodes, attributes=list(attributes) if attributes else None, - console=console, ) diff --git a/barcodeforge/generate_barcodes.py b/barcodeforge/generate_barcodes.py index 65e20f3..f8977ed 100755 --- a/barcodeforge/generate_barcodes.py +++ b/barcodeforge/generate_barcodes.py @@ -3,10 +3,8 @@ # This script is a modified version of the https://github.com/andersen-lab/Freyja/blob/main/freyja/convert_paths2barcodes.py import pandas as pd -from rich.console import Console -from .utils import sortFun, STYLES - -console = Console() +import rich_click as click +from .utils import sortFun, print_error, print_success, print_info, print_debug def parse_tree_paths(df: pd.DataFrame) -> pd.DataFrame: @@ -57,7 +55,6 @@ def convert_to_barcodes(df: pd.DataFrame) -> pd.DataFrame: ) df_barcodes = pd.concat((df_barcodes, cladeSeries), axis=1) - # console.print('separating combined splits') # Original print, can be made conditional with a verbose flag if needed df_barcodes = df_barcodes.T # dropped since no '' column this time. # df_barcodes = df_barcodes.drop(columns='') @@ -118,7 +115,7 @@ def check_no_flip_pairs(barcode_file: str): Args: barcode_file (str): Path to the barcode file to be tested. Raises: - Exception: If flip pairs are found in the barcode file. + click.Abort: If flip pairs are found in the barcode file. """ df_barcodes = pd.read_csv(barcode_file, index_col=0) flipPairs = [ @@ -127,15 +124,44 @@ def check_no_flip_pairs(barcode_file: str): if (d[-1] + d[1 : len(d) - 1] + d[0]) in df_barcodes.columns ] if len(flipPairs) == 0: - console.print( - f"[{STYLES['success']}]PASS: no flip pairs found in the generated barcode file.[/{STYLES['success']}]" - ) + print_success("PASS: no flip pairs found in the generated barcode file.") else: - # This should ideally not happen if the logic is correct, consider logging or raising a more specific error. - console.print( - f"[{STYLES['error']}]FAIL: flip pairs found: {flipPairs}[/{STYLES['error']}]" + print_error(f"FAIL: flip pairs found: {flipPairs}") + raise click.Abort() + + +def check_allele_consistency(df_barcodes: pd.DataFrame): + """ + Extract columns and ensure for every position ([1:-1]) there is only 1 reference allele ([0]). + Additionally check if any lineage has multiple alternative alleles at the same position. + """ + positions = df_barcodes.columns.str[1:-1] + refs = df_barcodes.columns.str[0] + + # Check for multiple reference alleles at the same position + ref_series = pd.Series(refs.values, index=positions) + bad_pos = ref_series.groupby(level=0).nunique() + bad_pos = bad_pos[bad_pos > 1] + if not bad_pos.empty: + pos = bad_pos.index[0] + conflicting = ref_series[pos].unique() + print_error( + f"Position {pos} has multiple reference alleles: {', '.join(repr(a) for a in conflicting)}" + ) + raise click.Abort() + + # Convert to presence/absence before summing per position per lineage, + # so only distinct alleles are counted (not raw mutation counts). + presence = (df_barcodes > 0).astype(int) + pos_sums = presence.T.groupby(positions.values).sum().T + invalid_mask = pos_sums > 1 + if invalid_mask.any(axis=None): + offending_pos = invalid_mask.columns[invalid_mask.any(axis=0)][0] + lineages = pos_sums.index[invalid_mask[offending_pos]].tolist() + print_error( + f"Position {offending_pos} has multiple alternative alleles in lineages: {lineages}" ) - raise Exception(f"FAIL: flip pairs found: {flipPairs}") + raise click.Abort() def identify_chains(df_barcodes: pd.DataFrame) -> list: @@ -200,13 +226,13 @@ def check_mutation_chain(df_barcodes: pd.DataFrame) -> pd.DataFrame: # remove constituent mutations df_barcodes.loc[lin_seq.index, sm[0:2]] -= 1 # drop all unused mutations - # print('before_trim\n',df_barcodes) df_barcodes = df_barcodes.drop( columns=df_barcodes.columns[df_barcodes.sum(axis=0) == 0] ) # in case mutation path leads to a return to the reference. df_barcodes = reversion_checking(df_barcodes) seq_muts = identify_chains(df_barcodes) + check_allele_consistency(df_barcodes) # The barcode should be a binary sparse matrix assert df_barcodes.isin([0, 1]).all(axis=None), "Barcode matrix should be binary" return df_barcodes @@ -237,42 +263,32 @@ def create_barcodes_from_lineage_paths( prefix: Optional prefix to add to lineage names in the barcode file. """ if debug: - console.print( - f"[{STYLES['info']}]Reading lineage paths from: {input_file_path}[/{STYLES['info']}]" - ) + print_info(f"Reading lineage paths from: {input_file_path}") df = pd.read_csv(input_file_path, sep="\t") if debug: - console.print(f"[{STYLES['info']}]Parsing tree paths...[/{STYLES['info']}]") + print_info("Parsing tree paths...") df = parse_tree_paths(df) - console.print(f"[{STYLES['info']}]Converting to barcodes...[/{STYLES['info']}]") + print_info("Converting to barcodes...") df_barcodes = convert_to_barcodes(df) if prefix and prefix.strip() != "": - console.print( - f"[{STYLES['info']}]Adding prefix '{prefix}' to lineage names...[/{STYLES['info']}]" - ) + print_info(f"Adding prefix '{prefix}' to lineage names...") df_barcodes.index = [prefix + "-" + str(i) for i in df_barcodes.index] - console.print( - f"[{STYLES['info']}]Performing reversion checking...[/{STYLES['info']}]" - ) + print_info("Performing reversion checking...") df_barcodes = reversion_checking(df_barcodes) - console.print(f"[{STYLES['info']}]Checking mutation chains...[/{STYLES['info']}]") + print_info("Checking mutation chains...") df_barcodes = check_mutation_chain(df_barcodes) if debug: - console.print( - f"[{STYLES['info']}]Replacing underscores with dashes in lineage names...[/{STYLES['info']}]" - ) + print_info("Replacing underscores with dashes in lineage names...") df_barcodes = replace_underscore_with_dash(df_barcodes) if debug: - console.print( - f"[{STYLES['info']}]Sorting barcode columns...[/{STYLES['info']}]" - ) + print_info("Sorting barcode columns...") df_barcodes = df_barcodes.reindex(sorted(df_barcodes.columns, key=sortFun), axis=1) # Drop unclassified lineage if it exists @@ -280,15 +296,7 @@ def create_barcodes_from_lineage_paths( df_barcodes = df_barcodes.drop(index="unclassified") df_barcodes.to_csv(output_file_path) - console.print( - f"[{STYLES['success']}]Barcode file saved to: {output_file_path}[/{STYLES['success']}]" - ) + print_success(f"Barcode file saved to: {output_file_path}") # Test for flip pairs in the final output file - try: - check_no_flip_pairs(output_file_path) - except Exception as e: - console.print( - f"[{STYLES['error']}]Error during final flip pair test: {e}[/{STYLES['error']}]" - ) - # Depending on desired behavior, you might re-raise the exception or just log it. + check_no_flip_pairs(output_file_path) diff --git a/barcodeforge/plot_barcode.py b/barcodeforge/plot_barcode.py index 0f1b52c..132447a 100755 --- a/barcodeforge/plot_barcode.py +++ b/barcodeforge/plot_barcode.py @@ -1,16 +1,12 @@ """Plot barcode from CSV file.""" import pandas as pd -from rich.console import Console - -from .utils import sortFun, STYLES +from .utils import sortFun, print_info, print_debug import seaborn as sns import matplotlib.pyplot as plt from matplotlib.colors import ListedColormap import math -console = Console() - def create_barcode_visualization( barcode_df_long: pd.DataFrame, chunk_size: int, output_path: str @@ -137,21 +133,13 @@ def create_barcode_plot( output_file_path: Path to save the generated plot. """ if debug: - console.print( - f"[{STYLES['info']}]Reading barcode data from: {input_file_path}[/{STYLES['info']}]" - ) + print_info(f"Reading barcode data from: {input_file_path}") barcode_df = pd.read_csv(input_file_path, header=0, index_col=0) if debug: - console.print( - f"[{STYLES['debug']}]Barcode DataFrame shape: {barcode_df.shape}[/{STYLES['debug']}]" - ) - console.print( - f"[{STYLES['debug']}]Barcode DataFrame columns: {barcode_df.columns.tolist()}[/{STYLES['debug']}]" - ) - console.print( - f"[{STYLES['info']}]Transforming barcode data to long format...[/{STYLES['info']}]" - ) + print_debug(f"Barcode DataFrame shape: {barcode_df.shape}") + print_debug(f"Barcode DataFrame columns: {barcode_df.columns.tolist()}") + print_info("Transforming barcode data to long format...") barcode_df_long = barcode_df.stack().reset_index() barcode_df_long.columns = ["Lineage", "Mutation", "z"] diff --git a/barcodeforge/ref_muts.py b/barcodeforge/ref_muts.py index 70f7807..fae360a 100755 --- a/barcodeforge/ref_muts.py +++ b/barcodeforge/ref_muts.py @@ -2,10 +2,7 @@ from Bio import SeqIO from collections import OrderedDict import re -from rich.console import Console -from .utils import STYLES # Assuming STYLES is in utils.py - -console = Console() +from .utils import print_error, print_warning, print_success, print_info, print_debug def _load_sample_mutations(path): @@ -137,15 +134,13 @@ def process_and_reroot_lineages( missing_count = len(sample_ids - set(seqs.keys())) if missing_count: total_count = len(sample_ids) - console.print( - f"[{STYLES['warning']}]Warning: {missing_count} out of {total_count} samples ({missing_count / total_count:.1%}) were not found in the FASTA file.[/{STYLES['warning']}]" + print_warning( + f"{missing_count} out of {total_count} samples ({missing_count / total_count:.1%}) were not found in the FASTA file." ) # if reference in the sample mutations file, use that as the root if sample_muts_df[sample_muts_df["sample"] == ref.id].shape[0] > 0: - console.print( - f"[{STYLES['success']}]Reference {ref.id} is present in sample mutations file.[/{STYLES['success']}]" - ) + print_success(f"Reference {ref.id} is present in sample mutations file.") additional_muts = _reverse_mutations_to_root( _extract_mutations( sample_muts_df[sample_muts_df["sample"] == ref.id].iloc[0] @@ -157,13 +152,13 @@ def process_and_reroot_lineages( additional_muts[i]["root"] = additional_muts[i].pop("mut") if debug: - console.print( - f"[{STYLES['debug']}]Additional mutations derived from reference {ref.id}: {additional_muts}[/{STYLES['debug']}]" + print_debug( + f"Additional mutations derived from reference {ref.id}: {additional_muts}" ) # else generate the root sequence else: - console.print( - f"[{STYLES['warning']}]Reference {ref.id} not present in sample mutations file. Inferring root sequence.[/{STYLES['warning']}]" + print_warning( + f"Reference {ref.id} not present in sample mutations file. Inferring root sequence." ) # Pre‑filter samples with non‑null mutations valid = sample_muts_df.loc[ @@ -180,8 +175,8 @@ def process_and_reroot_lineages( if seq is None: # In debug mode, log a per-sample warning and track missing samples for a summary warning below. if debug: - console.print( - f"[{STYLES['warning']}]Warning: Sample {sample_id} not found in FASTA file. Skipping.[/{STYLES['warning']}]" + print_warning( + f"Sample {sample_id} not found in FASTA file. Skipping." ) continue root_seqs.append(_construct_root_sequence(root_muts, seq)) @@ -193,9 +188,7 @@ def process_and_reroot_lineages( # convert to dataframe and save as csv df = pd.DataFrame.from_dict(additional_muts, orient="index") df.to_csv(output_additional_muts_path, sep="\t", index_label="position") - console.print( - f"[{STYLES['success']}]Additional mutations saved to {output_additional_muts_path}[/{STYLES['success']}]" - ) + print_success(f"Additional mutations saved to {output_additional_muts_path}") lineage_paths_df = _parse_tree_paths( pd.read_csv(input_lineage_paths_path, sep="\t").fillna("") @@ -208,17 +201,15 @@ def process_and_reroot_lineages( ) if not additional_muts_list: - console.print( - f"[{STYLES['warning']}]No additional mutations found to add to lineage paths.[/{STYLES['warning']}]" - ) + print_warning("No additional mutations found to add to lineage paths.") # If no additional mutations, write the original lineage paths content or handle as needed # For now, let's just save the parsed (and potentially slightly reformatted) df lineage_paths_df["from_tree_root"] = lineage_paths_df["from_tree_root"].apply( lambda x: " ".join(x) ) else: - console.print( - f"[{STYLES['info']}]Found {len(additional_muts_list)} additional mutations to incorporate into lineage paths.[/{STYLES['info']}]" + print_info( + f"Found {len(additional_muts_list)} additional mutations to incorporate into lineage paths." ) # add the additional mutations to the lineage paths after the first item @@ -241,6 +232,6 @@ def update_path(path_list): ) lineage_paths_df.to_csv(output_rerooted_lineage_paths_path, sep="\t") - console.print( - f"[{STYLES['success']}]Rerooted lineage paths saved to {output_rerooted_lineage_paths_path}[/{STYLES['success']}]" + print_success( + f"Rerooted lineage paths saved to {output_rerooted_lineage_paths_path}" ) diff --git a/barcodeforge/utils.py b/barcodeforge/utils.py index 31305ca..4395658 100644 --- a/barcodeforge/utils.py +++ b/barcodeforge/utils.py @@ -3,26 +3,46 @@ import rich_click as click from rich.console import Console +console = Console() + STYLES = { "info": "blue", "success": "green", "error": "bold red", "warning": "yellow", "debug": "cyan", - "dim": "dim", - "highlight": "bold magenta", } +def print_error(msg: str) -> None: + console.print(f"[ERROR] {msg}", style=STYLES["error"], markup=False) + + +def print_warning(msg: str) -> None: + console.print(f"[WARNING] {msg}", style=STYLES["warning"], markup=False) + + +def print_success(msg: str) -> None: + console.print(f"[SUCCESS] {msg}", style=STYLES["success"], markup=False) + + +def print_info(msg: str, bold: bool = False) -> None: + style = f"bold {STYLES['info']}" if bold else STYLES["info"] + console.print(f"[INFO] {msg}", style=style, markup=False) + + +def print_debug(msg: str) -> None: + console.print(f"[DEBUG] {msg}", style=STYLES["debug"], markup=False) + + def resolve_tree_format( - tree_path: str, specified_format: str | None, console: Console, debug: bool + tree_path: str, specified_format: str | None, debug: bool ) -> str: """ Resolves the format of a phylogenetic tree file based on its extension or specified_format. Args: tree_path (str): Path to the tree file. specified_format (str | None): User-specified format ('newick' or 'nexus'). - console (Console): Rich console for output. debug (bool): If True, prints debug information. Returns: str: Resolved format ('newick' or 'nexus'). @@ -38,25 +58,21 @@ def resolve_tree_format( elif ext_lower == ".nexus": resolved_format = "nexus" else: - if console: - console.print( - f"[{STYLES['error']}]Error: Unknown tree format for file '{tree_path}'. Extension '{ext}' is not recognized.[/{STYLES['error']}]" - ) - console.print( - f"[{STYLES['error']}]Please specify the format using --tree-format ('newick' or 'nexus').[/]" - ) + print_error( + f"Unknown tree format for file '{tree_path}'. Extension '{ext}' is not recognized." + ) + print_error( + "Please specify the format using --tree-format ('newick' or 'nexus')." + ) raise click.Abort() - if debug and console: - console.print( - f"[{STYLES['warning']}]Resolved tree format for '{tree_path}': {resolved_format}[/]" - ) + if debug: + print_debug(f"Resolved tree format for '{tree_path}': {resolved_format}") return resolved_format def run_subprocess_command( cmd: list[str], - console: Console, debug: bool, success_message: str = "Successfully executed command.", error_message_prefix: str = "Error executing command", @@ -65,45 +81,39 @@ def run_subprocess_command( Runs a subprocess command and handles errors with rich output. Args: cmd (list[str]): Command to run as a list of strings. - console (Console): Rich console for output. debug (bool): If True, prints debug information. success_message (str): Message to print on successful execution. error_message_prefix (str): Prefix for error messages. Returns: - bool: True if the command was executed successfully, False otherwise. + bool: True if the command was executed successfully. Raises: click.Abort: If the command fails or is not found. """ - if debug and console: - console.print(f"[{STYLES['debug']}]Running command: {' '.join(cmd)}[/]") + if debug: + print_debug(f"Running command: {' '.join(cmd)}") try: process_result = subprocess.run(cmd, check=True, capture_output=True, text=True) - if debug and console: + if debug: if process_result.stdout: - console.print( - f"[{STYLES['dim']}]{cmd[0]} stdout:\\\\n{process_result.stdout}[/]" - ) + print_debug(f"{cmd[0]} stdout:\n{process_result.stdout}") if process_result.stderr: - console.print( - f"[{STYLES['dim']}]{cmd[0]} stderr:\\\\n{process_result.stderr}[/]" - ) - if success_message and console: - console.print(f"[{STYLES['success']}]{success_message}[/]") + print_debug(f"{cmd[0]} stderr:\n{process_result.stderr}") + if success_message: + print_success(success_message) return True except FileNotFoundError: - if console: - console.print( - f"[{STYLES['error']}]{error_message_prefix}: {cmd[0]} command not found. Please ensure it is installed and in your PATH.[/]" - ) + print_error( + f"{error_message_prefix}: {cmd[0]} command not found. Please ensure it is installed and in your PATH." + ) raise click.Abort() except subprocess.CalledProcessError as e: - if console: - console.print(f"[{STYLES['error']}]{error_message_prefix} {cmd[0]}: {e}[/]") + print_error(f"{error_message_prefix} {cmd[0]}: {e}") + if debug: if e.stdout: - console.print(f"[{STYLES['dim']}]{cmd[0]} stdout:\\\\n{e.stdout}[/]") + print_debug(f"{cmd[0]} stdout:\n{e.stdout}") if e.stderr: - console.print(f"[{STYLES['dim']}]{cmd[0]} stderr:\\\\n{e.stderr}[/]") + print_debug(f"{cmd[0]} stderr:\n{e.stderr}") raise click.Abort() diff --git a/tests/test_auspice_tree_to_table.py b/tests/test_auspice_tree_to_table.py index a19656a..6e733f9 100644 --- a/tests/test_auspice_tree_to_table.py +++ b/tests/test_auspice_tree_to_table.py @@ -1,9 +1,11 @@ import json import pandas as pd import pytest -from barcodeforge.auspice_tree_to_table import json_to_tree, process_auspice_json from rich.console import Console +from unittest.mock import patch import click +import barcodeforge.utils +from barcodeforge.auspice_tree_to_table import json_to_tree, process_auspice_json @pytest.fixture @@ -45,14 +47,12 @@ def test_json_to_tree_basic(sample_auspice_json): def test_process_auspice_json(tmp_path, sample_auspice_json): meta_out = tmp_path / "meta.tsv" tree_out = tmp_path / "tree.nwk" - console = Console(file=None) process_auspice_json( tree_json_path=str(sample_auspice_json), output_metadata_path=str(meta_out), output_tree_path=str(tree_out), include_internal_nodes=False, attributes=None, - console=console, ) assert meta_out.exists() assert tree_out.exists() @@ -64,14 +64,12 @@ def test_process_auspice_json(tmp_path, sample_auspice_json): def test_process_auspice_json_include_internal(tmp_path, sample_auspice_json): meta_out = tmp_path / "meta.tsv" tree_out = tmp_path / "tree.nwk" - console = Console(file=None) process_auspice_json( tree_json_path=str(sample_auspice_json), output_metadata_path=str(meta_out), output_tree_path=str(tree_out), include_internal_nodes=True, attributes=["country"], - console=console, ) df = pd.read_csv(meta_out, sep="\t") assert set(df["name"]) == {"root", "A", "B"} @@ -79,30 +77,30 @@ def test_process_auspice_json_include_internal(tmp_path, sample_auspice_json): def test_process_auspice_json_missing_file(tmp_path): - console = Console(record=True) - with pytest.raises(click.Abort): - process_auspice_json( - tree_json_path=str(tmp_path / "no.json"), - output_metadata_path=str(tmp_path / "meta.tsv"), - output_tree_path=None, - include_internal_nodes=False, - attributes=None, - console=console, - ) - assert "Error: Tree JSON file not found" in console.export_text() + recording_console = Console(record=True) + with patch.object(barcodeforge.utils, "console", recording_console): + with pytest.raises(click.Abort): + process_auspice_json( + tree_json_path=str(tmp_path / "no.json"), + output_metadata_path=str(tmp_path / "meta.tsv"), + output_tree_path=None, + include_internal_nodes=False, + attributes=None, + ) + assert "Tree JSON file not found" in recording_console.export_text() def test_process_auspice_json_bad_json(tmp_path): bad_path = tmp_path / "bad.json" bad_path.write_text("not valid") - console = Console(record=True) - with pytest.raises(click.Abort): - process_auspice_json( - tree_json_path=str(bad_path), - output_metadata_path=None, - output_tree_path=None, - include_internal_nodes=False, - attributes=None, - console=console, - ) - assert "Error: Could not decode JSON" in console.export_text() + recording_console = Console(record=True) + with patch.object(barcodeforge.utils, "console", recording_console): + with pytest.raises(click.Abort): + process_auspice_json( + tree_json_path=str(bad_path), + output_metadata_path=None, + output_tree_path=None, + include_internal_nodes=False, + attributes=None, + ) + assert "Could not decode JSON" in recording_console.export_text() diff --git a/tests/test_cli.py b/tests/test_cli.py index 6e68c57..73dd79c 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -8,6 +8,7 @@ import shutil from unittest.mock import call, ANY, MagicMock from rich.console import Console +import barcodeforge.utils from barcodeforge.utils import STYLES @@ -89,7 +90,9 @@ def test_barcode_command_default_options(runner, temp_files, mocker): "barcodeforge.cli.create_barcodes_from_lineage_paths" ) mock_create_plot = mocker.patch("barcodeforge.cli.create_barcode_plot") - mock_cli_console = mocker.patch("barcodeforge.cli.console", MagicMock(spec=Console)) + mock_cli_console = mocker.patch.object( + barcodeforge.utils, "console", MagicMock(spec=Console) + ) args = [ "barcode", @@ -114,9 +117,7 @@ def test_barcode_command_default_options(runner, temp_files, mocker): final_barcodes_csv_fn = "barcode.csv" final_barcode_plot_fn = "barcode_plot.pdf" - mock_resolve_format.assert_called_once_with( - temp_files["tree"], None, mock_cli_console, False - ) + mock_resolve_format.assert_called_once_with(temp_files["tree"], None, False) mock_convert_tree.assert_called_once_with( input_file=temp_files["tree"], output_file=converted_tree_fn, @@ -137,14 +138,12 @@ def test_barcode_command_default_options(runner, temp_files, mocker): expected_subprocess_calls = [ call( ["faToVcf", temp_files["alignment"], aligned_vcf_fn], - mock_cli_console, # console passed False, # debug status passed success_message=ANY, error_message_prefix=ANY, ), call( expected_usher_cmd, - mock_cli_console, False, success_message=ANY, error_message_prefix=ANY, @@ -164,7 +163,6 @@ def test_barcode_command_default_options(runner, temp_files, mocker): "-T", "8", ], - mock_cli_console, False, success_message=ANY, error_message_prefix=ANY, @@ -184,7 +182,6 @@ def test_barcode_command_default_options(runner, temp_files, mocker): "-T", "8", ], - mock_cli_console, False, success_message=ANY, error_message_prefix=ANY, @@ -228,7 +225,9 @@ def test_barcode_command_custom_options(runner, temp_files, mocker): "barcodeforge.cli.create_barcodes_from_lineage_paths" ) mock_create_plot = mocker.patch("barcodeforge.cli.create_barcode_plot") - mock_cli_console = mocker.patch("barcodeforge.cli.console", MagicMock(spec=Console)) + mock_cli_console = mocker.patch.object( + barcodeforge.utils, "console", MagicMock(spec=Console) + ) prefix = "MYPREFIX" custom_usher_args = "-U -l" @@ -268,9 +267,7 @@ def test_barcode_command_custom_options(runner, temp_files, mocker): final_barcodes_csv_fn = f"{prefix}-barcode.csv" final_barcode_plot_fn = f"{prefix}-barcode_plot.pdf" - mock_resolve_format.assert_called_once_with( - temp_files["tree"], None, mock_cli_console, False - ) + mock_resolve_format.assert_called_once_with(temp_files["tree"], None, False) mock_convert_tree.assert_called_once_with( input_file=temp_files["tree"], output_file=converted_tree_fn, @@ -294,14 +291,12 @@ def test_barcode_command_custom_options(runner, temp_files, mocker): expected_subprocess_calls = [ call( ["faToVcf", temp_files["alignment"], aligned_vcf_fn], - mock_cli_console, False, success_message=ANY, error_message_prefix=ANY, ), call( expected_usher_cmd, - mock_cli_console, False, success_message=ANY, error_message_prefix=ANY, @@ -321,7 +316,6 @@ def test_barcode_command_custom_options(runner, temp_files, mocker): "-T", custom_threads, ], - mock_cli_console, False, success_message=ANY, error_message_prefix=ANY, @@ -341,7 +335,6 @@ def test_barcode_command_custom_options(runner, temp_files, mocker): "-T", custom_threads, ], - mock_cli_console, False, success_message=ANY, error_message_prefix=ANY, @@ -383,7 +376,9 @@ def test_barcode_command_nexus_tree(runner, temp_files, mocker): mocker.patch("barcodeforge.cli.process_and_reroot_lineages") mocker.patch("barcodeforge.cli.create_barcodes_from_lineage_paths") mocker.patch("barcodeforge.cli.create_barcode_plot") - mock_cli_console = mocker.patch("barcodeforge.cli.console", MagicMock(spec=Console)) + mock_cli_console = mocker.patch.object( + barcodeforge.utils, "console", MagicMock(spec=Console) + ) args = [ "barcode", @@ -402,9 +397,7 @@ def test_barcode_command_nexus_tree(runner, temp_files, mocker): aligned_vcf_fn = f"{intermediate_dir}/aligned.vcf" tree_pb_fn = f"{intermediate_dir}/tree.pb" - mock_resolve_format.assert_called_once_with( - temp_files["tree"], "nexus", mock_cli_console, False - ) + mock_resolve_format.assert_called_once_with(temp_files["tree"], "nexus", False) mock_convert_tree.assert_called_once_with( input_file=temp_files["tree"], output_file=converted_tree_fn, @@ -424,7 +417,6 @@ def test_barcode_command_nexus_tree(runner, temp_files, mocker): ] mock_run_subp.assert_any_call( expected_usher_cmd, - mock_cli_console, False, success_message=ANY, error_message_prefix=ANY, @@ -440,7 +432,9 @@ def test_barcode_command_newick_tree_reformat(runner, temp_files, mocker): mocker.patch("barcodeforge.cli.process_and_reroot_lineages") mocker.patch("barcodeforge.cli.create_barcodes_from_lineage_paths") mocker.patch("barcodeforge.cli.create_barcode_plot") - mock_cli_console = mocker.patch("barcodeforge.cli.console", MagicMock(spec=Console)) + mock_cli_console = mocker.patch.object( + barcodeforge.utils, "console", MagicMock(spec=Console) + ) args = [ "barcode", @@ -455,9 +449,7 @@ def test_barcode_command_newick_tree_reformat(runner, temp_files, mocker): assert result.exit_code == 0, f"CLI failed: {result.output}" converted_tree_fn = "barcodeforge_workdir/converted_tree.nwk" - mock_resolve_format.assert_called_once_with( - temp_files["tree"], "newick", mock_cli_console, False - ) + mock_resolve_format.assert_called_once_with(temp_files["tree"], "newick", False) mock_convert_tree.assert_called_once_with( input_file=temp_files["tree"], output_file=converted_tree_fn, @@ -478,7 +470,9 @@ def test_barcode_command_debug_flag(runner, temp_files, mocker): "barcodeforge.cli.create_barcodes_from_lineage_paths" ) mock_create_plot = mocker.patch("barcodeforge.cli.create_barcode_plot") - mock_cli_console = mocker.patch("barcodeforge.cli.console", MagicMock(spec=Console)) + mock_cli_console = mocker.patch.object( + barcodeforge.utils, "console", MagicMock(spec=Console) + ) args = [ "--debug", # Main CLI debug flag @@ -501,11 +495,11 @@ def test_barcode_command_debug_flag(runner, temp_files, mocker): final_barcode_plot_fn = "barcode_plot.pdf" mock_cli_console.print.assert_any_call( - f"[{STYLES['debug']}]Debug mode is ON[/{STYLES['debug']}]" - ) - mock_resolve_format.assert_called_once_with( - temp_files["tree"], None, mock_cli_console, True + "[DEBUG] Debug mode is ON", + style=STYLES["debug"], + markup=False, ) + mock_resolve_format.assert_called_once_with(temp_files["tree"], None, True) mock_convert_tree.assert_called_once_with( input_file=temp_files["tree"], output_file="barcodeforge_workdir/converted_tree.nwk", # Unprefixed @@ -535,8 +529,7 @@ def test_barcode_command_debug_flag(runner, temp_files, mocker): ) for call_obj in mock_run_subp.call_args_list: - assert call_obj.args[1] is mock_cli_console - assert call_obj.args[2] is True + assert call_obj.args[1] is True # debug=True passed to all subprocess calls def test_barcode_command_missing_file(runner, temp_files): diff --git a/tests/test_generate_barcodes.py b/tests/test_generate_barcodes.py index b078650..3941b0a 100644 --- a/tests/test_generate_barcodes.py +++ b/tests/test_generate_barcodes.py @@ -1,5 +1,6 @@ import pytest import pandas as pd +import click from barcodeforge.generate_barcodes import ( parse_tree_paths, convert_to_barcodes, @@ -9,6 +10,7 @@ check_mutation_chain, replace_underscore_with_dash, create_barcodes_from_lineage_paths, + check_allele_consistency, ) from barcodeforge.utils import sortFun # Assuming sortFun is in utils @@ -26,7 +28,6 @@ def sample_barcode_data(): "T123C": [1, 0], "G456A": [1, 0], "C789T": [0, 1], - "A123T": [0, 0], # For reversion check } df = pd.DataFrame(data, index=["A", "B"]) return df @@ -112,7 +113,7 @@ def test_check_mutation_chain_non_binary_values(): "C225A": [1], "G225T": [1], "T225C": [2], - "C123A": [2], + "C123A": [-1], }, index=["lineage"], ) @@ -128,6 +129,22 @@ def test_replace_underscore_with_dash(): assert "lineage-B" in replaced_df.index +def test_check_allele_consistency(): + # Valid: same ref at each position, each lineage carries at most one allele per position + df_valid = pd.DataFrame({"A123T": [1, 0], "A123C": [0, 1]}) + check_allele_consistency(df_valid) # Should not raise + + # Invalid: conflicting reference alleles at position 123 (A vs C) + df_conflicting_refs = pd.DataFrame({"A123T": [1, 0], "C123G": [0, 1]}) + with pytest.raises(click.Abort): + check_allele_consistency(df_conflicting_refs) + + # Invalid: lineage carries two distinct alleles at the same position + df_multi_alt = pd.DataFrame({"A123T": [1], "A123C": [1]}, index=["lin1"]) + with pytest.raises(click.Abort): + check_allele_consistency(df_multi_alt) + + @pytest.fixture def temp_barcode_file(tmp_path): file_path = tmp_path / "test_barcodes.csv" @@ -155,7 +172,7 @@ def temp_barcode_file_with_flips(tmp_path): def test_test_no_flip_pairs_with_flips(temp_barcode_file_with_flips): - with pytest.raises(Exception, match=r"FAIL: flip pairs found"): + with pytest.raises(click.Abort): check_no_flip_pairs( str(temp_barcode_file_with_flips) ) # Renamed from test_no_flip_pairs diff --git a/tests/test_ref_muts.py b/tests/test_ref_muts.py index 377b35f..9a7d62b 100644 --- a/tests/test_ref_muts.py +++ b/tests/test_ref_muts.py @@ -7,6 +7,7 @@ import copy # Ensure copy is imported from unittest.mock import MagicMock, call # For console mocking from rich.console import Console # For console spec +import barcodeforge.utils from barcodeforge.ref_muts import ( _load_sample_mutations, _extract_mutations, @@ -263,7 +264,7 @@ def test_process_and_reroot_lineages_ref_not_in_muts_infer_root( output_rerooted_lineages = tmp_path / "rerooted_lineages_infer.tsv" mocked_console = MagicMock(spec=Console) - mocker.patch("barcodeforge.ref_muts.console", mocked_console) + mocker.patch("barcodeforge.utils.console", mocked_console) process_and_reroot_lineages( debug=False, @@ -279,7 +280,7 @@ def test_process_and_reroot_lineages_ref_not_in_muts_infer_root( assert output_rerooted_lineages.exists() # Verify warning for inferred root - expected_warning_call_substr = "[yellow]Reference ref_genome not present in sample mutations file. Inferring root sequence." + expected_warning_call_substr = "[WARNING] Reference ref_genome not present in sample mutations file. Inferring root sequence." assert any( expected_warning_call_substr in str(c_args) for c_args in mocked_console.print.call_args_list @@ -342,7 +343,7 @@ def test_process_and_reroot_lineages_warning_missing_sample_in_fasta( output_rerooted_lineages = tmp_path / "rerooted_lineages_missing_fasta.tsv" mocked_console = MagicMock(spec=Console) - mocker.patch("barcodeforge.ref_muts.console", mocked_console) + mocker.patch("barcodeforge.utils.console", mocked_console) # Inferred root will be based on sampleA only: AAAAAAAAAA # Additional muts (ref vs inferred): none @@ -412,7 +413,7 @@ def test_process_and_reroot_lineages_debug_shows_per_sample_warning( output_rerooted_lineages = tmp_path / "rerooted_lineages_debug.tsv" mocked_console = MagicMock(spec=Console) - mocker.patch("barcodeforge.ref_muts.console", mocked_console) + mocker.patch("barcodeforge.utils.console", mocked_console) process_and_reroot_lineages( debug=True, @@ -464,7 +465,7 @@ def test_process_and_reroot_lineages_summary_warning_any_missing( output_rerooted_lineages = tmp_path / "rerooted_lineages_below.tsv" mocked_console = MagicMock(spec=Console) - mocker.patch("barcodeforge.ref_muts.console", mocked_console) + mocker.patch("barcodeforge.utils.console", mocked_console) process_and_reroot_lineages( debug=False, diff --git a/tests/test_utils.py b/tests/test_utils.py index 5521231..d4d8356 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -3,6 +3,7 @@ import subprocess from unittest.mock import MagicMock, patch, call from rich.console import Console +import barcodeforge.utils from barcodeforge.utils import ( sortFun, resolve_tree_format, @@ -18,35 +19,38 @@ def test_sortFun(): def test_resolve_tree_format_specified_newick(): - assert resolve_tree_format("any.tree", "newick", None, False) == "newick" + assert resolve_tree_format("any.tree", "newick", False) == "newick" def test_resolve_tree_format_specified_nexus(): - assert resolve_tree_format("any.tree", "nexus", None, False) == "nexus" + assert resolve_tree_format("any.tree", "nexus", False) == "nexus" def test_resolve_tree_format_infer_nwk(): - assert resolve_tree_format("test.nwk", None, None, False) == "newick" + assert resolve_tree_format("test.nwk", None, False) == "newick" def test_resolve_tree_format_infer_newick(): - assert resolve_tree_format("test.newick", None, None, False) == "newick" + assert resolve_tree_format("test.newick", None, False) == "newick" def test_resolve_tree_format_infer_nexus(): - assert resolve_tree_format("test.nexus", None, None, False) == "nexus" + assert resolve_tree_format("test.nexus", None, False) == "nexus" def test_resolve_tree_format_unknown_extension(): with pytest.raises(click.Abort): - resolve_tree_format("test.txt", None, None, False) + resolve_tree_format("test.txt", None, False) def test_resolve_tree_format_debug_output(): mock_console = MagicMock(spec=Console) - resolve_tree_format("some.nwk", None, mock_console, debug=True) + with patch.object(barcodeforge.utils, "console", mock_console): + resolve_tree_format("some.nwk", None, debug=True) mock_console.print.assert_any_call( - f"[{STYLES['warning']}]Resolved tree format for 'some.nwk': newick[/]" + "[DEBUG] Resolved tree format for 'some.nwk': newick", + style=STYLES["debug"], + markup=False, ) @@ -56,59 +60,57 @@ def test_run_subprocess_command_success(mock_subproc_run): args=["test_cmd"], returncode=0, stdout="Success output", stderr="" ) mock_console = MagicMock(spec=Console) - result = run_subprocess_command( - ["test_cmd"], - mock_console, - debug=False, - success_message="Command executed successfully", - ) + with patch.object(barcodeforge.utils, "console", mock_console): + result = run_subprocess_command( + ["test_cmd"], + debug=False, + success_message="Command executed successfully", + ) assert result is True mock_console.print.assert_called_once_with( - f"[{STYLES['success']}]Command executed successfully[/]" + "[SUCCESS] Command executed successfully", + style=STYLES["success"], + markup=False, ) @patch("subprocess.run") def test_run_subprocess_command_failure_called_process_error(mock_subproc_run): - # This test covers cases where subprocess.run(check=True) would raise CalledProcessError mock_subproc_run.side_effect = subprocess.CalledProcessError( returncode=1, cmd=["fail_cmd_cpe"], output="out", stderr="Error output cpe" ) mock_console = MagicMock(spec=Console) - with pytest.raises(click.Abort): - run_subprocess_command( - ["fail_cmd_cpe"], - mock_console, - debug=False, - error_message_prefix="Test error CPE", - ) + with patch.object(barcodeforge.utils, "console", mock_console): + with pytest.raises(click.Abort): + run_subprocess_command( + ["fail_cmd_cpe"], + debug=False, + error_message_prefix="Test error CPE", + ) - expected_calls = [ - call( - f"[{STYLES['error']}]Test error CPE fail_cmd_cpe: Command '['fail_cmd_cpe']' returned non-zero exit status 1.[/]" - ), - call( - f"[{STYLES['dim']}]fail_cmd_cpe stdout:\\\\nout[/]" - ), # Corrected: output is stdout - call(f"[{STYLES['dim']}]fail_cmd_cpe stderr:\\\\nError output cpe[/]"), - ] - mock_console.print.assert_has_calls(expected_calls) + mock_console.print.assert_called_once_with( + "[ERROR] Test error CPE fail_cmd_cpe: Command '['fail_cmd_cpe']' returned non-zero exit status 1.", + style=STYLES["error"], + markup=False, + ) @patch("subprocess.run") def test_run_subprocess_command_file_not_found(mock_subproc_run): mock_subproc_run.side_effect = FileNotFoundError("Command not found") mock_console = MagicMock(spec=Console) - with pytest.raises(click.Abort): - run_subprocess_command( - ["non_existent_cmd"], - mock_console, - debug=False, - error_message_prefix="FNF error", - ) + with patch.object(barcodeforge.utils, "console", mock_console): + with pytest.raises(click.Abort): + run_subprocess_command( + ["non_existent_cmd"], + debug=False, + error_message_prefix="FNF error", + ) mock_console.print.assert_called_once_with( - f"[{STYLES['error']}]FNF error: non_existent_cmd command not found. Please ensure it is installed and in your PATH.[/]" + "[ERROR] FNF error: non_existent_cmd command not found. Please ensure it is installed and in your PATH.", + style=STYLES["error"], + markup=False, ) @@ -121,18 +123,30 @@ def test_run_subprocess_command_success_debug(mock_subproc_run): stderr="Debug success stderr", ) mock_console = MagicMock(spec=Console) - result = run_subprocess_command( - ["debug_cmd_success", "arg1"], - mock_console, - debug=True, - success_message="Debug success", - ) + with patch.object(barcodeforge.utils, "console", mock_console): + result = run_subprocess_command( + ["debug_cmd_success", "arg1"], + debug=True, + success_message="Debug success", + ) assert result is True expected_calls = [ - call(f"[{STYLES['debug']}]Running command: debug_cmd_success arg1[/]"), - call(f"[{STYLES['dim']}]debug_cmd_success stdout:\\\\nDebug success output[/]"), - call(f"[{STYLES['dim']}]debug_cmd_success stderr:\\\\nDebug success stderr[/]"), - call(f"[{STYLES['success']}]Debug success[/]"), + call( + "[DEBUG] Running command: debug_cmd_success arg1", + style=STYLES["debug"], + markup=False, + ), + call( + "[DEBUG] debug_cmd_success stdout:\nDebug success output", + style=STYLES["debug"], + markup=False, + ), + call( + "[DEBUG] debug_cmd_success stderr:\nDebug success stderr", + style=STYLES["debug"], + markup=False, + ), + call("[SUCCESS] Debug success", style=STYLES["success"], markup=False), ] mock_console.print.assert_has_calls(expected_calls, any_order=False) @@ -146,41 +160,31 @@ def test_run_subprocess_command_success_debug_empty_stderr(mock_subproc_run): stderr="", ) mock_console = MagicMock(spec=Console) - result = run_subprocess_command( - ["debug_cmd_success_no_stderr"], - mock_console, - debug=True, - success_message="Debug success no stderr", - ) + with patch.object(barcodeforge.utils, "console", mock_console): + result = run_subprocess_command( + ["debug_cmd_success_no_stderr"], + debug=True, + success_message="Debug success no stderr", + ) assert result is True expected_calls = [ - call(f"[{STYLES['debug']}]Running command: debug_cmd_success_no_stderr[/]"), call( - f"[{STYLES['dim']}]debug_cmd_success_no_stderr stdout:\\\\nDebug success output[/]" + "[DEBUG] Running command: debug_cmd_success_no_stderr", + style=STYLES["debug"], + markup=False, + ), + call( + "[DEBUG] debug_cmd_success_no_stderr stdout:\nDebug success output", + style=STYLES["debug"], + markup=False, + ), + call( + "[SUCCESS] Debug success no stderr", style=STYLES["success"], markup=False ), - # No call for stderr as it's empty - call(f"[{STYLES['success']}]Debug success no stderr[/]"), ] mock_console.print.assert_has_calls(expected_calls, any_order=False) - # Verify stderr was not printed. - # We check that if a call contains "stderr:\\n", it must be followed by another character, - # meaning it's not an empty stderr print. - for acall in mock_console.print.call_args_list: - call_str = acall[0][0] - # This assertion is a bit tricky. We want to ensure that if "stderr:\\n" is present, - # it's not *just* "stderr:\\n" (or "stderr:\\n[/]" with a style closing tag). - # It should have content after "stderr:\\n". - # A simpler way is to count calls that would match an empty stderr print. - # However, the current structure of the code in utils.py ensures that - # if stderr is empty, the print call for stderr is skipped entirely. - # So, we just need to ensure no call looks like an empty stderr print. - assert not ( - f" stderr:\\\\n