Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
d743ef3
Add custom analysis_path support with configurable analysis directory…
djarecka Apr 7, 2026
93007b9
Fix typos in installation instructions
tien-tong Oct 23, 2025
061ff16
Update eg_fmriprep-24-1-1_ingressed-fs.yaml
tien-tong Oct 29, 2025
76ecf79
Update eg_fmriprep-24-1-1_ingressed-fs.yaml
tien-tong Oct 29, 2025
6c54ceb
Update eg_fmriprep-24-1-1_ingressed-fs.yaml
tien-tong Oct 29, 2025
dadcc71
Update eg_fmriprep-24-1-1_ingressed-fs.yaml
tien-tong Oct 30, 2025
e51615a
Update eg_freesurferpost-unstable_ingressed-fs.yaml
tien-tong Oct 30, 2025
52b9c97
change tests/pytest_in_docker.sh
tien-tong Nov 18, 2025
fe64c8c
Update babs-status.rst
tien-tong Oct 30, 2025
3ea5824
Update babs-status.rst
tien-tong Oct 30, 2025
9a860ca
Update babs-status.rst
tien-tong Oct 31, 2025
f127ce9
Update whats_new.md for version 0.5.3
tien-tong Nov 18, 2025
e0532f3
Add BABS chaining to run 2 BIDS apps in a pipeline (#316)
tien-tong Nov 24, 2025
6ff2a1f
Fix `babs submit --select` argument parsing (#312)
singlesp Nov 25, 2025
ca98da8
Update installation.rst (#309)
yibeichan Nov 25, 2025
1663a8c
Update eg_aslprep-0-7-5.yaml (#304)
B-Sevchik Nov 25, 2025
789a5e2
Fix coverage (#318)
tien-tong Dec 7, 2025
1a5662e
edit nordic-fmriprep pipeline
tien-tong Dec 8, 2025
631dc27
add 0.5.4 release notes
tien-tong Dec 8, 2025
ef42366
Add `--throttle` option to `babs init` for SLURM array job throttling…
tien-tong Dec 18, 2025
16d5611
Fix `babs merge` error when deleting merge_ds (#322)
tien-tong Jan 13, 2026
c83f676
Allow submission when all running jobs are in CG state (#332)
tien-tong Feb 4, 2026
d1eb269
Allow resubmission of failed, non-running jobs when other jobs are ru…
tien-tong Feb 9, 2026
c66df2e
Change `participant_job` to use `git sparse-checkout` (#337)
tien-tong Mar 2, 2026
8bd0bbb
Use conda to install niworkflows and scipy instead of pip (#340)
tsalo Mar 3, 2026
bb7b7b3
Remove mentions of `babs submit --all` and `babs submit --job` (#341)
tsalo Mar 4, 2026
192c672
Fix `babs init --list_sub_file` (#345)
tien-tong Mar 6, 2026
283e03b
Fix writable check for NFS/ACL filesystems (#348)
asmacdo Mar 18, 2026
5fbfd58
Fix `babs merge` (#351)
tien-tong Mar 19, 2026
bc16f47
add instruction to get containers
tien-tong Mar 19, 2026
07d03b4
Do specify which branch to push (#352)
yarikoptic Mar 19, 2026
03b35f5
Fix test scripts to work on any machine (#346)
asmacdo Mar 25, 2026
9921d6d
Custom analysis_path support: adding .babs directory for config/RIA s…
djarecka Apr 8, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 7 additions & 22 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,23 @@ jobs:
name: pytest of BABS
no_output_timeout: 1h
command: |
docker build \
-t pennlinc/slurm-docker-ci:unstable \
-f Dockerfile_testing .

# Make a directory that will hold the test artifacts
mkdir -p ${HOME}/e2e-testing
docker run -it \
-v ${PWD}:/tests \
-v ${PWD}:/babs \
-v ${HOME}/e2e-testing:/test-temp:rw \
-w /babs \
-h slurmctl --cap-add sys_admin \
--privileged \
pennlinc/slurm-docker-ci:unstable \
pennlinc/slurm-docker-ci:0.14 \
bash -c "pip install -e .[tests] && \
pytest -n 4 -sv \
--durations=0 \
--timeout=300 \
--junitxml=/test-temp/junit.xml \
--cov-report term-missing \
--cov-report xml:/test-temp/coverage.xml \
--cov=babs \
/babs
/babs/tests/"

- store_test_results:
path: /home/circleci/e2e-testing/junit.xml
Expand All @@ -54,22 +51,10 @@ jobs:
- checkout:
path: /home/circleci/src/babs
- run:
name: pytest of BABS
name: e2e SLURM tests
no_output_timeout: 1h
command: |
docker build \
-t pennlinc/slurm-docker-ci:unstable \
-f Dockerfile_testing .

# Make a directory that will hold the test artifacts
mkdir -p ${HOME}/e2e-testing
docker run -it \
-v ${PWD}:/tests \
-v ${HOME}/e2e-testing:/test-temp:rw \
-h slurmctl --cap-add sys_admin \
--privileged \
pennlinc/slurm-docker-ci:unstable \
/tests/tests/e2e-slurm/container/walkthrough-tests.sh
E2E_DIR=${HOME}/e2e-testing bash tests/e2e_in_docker.sh

- run:
name: clean up test artifacts
Expand Down
6 changes: 0 additions & 6 deletions Dockerfile_testing

This file was deleted.

146 changes: 133 additions & 13 deletions babs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import datalad.api as dlapi
import pandas as pd
import yaml

from babs.input_datasets import InputDatasets, OutputDatasets
from babs.scheduler import (
Expand All @@ -22,6 +23,7 @@
read_yaml,
results_branch_dataframe,
results_status_columns,
scheduler_status_columns,
status_dtypes,
update_job_batch_status,
update_results_status,
Expand All @@ -38,7 +40,7 @@
class BABS:
"""The BABS base class holds common attributes and methods for all BABS classes."""

def __init__(self, project_root):
def __init__(self, project_root, container_config=None):
"""The BABS class is for babs projects of BIDS Apps.

The constructor only initializes the attributes.
Expand Down Expand Up @@ -101,13 +103,28 @@ def __init__(self, project_root):
# attributes:
self.project_root = str(project_root)

self.analysis_path = op.join(self.project_root, 'analysis')
if container_config is not None:
with open(container_config) as f:
cfg = yaml.safe_load(f)
else:
root_config_path = op.join(self.project_root, '.babs', 'babs_init_config.yaml')
cfg = {}
if op.exists(root_config_path):
with open(root_config_path) as f:
cfg = yaml.safe_load(f) or {}

analysis_dir = cfg.get('analysis_path', 'analysis')
self.analysis_path = op.normpath(op.join(self.project_root, analysis_dir))
self._analysis_datalad_handle = None

self.config_path = op.join(self.analysis_path, 'code/babs_proj_config.yaml')

self.input_ria_path = op.join(self.project_root, 'input_ria')
self.output_ria_path = op.join(self.project_root, 'output_ria')
self.input_ria_path = op.normpath(
op.join(self.project_root, cfg.get('input_ria_path', 'input_ria'))
)
self.output_ria_path = op.normpath(
op.join(self.project_root, cfg.get('output_ria_path', 'output_ria'))
)

self.input_ria_url = 'ria+file://' + self.input_ria_path
self.output_ria_url = 'ria+file://' + self.output_ria_path
Expand All @@ -121,6 +138,7 @@ def __init__(self, project_root):
self.job_status_path_rel = 'code/job_status.csv'
self.job_status_path_abs = op.join(self.analysis_path, self.job_status_path_rel)
self.job_submit_path_abs = op.join(self.analysis_path, 'code/job_submit.csv')
self.analysis_root = op.dirname(self.analysis_path)
self._apply_config()

def _apply_config(self) -> None:
Expand Down Expand Up @@ -161,11 +179,70 @@ def _apply_config(self) -> None:
self.queue = validate_queue(config_yaml['queue'])
self.container = config_yaml['container']

# Check for pipeline configuration (optional)
self.pipeline = config_yaml.get('pipeline', None)
if self.pipeline is not None:
self._validate_pipeline_config()

# Check the output RIA:
self.wtf_key_info(flag_output_ria_only=True)

self.input_datasets = InputDatasets(self.processing_level, config_yaml['input_datasets'])
self.input_datasets.update_abs_paths(Path(self.project_root) / 'analysis')
self.input_datasets.update_abs_paths(Path(self.analysis_path))

def _validate_pipeline_config(self) -> None:
"""Validate the pipeline configuration if present.

Raises
------
ValueError
If the pipeline configuration is invalid.
"""
if not isinstance(self.pipeline, list):
raise ValueError('Pipeline configuration must be a list of steps')

if len(self.pipeline) == 0:
raise ValueError('Pipeline configuration cannot be empty')

print(f'\nValidating pipeline configuration with {len(self.pipeline)} steps...')

for i, step in enumerate(self.pipeline):
if not isinstance(step, dict):
raise ValueError(f'Pipeline step {i} must be a dictionary')

required_fields = ['container_name']
for field in required_fields:
if field not in step:
raise ValueError(f'Pipeline step {i} missing required field: {field}')

step_name = step['container_name']
print(f' Step {i + 1}: {step_name}')

# Validate step configuration
step_config = step.get('config', {})
if step_config:
print(f' Config: {len(step_config)} configuration items')

# Check for step-specific cluster resources
cluster_resources = step_config.get('cluster_resources', {})
if cluster_resources:
print(f' Cluster resources: {list(cluster_resources.keys())}')

# Check for step-specific bids_app_args
bids_app_args = step_config.get('bids_app_args', {})
if bids_app_args:
print(f' BIDS app args: {len(bids_app_args)} arguments')

# Check for step-specific singularity_args
singularity_args = step_config.get('singularity_args', [])
if singularity_args:
print(f' Singularity args: {len(singularity_args)} arguments')

# Check for inter-step commands
if 'inter_step_cmds' in step:
print(' Inter-step commands: present')

print('Pipeline configuration validation complete!')

def _update_inclusion_dataframe(
self, initial_inclusion_df: pd.DataFrame | None = None
Expand Down Expand Up @@ -237,6 +314,13 @@ def wtf_key_info(self, flag_output_ria_only=False) -> None:
proc_output_ria_data_dir.stdout.decode('utf-8')
).path.strip()

# If the URL points to the RIA store root (no .git there), resolve to the
# actual dataset git dir via the alias symlink (e.g. output_ria/alias/data -> XX/xxx-uuid).
if not op.exists(op.join(self.output_ria_data_dir, '.git')):
alias_link = op.join(self.output_ria_path, 'alias', 'data')
if op.exists(alias_link) and os.path.islink(alias_link):
self.output_ria_data_dir = op.realpath(alias_link)

if not flag_output_ria_only: # also want other information:
# Get the dataset ID of `analysis`, i.e., `analysis_dataset_id`:
# way #2: command line of datalad:
Expand Down Expand Up @@ -385,15 +469,51 @@ def get_currently_running_jobs_df(self):
Index: []

"""

def _empty_running():
cols = scheduler_status_columns + ['sub_id']
if self.processing_level == 'session':
cols = cols + ['ses_id']
return pd.DataFrame(columns=cols)

job_status_df = self.get_job_status_df()
last_submitted_jobs_df = self.get_latest_submitted_jobs_df()
if last_submitted_jobs_df.empty:
return EMPTY_JOB_SUBMIT_DF
job_ids = last_submitted_jobs_df['job_id'].unique()
if not len(job_ids) == 1:
raise Exception(f'Expected 1 job id, got {len(job_ids)}')
job_id = job_ids[0]
currently_running_df = request_all_job_status(self.queue, job_id)
return identify_running_jobs(last_submitted_jobs_df, currently_running_df)

# Rows that are submitted but don't have results yet (candidates for "running")
if not job_status_df.empty:
sub = job_status_df['submitted'].fillna(False)
no_res = ~job_status_df['has_results'].fillna(False)
job_status_df = job_status_df.loc[sub & no_res].copy()

# Use status rows (submitted, no results) or last submit file for job_id -> sub/ses
mapping_df = job_status_df if not job_status_df.empty else last_submitted_jobs_df.copy()
if mapping_df.empty:
return _empty_running()

# Keep only columns needed to join scheduler output with subject/session
mapping_cols = ['job_id', 'task_id', 'sub_id']
if 'ses_id' in mapping_df:
mapping_cols.append('ses_id')
mapping_df = mapping_df[mapping_cols].copy()
# Drop rows with missing or invalid job/task ids so we only query real jobs
mapping_df = mapping_df[
mapping_df['job_id'].notna()
& mapping_df['task_id'].notna()
& (mapping_df['job_id'] > 0)
& (mapping_df['task_id'] > 0)
]
if mapping_df.empty:
return _empty_running()

# Ask scheduler for each distinct job_id, keep only non-empty responses
job_ids = sorted({int(j) for j in mapping_df['job_id'].unique()})
running_dfs = [request_all_job_status(self.queue, j) for j in job_ids]
running_dfs = [d for d in running_dfs if not d.empty]
if not running_dfs:
return _empty_running()

# Attach sub_id (and ses_id) to scheduler rows and return
return identify_running_jobs(mapping_df, pd.concat(running_dfs, ignore_index=True))

def get_job_status_df(self):
"""
Expand Down
Loading