|
21 | 21 | from _pytask.node_protocols import PTaskWithPath |
22 | 22 |
|
23 | 23 |
|
| 24 | +def _make_local_upath_uri(path: Path, protocol: str) -> str: |
| 25 | + return f"{protocol}:///{path.as_posix().lstrip('/')}" |
| 26 | + |
| 27 | + |
24 | 28 | def test_collect_task(runner, tmp_path): |
25 | 29 | source = """ |
26 | 30 | from pathlib import Path |
@@ -396,6 +400,59 @@ def test_task_name_is_shortened(runner, tmp_path): |
396 | 400 | assert "a/b/task_example.py::task_example" not in result.output |
397 | 401 |
|
398 | 402 |
|
| 403 | +def test_collect_task_with_remote_upath_node(runner, tmp_path): |
| 404 | + pytest.importorskip("upath") |
| 405 | + |
| 406 | + source = """ |
| 407 | + from pathlib import Path |
| 408 | + from typing import Annotated |
| 409 | +
|
| 410 | + from upath import UPath |
| 411 | +
|
| 412 | + from pytask import PickleNode |
| 413 | + from pytask import Product |
| 414 | +
|
| 415 | + def task_example( |
| 416 | + data=PickleNode(path=UPath("s3://bucket/in.pkl")), |
| 417 | + path: Annotated[Path, Product] = Path("out.txt"), |
| 418 | + ): ... |
| 419 | + """ |
| 420 | + tmp_path.joinpath("task_module.py").write_text(textwrap.dedent(source)) |
| 421 | + |
| 422 | + result = runner.invoke(cli, ["collect", "--nodes", tmp_path.as_posix()]) |
| 423 | + |
| 424 | + assert result.exit_code == ExitCode.OK |
| 425 | + assert "s3://bucket/in.pkl" in result.output |
| 426 | + |
| 427 | + |
| 428 | +@pytest.mark.parametrize("protocol", ["file", "local"]) |
| 429 | +def test_collect_task_with_local_upath_protocol_node(runner, tmp_path, protocol): |
| 430 | + pytest.importorskip("upath") |
| 431 | + |
| 432 | + uri = _make_local_upath_uri(tmp_path / "in.pkl", protocol) |
| 433 | + |
| 434 | + source = f""" |
| 435 | + from pathlib import Path |
| 436 | + from typing import Annotated |
| 437 | +
|
| 438 | + from upath import UPath |
| 439 | +
|
| 440 | + from pytask import PickleNode |
| 441 | + from pytask import Product |
| 442 | +
|
| 443 | + def task_example( |
| 444 | + data=PickleNode(path=UPath("{uri}")), |
| 445 | + path: Annotated[Path, Product] = Path("out.txt"), |
| 446 | + ): ... |
| 447 | + """ |
| 448 | + tmp_path.joinpath("task_module.py").write_text(textwrap.dedent(source)) |
| 449 | + |
| 450 | + result = runner.invoke(cli, ["collect", "--nodes", tmp_path.as_posix()]) |
| 451 | + |
| 452 | + assert result.exit_code == ExitCode.OK |
| 453 | + assert f"{tmp_path.name}/in.pkl" in result.output |
| 454 | + |
| 455 | + |
399 | 456 | def test_python_node_is_collected(runner, tmp_path): |
400 | 457 | source = """ |
401 | 458 | from pytask import Product |
|
0 commit comments