Skip to content

Commit bf9fabc

Browse files
committed
Refactor hub.py: extract _store_frame helper and add sleep in _reader_loop
- Extract duplicate frame storage logic into _store_frame() helper used by both read_frame() and _reader_loop(). Reduces code duplication and centralizes lock handling. - Add 1ms sleep in _reader_loop when no data available, matching behavior in read_frame() to prevent busy spin if serial_timeout=0.
1 parent 04f934e commit bf9fabc

1 file changed

Lines changed: 27 additions & 20 deletions

File tree

sensx/hub.py

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,25 @@ def _is_valid_header(self, buf: bytearray, idx: int) -> bool:
263263
return False
264264
return buf[idx:idx+2] in (HEADER_A, HEADER_B)
265265

266+
def _store_frame(
267+
self, header: bytes, frame: np.ndarray, ts: float
268+
) -> Optional[Callable[[np.ndarray, float], None]]:
269+
"""Store frame to internal buffer and return the appropriate callback.
270+
271+
Thread-safe storage for latest frame and timestamp.
272+
Returns the callback (on_frame_a or on_frame_b) to invoke, or None.
273+
"""
274+
if header == HEADER_A:
275+
with self._lock_a:
276+
self._frame_a[:] = frame
277+
self._ts_a = ts
278+
return self.on_frame_a
279+
else:
280+
with self._lock_b:
281+
self._frame_b[:] = frame
282+
self._ts_b = ts
283+
return self.on_frame_b
284+
266285
# ------------------------------------------------------------------
267286
# Synchronous (blocking) reads
268287
# ------------------------------------------------------------------
@@ -295,14 +314,7 @@ def read_frame(self) -> Tuple[bytes, np.ndarray]:
295314
del buf[: idx + fsize]
296315
frame = self._parse_frame(header, raw)
297316
ts = time.perf_counter()
298-
if header == HEADER_A:
299-
with self._lock_a:
300-
self._frame_a[:] = frame
301-
self._ts_a = ts
302-
else:
303-
with self._lock_b:
304-
self._frame_b[:] = frame
305-
self._ts_b = ts
317+
self._store_frame(header, frame, ts)
306318
return header, frame
307319
else:
308320
# False header found inside payload - skip it
@@ -373,6 +385,8 @@ def _reader_loop(self) -> None:
373385
except serial.SerialException:
374386
break
375387
if not chunk:
388+
# No data available -- brief yield to prevent busy spin
389+
time.sleep(0.001)
376390
continue
377391

378392
buf += chunk
@@ -406,23 +420,16 @@ def _reader_loop(self) -> None:
406420

407421
frame = self._parse_frame(header, raw)
408422
ts = time.perf_counter()
409-
410-
if header == HEADER_A:
411-
with self._lock_a:
412-
self._frame_a[:] = frame
413-
self._ts_a = ts
414-
cb = self.on_frame_a
415-
else:
416-
with self._lock_b:
417-
self._frame_b[:] = frame
418-
self._ts_b = ts
419-
cb = self.on_frame_b
423+
cb = self._store_frame(header, frame, ts)
420424

421425
if cb is not None:
422426
try:
423427
cb(frame, ts)
424428
except Exception:
425-
logger.exception("Error in %s callback", "on_frame_a" if header == HEADER_A else "on_frame_b")
429+
logger.exception(
430+
"Error in %s callback",
431+
"on_frame_a" if header == HEADER_A else "on_frame_b"
432+
)
426433

427434
# Prevent unbounded growth
428435
if len(buf) > max_frame * 8:

0 commit comments

Comments
 (0)