Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 70 additions & 29 deletions can/interfaces/gs_usb.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from typing import Any

import usb
from gs_usb.constants import CAN_EFF_FLAG, CAN_ERR_FLAG, CAN_MAX_DLC, CAN_RTR_FLAG
Expand All @@ -12,17 +13,46 @@
logger = logging.getLogger(__name__)


def _find_gs_usb_devices(
bus: int | None = None, address: int | None = None
) -> list[usb.core.Device]:
"""Find raw USB devices for gs_usb using auto-detected backend.

Unlike :meth:`GsUsb.scan`, this does not force the ``libusb1`` backend,
allowing ``pyusb`` to auto-detect the best available backend. This enables
support for WinUSB on Windows in addition to libusbK.

:param bus: number of the bus that the device is connected to
:param address: address of the device on the bus it is connected to
:return: a list of found raw USB devices
"""
kwargs = {}
if bus is not None:
kwargs["bus"] = bus
if address is not None:
kwargs["address"] = address

return list(
usb.core.find(
find_all=True,
custom_match=GsUsb.is_gs_usb_device,
**kwargs,
)
or []
)


class GsUsbBus(can.BusABC):
def __init__(
self,
channel,
channel: can.typechecking.Channel,
bitrate: int = 500_000,
index=None,
bus=None,
address=None,
can_filters=None,
**kwargs,
):
index: int | None = None,
bus: int | None = None,
address: int | None = None,
can_filters: can.typechecking.CanFilters | None = None,
**kwargs: Any,
) -> None:
"""
:param channel: usb device name
:param index: device number if using automatic scan, starting from 0.
Expand All @@ -32,31 +62,41 @@ def __init__(
:param can_filters: not supported
:param bitrate: CAN network bandwidth (bits/s)
"""
self._is_shutdown = False
if (index is not None) and ((bus or address) is not None):
raise CanInitializationError(
"index and bus/address cannot be used simultaneously"
)

if index is None and address is None and bus is None:
index = channel
_index: Any = channel
else:
_index = index

self._index = None
if index is not None:
devs = GsUsb.scan()
if len(devs) <= index:
self._index: int | None = None
if _index is not None:
if not isinstance(_index, int):
try:
_index = int(_index)
except (ValueError, TypeError):
raise CanInitializationError(
f"index must be an integer, but got {type(_index).__name__} ({_index})"
) from None

devs = _find_gs_usb_devices()
if len(devs) <= _index:
raise CanInitializationError(
f"Cannot find device {index}. Devices found: {len(devs)}"
f"Cannot find device {_index}. Devices found: {len(devs)}"
)
gs_usb = devs[index]
self._index = index
gs_usb_dev = devs[_index]
self._index = _index
else:
gs_usb = GsUsb.find(bus=bus, address=address)
if not gs_usb:
devs = _find_gs_usb_devices(bus=bus, address=address)
if not devs:
raise CanInitializationError(f"Cannot find device {channel}")
gs_usb_dev = devs[0]

self.gs_usb = gs_usb
self.channel_info = channel
self.gs_usb = GsUsb(gs_usb_dev)
self.channel_info = str(channel)
self._can_protocol = can.CanProtocol.CAN_20

