From 9f1b28fe2bdfcabf73455ed63c750143294efac4 Mon Sep 17 00:00:00 2001 From: gp201 Date: Wed, 18 Mar 2026 09:04:45 -0700 Subject: [PATCH 1/9] Add allele consistency checks. --- barcodeforge/generate_barcodes.py | 32 +++++++++++++++++++++++++++++++ tests/test_generate_barcodes.py | 15 +++++++++++++-- 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/barcodeforge/generate_barcodes.py b/barcodeforge/generate_barcodes.py index 65e20f3..d331c13 100755 --- a/barcodeforge/generate_barcodes.py +++ b/barcodeforge/generate_barcodes.py @@ -138,6 +138,37 @@ def check_no_flip_pairs(barcode_file: str): raise Exception(f"FAIL: flip pairs found: {flipPairs}") +def check_allele_consistency(df_barcodes: pd.DataFrame): + """ + Extract columns and ensure for every position ([1:-1]) there is only 1 reference allele ([0]). + Additionally check if any lineage has multiple alternative alleles at the same position. + """ + pos_refs = {} + pos_cols = {} + for col in df_barcodes.columns: + pos = col[1:-1] + ref = col[0] + if pos in pos_refs: + if pos_refs[pos] != ref: + raise ValueError( + f"Position {pos} has multiple reference alleles: '{pos_refs[pos]}' and '{ref}'" + ) + else: + pos_refs[pos] = ref + + if pos not in pos_cols: + pos_cols[pos] = [] + pos_cols[pos].append(col) + + for pos, cols in pos_cols.items(): + if len(cols) > 1: + if (df_barcodes[cols].sum(axis=1) > 1).any(): + lineages = df_barcodes[df_barcodes[cols].sum(axis=1) > 1].index.tolist() + raise ValueError( + f"Position {pos} has multiple alternative alleles in lineages: {lineages}" + ) + + def identify_chains(df_barcodes: pd.DataFrame) -> list: """ Identify sequential mutations in the barcodes DataFrame. @@ -207,6 +238,7 @@ def check_mutation_chain(df_barcodes: pd.DataFrame) -> pd.DataFrame: # in case mutation path leads to a return to the reference. df_barcodes = reversion_checking(df_barcodes) seq_muts = identify_chains(df_barcodes) + check_allele_consistency(df_barcodes) # The barcode should be a binary sparse matrix assert df_barcodes.isin([0, 1]).all(axis=None), "Barcode matrix should be binary" return df_barcodes diff --git a/tests/test_generate_barcodes.py b/tests/test_generate_barcodes.py index b078650..812eecb 100644 --- a/tests/test_generate_barcodes.py +++ b/tests/test_generate_barcodes.py @@ -9,6 +9,7 @@ check_mutation_chain, replace_underscore_with_dash, create_barcodes_from_lineage_paths, + check_allele_consistency, ) from barcodeforge.utils import sortFun # Assuming sortFun is in utils @@ -26,7 +27,6 @@ def sample_barcode_data(): "T123C": [1, 0], "G456A": [1, 0], "C789T": [0, 1], - "A123T": [0, 0], # For reversion check } df = pd.DataFrame(data, index=["A", "B"]) return df @@ -112,7 +112,7 @@ def test_check_mutation_chain_non_binary_values(): "C225A": [1], "G225T": [1], "T225C": [2], - "C123A": [2], + "C123A": [-1], }, index=["lineage"], ) @@ -128,6 +128,17 @@ def test_replace_underscore_with_dash(): assert "lineage-B" in replaced_df.index +def test_check_allele_consistency(): + df_valid = pd.DataFrame({"A123T": [1, 0], "A123C": [0, 1]}) + check_allele_consistency(df_valid) # Should not raise + + df_invalid = pd.DataFrame({"A123T": [1, 0], "C123G": [0, 1]}) + with pytest.raises( + ValueError, match="Position 123 has multiple reference alleles: 'A' and 'C'" + ): + check_allele_consistency(df_invalid) + + @pytest.fixture def temp_barcode_file(tmp_path): file_path = tmp_path / "test_barcodes.csv" From 40d27d2539dc2772014e18c5c6f0419be63c817d Mon Sep 17 00:00:00 2001 From: gp201 Date: Wed, 18 Mar 2026 10:01:27 -0700 Subject: [PATCH 2/9] Replace Console usage with utils print helpers --- barcodeforge/auspice_tree_to_table.py | 40 ++--- barcodeforge/cli.py | 74 +++------- barcodeforge/generate_barcodes.py | 65 +++----- barcodeforge/plot_barcode.py | 22 +-- barcodeforge/ref_muts.py | 41 ++---- barcodeforge/utils.py | 83 ++++++----- tests/test_auspice_tree_to_table.py | 19 +-- tests/test_cli.py | 33 ++--- tests/test_generate_barcodes.py | 3 +- tests/test_ref_muts.py | 9 +- tests/test_utils.py | 204 +++++++++----------------- 11 files changed, 212 insertions(+), 381 deletions(-) diff --git a/barcodeforge/auspice_tree_to_table.py b/barcodeforge/auspice_tree_to_table.py index 9336074..cb62583 100644 --- a/barcodeforge/auspice_tree_to_table.py +++ b/barcodeforge/auspice_tree_to_table.py @@ -4,10 +4,9 @@ import pandas as pd import Bio.Phylo from augur.utils import annotate_parents_for_tree -from rich.console import Console -import click # For click.Abort +import click -from .utils import STYLES # For consistent console messages +from .utils import print_error, print_warning, print_success def json_to_tree(json_dict, root=True, parent_cumulative_branch_length=None): @@ -128,7 +127,6 @@ def process_auspice_json( output_tree_path: str | None, include_internal_nodes: bool, attributes: list[str] | None, - console: Console, ): """ Converts an Auspice JSON tree to other formats (Newick, metadata table). @@ -138,14 +136,10 @@ def process_auspice_json( with open(tree_json_path, "r", encoding="utf-8") as fh: tree_json_data = json.load(fh) except FileNotFoundError: - console.print( - f"[{STYLES['error']}]Error: Tree JSON file not found at '{tree_json_path}'[/{STYLES['error']}]" - ) + print_error(f"Error: Tree JSON file not found at '{tree_json_path}'") raise click.Abort() except json.JSONDecodeError: - console.print( - f"[{STYLES['error']}]Error: Could not decode JSON from '{tree_json_path}'[/{STYLES['error']}]" - ) + print_error(f"Error: Could not decode JSON from '{tree_json_path}'") raise click.Abort() tree = json_to_tree(tree_json_data) @@ -158,13 +152,9 @@ def process_auspice_json( output_tree_path, "newick", ) - console.print( - f"[{STYLES['success']}]Tree successfully written to '{output_tree_path}'[/{STYLES['success']}]" - ) + print_success(f"Tree successfully written to '{output_tree_path}'") except Exception as e: - console.print( - f"[{STYLES['error']}]Error writing Newick tree to '{output_tree_path}': {e}[/{STYLES['error']}]" - ) + print_error(f"Error writing Newick tree to '{output_tree_path}': {e}") raise click.Abort() if output_metadata_path: @@ -181,10 +171,10 @@ def process_auspice_json( attrs_set.update(getattr(tree.root, "branch_attrs", {}).keys()) if not attrs_set: - console.print( - f"[{STYLES['warning']}]Warning: Could not auto-detect any attributes from the tree root. " + print_warning( + "Warning: Could not auto-detect any attributes from the tree root. " "The metadata output might be sparse or only contain 'name'. " - f"Consider using --attributes to specify columns.[/{STYLES['warning']}]" + "Consider using --attributes to specify columns." ) attributes_to_export = sorted(list(attrs_set)) @@ -254,18 +244,12 @@ def process_auspice_json( header=True, index=False, ) - console.print( - f"[{STYLES['success']}]Metadata successfully written to '{output_metadata_path}'[/{STYLES['success']}]" - ) + print_success(f"Metadata successfully written to '{output_metadata_path}'") except Exception as e: - console.print( - f"[{STYLES['error']}]Error writing metadata to '{output_metadata_path}': {e}[/{STYLES['error']}]" - ) + print_error(f"Error writing metadata to '{output_metadata_path}': {e}") raise click.Abort() if not output_tree_path and not output_metadata_path: # This case should be handled by the CLI command to require at least one output. # However, a warning here is fine if called directly. - console.print( - f"[{STYLES['warning']}]No output requested. Use --output-tree and/or --output-metadata.[/{STYLES['warning']}]" - ) + print_warning("No output requested. Use --output-tree and/or --output-metadata.") diff --git a/barcodeforge/cli.py b/barcodeforge/cli.py index bbe542a..030906a 100644 --- a/barcodeforge/cli.py +++ b/barcodeforge/cli.py @@ -1,20 +1,20 @@ import os import rich_click as click -from rich.console import Console from .format_tree import convert_nexus_to_newick from .utils import ( resolve_tree_format, run_subprocess_command, - STYLES, -) # Import new utils + print_error, + print_success, + print_info, + print_debug, +) from .ref_muts import process_and_reroot_lineages from .generate_barcodes import create_barcodes_from_lineage_paths from .plot_barcode import create_barcode_plot from .auspice_tree_to_table import process_auspice_json from . import __version__ -console = Console() - @click.group() @click.version_option(__version__, message="%(prog)s version %(version)s") @@ -25,7 +25,7 @@ def cli(ctx, debug): ctx.ensure_object(dict) ctx.obj["DEBUG"] = debug if debug: - console.print(f"[{STYLES['debug']}]Debug mode is ON[/{STYLES['debug']}]") + print_debug("Debug mode is ON") @cli.command() @@ -97,19 +97,13 @@ def barcode( """Process barcode data, including VCF generation, tree formatting, USHER placement, matUtils annotation, and matUtils extraction.""" is_debug = ctx.obj.get("DEBUG", False) # More robust way to get debug status - console.print( - f"[bold {STYLES['info']}]Reference Genome:[/bold {STYLES['info']}]\t{reference_genome}" - ) - console.print( - f"[bold {STYLES['info']}]Alignment:[/bold {STYLES['info']}]\t\t{alignment}" - ) - console.print(f"[bold {STYLES['info']}]Tree:[/bold {STYLES['info']}]\t\t\t{tree}") - console.print( - f"[bold {STYLES['info']}]Lineages:[/bold {STYLES['info']}]\t\t{lineages}" - ) + print_info(f"Reference Genome:\t{reference_genome}", bold=True) + print_info(f"Alignment:\t\t{alignment}", bold=True) + print_info(f"Tree:\t\t\t{tree}", bold=True) + print_info(f"Lineages:\t\t{lineages}", bold=True) # Use utility function to resolve tree format - resolved_tree_format = resolve_tree_format(tree, tree_format, console, is_debug) + resolved_tree_format = resolve_tree_format(tree, tree_format, is_debug) # Directory for intermediate files intermediate_dir = "barcodeforge_workdir" @@ -120,7 +114,6 @@ def barcode( fatovcf_cmd = ["faToVcf", alignment, fatovcf_output_vcf] run_subprocess_command( fatovcf_cmd, - console, is_debug, success_message=f"Successfully created VCF file: {fatovcf_output_vcf}", error_message_prefix="Error running faToVcf", @@ -131,34 +124,25 @@ def barcode( if resolved_tree_format == "nexus": if is_debug: - console.print( - f"[{STYLES['info']}]Converting Nexus tree ({tree}) to Newick format at {output_converted_tree_path}...[/{STYLES['info']}]" - ) + print_info(f"Converting Nexus tree ({tree}) to Newick format at {output_converted_tree_path}...") convert_nexus_to_newick( input_file=tree, output_file=output_converted_tree_path, input_format="nexus", ) - console.print( - f"[{STYLES['success']}]Converted tree saved to {output_converted_tree_path}[/{STYLES['success']}]" - ) + print_success(f"Converted tree saved to {output_converted_tree_path}") elif resolved_tree_format == "newick": if is_debug: - console.print( - f"[{STYLES['info']}]Processing Newick tree ({tree}) to {output_converted_tree_path}...[/{STYLES['info']}]" - ) + print_info(f"Processing Newick tree ({tree}) to {output_converted_tree_path}...") convert_nexus_to_newick( input_file=tree, output_file=output_converted_tree_path, input_format="newick", ) - console.print( - f"[{STYLES['success']}]Processed tree saved to {output_converted_tree_path} (if conversion/reformatting occurred)[/{STYLES['success']}]" - ) + print_success(f"Processed tree saved to {output_converted_tree_path} (if conversion/reformatting occurred)") else: - raise ValueError( - f"Unsupported tree format: {resolved_tree_format}. Expected 'newick' or 'nexus'." - ) + print_error(f"Error: Unsupported tree format: {resolved_tree_format}. Expected 'newick' or 'nexus'.") + raise click.Abort() # Run usher command usher_cmd = ["usher"] @@ -179,7 +163,6 @@ def barcode( run_subprocess_command( usher_cmd, - console, is_debug, success_message=f"Successfully ran USHER. Output protobuf: {usher_output_pb}", error_message_prefix="Error running USHER", @@ -208,9 +191,8 @@ def barcode( run_subprocess_command( matutils_annotate_cmd, - console, is_debug, - success_message=f"Successfully ran matUtils annotate. Output: annotated_tree.pb", + success_message="Successfully ran matUtils annotate. Output: annotated_tree.pb", error_message_prefix="Error running matUtils annotate", ) @@ -236,9 +218,8 @@ def barcode( run_subprocess_command( matutils_extract_cmd, - console, is_debug, - success_message=f"Successfully ran matUtils extract. Outputs: lineagePaths.txt, samplePaths.txt, auspice_tree.json", + success_message="Successfully ran matUtils extract. Outputs: lineagePaths.txt, samplePaths.txt, auspice_tree.json", error_message_prefix="Error running matUtils extract", ) @@ -259,14 +240,10 @@ def barcode( # Determine base name for output files clean_prefix = prefix.strip() if clean_prefix: - console.print( - f"[{STYLES['info']}]Using prefix '{clean_prefix}' for lineage names in barcodes...[/{STYLES['info']}]" - ) + print_info(f"Using prefix '{clean_prefix}' for lineage names in barcodes...") base_name = f"{clean_prefix}-barcode" else: - console.print( - f"[{STYLES['info']}]No prefix provided for lineage names in barcodes.[/{STYLES['info']}]" - ) + print_info("No prefix provided for lineage names in barcodes.") base_name = "barcode" csv_path = f"{base_name}.csv" @@ -285,9 +262,7 @@ def barcode( output_file_path=plot_output_path, ) - console.print( - f"[{STYLES['success']}]Generated barcodes are saved to '{csv_path}' and plot saved to '{plot_output_path}'[/{STYLES['success']}]" - ) + print_success(f"Generated barcodes are saved to '{csv_path}' and plot saved to '{plot_output_path}'") @cli.command() @@ -334,16 +309,13 @@ def extract_auspice_data( Source: https://gist.github.com/huddlej/5d7bd023d3807c698bd18c706974f2db """ is_debug = ctx.obj.get("DEBUG", False) # More robust way to get debug status - console.print( - f"[bold {STYLES['info']}]Processing Auspice JSON file:[/bold {STYLES['info']}]\t{auspice_json_path}" - ) + print_info(f"Processing Auspice JSON file:\t{auspice_json_path}", bold=True) process_auspice_json( tree_json_path=auspice_json_path, output_metadata_path=output_metadata_path, output_tree_path=output_tree_path, include_internal_nodes=include_internal_nodes, attributes=list(attributes) if attributes else None, - console=console, ) diff --git a/barcodeforge/generate_barcodes.py b/barcodeforge/generate_barcodes.py index d331c13..c9673ce 100755 --- a/barcodeforge/generate_barcodes.py +++ b/barcodeforge/generate_barcodes.py @@ -3,10 +3,8 @@ # This script is a modified version of the https://github.com/andersen-lab/Freyja/blob/main/freyja/convert_paths2barcodes.py import pandas as pd -from rich.console import Console -from .utils import sortFun, STYLES - -console = Console() +import rich_click as click +from .utils import sortFun, print_error, print_success, print_info, print_debug def parse_tree_paths(df: pd.DataFrame) -> pd.DataFrame: @@ -57,7 +55,6 @@ def convert_to_barcodes(df: pd.DataFrame) -> pd.DataFrame: ) df_barcodes = pd.concat((df_barcodes, cladeSeries), axis=1) - # console.print('separating combined splits') # Original print, can be made conditional with a verbose flag if needed df_barcodes = df_barcodes.T # dropped since no '' column this time. # df_barcodes = df_barcodes.drop(columns='') @@ -118,7 +115,7 @@ def check_no_flip_pairs(barcode_file: str): Args: barcode_file (str): Path to the barcode file to be tested. Raises: - Exception: If flip pairs are found in the barcode file. + click.Abort: If flip pairs are found in the barcode file. """ df_barcodes = pd.read_csv(barcode_file, index_col=0) flipPairs = [ @@ -127,15 +124,10 @@ def check_no_flip_pairs(barcode_file: str): if (d[-1] + d[1 : len(d) - 1] + d[0]) in df_barcodes.columns ] if len(flipPairs) == 0: - console.print( - f"[{STYLES['success']}]PASS: no flip pairs found in the generated barcode file.[/{STYLES['success']}]" - ) + print_success("PASS: no flip pairs found in the generated barcode file.") else: - # This should ideally not happen if the logic is correct, consider logging or raising a more specific error. - console.print( - f"[{STYLES['error']}]FAIL: flip pairs found: {flipPairs}[/{STYLES['error']}]" - ) - raise Exception(f"FAIL: flip pairs found: {flipPairs}") + print_error(f"FAIL: flip pairs found: {flipPairs}") + raise click.Abort() def check_allele_consistency(df_barcodes: pd.DataFrame): @@ -150,6 +142,9 @@ def check_allele_consistency(df_barcodes: pd.DataFrame): ref = col[0] if pos in pos_refs: if pos_refs[pos] != ref: + print_error( + f"Error: Position {pos} has multiple reference alleles: '{pos_refs[pos]}' and '{ref}'" + ) raise ValueError( f"Position {pos} has multiple reference alleles: '{pos_refs[pos]}' and '{ref}'" ) @@ -164,6 +159,9 @@ def check_allele_consistency(df_barcodes: pd.DataFrame): if len(cols) > 1: if (df_barcodes[cols].sum(axis=1) > 1).any(): lineages = df_barcodes[df_barcodes[cols].sum(axis=1) > 1].index.tolist() + print_error( + f"Error: Position {pos} has multiple alternative alleles in lineages: {lineages}" + ) raise ValueError( f"Position {pos} has multiple alternative alleles in lineages: {lineages}" ) @@ -231,7 +229,6 @@ def check_mutation_chain(df_barcodes: pd.DataFrame) -> pd.DataFrame: # remove constituent mutations df_barcodes.loc[lin_seq.index, sm[0:2]] -= 1 # drop all unused mutations - # print('before_trim\n',df_barcodes) df_barcodes = df_barcodes.drop( columns=df_barcodes.columns[df_barcodes.sum(axis=0) == 0] ) @@ -269,42 +266,32 @@ def create_barcodes_from_lineage_paths( prefix: Optional prefix to add to lineage names in the barcode file. """ if debug: - console.print( - f"[{STYLES['info']}]Reading lineage paths from: {input_file_path}[/{STYLES['info']}]" - ) + print_info(f"Reading lineage paths from: {input_file_path}") df = pd.read_csv(input_file_path, sep="\t") if debug: - console.print(f"[{STYLES['info']}]Parsing tree paths...[/{STYLES['info']}]") + print_info("Parsing tree paths...") df = parse_tree_paths(df) - console.print(f"[{STYLES['info']}]Converting to barcodes...[/{STYLES['info']}]") + print_info("Converting to barcodes...") df_barcodes = convert_to_barcodes(df) if prefix and prefix.strip() != "": - console.print( - f"[{STYLES['info']}]Adding prefix '{prefix}' to lineage names...[/{STYLES['info']}]" - ) + print_info(f"Adding prefix '{prefix}' to lineage names...") df_barcodes.index = [prefix + "-" + str(i) for i in df_barcodes.index] - console.print( - f"[{STYLES['info']}]Performing reversion checking...[/{STYLES['info']}]" - ) + print_info("Performing reversion checking...") df_barcodes = reversion_checking(df_barcodes) - console.print(f"[{STYLES['info']}]Checking mutation chains...[/{STYLES['info']}]") + print_info("Checking mutation chains...") df_barcodes = check_mutation_chain(df_barcodes) if debug: - console.print( - f"[{STYLES['info']}]Replacing underscores with dashes in lineage names...[/{STYLES['info']}]" - ) + print_info("Replacing underscores with dashes in lineage names...") df_barcodes = replace_underscore_with_dash(df_barcodes) if debug: - console.print( - f"[{STYLES['info']}]Sorting barcode columns...[/{STYLES['info']}]" - ) + print_info("Sorting barcode columns...") df_barcodes = df_barcodes.reindex(sorted(df_barcodes.columns, key=sortFun), axis=1) # Drop unclassified lineage if it exists @@ -312,15 +299,7 @@ def create_barcodes_from_lineage_paths( df_barcodes = df_barcodes.drop(index="unclassified") df_barcodes.to_csv(output_file_path) - console.print( - f"[{STYLES['success']}]Barcode file saved to: {output_file_path}[/{STYLES['success']}]" - ) + print_success(f"Barcode file saved to: {output_file_path}") # Test for flip pairs in the final output file - try: - check_no_flip_pairs(output_file_path) - except Exception as e: - console.print( - f"[{STYLES['error']}]Error during final flip pair test: {e}[/{STYLES['error']}]" - ) - # Depending on desired behavior, you might re-raise the exception or just log it. + check_no_flip_pairs(output_file_path) diff --git a/barcodeforge/plot_barcode.py b/barcodeforge/plot_barcode.py index 0f1b52c..132447a 100755 --- a/barcodeforge/plot_barcode.py +++ b/barcodeforge/plot_barcode.py @@ -1,16 +1,12 @@ """Plot barcode from CSV file.""" import pandas as pd -from rich.console import Console - -from .utils import sortFun, STYLES +from .utils import sortFun, print_info, print_debug import seaborn as sns import matplotlib.pyplot as plt from matplotlib.colors import ListedColormap import math -console = Console() - def create_barcode_visualization( barcode_df_long: pd.DataFrame, chunk_size: int, output_path: str @@ -137,21 +133,13 @@ def create_barcode_plot( output_file_path: Path to save the generated plot. """ if debug: - console.print( - f"[{STYLES['info']}]Reading barcode data from: {input_file_path}[/{STYLES['info']}]" - ) + print_info(f"Reading barcode data from: {input_file_path}") barcode_df = pd.read_csv(input_file_path, header=0, index_col=0) if debug: - console.print( - f"[{STYLES['debug']}]Barcode DataFrame shape: {barcode_df.shape}[/{STYLES['debug']}]" - ) - console.print( - f"[{STYLES['debug']}]Barcode DataFrame columns: {barcode_df.columns.tolist()}[/{STYLES['debug']}]" - ) - console.print( - f"[{STYLES['info']}]Transforming barcode data to long format...[/{STYLES['info']}]" - ) + print_debug(f"Barcode DataFrame shape: {barcode_df.shape}") + print_debug(f"Barcode DataFrame columns: {barcode_df.columns.tolist()}") + print_info("Transforming barcode data to long format...") barcode_df_long = barcode_df.stack().reset_index() barcode_df_long.columns = ["Lineage", "Mutation", "z"] diff --git a/barcodeforge/ref_muts.py b/barcodeforge/ref_muts.py index 70f7807..ac1545b 100755 --- a/barcodeforge/ref_muts.py +++ b/barcodeforge/ref_muts.py @@ -2,10 +2,7 @@ from Bio import SeqIO from collections import OrderedDict import re -from rich.console import Console -from .utils import STYLES # Assuming STYLES is in utils.py - -console = Console() +from .utils import print_error, print_warning, print_success, print_info, print_debug def _load_sample_mutations(path): @@ -137,15 +134,13 @@ def process_and_reroot_lineages( missing_count = len(sample_ids - set(seqs.keys())) if missing_count: total_count = len(sample_ids) - console.print( - f"[{STYLES['warning']}]Warning: {missing_count} out of {total_count} samples ({missing_count / total_count:.1%}) were not found in the FASTA file.[/{STYLES['warning']}]" + print_warning( + f"Warning: {missing_count} out of {total_count} samples ({missing_count / total_count:.1%}) were not found in the FASTA file." ) # if reference in the sample mutations file, use that as the root if sample_muts_df[sample_muts_df["sample"] == ref.id].shape[0] > 0: - console.print( - f"[{STYLES['success']}]Reference {ref.id} is present in sample mutations file.[/{STYLES['success']}]" - ) + print_success(f"Reference {ref.id} is present in sample mutations file.") additional_muts = _reverse_mutations_to_root( _extract_mutations( sample_muts_df[sample_muts_df["sample"] == ref.id].iloc[0] @@ -157,13 +152,11 @@ def process_and_reroot_lineages( additional_muts[i]["root"] = additional_muts[i].pop("mut") if debug: - console.print( - f"[{STYLES['debug']}]Additional mutations derived from reference {ref.id}: {additional_muts}[/{STYLES['debug']}]" - ) + print_debug(f"Additional mutations derived from reference {ref.id}: {additional_muts}") # else generate the root sequence else: - console.print( - f"[{STYLES['warning']}]Reference {ref.id} not present in sample mutations file. Inferring root sequence.[/{STYLES['warning']}]" + print_warning( + f"Reference {ref.id} not present in sample mutations file. Inferring root sequence." ) # Pre‑filter samples with non‑null mutations valid = sample_muts_df.loc[ @@ -180,9 +173,7 @@ def process_and_reroot_lineages( if seq is None: # In debug mode, log a per-sample warning and track missing samples for a summary warning below. if debug: - console.print( - f"[{STYLES['warning']}]Warning: Sample {sample_id} not found in FASTA file. Skipping.[/{STYLES['warning']}]" - ) + print_warning(f"Warning: Sample {sample_id} not found in FASTA file. Skipping.") continue root_seqs.append(_construct_root_sequence(root_muts, seq)) @@ -193,9 +184,7 @@ def process_and_reroot_lineages( # convert to dataframe and save as csv df = pd.DataFrame.from_dict(additional_muts, orient="index") df.to_csv(output_additional_muts_path, sep="\t", index_label="position") - console.print( - f"[{STYLES['success']}]Additional mutations saved to {output_additional_muts_path}[/{STYLES['success']}]" - ) + print_success(f"Additional mutations saved to {output_additional_muts_path}") lineage_paths_df = _parse_tree_paths( pd.read_csv(input_lineage_paths_path, sep="\t").fillna("") @@ -208,18 +197,14 @@ def process_and_reroot_lineages( ) if not additional_muts_list: - console.print( - f"[{STYLES['warning']}]No additional mutations found to add to lineage paths.[/{STYLES['warning']}]" - ) + print_warning("No additional mutations found to add to lineage paths.") # If no additional mutations, write the original lineage paths content or handle as needed # For now, let's just save the parsed (and potentially slightly reformatted) df lineage_paths_df["from_tree_root"] = lineage_paths_df["from_tree_root"].apply( lambda x: " ".join(x) ) else: - console.print( - f"[{STYLES['info']}]Found {len(additional_muts_list)} additional mutations to incorporate into lineage paths.[/{STYLES['info']}]" - ) + print_info(f"Found {len(additional_muts_list)} additional mutations to incorporate into lineage paths.") # add the additional mutations to the lineage paths after the first item # Ensure the logic for constructing the path string is robust @@ -241,6 +226,4 @@ def update_path(path_list): ) lineage_paths_df.to_csv(output_rerooted_lineage_paths_path, sep="\t") - console.print( - f"[{STYLES['success']}]Rerooted lineage paths saved to {output_rerooted_lineage_paths_path}[/{STYLES['success']}]" - ) + print_success(f"Rerooted lineage paths saved to {output_rerooted_lineage_paths_path}") diff --git a/barcodeforge/utils.py b/barcodeforge/utils.py index 31305ca..864af5a 100644 --- a/barcodeforge/utils.py +++ b/barcodeforge/utils.py @@ -3,26 +3,46 @@ import rich_click as click from rich.console import Console +console = Console() + STYLES = { "info": "blue", "success": "green", "error": "bold red", "warning": "yellow", "debug": "cyan", - "dim": "dim", - "highlight": "bold magenta", } +def print_error(msg: str) -> None: + console.print(f"[{STYLES['error']}]{msg}[/{STYLES['error']}]") + + +def print_warning(msg: str) -> None: + console.print(f"[{STYLES['warning']}]{msg}[/{STYLES['warning']}]") + + +def print_success(msg: str) -> None: + console.print(f"[{STYLES['success']}]{msg}[/{STYLES['success']}]") + + +def print_info(msg: str, bold: bool = False) -> None: + style = f"bold {STYLES['info']}" if bold else STYLES["info"] + console.print(f"[{style}]{msg}[/{style}]") + + +def print_debug(msg: str) -> None: + console.print(f"[{STYLES['debug']}]{msg}[/{STYLES['debug']}]") + + def resolve_tree_format( - tree_path: str, specified_format: str | None, console: Console, debug: bool + tree_path: str, specified_format: str | None, debug: bool ) -> str: """ Resolves the format of a phylogenetic tree file based on its extension or specified_format. Args: tree_path (str): Path to the tree file. specified_format (str | None): User-specified format ('newick' or 'nexus'). - console (Console): Rich console for output. debug (bool): If True, prints debug information. Returns: str: Resolved format ('newick' or 'nexus'). @@ -38,25 +58,19 @@ def resolve_tree_format( elif ext_lower == ".nexus": resolved_format = "nexus" else: - if console: - console.print( - f"[{STYLES['error']}]Error: Unknown tree format for file '{tree_path}'. Extension '{ext}' is not recognized.[/{STYLES['error']}]" - ) - console.print( - f"[{STYLES['error']}]Please specify the format using --tree-format ('newick' or 'nexus').[/]" - ) + print_error( + f"Error: Unknown tree format for file '{tree_path}'. Extension '{ext}' is not recognized." + ) + print_error("Please specify the format using --tree-format ('newick' or 'nexus').") raise click.Abort() - if debug and console: - console.print( - f"[{STYLES['warning']}]Resolved tree format for '{tree_path}': {resolved_format}[/]" - ) + if debug: + print_debug(f"Resolved tree format for '{tree_path}': {resolved_format}") return resolved_format def run_subprocess_command( cmd: list[str], - console: Console, debug: bool, success_message: str = "Successfully executed command.", error_message_prefix: str = "Error executing command", @@ -65,7 +79,6 @@ def run_subprocess_command( Runs a subprocess command and handles errors with rich output. Args: cmd (list[str]): Command to run as a list of strings. - console (Console): Rich console for output. debug (bool): If True, prints debug information. success_message (str): Message to print on successful execution. error_message_prefix (str): Prefix for error messages. @@ -74,36 +87,30 @@ def run_subprocess_command( Raises: click.Abort: If the command fails or is not found. """ - if debug and console: - console.print(f"[{STYLES['debug']}]Running command: {' '.join(cmd)}[/]") + if debug: + print_debug(f"Running command: {' '.join(cmd)}") try: process_result = subprocess.run(cmd, check=True, capture_output=True, text=True) - if debug and console: + if debug: if process_result.stdout: - console.print( - f"[{STYLES['dim']}]{cmd[0]} stdout:\\\\n{process_result.stdout}[/]" - ) + print_debug(f"{cmd[0]} stdout:\n{process_result.stdout}") if process_result.stderr: - console.print( - f"[{STYLES['dim']}]{cmd[0]} stderr:\\\\n{process_result.stderr}[/]" - ) - if success_message and console: - console.print(f"[{STYLES['success']}]{success_message}[/]") + print_debug(f"{cmd[0]} stderr:\n{process_result.stderr}") + if success_message: + print_success(success_message) return True except FileNotFoundError: - if console: - console.print( - f"[{STYLES['error']}]{error_message_prefix}: {cmd[0]} command not found. Please ensure it is installed and in your PATH.[/]" - ) + print_error( + f"{error_message_prefix}: {cmd[0]} command not found. Please ensure it is installed and in your PATH." + ) raise click.Abort() except subprocess.CalledProcessError as e: - if console: - console.print(f"[{STYLES['error']}]{error_message_prefix} {cmd[0]}: {e}[/]") - if e.stdout: - console.print(f"[{STYLES['dim']}]{cmd[0]} stdout:\\\\n{e.stdout}[/]") - if e.stderr: - console.print(f"[{STYLES['dim']}]{cmd[0]} stderr:\\\\n{e.stderr}[/]") + print_error(f"{error_message_prefix} {cmd[0]}: {e}") + if e.stdout: + print_debug(f"{cmd[0]} stdout:\n{e.stdout}") + if e.stderr: + print_debug(f"{cmd[0]} stderr:\n{e.stderr}") raise click.Abort() diff --git a/tests/test_auspice_tree_to_table.py b/tests/test_auspice_tree_to_table.py index a19656a..a0fba65 100644 --- a/tests/test_auspice_tree_to_table.py +++ b/tests/test_auspice_tree_to_table.py @@ -1,9 +1,10 @@ import json import pandas as pd import pytest -from barcodeforge.auspice_tree_to_table import json_to_tree, process_auspice_json from rich.console import Console import click +import barcodeforge.utils +from barcodeforge.auspice_tree_to_table import json_to_tree, process_auspice_json @pytest.fixture @@ -45,14 +46,12 @@ def test_json_to_tree_basic(sample_auspice_json): def test_process_auspice_json(tmp_path, sample_auspice_json): meta_out = tmp_path / "meta.tsv" tree_out = tmp_path / "tree.nwk" - console = Console(file=None) process_auspice_json( tree_json_path=str(sample_auspice_json), output_metadata_path=str(meta_out), output_tree_path=str(tree_out), include_internal_nodes=False, attributes=None, - console=console, ) assert meta_out.exists() assert tree_out.exists() @@ -64,14 +63,12 @@ def test_process_auspice_json(tmp_path, sample_auspice_json): def test_process_auspice_json_include_internal(tmp_path, sample_auspice_json): meta_out = tmp_path / "meta.tsv" tree_out = tmp_path / "tree.nwk" - console = Console(file=None) process_auspice_json( tree_json_path=str(sample_auspice_json), output_metadata_path=str(meta_out), output_tree_path=str(tree_out), include_internal_nodes=True, attributes=["country"], - console=console, ) df = pd.read_csv(meta_out, sep="\t") assert set(df["name"]) == {"root", "A", "B"} @@ -79,7 +76,8 @@ def test_process_auspice_json_include_internal(tmp_path, sample_auspice_json): def test_process_auspice_json_missing_file(tmp_path): - console = Console(record=True) + recording_console = Console(record=True) + barcodeforge.utils.console = recording_console with pytest.raises(click.Abort): process_auspice_json( tree_json_path=str(tmp_path / "no.json"), @@ -87,15 +85,15 @@ def test_process_auspice_json_missing_file(tmp_path): output_tree_path=None, include_internal_nodes=False, attributes=None, - console=console, ) - assert "Error: Tree JSON file not found" in console.export_text() + assert "Error: Tree JSON file not found" in recording_console.export_text() def test_process_auspice_json_bad_json(tmp_path): bad_path = tmp_path / "bad.json" bad_path.write_text("not valid") - console = Console(record=True) + recording_console = Console(record=True) + barcodeforge.utils.console = recording_console with pytest.raises(click.Abort): process_auspice_json( tree_json_path=str(bad_path), @@ -103,6 +101,5 @@ def test_process_auspice_json_bad_json(tmp_path): output_tree_path=None, include_internal_nodes=False, attributes=None, - console=console, ) - assert "Error: Could not decode JSON" in console.export_text() + assert "Error: Could not decode JSON" in recording_console.export_text() diff --git a/tests/test_cli.py b/tests/test_cli.py index 6e68c57..4c0248a 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -8,6 +8,7 @@ import shutil from unittest.mock import call, ANY, MagicMock from rich.console import Console +import barcodeforge.utils from barcodeforge.utils import STYLES @@ -89,7 +90,7 @@ def test_barcode_command_default_options(runner, temp_files, mocker): "barcodeforge.cli.create_barcodes_from_lineage_paths" ) mock_create_plot = mocker.patch("barcodeforge.cli.create_barcode_plot") - mock_cli_console = mocker.patch("barcodeforge.cli.console", MagicMock(spec=Console)) + mock_cli_console = mocker.patch.object(barcodeforge.utils, "console", MagicMock(spec=Console)) args = [ "barcode", @@ -115,7 +116,7 @@ def test_barcode_command_default_options(runner, temp_files, mocker): final_barcode_plot_fn = "barcode_plot.pdf" mock_resolve_format.assert_called_once_with( - temp_files["tree"], None, mock_cli_console, False + temp_files["tree"], None, False ) mock_convert_tree.assert_called_once_with( input_file=temp_files["tree"], @@ -137,14 +138,12 @@ def test_barcode_command_default_options(runner, temp_files, mocker): expected_subprocess_calls = [ call( ["faToVcf", temp_files["alignment"], aligned_vcf_fn], - mock_cli_console, # console passed False, # debug status passed success_message=ANY, error_message_prefix=ANY, ), call( expected_usher_cmd, - mock_cli_console, False, success_message=ANY, error_message_prefix=ANY, @@ -164,7 +163,6 @@ def test_barcode_command_default_options(runner, temp_files, mocker): "-T", "8", ], - mock_cli_console, False, success_message=ANY, error_message_prefix=ANY, @@ -184,7 +182,6 @@ def test_barcode_command_default_options(runner, temp_files, mocker): "-T", "8", ], - mock_cli_console, False, success_message=ANY, error_message_prefix=ANY, @@ -228,7 +225,7 @@ def test_barcode_command_custom_options(runner, temp_files, mocker): "barcodeforge.cli.create_barcodes_from_lineage_paths" ) mock_create_plot = mocker.patch("barcodeforge.cli.create_barcode_plot") - mock_cli_console = mocker.patch("barcodeforge.cli.console", MagicMock(spec=Console)) + mock_cli_console = mocker.patch.object(barcodeforge.utils, "console", MagicMock(spec=Console)) prefix = "MYPREFIX" custom_usher_args = "-U -l" @@ -269,7 +266,7 @@ def test_barcode_command_custom_options(runner, temp_files, mocker): final_barcode_plot_fn = f"{prefix}-barcode_plot.pdf" mock_resolve_format.assert_called_once_with( - temp_files["tree"], None, mock_cli_console, False + temp_files["tree"], None, False ) mock_convert_tree.assert_called_once_with( input_file=temp_files["tree"], @@ -294,14 +291,12 @@ def test_barcode_command_custom_options(runner, temp_files, mocker): expected_subprocess_calls = [ call( ["faToVcf", temp_files["alignment"], aligned_vcf_fn], - mock_cli_console, False, success_message=ANY, error_message_prefix=ANY, ), call( expected_usher_cmd, - mock_cli_console, False, success_message=ANY, error_message_prefix=ANY, @@ -321,7 +316,6 @@ def test_barcode_command_custom_options(runner, temp_files, mocker): "-T", custom_threads, ], - mock_cli_console, False, success_message=ANY, error_message_prefix=ANY, @@ -341,7 +335,6 @@ def test_barcode_command_custom_options(runner, temp_files, mocker): "-T", custom_threads, ], - mock_cli_console, False, success_message=ANY, error_message_prefix=ANY, @@ -383,7 +376,7 @@ def test_barcode_command_nexus_tree(runner, temp_files, mocker): mocker.patch("barcodeforge.cli.process_and_reroot_lineages") mocker.patch("barcodeforge.cli.create_barcodes_from_lineage_paths") mocker.patch("barcodeforge.cli.create_barcode_plot") - mock_cli_console = mocker.patch("barcodeforge.cli.console", MagicMock(spec=Console)) + mock_cli_console = mocker.patch.object(barcodeforge.utils, "console", MagicMock(spec=Console)) args = [ "barcode", @@ -403,7 +396,7 @@ def test_barcode_command_nexus_tree(runner, temp_files, mocker): tree_pb_fn = f"{intermediate_dir}/tree.pb" mock_resolve_format.assert_called_once_with( - temp_files["tree"], "nexus", mock_cli_console, False + temp_files["tree"], "nexus", False ) mock_convert_tree.assert_called_once_with( input_file=temp_files["tree"], @@ -424,7 +417,6 @@ def test_barcode_command_nexus_tree(runner, temp_files, mocker): ] mock_run_subp.assert_any_call( expected_usher_cmd, - mock_cli_console, False, success_message=ANY, error_message_prefix=ANY, @@ -440,7 +432,7 @@ def test_barcode_command_newick_tree_reformat(runner, temp_files, mocker): mocker.patch("barcodeforge.cli.process_and_reroot_lineages") mocker.patch("barcodeforge.cli.create_barcodes_from_lineage_paths") mocker.patch("barcodeforge.cli.create_barcode_plot") - mock_cli_console = mocker.patch("barcodeforge.cli.console", MagicMock(spec=Console)) + mock_cli_console = mocker.patch.object(barcodeforge.utils, "console", MagicMock(spec=Console)) args = [ "barcode", @@ -456,7 +448,7 @@ def test_barcode_command_newick_tree_reformat(runner, temp_files, mocker): converted_tree_fn = "barcodeforge_workdir/converted_tree.nwk" mock_resolve_format.assert_called_once_with( - temp_files["tree"], "newick", mock_cli_console, False + temp_files["tree"], "newick", False ) mock_convert_tree.assert_called_once_with( input_file=temp_files["tree"], @@ -478,7 +470,7 @@ def test_barcode_command_debug_flag(runner, temp_files, mocker): "barcodeforge.cli.create_barcodes_from_lineage_paths" ) mock_create_plot = mocker.patch("barcodeforge.cli.create_barcode_plot") - mock_cli_console = mocker.patch("barcodeforge.cli.console", MagicMock(spec=Console)) + mock_cli_console = mocker.patch.object(barcodeforge.utils, "console", MagicMock(spec=Console)) args = [ "--debug", # Main CLI debug flag @@ -504,7 +496,7 @@ def test_barcode_command_debug_flag(runner, temp_files, mocker): f"[{STYLES['debug']}]Debug mode is ON[/{STYLES['debug']}]" ) mock_resolve_format.assert_called_once_with( - temp_files["tree"], None, mock_cli_console, True + temp_files["tree"], None, True ) mock_convert_tree.assert_called_once_with( input_file=temp_files["tree"], @@ -535,8 +527,7 @@ def test_barcode_command_debug_flag(runner, temp_files, mocker): ) for call_obj in mock_run_subp.call_args_list: - assert call_obj.args[1] is mock_cli_console - assert call_obj.args[2] is True + assert call_obj.args[1] is True # debug=True passed to all subprocess calls def test_barcode_command_missing_file(runner, temp_files): diff --git a/tests/test_generate_barcodes.py b/tests/test_generate_barcodes.py index 812eecb..9ff5493 100644 --- a/tests/test_generate_barcodes.py +++ b/tests/test_generate_barcodes.py @@ -1,5 +1,6 @@ import pytest import pandas as pd +import click from barcodeforge.generate_barcodes import ( parse_tree_paths, convert_to_barcodes, @@ -166,7 +167,7 @@ def temp_barcode_file_with_flips(tmp_path): def test_test_no_flip_pairs_with_flips(temp_barcode_file_with_flips): - with pytest.raises(Exception, match=r"FAIL: flip pairs found"): + with pytest.raises(click.Abort): check_no_flip_pairs( str(temp_barcode_file_with_flips) ) # Renamed from test_no_flip_pairs diff --git a/tests/test_ref_muts.py b/tests/test_ref_muts.py index 377b35f..d519163 100644 --- a/tests/test_ref_muts.py +++ b/tests/test_ref_muts.py @@ -7,6 +7,7 @@ import copy # Ensure copy is imported from unittest.mock import MagicMock, call # For console mocking from rich.console import Console # For console spec +import barcodeforge.utils from barcodeforge.ref_muts import ( _load_sample_mutations, _extract_mutations, @@ -263,7 +264,7 @@ def test_process_and_reroot_lineages_ref_not_in_muts_infer_root( output_rerooted_lineages = tmp_path / "rerooted_lineages_infer.tsv" mocked_console = MagicMock(spec=Console) - mocker.patch("barcodeforge.ref_muts.console", mocked_console) + mocker.patch("barcodeforge.utils.console", mocked_console) process_and_reroot_lineages( debug=False, @@ -342,7 +343,7 @@ def test_process_and_reroot_lineages_warning_missing_sample_in_fasta( output_rerooted_lineages = tmp_path / "rerooted_lineages_missing_fasta.tsv" mocked_console = MagicMock(spec=Console) - mocker.patch("barcodeforge.ref_muts.console", mocked_console) + mocker.patch("barcodeforge.utils.console", mocked_console) # Inferred root will be based on sampleA only: AAAAAAAAAA # Additional muts (ref vs inferred): none @@ -412,7 +413,7 @@ def test_process_and_reroot_lineages_debug_shows_per_sample_warning( output_rerooted_lineages = tmp_path / "rerooted_lineages_debug.tsv" mocked_console = MagicMock(spec=Console) - mocker.patch("barcodeforge.ref_muts.console", mocked_console) + mocker.patch("barcodeforge.utils.console", mocked_console) process_and_reroot_lineages( debug=True, @@ -464,7 +465,7 @@ def test_process_and_reroot_lineages_summary_warning_any_missing( output_rerooted_lineages = tmp_path / "rerooted_lineages_below.tsv" mocked_console = MagicMock(spec=Console) - mocker.patch("barcodeforge.ref_muts.console", mocked_console) + mocker.patch("barcodeforge.utils.console", mocked_console) process_and_reroot_lineages( debug=False, diff --git a/tests/test_utils.py b/tests/test_utils.py index 5521231..dad69b3 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -3,6 +3,7 @@ import subprocess from unittest.mock import MagicMock, patch, call from rich.console import Console +import barcodeforge.utils from barcodeforge.utils import ( sortFun, resolve_tree_format, @@ -18,35 +19,36 @@ def test_sortFun(): def test_resolve_tree_format_specified_newick(): - assert resolve_tree_format("any.tree", "newick", None, False) == "newick" + assert resolve_tree_format("any.tree", "newick", False) == "newick" def test_resolve_tree_format_specified_nexus(): - assert resolve_tree_format("any.tree", "nexus", None, False) == "nexus" + assert resolve_tree_format("any.tree", "nexus", False) == "nexus" def test_resolve_tree_format_infer_nwk(): - assert resolve_tree_format("test.nwk", None, None, False) == "newick" + assert resolve_tree_format("test.nwk", None, False) == "newick" def test_resolve_tree_format_infer_newick(): - assert resolve_tree_format("test.newick", None, None, False) == "newick" + assert resolve_tree_format("test.newick", None, False) == "newick" def test_resolve_tree_format_infer_nexus(): - assert resolve_tree_format("test.nexus", None, None, False) == "nexus" + assert resolve_tree_format("test.nexus", None, False) == "nexus" def test_resolve_tree_format_unknown_extension(): with pytest.raises(click.Abort): - resolve_tree_format("test.txt", None, None, False) + resolve_tree_format("test.txt", None, False) def test_resolve_tree_format_debug_output(): mock_console = MagicMock(spec=Console) - resolve_tree_format("some.nwk", None, mock_console, debug=True) + with patch.object(barcodeforge.utils, "console", mock_console): + resolve_tree_format("some.nwk", None, debug=True) mock_console.print.assert_any_call( - f"[{STYLES['warning']}]Resolved tree format for 'some.nwk': newick[/]" + f"[{STYLES['debug']}]Resolved tree format for 'some.nwk': newick[/{STYLES['debug']}]" ) @@ -56,41 +58,38 @@ def test_run_subprocess_command_success(mock_subproc_run): args=["test_cmd"], returncode=0, stdout="Success output", stderr="" ) mock_console = MagicMock(spec=Console) - result = run_subprocess_command( - ["test_cmd"], - mock_console, - debug=False, - success_message="Command executed successfully", - ) + with patch.object(barcodeforge.utils, "console", mock_console): + result = run_subprocess_command( + ["test_cmd"], + debug=False, + success_message="Command executed successfully", + ) assert result is True mock_console.print.assert_called_once_with( - f"[{STYLES['success']}]Command executed successfully[/]" + f"[{STYLES['success']}]Command executed successfully[/{STYLES['success']}]" ) @patch("subprocess.run") def test_run_subprocess_command_failure_called_process_error(mock_subproc_run): - # This test covers cases where subprocess.run(check=True) would raise CalledProcessError mock_subproc_run.side_effect = subprocess.CalledProcessError( returncode=1, cmd=["fail_cmd_cpe"], output="out", stderr="Error output cpe" ) mock_console = MagicMock(spec=Console) - with pytest.raises(click.Abort): - run_subprocess_command( - ["fail_cmd_cpe"], - mock_console, - debug=False, - error_message_prefix="Test error CPE", - ) + with patch.object(barcodeforge.utils, "console", mock_console): + with pytest.raises(click.Abort): + run_subprocess_command( + ["fail_cmd_cpe"], + debug=False, + error_message_prefix="Test error CPE", + ) expected_calls = [ call( - f"[{STYLES['error']}]Test error CPE fail_cmd_cpe: Command '['fail_cmd_cpe']' returned non-zero exit status 1.[/]" + f"[{STYLES['error']}]Test error CPE fail_cmd_cpe: Command '['fail_cmd_cpe']' returned non-zero exit status 1.[/{STYLES['error']}]" ), - call( - f"[{STYLES['dim']}]fail_cmd_cpe stdout:\\\\nout[/]" - ), # Corrected: output is stdout - call(f"[{STYLES['dim']}]fail_cmd_cpe stderr:\\\\nError output cpe[/]"), + call(f"[{STYLES['debug']}]fail_cmd_cpe stdout:\nout[/{STYLES['debug']}]"), + call(f"[{STYLES['debug']}]fail_cmd_cpe stderr:\nError output cpe[/{STYLES['debug']}]"), ] mock_console.print.assert_has_calls(expected_calls) @@ -99,16 +98,16 @@ def test_run_subprocess_command_failure_called_process_error(mock_subproc_run): def test_run_subprocess_command_file_not_found(mock_subproc_run): mock_subproc_run.side_effect = FileNotFoundError("Command not found") mock_console = MagicMock(spec=Console) - with pytest.raises(click.Abort): - run_subprocess_command( - ["non_existent_cmd"], - mock_console, - debug=False, - error_message_prefix="FNF error", - ) + with patch.object(barcodeforge.utils, "console", mock_console): + with pytest.raises(click.Abort): + run_subprocess_command( + ["non_existent_cmd"], + debug=False, + error_message_prefix="FNF error", + ) mock_console.print.assert_called_once_with( - f"[{STYLES['error']}]FNF error: non_existent_cmd command not found. Please ensure it is installed and in your PATH.[/]" + f"[{STYLES['error']}]FNF error: non_existent_cmd command not found. Please ensure it is installed and in your PATH.[/{STYLES['error']}]" ) @@ -121,18 +120,18 @@ def test_run_subprocess_command_success_debug(mock_subproc_run): stderr="Debug success stderr", ) mock_console = MagicMock(spec=Console) - result = run_subprocess_command( - ["debug_cmd_success", "arg1"], - mock_console, - debug=True, - success_message="Debug success", - ) + with patch.object(barcodeforge.utils, "console", mock_console): + result = run_subprocess_command( + ["debug_cmd_success", "arg1"], + debug=True, + success_message="Debug success", + ) assert result is True expected_calls = [ - call(f"[{STYLES['debug']}]Running command: debug_cmd_success arg1[/]"), - call(f"[{STYLES['dim']}]debug_cmd_success stdout:\\\\nDebug success output[/]"), - call(f"[{STYLES['dim']}]debug_cmd_success stderr:\\\\nDebug success stderr[/]"), - call(f"[{STYLES['success']}]Debug success[/]"), + call(f"[{STYLES['debug']}]Running command: debug_cmd_success arg1[/{STYLES['debug']}]"), + call(f"[{STYLES['debug']}]debug_cmd_success stdout:\nDebug success output[/{STYLES['debug']}]"), + call(f"[{STYLES['debug']}]debug_cmd_success stderr:\nDebug success stderr[/{STYLES['debug']}]"), + call(f"[{STYLES['success']}]Debug success[/{STYLES['success']}]"), ] mock_console.print.assert_has_calls(expected_calls, any_order=False) @@ -146,41 +145,22 @@ def test_run_subprocess_command_success_debug_empty_stderr(mock_subproc_run): stderr="", ) mock_console = MagicMock(spec=Console) - result = run_subprocess_command( - ["debug_cmd_success_no_stderr"], - mock_console, - debug=True, - success_message="Debug success no stderr", - ) + with patch.object(barcodeforge.utils, "console", mock_console): + result = run_subprocess_command( + ["debug_cmd_success_no_stderr"], + debug=True, + success_message="Debug success no stderr", + ) assert result is True expected_calls = [ - call(f"[{STYLES['debug']}]Running command: debug_cmd_success_no_stderr[/]"), - call( - f"[{STYLES['dim']}]debug_cmd_success_no_stderr stdout:\\\\nDebug success output[/]" - ), - # No call for stderr as it's empty - call(f"[{STYLES['success']}]Debug success no stderr[/]"), + call(f"[{STYLES['debug']}]Running command: debug_cmd_success_no_stderr[/{STYLES['debug']}]"), + call(f"[{STYLES['debug']}]debug_cmd_success_no_stderr stdout:\nDebug success output[/{STYLES['debug']}]"), + call(f"[{STYLES['success']}]Debug success no stderr[/{STYLES['success']}]"), ] mock_console.print.assert_has_calls(expected_calls, any_order=False) - # Verify stderr was not printed. - # We check that if a call contains "stderr:\\n", it must be followed by another character, - # meaning it's not an empty stderr print. - for acall in mock_console.print.call_args_list: - call_str = acall[0][0] - # This assertion is a bit tricky. We want to ensure that if "stderr:\\n" is present, - # it's not *just* "stderr:\\n" (or "stderr:\\n[/]" with a style closing tag). - # It should have content after "stderr:\\n". - # A simpler way is to count calls that would match an empty stderr print. - # However, the current structure of the code in utils.py ensures that - # if stderr is empty, the print call for stderr is skipped entirely. - # So, we just need to ensure no call looks like an empty stderr print. - assert not ( - f" stderr:\\\\n Date: Wed, 18 Mar 2026 10:18:20 -0700 Subject: [PATCH 3/9] Prefix console messages with severity labels --- barcodeforge/ref_muts.py | 2 +- barcodeforge/utils.py | 10 +++++----- tests/test_cli.py | 2 +- tests/test_ref_muts.py | 2 +- tests/test_utils.py | 34 +++++++++++++++++----------------- 5 files changed, 25 insertions(+), 25 deletions(-) diff --git a/barcodeforge/ref_muts.py b/barcodeforge/ref_muts.py index ac1545b..1167fd5 100755 --- a/barcodeforge/ref_muts.py +++ b/barcodeforge/ref_muts.py @@ -197,7 +197,7 @@ def process_and_reroot_lineages( ) if not additional_muts_list: - print_warning("No additional mutations found to add to lineage paths.") + print_info("No additional mutations found to add to lineage paths.") # If no additional mutations, write the original lineage paths content or handle as needed # For now, let's just save the parsed (and potentially slightly reformatted) df lineage_paths_df["from_tree_root"] = lineage_paths_df["from_tree_root"].apply( diff --git a/barcodeforge/utils.py b/barcodeforge/utils.py index 864af5a..25cb606 100644 --- a/barcodeforge/utils.py +++ b/barcodeforge/utils.py @@ -15,24 +15,24 @@ def print_error(msg: str) -> None: - console.print(f"[{STYLES['error']}]{msg}[/{STYLES['error']}]") + console.print(f"[{STYLES['error']}][ERROR] {msg}[/{STYLES['error']}]") def print_warning(msg: str) -> None: - console.print(f"[{STYLES['warning']}]{msg}[/{STYLES['warning']}]") + console.print(f"[{STYLES['warning']}][WARNING] {msg}[/{STYLES['warning']}]") def print_success(msg: str) -> None: - console.print(f"[{STYLES['success']}]{msg}[/{STYLES['success']}]") + console.print(f"[{STYLES['success']}][SUCCESS] {msg}[/{STYLES['success']}]") def print_info(msg: str, bold: bool = False) -> None: style = f"bold {STYLES['info']}" if bold else STYLES["info"] - console.print(f"[{style}]{msg}[/{style}]") + console.print(f"[{style}][INFO] {msg}[/{style}]") def print_debug(msg: str) -> None: - console.print(f"[{STYLES['debug']}]{msg}[/{STYLES['debug']}]") + console.print(f"[{STYLES['debug']}][DEBUG] {msg}[/{STYLES['debug']}]") def resolve_tree_format( diff --git a/tests/test_cli.py b/tests/test_cli.py index 4c0248a..45166df 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -493,7 +493,7 @@ def test_barcode_command_debug_flag(runner, temp_files, mocker): final_barcode_plot_fn = "barcode_plot.pdf" mock_cli_console.print.assert_any_call( - f"[{STYLES['debug']}]Debug mode is ON[/{STYLES['debug']}]" + f"[{STYLES['debug']}][DEBUG] Debug mode is ON[/{STYLES['debug']}]" ) mock_resolve_format.assert_called_once_with( temp_files["tree"], None, True diff --git a/tests/test_ref_muts.py b/tests/test_ref_muts.py index d519163..665c9e5 100644 --- a/tests/test_ref_muts.py +++ b/tests/test_ref_muts.py @@ -280,7 +280,7 @@ def test_process_and_reroot_lineages_ref_not_in_muts_infer_root( assert output_rerooted_lineages.exists() # Verify warning for inferred root - expected_warning_call_substr = "[yellow]Reference ref_genome not present in sample mutations file. Inferring root sequence." + expected_warning_call_substr = "[yellow][WARNING] Reference ref_genome not present in sample mutations file. Inferring root sequence." assert any( expected_warning_call_substr in str(c_args) for c_args in mocked_console.print.call_args_list diff --git a/tests/test_utils.py b/tests/test_utils.py index dad69b3..b62dc70 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -48,7 +48,7 @@ def test_resolve_tree_format_debug_output(): with patch.object(barcodeforge.utils, "console", mock_console): resolve_tree_format("some.nwk", None, debug=True) mock_console.print.assert_any_call( - f"[{STYLES['debug']}]Resolved tree format for 'some.nwk': newick[/{STYLES['debug']}]" + f"[{STYLES['debug']}][DEBUG] Resolved tree format for 'some.nwk': newick[/{STYLES['debug']}]" ) @@ -66,7 +66,7 @@ def test_run_subprocess_command_success(mock_subproc_run): ) assert result is True mock_console.print.assert_called_once_with( - f"[{STYLES['success']}]Command executed successfully[/{STYLES['success']}]" + f"[{STYLES['success']}][SUCCESS] Command executed successfully[/{STYLES['success']}]" ) @@ -86,10 +86,10 @@ def test_run_subprocess_command_failure_called_process_error(mock_subproc_run): expected_calls = [ call( - f"[{STYLES['error']}]Test error CPE fail_cmd_cpe: Command '['fail_cmd_cpe']' returned non-zero exit status 1.[/{STYLES['error']}]" + f"[{STYLES['error']}][ERROR] Test error CPE fail_cmd_cpe: Command '['fail_cmd_cpe']' returned non-zero exit status 1.[/{STYLES['error']}]" ), - call(f"[{STYLES['debug']}]fail_cmd_cpe stdout:\nout[/{STYLES['debug']}]"), - call(f"[{STYLES['debug']}]fail_cmd_cpe stderr:\nError output cpe[/{STYLES['debug']}]"), + call(f"[{STYLES['debug']}][DEBUG] fail_cmd_cpe stdout:\nout[/{STYLES['debug']}]"), + call(f"[{STYLES['debug']}][DEBUG] fail_cmd_cpe stderr:\nError output cpe[/{STYLES['debug']}]"), ] mock_console.print.assert_has_calls(expected_calls) @@ -107,7 +107,7 @@ def test_run_subprocess_command_file_not_found(mock_subproc_run): ) mock_console.print.assert_called_once_with( - f"[{STYLES['error']}]FNF error: non_existent_cmd command not found. Please ensure it is installed and in your PATH.[/{STYLES['error']}]" + f"[{STYLES['error']}][ERROR] FNF error: non_existent_cmd command not found. Please ensure it is installed and in your PATH.[/{STYLES['error']}]" ) @@ -128,10 +128,10 @@ def test_run_subprocess_command_success_debug(mock_subproc_run): ) assert result is True expected_calls = [ - call(f"[{STYLES['debug']}]Running command: debug_cmd_success arg1[/{STYLES['debug']}]"), - call(f"[{STYLES['debug']}]debug_cmd_success stdout:\nDebug success output[/{STYLES['debug']}]"), - call(f"[{STYLES['debug']}]debug_cmd_success stderr:\nDebug success stderr[/{STYLES['debug']}]"), - call(f"[{STYLES['success']}]Debug success[/{STYLES['success']}]"), + call(f"[{STYLES['debug']}][DEBUG] Running command: debug_cmd_success arg1[/{STYLES['debug']}]"), + call(f"[{STYLES['debug']}][DEBUG] debug_cmd_success stdout:\nDebug success output[/{STYLES['debug']}]"), + call(f"[{STYLES['debug']}][DEBUG] debug_cmd_success stderr:\nDebug success stderr[/{STYLES['debug']}]"), + call(f"[{STYLES['success']}][SUCCESS] Debug success[/{STYLES['success']}]"), ] mock_console.print.assert_has_calls(expected_calls, any_order=False) @@ -153,9 +153,9 @@ def test_run_subprocess_command_success_debug_empty_stderr(mock_subproc_run): ) assert result is True expected_calls = [ - call(f"[{STYLES['debug']}]Running command: debug_cmd_success_no_stderr[/{STYLES['debug']}]"), - call(f"[{STYLES['debug']}]debug_cmd_success_no_stderr stdout:\nDebug success output[/{STYLES['debug']}]"), - call(f"[{STYLES['success']}]Debug success no stderr[/{STYLES['success']}]"), + call(f"[{STYLES['debug']}][DEBUG] Running command: debug_cmd_success_no_stderr[/{STYLES['debug']}]"), + call(f"[{STYLES['debug']}][DEBUG] debug_cmd_success_no_stderr stdout:\nDebug success output[/{STYLES['debug']}]"), + call(f"[{STYLES['success']}][SUCCESS] Debug success no stderr[/{STYLES['success']}]"), ] mock_console.print.assert_has_calls(expected_calls, any_order=False) # Verify no stderr output line was printed (empty stderr is skipped) @@ -180,11 +180,11 @@ def test_run_subprocess_command_failure_debug(mock_subproc_run): ) expected_calls_in_order = [ - call(f"[{STYLES['debug']}]Running command: {' '.join(cmd_list)}[/{STYLES['debug']}]"), + call(f"[{STYLES['debug']}][DEBUG] Running command: {' '.join(cmd_list)}[/{STYLES['debug']}]"), call( - f"[{STYLES['error']}]Debug fail error {cmd_list[0]}: Command '{cmd_list}' returned non-zero exit status 1.[/{STYLES['error']}]" + f"[{STYLES['error']}][ERROR] Debug fail error {cmd_list[0]}: Command '{cmd_list}' returned non-zero exit status 1.[/{STYLES['error']}]" ), - call(f"[{STYLES['debug']}]{cmd_list[0]} stdout:\nDebug fail stdout[/{STYLES['debug']}]"), - call(f"[{STYLES['debug']}]{cmd_list[0]} stderr:\nDebug fail stderr[/{STYLES['debug']}]"), + call(f"[{STYLES['debug']}][DEBUG] {cmd_list[0]} stdout:\nDebug fail stdout[/{STYLES['debug']}]"), + call(f"[{STYLES['debug']}][DEBUG] {cmd_list[0]} stderr:\nDebug fail stderr[/{STYLES['debug']}]"), ] mock_console.print.assert_has_calls(expected_calls_in_order, any_order=False) From e43a2aa94b26a08e07150e04039589a76cd48de2 Mon Sep 17 00:00:00 2001 From: gp201 Date: Wed, 18 Mar 2026 10:26:30 -0700 Subject: [PATCH 4/9] Use pandas grouping and click.Abort for checks --- barcodeforge/generate_barcodes.py | 55 ++++++++++++++----------------- tests/test_generate_barcodes.py | 4 +-- 2 files changed, 26 insertions(+), 33 deletions(-) diff --git a/barcodeforge/generate_barcodes.py b/barcodeforge/generate_barcodes.py index c9673ce..a0d0d8a 100755 --- a/barcodeforge/generate_barcodes.py +++ b/barcodeforge/generate_barcodes.py @@ -135,36 +135,31 @@ def check_allele_consistency(df_barcodes: pd.DataFrame): Extract columns and ensure for every position ([1:-1]) there is only 1 reference allele ([0]). Additionally check if any lineage has multiple alternative alleles at the same position. """ - pos_refs = {} - pos_cols = {} - for col in df_barcodes.columns: - pos = col[1:-1] - ref = col[0] - if pos in pos_refs: - if pos_refs[pos] != ref: - print_error( - f"Error: Position {pos} has multiple reference alleles: '{pos_refs[pos]}' and '{ref}'" - ) - raise ValueError( - f"Position {pos} has multiple reference alleles: '{pos_refs[pos]}' and '{ref}'" - ) - else: - pos_refs[pos] = ref - - if pos not in pos_cols: - pos_cols[pos] = [] - pos_cols[pos].append(col) - - for pos, cols in pos_cols.items(): - if len(cols) > 1: - if (df_barcodes[cols].sum(axis=1) > 1).any(): - lineages = df_barcodes[df_barcodes[cols].sum(axis=1) > 1].index.tolist() - print_error( - f"Error: Position {pos} has multiple alternative alleles in lineages: {lineages}" - ) - raise ValueError( - f"Position {pos} has multiple alternative alleles in lineages: {lineages}" - ) + positions = df_barcodes.columns.str[1:-1] + refs = df_barcodes.columns.str[0] + + # Check for multiple reference alleles at the same position + ref_series = pd.Series(refs.values, index=positions) + bad_pos = ref_series.groupby(level=0).nunique() + bad_pos = bad_pos[bad_pos > 1] + if not bad_pos.empty: + pos = bad_pos.index[0] + conflicting = ref_series[pos].unique() + print_error( + f"Error: Position {pos} has multiple reference alleles: '{conflicting[0]}' and '{conflicting[1]}'" + ) + raise click.Abort() + + # Sum mutation counts per position per lineage, then check for any > 1 + pos_sums = df_barcodes.T.groupby(positions.values).sum().T + invalid_mask = pos_sums > 1 + if invalid_mask.any(axis=None): + offending_pos = invalid_mask.columns[invalid_mask.any(axis=0)][0] + lineages = pos_sums.index[invalid_mask[offending_pos]].tolist() + print_error( + f"Error: Position {offending_pos} has multiple alternative alleles in lineages: {lineages}" + ) + raise click.Abort() def identify_chains(df_barcodes: pd.DataFrame) -> list: diff --git a/tests/test_generate_barcodes.py b/tests/test_generate_barcodes.py index 9ff5493..6d1006f 100644 --- a/tests/test_generate_barcodes.py +++ b/tests/test_generate_barcodes.py @@ -134,9 +134,7 @@ def test_check_allele_consistency(): check_allele_consistency(df_valid) # Should not raise df_invalid = pd.DataFrame({"A123T": [1, 0], "C123G": [0, 1]}) - with pytest.raises( - ValueError, match="Position 123 has multiple reference alleles: 'A' and 'C'" - ): + with pytest.raises(click.Abort): check_allele_consistency(df_invalid) From beb11f2840bd03e425191de2a6ad27b9686e78cf Mon Sep 17 00:00:00 2001 From: gp201 Date: Wed, 18 Mar 2026 10:26:44 -0700 Subject: [PATCH 5/9] lint: lint fixes --- barcodeforge/auspice_tree_to_table.py | 4 ++- barcodeforge/cli.py | 20 +++++++++--- barcodeforge/ref_muts.py | 16 +++++++--- barcodeforge/utils.py | 4 ++- tests/test_cli.py | 40 ++++++++++++------------ tests/test_utils.py | 44 ++++++++++++++++++++------- 6 files changed, 86 insertions(+), 42 deletions(-) diff --git a/barcodeforge/auspice_tree_to_table.py b/barcodeforge/auspice_tree_to_table.py index cb62583..84b950e 100644 --- a/barcodeforge/auspice_tree_to_table.py +++ b/barcodeforge/auspice_tree_to_table.py @@ -252,4 +252,6 @@ def process_auspice_json( if not output_tree_path and not output_metadata_path: # This case should be handled by the CLI command to require at least one output. # However, a warning here is fine if called directly. - print_warning("No output requested. Use --output-tree and/or --output-metadata.") + print_warning( + "No output requested. Use --output-tree and/or --output-metadata." + ) diff --git a/barcodeforge/cli.py b/barcodeforge/cli.py index 030906a..81e60d3 100644 --- a/barcodeforge/cli.py +++ b/barcodeforge/cli.py @@ -124,7 +124,9 @@ def barcode( if resolved_tree_format == "nexus": if is_debug: - print_info(f"Converting Nexus tree ({tree}) to Newick format at {output_converted_tree_path}...") + print_info( + f"Converting Nexus tree ({tree}) to Newick format at {output_converted_tree_path}..." + ) convert_nexus_to_newick( input_file=tree, output_file=output_converted_tree_path, @@ -133,15 +135,21 @@ def barcode( print_success(f"Converted tree saved to {output_converted_tree_path}") elif resolved_tree_format == "newick": if is_debug: - print_info(f"Processing Newick tree ({tree}) to {output_converted_tree_path}...") + print_info( + f"Processing Newick tree ({tree}) to {output_converted_tree_path}..." + ) convert_nexus_to_newick( input_file=tree, output_file=output_converted_tree_path, input_format="newick", ) - print_success(f"Processed tree saved to {output_converted_tree_path} (if conversion/reformatting occurred)") + print_success( + f"Processed tree saved to {output_converted_tree_path} (if conversion/reformatting occurred)" + ) else: - print_error(f"Error: Unsupported tree format: {resolved_tree_format}. Expected 'newick' or 'nexus'.") + print_error( + f"Error: Unsupported tree format: {resolved_tree_format}. Expected 'newick' or 'nexus'." + ) raise click.Abort() # Run usher command @@ -262,7 +270,9 @@ def barcode( output_file_path=plot_output_path, ) - print_success(f"Generated barcodes are saved to '{csv_path}' and plot saved to '{plot_output_path}'") + print_success( + f"Generated barcodes are saved to '{csv_path}' and plot saved to '{plot_output_path}'" + ) @cli.command() diff --git a/barcodeforge/ref_muts.py b/barcodeforge/ref_muts.py index 1167fd5..129dfe7 100755 --- a/barcodeforge/ref_muts.py +++ b/barcodeforge/ref_muts.py @@ -152,7 +152,9 @@ def process_and_reroot_lineages( additional_muts[i]["root"] = additional_muts[i].pop("mut") if debug: - print_debug(f"Additional mutations derived from reference {ref.id}: {additional_muts}") + print_debug( + f"Additional mutations derived from reference {ref.id}: {additional_muts}" + ) # else generate the root sequence else: print_warning( @@ -173,7 +175,9 @@ def process_and_reroot_lineages( if seq is None: # In debug mode, log a per-sample warning and track missing samples for a summary warning below. if debug: - print_warning(f"Warning: Sample {sample_id} not found in FASTA file. Skipping.") + print_warning( + f"Warning: Sample {sample_id} not found in FASTA file. Skipping." + ) continue root_seqs.append(_construct_root_sequence(root_muts, seq)) @@ -204,7 +208,9 @@ def process_and_reroot_lineages( lambda x: " ".join(x) ) else: - print_info(f"Found {len(additional_muts_list)} additional mutations to incorporate into lineage paths.") + print_info( + f"Found {len(additional_muts_list)} additional mutations to incorporate into lineage paths." + ) # add the additional mutations to the lineage paths after the first item # Ensure the logic for constructing the path string is robust @@ -226,4 +232,6 @@ def update_path(path_list): ) lineage_paths_df.to_csv(output_rerooted_lineage_paths_path, sep="\t") - print_success(f"Rerooted lineage paths saved to {output_rerooted_lineage_paths_path}") + print_success( + f"Rerooted lineage paths saved to {output_rerooted_lineage_paths_path}" + ) diff --git a/barcodeforge/utils.py b/barcodeforge/utils.py index 25cb606..617ea5b 100644 --- a/barcodeforge/utils.py +++ b/barcodeforge/utils.py @@ -61,7 +61,9 @@ def resolve_tree_format( print_error( f"Error: Unknown tree format for file '{tree_path}'. Extension '{ext}' is not recognized." ) - print_error("Please specify the format using --tree-format ('newick' or 'nexus').") + print_error( + "Please specify the format using --tree-format ('newick' or 'nexus')." + ) raise click.Abort() if debug: diff --git a/tests/test_cli.py b/tests/test_cli.py index 45166df..ab6685f 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -90,7 +90,9 @@ def test_barcode_command_default_options(runner, temp_files, mocker): "barcodeforge.cli.create_barcodes_from_lineage_paths" ) mock_create_plot = mocker.patch("barcodeforge.cli.create_barcode_plot") - mock_cli_console = mocker.patch.object(barcodeforge.utils, "console", MagicMock(spec=Console)) + mock_cli_console = mocker.patch.object( + barcodeforge.utils, "console", MagicMock(spec=Console) + ) args = [ "barcode", @@ -115,9 +117,7 @@ def test_barcode_command_default_options(runner, temp_files, mocker): final_barcodes_csv_fn = "barcode.csv" final_barcode_plot_fn = "barcode_plot.pdf" - mock_resolve_format.assert_called_once_with( - temp_files["tree"], None, False - ) + mock_resolve_format.assert_called_once_with(temp_files["tree"], None, False) mock_convert_tree.assert_called_once_with( input_file=temp_files["tree"], output_file=converted_tree_fn, @@ -225,7 +225,9 @@ def test_barcode_command_custom_options(runner, temp_files, mocker): "barcodeforge.cli.create_barcodes_from_lineage_paths" ) mock_create_plot = mocker.patch("barcodeforge.cli.create_barcode_plot") - mock_cli_console = mocker.patch.object(barcodeforge.utils, "console", MagicMock(spec=Console)) + mock_cli_console = mocker.patch.object( + barcodeforge.utils, "console", MagicMock(spec=Console) + ) prefix = "MYPREFIX" custom_usher_args = "-U -l" @@ -265,9 +267,7 @@ def test_barcode_command_custom_options(runner, temp_files, mocker): final_barcodes_csv_fn = f"{prefix}-barcode.csv" final_barcode_plot_fn = f"{prefix}-barcode_plot.pdf" - mock_resolve_format.assert_called_once_with( - temp_files["tree"], None, False - ) + mock_resolve_format.assert_called_once_with(temp_files["tree"], None, False) mock_convert_tree.assert_called_once_with( input_file=temp_files["tree"], output_file=converted_tree_fn, @@ -376,7 +376,9 @@ def test_barcode_command_nexus_tree(runner, temp_files, mocker): mocker.patch("barcodeforge.cli.process_and_reroot_lineages") mocker.patch("barcodeforge.cli.create_barcodes_from_lineage_paths") mocker.patch("barcodeforge.cli.create_barcode_plot") - mock_cli_console = mocker.patch.object(barcodeforge.utils, "console", MagicMock(spec=Console)) + mock_cli_console = mocker.patch.object( + barcodeforge.utils, "console", MagicMock(spec=Console) + ) args = [ "barcode", @@ -395,9 +397,7 @@ def test_barcode_command_nexus_tree(runner, temp_files, mocker): aligned_vcf_fn = f"{intermediate_dir}/aligned.vcf" tree_pb_fn = f"{intermediate_dir}/tree.pb" - mock_resolve_format.assert_called_once_with( - temp_files["tree"], "nexus", False - ) + mock_resolve_format.assert_called_once_with(temp_files["tree"], "nexus", False) mock_convert_tree.assert_called_once_with( input_file=temp_files["tree"], output_file=converted_tree_fn, @@ -432,7 +432,9 @@ def test_barcode_command_newick_tree_reformat(runner, temp_files, mocker): mocker.patch("barcodeforge.cli.process_and_reroot_lineages") mocker.patch("barcodeforge.cli.create_barcodes_from_lineage_paths") mocker.patch("barcodeforge.cli.create_barcode_plot") - mock_cli_console = mocker.patch.object(barcodeforge.utils, "console", MagicMock(spec=Console)) + mock_cli_console = mocker.patch.object( + barcodeforge.utils, "console", MagicMock(spec=Console) + ) args = [ "barcode", @@ -447,9 +449,7 @@ def test_barcode_command_newick_tree_reformat(runner, temp_files, mocker): assert result.exit_code == 0, f"CLI failed: {result.output}" converted_tree_fn = "barcodeforge_workdir/converted_tree.nwk" - mock_resolve_format.assert_called_once_with( - temp_files["tree"], "newick", False - ) + mock_resolve_format.assert_called_once_with(temp_files["tree"], "newick", False) mock_convert_tree.assert_called_once_with( input_file=temp_files["tree"], output_file=converted_tree_fn, @@ -470,7 +470,9 @@ def test_barcode_command_debug_flag(runner, temp_files, mocker): "barcodeforge.cli.create_barcodes_from_lineage_paths" ) mock_create_plot = mocker.patch("barcodeforge.cli.create_barcode_plot") - mock_cli_console = mocker.patch.object(barcodeforge.utils, "console", MagicMock(spec=Console)) + mock_cli_console = mocker.patch.object( + barcodeforge.utils, "console", MagicMock(spec=Console) + ) args = [ "--debug", # Main CLI debug flag @@ -495,9 +497,7 @@ def test_barcode_command_debug_flag(runner, temp_files, mocker): mock_cli_console.print.assert_any_call( f"[{STYLES['debug']}][DEBUG] Debug mode is ON[/{STYLES['debug']}]" ) - mock_resolve_format.assert_called_once_with( - temp_files["tree"], None, True - ) + mock_resolve_format.assert_called_once_with(temp_files["tree"], None, True) mock_convert_tree.assert_called_once_with( input_file=temp_files["tree"], output_file="barcodeforge_workdir/converted_tree.nwk", # Unprefixed diff --git a/tests/test_utils.py b/tests/test_utils.py index b62dc70..c6ac55a 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -88,8 +88,12 @@ def test_run_subprocess_command_failure_called_process_error(mock_subproc_run): call( f"[{STYLES['error']}][ERROR] Test error CPE fail_cmd_cpe: Command '['fail_cmd_cpe']' returned non-zero exit status 1.[/{STYLES['error']}]" ), - call(f"[{STYLES['debug']}][DEBUG] fail_cmd_cpe stdout:\nout[/{STYLES['debug']}]"), - call(f"[{STYLES['debug']}][DEBUG] fail_cmd_cpe stderr:\nError output cpe[/{STYLES['debug']}]"), + call( + f"[{STYLES['debug']}][DEBUG] fail_cmd_cpe stdout:\nout[/{STYLES['debug']}]" + ), + call( + f"[{STYLES['debug']}][DEBUG] fail_cmd_cpe stderr:\nError output cpe[/{STYLES['debug']}]" + ), ] mock_console.print.assert_has_calls(expected_calls) @@ -128,9 +132,15 @@ def test_run_subprocess_command_success_debug(mock_subproc_run): ) assert result is True expected_calls = [ - call(f"[{STYLES['debug']}][DEBUG] Running command: debug_cmd_success arg1[/{STYLES['debug']}]"), - call(f"[{STYLES['debug']}][DEBUG] debug_cmd_success stdout:\nDebug success output[/{STYLES['debug']}]"), - call(f"[{STYLES['debug']}][DEBUG] debug_cmd_success stderr:\nDebug success stderr[/{STYLES['debug']}]"), + call( + f"[{STYLES['debug']}][DEBUG] Running command: debug_cmd_success arg1[/{STYLES['debug']}]" + ), + call( + f"[{STYLES['debug']}][DEBUG] debug_cmd_success stdout:\nDebug success output[/{STYLES['debug']}]" + ), + call( + f"[{STYLES['debug']}][DEBUG] debug_cmd_success stderr:\nDebug success stderr[/{STYLES['debug']}]" + ), call(f"[{STYLES['success']}][SUCCESS] Debug success[/{STYLES['success']}]"), ] mock_console.print.assert_has_calls(expected_calls, any_order=False) @@ -153,9 +163,15 @@ def test_run_subprocess_command_success_debug_empty_stderr(mock_subproc_run): ) assert result is True expected_calls = [ - call(f"[{STYLES['debug']}][DEBUG] Running command: debug_cmd_success_no_stderr[/{STYLES['debug']}]"), - call(f"[{STYLES['debug']}][DEBUG] debug_cmd_success_no_stderr stdout:\nDebug success output[/{STYLES['debug']}]"), - call(f"[{STYLES['success']}][SUCCESS] Debug success no stderr[/{STYLES['success']}]"), + call( + f"[{STYLES['debug']}][DEBUG] Running command: debug_cmd_success_no_stderr[/{STYLES['debug']}]" + ), + call( + f"[{STYLES['debug']}][DEBUG] debug_cmd_success_no_stderr stdout:\nDebug success output[/{STYLES['debug']}]" + ), + call( + f"[{STYLES['success']}][SUCCESS] Debug success no stderr[/{STYLES['success']}]" + ), ] mock_console.print.assert_has_calls(expected_calls, any_order=False) # Verify no stderr output line was printed (empty stderr is skipped) @@ -180,11 +196,17 @@ def test_run_subprocess_command_failure_debug(mock_subproc_run): ) expected_calls_in_order = [ - call(f"[{STYLES['debug']}][DEBUG] Running command: {' '.join(cmd_list)}[/{STYLES['debug']}]"), + call( + f"[{STYLES['debug']}][DEBUG] Running command: {' '.join(cmd_list)}[/{STYLES['debug']}]" + ), call( f"[{STYLES['error']}][ERROR] Debug fail error {cmd_list[0]}: Command '{cmd_list}' returned non-zero exit status 1.[/{STYLES['error']}]" ), - call(f"[{STYLES['debug']}][DEBUG] {cmd_list[0]} stdout:\nDebug fail stdout[/{STYLES['debug']}]"), - call(f"[{STYLES['debug']}][DEBUG] {cmd_list[0]} stderr:\nDebug fail stderr[/{STYLES['debug']}]"), + call( + f"[{STYLES['debug']}][DEBUG] {cmd_list[0]} stdout:\nDebug fail stdout[/{STYLES['debug']}]" + ), + call( + f"[{STYLES['debug']}][DEBUG] {cmd_list[0]} stderr:\nDebug fail stderr[/{STYLES['debug']}]" + ), ] mock_console.print.assert_has_calls(expected_calls_in_order, any_order=False) From 290a63bf71642fb935d39e4a65b1ccf47b8ed03f Mon Sep 17 00:00:00 2001 From: gp201 Date: Wed, 18 Mar 2026 10:44:41 -0700 Subject: [PATCH 6/9] addressed comments --- barcodeforge/auspice_tree_to_table.py | 6 ++-- barcodeforge/generate_barcodes.py | 10 ++++--- barcodeforge/ref_muts.py | 6 ++-- barcodeforge/utils.py | 13 +++++---- tests/test_auspice_tree_to_table.py | 41 ++++++++++++++------------- tests/test_generate_barcodes.py | 11 +++++-- tests/test_utils.py | 21 +++++--------- 7 files changed, 56 insertions(+), 52 deletions(-) diff --git a/barcodeforge/auspice_tree_to_table.py b/barcodeforge/auspice_tree_to_table.py index 84b950e..5e94966 100644 --- a/barcodeforge/auspice_tree_to_table.py +++ b/barcodeforge/auspice_tree_to_table.py @@ -136,10 +136,10 @@ def process_auspice_json( with open(tree_json_path, "r", encoding="utf-8") as fh: tree_json_data = json.load(fh) except FileNotFoundError: - print_error(f"Error: Tree JSON file not found at '{tree_json_path}'") + print_error(f"Tree JSON file not found at '{tree_json_path}'") raise click.Abort() except json.JSONDecodeError: - print_error(f"Error: Could not decode JSON from '{tree_json_path}'") + print_error(f"Could not decode JSON from '{tree_json_path}'") raise click.Abort() tree = json_to_tree(tree_json_data) @@ -172,7 +172,7 @@ def process_auspice_json( if not attrs_set: print_warning( - "Warning: Could not auto-detect any attributes from the tree root. " + "Could not auto-detect any attributes from the tree root. " "The metadata output might be sparse or only contain 'name'. " "Consider using --attributes to specify columns." ) diff --git a/barcodeforge/generate_barcodes.py b/barcodeforge/generate_barcodes.py index a0d0d8a..f8977ed 100755 --- a/barcodeforge/generate_barcodes.py +++ b/barcodeforge/generate_barcodes.py @@ -146,18 +146,20 @@ def check_allele_consistency(df_barcodes: pd.DataFrame): pos = bad_pos.index[0] conflicting = ref_series[pos].unique() print_error( - f"Error: Position {pos} has multiple reference alleles: '{conflicting[0]}' and '{conflicting[1]}'" + f"Position {pos} has multiple reference alleles: {', '.join(repr(a) for a in conflicting)}" ) raise click.Abort() - # Sum mutation counts per position per lineage, then check for any > 1 - pos_sums = df_barcodes.T.groupby(positions.values).sum().T + # Convert to presence/absence before summing per position per lineage, + # so only distinct alleles are counted (not raw mutation counts). + presence = (df_barcodes > 0).astype(int) + pos_sums = presence.T.groupby(positions.values).sum().T invalid_mask = pos_sums > 1 if invalid_mask.any(axis=None): offending_pos = invalid_mask.columns[invalid_mask.any(axis=0)][0] lineages = pos_sums.index[invalid_mask[offending_pos]].tolist() print_error( - f"Error: Position {offending_pos} has multiple alternative alleles in lineages: {lineages}" + f"Position {offending_pos} has multiple alternative alleles in lineages: {lineages}" ) raise click.Abort() diff --git a/barcodeforge/ref_muts.py b/barcodeforge/ref_muts.py index 129dfe7..fae360a 100755 --- a/barcodeforge/ref_muts.py +++ b/barcodeforge/ref_muts.py @@ -135,7 +135,7 @@ def process_and_reroot_lineages( if missing_count: total_count = len(sample_ids) print_warning( - f"Warning: {missing_count} out of {total_count} samples ({missing_count / total_count:.1%}) were not found in the FASTA file." + f"{missing_count} out of {total_count} samples ({missing_count / total_count:.1%}) were not found in the FASTA file." ) # if reference in the sample mutations file, use that as the root @@ -176,7 +176,7 @@ def process_and_reroot_lineages( # In debug mode, log a per-sample warning and track missing samples for a summary warning below. if debug: print_warning( - f"Warning: Sample {sample_id} not found in FASTA file. Skipping." + f"Sample {sample_id} not found in FASTA file. Skipping." ) continue root_seqs.append(_construct_root_sequence(root_muts, seq)) @@ -201,7 +201,7 @@ def process_and_reroot_lineages( ) if not additional_muts_list: - print_info("No additional mutations found to add to lineage paths.") + print_warning("No additional mutations found to add to lineage paths.") # If no additional mutations, write the original lineage paths content or handle as needed # For now, let's just save the parsed (and potentially slightly reformatted) df lineage_paths_df["from_tree_root"] = lineage_paths_df["from_tree_root"].apply( diff --git a/barcodeforge/utils.py b/barcodeforge/utils.py index 617ea5b..7e6ce7a 100644 --- a/barcodeforge/utils.py +++ b/barcodeforge/utils.py @@ -59,7 +59,7 @@ def resolve_tree_format( resolved_format = "nexus" else: print_error( - f"Error: Unknown tree format for file '{tree_path}'. Extension '{ext}' is not recognized." + f"Unknown tree format for file '{tree_path}'. Extension '{ext}' is not recognized." ) print_error( "Please specify the format using --tree-format ('newick' or 'nexus')." @@ -85,7 +85,7 @@ def run_subprocess_command( success_message (str): Message to print on successful execution. error_message_prefix (str): Prefix for error messages. Returns: - bool: True if the command was executed successfully, False otherwise. + bool: True if the command was executed successfully. Raises: click.Abort: If the command fails or is not found. """ @@ -109,10 +109,11 @@ def run_subprocess_command( raise click.Abort() except subprocess.CalledProcessError as e: print_error(f"{error_message_prefix} {cmd[0]}: {e}") - if e.stdout: - print_debug(f"{cmd[0]} stdout:\n{e.stdout}") - if e.stderr: - print_debug(f"{cmd[0]} stderr:\n{e.stderr}") + if debug: + if e.stdout: + print_debug(f"{cmd[0]} stdout:\n{e.stdout}") + if e.stderr: + print_debug(f"{cmd[0]} stderr:\n{e.stderr}") raise click.Abort() diff --git a/tests/test_auspice_tree_to_table.py b/tests/test_auspice_tree_to_table.py index a0fba65..6e733f9 100644 --- a/tests/test_auspice_tree_to_table.py +++ b/tests/test_auspice_tree_to_table.py @@ -2,6 +2,7 @@ import pandas as pd import pytest from rich.console import Console +from unittest.mock import patch import click import barcodeforge.utils from barcodeforge.auspice_tree_to_table import json_to_tree, process_auspice_json @@ -77,29 +78,29 @@ def test_process_auspice_json_include_internal(tmp_path, sample_auspice_json): def test_process_auspice_json_missing_file(tmp_path): recording_console = Console(record=True) - barcodeforge.utils.console = recording_console - with pytest.raises(click.Abort): - process_auspice_json( - tree_json_path=str(tmp_path / "no.json"), - output_metadata_path=str(tmp_path / "meta.tsv"), - output_tree_path=None, - include_internal_nodes=False, - attributes=None, - ) - assert "Error: Tree JSON file not found" in recording_console.export_text() + with patch.object(barcodeforge.utils, "console", recording_console): + with pytest.raises(click.Abort): + process_auspice_json( + tree_json_path=str(tmp_path / "no.json"), + output_metadata_path=str(tmp_path / "meta.tsv"), + output_tree_path=None, + include_internal_nodes=False, + attributes=None, + ) + assert "Tree JSON file not found" in recording_console.export_text() def test_process_auspice_json_bad_json(tmp_path): bad_path = tmp_path / "bad.json" bad_path.write_text("not valid") recording_console = Console(record=True) - barcodeforge.utils.console = recording_console - with pytest.raises(click.Abort): - process_auspice_json( - tree_json_path=str(bad_path), - output_metadata_path=None, - output_tree_path=None, - include_internal_nodes=False, - attributes=None, - ) - assert "Error: Could not decode JSON" in recording_console.export_text() + with patch.object(barcodeforge.utils, "console", recording_console): + with pytest.raises(click.Abort): + process_auspice_json( + tree_json_path=str(bad_path), + output_metadata_path=None, + output_tree_path=None, + include_internal_nodes=False, + attributes=None, + ) + assert "Could not decode JSON" in recording_console.export_text() diff --git a/tests/test_generate_barcodes.py b/tests/test_generate_barcodes.py index 6d1006f..3941b0a 100644 --- a/tests/test_generate_barcodes.py +++ b/tests/test_generate_barcodes.py @@ -130,12 +130,19 @@ def test_replace_underscore_with_dash(): def test_check_allele_consistency(): + # Valid: same ref at each position, each lineage carries at most one allele per position df_valid = pd.DataFrame({"A123T": [1, 0], "A123C": [0, 1]}) check_allele_consistency(df_valid) # Should not raise - df_invalid = pd.DataFrame({"A123T": [1, 0], "C123G": [0, 1]}) + # Invalid: conflicting reference alleles at position 123 (A vs C) + df_conflicting_refs = pd.DataFrame({"A123T": [1, 0], "C123G": [0, 1]}) with pytest.raises(click.Abort): - check_allele_consistency(df_invalid) + check_allele_consistency(df_conflicting_refs) + + # Invalid: lineage carries two distinct alleles at the same position + df_multi_alt = pd.DataFrame({"A123T": [1], "A123C": [1]}, index=["lin1"]) + with pytest.raises(click.Abort): + check_allele_consistency(df_multi_alt) @pytest.fixture diff --git a/tests/test_utils.py b/tests/test_utils.py index c6ac55a..9f1f456 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -84,18 +84,9 @@ def test_run_subprocess_command_failure_called_process_error(mock_subproc_run): error_message_prefix="Test error CPE", ) - expected_calls = [ - call( - f"[{STYLES['error']}][ERROR] Test error CPE fail_cmd_cpe: Command '['fail_cmd_cpe']' returned non-zero exit status 1.[/{STYLES['error']}]" - ), - call( - f"[{STYLES['debug']}][DEBUG] fail_cmd_cpe stdout:\nout[/{STYLES['debug']}]" - ), - call( - f"[{STYLES['debug']}][DEBUG] fail_cmd_cpe stderr:\nError output cpe[/{STYLES['debug']}]" - ), - ] - mock_console.print.assert_has_calls(expected_calls) + mock_console.print.assert_called_once_with( + f"[{STYLES['error']}][ERROR] Test error CPE fail_cmd_cpe: Command '['fail_cmd_cpe']' returned non-zero exit status 1.[/{STYLES['error']}]" + ) @patch("subprocess.run") @@ -175,8 +166,10 @@ def test_run_subprocess_command_success_debug_empty_stderr(mock_subproc_run): ] mock_console.print.assert_has_calls(expected_calls, any_order=False) # Verify no stderr output line was printed (empty stderr is skipped) - all_calls = [str(c) for c in mock_console.print.call_args_list] - assert not any(" stderr:\n" in c for c in all_calls) + assert not any( + " stderr:\n" in c.args[0] + for c in mock_console.print.call_args_list + ) @patch("subprocess.run") From 835e91272b454c4c2c591212617ab1a851451d3c Mon Sep 17 00:00:00 2001 From: gp201 Date: Wed, 18 Mar 2026 10:48:55 -0700 Subject: [PATCH 7/9] lint fixes --- tests/test_utils.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/test_utils.py b/tests/test_utils.py index 9f1f456..e2a3784 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -166,10 +166,7 @@ def test_run_subprocess_command_success_debug_empty_stderr(mock_subproc_run): ] mock_console.print.assert_has_calls(expected_calls, any_order=False) # Verify no stderr output line was printed (empty stderr is skipped) - assert not any( - " stderr:\n" in c.args[0] - for c in mock_console.print.call_args_list - ) + assert not any(" stderr:\n" in c.args[0] for c in mock_console.print.call_args_list) @patch("subprocess.run") From 7b35be51f6eddd40d42bc8733de7d38c82f89de9 Mon Sep 17 00:00:00 2001 From: gp201 Date: Wed, 18 Mar 2026 13:50:40 -0700 Subject: [PATCH 8/9] Use console.print style and disable markup --- barcodeforge/utils.py | 10 ++++---- tests/test_cli.py | 4 ++- tests/test_ref_muts.py | 2 +- tests/test_utils.py | 58 +++++++++++++++++------------------------- 4 files changed, 32 insertions(+), 42 deletions(-) diff --git a/barcodeforge/utils.py b/barcodeforge/utils.py index 7e6ce7a..4395658 100644 --- a/barcodeforge/utils.py +++ b/barcodeforge/utils.py @@ -15,24 +15,24 @@ def print_error(msg: str) -> None: - console.print(f"[{STYLES['error']}][ERROR] {msg}[/{STYLES['error']}]") + console.print(f"[ERROR] {msg}", style=STYLES["error"], markup=False) def print_warning(msg: str) -> None: - console.print(f"[{STYLES['warning']}][WARNING] {msg}[/{STYLES['warning']}]") + console.print(f"[WARNING] {msg}", style=STYLES["warning"], markup=False) def print_success(msg: str) -> None: - console.print(f"[{STYLES['success']}][SUCCESS] {msg}[/{STYLES['success']}]") + console.print(f"[SUCCESS] {msg}", style=STYLES["success"], markup=False) def print_info(msg: str, bold: bool = False) -> None: style = f"bold {STYLES['info']}" if bold else STYLES["info"] - console.print(f"[{style}][INFO] {msg}[/{style}]") + console.print(f"[INFO] {msg}", style=style, markup=False) def print_debug(msg: str) -> None: - console.print(f"[{STYLES['debug']}][DEBUG] {msg}[/{STYLES['debug']}]") + console.print(f"[DEBUG] {msg}", style=STYLES["debug"], markup=False) def resolve_tree_format( diff --git a/tests/test_cli.py b/tests/test_cli.py index ab6685f..73dd79c 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -495,7 +495,9 @@ def test_barcode_command_debug_flag(runner, temp_files, mocker): final_barcode_plot_fn = "barcode_plot.pdf" mock_cli_console.print.assert_any_call( - f"[{STYLES['debug']}][DEBUG] Debug mode is ON[/{STYLES['debug']}]" + "[DEBUG] Debug mode is ON", + style=STYLES["debug"], + markup=False, ) mock_resolve_format.assert_called_once_with(temp_files["tree"], None, True) mock_convert_tree.assert_called_once_with( diff --git a/tests/test_ref_muts.py b/tests/test_ref_muts.py index 665c9e5..9a7d62b 100644 --- a/tests/test_ref_muts.py +++ b/tests/test_ref_muts.py @@ -280,7 +280,7 @@ def test_process_and_reroot_lineages_ref_not_in_muts_infer_root( assert output_rerooted_lineages.exists() # Verify warning for inferred root - expected_warning_call_substr = "[yellow][WARNING] Reference ref_genome not present in sample mutations file. Inferring root sequence." + expected_warning_call_substr = "[WARNING] Reference ref_genome not present in sample mutations file. Inferring root sequence." assert any( expected_warning_call_substr in str(c_args) for c_args in mocked_console.print.call_args_list diff --git a/tests/test_utils.py b/tests/test_utils.py index e2a3784..aa62acf 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -48,7 +48,9 @@ def test_resolve_tree_format_debug_output(): with patch.object(barcodeforge.utils, "console", mock_console): resolve_tree_format("some.nwk", None, debug=True) mock_console.print.assert_any_call( - f"[{STYLES['debug']}][DEBUG] Resolved tree format for 'some.nwk': newick[/{STYLES['debug']}]" + "[DEBUG] Resolved tree format for 'some.nwk': newick", + style=STYLES["debug"], + markup=False, ) @@ -66,7 +68,9 @@ def test_run_subprocess_command_success(mock_subproc_run): ) assert result is True mock_console.print.assert_called_once_with( - f"[{STYLES['success']}][SUCCESS] Command executed successfully[/{STYLES['success']}]" + "[SUCCESS] Command executed successfully", + style=STYLES["success"], + markup=False, ) @@ -85,7 +89,9 @@ def test_run_subprocess_command_failure_called_process_error(mock_subproc_run): ) mock_console.print.assert_called_once_with( - f"[{STYLES['error']}][ERROR] Test error CPE fail_cmd_cpe: Command '['fail_cmd_cpe']' returned non-zero exit status 1.[/{STYLES['error']}]" + "[ERROR] Test error CPE fail_cmd_cpe: Command '['fail_cmd_cpe']' returned non-zero exit status 1.", + style=STYLES["error"], + markup=False, ) @@ -102,7 +108,9 @@ def test_run_subprocess_command_file_not_found(mock_subproc_run): ) mock_console.print.assert_called_once_with( - f"[{STYLES['error']}][ERROR] FNF error: non_existent_cmd command not found. Please ensure it is installed and in your PATH.[/{STYLES['error']}]" + "[ERROR] FNF error: non_existent_cmd command not found. Please ensure it is installed and in your PATH.", + style=STYLES["error"], + markup=False, ) @@ -123,16 +131,10 @@ def test_run_subprocess_command_success_debug(mock_subproc_run): ) assert result is True expected_calls = [ - call( - f"[{STYLES['debug']}][DEBUG] Running command: debug_cmd_success arg1[/{STYLES['debug']}]" - ), - call( - f"[{STYLES['debug']}][DEBUG] debug_cmd_success stdout:\nDebug success output[/{STYLES['debug']}]" - ), - call( - f"[{STYLES['debug']}][DEBUG] debug_cmd_success stderr:\nDebug success stderr[/{STYLES['debug']}]" - ), - call(f"[{STYLES['success']}][SUCCESS] Debug success[/{STYLES['success']}]"), + call("[DEBUG] Running command: debug_cmd_success arg1", style=STYLES["debug"], markup=False), + call("[DEBUG] debug_cmd_success stdout:\nDebug success output", style=STYLES["debug"], markup=False), + call("[DEBUG] debug_cmd_success stderr:\nDebug success stderr", style=STYLES["debug"], markup=False), + call("[SUCCESS] Debug success", style=STYLES["success"], markup=False), ] mock_console.print.assert_has_calls(expected_calls, any_order=False) @@ -154,15 +156,9 @@ def test_run_subprocess_command_success_debug_empty_stderr(mock_subproc_run): ) assert result is True expected_calls = [ - call( - f"[{STYLES['debug']}][DEBUG] Running command: debug_cmd_success_no_stderr[/{STYLES['debug']}]" - ), - call( - f"[{STYLES['debug']}][DEBUG] debug_cmd_success_no_stderr stdout:\nDebug success output[/{STYLES['debug']}]" - ), - call( - f"[{STYLES['success']}][SUCCESS] Debug success no stderr[/{STYLES['success']}]" - ), + call("[DEBUG] Running command: debug_cmd_success_no_stderr", style=STYLES["debug"], markup=False), + call("[DEBUG] debug_cmd_success_no_stderr stdout:\nDebug success output", style=STYLES["debug"], markup=False), + call("[SUCCESS] Debug success no stderr", style=STYLES["success"], markup=False), ] mock_console.print.assert_has_calls(expected_calls, any_order=False) # Verify no stderr output line was printed (empty stderr is skipped) @@ -186,17 +182,9 @@ def test_run_subprocess_command_failure_debug(mock_subproc_run): ) expected_calls_in_order = [ - call( - f"[{STYLES['debug']}][DEBUG] Running command: {' '.join(cmd_list)}[/{STYLES['debug']}]" - ), - call( - f"[{STYLES['error']}][ERROR] Debug fail error {cmd_list[0]}: Command '{cmd_list}' returned non-zero exit status 1.[/{STYLES['error']}]" - ), - call( - f"[{STYLES['debug']}][DEBUG] {cmd_list[0]} stdout:\nDebug fail stdout[/{STYLES['debug']}]" - ), - call( - f"[{STYLES['debug']}][DEBUG] {cmd_list[0]} stderr:\nDebug fail stderr[/{STYLES['debug']}]" - ), + call(f"[DEBUG] Running command: {' '.join(cmd_list)}", style=STYLES["debug"], markup=False), + call(f"[ERROR] Debug fail error {cmd_list[0]}: Command '{cmd_list}' returned non-zero exit status 1.", style=STYLES["error"], markup=False), + call(f"[DEBUG] {cmd_list[0]} stdout:\nDebug fail stdout", style=STYLES["debug"], markup=False), + call(f"[DEBUG] {cmd_list[0]} stderr:\nDebug fail stderr", style=STYLES["debug"], markup=False), ] mock_console.print.assert_has_calls(expected_calls_in_order, any_order=False) From 65cfa1f5a48833b7ba22d08e8712facfc1fadafc Mon Sep 17 00:00:00 2001 From: gp201 Date: Wed, 18 Mar 2026 13:58:07 -0700 Subject: [PATCH 9/9] lint: lint fixes --- tests/test_utils.py | 58 +++++++++++++++++++++++++++++++++++++-------- 1 file changed, 48 insertions(+), 10 deletions(-) diff --git a/tests/test_utils.py b/tests/test_utils.py index aa62acf..d4d8356 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -131,9 +131,21 @@ def test_run_subprocess_command_success_debug(mock_subproc_run): ) assert result is True expected_calls = [ - call("[DEBUG] Running command: debug_cmd_success arg1", style=STYLES["debug"], markup=False), - call("[DEBUG] debug_cmd_success stdout:\nDebug success output", style=STYLES["debug"], markup=False), - call("[DEBUG] debug_cmd_success stderr:\nDebug success stderr", style=STYLES["debug"], markup=False), + call( + "[DEBUG] Running command: debug_cmd_success arg1", + style=STYLES["debug"], + markup=False, + ), + call( + "[DEBUG] debug_cmd_success stdout:\nDebug success output", + style=STYLES["debug"], + markup=False, + ), + call( + "[DEBUG] debug_cmd_success stderr:\nDebug success stderr", + style=STYLES["debug"], + markup=False, + ), call("[SUCCESS] Debug success", style=STYLES["success"], markup=False), ] mock_console.print.assert_has_calls(expected_calls, any_order=False) @@ -156,9 +168,19 @@ def test_run_subprocess_command_success_debug_empty_stderr(mock_subproc_run): ) assert result is True expected_calls = [ - call("[DEBUG] Running command: debug_cmd_success_no_stderr", style=STYLES["debug"], markup=False), - call("[DEBUG] debug_cmd_success_no_stderr stdout:\nDebug success output", style=STYLES["debug"], markup=False), - call("[SUCCESS] Debug success no stderr", style=STYLES["success"], markup=False), + call( + "[DEBUG] Running command: debug_cmd_success_no_stderr", + style=STYLES["debug"], + markup=False, + ), + call( + "[DEBUG] debug_cmd_success_no_stderr stdout:\nDebug success output", + style=STYLES["debug"], + markup=False, + ), + call( + "[SUCCESS] Debug success no stderr", style=STYLES["success"], markup=False + ), ] mock_console.print.assert_has_calls(expected_calls, any_order=False) # Verify no stderr output line was printed (empty stderr is skipped) @@ -182,9 +204,25 @@ def test_run_subprocess_command_failure_debug(mock_subproc_run): ) expected_calls_in_order = [ - call(f"[DEBUG] Running command: {' '.join(cmd_list)}", style=STYLES["debug"], markup=False), - call(f"[ERROR] Debug fail error {cmd_list[0]}: Command '{cmd_list}' returned non-zero exit status 1.", style=STYLES["error"], markup=False), - call(f"[DEBUG] {cmd_list[0]} stdout:\nDebug fail stdout", style=STYLES["debug"], markup=False), - call(f"[DEBUG] {cmd_list[0]} stderr:\nDebug fail stderr", style=STYLES["debug"], markup=False), + call( + f"[DEBUG] Running command: {' '.join(cmd_list)}", + style=STYLES["debug"], + markup=False, + ), + call( + f"[ERROR] Debug fail error {cmd_list[0]}: Command '{cmd_list}' returned non-zero exit status 1.", + style=STYLES["error"], + markup=False, + ), + call( + f"[DEBUG] {cmd_list[0]} stdout:\nDebug fail stdout", + style=STYLES["debug"], + markup=False, + ), + call( + f"[DEBUG] {cmd_list[0]} stderr:\nDebug fail stderr", + style=STYLES["debug"], + markup=False, + ), ] mock_console.print.assert_has_calls(expected_calls_in_order, any_order=False)