From f12c3aa395086b69c99406682d012f034270d623 Mon Sep 17 00:00:00 2001 From: tdy Date: Mon, 8 Aug 2016 18:48:00 -0500 Subject: [PATCH] Port the g.Tec driver to PyUSB 1.0 --- libmushu/driver/gtec.py | 125 +++++++++++++++++++--------------------- requirements.txt | 2 +- 2 files changed, 60 insertions(+), 67 deletions(-) diff --git a/libmushu/driver/gtec.py b/libmushu/driver/gtec.py index f3d60ac..647ad6b 100644 --- a/libmushu/driver/gtec.py +++ b/libmushu/driver/gtec.py @@ -18,14 +18,13 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. -# TODO: update to new version of pyusb - import struct import time from exceptions import Exception import logging -import usb +import usb.core +import usb.util from scipy.signal import iirfilter import numpy as np @@ -36,7 +35,7 @@ logger.info('Logger started') ID_VENDOR_GTEC = 0x153c -# I saw an am with this vendorid too +# I saw an amp with this vendorid too ID_VENDOR_GTEC2 = 0x15c3 ID_PRODUCT_GUSB_AMP = 0x0001 @@ -47,49 +46,50 @@ class GUSBamp(Amplifier): def __init__(self): logger.info('Initializing GUSBamp instance') - # list of available amps - self.amps = [] - for bus in usb.busses(): - for device in bus.devices: - if (device.idVendor in [ID_VENDOR_GTEC, ID_VENDOR_GTEC2] and - device.idProduct == ID_PRODUCT_GUSB_AMP): - self.amps.append(device) - self.devh = None + # find the amplifier + self.dev = usb.core.find(custom_match= + lambda d: + d.idVendor in (ID_VENDOR_GTEC, ID_VENDOR_GTEC2) and + d.idProduct == ID_PRODUCT_GUSB_AMP) + assert(self.dev is not None) self.mode = None - # Initialize the amplifier and make it ready. - device = self.amps[0] - self.devh = device.open() - # detach kernel driver if nessecairy - config = device.configurations[0] - self.devh.setConfiguration(config) - assert(len(config.interfaces) > 0) - # sometimes it is the other one - first_interface = config.interfaces[0][0] - if first_interface is None: - first_interface = config.interfaces[0][1] - first_setting = first_interface.alternateSetting - self.devh.claimInterface(first_interface) - self.devh.setAltInterface(first_interface) - # initialization straight from the usb-dump + # activate its first configuration + self.dev.set_configuration() + cfg = self.dev[0] + assert(cfg.bNumInterfaces > 0) + # access its interface + intf = cfg[(0,0)] + if intf is None: + intf = cfg[(0,1)] + # detach if necessary + intf_num = intf.bInterfaceNumber + if self.dev.is_kernel_driver_active(intf_num): + self.dev.detach_kernel_driver(intf_num) + usb.util.claim_interface(self.dev, intf_num) + try: + intf.set_altsetting() + except USBError: + pass + # initialize straight from the usb-dump self.set_mode('data') - self.devh.controlMsg(CX_OUT, 0xb6, value=0x80, buffer=0) - self.devh.controlMsg(CX_OUT, 0xb5, value=0x80, buffer=0) - self.devh.controlMsg(CX_OUT, 0xb9, value=0x00, buffer="\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10") + self.dev.ctrl_transfer(CX_OUT, 0xb6, wValue=0x80, data_or_wLength=0) + self.dev.ctrl_transfer(CX_OUT, 0xb5, wValue=0x80, data_or_wLength=0) + self.dev.ctrl_transfer(CX_OUT, 0xb9, wValue=0x00, data_or_wLength="\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10") self.set_slave_mode(False) - self.devh.controlMsg(CX_OUT, 0xd3, value=0x01, buffer=0) - self.devh.controlMsg(CX_OUT, 0xca, value=0x01, buffer=0) - self.devh.controlMsg(CX_OUT, 0xc8, value=0x01, buffer="\x00"*16) + self.dev.ctrl_transfer(CX_OUT, 0xd3, wValue=0x01, data_or_wLength=0) + self.dev.ctrl_transfer(CX_OUT, 0xca, wValue=0x01, data_or_wLength=0) + self.dev.ctrl_transfer(CX_OUT, 0xc8, wValue=0x01, data_or_wLength="\x00"*16) self.set_common_reference() self.set_common_ground() self.set_calibration_mode('sine') - self.set_sampling_ferquency(128, [False for i in range(16)], None, None) + self.set_sampling_frequency(128, [False for i in range(16)], None, None) def start(self): - self.devh.controlMsg(CX_OUT, 0xb5, value=0x08, buffer=0) - self.devh.controlMsg(CX_OUT, 0xf7, value=0x00, buffer=0) + self.dev.ctrl_transfer(CX_OUT, 0xb5, wValue=0x08, data_or_wLength=0) + self.dev.ctrl_transfer(CX_OUT, 0xf7, wValue=0x00, data_or_wLength=0) def stop(self): - self.devh.controlMsg(CX_OUT, 0xb8, []) + self.dev.ctrl_transfer(CX_OUT, 0xb8, data_or_wLength=[]) def get_data(self): """Get data.""" @@ -101,7 +101,7 @@ def get_data(self): size = 2028 #512 try: # TODO what is the optimal timeout here? - data = self.devh.bulkRead(endpoint, size, 100) + data = self.dev.read(endpoint, size, timeout=100) except usb.USBError: data = [] data = ''.join(map(chr, data)) @@ -125,7 +125,7 @@ def get_channels(self): def is_available(): for bus in usb.busses(): for device in bus.devices: - if (device.idVendor in [ID_VENDOR_GTEC, ID_VENDOR_GTEC2] and + if (device.idVendor in (ID_VENDOR_GTEC, ID_VENDOR_GTEC2) and device.idProduct == ID_PRODUCT_GUSB_AMP): return True return False @@ -135,24 +135,23 @@ def is_available(): ########################################################################### def set_mode(self, mode): - """Set mode, 'impedance', 'data'.""" + """Set mode, 'impedance', 'calibrate', 'data'.""" if mode == 'impedance': - self.devh.controlMsg(CX_OUT, 0xc9, value=0x00, buffer=0) - self.devh.controlMsg(CX_OUT, 0xc2, value=0x03, buffer=0) + self.dev.ctrl_transfer(CX_OUT, 0xc9, wValue=0x00, data_or_wLength=0) + self.dev.ctrl_transfer(CX_OUT, 0xc2, wValue=0x03, data_or_wLength=0) self.mode = 'impedance' elif mode == 'calibrate': - self.devh.controlMsg(CX_OUT, 0xc1, value=0x00, buffer=0) - self.devh.controlMsg(CX_OUT, 0xc2, value=0x02, buffer=0) + self.dev.ctrl_transfer(CX_OUT, 0xc1, wValue=0x00, data_or_wLength=0) + self.dev.ctrl_transfer(CX_OUT, 0xc2, wValue=0x02, data_or_wLength=0) self.mode = 'calibration' elif mode == 'data': - self.devh.controlMsg(CX_OUT, 0xc0, value=0x00, buffer=0) - self.devh.controlMsg(CX_OUT, 0xc2, value=0x01, buffer=0) + self.dev.ctrl_transfer(CX_OUT, 0xc0, wValue=0x00, data_or_wLength=0) + self.dev.ctrl_transfer(CX_OUT, 0xc2, wValue=0x01, data_or_wLength=0) self.mode = 'data' else: raise AmpError('Unknown mode: %s' % mode) - - def set_sampling_ferquency(self, fs, channels, bpfilter, notchfilter): + def set_sampling_frequency(self, fs, channels, bpfilter, notchfilter): """ Set the sampling frequency and filters for individual channels. Parameters: @@ -198,19 +197,18 @@ def set_sampling_ferquency(self, fs, channels, bpfilter, notchfilter): # set the filters for all channels if bpfilter == notchfilter == None: - self.devh.controlMsg(CX_OUT, 0xc6, value=0x01, buffer=bp_filter) - self.devh.controlMsg(CX_OUT, 0xc7, value=0x01, buffer=bs_filter) + self.dev.ctrl_transfer(CX_OUT, 0xc6, wValue=0x01, data_or_wLength=bp_filter) + self.dev.ctrl_transfer(CX_OUT, 0xc7, wValue=0x01, data_or_wLength=bs_filter) else: idx = 1 for i in channels: if i: - self.devh.controlMsg(CX_OUT, 0xc6, value=idx, buffer=bp_filter) - self.devh.controlMsg(CX_OUT, 0xc7, value=idx, buffer=bs_filter) + self.dev.ctrl_transfer(CX_OUT, 0xc6, wValue=idx, data_or_wLength=bp_filter) + self.dev.ctrl_transfer(CX_OUT, 0xc7, wValue=idx, data_or_wLength=bs_filter) idx += 1 # set the sampling frequency - self.devh.controlMsg(CX_OUT, 0xb6, value=fs, buffer=0) - + self.dev.ctrl_transfer(CX_OUT, 0xb6, wValue=fs, data_or_wLength=0) def set_calibration_mode(self, mode): # buffer: [0x03, 0xd0, 0x07, 0x02, 0x00, 0xff, 0x07] @@ -218,20 +216,19 @@ def set_calibration_mode(self, mode): # (1) mode: # (2) amplitude: little endian (0x07d0 = 2000) if mode == 'sine': - self.devh.controlMsg(CX_OUT, 0xcb, value=0x00, buffer="\x03\xd0\x07\x02\x00\xff\x07") + self.dev.ctrl_transfer(CX_OUT, 0xcb, wValue=0x00, data_or_wLength="\x03\xd0\x07\x02\x00\xff\x07") elif mode == 'sawtooth': - self.devh.controlMsg(CX_OUT, 0xcb, value=0x00, buffer="\x02\xd0\x07\x02\x00\xff\x07") + self.dev.ctrl_transfer(CX_OUT, 0xcb, wValue=0x00, data_or_wLength="\x02\xd0\x07\x02\x00\xff\x07") elif mode == 'whitenoise': - self.devh.controlMsg(CX_OUT, 0xcb, value=0x00, buffer="\x05\xd0\x07\x02\x00\xff\x07") + self.dev.ctrl_transfer(CX_OUT, 0xcb, wValue=0x00, data_or_wLength="\x05\xd0\x07\x02\x00\xff\x07") elif mode == 'square': - self.devh.controlMsg(CX_OUT, 0xcb, value=0x00, buffer="\x01\xd0\x07\x02\x00\xff\x07") + self.dev.ctrl_transfer(CX_OUT, 0xcb, wValue=0x00, data_or_wLength="\x01\xd0\x07\x02\x00\xff\x07") else: raise AmpError('Unknown mode: %s' % mode) def calculate_impedance(self, u_measured, u_applied=1e4): return (u_measured * 1e6) / (u_applied - u_measured) - 1e4 - def set_common_ground(self, a=False, b=False, c=False, d=False): """Set common ground for the electrodes. @@ -241,8 +238,7 @@ def set_common_ground(self, a=False, b=False, c=False, d=False): """ v = (d << 3) + (c << 2) + (b << 1) + a - self.devh.controlMsg(CX_OUT, 0xbe, value=v, buffer=0) - + self.dev.ctrl_transfer(CX_OUT, 0xbe, wValue=v, data_or_wLength=0) def set_common_reference(self, a=False, b=False, c=False, d=False): """Set common reference for the electrodes. @@ -253,8 +249,7 @@ def set_common_reference(self, a=False, b=False, c=False, d=False): """ v = (d << 3) + (c << 2) + (b << 1) + a - self.devh.controlMsg(CX_OUT, 0xbf, value=v, buffer=0) - + self.dev.ctrl_transfer(CX_OUT, 0xbf, wValue=v, data_or_wLength=0) def set_slave_mode(self, slave): """Set amp into slave or master mode. @@ -264,15 +259,13 @@ def set_slave_mode(self, slave): """ v = 1 if slave else 0 - self.devh.controlMsg(CX_OUT, 0xcd, value=v, buffer=0) + self.dev.ctrl_transfer(CX_OUT, 0xcd, wValue=v, data_or_wLength=0) class AmpError(Exception): pass - - def main(): amp = GUSBamp() amp.start() diff --git a/requirements.txt b/requirements.txt index 44046de..7f573e5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,5 +5,5 @@ scipy>=0.10.1 sphinx # amplifier crypto>=1.0.0 -pyusb>=1.0.0a3 +pyusb>=1.0.0 pylsl>=1.10.4