diff --git a/CHANGES.md b/CHANGES.md index db47966..43456d3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,26 +2,63 @@ Unreleased -- x +## Version 6.3.0 + +Released 2026-04-19 + +- fix missing `raise` on `scope_import is None` check in `process_folder_file_scope` +- use context manager for `multiprocessing.Pool` in `authenticate_password` to stop worker leak +- fix `disable_default_fail` semantics on `checkpoint_callable` — default now aborts with `fail_status` + when no fail handler is set (previously it silently called the protected view) +- use `secrets.compare_digest` for bearer token comparison in `BearerCheckpoint` +- mix `secrets.token_hex` entropy into `generate_csrf_token` output +- `ImpBlueprint.import_resources` now routes through `cast_to_import_str`, fixing import paths for + nested resource folders +- preserve order when deduping scoped import results (`dict.fromkeys` instead of `set`) +- URL-encode username/password when building database URIs to avoid breakage on special characters +- chain `ImportError` re-raises with `from e` to preserve the original traceback +- add testing for checkpoints + +## Version 6.2.0 + +Released 2026-04-14 + +- auth.generate_private_key +- auth.authenticate_password +- auth.encrypt_password + +The function argument `encryption_level` has been changed to `algorithm` and has a new type of **Literal** + +This was done to allow for more algorithms to be added in the future. ## Version 6.1.5 +Released 2026-02-22 + - bugfix -## Version 6.1.4 +## Version 6.1.4 - YANKED + +Released 2026-02-21 - add disable_default_fail option to checkpoints ## Version 6.1.3 +Released 2026-02-21 + - adjust fail_response to accept a callable to avoid out of context error ## Version 6.1.2 +Released 2026-02-21 + - switch checkpoint type checking to protocol matching ## Version 6.1.1 +Released 2026-02-21 + - checkpoint bug fix, docs fix, bump version ## Version 6.1.0 @@ -51,16 +88,12 @@ Released 2025-10-21 ## Version 6.0.0 - - Released 2025-10-16 - beta-3 + beta-2 + beta-1 ## Version 6.0.0-beta.3 - - Released 2025-10-16 - Replaced `import_app_resources` with @@ -80,8 +113,6 @@ Released 2025-10-16 ## Version 6.0.0-beta.2 - - Released 2025-05-27 - bug fixes @@ -89,16 +120,12 @@ Released 2025-05-27 ## Version 6.0.0-beta.1 - - Released 2025-05-27 - Simplify `flask_imp.security.checkpoint` decorator by adding checkpoint types. ## Version 5.7.0 - - Released 2025-02-10 - add new method: `FlaskConfig.as_object` @@ -106,8 +133,6 @@ Released 2025-02-10 ## Version 5.6.0 - - Released 2025-02-04 - New method added to register ImpBlueprints @@ -118,8 +143,6 @@ Released 2025-02-04 ## Version 5.5.1 - - Released 2024-12-04 - switched logo for emoji @@ -128,8 +151,6 @@ Released 2024-12-04 ## Version 5.5.0 - - Released 2024-11-21 - updated project structure. diff --git a/src/flask_imp/_imp.py b/src/flask_imp/_imp.py index b84b0f2..718ebd2 100644 --- a/src/flask_imp/_imp.py +++ b/src/flask_imp/_imp.py @@ -178,7 +178,7 @@ def new(): # add the module to the set of imported modules imported_modules.add(module) except ImportError as e: - raise ImportError(f"Error when importing {cast_import}: {e}") + raise ImportError(f"Error when importing {cast_import}: {e}") from e # check if each module has any valid factories, if so, pass the blueprint for instance_factory in factories: @@ -392,7 +392,7 @@ def _process_model(self, path: Path) -> None: self.model_registry.add(name, value) except ImportError as e: - raise ImportError(f"Error when importing {import_string}: {e}") + raise ImportError(f"Error when importing {import_string}: {e}") from e def _init_session(self) -> None: if isinstance(self.config.IMP_INIT_SESSION, dict): diff --git a/src/flask_imp/_imp_blueprint.py b/src/flask_imp/_imp_blueprint.py index b1afd01..77ac587 100644 --- a/src/flask_imp/_imp_blueprint.py +++ b/src/flask_imp/_imp_blueprint.py @@ -220,17 +220,15 @@ def new(): ): # skip hidden files / folders continue + cast_import = cast_to_import_str(self.package, module_path) + try: # attempt to import the module - module = import_module( - f"{self.package}.{module_path.parent.name}.{module_path.stem}" - ) + module = import_module(cast_import) # add the module to the set of imported modules imported_modules.add(module) except ImportError as e: - raise ImportError( - f"Error when importing {self.package}.{module_path.parent.name}.{module_path.stem}: {e}" - ) + raise ImportError(f"Error when importing {cast_import}: {e}") from e # check if each module has any valid factories, if so, pass the blueprint for instance_factory in factories: diff --git a/src/flask_imp/_utilities.py b/src/flask_imp/_utilities.py index 7d64994..aedb626 100644 --- a/src/flask_imp/_utilities.py +++ b/src/flask_imp/_utilities.py @@ -372,7 +372,7 @@ def process_folder_file_scope( """ if scope_import is None: - Exception("scope_import cannot be None") + raise ValueError("scope_import cannot be None") result: list[Path] = [] @@ -394,5 +394,5 @@ def process_folder_file_scope( if named_scopes := process_scope(resource, scope_import[resource.name]): result.extend(named_scopes) - # clear duplicates - return list(set(result)) + # clear duplicates while preserving order + return list(dict.fromkeys(result)) diff --git a/src/flask_imp/auth/_authenticate_password.py b/src/flask_imp/auth/_authenticate_password.py index 11bcfb2..9ac9973 100644 --- a/src/flask_imp/auth/_authenticate_password.py +++ b/src/flask_imp/auth/_authenticate_password.py @@ -59,26 +59,26 @@ def authenticate_password( return False - thread_pool = multiprocessing.Pool(processes=pepper_length) - threads = [] - - for batch in batched(_guesses, 1000): - threads.append( - thread_pool.apply_async( - _guess_block, - args=( - batch, - input_password, - database_password, - database_salt, - algorithm, - pepper_position, - ), + with multiprocessing.Pool(processes=pepper_length) as thread_pool: + threads = [] + + for batch in batched(_guesses, 1000): + threads.append( + thread_pool.apply_async( + _guess_block, + args=( + batch, + input_password, + database_password, + database_salt, + algorithm, + pepper_position, + ), + ) ) - ) - for thread in threads: - if thread.get(): - return True + for thread in threads: + if thread.get(): + return True return False diff --git a/src/flask_imp/auth/_generate_csrf_token.py b/src/flask_imp/auth/_generate_csrf_token.py index 175b154..3a0648e 100644 --- a/src/flask_imp/auth/_generate_csrf_token.py +++ b/src/flask_imp/auth/_generate_csrf_token.py @@ -1,15 +1,17 @@ +import secrets from datetime import datetime from hashlib import sha1 def generate_csrf_token() -> str: """ - Generates a SHA1 using the current date and time. + Generates a SHA1 using the current date and time combined with + cryptographically secure random bytes. For use in Cross-Site Request Forgery. - :return: sha1 hash of the current date and time + :return: sha1 hash of the current date and time plus random entropy """ sha = sha1() - sha.update(str(datetime.now()).encode("utf-8")) + sha.update(f"{datetime.now()}{secrets.token_hex(16)}".encode("utf-8")) return sha.hexdigest() diff --git a/src/flask_imp/config/_database_config.py b/src/flask_imp/config/_database_config.py index e8c067a..11b2f20 100644 --- a/src/flask_imp/config/_database_config.py +++ b/src/flask_imp/config/_database_config.py @@ -1,5 +1,6 @@ import typing as t from pathlib import Path +from urllib.parse import quote class DatabaseConfig: @@ -114,7 +115,7 @@ def uri(self, app_instance_path: Path) -> str: return f"{self.dialect}:///{filepath}" return ( - f"{self.dialect}://{self.username}:" - f"{self.password}@{self.location}:" + f"{self.dialect}://{quote(self.username, safe='')}:" + f"{quote(self.password, safe='')}@{self.location}:" f"{self.port}/{self.database_name}" ) diff --git a/src/flask_imp/config/_sql_database_config.py b/src/flask_imp/config/_sql_database_config.py index b66b46c..f649b4b 100644 --- a/src/flask_imp/config/_sql_database_config.py +++ b/src/flask_imp/config/_sql_database_config.py @@ -1,4 +1,5 @@ import typing as t +from urllib.parse import quote class SQLDatabaseConfig: @@ -66,7 +67,7 @@ def as_dict(self) -> t.Dict[str, t.Any]: def uri(self) -> str: return ( - f"{self.dialect}://{self.username}:" - f"{self.password}@{self.location}:" + f"{self.dialect}://{quote(self.username, safe='')}:" + f"{quote(self.password, safe='')}@{self.location}:" f"{self.port}/{self.database_name}" ) diff --git a/src/flask_imp/security/_checkpoint_callable.py b/src/flask_imp/security/_checkpoint_callable.py index 9eefcdf..5c39730 100644 --- a/src/flask_imp/security/_checkpoint_callable.py +++ b/src/flask_imp/security/_checkpoint_callable.py @@ -163,7 +163,7 @@ def inner(*args: t.Any, **kwargs: t.Any) -> t.Any: raise TypeError("Pass URL must either be a string or a partial") - if not disable_default_fail: + if disable_default_fail: return func(*args, **kwargs) return abort(fail_status) diff --git a/src/flask_imp/security/_checkpoints.py b/src/flask_imp/security/_checkpoints.py index 6efdac5..e81b8eb 100644 --- a/src/flask_imp/security/_checkpoints.py +++ b/src/flask_imp/security/_checkpoints.py @@ -18,6 +18,7 @@ """ +import secrets import typing as t from flask import request @@ -152,7 +153,11 @@ def __init__( def pass_(self) -> bool: if auth := request.authorization: - if auth.type == "bearer" and auth.token == self.token: + if ( + auth.type == "bearer" + and auth.token is not None + and secrets.compare_digest(auth.token, self.token) + ): return True return False diff --git a/tests/test_security.py b/tests/test_security.py new file mode 100644 index 0000000..4725f11 --- /dev/null +++ b/tests/test_security.py @@ -0,0 +1,499 @@ +""" +Tests for src/flask_imp/security. + +Builds a minimal Flask app with routes that exercise each checkpoint surface +independently from the main `test_app` fixture used by test_group.py. +""" + +import base64 + +import pytest +from flask import Flask, jsonify, make_response, session + +from flask_imp.security import ( + APIKeyCheckpoint, + BaseCheckpoint, + BearerCheckpoint, + SessionCheckpoint, + checkpoint, + checkpoint_callable, + include_csrf, +) +from flask_imp.utilities import lazy_session_get, lazy_url_for + + +API_KEY = "secret-header-key" +BEARER_TOKEN = "secret-bearer-token" + + +def _build_app() -> Flask: + app = Flask(__name__) + app.secret_key = "test-secret" + app.config.update(TESTING=True) + + # ------------------------------------------------------------------ + # Bare landing pages used as redirect targets. + # ------------------------------------------------------------------ + @app.route("/fail-page") + def fail_page(): + return "failed", 200 + + @app.route("/pass-page") + def pass_page(): + return "passed", 200 + + @app.route("/index") + def index(): + return "index", 200 + + # ------------------------------------------------------------------ + # SessionCheckpoint — covers single value, list values, and each + # .action() branch: default abort, fail_url, fail_json, fail_response, + # pass_url, disable_default_fail. + # ------------------------------------------------------------------ + @app.route("/session/login") + def session_login(): + session["logged_in"] = True + return "logged in", 200 + + @app.route("/session/logout") + def session_logout(): + session.clear() + return "logged out", 200 + + @app.route("/session/set-role/") + def session_set_role(role: str): + session["role"] = role + return f"role set to {role}", 200 + + LOGGED_IN = SessionCheckpoint("logged_in", True).action() + + @app.route("/session/protected-default") + @checkpoint(LOGGED_IN) + def session_protected_default(): + return "ok", 200 + + LOGGED_IN_FAIL_URL = SessionCheckpoint("logged_in", True).action( + fail_url="/fail-page", message="need login" + ) + + @app.route("/session/protected-fail-url") + @checkpoint(LOGGED_IN_FAIL_URL) + def session_protected_fail_url(): + return "ok", 200 + + LOGGED_IN_FAIL_JSON = SessionCheckpoint("logged_in", True).action( + fail_json={"error": "login required"}, fail_status=401 + ) + + @app.route("/session/protected-fail-json") + @checkpoint(LOGGED_IN_FAIL_JSON) + def session_protected_fail_json(): + return jsonify(ok=True) + + LOGGED_IN_FAIL_RESPONSE = SessionCheckpoint("logged_in", True).action( + fail_response=lambda: make_response("custom fail", 418) + ) + + @app.route("/session/protected-fail-response") + @checkpoint(LOGGED_IN_FAIL_RESPONSE) + def session_protected_fail_response(): + return "ok", 200 + + ALREADY_IN = SessionCheckpoint("logged_in", True).action(pass_url="/pass-page") + + @app.route("/session/login-page") + @checkpoint(ALREADY_IN) + def session_login_page(): + return "login form", 200 + + LOGGED_IN_DISABLE_DEFAULT = SessionCheckpoint("logged_in", True).action( + disable_default_fail=True + ) + + @app.route("/session/protected-disable-default") + @checkpoint(LOGGED_IN_DISABLE_DEFAULT) + def session_protected_disable_default(): + return "fell through", 200 + + ROLE_IN_LIST = SessionCheckpoint("role", ["admin", "editor"]).action( + fail_status=403 + ) + + @app.route("/session/role-list") + @checkpoint(ROLE_IN_LIST) + def session_role_list(): + return "ok", 200 + + LAZY_FAIL = SessionCheckpoint("logged_in", True).action( + fail_url=lazy_url_for("index") + ) + + @app.route("/session/lazy-fail") + @checkpoint(LAZY_FAIL) + def session_lazy_fail(): + return "ok", 200 + + # ------------------------------------------------------------------ + # APIKeyCheckpoint — header + query_param. + # ------------------------------------------------------------------ + HEADER_KEY = APIKeyCheckpoint(API_KEY, type_="header").action( + fail_json={"error": "bad key"}, fail_status=401 + ) + + @app.route("/api/header") + @checkpoint(HEADER_KEY) + def api_header(): + return jsonify(ok=True) + + QUERY_KEY = APIKeyCheckpoint(API_KEY, type_="query_param", header_or_param="api_key").action( + fail_json={"error": "bad key"}, fail_status=401 + ) + + @app.route("/api/query") + @checkpoint(QUERY_KEY) + def api_query(): + return jsonify(ok=True) + + # ------------------------------------------------------------------ + # BearerCheckpoint — exercises secrets.compare_digest path. + # ------------------------------------------------------------------ + BEARER = BearerCheckpoint(BEARER_TOKEN).action( + fail_json={"error": "bad token"}, fail_status=401 + ) + + @app.route("/api/bearer") + @checkpoint(BEARER) + def api_bearer(): + return jsonify(ok=True) + + # ------------------------------------------------------------------ + # checkpoint_callable — pass/fail plus each action. + # ------------------------------------------------------------------ + def always_true(**_kwargs): + return True + + def always_false(**_kwargs): + return False + + def session_has_flag(**kwargs): + return kwargs.get("value") is True + + def url_arg_is_admin(**kwargs): + url_vars = kwargs.get("__url_vars__") or {} + return url_vars.get("name") == "admin" + + @app.route("/callable/pass") + @checkpoint_callable(always_true) + def callable_pass(): + return "ok", 200 + + @app.route("/callable/fail-url") + @checkpoint_callable(always_false, fail_url="/fail-page") + def callable_fail_url(): + return "ok", 200 + + @app.route("/callable/fail-json") + @checkpoint_callable(always_false, fail_json={"error": "no"}, fail_status=401) + def callable_fail_json(): + return "ok", 200 + + @app.route("/callable/fail-response") + @checkpoint_callable( + always_false, fail_response=lambda: make_response("teapot", 418) + ) + def callable_fail_response(): + return "ok", 200 + + @app.route("/callable/pass-url") + @checkpoint_callable(always_true, pass_url="/pass-page") + def callable_pass_url(): + return "ok", 200 + + @app.route("/callable/disable-default-fail") + @checkpoint_callable(always_false, disable_default_fail=True) + def callable_disable_default_fail(): + return "fell through", 200 + + @app.route("/callable/default-fail") + @checkpoint_callable(always_false, fail_status=403) + def callable_default_fail(): + return "ok", 200 + + @app.route("/callable/lazy-session") + @checkpoint_callable( + session_has_flag, + predefined_args={"value": lazy_session_get("flag", False)}, + fail_status=403, + ) + def callable_lazy_session(): + return "ok", 200 + + @app.route("/callable/url-args/") + @checkpoint_callable(url_arg_is_admin, include_url_args=True, fail_status=403) + def callable_url_args(name: str): + return f"hello {name}", 200 + + # ------------------------------------------------------------------ + # include_csrf + # ------------------------------------------------------------------ + @app.route("/csrf-form", methods=["GET", "POST"]) + @include_csrf(session_key="csrf", form_key="csrf") + def csrf_form(): + if "csrf" in session: + return session["csrf"], 200 + return "no token", 200 + + # ------------------------------------------------------------------ + # `checkpoint` rejects anything that isn't a BaseCheckpoint. + # ------------------------------------------------------------------ + @app.route("/bad-checkpoint") + @checkpoint("not a checkpoint") # type: ignore[arg-type] + def bad_checkpoint(): + return "ok", 200 + + return app + + +@pytest.fixture(scope="module") +def sec_app(): + return _build_app() + + +@pytest.fixture() +def sec_client(sec_app): + return sec_app.test_client() + + +# --------------------------------------------------------------------------- +# SessionCheckpoint +# --------------------------------------------------------------------------- +def test_session_checkpoint_pass(sec_client): + sec_client.get("/session/login") + assert sec_client.get("/session/protected-default").status_code == 200 + + +def test_session_checkpoint_default_aborts_when_not_logged_in(sec_client): + sec_client.get("/session/logout") + assert sec_client.get("/session/protected-default").status_code == 403 + + +def test_session_checkpoint_fail_url_redirects(sec_client): + sec_client.get("/session/logout") + response = sec_client.get("/session/protected-fail-url") + assert response.status_code == 302 + assert response.headers["Location"].endswith("/fail-page") + + +def test_session_checkpoint_fail_json_returned(sec_client): + sec_client.get("/session/logout") + response = sec_client.get("/session/protected-fail-json") + assert response.status_code == 401 + assert response.get_json() == {"error": "login required"} + + +def test_session_checkpoint_fail_response_returned(sec_client): + sec_client.get("/session/logout") + response = sec_client.get("/session/protected-fail-response") + assert response.status_code == 418 + assert response.data == b"custom fail" + + +def test_session_checkpoint_pass_url_redirects(sec_client): + sec_client.get("/session/login") + response = sec_client.get("/session/login-page") + assert response.status_code == 302 + assert response.headers["Location"].endswith("/pass-page") + + +def test_session_checkpoint_disable_default_fail_runs_view(sec_client): + sec_client.get("/session/logout") + response = sec_client.get("/session/protected-disable-default") + assert response.status_code == 200 + assert response.data == b"fell through" + + +def test_session_checkpoint_list_values_match(sec_client): + sec_client.get("/session/set-role/admin") + assert sec_client.get("/session/role-list").status_code == 200 + + sec_client.get("/session/set-role/guest") + assert sec_client.get("/session/role-list").status_code == 403 + + +def test_session_checkpoint_lazy_url_for(sec_client): + sec_client.get("/session/logout") + response = sec_client.get("/session/lazy-fail") + assert response.status_code == 302 + assert response.headers["Location"].endswith("/index") + + +# --------------------------------------------------------------------------- +# APIKeyCheckpoint +# --------------------------------------------------------------------------- +def test_api_key_header_pass(sec_client): + response = sec_client.get("/api/header", headers={"x-api-key": API_KEY}) + assert response.status_code == 200 + + +def test_api_key_header_fail_missing(sec_client): + response = sec_client.get("/api/header") + assert response.status_code == 401 + assert response.get_json() == {"error": "bad key"} + + +def test_api_key_header_fail_wrong(sec_client): + response = sec_client.get("/api/header", headers={"x-api-key": "wrong"}) + assert response.status_code == 401 + + +def test_api_key_query_pass(sec_client): + response = sec_client.get(f"/api/query?api_key={API_KEY}") + assert response.status_code == 200 + + +def test_api_key_query_fail(sec_client): + response = sec_client.get("/api/query?api_key=wrong") + assert response.status_code == 401 + + +# --------------------------------------------------------------------------- +# BearerCheckpoint +# --------------------------------------------------------------------------- +def test_bearer_pass(sec_client): + response = sec_client.get( + "/api/bearer", headers={"Authorization": f"Bearer {BEARER_TOKEN}"} + ) + assert response.status_code == 200 + + +def test_bearer_fail_wrong_token(sec_client): + response = sec_client.get( + "/api/bearer", headers={"Authorization": "Bearer nope"} + ) + assert response.status_code == 401 + + +def test_bearer_fail_missing_auth(sec_client): + response = sec_client.get("/api/bearer") + assert response.status_code == 401 + + +def test_bearer_fail_wrong_auth_type(sec_client): + basic = base64.b64encode(b"user:pass").decode() + response = sec_client.get( + "/api/bearer", headers={"Authorization": f"Basic {basic}"} + ) + assert response.status_code == 401 + + +# --------------------------------------------------------------------------- +# checkpoint_callable +# --------------------------------------------------------------------------- +def test_callable_pass(sec_client): + assert sec_client.get("/callable/pass").status_code == 200 + + +def test_callable_fail_url(sec_client): + response = sec_client.get("/callable/fail-url") + assert response.status_code == 302 + assert response.headers["Location"].endswith("/fail-page") + + +def test_callable_fail_json(sec_client): + response = sec_client.get("/callable/fail-json") + assert response.status_code == 401 + assert response.get_json() == {"error": "no"} + + +def test_callable_fail_response(sec_client): + response = sec_client.get("/callable/fail-response") + assert response.status_code == 418 + assert response.data == b"teapot" + + +def test_callable_pass_url(sec_client): + response = sec_client.get("/callable/pass-url") + assert response.status_code == 302 + assert response.headers["Location"].endswith("/pass-page") + + +def test_callable_disable_default_fail_runs_view(sec_client): + response = sec_client.get("/callable/disable-default-fail") + assert response.status_code == 200 + assert response.data == b"fell through" + + +def test_callable_default_fail_aborts(sec_client): + response = sec_client.get("/callable/default-fail") + assert response.status_code == 403 + + +def test_callable_lazy_session_pass(sec_client): + with sec_client.session_transaction() as s: + s["flag"] = True + assert sec_client.get("/callable/lazy-session").status_code == 200 + + +def test_callable_lazy_session_fail(sec_client): + with sec_client.session_transaction() as s: + s.pop("flag", None) + assert sec_client.get("/callable/lazy-session").status_code == 403 + + +def test_callable_include_url_args_pass(sec_client): + assert sec_client.get("/callable/url-args/admin").status_code == 200 + + +def test_callable_include_url_args_fail(sec_client): + assert sec_client.get("/callable/url-args/guest").status_code == 403 + + +# --------------------------------------------------------------------------- +# include_csrf +# --------------------------------------------------------------------------- +def test_csrf_get_sets_token(sec_client): + response = sec_client.get("/csrf-form") + assert response.status_code == 200 + assert response.data # token echoed back + + +def test_csrf_post_valid_token_passes(sec_client): + token = sec_client.get("/csrf-form").data.decode() + response = sec_client.post("/csrf-form", data={"csrf": token}) + assert response.status_code == 200 + + +def test_csrf_post_mismatched_token_aborts(sec_client): + sec_client.get("/csrf-form") + response = sec_client.post("/csrf-form", data={"csrf": "wrong"}) + assert response.status_code == 401 + + +def test_csrf_post_missing_form_key_aborts(sec_client): + sec_client.get("/csrf-form") + response = sec_client.post("/csrf-form", data={}) + assert response.status_code == 401 + + +def test_csrf_post_missing_session_key_aborts(sec_client): + # Fresh client with no prior GET — no session token established. + fresh = sec_client.application.test_client() + response = fresh.post("/csrf-form", data={"csrf": "anything"}) + assert response.status_code == 401 + + +# --------------------------------------------------------------------------- +# Misuse / type guard on the `checkpoint` decorator. +# --------------------------------------------------------------------------- +def test_checkpoint_rejects_non_checkpoint(sec_client): + with pytest.raises(TypeError): + sec_client.get("/bad-checkpoint") + + +# --------------------------------------------------------------------------- +# BaseCheckpoint itself is not meant to be used directly. +# --------------------------------------------------------------------------- +def test_base_checkpoint_pass_is_abstract(): + with pytest.raises(NotImplementedError): + BaseCheckpoint().pass_() \ No newline at end of file