bit_timing = can.BitTiming.from_sample_point(
Expand All @@ -81,7 +121,7 @@ def __init__(
**kwargs,
)

def send(self, msg: can.Message, timeout: float | None = None):
def send(self, msg: can.Message, timeout: float | None = None) -> None:
"""Transmit a message to the CAN bus.

:param Message msg: A message object.
Expand Down Expand Up @@ -139,7 +179,7 @@ def _recv_internal(self, timeout: float | None) -> tuple[can.Message | None, boo
frame = GsUsbFrame()

# Do not set timeout as None or zero here to avoid blocking
timeout_ms = round(timeout * 1000) if timeout else 1
timeout_ms = round(timeout * 1000) if timeout else 0
if not self.gs_usb.read(frame=frame, timeout_ms=timeout_ms):
return None, False

Expand All @@ -158,21 +198,22 @@ def _recv_internal(self, timeout: float | None) -> tuple[can.Message | None, boo
return msg, False

def shutdown(self):
if self._is_shutdown:
already_shutdown = self._is_shutdown
super().shutdown()
if already_shutdown:
return

super().shutdown()
self.gs_usb.stop()
if self._index is not None:
# Avoid errors on subsequent __init() by repeating the .scan() and .start() that would otherwise fail
# the next time the device is opened in __init__()
devs = GsUsb.scan()
# Avoid errors on subsequent __init() by repeating the .scan() and
# .start() that would otherwise fail the next time the device is
# opened in __init__()
devs = _find_gs_usb_devices()
if self._index < len(devs):
gs_usb = devs[self._index]
gs_usb = GsUsb(devs[self._index])
try:
gs_usb.set_bitrate(self._bitrate)
gs_usb.start()
gs_usb.stop()
except usb.core.USBError:
pass
self._is_shutdown = True
1 change: 1 addition & 0 deletions doc/changelog.d/2031.changed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
make gs_usb use pyusb (allows WinUSB instead of requiring libusbK on windows) also timeout=None means foreever
6 changes: 4 additions & 2 deletions doc/interfaces/gs_usb.rst
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ Windows, Linux and Mac.
The backend driver depends on `pyusb <https://pyusb.github.io/pyusb/>`_ so a ``pyusb`` backend driver library such as
``libusb`` must be installed.

On Windows a tool such as `Zadig <https://zadig.akeo.ie/>`_ can be used to set the USB device driver to
``libusbK``.
On Windows, WinUSB and libusbK are both supported. Devices with WCID (Windows Compatible ID) descriptors,
such as candleLight firmware, will automatically use WinUSB without any additional driver installation.
Alternatively, a tool such as `Zadig <https://zadig.akeo.ie/>`_ can be used to set the USB device driver to
either ``WinUSB`` or ``libusbK``.


Supplementary Info
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ neovi = ["filelock", "python-ics>=2.12"]
canalystii = ["canalystii>=0.1.0"]
cantact = ["cantact>=0.0.7"]
cvector = ["python-can-cvector"]
gs-usb = ["gs-usb>=0.2.1"]
gs-usb = ["gs-usb>=0.2.1", "pyusb>=1.0.2"]
nixnet = ["nixnet>=0.3.2"]
pcan = ["uptime~=3.0.1"]
remote = ["python-can-remote"]
Expand Down
67 changes: 67 additions & 0 deletions test/test_interface_gs_usb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""Tests for the gs_usb interface."""

from unittest.mock import MagicMock, patch

import pytest

from can.interfaces.gs_usb import (
GsUsbBus,
_find_gs_usb_devices,
)


@patch("can.interfaces.gs_usb.usb.core.find")
def test_find_devices_does_not_force_backend(mock_find):
"""Verify that _find_gs_usb_devices does not pass a backend argument,
allowing pyusb to auto-detect the best available backend (WinUSB, libusbK, etc.)."""
mock_find.return_value = []

_find_gs_usb_devices()

mock_find.assert_called_once()
call_kwargs = mock_find.call_args[1]
assert (
"backend" not in call_kwargs
), "backend should not be specified so pyusb can auto-detect"
assert call_kwargs["find_all"] is True


@patch("can.interfaces.gs_usb.usb.core.find")
def test_find_devices_with_args_does_not_force_backend(mock_find):
"""Verify that _find_gs_usb_devices with bus/address does not pass a backend argument."""
mock_find.return_value = []

_find_gs_usb_devices(bus=1, address=2)

mock_find.assert_called_once()
call_kwargs = mock_find.call_args[1]
assert (
"backend" not in call_kwargs
), "backend should not be specified so pyusb can auto-detect"
assert call_kwargs["bus"] == 1
assert call_kwargs["address"] == 2
assert call_kwargs["find_all"] is True


@patch("can.interfaces.gs_usb.usb.core.find")
def test_find_devices_returns_raw_usb_devices(mock_find):
"""Verify that _find_gs_usb_devices returns the raw USB devices."""
mock_dev1 = MagicMock()
mock_dev2 = MagicMock()
mock_find.return_value = [mock_dev1, mock_dev2]

devices = _find_gs_usb_devices()

assert len(devices) == 2
assert devices[0] is mock_dev1
assert devices[1] is mock_dev2


@patch("can.interfaces.gs_usb.usb.core.find")
def test_find_devices_returns_empty_list_when_no_devices(mock_find):
"""Verify that _find_gs_usb_devices returns an empty list when no devices are found."""
mock_find.return_value = []

devices = _find_gs_usb_devices()

assert devices == []
4 changes: 4 additions & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependency_groups =
test
extras =
canalystii
gs-usb
mf4
multicast
pywin32
Expand All @@ -27,13 +28,16 @@ extras =
canalystii
mf4
multicast
gs-usb
serial
pywin32
serial
# still no windows-curses for py314

[testenv:{py313t,py314t,pypy310,pypy311}]
extras =
canalystii
gs-usb
serial

[testenv:docs]
Expand Down