|
14 | 14 | from collections.abc import Iterator |
15 | 15 | from datetime import datetime, timezone |
16 | 16 | from io import BytesIO |
| 17 | +from typing import Any |
17 | 18 |
|
| 19 | +import jmespath |
18 | 20 | import numpy |
19 | 21 | import pyarrow as pa |
20 | 22 | import pyarrow.parquet as pq |
@@ -162,7 +164,122 @@ def get_sample_table( |
162 | 164 |
|
163 | 165 |
|
def get_sample_table_and_bytes(table_format: BaseTableFormat, n_rows: int) -> tuple[pa.Table, bytes]:
    """Build a sample table and return it together with its serialised bytes.

    :param table_format: Passed through unchanged to ``get_sample_table``.
    :param n_rows: Passed through unchanged to ``get_sample_table``.

    :return: A ``(table, data)`` pair, where ``data`` is the result of ``write_table_to_bytes(table)``.
    """
    sample = get_sample_table(table_format, n_rows)
    encoded = write_table_to_bytes(sample)
    return sample, encoded
| 169 | + |
| 170 | + |
def write_table_to_bytes(table: pa.Table) -> bytes:
    """Serialise a table to Parquet in memory and return the raw bytes.

    The table is written with ``pyarrow.parquet.write_table`` using format version "2.4" and gzip compression.

    :param table: The table to serialise.

    :return: The full contents of the in-memory buffer after writing.
    """
    buffer = BytesIO()
    pq.write_table(table, where=buffer, compression="gzip", version="2.4")
    return buffer.getvalue()
| 175 | + |
| 176 | + |
| 177 | +# Support for assignment operations using JMESPath expressions. |
| 178 | +# Could be moved to evo.jmespath in the future, if we want to expose this functionality outside of tests. |
| 179 | +class _AssignmentTargetDictEntry: |
| 180 | + """Represents a dictionary entry that potentially can be assigned to.""" |
| 181 | + |
| 182 | + def __init__(self, key: str, obj: dict): |
| 183 | + self.key = key |
| 184 | + self.obj = obj |
| 185 | + |
| 186 | + @property |
| 187 | + def value(self) -> Any: |
| 188 | + """Get the value at this dictionary entry, creating an empty dict if it doesn't exist.""" |
| 189 | + return self.obj.setdefault(self.key, {}) |
| 190 | + |
| 191 | + |
| 192 | +class _AssignmentTargetListEntry: |
| 193 | + """Represents a list entry that potentially can be assigned to.""" |
| 194 | + |
| 195 | + def __init__(self, index: int, obj: list): |
| 196 | + self.index = index |
| 197 | + self.obj = obj |
| 198 | + |
| 199 | + @property |
| 200 | + def value(self) -> Any: |
| 201 | + """Get the value at this list entry, or None if the index is out of range.""" |
| 202 | + try: |
| 203 | + return self.obj[self.index] |
| 204 | + except IndexError: |
| 205 | + return None |
| 206 | + |
| 207 | + |
class _AssignInterpreter(jmespath.visitor.Visitor):
    """JMESPath AST visitor that resolves an expression to an assignable location.

    Only field accesses, index accesses, and subexpressions / index expressions built from them are handled; any
    other node type is rejected by `default_visit`.

    Field and index accesses are not resolved immediately. Each one returns an assignment-target object that
    remembers the container and the key/index, so the final access in an expression can be used as the left-hand
    side of an assignment. A pending target is only resolved to its value when a further access is applied to it.
    """

    def default_visit(self, node, *args, **kwargs):
        # Any node type without a dedicated visit_* method is unsupported for assignment.
        raise NotImplementedError(node["type"])

    @staticmethod
    def _evaluate_value(value):
        """Resolve a pending assignment target to its value; pass anything else through unchanged."""
        target_types = (_AssignmentTargetDictEntry, _AssignmentTargetListEntry)
        if not isinstance(value, target_types):
            return value
        return value.value

    def visit_field(self, node, value):
        """Handle a field access node (e.g. the ``bar`` in ``foo.bar``).

        Returns a dict-entry target, or None when the parent value is not a dict.
        """
        parent = self._evaluate_value(value)
        if isinstance(parent, dict):
            return _AssignmentTargetDictEntry(node["value"], parent)
        return None

    def visit_index(self, node, value):
        """Handle an index access node (e.g. the ``[0]`` in ``foo[0]``).

        Returns a list-entry target, or None when the parent value is not a list.
        """
        parent = self._evaluate_value(value)
        if isinstance(parent, list):
            return _AssignmentTargetListEntry(node["value"], parent)
        return None

    def _visit_sub_or_index_expression(self, node, value):
        """Handle a subexpression or index expression node (e.g. ``foo.bar.baz`` or ``a[0][1]``).

        The children are visited left to right, each one receiving the result of the previous child.
        """
        current = value
        for child in node["children"]:
            current = self.visit(child, current)
        return current

    # Both composite node types are processed the same way.
    visit_subexpression = _visit_sub_or_index_expression
    visit_index_expression = _visit_sub_or_index_expression
| 252 | + |
| 253 | + |
def assign_property(obj: dict, expression: str, value: Any) -> None:
    """Assign a value to the location in a dictionary addressed by a JMESPath expression.

    Only a subset of JMESPath is usable for assignment:
        - Field accesses (e.g. foo.bar)
        - Index accesses (e.g. foo[0])
        - Subexpressions combining the above (e.g. foo.bar[0].baz)

    The expression is parsed with ``jmespath.compile``, so malformed expressions fail with whatever that call raises.

    A field that does not exist on a dict is created as an empty dict while the expression is being resolved, which
    allows nested assignments. These intermediate dicts are not removed again if the assignment subsequently fails.

    :param obj: The dictionary to assign the property to. It is modified in place.
    :param expression: The JMESPath expression representing the property to assign to.
    :param value: The value to assign to the property.

    :raises NotImplementedError: If the expression contains a node type other than those listed above.
    :raises TypeError: If the expression does not resolve to an assignable location, e.g. because it accesses a
        field on a value that is not a dict, or an index on a value that is not a list.
    :raises IndexError: If the final index is out of range for the list being assigned to.
    """
    ast = jmespath.compile(expression).parsed
    target = _AssignInterpreter().visit(ast, obj)

    if isinstance(target, _AssignmentTargetListEntry):
        target.obj[target.index] = value
        return
    if isinstance(target, _AssignmentTargetDictEntry):
        target.obj[target.key] = value
        return

    # The interpreter yields None when an access was applied to a value of the wrong type.
    raise TypeError(f"Cannot assign to expression '{expression}'")
0 commit comments