-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpy_runner.py
More file actions
123 lines (101 loc) · 3.93 KB
/
py_runner.py
File metadata and controls
123 lines (101 loc) · 3.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import asyncio
import json
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, List
# Directory that contains this runner script (symlinks resolved).
ROOT = Path(__file__).resolve().parent
# Parent of the runner directory; the Python client checkout is looked up
# as a sibling directory underneath it.
REPO_ROOT = ROOT.parent
PY_CLIENT_PATH = REPO_ROOT / "dclimate-client-py"
# Use the local clone of the Python client: its path is put first on
# sys.path so the import below searches that directory before any other
# entry. The insert must therefore stay ahead of the import.
sys.path.insert(0, str(PY_CLIENT_PATH))
from dclimate_client_py import dClimateClient # type: ignore # pylint: disable=wrong-import-position
def parse_case() -> dict:
    """Read a test case from standard input and decode it as a JSON object.

    Returns:
        The decoded test case as a dictionary.

    Raises:
        SystemExit: If standard input does not hold valid JSON, or if the
            decoded document is not a JSON object (e.g. an array or scalar).
    """
    raw = sys.stdin.read()
    try:
        case = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise SystemExit(f"Invalid case payload: {exc}") from exc
    # json.loads happily returns lists, numbers, strings or None; the
    # declared return type is dict, so reject anything else here with a
    # clear message instead of failing later on a dict-only operation.
    if not isinstance(case, dict):
        raise SystemExit(
            "Invalid case payload: expected a JSON object, "
            f"got {type(case).__name__}"
        )
    return case
def parse_iso8601(value: str) -> datetime:
    """Parse an ISO 8601 timestamp into a timezone-naive ``datetime``.

    A value ending in ``Z`` has the ``Z`` rewritten to ``+00:00`` before
    parsing. Timezone-aware results are converted to UTC and then have their
    tzinfo removed; naive inputs are returned as parsed.

    Args:
        value: Timestamp text accepted by ``datetime.fromisoformat``,
            optionally using a ``Z`` suffix.

    Returns:
        A ``datetime`` without tzinfo.
    """
    normalized = value.replace("Z", "+00:00") if value.endswith("Z") else value
    parsed = datetime.fromisoformat(normalized)
    if parsed.tzinfo is None:
        return parsed
    # Shift to UTC first, then strip tzinfo, to match tz-naive dataset coords.
    as_utc = parsed.astimezone(timezone.utc)
    return as_utc.replace(tzinfo=None)
def flatten(values: Any) -> List[Any]:
    """Flatten arbitrarily nested lists into one list, in reading order.

    ``None`` entries are kept as ``None``. ``int`` and ``float`` entries
    (which includes ``bool``, a subclass of ``int``) are converted to
    ``float``. Any other kind of entry, including tuples and strings, is
    left out of the result.

    Args:
        values: A nested list structure, or a single value.

    Returns:
        A flat list containing only floats and ``None``.
    """
    result: List[Any] = []
    pending = [values]
    while pending:
        item = pending.pop()
        if isinstance(item, list):
            # Push children back-to-front so they pop front-to-back.
            pending.extend(reversed(item))
            continue
        if item is None:
            result.append(None)
            continue
        if isinstance(item, (int, float)):
            result.append(float(item))
    return result
def sanitize_metadata(metadata: dict) -> dict:
    """Return a copy of ``metadata`` without ``slug`` and with an int timestamp.

    The ``"slug"`` key is dropped. A ``"timestamp"`` value that is present
    and not ``None`` is passed through ``int()``. All other entries are
    copied unchanged, and the input mapping is not modified.

    Args:
        metadata: Metadata mapping to clean.

    Returns:
        A new dictionary with the adjustments described above.
    """
    cleaned = {}
    for key, value in metadata.items():
        if key == "slug":
            continue
        if key == "timestamp" and value is not None:
            value = int(value)
        cleaned[key] = value
    return cleaned
async def run_case(test_case: dict) -> dict:
    """Load a dataset with the Python client, apply the selection, and summarise it.

    Args:
        test_case: Decoded test case. Keys read here:
            ``python`` (optional mapping; ``cid`` is forwarded to the client),
            ``request`` (``dataset``, ``collection``, ``variant``),
            ``selection`` (optional ``point`` and ``timeRange`` mappings) and
            ``variable`` (falls back to ``request["dataset"]`` when falsy).

    Returns:
        A dictionary with the keys ``origin``, ``metadata``,
        ``selectedVariable``, ``variables``, ``coords`` and ``values``.
    """
    # A section may be absent or explicitly null in the JSON payload; treat
    # both as empty so the lookups below never hit None.
    python_opts = test_case.get("python") or {}
    selection = test_case.get("selection") or {}
    request = test_case.get("request") or {}
    variable = test_case.get("variable") or request.get("dataset")

    async with dClimateClient() as client:
        dataset, metadata = await client.load_dataset(
            dataset=request.get("dataset"),
            collection=request.get("collection"),
            variant=request.get("variant"),
            cid=python_opts.get("cid"),
            return_xarray=False,
        )

        selected_variable = None
        # Ensure dataset is scoped to a data variable if needed: prefer the
        # requested variable, otherwise fall back to the first one available.
        if hasattr(dataset, "data"):
            data_vars = list(dataset.data.data_vars)
            if variable in data_vars:
                selected_variable = variable
                dataset = dataset.use(variable)
            elif data_vars:
                selected_variable = data_vars[0]
                dataset = dataset.use(selected_variable)

        point = selection.get("point")
        if point:
            dataset = dataset.point(
                latitude=point["latitude"],
                longitude=point["longitude"],
                snap_to_grid=point.get("snap_to_grid", True),
            )

        time_range = selection.get("timeRange")
        if time_range:
            dataset = dataset.time_range(
                parse_iso8601(time_range["start"]),
                parse_iso8601(time_range["end"]),
            )

        payload = dataset.as_dict()
        values = flatten(payload.get("data", []))

        # NOTE(review): dataset.data is read unconditionally below even though
        # the variable scoping above is guarded by hasattr — confirm whether a
        # dataset without ``data`` can actually reach this point.
        return {
            "origin": "python",
            "metadata": sanitize_metadata(metadata),
            "selectedVariable": selected_variable,
            "variables": list(dataset.data.data_vars),
            "coords": {
                "dimensions": {k: int(v) for k, v in dataset.data.sizes.items()},
                "coordKeys": list(dataset.data.coords.keys()),
            },
            "values": values,
        }
async def main():
    """Run the test case read from stdin and write the result to stdout as JSON.

    The JSON text is written without a trailing newline. Values the JSON
    encoder cannot serialise natively are converted with ``str``.
    """
    outcome = await run_case(parse_case())
    serialized = json.dumps(outcome, default=str)
    sys.stdout.write(serialized)


# Script entry point: drive the coroutine to completion on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())