diff --git a/rig/machine_control/machine_controller.py b/rig/machine_control/machine_controller.py index 4b582c7..a04ec48 100644 --- a/rig/machine_control/machine_controller.py +++ b/rig/machine_control/machine_controller.py @@ -1,5 +1,6 @@ """A high level interface for controlling a SpiNNaker system.""" +import sys import collections import functools import os @@ -10,6 +11,9 @@ import time import pkg_resources import warnings +import traceback + +from threading import Thread, Lock, Event from rig.machine_control.consts import \ SCPCommands, NNCommands, NNConstants, AppFlags, LEDAction @@ -134,6 +138,15 @@ def __init__(self, initial_host, scp_port=consts.SCP_PORT, self._width = None self._height = None + # Postponed operation queues for reads/writes issued with the postponed + # argument set. These are listed per-connection and are flushed by the + # flush_postponed_io method. Each queue contains a list of + # zero-argument function which carries out the required I/O operation. + # + # {connection: deque([f, ...]), ...} + self._postponed = collections.defaultdict(collections.deque) + self._postponed_lock = Lock() + def __call__(self, **context_args): """For use with `with`: set default argument values. @@ -461,7 +474,7 @@ def get_ip_address(self, x, y): return None @ContextMixin.use_contextual_arguments() - def write(self, address, data, x, y, p=0): + def write(self, address, data, x, y, p=0, postpone=False): """Write a bytestring to an address in memory. It is strongly encouraged to only read and write to blocks of memory @@ -476,17 +489,40 @@ def write(self, address, data, x, y, p=0): The address at which to start writing the data. Addresses are given within the address space of a SpiNNaker core. See the SpiNNaker datasheet for more information. - data : :py:class:`bytes` + data : :py:class:`bytes` or callable Data to write into memory. Writes are automatically broken into a sequence of SCP write commands. 
+ + If a callable is given, it will be called when the write is about + to be carried out and must return a bytearray (or similar) to be + written. + postpone : bool + If False (the default), writes are performed straight away and this + function returns once the write completes. + + If True, the write will be queued and carried out in parallel when + :py:meth:`.flush_postponed_io` is called. The data object must + remain valid until this function returns. If a callable is passed + as the data argument, the callable may be called from another + thread and in no specific order with respect to other calls to + this function. """ # Call the SCPConnection to perform the write on our behalf connection = self._get_connection(x, y) - return connection.write(self.scp_data_length, self.scp_window_size, - x, y, p, address, data) + + def f(): + connection.write(self.scp_data_length, self.scp_window_size, + x, y, p, address, + data() if callable(data) else data) + + if postpone: + with self._postponed_lock: + self._postponed[connection].append(f) + else: + f() @ContextMixin.use_contextual_arguments() - def read(self, address, length_bytes, x, y, p=0): + def read(self, address, length_bytes, x, y, p=0, + on_read=None, postpone=False): """Read a bytestring from an address in memory. Parameters ---------- @@ -496,16 +532,54 @@ def read(self, address, length_bytes, x, y, p=0): length_bytes : int The number of bytes to read from memory. Large reads are transparently broken into multiple SCP read commands. + on_read : callable + If supplied, this is called with the read data as an argument when + the read completes. Otherwise, the read data is returned directly. + postpone : bool + If False (the default), the read will occur immediately and the + read value returned or used as an argument to on_read. + + If True, the read will be performed when + :py:meth:`.flush_postponed_io` is called and an uninitialised + bytearray will be returned. 
When the read actually occurs, this + bytearray will be populated and the on_read method called with + a reference to the bytearray. Note that on_read may be called from + another thread and the order of calls to on_read is not guaranteed. Returns ------- - :py:class:`bytes` - The data is read back from memory as a bytestring. + :py:class:`bytearray` or None + If on_read is not given, the data read back from memory is returned + if postpone is False. If postpone is True, an uninitialised + bytearray will be returned. The bytearray will be populated when + :py:meth:`.flush_postponed_io` is called. + + If on_read is given, this method returns None. """ # Call the SCPConnection to perform the read on our behalf connection = self._get_connection(x, y) - return connection.read(self.scp_data_length, self.scp_window_size, - x, y, p, address, length_bytes) + if on_read is not None: + data = None + f = (lambda: on_read(connection.read(self.scp_data_length, + self.scp_window_size, + x, y, p, + address, length_bytes))) + else: + data = bytearray(length_bytes) + + def f(): + connection.readinto(data, self.scp_data_length, + self.scp_window_size, + x, y, p, + address, length_bytes) + + if postpone: + with self._postponed_lock: + self._postponed[connection].append(f) + else: + f() + + return data @ContextMixin.use_contextual_arguments() def write_across_link(self, address, data, x, y, link): @@ -628,6 +702,55 @@ def read_across_link(self, address, length_bytes, x, y, link): return bytes(data) + @ContextMixin.use_contextual_arguments() + def readinto(self, address, buffer, length_bytes, x, y, p=0, + on_read=None, postpone=False): + """Read a from an address in memory into the supplied buffer. + + Parameters + ---------- + address : int + The address at which to start reading the data. + buffer : bytearray or callable + A bufferable object (e.g. bytearray) of at least length_bytes in + size into which the data will be read. 
+ + If a callable is supplied, this will be called with no arguments + just before the read is carried out and the function must return a + bufferable object into which the result will be written. + length_bytes : int + The number of bytes to read from memory. Large reads are + transparently broken into multiple SCP read commands. + on_read : callable + If supplied, this is called with supplied buffer as an argument + when the read completes. + postpone : bool + If False (the default), the read will occur immediately and the + read value will be placed into the supplied buffer. + + If True, the read will be performed when + :py:meth:`.flush_postponed_io` is called. Note that on_read and + buffer may be called from another thread and the order of other + calls to buffer and on_read is not guaranteed. + """ + # Call the SCPConnection to perform the read on our behalf + connection = self._get_connection(x, y) + + def f(): + buf = buffer() if callable(buffer) else buffer + connection.readinto(buf, + self.scp_data_length, self.scp_window_size, + x, y, p, + address, length_bytes) + if on_read is not None: + on_read(buf) + + if postpone: + with self._postponed_lock: + self._postponed[connection].append(f) + else: + f() + def _get_struct_field_and_address(self, struct_name, field_name): field = self.structs[six.b(struct_name)][six.b(field_name)] address = self.structs[six.b(struct_name)].base + field.offset @@ -669,7 +792,8 @@ def read_struct_field(self, struct_name, field_name, x, y, p=0): length = struct.calcsize(pack_chars) # Perform the read - data = self.read(address, length, x, y, p) + data = self.read(address, length, x, y, p, + on_read=None, postpone=False) # Unpack the data unpacked = struct.unpack(pack_chars, data) @@ -680,7 +804,8 @@ def read_struct_field(self, struct_name, field_name, x, y, p=0): return unpacked @ContextMixin.use_contextual_arguments() - def write_struct_field(self, struct_name, field_name, values, x, y, p=0): + def write_struct_field(self, 
struct_name, field_name, values, x, y, p=0, + postpone=False): """Write a value into a struct. This method is particularly useful for writing values into the ``sv`` @@ -695,6 +820,12 @@ def write_struct_field(self, struct_name, field_name, values, x, y, p=0): Name of the field to write, e.g., `"random"` values : Value(s) to be written into the field. + postpone : bool + If False (the default), writes are performed straight away and this + function returns once the write completes. + + If True, the write will be queued and carried out in parallel when + :py:meth:`.flush_postponed_io` is called. .. warning:: Fields which are arrays must currently be written in their @@ -711,7 +842,7 @@ def write_struct_field(self, struct_name, field_name, values, x, y, p=0): data = struct.pack(pack_chars, values) # Perform the write - self.write(address, data, x, y, p) + self.write(address, data, x, y, p, postpone=postpone) def _get_vcpu_field_and_address(self, field_name, x, y, p): """Get the field and address for a VCPU struct field.""" @@ -747,7 +878,7 @@ def read_vcpu_struct_field(self, field_name, x, y, p): # Perform the read length = struct.calcsize(pack_chars) - data = self.read(address, length, x, y) + data = self.read(address, length, x, y, on_read=None, postpone=False) # Unpack and return unpacked = struct.unpack(pack_chars, data) @@ -764,7 +895,8 @@ def read_vcpu_struct_field(self, field_name, x, y, p): return unpacked # pragma: no cover @ContextMixin.use_contextual_arguments() - def write_vcpu_struct_field(self, field_name, value, x, y, p): + def write_vcpu_struct_field(self, field_name, value, x, y, p, + postpone=False): """Write a value to the VCPU struct for a specific core. Parameters ---------- field_name : string Name of the field to write (e.g. `"user0"`) value : Value to write to this field. 
+ postpone : bool + If False (the default), writes are performed straight away and this + function returns once the write completes. + + If True, the write will be queued and carried out in parallel when + :py:meth:`.flush_postponed_io` is called. """ field, address, pack_chars = \ self._get_vcpu_field_and_address(field_name, x, y, p) @@ -788,7 +926,7 @@ def write_vcpu_struct_field(self, field_name, value, x, y, p): data = struct.pack(pack_chars, *value) # pragma: no cover # Perform the write - self.write(address, data, x, y) + self.write(address, data, x, y, postpone=postpone) @ContextMixin.use_contextual_arguments() def get_processor_status(self, p, x, y): @@ -804,7 +942,8 @@ def get_processor_status(self, p, x, y): self.structs[b"vcpu"].size * p) # Get the VCPU data - data = self.read(address, self.structs[b"vcpu"].size, x, y) + data = self.read(address, self.structs[b"vcpu"].size, x, y, + on_read=None, postpone=False) # Build the kwargs that describe the current state state = { @@ -843,7 +982,8 @@ def get_iobuf(self, p, x, y): while address: # The IOBUF data is proceeded by a header which gives the next # address and also the length of the string in the current buffer. - iobuf_data = self.read(address, iobuf_size + 16, x, y) + iobuf_data = self.read(address, iobuf_size + 16, x, y, + on_read=None, postpone=False) address, time, ms, length = struct.unpack("<4I", iobuf_data[:16]) iobuf += iobuf_data[16:16 + length].decode("utf-8") @@ -859,7 +999,8 @@ def get_router_diagnostics(self, x, y): Description of the state of the counters. 
""" # Read the block of memory - data = self.read(0xe1000300, 64, x=x, y=y) + data = self.read(0xe1000300, 64, x=x, y=y, + on_read=None, postpone=False) # Convert to 16 ints, then process that as the appropriate tuple type return RouterDiagnostics(*struct.unpack("<16I", data)) @@ -1049,7 +1190,8 @@ def sdram_alloc(self, size, tag=0, x=Required, y=Required, @ContextMixin.use_contextual_arguments() def sdram_alloc_as_filelike(self, size, tag=0, x=Required, y=Required, - app_id=Required, buffer_size=0, clear=False): + app_id=Required, buffer_size=0, clear=False, + postpone=False): """Like :py:meth:`.sdram_alloc` but returns a :py:class:`file-like object <.MemoryIO>` which allows safe reading and writing to the block that is allocated. @@ -1061,6 +1203,12 @@ def sdram_alloc_as_filelike(self, size, tag=0, x=Required, y=Required, If this is set to anything but `0` (the default) then :py:meth:`~.MemoryIO.flush` should be called to ensure that all writes are completed. + postpone : bool + If False (the default), reads and (flushed) writes to the returned + object are carried out immediately. + + If True, any reads/(flushed) writes will be queued and carried out + in parallel when :py:meth:`.flush_postponed_io` is called. 
Returns ------- @@ -1101,7 +1249,7 @@ def sdram_alloc_as_filelike(self, size, tag=0, x=Required, y=Required, start_address = self.sdram_alloc(size, tag, x, y, app_id, clear) return MemoryIO(self, x, y, start_address, start_address + size, - buffer_size=buffer_size) + buffer_size=buffer_size, postpone=postpone) def _get_next_nn_id(self): """Get the next nearest neighbour ID.""" @@ -1629,7 +1777,7 @@ def load_routing_table_entries(self, entries, x, y, app_id): struct.pack_into(consts.RTE_PACK_STRING, data, i*16, i, 0, route, entry.key, entry.mask) - self.write(buf, data, x, y) + self.write(buf, data, x, y, postpone=False) # Perform the load of the data into the router self._send_scp( @@ -1652,7 +1800,8 @@ def get_routing_table_entries(self, x, y): # Determine where to read from, perform the read rtr_addr = self.read_struct_field("sv", "rtr_copy", x, y) read_size = struct.calcsize(consts.RTE_PACK_STRING) - rtr_data = self.read(rtr_addr, consts.RTR_ENTRIES * read_size, x, y) + rtr_data = self.read(rtr_addr, consts.RTR_ENTRIES * read_size, x, y, + on_read=None, postpone=False) # Read each routing table entry in turn table = list() @@ -1699,7 +1848,8 @@ def get_p2p_routing_table(self, x, y): raw_table_col = self.read( consts.SPINNAKER_RTR_P2P + (((256 * col) // 8) * 4), col_words, - x, y + x, y, + on_read=None, postpone=False, ) row = 0 @@ -1893,6 +2043,83 @@ def get_machine(self, x=0, y=0, default_num_cores=18): system_info = self.get_system_info(x, y) return build_machine(system_info) + @ContextMixin.use_contextual_arguments() + def flush_postponed_io(self, max_num_connections=24): + """Carry out all postponed I/O operations in parallel. + + Parameters + ---------- + max_num_connections : int + Gives the maximum number of simultaneous connections to use. + Setting this too high may result in this process becoming CPU bound + and thus not achieving high throughput. 
+ """ + with self._postponed_lock: + num_threads = min(max_num_connections, len(self._postponed)) + + # A flag which is set if one of the threads encounter an error. + terminate_now = Event() + + # This list is populated with all exception objects raised in any of + # the worker threads + exceptions = [] + exceptions_lock = Lock() + + def queue_processor(): + """Attempts to process all postponed events for a particular + connection queue, deleting the queue when it empties.""" + while not terminate_now.is_set(): + # Get the next queue to be processed + try: + with self._postponed_lock: + connection, queue = self._postponed.popitem() + except KeyError: + # There are no more queues which need processing, terminate + # this thread. + return + + # Process that queue + while not terminate_now.is_set(): + try: + with self._postponed_lock: + f = queue.popleft() + except IndexError: + # The queue is empty, move on to the next one + break + + # Run the current queue entry handling failures sensibly + try: + f() + except Exception as e: + sys.stderr.write( + "Exception while processing a queued I/O " + "operation:\n") + traceback.print_exc() + terminate_now.set() + + with exceptions_lock: + exceptions.append(e) + + threads = [] + for _ in range(num_threads): + threads.append(Thread(target=queue_processor)) + + try: + for thread in threads: + thread.start() + + # Wait for the threads to complete + for thread in threads: + thread.join() + finally: + # If something goes wrong in the above, trigger the termination of + # all threads and attempt to wait for this + terminate_now.set() + for thread in threads: + thread.join() + + return exceptions + class CoreInfo(collections.namedtuple( 'CoreInfo', "position physical_cpu virt_cpu version buffer_size " @@ -2379,7 +2606,7 @@ class MemoryIO(object): """ def __init__(self, machine_controller, x, y, start_address, end_address, - buffer_size=0, _write_buffer=None): + buffer_size=0, postpone=False, _write_buffer=None): """Create a 
file-like view onto a subset of the memory-space of a chip. Parameters ---------- @@ -2397,6 +2624,13 @@ def __init__(self, machine_controller, x, y, start_address, end_address, End address in memory. buffer_size : int Number of bytes to store in the write buffer. + postpone : bool + If False (the default), reads and (flushed) writes are performed + immediately and their result returned to the caller. + + If True, reads and (flushed) writes will be placed in a queue and + executed in parallel when + :py:meth:`.MachineController.flush_postponed_io` is called. _write_buffer : :py:class:`._WriteBufferChild` Internal use only, the write buffer to use to combine writes. @@ -2408,11 +2642,12 @@ def __init__(self, machine_controller, x, y, start_address, end_address, self._x = x self._y = y self._machine_controller = machine_controller + self._postpone = postpone # Get, or create, a write buffer if _write_buffer is None: _write_buffer = _WriteBuffer(x, y, 0, machine_controller, - buffer_size) + buffer_size, self._postpone) self._write_buffer = _write_buffer # Store and clip the addresses @@ -2477,6 +2712,7 @@ def __getitem__(self, sl): return type(self)( self._machine_controller, self._x, self._y, start_address, end_address, + postpone=self._postpone, _write_buffer=self._write_buffer ) else: @@ -2494,8 +2730,22 @@ def __exit__(self, exception_type, exception_value, traceback): """Exit a block and call :py:meth:`~.close`.""" self.close() + def _read_n_bytes(self, n_bytes): + """Return the number of bytes to actually read accounting for the + cursor position. + """ + # If n_bytes is negative then calculate it as the number of bytes left + if n_bytes < 0: + n_bytes = self._end_address - self.address + + # Determine how far to read, then read nothing beyond that point. 
+ if self.address + n_bytes > self._end_address: + n_bytes = min(n_bytes, self._end_address - self.address) + + return n_bytes + @_if_not_closed - def read(self, n_bytes=-1): + def read(self, n_bytes=-1, on_read=None): """Read a number of bytes from the memory. .. note:: @@ -2506,34 +2756,93 @@ def read(self, n_bytes=-1): n_bytes : int A number of bytes to read. If the number of bytes is negative or omitted then read all data until the end of memory region. + on_read : callable + If supplied, this is called with the read data as an argument when + the read completes. Otherwise, the read data is returned directly. Returns ------- - :py:class:`bytes` - Data read from SpiNNaker as a bytestring. + :py:class:`bytes` or None + If on_read is not given and postpone is set to False for this + MemoryIO, the data read from SpiNNaker is returned. + + If on_read is not given and postpone is set to True for this + MemoryIO, an uninitialised bytearray is returned which will be + populated with the read data when + :py:meth:`.MachineController.flush_postponed_io` is called. + + If on_read is supplied, this function will return None and when the + read completes, the read data will be given as an argument to + on_read. Note that on_read may be called from another thread and + the order of calls to on_read is not guaranteed. """ # Flush this write buffer self.flush() - # If n_bytes is negative then calculate it as the number of bytes left - if n_bytes < 0: - n_bytes = self._end_address - self.address - - # Determine how far to read, then read nothing beyond that point. - if self.address + n_bytes > self._end_address: - n_bytes = min(n_bytes, self._end_address - self.address) - + n_bytes = self._read_n_bytes(n_bytes) if n_bytes <= 0: + if callable(on_read): + on_read(b'') return b'' + else: + # Note: the offset is updated before the read (and the read address + # compensated) such that if the on_read callback interacts with + # this MemoryIO, the addresses are kept consistent. 
+ self._offset += n_bytes + return self._machine_controller.read( + self.address - n_bytes, n_bytes, self._x, self._y, 0, + on_read=on_read, postpone=self._postpone) - # Perform the read and increment the offset - data = self._machine_controller.read( - self.address, n_bytes, self._x, self._y, 0) - self._offset += n_bytes - return data + @_if_not_closed + def readinto(self, buffer, n_bytes=-1, on_read=None): + """Read a number of bytes from the memory into a supplied buffer. + + .. note:: + Reads beyond the specified memory range will be truncated. + + Parameters + ---------- + buffer : bytearray or callable + A bufferable object (e.g. bytearray) of at least n_bytes in size + into which the data will be read. + + If a callable is supplied, this will be called with no arguments + just before the read is carried out and the function must return a + bufferable object into which the result will be written. + + If postpone is set to False for this MemoryIO, the read will be + completed before this method returns. If set to True, the read will + actually occur when + :py:meth:`.MachineController.flush_postponed_io` is called. + n_bytes : int + A number of bytes to read. If the number of bytes is negative or + omitted then read all data until the end of memory region. + on_read : callable + If supplied, this is called with supplied buffer as an argument + when the read completes. + """ + # Flush this write buffer + self.flush() + + n_bytes = self._read_n_bytes(n_bytes) + + if n_bytes <= 0: + if callable(on_read): + on_read(buffer() if callable(buffer) else buffer) + elif callable(buffer): + buffer() + return + else: + # Note: the offset is updated before the read (and the read address + # compensated) such that if the on_read callback interacts with + # this MemoryIO, the addresses are kept consistent. 
+ self._offset += n_bytes + self._machine_controller.readinto( + self.address - n_bytes, buffer, n_bytes, self._x, self._y, 0, + on_read=on_read, postpone=self._postpone) @_if_not_closed - def write(self, bytes): + def write(self, bytes, n_bytes=None): """Write data to the memory. .. warning:: @@ -2547,26 +2856,53 @@ def write(self, bytes): Parameters ---------- - bytes : :py:class:`bytes` - Data to write to the memory as a bytestring. + bytes : :py:class:`bytes` or callable + Data to write into memory. + + If a callable is given, it will be called when the write is about + to be carried out and must return a bytearray (or similar) to be + written. Note that at present write data supplied as a callable is + never buffered and writing a callable causes the write buffer to be + flushed. + + For non-callables, if the buffer size is non-zero, the data will be + buffered immediately. If the buffer size is zero, or the data + written is too large to fit in the buffer, the write will be passed + directly to :py:meth:`.MachineController.write`. This means that if + postpone is set to False for this MemoryIO, the write will be + complete before this method returns but if it is set to True, the + write will actually occur when + :py:meth:`.MachineController.flush_postponed_io` is called. + n_bytes : int + The number of bytes to write. This field is optional when bytes + supports the `len` operator but is mandatory when it does not. Returns ------- - int - Number of bytes written. + int or None + Number of bytes written or buffered. 
""" - if self.address + len(bytes) > self._end_address: - n_bytes = min(len(bytes), self._end_address - self.address) + n_bytes = n_bytes if n_bytes is not None else len(bytes) + + if self.address + n_bytes > self._end_address: + n_bytes = min(n_bytes, self._end_address - self.address) if n_bytes <= 0: + if callable(bytes): + bytes() return 0 - bytes = bytes[:n_bytes] + if callable(bytes): + bytes_ = (lambda: bytes()[:n_bytes]) + else: + bytes_ = bytes[:n_bytes] + else: + bytes_ = bytes # Perform the write and increment the offset - self._write_buffer.add_new_write(self.address, bytes) - self._offset += len(bytes) - return len(bytes) + self._offset += n_bytes + self._write_buffer.add_new_write(self.address - n_bytes, bytes_) + return n_bytes @_if_not_closed def flush(self): @@ -2574,6 +2910,11 @@ def flush(self): This must be called to ensure that all writes to SpiNNaker made using this file-like object (and its siblings, if any) are completed. + + If postpone is set to False for this MemoryIO, the writes will be + completed before this method returns. If set to True, the writes will + actually occur when :py:meth:`.MachineController.flush_postponed_io` is + called. """ self._write_buffer.flush() @@ -2633,11 +2974,12 @@ class _WriteBuffer(object): together. """ - def __init__(self, x, y, p, controller, buffer_size=0): + def __init__(self, x, y, p, controller, buffer_size=0, postpone=False): self.x = x self.y = y self.p = p self.controller = controller + self.postpone = postpone # A buffer of writes self.buffer = bytearray(buffer_size) @@ -2648,11 +2990,14 @@ def __init__(self, x, y, p, controller, buffer_size=0): def add_new_write(self, start_address, data): """Add a new write to the buffer.""" - if len(data) > self.buffer_size: - # Perform the write if we couldn't buffer it at all + if callable(data) or len(data) > self.buffer_size: + # Perform the write if we couldn't buffer it at all. 
Unbufferable + # writes are those too large to fit in the buffer and those + # provided via callback. self.flush() # Flush to ensure ordering is preserved self.controller.write(start_address, data, - self.x, self.y, self.p) + self.x, self.y, self.p, + postpone=self.postpone) return if self.start_address is None: @@ -2701,13 +3046,18 @@ def flush(self): # Write out all the values from the buffer self.controller.write( self.start_address, self.buffer[:self.current_end], - self.x, self.y, self.p + self.x, self.y, self.p, postpone=self.postpone ) # Reset the buffer self.start_address = None self.current_end = 0 + # If postponed writes are in use, create a new buffer since the old + # one will be used at an undetermined point in the future. + if self.postpone: + self.buffer = bytearray(self.buffer_size) + def unpack_routing_table_entry(packed): """Unpack a routing table entry read from a SpiNNaker machine. diff --git a/rig/machine_control/scp_connection.py b/rig/machine_control/scp_connection.py index 4679f4f..5588edc 100644 --- a/rig/machine_control/scp_connection.py +++ b/rig/machine_control/scp_connection.py @@ -327,6 +327,39 @@ def read(self, buffer_size, window_size, x, y, p, address, length_bytes): """ # Prepare the buffer to receive the incoming data data = bytearray(length_bytes) + self.readinto(data, buffer_size, window_size, x, y, p, address, + length_bytes) + return bytes(data) + + def readinto(self, data, buffer_size, window_size, x, y, p, address, + length_bytes): + """Read a bytestring from an address in memory into a supplied buffer. + + ..note:: + This method is included here to maintain API compatibility with an + `alternative implementation of SCP + `_. + + Parameters + ---------- + data : bytearray + An object into which supports the buffer protocol (e.g. bytearray) + into which the data will be read. 
+ buffer_size : int + Number of bytes held in an SCP buffer by SARK, determines how many + bytes will be expected in a socket and how many bytes of data will + be read back in each packet. + window_size : int + x : int + y : int + p : int + address : int + The address at which to start reading the data. + length_bytes : int + The number of bytes to read from memory. Large reads are + transparently broken into multiple SCP read commands. + """ + # Prepare the buffer to receive the incoming data mem = memoryview(data) # Create a callback which will write the data from a packet into a @@ -360,7 +393,6 @@ def packets(length_bytes, data): # Run the event loop and then return the retrieved data self.send_scp_burst(buffer_size, window_size, packets(length_bytes, data)) - return bytes(data) def write(self, buffer_size, window_size, x, y, p, address, data): """Write a bytestring to an address in memory. diff --git a/tests/machine_control/test_machine_controller.py b/tests/machine_control/test_machine_controller.py index b74496b..c67fac4 100644 --- a/tests/machine_control/test_machine_controller.py +++ b/tests/machine_control/test_machine_controller.py @@ -976,6 +976,34 @@ def test_write_across_link_unaligned(self, start_address, data): cn.write_across_link(start_address, data, x=0, y=0, link=Links.north) + @pytest.mark.parametrize( + "buffer_size, window_size, x, y, p, start_address, length, data", + [(128, 1, 0, 1, 2, 0x67800000, 100, b"\x00" * 100), + (256, 5, 1, 4, 5, 0x67801000, 2, b"\x10\x23"), + ] + ) + def test_readinto(self, buffer_size, window_size, x, y, p, + start_address, length, data): + # Create the mock controller + cn = MachineController("localhost") + cn._scp_data_length = buffer_size + cn._window_size = window_size + cn.connections[None] = mock.Mock(spec_set=SCPConnection) + + def mock_readinto(buffer, *args, **kwargs): + buffer[:] = data + cn.connections[None].readinto.side_effect = mock_readinto + + # Perform the read and ensure that values are passed on 
as appropriate + with cn(x=x, y=y, p=p): + read_data = bytearray(length) + cn.readinto(start_address, read_data, length) + assert data == read_data + + assert len(cn.connections[None].readinto.mock_calls) == 1 + assert cn.connections[None].readinto.mock_calls[0][1][1:] == \ + (buffer_size, window_size, x, y, p, start_address, length) + @pytest.mark.parametrize( "iptag, addr, port", [(1, "localhost", 54321), @@ -2562,15 +2590,17 @@ def test_read(self, mock_controller, x, y, start_address, lengths): calls = [] offset = 0 for n_bytes in lengths: - sdram_file.read(n_bytes) + buf = bytearray(n_bytes) + sdram_file.readinto(buf, n_bytes) assert sdram_file.tell() == offset + n_bytes assert sdram_file.address == start_address + offset + n_bytes - calls.append(mock.call(start_address + offset, n_bytes, x, y, 0)) + calls.append(mock.call(start_address + offset, + buf, n_bytes, x, y, 0)) offset = offset + n_bytes # Check the reads caused the appropriate calls to the machine # controller. - mock_controller.read.assert_has_calls(calls) + mock_controller.readinto.assert_has_calls(calls) @pytest.mark.parametrize("x, y", [(1, 3), (3, 0)]) @pytest.mark.parametrize("start_address, length, offset", @@ -2583,17 +2613,21 @@ def test_read_no_parameter(self, mock_controller, x, y, start_address, # Assert that reading with no parameter reads the full number of bytes sdram_file.seek(offset) sdram_file.read() - mock_controller.read.assert_called_once_with( - start_address + offset, length - offset, x, y, 0) + assert mock_controller.readinto.call_count == 1 + args = mock_controller.readinto.call_args[0] + assert args[0] == start_address + offset + assert args[2] == length - offset + assert args[3] == x + assert args[4] == y + assert args[5] == 0 def test_read_beyond(self, mock_controller): sdram_file = MemoryIO(mock_controller, 0, 0, start_address=0, end_address=10) - sdram_file.read(100) - mock_controller.read.assert_called_with(0, 10, 0, 0, 0) + assert len(sdram_file.read(100)) == 10 assert 
sdram_file.read(1) == b'' - assert mock_controller.read.call_count == 1 + assert mock_controller.readinto.call_count == 1 @pytest.mark.parametrize("x, y", [(4, 2), (255, 1)]) @pytest.mark.parametrize("start_address", [0x60000004, 0x61000003]) @@ -2811,6 +2845,7 @@ def test_zero_length_filelike(self, mock_controller): "flush_event", [lambda filelike: filelike.flush(), lambda filelike: filelike.read(1), + lambda filelike: filelike.readinto(bytearray(1), 1), lambda filelike: filelike.close()] ) def test_coalescing_writes(self, get_node, flush_event): diff --git a/tests/machine_control/test_scp_connection.py b/tests/machine_control/test_scp_connection.py index 9b0ea79..a09e137 100644 --- a/tests/machine_control/test_scp_connection.py +++ b/tests/machine_control/test_scp_connection.py @@ -446,6 +446,7 @@ def recv(self, *args, **kwargs): mock_conn.send_scp_burst(512, 8, packets) +@pytest.mark.parametrize("readinto", [True, False]) @pytest.mark.parametrize( "buffer_size, window_size, x, y, p", [(128, 1, 0, 0, 1), (256, 5, 1, 2, 3)] ) @@ -469,7 +470,7 @@ def recv(self, *args, **kwargs): (256, DataType.word, 0x60000004) ]) def test_read(buffer_size, window_size, x, y, p, n_bytes, - data_type, start_address): + data_type, start_address, readinto): mock_conn = SCPConnection("localhost") # Construct the expected calls, and hence the expected return packets @@ -510,8 +511,13 @@ def __call__(self, buffer_size, window_size, args): send_scp_burst.side_effect = ccs # Read an amount of memory specified by the size. - data = mock_conn.read(buffer_size, window_size, x, y, p, - start_address, n_bytes) + if readinto: + data = bytearray(n_bytes) + mock_conn.readinto(data, buffer_size, window_size, x, y, p, + start_address, n_bytes) + else: + data = mock_conn.read(buffer_size, window_size, x, y, p, + start_address, n_bytes) assert data == ccs.read_data # send_burst_scp should have been called once, each element in the iterator