-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathdata_models.py
More file actions
2342 lines (2038 loc) · 88.9 KB
/
data_models.py
File metadata and controls
2342 lines (2038 loc) · 88.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Module containing pydantic data models used throughout the map_processing package.
Notes:
For more info on pydantic, visit: https://pydantic-docs.helpmanual.io/
Interpreting class name prefixes which describe how the models are used:
- "GT" --> ground truth data
- "UG" --> un-processed graph data
- "PG" --> processed graph data
- "O" --> optimization-related data (either results or configuration)
Explanation of '# noinspection PyMethodParameters': For some reason, decorating validators with a classmethod
decorator prevents successful use of the validator.
Note that there are some pydantic models that contain a configuration that allows them to have numpy arrays even
though they do not have any numpy arrays as attributes. This is because these classes are composed of other pydantic
models that do contain numpy arrays, and the configuration that allows this must be replicated in any pydantic model
in which these numpy array-containing models are used.
"""
import itertools
from enum import Enum
from typing import Union, Optional, Dict, List, Tuple, Any
import numpy as np
from g2o import SE3Quat
from matplotlib import pyplot as plt
from pydantic import BaseModel, conlist, Field, confloat, conint, validator
from map_processing import VertexType, ASSUMED_TAG_SIZE
from map_processing.transform_utils import (
NEGATE_Y_AND_Z_AXES,
transform_matrix_to_vector,
LEN_3_UNIT_VEC,
)
from mpl_toolkits.axes_grid1 import make_axes_locatable
# Threshold passed to np.array2string via the pydantic json_encoders (see Weights.Config):
# chosen arbitrarily large so serialized arrays are never abbreviated with "..." in JSON.
ARRAY_SUMMARIZATION_THRESHOLD = int(
    1e9
)  # Arbitrarily high integer # TODO: Check what this does
def _is_arr_of_right_shape(
v: Optional[np.ndarray], shape: Tuple[int, ...], is_optional: bool = False
):
"""
Check if the array entered is of the shape specified
Args:
v: An optional numpy ndarray repesenting the array to check shape for
shape: A tuple representing the shape of each dimension of the array to be expected
is_optional: A boolean representing whether the check is optional or not
Returns:
v_sqz: A numpy ndarray representing the result of np.squeeze on the array provided.
Raises:
ValueError: When any NaN values are contained in the array
When a not optional array is None
When array is not the same size as expected after np.squeeze
When array is not the same shape as expected
"""
expected_num_dims = len(shape)
# Check array is None
if v is None:
if is_optional:
return v
else:
raise ValueError(
"Value provided that was marked as non-optionally None is None"
)
# Check value in array is NaN
if np.any(v == np.nan):
raise ValueError("Numpy array cannot contain any NaN values")
# Check if squeeze gives wrong dimensions
v_sqz: np.ndarray = np.squeeze(v)
if v_sqz.ndim != expected_num_dims:
raise ValueError(
f"Field that should have been an array was found to not have the right dimensions (number of dims found to "
f"be {v_sqz.ndim} after squeezing the array)"
)
# Check if shape is wrong
for dim_idx, dim in enumerate(shape):
if 0 <= shape[dim_idx] != v_sqz.shape[dim_idx]:
raise ValueError(
f"Field that should have had an array of shape {shape} had a shape of {v_sqz.shape} (note that "
f"negative expected dimensions, if there are any, mean that the matrix can be of any size along that "
f"axis)"
)
return v_sqz
def _is_vector_of_right_length(v: np.ndarray, length: int) -> np.ndarray:
"""
Check if vector entered is of the correct length.
Args:
v: A numpy ndarray representing the vector to be checked for right length
length: An int representing the correct length of the ndarray
Returns:
v_sqz: A numpy ndarray representing the result of np.squeeze on the vector inputted
Raises:
ValueError: When NaN values in vector
When vector has >1 dimension
When length is not the same as expected
"""
v_sqz: np.ndarray = np.squeeze(v)
# Check if value in vector is NaN
if np.any(v == np.nan):
raise ValueError("Numpy array cannot contain any NaN values")
# Check if vector has >1 dimensions
if v_sqz.ndim != 1:
raise ValueError(
f"field that should have been a vector was found to not have the right dimensions (number of dims found to "
f"be {v_sqz.ndim} after squeezing the array)"
)
# Check if size of vector is right
if v_sqz.size != length:
raise ValueError(
f"Expected vector to be of length {length} but instead found the length to be {v_sqz.size}"
)
return v_sqz
def _validator_for_numpy_array_deserialization(v: Union[str, np.ndarray]) -> np.ndarray:
"""
Convert any string based array
Args:
v: A str or numpy array representing the array to be deserialized
Returns:
A numpy ndarray representing the deserialized version of the array entered
"""
if isinstance(v, np.ndarray):
return v
elif isinstance(v, str):
return np.fromstring(v.strip("[").strip("]"), sep=" ")
else:
raise ValueError(
f"Attempted to parse value for an array-type field that is not handled: {type(v)}"
)
class Weights(BaseModel):
    """
    A representation of weights for all parameters from data.

    Attributes:
        orig_gravity: A numpy ndarray representing initial gravity weights
        orig_odometry: A numpy ndarray representing initial odometry weights
        orig_tag: A numpy ndarray representing initial tag weights
        orig_tag_sba: A numpy ndarray representing initial tag weights after SBA
        odom_tag_ratio: A float representing the odom tag ratio (in sweep results json)
        normalize: A boolean representing whether to normalize or not
        _check_*_is_correct_length_vector: Validators enforcing the expected length of
         each weight vector (3 for gravity, 6 for odometry and tag, 2 for tag_sba)
        _deserialize_*_vector_if_needed: Pre-validators deserializing string-encoded
         arrays (see json_encoders in Config) back into numpy arrays
    """

    orig_gravity: np.ndarray = Field(default_factory=lambda: np.ones(3))
    orig_odometry: np.ndarray = Field(default_factory=lambda: np.ones(6))
    orig_tag: np.ndarray = Field(default_factory=lambda: np.ones(6))
    orig_tag_sba: np.ndarray = Field(default_factory=lambda: np.ones(2))
    odom_tag_ratio: confloat(ge=0.00001) = 1.0
    normalize: bool = False

    class Config:
        """
        A configuration for weights

        Attributes:
            arbitrary_types_allowed: A boolean representing whether types such as np
             arrays can be used as fields.
            json_encoders: Encode array to string (used to write to json)
        """

        arbitrary_types_allowed = True
        json_encoders = {
            np.ndarray: lambda arr: np.array2string(
                arr, threshold=ARRAY_SUMMARIZATION_THRESHOLD
            )
        }

    # Vector validators
    _check_gravity_is_correct_length_vector = validator(
        "orig_gravity", allow_reuse=True
    )(lambda v: _is_vector_of_right_length(v, 3))
    _check_odometry_is_correct_length_vector = validator(
        "orig_odometry", allow_reuse=True
    )(lambda v: _is_vector_of_right_length(v, 6))
    _check_tag_is_correct_length_vector = validator("orig_tag", allow_reuse=True)(
        lambda v: _is_vector_of_right_length(v, 6)
    )
    _check_tag_sba_is_correct_length_vector = validator(
        "orig_tag_sba", allow_reuse=True
    )(lambda v: _is_vector_of_right_length(v, 2))
    _deserialize_gravity_vector_if_needed = validator(
        "orig_gravity", allow_reuse=True, pre=True
    )(_validator_for_numpy_array_deserialization)
    _deserialize_odometry_vector_if_needed = validator(
        "orig_odometry", allow_reuse=True, pre=True
    )(_validator_for_numpy_array_deserialization)
    _deserialize_tag_vector_if_needed = validator(
        "orig_tag", allow_reuse=True, pre=True
    )(_validator_for_numpy_array_deserialization)
    _deserialize_tag_sba_vector_if_needed = validator(
        "orig_tag_sba", allow_reuse=True, pre=True
    )(_validator_for_numpy_array_deserialization)

    @property
    def odometry(self) -> np.ndarray:
        """
        The odometry weights scaled by odom_tag_ratio; normalized (divided by the vector's
        magnitude) first if `normalize` is set.

        Returns:
            A numpy ndarray representing the calculated odometry weights
        """
        odom_mag = 1
        if self.normalize:
            odom_mag = np.linalg.norm(self.orig_odometry)
            if odom_mag == 0:  # Avoid divide by zero error
                odom_mag = 1
        return self.orig_odometry * self.odom_tag_ratio / odom_mag

    @property
    def gravity(self) -> np.ndarray:
        """
        If normalization is True, the normalized gravity weights are provided, else
        orig_gravity.

        Returns:
            A numpy ndarray representing the calculated gravity weights
        """
        grav_mag = 1
        if self.normalize:
            grav_mag = np.linalg.norm(self.orig_gravity)
            if grav_mag == 0:  # Avoid divide by zero error
                grav_mag = 1
        return self.orig_gravity / grav_mag

    @property
    def tag(self) -> np.ndarray:
        """
        If normalization is True, the normalized tag weights are provided, else orig_tag.

        Returns:
            A numpy ndarray representing the calculated tag weights
        """
        tag_mag = 1
        if self.normalize:
            tag_mag = np.linalg.norm(self.orig_tag)
            if tag_mag == 0:  # Avoid divide by zero error
                tag_mag = 1
        return self.orig_tag / tag_mag

    @property
    def tag_sba(self) -> np.ndarray:
        """
        If normalization is True, the normalized SBA tag weights are provided, else
        orig_tag_sba.

        Returns:
            A numpy ndarray representing the calculated SBA tag weights
        """
        tag_sba_mag = 1
        if self.normalize:
            tag_sba_mag = np.linalg.norm(self.orig_tag_sba)
            if tag_sba_mag == 0:  # Avoid divide by zero error
                tag_sba_mag = 1
        return self.orig_tag_sba / tag_sba_mag

    @property
    def tag_odom_ratio(self):
        """
        Convert the odom:tag ratio to a tag:odom ratio by inverting the number.

        Returns:
            A float representing the tag to odometry ratio.
        """
        return 1 / self.odom_tag_ratio

    @classmethod
    def legacy_from_array(cls, array: Union[np.ndarray, List[float]]) -> "Weights":
        """
        Converts an array to a Weights instance (legacy).

        Args:
            array: A np.ndarray or List of floats to convert (see
             legacy_weight_dict_from_array for the supported layouts)

        Returns:
            A Weights instance representing the inputted array
        """
        return Weights(**cls.legacy_weight_dict_from_array(array))

    @staticmethod
    def legacy_weight_dict_from_array(
        array: Union[np.ndarray, List[float]]
    ) -> Dict[str, Union[float, np.ndarray]]:
        """
        Construct a normalized weight dictionary from a given array of values using the legacy approach.

        Args:
            array: A np.ndarray or List of floats representing the array to create a normalized
             weight dictionary from

        Returns:
            A dictionary mapping strings to either floats or np.ndarrays representing the normalized
             weights dictionary

        Raises:
            Exception: If the length of the array is not supported
        TODO: refactor places where this function is used to not use this approach of constructing weights from a
         single numpy array
        """
        weights = Weights().dict()
        length = array.size if isinstance(array, np.ndarray) else len(array)
        half_len = length // 2
        has_ratio = length % 2 == 1
        if length == 1:  # ratio
            # Bug fix: this previously assigned to the nonexistent key
            # "orig_odom_tag_ratio", which pydantic silently ignored, so the ratio was
            # dropped. The field is named odom_tag_ratio.
            weights["odom_tag_ratio"] = array[0]
        elif length == 2:  # tag/odom pose:rot/tag-sba x:y, ratio
            weights["orig_odometry"] = np.array([array[0]] * 3 + [1] * 3)
            weights["orig_tag"] = np.array([array[0]] * 3 + [1] * 3)
            weights["orig_tag_sba"] = np.array([array[0], 1])
            weights["odom_tag_ratio"] = array[1]
        elif length == 3:  # odom pose:rot, tag pose:rot/tag-sba x:y, ratio
            weights["orig_odometry"] = np.array([array[0]] * 3 + [1] * 3)
            weights["orig_tag"] = np.array([array[1]] * 3 + [1] * 3)
            weights["orig_tag_sba"] = np.array([array[1], 1])
            weights["odom_tag_ratio"] = array[2]
        elif (
            half_len == 2  # lengths 4 and 5
        ):  # odom pose, odom rot, tag pose/tag-sba x, tag rot/tag-sba y, (ratio)
            # NOTE(review): for length 5, array[2:] has 3 elements, which the
            # orig_tag_sba length-2 validator will reject — confirm intended layout.
            weights["orig_odometry"] = np.array([array[0]] * 3 + [array[1]] * 3)
            weights["orig_tag"] = np.array([array[2]] * 3 + [array[3]] * 3)
            weights["orig_tag_sba"] = np.array(array[2:])
            weights["odom_tag_ratio"] = array[-1] if has_ratio else 1
        elif half_len == 3:  # lengths 6 and 7: odom x y z qx qy, tag-sba x, (ratio)
            # NOTE(review): this assigns a 5-element orig_odometry and a 1-element
            # orig_tag_sba, which the length validators (6 and 2) will reject — confirm
            # intended layout before relying on this branch.
            weights["orig_odometry"] = np.array(array[:5])
            weights["orig_tag_sba"] = np.array([array[5]])
            weights["odom_tag_ratio"] = array[-1] if has_ratio else 1
        else:
            raise Exception(f"Weight length of {length} is not supported")
        # NOTE(review): the original also had `length == 4/5/6` branches after the
        # half_len checks; they were unreachable (those lengths are consumed by the
        # half_len == 2/3 branches above) and have been removed as dead code.
        weights["normalize"] = True
        w = Weights(**weights)
        return w.dict()

    def get_weights_from_end_vertex_mode(self, end_vertex_mode: Optional[VertexType]):
        """
        Select an edge weight vector according to the mode of an edge's end vertex.

        Args:
            end_vertex_mode: Mode of the end vertex of the edge

        Returns:
            A copy of the edge weight vector selected according to the mode of an edge's end vertex. An end vertex mode
             of type waypoint returns a vector of 1s.

        Raises:
            ValueError: If the end_vertex_mode is not recognized
        """
        if end_vertex_mode == VertexType.ODOMETRY:
            return np.array(self.odometry)
        elif end_vertex_mode == VertexType.TAG:
            return np.array(self.tag)
        elif end_vertex_mode == VertexType.TAGPOINT:
            return np.array(self.tag_sba)
        elif end_vertex_mode is None:
            return np.array(self.gravity)
        elif end_vertex_mode == VertexType.WAYPOINT:
            return np.ones(6)  # TODO: set to something other than identity?
        else:
            # Bug fix: raise ValueError (a subclass of Exception, so backward-compatible)
            # to match the documented contract.
            raise ValueError(f"Edge of end type {end_vertex_mode} not recognized")
class UGPoseDatum(BaseModel):
    """
    A single odometry pose datum from an unprocessed data set.

    Attributes:
        pose: The pose flattened into a 16-element sequence; reshaping into a 4x4 array
         with Fortran-like (column-major) index order — first index changing fastest,
         last index changing slowest — yields the transform matrix.
        timestamp: Time at which the pose datum was recorded.
        planes: Planes associated with this datum (*CURRENTLY SKIPPED IN GRAPH
         GENERATION*).
        id: Integer identifier of this pose datum.
    """

    pose: conlist(Union[float, int], min_items=16, max_items=16)
    timestamp: float
    planes: List = []
    id: int

    @property
    def pose_as_matrix(self) -> np.ndarray:
        """
        The pose as a 4x4 transform matrix (column-major reshape of `pose`).

        Returns:
            np.ndarray of shape (4, 4)
        """
        return np.array(self.pose).reshape((4, 4), order="F")

    @property
    def position(self) -> np.ndarray:
        """
        The translation component of the pose.

        Returns:
            np.ndarray of length 3 (the last column of the transform matrix)
        """
        return self.pose_as_matrix[:3, 3]

    def __repr__(self):
        pos = tuple(self.position)
        return f"<{UGPoseDatum.__name__} id={self.id}> position(x,y,z)={pos}"
class UGTagDatum(BaseModel):
    """
    A single tag observation datum from an unprocessed data set.

    Attributes:
        tag_corners_pixel_coordinates: Alternating x and y pixel coordinates of the tag
         corners in the camera frame, in the order: bottom right, bottom left, top left,
         top right.
        tag_id: Integer id of the detected tag.
        pose_id: Integer id of the pose in which the tag was detected.
        camera_intrinsics: Camera intrinsics in the order: fx, fy, cx, cy.
        timestamp: Time at which the tag was detected.
        tag_pose: The detected tag pose flattened into a 16-element sequence
         (column-major).
        tag_position_variance: Variance of the tag's position (3 elements).
        tag_orientation_variance: Variance of the tag's orientation (4 elements).
        joint_covar: The 'joint_covar' field from the json (flattened 7x7, defaults to
         the identity).
    """

    tag_corners_pixel_coordinates: conlist(Union[float, int], min_items=8, max_items=8)
    tag_id: int
    pose_id: int
    camera_intrinsics: conlist(Union[float, int], min_items=4, max_items=4)
    timestamp: float
    tag_pose: conlist(Union[float, int], min_items=16, max_items=16)
    tag_position_variance: conlist(Union[float, int], min_items=3, max_items=3) = [
        0,
    ] * 3
    tag_orientation_variance: conlist(Union[float, int], min_items=4, max_items=4) = [
        0,
    ] * 4
    joint_covar: conlist(Union[float, int], min_items=49, max_items=49) = list(
        np.eye(7).flatten()
    )

    @property
    def tag_pose_as_matrix(self) -> np.ndarray:
        """
        The tag pose as a 4x4 transform matrix (column-major reshape of `tag_pose`).

        Returns:
            np.ndarray of shape (4, 4)
        """
        return np.array(self.tag_pose).reshape((4, 4), order="F")

    @property
    def obs_dist(self) -> float:
        """
        Euclidean distance to the observed tag's position.

        Returns:
            A float representing the norm of the translation component of the tag pose
        """
        translation = self.tag_pose_as_matrix[:3, 3]
        return np.linalg.norm(translation)

    def __repr__(self):
        return (
            f"<{UGTagDatum.__name__} tag_id={self.tag_id} pose_id={self.pose_id} "
            f"obs_dist={self.obs_dist}>"
        )
class UGLocationDatum(BaseModel):
    """
    Represents a location datum, mapping a name to a pose id and transform

    Attributes:
        transform: Pose as a tuple of floats where reshaping into a 4x4 array using C-like index
         order results in the transform matrix. From the numpy documentation, C-like index
         order "means to read / write the elements using C-like index order, with the last
         axis index changing fastest, back to the first axis index changing slowest".
        name: A string representing the name of the location
        timestamp: A float representing the timestamp of the observation
        pose_id: An int representing the id of the pose associated with the observation.
    """

    transform: conlist(Union[float, int], min_items=16, max_items=16)
    # TODO: validate assumption that this transform actually uses C-like indexing
    name: str
    timestamp: float
    pose_id: int
class GenerateParams(BaseModel):
# noinspection PyUnresolvedReferences
"""
Configures data set generation.
Attributes:
dataset_name: A string provided as the data set name to the cache manager when the generated
data set is cached.
map_id: A string provided as the map_id field in the UGDataSet object when exported.
parameterized_path_args: A dictionary to pass as the second positional argument to the
`path_from` argument if it is a callable (if the `path_from` argument is not a callable,
then this argument is ignored).
t_max: For a parameterized path, this is the max parameter value to use when evaluating the
path.
n_poses: Number of poses to sample a parameterized path at; if a recorded path is provided,
then this argument is ignored.
dist_threshold: Maximum distance from which a tag can be considered observable.
aoa_threshold: Maximum angle of attack (in radians) from which a tag can be considered
observable. The angle of attack is calculated as the angle between the z-axis of the
tag pose and the vector from the tag to the phone.
tag_size: Height/width dimension of the (square) tags in meters.
obs_noise_var: Variance parameter for the observation model. Specifies the variance for the
distribution from which pixel noise is sampled and added to the simulated tag corner
pixel observations. Note that the simulated tag observation poses are re-derived from
these noise pixel observations.
odometry_noise_var: Dictionary mapping a dimension to which noise is applied to the variance
of the Gaussian noise in that direction.
Properties:
delta_t: For a parameterized path, this gives the time delta used between each of the
points. If the path is a recorded path, then this value is set to 0 arbitrarily.
"""
class OdomNoiseDims(str, Enum):
    """
    Ordering for odom noise dimensions

    Members name the three translational axes (x, y, z) plus "rvert" — presumably the
    rotational noise about the vertical axis (used as the angular variance in
    lin_to_ang_var); TODO confirm.
    """

    X = "x"
    Y = "y"
    Z = "z"
    RVERT = "rvert"

    @staticmethod
    def ordering() -> List:
        """Return the four noise dimensions in their canonical (x, y, z, rvert) order."""
        return [
            GenerateParams.OdomNoiseDims.X,
            GenerateParams.OdomNoiseDims.Y,
            GenerateParams.OdomNoiseDims.Z,
            GenerateParams.OdomNoiseDims.RVERT,
        ]
class GenerateParamsEnum(str, Enum):
    """
    Enumeration for GenerateParams

    Names the parameters that can be swept over when generating data sets: the four
    odometry noise variance dimensions and the observation noise variance (see
    generate_params_generator).
    """

    ODOMETRY_NOISE_VAR_X = "odometry_noise_var_x"
    ODOMETRY_NOISE_VAR_Y = "odometry_noise_var_y"
    ODOMETRY_NOISE_VAR_Z = "odometry_noise_var_z"
    ODOMETRY_NOISE_VAR_RVERT = "odometry_noise_var_rvert"
    OBS_NOISE_VAR = "obs_noise_var"
class AltGenerateParamsEnum(str, Enum):
    """
    Alternative parameter-sweep space (see alt_generate_params_generator).

    OBS_NOISE_VAR: Sets the observation noise variance of the generated data set
    LIN_TO_ANG_VEL_VAR: Defines the ratio between the magnitude of the linear velocity variance
     vector and the angular velocity variance.
    """

    OBS_NOISE_VAR = "obs_noise_var"
    LIN_TO_ANG_VEL_VAR = "lin_to_ang_vel_var"
dataset_name: str
map_id: Optional[str] = None
# Maximum distance (meters) from which a tag can be considered observable.
dist_threshold: confloat(ge=0) = 3.7
# Maximum angle of attack (radians) from which a tag can be considered observable.
aoa_threshold: confloat(ge=0, le=np.pi) = np.pi / 4
tag_size: confloat(gt=0) = ASSUMED_TAG_SIZE
# Per-dimension variance of the Gaussian odometry noise; defaults to noiseless.
odometry_noise_var: Dict[OdomNoiseDims, float] = Field(
    default_factory=lambda: {
        GenerateParams.OdomNoiseDims.X: 0,
        GenerateParams.OdomNoiseDims.Y: 0,
        GenerateParams.OdomNoiseDims.Z: 0,
        GenerateParams.OdomNoiseDims.RVERT: 0,
    }
)
# Variance of the pixel noise added to simulated tag corner observations.
obs_noise_var: confloat(ge=0) = 0.0
# t_max, n_poses, and parameterized_path_args must be all-None or all-set; enforced by
# validate_interdependent_null_values.
t_max: Optional[confloat(gt=0)] = None
n_poses: Optional[conint(ge=2)] = None
parameterized_path_args: Optional[
    Dict[str, Union[float, Tuple[float, float]]]
] = None
# noinspection PyMethodParameters
@validator("parameterized_path_args")
def validate_interdependent_null_values(cls, v, values):
    """
    Validate that t_max, n_poses, and parameterized_path_args are either all None or
    all not None.

    Args:
        v: The parameterized_path_args value being validated.
        values: Previously-validated field values. Bug fix: looked up with .get instead
         of direct indexing, because pydantic omits from `values` any field that failed
         its own validation, so values["t_max"] could raise a confusing KeyError.

    Returns:
        v unchanged if the validation passes.

    Raises:
        ValueError: If the three fields are not uniformly None or uniformly not None.
    """
    v_is_none = v is None
    t_max_is_none = values.get("t_max") is None
    n_poses_is_none = values.get("n_poses") is None
    if not (
        (v_is_none and t_max_is_none and n_poses_is_none)
        or (not v_is_none and not t_max_is_none and not n_poses_is_none)
    ):
        # Bug fix: the original message referenced a nonexistent
        # "tag_poses_for_parameterized" field.
        raise ValueError(
            "t_max, n_poses, and parameterized_path_args members must all be None or "
            "all be not None."
        )
    return v
@property
def delta_t(self):
    """
    Time delta between consecutive poses of a parameterized path, computed from t_max
    and the number of specified poses; 0 (arbitrarily) when t_max is None.
    """
    if self.t_max is None:
        return 0
    return self.t_max / (self.n_poses - 1)
@property
def lin_to_ang_var(self) -> float:
    """
    Ratio of the linear odometry noise magnitude (norm of the X, Y, and Z variances)
    to the angular (rvert) variance.

    NOTE(review): divides by odometry_noise_var[RVERT], which defaults to 0 — callers
    presumably guarantee a nonzero rvert; confirm before relying on this property.
    """
    return (
        np.linalg.norm(
            np.array(
                [
                    self.odometry_noise_var[GenerateParams.OdomNoiseDims.X],
                    self.odometry_noise_var[GenerateParams.OdomNoiseDims.Y],
                    self.odometry_noise_var[GenerateParams.OdomNoiseDims.Z],
                ]
            )
        )
        / self.odometry_noise_var[GenerateParams.OdomNoiseDims.RVERT]
    )
@classmethod
def generate_params_generator(
    cls,
    param_multiplicands: Dict[GenerateParamsEnum, np.ndarray],
    param_order: List[GenerateParamsEnum],
    base_generate_params: "GenerateParams",
) -> Tuple[List[Tuple[Any, ...]], List["GenerateParams"]]:
    """
    Generate instances of this class according to the cartesian product of the
    provided parameters.

    Args:
        param_multiplicands: Dictionary mapping parameters to arrays of values whose
         cartesian product is taken.
        param_order: Ordering of the keys in param_multiplicands.
        base_generate_params: Supplies every parameter not prescribed by
         param_multiplicands.

    Returns:
        A list of each tuple of parameters computed from the cartesian product (the
        length of which is equivalent to the length of param_order) and a list of the
        generated objects.

    Raises:
        ValueError: If the keys of param_multiplicands and the elements of param_order
         are not the same set.
    """
    included_params = set(param_order)
    if set(param_multiplicands.keys()) != included_params:
        raise ValueError(
            "The sets of param_multiplicands keys and param_order items must be equal"
        )

    product_args = []
    sweep_param_to_product_idx: Dict[GenerateParams.GenerateParamsEnum, int] = {}
    for i, key in enumerate(param_order):
        product_args.append(param_multiplicands[key])
        sweep_param_to_product_idx[key] = i

    # Mapping from each odometry-noise dimension to the sweep parameter that overrides
    # it; consolidates four nearly-identical if/else blocks from the original.
    dim_to_sweep_param = {
        GenerateParams.OdomNoiseDims.X: GenerateParams.GenerateParamsEnum.ODOMETRY_NOISE_VAR_X,
        GenerateParams.OdomNoiseDims.Y: GenerateParams.GenerateParamsEnum.ODOMETRY_NOISE_VAR_Y,
        GenerateParams.OdomNoiseDims.Z: GenerateParams.GenerateParamsEnum.ODOMETRY_NOISE_VAR_Z,
        GenerateParams.OdomNoiseDims.RVERT: GenerateParams.GenerateParamsEnum.ODOMETRY_NOISE_VAR_RVERT,
    }

    products: List[Tuple[Any, ...]] = []
    generate_params: List[GenerateParams] = []
    for this_product in itertools.product(*product_args, repeat=1):
        products.append(this_product)
        # For each of the x, y, z, and rvert elements of the odometry noise, apply the
        # swept value if that dimension is among the swept parameters; otherwise
        # default to the value stored in base_generate_params.
        odometry_noise_var = {}
        for dim, sweep_param in dim_to_sweep_param.items():
            if sweep_param in included_params:
                odometry_noise_var[dim] = this_product[
                    sweep_param_to_product_idx[sweep_param]
                ]
            else:
                odometry_noise_var[dim] = base_generate_params.odometry_noise_var[dim]
        # Same swept-vs-base selection for the observation noise variance.
        obs_noise_var = (
            this_product[
                sweep_param_to_product_idx[
                    GenerateParams.GenerateParamsEnum.OBS_NOISE_VAR
                ]
            ]
            if GenerateParams.GenerateParamsEnum.OBS_NOISE_VAR in included_params
            else base_generate_params.obs_noise_var
        )
        generate_params.append(
            GenerateParams(
                dataset_name=base_generate_params.dataset_name,
                dist_threshold=base_generate_params.dist_threshold,
                aoa_threshold=base_generate_params.aoa_threshold,
                tag_size=base_generate_params.tag_size,
                odometry_noise_var=odometry_noise_var,
                obs_noise_var=obs_noise_var,
                t_max=base_generate_params.t_max,
                n_poses=base_generate_params.n_poses,
                parameterized_path_args=base_generate_params.parameterized_path_args,
            )
        )
    return products, generate_params
# noinspection DuplicatedCode
@classmethod
def alt_generate_params_generator(
    cls,
    alt_param_multiplicands: Dict[AltGenerateParamsEnum, np.ndarray],
    base_generate_params: "GenerateParams",
    hold_rvert_at: float,
    ratio_xz_to_y_lin_vel_var: float = 1,
) -> Tuple[List[Tuple[Any, ...]], List["GenerateParams"]]:
    """
    Acts as a wrapper around the generate_params_generator class method that utilizes a
    parameter sweeping space defined by the parameters in the AltGenerateParamsEnum enumeration.
    Generates GenerateParams objects according to the cartesian product of the contents of
    alt_param_multiplicands.

    Args:
        alt_param_multiplicands: Dictionary mapping parameters to arrays of values whose
         cartesian product is taken.
        base_generate_params: Supplies every parameter not prescribed by param_multiplicands.
        hold_rvert_at: Because AltGenerateParamsEnum.LIN_TO_ANG_VEL_VAR is a ratio, the
         rotational part of the odometry noise is held constant with this value.
        ratio_xz_to_y_lin_vel_var: Before the X, Y, and Z elements of the unit-magnitude linear
         velocity variance vector are scaled, this sets the X:Y and Z:Y ratios of the
         vector's elements.

    Returns:
        A list of the outputs from the cartesian product and the corresponding GenerateParams
         objects.

    Raises:
        NotImplementedError: If there is an unhandled value in the AltGenerateParamsEnum
         enumeration.
    """
    # Expand the alt_param_multiplicands argument into a form that can be used in the
    # GenerateParams.generate_params_generator method.
    param_multiplicands: Dict[GenerateParams.GenerateParamsEnum, np.ndarray] = {}
    for key, values in alt_param_multiplicands.items():
        if key == cls.AltGenerateParamsEnum.OBS_NOISE_VAR:
            param_multiplicands[
                GenerateParams.GenerateParamsEnum.OBS_NOISE_VAR
            ] = values
        elif key == cls.AltGenerateParamsEnum.LIN_TO_ANG_VEL_VAR:
            # Ignore the values provided in alt_param_multiplicands because we are not
            # interested in the cartesian product between each of the X, Y, Z, and rvert
            # elements of the linear and angular velocity variance. Instead, the values
            # provided in alt_param_multiplicands are applied to the result of the
            # cartesian product (see the scaling loop below). Each odometry dimension is
            # therefore swept over a single placeholder value here.
            lin_vel_var_unit = np.array(
                [ratio_xz_to_y_lin_vel_var, 1.0, ratio_xz_to_y_lin_vel_var],
                dtype=float,
            )
            lin_vel_var_unit /= np.linalg.norm(lin_vel_var_unit)
            param_multiplicands[
                GenerateParams.GenerateParamsEnum.ODOMETRY_NOISE_VAR_X
            ] = np.array([hold_rvert_at * lin_vel_var_unit[0]])
            param_multiplicands[
                GenerateParams.GenerateParamsEnum.ODOMETRY_NOISE_VAR_Y
            ] = np.array([hold_rvert_at * lin_vel_var_unit[1]])
            param_multiplicands[
                GenerateParams.GenerateParamsEnum.ODOMETRY_NOISE_VAR_Z
            ] = np.array([hold_rvert_at * lin_vel_var_unit[2]])
            param_multiplicands[
                GenerateParams.GenerateParamsEnum.ODOMETRY_NOISE_VAR_RVERT
            ] = np.array([hold_rvert_at])
        else:
            raise NotImplementedError(
                "Encountered unhandled parameter: " + str(key)
            )

    param_order = sorted(param_multiplicands.keys())
    param_to_param_order_idx: Dict[GenerateParams.GenerateParamsEnum, int] = {
        param: i for i, param in enumerate(param_order)
    }
    (
        products_intermediate,
        generate_params_intermediate,
    ) = GenerateParams.generate_params_generator(
        param_multiplicands=param_multiplicands,
        param_order=param_order,
        base_generate_params=base_generate_params,
    )

    # Maps each odometry noise dimension to its corresponding sweep parameter so the
    # product tuples can be updated dimension-by-dimension.
    dim_to_param: Dict[
        "GenerateParams.OdomNoiseDims", "GenerateParams.GenerateParamsEnum"
    ] = {
        cls.OdomNoiseDims.X: cls.GenerateParamsEnum.ODOMETRY_NOISE_VAR_X,
        cls.OdomNoiseDims.Y: cls.GenerateParamsEnum.ODOMETRY_NOISE_VAR_Y,
        cls.OdomNoiseDims.Z: cls.GenerateParamsEnum.ODOMETRY_NOISE_VAR_Z,
        cls.OdomNoiseDims.RVERT: cls.GenerateParamsEnum.ODOMETRY_NOISE_VAR_RVERT,
    }
    # Only the linear dimensions are scaled by the LIN_TO_ANG_VEL_VAR ratio; the
    # rotational dimension is held at hold_rvert_at.
    scaled_dims = (cls.OdomNoiseDims.X, cls.OdomNoiseDims.Y, cls.OdomNoiseDims.Z)

    # Apply the linear and angular velocity variance values provided in alt_param_multiplicands.
    products_orig_space: List[Tuple[Any, ...]] = []
    generate_params_objects: List[GenerateParams] = []
    for product_pre, generate_param_pre in zip(
        products_intermediate, generate_params_intermediate
    ):
        for value in alt_param_multiplicands[
            cls.AltGenerateParamsEnum.LIN_TO_ANG_VEL_VAR
        ]:
            new_generate_param: GenerateParams = cls.copy(generate_param_pre)
            new_generate_param.odometry_noise_var = {
                dim: generate_param_pre.odometry_noise_var[dim]
                * (value if dim in scaled_dims else 1)
                for dim in dim_to_param
            }
            generate_params_objects.append(new_generate_param)

            # BUG FIX: previously only the ODOMETRY_NOISE_VAR_X entry of the product
            # tuple was (redundantly) assigned four times, leaving the Y, Z, and RVERT
            # entries stale. Update every dimension's entry so the returned products
            # agree with the GenerateParams objects.
            new_product = list(product_pre)
            for dim, param in dim_to_param.items():
                new_product[
                    param_to_param_order_idx[param]
                ] = new_generate_param.odometry_noise_var[dim]
            products_orig_space.append(tuple(new_product))
    return products_orig_space, generate_params_objects
def __hash__(self):
    """Hash this model via the hash of its JSON serialization.

    Serializing on every call is not the most efficient approach, but it is
    sufficient for now (TODO: revisit if hashing becomes a hot path).
    """
    serialized = self.json()
    return hash(serialized)
class UGDataSet(BaseModel):
"""
Represents an unprocessed graph dataset.
Attributes:
location_data: A list of UGLocationDatums representing the location data of the dataset
map_id: A string representing the id of the map being generated
plane_data: A list of plane_data *NOT CURRENTLY BEING USED*
pose_data: A list of UGPoseDatum representing the poses recorded for the map
tag_data: A list of UGTagDatum representing the tags recorded for the map
generated_from: GenerateParams
Notes:
All attributes except `generated_from` are necessary for deserializing data from the
datasets generated by the client app. The `generated_from` attribute is only used when the
data set generated is synthetic.
"""
location_data: List[UGLocationDatum] = []
map_id: str
plane_data: List = []
pose_data: List[UGPoseDatum]
tag_data: List[List[UGTagDatum]] = []
generated_from: Optional[GenerateParams] = None
# TODO: Add documentation for the following properties
@property
def num_tags(self) -> int:
    """Return the number of distinct tag ids present in this dataset."""
    unique_tag_ids = np.unique(self.tag_ids)
    return len(unique_tag_ids)
@property
def num_observations(self) -> int:
    """Return the total number of tag observations across all entries of tag_data."""
    return sum(len(tag_obs_list) for tag_obs_list in self.tag_data)
@property