-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathcoiled_utils.py
More file actions
34 lines (25 loc) · 936 Bytes
/
coiled_utils.py
File metadata and controls
34 lines (25 loc) · 936 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from __future__ import annotations
from typing import TYPE_CHECKING
from typing import Any
from attrs import define
if TYPE_CHECKING:
from collections.abc import Callable
try:
from coiled.function import Function
except ImportError:
@define
class Function:
cluster_kwargs: dict[str, Any]
environ: dict[str, Any]
function: Callable[..., Any] | None
keepalive: str | None
__all__ = ["Function"]
def extract_coiled_function_kwargs(func: Function) -> dict[str, Any]:
"""Extract the kwargs for a coiled function."""
return {
"cluster_kwargs": func._cluster_kwargs, # ty: ignore[possibly-missing-attribute]
"keepalive": func.keepalive,
"environ": func._environ, # ty: ignore[possibly-missing-attribute]
"local": func._local, # ty: ignore[possibly-missing-attribute]
"name": func._name, # ty: ignore[possibly-missing-attribute]
}