-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathutil.py
More file actions
257 lines (205 loc) · 6.7 KB
/
util.py
File metadata and controls
257 lines (205 loc) · 6.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
import inspect
from time import time
from functools import wraps, reduce
from operator import itemgetter
from pathlib import Path
from collections.abc import Mapping
from typing import Collection
#Create Number type
Number = float|int
def unprinted(iterable):
return set(filter(lambda s:not s.printed, iterable))
class GCodeException(Exception):
"""Utility class so we can get an object for debugging easily. Use in this
code like:
raise GCodeException(segs, 'here are segments')
then (e.g. in Jupyter notebook):
try:
steps.plot()
except GCodeException as e:
print(len(e.obj), 'segments to print')
"""
def __init__(self, obj, message):
self.obj = obj
self.message = message
#https://stackoverflow.com/a/31174427/49663
def rgetattr(obj, attr, *args):
def _getattr(obj, attr):
return getattr(obj, attr, *args)
return reduce(_getattr, [obj] + attr.split('.'))
def rsetattr(obj, attr, val):
pre, _, post = attr.rpartition('.')
return setattr(rgetattr(obj, pre) if pre else obj, post, val)
#Modified from https://stackoverflow.com/a/2123602/49663
def attrhelper(attr, after=None):
"""Generate functions which get and set the given attr. If the parent object
has a '.attr_changed' function or if after is defined, and the attr changes,
call that function with arguments (attr, old_value, new_value):
class Foo:
a = property(**attrsetter('_a'))
b = property(**attrsetter('_b'))
def attr_changed(self, attr, old_value, new_value):
print(f'{attr} changed from {old_value} to {new_value}')
Uses rgetattr and rsetattr to work with nested values like '_a.x'.
"""
def set_any(self, value):
old_value = rgetattr(self, attr)
rsetattr(self, attr, value)
if value != old_value:
f = getattr(self, 'attr_changed', after) or (lambda a,b,c: 0)
f(attr, old_value, value)
def get_any(self):
return rgetattr(self, attr)
return {'fget': get_any, 'fset': set_any}
def sign(n):
return -1 if n < 0 else (1 if n > 0 else 0)
def linf(frame=0):
"""Return a string of information about the calling line in the file, or
frame number of frames previous."""
_, fn, ln, func, ctx, _ = inspect.stack()[frame+1]
return '[' + Path(fn).stem + ('' if func == '<module>' else f'.{func}') + f':{ln}]'
def construct_lines_rel2abs(gc_lines, start=0):
"""Construct a list of GCLine objects, replacing the existing E values with
absolute extrusion amounts based on each GCLine's relative_extrude value.
Use start as the starting extrusion value. Return the constructed list and
the ending extrusion value."""
r = []
ext_val = start
for line in gc_lines:
if 'E' in line.args and line.code in ['G0', 'G1']:
try:
ext_val += line.relative_extrude
except AttributeError:
r.append(line.construct())
else:
r.append(line.construct(E=f'{ext_val:.5f}'))
else:
r.append(line.construct())
return r, ext_val
def find_lineno(lineno, steps=None, gcsegs=None, gc_lines=None):
if gc_lines:
return any([l for l in gc_lines if l.lineno == lineno])
if gcsegs:
return {f'Seg {i}': seg for i,seg in enumerate(gcsegs) if
find_lineno(lineno, gc_lines=seg.gc_lines)}
if steps:
return dict(filter(itemgetter(1),
[(f'Step {i}', find_lineno(lineno, gcsegs=step.gcsegs))
for i,step in enumerate(steps)]))
def listsplit(l, sepfunc, maxsplit=-1, keepsep='>', minsize=0):
"""Return list l divided into chunks, separated whenever function sepfunc(line)
is True. Discard the separator if keepsep is False. If keepsep is '<' save the
separator at the end of the chunk; if it's '>' save it in the start of the
next chunk. If a chunk is less than minsize in length, combine it with
the next chunk."""
r = []
a = []
ll = iter(l)
for e in ll:
if sepfunc(e) and a:
if keepsep == '<':
a.append(e)
if len(a) >= minsize:
r.append(a)
a = []
if keepsep == '>':
a.append(e)
if maxsplit > -1 and len(r) >= maxsplit:
r.append(a + list(ll))
a = []
break
else:
a.append(e)
if a:
r.append(a)
return r
def find(lst, func=None, rev=False):
"""Return the first item in the list that is true. Pass an optional
evaluation function. Set rev to True to search backwards."""
lst = reversed(lst) if rev else lst
return next(filter(func, lst), None)
def listsplit2(l, sepfunc, maxsplit=-1, keepsep='>'):
r = []
start = 0
end = None
for i,e in enumerate(l):
if sepfunc(e):
end = i
if keepsep == '<':
end += 1
r.append(l[start:end])
if maxsplit > -1 and len(r) >= maxsplit:
r.append(l[end:])
end = None
break
start = end + 1
end = None
else:
end = i
if end:
r.append(l[start:end+1])
return r
def timing(f):
@wraps(f)
def wrap(*args, **kwargs):
start = time()
result = f(*args, **kwargs)
end = time()
print(f'{f.__name__} took {end-start:2.4f}s')
return result
return wrap
#Source https://github.com/pydantic/pydantic/blob/317bef33b06e05f65f57b1ba009dfd949b462689/pydantic/utils.py#L215
def deep_update(mapping, *updating_mappings):
updated_mapping = mapping.copy()
for updating_mapping in updating_mappings:
for k, v in updating_mapping.items():
if k in updated_mapping and isinstance(updated_mapping[k], dict) and isinstance(v, dict):
updated_mapping[k] = deep_update(updated_mapping[k], v)
else:
updated_mapping[k] = v
return updated_mapping
class Saver:
"""Save values for variables in save_vars that have changed."""
def __init__(self, obj:object, save_vars:Collection[str]):
self.saved = {v: rgetattr(obj, v) for v in save_vars}
self.obj = obj
self.changed = {}
def __enter__(self):
return self
def __exit__(self, exc_type, value, tb):
if exc_type is not None:
return False
for var,oldval in self.saved.items():
newval = rgetattr(self.obj, var)
if newval != oldval:
self.changed[var] = newval
def __repr__(self):
return('Saver:\n\tChanged:' + '\n'.join([
f'\t\t{var}: {self.saved[var]} -> {val}'
for var,val in self.changed.items()]) + '\n\tUnchanged:\n' +
'\n'.join([f'\t\t{var}: {self.saved[var]}' for var in self.saved if var
not in self.changed])
)
def __bool__(self):
"""Return True if any of the saved variables have changed."""
return bool(self.originals)
@property
def originals(self):
"""Return the original values for every saved variable that changed."""
return {var: self.saved[var] for var in self.changed}
class ReadOnlyDict(Mapping):
"""A read-only dict."""
def __init__(self, *args, **kwargs):
self._dict = dict(*args, **kwargs)
def __getitem__(self, key):
return self._dict[key]
def __iter__(self):
return iter(self._dict)
def __len__(self):
return len(self._dict)
def __repr__(self):
return repr(self._dict)
def __hash__(self):
return hash(self._dict)
def __eq__(self, other):
return hash(self) == hash(other)