diff --git a/final_assignment_1/main_city_navigator.py b/final_assignment_1/main_city_navigator.py new file mode 100644 index 0000000..9a2bc0b --- /dev/null +++ b/final_assignment_1/main_city_navigator.py @@ -0,0 +1,101 @@ +from state import Behavior, Ctx, Position +from state_map import StateMap +from system import System +from wheel_driver import WheelDriver + +if __name__ == "__main__": + # Navigates through a city of streets based on provided instruction for each intersection (or corner) + system = System() + wheels = WheelDriver( + system=system, + left_pwm_min=60, left_pwm_multiplier=0.09, left_pwm_shift=-2.5, + right_pwm_min=60, right_pwm_multiplier=0.09, right_pwm_shift=-2.2 + ) + + # see Behavior class for the robot behavior definition and parameter characteristics + behavior = Behavior( + fwd_speed=0.2, + side_speed_dec=0.1, side_speed_min=0.1, + side_arc_min=0.2, side_arc_inc=0.3, side_arc_max=2, + # 25ms per cycle x 75 = 1.875s outside the valid line action rules (i.e., searching for line) + line_cycle_tolerance=50, + turn_speed=0.5, turn_arc_speed=10, + # 25ms per cycle x 100 = 2.5s outside the valid turn action rules (i.e., searching for 90° turned line) + turn_cycle_tolerance=80, + fast_sensor_change_dropped_below_cycle_count=3, + cycle_duration_us=25_000, + # the city navigator behavior has additional parameters + # Position indicates our last intersection (we are past it and heading to the next one) + # where EAST signifies X axis increase and NORTH signifies Y axis increase + city_position=Position(x=1, y=0, orientation=Position.NORTH), # next intersection will be at (1, 1) + navigation_directions=[ + # the city is a simple grid of streets with intersections at every corner + # we navigate with : '^' (forward), '>' (right), '<' (left), S (stop) + # We assume 5x5 grid (can be larger, but corners are mentioned for 5x5 size) + '^', '<', '>', '^', '>', # we should be turning at the Northwest corner (0, 4) + # >.... + # ^.... + # ^<... + # .^... + # ..... + '>', '<', '^', '^', '>', # we should be turning one line below the Northeast corner (4, 3) + # .v... + # .>>>v + # ..... + # ..... + # ..... + '^', '^', '>', # we should be turning at the SouthEast corner (4, 0) + # ..... + # ..... + # ....v + # ....v + # ....< + '^', '>', '<', '^', # we should be heading to the initial South-east square, heading towards (0, 1) + # ..... + # ..... + # ..... + # .<<.. + # ..^<. + '<', '<', '<', 'S' # we should be circling the initial South-east square, stopping where we started (1, 0) + # ..... + # ..... + # ..... + # v.... + # >^... + ], + line_state_implicit_transition_protection_after_switch_cycle_count=200, + turn_state_implicit_transition_protection_after_switch_cycle_count=500 + ) + state_map = StateMap(behavior=behavior, stop_on_non_line_sensors=False, turns=True, intersections=True) + ctx = Ctx(system=system, wheels=wheels, behavior=behavior, + states=state_map.states, transitions=state_map.transitions, state_key='START') + + try: + state_cycle_start = system.ticks_us() + while not system.is_button_a_pressed(): + wheels.update() + # reads the sensors and builds their history + ctx.update_sensors() + + time_now = system.ticks_us() + if system.ticks_diff(time_now, state_cycle_start) > ctx.behavior.cycle_duration_us: + state_cycle_start = time_now + + # finalize context before evaluation + # (costly operations are done here like sensor history extension with current sensor) + ctx.before_evaluation() + + # if our context situation matches one of the states, we switch to that state + # (this is typically based on sensor history match, but other matchers are possible) + ctx.switch_to_state_matching_ctx_situation() + + # while being in the state we update it + ctx.state.on_update(ctx) + + finally: + try: + ctx.state.on_exit(ctx) + finally: + wheels.stop() + system.display_off() + print("Finished") diff --git a/final_assignment_1/state.py b/final_assignment_1/state.py new file mode 100644 index 0000000..37e9fc3 --- /dev/null +++ b/final_assignment_1/state.py @@ -0,0 +1,475 @@ +from system import System +from wheel_driver import WheelDriver +from math import sin, cos, pi + + +class Position: + """A position in the city.""" + EAST = 0 + NORTH = pi / 2 + WEST = pi + SOUTH = 3 * pi / 2 + + def __init__(self, x: float, y: float, orientation: float): + self.x = x + self.y = y + self.orientation = orientation + + def turn_left(self): + self.orientation = (self.orientation + pi / 2) % (2 * pi) + print("TURNED_TO: %s" % self) + + def turn_right(self): + self.orientation = (self.orientation - pi / 2) % (2 * pi) + + def move_forward(self): + x_old = self.x + y_old = self.y + self.x += round(cos(self.orientation)) + self.y += round(sin(self.orientation)) + print("MOVING_FORWARD: (%s, %s) -> (%s, %s)" % (x_old, y_old, self.x, self.y)) + + def move_backward(self): + self.x -= round(cos(self.orientation)) + self.y -= round(sin(self.orientation)) + + def move_by_choice(self, choice: str, forward_accounted_for=False): + if choice == '^': + if not forward_accounted_for: + self.move_forward() + elif choice == '<': + self.turn_left() + elif choice == '>': + self.turn_right() + + def copy(self): + return Position(x=self.x, y=self.y, orientation=self.orientation) + + def str_orientation(self): + if self.orientation == Position.EAST: + return "EAST" + elif self.orientation == Position.NORTH: + return "NORTH" + elif self.orientation == Position.WEST: + return "WEST" + elif self.orientation == Position.SOUTH: + return "SOUTH" + else: + return str(self.orientation) + + def __str__(self): + return f"({self.x}, {self.y}, {self.str_orientation()})" + + def __repr__(self): + return f"Position(x={self.x}, y={self.y}, orientation={self.str_orientation()})" + + +class Behavior: + """The behavior of the robot.""" + + def __init__(self, + # base forward speed (rad) + fwd_speed: float, + # how much to decrement each cycle when on side sensor (too low = lazy reaction) + side_speed_dec: float, + # the minimum rotation speed to maintain to not go too low + # and unnecessarily slow down turning (it turns no matter what as long as rotation speeds are correct) + side_speed_min: float, + # starting arc speed (rotation) when side sensor picks up the line instead of center one + side_arc_min: float, + # how fast we'll be increasing arc speed each cycle we are out of center + # this continues even if we are out of side sensor due to tolerance cycles (see below) + side_arc_inc: float, + # maximum arc speed we can perform (to not get too crazy and take our time when turning) + side_arc_max: float, + # tolerance before declaring we're out of line (time-dependent) + # (if turning too slow, we might not catch the line again if too low) + line_cycle_tolerance: int, + # base turn speed (rad) + turn_speed: float, + # base turn arc speed (rad) + turn_arc_speed: float, + # tolerance before declaring we're out of turn (time-dependent) + # (if turning too slow, we might not catch the line again if too low) + turn_cycle_tolerance: int, + # we disregard sensor transitions which last very short time + fast_sensor_change_dropped_below_cycle_count: int, + # how long the cycle is (in microseconds) + cycle_duration_us: int, + # Starting position in the city (after we reach the first intersection (i.e., we can be a bit off). + # If specified, we run in the city mode and the robot starts drawing the position on the screen. + # If not specified, we don't run in the city mode. + city_position: Position = None, + # navigation directions to take on the intersections + # (this includes also the turns which otherwise have no other option, just to fine-map the orientation) + # if not specified, the robot will not run in the interactive mode + navigation_directions: list[str] = None, + # Pause time between each intersection (in microseconds) + navigation_intersection_pause: int = 1_000_000, + # On a city grid, we compensate for flukes by protecting the state from implicit transitions + # in case the state is too short (we assume the grid has a minimum street distance we can utilize to avoid flukes) + line_state_implicit_transition_protection_after_switch_cycle_count: int = 0, + turn_state_implicit_transition_protection_after_switch_cycle_count: int = 0 + ): + self.fwd_speed = fwd_speed + self.side_speed_dec = side_speed_dec + self.side_speed_min = side_speed_min + self.side_arc_min = side_arc_min + self.side_arc_inc = side_arc_inc + self.side_arc_max = side_arc_max + self.line_cycle_tolerance = line_cycle_tolerance + self.turn_speed = turn_speed + self.turn_arc_speed = turn_arc_speed + self.turn_cycle_tolerance = turn_cycle_tolerance + self.fast_sensor_change_dropped_below_cycle_count = fast_sensor_change_dropped_below_cycle_count + self.cycle_duration_us = cycle_duration_us + self.city_position = city_position + self.navigation_directions = navigation_directions + self.navigation_intersection_pause = navigation_intersection_pause + self.line_state_implicit_transition_protection_cycle_count = line_state_implicit_transition_protection_after_switch_cycle_count + self.turn_state_implicit_transition_protection_cycle_count = turn_state_implicit_transition_protection_after_switch_cycle_count + + +class SensorMatcher: + """A sensor matcher.""" + + def __init__(self, optional: bool = False): + self.optional = optional + + def matches(self, sensor, count) -> bool: + pass + + +class SensorHasCount(SensorMatcher): + """A sensor and how many times it needs to match.""" + + def __init__(self, sensor: int, min_count: int, max_count: int = None, optional: bool = False): + super().__init__(optional=optional) + self.sensor = sensor + self.min_count = min_count + self.max_count = max_count + + def matches(self, sensor, count): + if sensor != self.sensor: + # print("Sensor mismatch: %s != %s" % (sensor, self.sensor)) + return False + if self.max_count is not None: + return self.min_count <= count <= self.max_count + result = count >= self.min_count + # print(f"Sensor {sensor:05b} ({count}x) match to {self.sensor:05b} ({count}x) -> result: %s" % result) + return count >= self.min_count + + # let's print binary representation of the sensor and how many times it needs to match + def __str__(self): + return f"{self.sensor:0{5}b} ({self.min_count}x)" + + +class EitherSensorHasCount(SensorMatcher): + """Multiple variants of a sensor, either of them must have count.""" + + def __init__(self, sensors: list[SensorHasCount], optional: bool = False): + super().__init__(optional=optional) + self.sensors = sensors + + def matches(self, sensor, count): + for sensor_has_count in self.sensors: + # print(f"Matching {sensor_has_count} to {sensor:05b} ({count}x): {sensor_has_count.matches(sensor, count)}") + if sensor_has_count.matches(sensor, count): + # print("Matched") + return True + # print("Unmatched") + return False + + def __str__(self): + return f"Either{[str(sensor) for sensor in self.sensors]}" + + +class Ctx: + """Carries the current operating context of the robot.""" + + def __init__(self, system: System, wheels: WheelDriver, behavior: Behavior, + states: dict[str, 'State'], transitions: dict[str, list[str]], state_key: str): + self.system = system + self.wheels = wheels + self.behavior = behavior + self.states = states + self.transitions = transitions + self.state_key = None + self.state = None + self.sensor = None + self.sensor_count = 0 + self.sensor_last = None + self.state_action_cycle = 0 + self.state_out_of_bounds_cycle = 0 + self.state_cycle_since_transition = 0 + # history of sensor changes: [(lcr, count), ...] + # we keep track of the last several sensor changes and use that to determine situation underneath us + # this feeds to a sudden main state change if we detect different behavior than expected + # each state has the ability to override other states if it thinks it should rule the car + self.sensor_history = [] + # the history with extra element pertaining the current sensor and cycle count (updated just before state eval) + self.sensor_history_with_current = [] + # Our history needs to be able to house at least 4 transitions + # (going to intersection might be preceded by a single sensor if the car is going sideways) + self.sensor_history_length = 10 + # carries max speed for each wheel (for display purposes) + # will be updated on forward to correct values + self.fwd_speed_pwm_left_max = 255 + self.fwd_speed_pwm_right_max = 255 + # runtime navigation directions (will be consumed as we go, we have backup in behavior if needed) + self.navigation_directions = None + # We will be using the city mode if the position is specified (and we will update position based on intersections) + self.city_position = None + # we need a flag to account for multi-state navigation at the same intersection (i.e., intersection enter + turn) + self.city_position_forward_accounted_for = False + self.reset_navigation_and_position() + # initialize the state + self.transition_to_state(state_key=state_key) + self.print_add_to_history_count = 0 + + def reset_navigation_and_position(self): + """Resets navigation and position to the initial state.""" + self.navigation_directions = None + if self.behavior.navigation_directions is not None: + self.navigation_directions = self.behavior.navigation_directions.copy() + print("Using navigation directions: %s" % self.navigation_directions) + self.city_position = None + if self.behavior.city_position is not None: + self.city_position = self.behavior.city_position.copy() + print("Starting at the initial city position: %s" % self.city_position) + self.system.display_position(self.city_position.x, self.city_position.y) + self.city_position_forward_accounted_for = False + + def update_sensors(self): + """Updates sensor readings and counts.""" + self.state_cycle_since_transition += 1 + self.sensor_last = self.sensor + li, ri, ll, lc, lr = self.system.get_sensors() + self.sensor = li << 4 | ri << 3 | ll << 2 | lc << 1 | lr + if self.sensor != self.sensor_last: + if self.sensor_count > 0: + # we eliminate fluke transitions (short ones) from the history + if self.sensor_count >= self.behavior.fast_sensor_change_dropped_below_cycle_count: + # print(f"Last sensor into history: {self.sensor_last:05b} ({self.sensor_count}x) -> change to {self.sensor:05b}") + self.sensor_history.append((self.sensor_last, self.sensor_count)) + if self.sensor_last == 0b111 or self.print_add_to_history_count > 0: + if self.sensor_last == 0b111: + self.print_add_to_history_count = 3 + else: + self.print_add_to_history_count -= 1 + if len(self.sensor_history) >= self.sensor_history_length: + self.sensor_history.pop(0) + self.switch_to_state_matching_ctx_situation() + else: + print(f"Fluke transition eliminated: {self.sensor_last:05b} ({self.sensor_count}x)") + self.system.display_sensors(li, ri, ll, lc, lr) + self.sensor_count = 1 + else: + self.sensor_count += 1 + + def transition_to_state(self, state_key): + """Transitions to state, a one-liner helper for main code.""" + if state_key != self.state_key: + action_now = self.state.action if self.state is not None else None + state_new = self.states[state_key] + state_new.set_default_action(ctx=self) + action_new = state_new.action + if self.state is not None: + self.state.on_exit(ctx=self) + print("Transitioning: state %s (action %s) -> state %s (action %s)" + % (self.state, action_now, state_new, action_new)) + else: + print("Transitioning to state %s (action %s)" % (state_new, action_new)) + self.system.display_drive_mode(action_new.symbol) + self.state = state_new + self.state_key = state_key + self.state_cycle_since_transition = 0 + # reset the sensor history to start fresh within the state and not bring historical baggage + print("Resetting sensor history") + self.sensor_history = [] + self.state.on_enter(ctx=self) + + def switch_to_state_matching_ctx_situation(self): + """Switches the state matching the sensor history + current sensor state (car behavior in the recent past).""" + state_transitions = self.transitions.get(self.state_key) + if state_transitions is None: + print("ERROR: No state transitions for state %s" % self.state_key) + return + for state_key in state_transitions: + state = self.states[state_key] + if state == self.state or state.matchers is None: + continue + for matcher in state.matchers: + if matcher.matches(self): + if 'TURN' in self.state_key and self.state_cycle_since_transition < self.behavior.turn_state_implicit_transition_protection_cycle_count: + print("Protecting from implicit state transition to TURN state, removing sensor history just to be sure") + self.sensor_history_with_current = [(self.sensor, self.sensor_count)] + self.sensor_history = [] + return + if 'LINE' in self.state_key and self.state_cycle_since_transition < self.behavior.line_state_implicit_transition_protection_cycle_count: + print("Protecting from implicit state transition to LINE state, removing sensor history just to be sure") + self.sensor_history_with_current = [(self.sensor, self.sensor_count)] + self.sensor_history = [] + return + print("Switching to state %s (%s) due to match of state matcher, sensor history %s" + % (state_key, state, Ctx.str_sensor_history(self.sensor_history_with_current))) + self.transition_to_state(state_key) + return + # else: + # if self.sensor == 0b111 or self.is_intersection_in_history(): + # print( + # f"Sensor {self.sensor:03b}, count {self.sensor_count}, history {self.str_sensor_history(self.sensor_history)} - no match by {matcher}") + + def add_sensor_to_history(self, sensor): + if len(self.sensor_history) >= self.sensor_history_length: + self.sensor_history.pop(0) + self.sensor_history.append(sensor) + + def is_intersection_in_history(self): + for sensor, count in self.sensor_history: + if sensor == 0b111: + return True + return False + + def before_evaluation(self): + """Called when the context changes are finished, before all evaluations.""" + # we need to update the sensor history with the current sensor and count + self.sensor_history_with_current = self.sensor_history.copy() + self.sensor_history_with_current.append((self.sensor, self.sensor_count)) + + @staticmethod + def str_sensor_history(sensor_history): + """Converts the sensor history to a string.""" + return ", ".join([str(SensorHasCount(s, c)) for s, c in sensor_history]) + + +class Action: + def __init__(self, symbol: str): + self.name = type(self).__name__.replace('Action', '') + self.symbol = symbol + + def __str__(self): + return self.name + + def matches(self, ctx: Ctx) -> bool: + """Returns True if the action matches the context. Used for intra-state transitions.""" + return False + + def on_enter(self, ctx: Ctx): + """Called when the action is entered. Can return new action or indicate state switch.""" + ctx.system.display_drive_mode(self.symbol) + + def on_update(self, ctx: Ctx): + """Called when the action is updated. Can return new action or indicate state switch.""" + pass + + def on_exit(self, ctx: Ctx): + """Called when the action is exited. Can return new action or indicate state switch.""" + pass + + +class SensorMatchingAction(Action): + def __init__(self, symbol: str, matching_sensor: int): + super().__init__(symbol=symbol) + self.matching_sensor = matching_sensor + + def matches(self, ctx: Ctx) -> bool: + return ctx.sensor == self.matching_sensor + + +class StateMatcher: + def __init__(self): + self.name = type(self).__name__.replace('StateMatcher', '') + + def __str__(self): + return self.name + + def matches(self, ctx: Ctx) -> bool: + """Returns True if the state matches the current context situation.""" + pass + + +class SensorHistoryStateMatcher(StateMatcher): + def __init__(self, steps: list[SensorMatcher]): + super().__init__() + self.steps = steps + + def __str__(self): + return f"{self.name}(steps=[{', '.join([str(step) for step in self.steps])}])" + + def matches(self, ctx: Ctx) -> bool: + # we go through sensor history from the end to the beginning + # (we are interested in the last steps, not the first ones) + # we match the sensor history to all steps (in reverse), disregarding optional steps if missing + for sensor_history_view in (ctx.sensor_history_with_current, ctx.sensor_history): + if self.matches_sensor_history(sensor_history_view): + return True + return False + + def matches_sensor_history(self, sensor_history_view): + for start_index in range(len(sensor_history_view) - 1, -1, -1): + if self.match_from_index(sensor_history_view, start_index): + return True + return False + + def match_from_index(self, sensor_history_view, start_index): + step_index = 0 + history_index = start_index + + while step_index < len(self.steps) and history_index >= 0: + step: SensorMatcher = self.steps[step_index] + sensor, count = sensor_history_view[history_index] + if step.matches(sensor, count): + step_index += 1 + history_index -= 1 + elif step.optional: + step_index += 1 + else: + break + + # Ensure all non-optional steps are matched + while step_index < len(self.steps): + if not self.steps[step_index].optional: + return False + step_index += 1 + + return step_index == len(self.steps) + + +class State: + def __init__(self, symbol: str, actions: list[Action], matchers: list[StateMatcher] = None): + # Our name is the class name without the State suffix + self.name = type(self).__name__.replace('State', '') + self.symbol = symbol + self.matchers = matchers + self.actions = actions + self.action = None + + def __str__(self): + return self.name + + def str_full(self): + matchers = ', '.join([str(matcher) for matcher in self.matchers]) if self.matchers else 'None' + actions = ', '.join([str(action) for action in self.actions]) + return f"{self.name}(matchers=[{matchers}], actions=[{actions}])" + + def on_enter(self, ctx: Ctx): + """Called when the state is entered.""" + print("Entering state %s" % self) + ctx.system.display_drive_mode(self.symbol) + if self.action is not None: + self.action.on_enter(ctx) + + def on_update(self, ctx: Ctx): + """Called when the state is updated.""" + if self.action is not None: + self.action.on_update(ctx) + + def on_exit(self, ctx: Ctx): + """Called when the state is exited.""" + if self.action is not None: + self.action.on_exit(ctx) + + def set_default_action(self, ctx: Ctx): + self.action = self.actions[0] + ctx.system.display_drive_mode(self.action.symbol) diff --git a/final_assignment_1/state_generic.py b/final_assignment_1/state_generic.py new file mode 100644 index 0000000..92991fa --- /dev/null +++ b/final_assignment_1/state_generic.py @@ -0,0 +1,57 @@ +from state import Ctx, Action, State + + +class GenericAction(Action): + def __init__(self, symbol): + super().__init__(symbol) + + +class StartAction(GenericAction): + """Starts the robot. An action used by start state to wait for a button to be pressed to start moving. + The action also stops the robot to allow comfortable pressing of the button.""" + + def __init__(self): + super().__init__(symbol='s') + + def on_enter(self, ctx: Ctx): + ctx.system.display_on() + ctx.wheels.stop() + ctx.system.display_speed(0, ctx.fwd_speed_pwm_left_max, left=True) + ctx.system.display_speed(0, ctx.fwd_speed_pwm_right_max, left=False) + + def on_update(self, ctx: Ctx): + if ctx.system.is_button_b_pressed(): + print("B pressed, starting") + ctx.transition_to_state("LINE") + + +class StopAction(GenericAction): + """Stops the robot. No further action will be taken.""" + + def __init__(self): + super().__init__(symbol='.') + + def on_enter(self, ctx: Ctx): + ctx.wheels.stop() + + def on_update(self, ctx: Ctx): + if ctx.system.is_button_b_pressed(): + ctx.transition_to_state("START") + + def on_exit(self, ctx: Ctx): + ctx.reset_navigation_and_position() + + +class StartState(State): + def __init__(self, symbol: str, matchers=None): + super().__init__(symbol=symbol, actions=[StartAction()], matchers=matchers) + + +class StopState(State): + def __init__(self, symbol: str, matchers=None): + super().__init__(symbol=symbol, actions=[StopAction()], matchers=matchers) + + +class ErrorState(State): + def __init__(self, symbol: str, matchers=None): + super().__init__(symbol=symbol, actions=[StopAction()], matchers=matchers) diff --git a/final_assignment_1/state_intersection.py b/final_assignment_1/state_intersection.py new file mode 100644 index 0000000..39b989e --- /dev/null +++ b/final_assignment_1/state_intersection.py @@ -0,0 +1,132 @@ +from state import Ctx, Action, State, SensorHistoryStateMatcher + + +class Choice: + def __init__(self, symbol: str, target_state: str): + self.symbol = symbol + self.target_state = target_state + + +class InteractiveIntersectAction(Action): + def __init__(self, symbol: str, choices: list[Choice]): + super().__init__(symbol) + self.choices = choices + self.choice_idx = -1 + self.navigation_pause_count = 0 + self.button_press_count = 0 + + def get_choice_by_symbol(self, symbol: str): + # Returns the choice object by looking at the symbol of all choices we have + for choice in self.choices: + if choice.symbol == symbol: + return choice + return None + + def on_enter(self, ctx: Ctx): + print("Intersection.on_enter()") + ctx.wheels.stop() + ctx.system.display_speed(0, ctx.behavior.fwd_speed, left=True) + ctx.system.display_speed(0, ctx.behavior.fwd_speed, left=False) + ctx.system.display_drive_mode(self.symbol) + self.choice_idx = -1 + self.navigation_pause_count = 0 + self.button_press_count = 0 + if ctx.city_position is not None and ctx.city_position_forward_accounted_for is False: + # we first move to this intersection and then we will decide where to go (turn) + ctx.city_position.move_forward() + ctx.system.display_position(ctx.city_position.x, ctx.city_position.y) + ctx.city_position_forward_accounted_for = True + + def on_update(self, ctx: Ctx): + """Chooses the next state based on the user input or the supplied navigation directions. + When the navigation is active, next command will be taken from the navigation directions. + When the interactive mode is active, user can cycle through the choices and confirm the choice with long press.""" + if ctx.navigation_directions is not None: + cycle_count_for_navigation_pause = ctx.behavior.navigation_intersection_pause // ctx.behavior.cycle_duration_us + if self.navigation_pause_count < cycle_count_for_navigation_pause: + self.navigation_pause_count += 1 + return + else: + self.navigation_pause_count = 0 + choice_symbol = ctx.navigation_directions[0] + print("Navigating to %s, valid choices are %s" % (choice_symbol, [c.symbol for c in self.choices])) + choice = self.get_choice_by_symbol(symbol=choice_symbol) + if choice is not None: + # if we are not just going forward, we need to keep direction in the navigation for the next state + if choice.target_state == 'LINE': + ctx.navigation_directions.pop(0) + ctx.transition_to_state(choice.target_state) + else: + # we are in the navigation mode, but the symbol is not recognized for the current situation + ctx.transition_to_state('ERROR') + return + + # we will be switching to the next choice after button press ends + if ctx.system.is_button_b_pressed(): + self.button_press_count += 1 + elif self.button_press_count > 0: + # we want at least half a second for long press + min_cycle_count_for_long_press = 500_000 // ctx.behavior.cycle_duration_us + if self.button_press_count >= min_cycle_count_for_long_press: + ctx.system.display_choice(' ') + choice = self.choices[self.choice_idx] + ctx.transition_to_state(choice.target_state) + else: + self.choice_idx += 1 + if self.choice_idx >= len(self.choices): + self.choice_idx = 0 + ctx.system.display_choice(self.choices[self.choice_idx].symbol) + self.button_press_count = 0 + + +class InteractiveIntersectState(State): + """User-interactive intersection state. + The state stops the car, displays the intersection type and asks the user for the choice. + User can cycle through the options (an arrow will be displayed in that direction), + then confirms the choice with long-pressing 'B' button.""" + + +class IntersectXState(State): + def __init__(self, symbol: str, matchers: list[SensorHistoryStateMatcher]): + actions = [InteractiveIntersectAction(symbol=symbol, choices=[ + Choice(symbol='<', target_state='TURN_L_OFF_LINE'), + Choice(symbol='^', target_state='LINE'), + Choice(symbol='>', target_state='TURN_R_OFF_LINE'), + ])] + super().__init__(symbol=symbol, actions=actions, matchers=matchers) + + +class IntersectYState(State): + def __init__(self, symbol: str, matchers: list[SensorHistoryStateMatcher]): + actions = [InteractiveIntersectAction(symbol=symbol, choices=[ + Choice(symbol='<', target_state='TURN_L_TO_LINE'), + Choice(symbol='>', target_state='TURN_R_TO_LINE'), + ])] + super().__init__(symbol=symbol, actions=actions, matchers=matchers) + + +class IntersectLState(State): + def __init__(self, symbol: str, matchers: list[SensorHistoryStateMatcher]): + actions = [InteractiveIntersectAction(symbol=symbol, choices=[ + Choice(symbol='<', target_state='TURN_L_OFF_LINE'), + Choice(symbol='^', target_state='LINE'), + ])] + super().__init__(symbol=symbol, actions=actions, matchers=matchers) + + +class IntersectRState(State): + def __init__(self, symbol: str, matchers: list[SensorHistoryStateMatcher]): + actions = [InteractiveIntersectAction(symbol=symbol, choices=[ + Choice(symbol='^', target_state='LINE'), + Choice(symbol='>', target_state='TURN_R_OFF_LINE'), + ])] + super().__init__(symbol=symbol, actions=actions, matchers=matchers) + + +class IntersectTState(State): + def __init__(self, symbol: str, matchers: list[SensorHistoryStateMatcher]): + actions = [InteractiveIntersectAction(symbol=symbol, choices=[ + Choice(symbol='<', target_state='TURN_L'), + Choice(symbol='>', target_state='TURN_R'), + ])] + super().__init__(symbol=symbol, actions=actions, matchers=matchers) diff --git a/final_assignment_1/state_line.py b/final_assignment_1/state_line.py new file mode 100644 index 0000000..e9b534d --- /dev/null +++ b/final_assignment_1/state_line.py @@ -0,0 +1,113 @@ +from state import Ctx, SensorMatchingAction, State + + +class LineAction(SensorMatchingAction): + def __init__(self, symbol: str, matching_sensor: int): + super().__init__(symbol=symbol, matching_sensor=matching_sensor) + + +class FwdLineAction(LineAction): + """Moves the robot forward.""" + + def __init__(self, symbol: str, matching_sensor: int): + super().__init__(symbol=symbol, matching_sensor=matching_sensor) + + def on_enter(self, ctx: Ctx): + super().on_enter(ctx) + ctx.wheels.move(speed_rad=ctx.behavior.fwd_speed, rotation_rad=0) + ctx.fwd_speed_pwm_left = ctx.wheels.left.speed_pwm + ctx.fwd_speed_pwm_right = ctx.wheels.right.speed_pwm + ctx.system.display_speed(ctx.behavior.fwd_speed, ctx.behavior.fwd_speed, left=True) + ctx.system.display_speed(ctx.behavior.fwd_speed, ctx.behavior.fwd_speed, left=False) + ctx.state_action_cycle = 0 + ctx.city_position_forward_accounted_for = False + + def on_update(self, ctx: Ctx): + """When going just forward, we don't do any recalculations.""" + ctx.state_action_cycle += 1 + + +class SideFwdLineAction(LineAction): + """Moves the robot forward with slight turn to the side on which we captured the non-center sensor.""" + + def __init__(self, symbol: str, matching_sensor: int, direction: int): + super().__init__(symbol=symbol, matching_sensor=matching_sensor) + self.direction = direction + + def on_enter(self, ctx: Ctx): + super().on_enter(ctx) + ctx.wheels.move(speed_rad=ctx.behavior.fwd_speed, rotation_rad=-ctx.behavior.side_arc_max) + ctx.fwd_speed_pwm_left = ctx.wheels.left.speed_pwm + ctx.fwd_speed_pwm_right = ctx.wheels.right.speed_pwm + ctx.system.display_speed(ctx.wheels.left.speed_pwm, ctx.behavior.fwd_speed, left=True) + ctx.system.display_speed(ctx.wheels.left.speed_pwm, ctx.behavior.fwd_speed, left=False) + ctx.state_action_cycle = 0 + + def on_update(self, ctx: Ctx): + """When side-stepping line while going forward, we are slowing down progressively while increasing arc speed.""" + ctx.state_action_cycle += 1 + rotation_rad = ctx.behavior.side_arc_min + ctx.behavior.side_arc_inc * ctx.state_action_cycle + rotation_rad = min(rotation_rad, ctx.behavior.side_arc_max) + align_speed = ctx.behavior.fwd_speed - ctx.state_action_cycle * ctx.behavior.side_speed_dec + align_speed = max(align_speed, ctx.behavior.side_speed_min) + ctx.wheels.move(speed_rad=align_speed, rotation_rad=rotation_rad * self.direction) + ctx.system.display_speed(ctx.wheels.left.speed_pwm, ctx.fwd_speed_pwm_left_max, left=True) + ctx.system.display_speed(ctx.wheels.right.speed_pwm, ctx.fwd_speed_pwm_right_max, left=False) + print( + "%s, rotation_rad %d, init %s + inc_per_cycle %s * cycle %s" % + (self, rotation_rad, ctx.behavior.side_arc_min, ctx.behavior.side_arc_inc, ctx.state_action_cycle)) + + +class LeftFwdLineAction(SideFwdLineAction): + """Moves the robot forward with slight turn to the left when left sensor is captured.""" + + def __init__(self, symbol: str, matching_sensor: int): + super().__init__(symbol=symbol, matching_sensor=matching_sensor, direction=1) + + +class RightFwdLineAction(SideFwdLineAction): + """Moves the robot forward with slight turn to the right when right sensor is captured.""" + + def __init__(self, symbol: str, matching_sensor: int): + super().__init__(symbol=symbol, matching_sensor=matching_sensor, direction=-1) + + +class LineState(State): + def __init__(self, symbol: str, matchers=None): + actions: list[LineAction] = [ + FwdLineAction(symbol='|', matching_sensor=0b010), + LeftFwdLineAction(symbol='\\', matching_sensor=0b100), + RightFwdLineAction(symbol='/', matching_sensor=0b001) + ] + super().__init__(symbol=symbol, actions=actions, matchers=matchers) + self.actions = actions # type-cast for IDE support + + def transition_action(self, ctx: Ctx): + """Transitions from action to action within the state based on the line sensor readings.""" + # print("Trans state %s action %s, %s" % (state, action_now, bin(lcr))) + for action in self.actions: + if action.matching_sensor == ctx.sensor: + print("Transitioning intra-state %s action %s to %s" % (self, self.action, action)) + self.action.on_exit(ctx) + self.action = action + action.on_enter(ctx) + return True + return False + + def on_update(self, ctx: Ctx): + """Updates the current state.""" + if ctx.sensor != self.action.matching_sensor: + # print(f"Transitioning Line: sensor={ctx.sensor:05b} no longer matches while_sensor={self.action.matching_sensor:05b} (current state {self} action {self.action})") + ctx.state_out_of_bounds_cycle += 1 + if self.transition_action(ctx): + ctx.state_out_of_bounds_cycle = 0 + else: + if ctx.state_out_of_bounds_cycle > ctx.behavior.line_cycle_tolerance: + print("Line cycle tolerance exceeded, switching to error state") + ctx.transition_to_state("STOP") + return + else: + if ctx.state_out_of_bounds_cycle > 0: + ctx.state_out_of_bounds_cycle = 0 + print("Back in state") + self.action.on_update(ctx) diff --git a/final_assignment_1/state_map.py b/final_assignment_1/state_map.py new file mode 100644 index 0000000..173c127 --- /dev/null +++ b/final_assignment_1/state_map.py @@ -0,0 +1,207 @@ +from state import Behavior, SensorHistoryStateMatcher, SensorHasCount, EitherSensorHasCount +from state_generic import StartState, StopState, ErrorState +from state_intersection import IntersectXState, IntersectYState, IntersectTState, IntersectLState, IntersectRState +from state_line import LineState +from state_turn import LeftTurnState, RightTurnState, OffLineLeftTurnState, OffLineRightTurnState +from state_turn import ToLineLeftTurnState, ToLineRightTurnState + + +class StateMap: + """States of the robot. + Each state is defined declaratively, indicating: + - sensor history matchers for entering the state + - supported transitions to other states + """ + + def __init__(self, behavior: Behavior, stop_on_non_line_sensors=False, turns=False, intersections=False): + # if we want basic scenario, we will transition to stop on any non-line sensor change + stop_matchers = None if not stop_on_non_line_sensors else [ + SensorHistoryStateMatcher(steps=[SensorHasCount(sensor=0b111, min_count=5)]), + SensorHistoryStateMatcher(steps=[SensorHasCount(sensor=0b011, min_count=5)]), + SensorHistoryStateMatcher(steps=[SensorHasCount(sensor=0b110, min_count=5)]), + ] + self.states = { + # Generic states + 'START': StartState(symbol='s', matchers=[ + # move to start when line is detected after border states (i.e., after stop) + SensorHistoryStateMatcher(steps=[SensorHasCount(sensor=0b010, min_count=10)]), + ]), + 'STOP': StopState(symbol='.', matchers=stop_matchers), + 'ERROR': ErrorState(symbol='x'), + + # Follows the line (no declarative matchers enabled, we transition in and out using advanced conditions) + 'LINE': LineState(symbol='|'), + } + line_transitions = ['STOP'] + self.transitions = { + 'START': ['LINE', 'STOP'], + 'LINE': line_transitions, + 'STOP': ['START'], + 'ERROR': ['STOP'], + } + + vertical_min_count = 5 + horizontal_min_count = behavior.fast_sensor_change_dropped_below_cycle_count + 1 + if turns: + self.states.update({ + # detects a turn to the left + 'TURN_L': LeftTurnState( + symbol='TL', matchers=[ + # we are detecting disappearing line while last match shows it turning to the left + SensorHistoryStateMatcher(steps=[ + SensorHasCount(sensor=0b000, min_count=vertical_min_count), + SensorHasCount(sensor=0b100, min_count=vertical_min_count, optional=True), + SensorHasCount(sensor=0b110, min_count=horizontal_min_count), + EitherSensorHasCount(sensors=[ + SensorHasCount(sensor=0b001, min_count=vertical_min_count), + SensorHasCount(sensor=0b010, min_count=vertical_min_count) + ]) + ]) + ] + ), + # detects a turn to the right + 'TURN_R': RightTurnState( + symbol='TR', matchers=[ + # we are detecting disappearing line while last match shows it turning to the right + SensorHistoryStateMatcher(steps=[ + SensorHasCount(sensor=0b000, min_count=vertical_min_count), + SensorHasCount(sensor=0b001, min_count=vertical_min_count, optional=True), + SensorHasCount(sensor=0b011, min_count=horizontal_min_count), + EitherSensorHasCount(sensors=[ + SensorHasCount(sensor=0b001, min_count=vertical_min_count), + SensorHasCount(sensor=0b010, min_count=vertical_min_count) + ]) + ]) + ] + ), + }) + line_transitions.extend(['TURN_L', 'TURN_R']) + self.transitions.update({ + 'TURN_L': ['STOP'], + 'TURN_R': ['STOP'], + }) + + if intersections: + self.states.update({ + # detects a full intersection (+) + 'INTERSECT_X': IntersectXState( + symbol='I+', matchers=[ + # we will be detecting normal line, then a full intersection, then normal line again + # we also need to account for the fact that we might be slightly off the line + SensorHistoryStateMatcher(steps=[ + EitherSensorHasCount(sensors=[ + SensorHasCount(sensor=0b010, min_count=vertical_min_count), + SensorHasCount(sensor=0b100, min_count=vertical_min_count), + SensorHasCount(sensor=0b001, min_count=vertical_min_count), + ]), + SensorHasCount(sensor=0b110, min_count=horizontal_min_count, optional=True), + SensorHasCount(sensor=0b011, min_count=horizontal_min_count, optional=True), + SensorHasCount(sensor=0b111, min_count=horizontal_min_count), + SensorHasCount(sensor=0b110, min_count=horizontal_min_count, optional=True), + SensorHasCount(sensor=0b011, min_count=horizontal_min_count, optional=True), + EitherSensorHasCount(sensors=[ + SensorHasCount(sensor=0b010, min_count=vertical_min_count), + # SensorHasCount(sensor=0b100, min_count=vertical_min_count), + # SensorHasCount(sensor=0b001, min_count=vertical_min_count), + ]), + ]) + ] + ), + # detects an intersection slight to the left and right, not forward (i.e., 'Y') + 'INTERSECT_Y': IntersectYState( + symbol='IY', matchers=[ + SensorHistoryStateMatcher(steps=[ + SensorHasCount(sensor=0b101, min_count=vertical_min_count), + SensorHasCount(sensor=0b110, min_count=horizontal_min_count, optional=True), + SensorHasCount(sensor=0b011, min_count=horizontal_min_count, optional=True), + SensorHasCount(sensor=0b010, min_count=horizontal_min_count), + ]) + ] + ), + # detects an intersection to the left and right, not forward (i.e., 'T') + 'INTERSECT_T': IntersectTState( + symbol='IT', matchers=[ + SensorHistoryStateMatcher(steps=[ + SensorHasCount(sensor=0b000, min_count=vertical_min_count), + EitherSensorHasCount(sensors=[ + SensorHasCount(sensor=0b010, min_count=vertical_min_count), + SensorHasCount(sensor=0b100, min_count=vertical_min_count), + SensorHasCount(sensor=0b001, min_count=vertical_min_count), + SensorHasCount(sensor=0b110, min_count=horizontal_min_count), + SensorHasCount(sensor=0b011, min_count=horizontal_min_count), + ], optional=True), + SensorHasCount(sensor=0b111, min_count=horizontal_min_count), + SensorHasCount(sensor=0b110, min_count=horizontal_min_count, optional=True), + SensorHasCount(sensor=0b011, min_count=horizontal_min_count, optional=True), + EitherSensorHasCount(sensors=[ + SensorHasCount(sensor=0b010, min_count=vertical_min_count), + SensorHasCount(sensor=0b100, min_count=vertical_min_count), + SensorHasCount(sensor=0b001, min_count=vertical_min_count), + ]), + ]) + ] + ), + # detects an intersection to the left + 'INTERSECT_L': IntersectLState( + symbol='IL', matchers=[ + # we are detecting a blip on the right sensor, it has to last for some time (speed-dependent) + SensorHistoryStateMatcher(steps=[ + EitherSensorHasCount(sensors=[ + SensorHasCount(sensor=0b010, min_count=vertical_min_count), + SensorHasCount(sensor=0b100, min_count=vertical_min_count), + SensorHasCount(sensor=0b001, min_count=vertical_min_count), + ]), + SensorHasCount(sensor=0b110, min_count=horizontal_min_count), + EitherSensorHasCount(sensors=[ + SensorHasCount(sensor=0b100, min_count=vertical_min_count), + SensorHasCount(sensor=0b010, min_count=vertical_min_count), + ]), + ]) + ] + ), + # detects an intersection to the right + 'INTERSECT_R': IntersectRState( + symbol='IR', matchers=[ + # we are detecting a blip on the right sensor, it has to last for some time (speed-dependent) + SensorHistoryStateMatcher(steps=[ + EitherSensorHasCount(sensors=[ + SensorHasCount(sensor=0b010, min_count=vertical_min_count), + SensorHasCount(sensor=0b100, min_count=vertical_min_count), + SensorHasCount(sensor=0b001, min_count=vertical_min_count), + ]), + SensorHasCount(sensor=0b011, min_count=horizontal_min_count), + EitherSensorHasCount(sensors=[ + SensorHasCount(sensor=0b001, min_count=vertical_min_count), + SensorHasCount(sensor=0b010, min_count=vertical_min_count), + ]), + ]) + ] + ), + # extra turn operations needed to go off the intersection in the right sensor order + 'TURN_L_OFF_LINE': OffLineLeftTurnState(symbol='TL'), + 'TURN_R_OFF_LINE': OffLineRightTurnState(symbol='TR'), + 'TURN_L_TO_LINE': ToLineLeftTurnState(symbol='TL'), + 'TURN_R_TO_LINE': ToLineRightTurnState(symbol='TR'), + }) + line_transitions.extend(['INTERSECT_X', 'INTERSECT_Y', 'INTERSECT_T', 'INTERSECT_L', 'INTERSECT_R']) + self.transitions.update({ + 'INTERSECT_X': ['STOP'], + 'INTERSECT_Y': ['STOP'], + 'INTERSECT_R': ['STOP'], + 'INTERSECT_L': ['STOP'], + 'INTERSECT_T': ['STOP'], + 'TURN_L_OFF_LINE': ['STOP'], + 'TURN_R_OFF_LINE': ['STOP'], + 'TURN_L_TO_LINE': ['STOP'], + 'TURN_R_TO_LINE': ['STOP'], + }) + + print("Working with states:") + for state in self.states.values(): + print("* " + state.str_full()) + print("Enabled implicit state-to-state transitions:") + for state, transitions in self.transitions.items(): + print(f"* {state} -> {transitions}") + + def __str__(self): + return "StateMap(%s)" % self.states diff --git a/final_assignment_1/state_turn.py b/final_assignment_1/state_turn.py new file mode 100644 index 0000000..b906df5 --- /dev/null +++ b/final_assignment_1/state_turn.py @@ -0,0 +1,229 @@ +from state import Action, Ctx, SensorMatchingAction, State, SensorHistoryStateMatcher + + +class CityNavigationTurnAction(Action): + # the symbol indicates the direction we are going to take (navigation must match it or we enter ERROR state) + def __init__(self, symbol: str): + super().__init__(symbol) + + def on_enter(self, ctx: Ctx): + ctx.wheels.stop() + ctx.system.display_speed(0, ctx.fwd_speed_pwm_left_max, left=True) + ctx.system.display_speed(0, ctx.fwd_speed_pwm_right_max, left=False) + ctx.state_action_cycle = 0 + if ctx.city_position is not None and ctx.city_position_forward_accounted_for is False: + # we first move to this intersection and then we will decide where to go (turn) + ctx.city_position.move_forward() + ctx.system.display_position(ctx.city_position.x, ctx.city_position.y) + ctx.city_position_forward_accounted_for = True + + def on_update(self, ctx: Ctx): + ctx.state_action_cycle += 1 + if ctx.navigation_directions is not None: + cycle_count_for_navigation_pause = ctx.behavior.navigation_intersection_pause // ctx.behavior.cycle_duration_us + if ctx.state_action_cycle < cycle_count_for_navigation_pause: + ctx.state_action_cycle += 1 + return False + else: + ctx.state_action_cycle = 0 + navigation_direction = ctx.navigation_directions.pop(0) + turn_type = self.navigation_direction_to_turn_type(navigation_direction) + print("Navigating to %s (turn type %s), valid choice is %s" % (navigation_direction, turn_type, self.symbol)) + if turn_type == self.symbol: + if ctx.city_position is not None: + ctx.city_position.move_by_choice(navigation_direction) + ctx.system.display_position(ctx.city_position.x, ctx.city_position.y) + return True + else: + # we are in the navigation mode, but the symbol is not recognized for the current situation + ctx.transition_to_state('ERROR') + return False + else: + # in non-interactive mode we just turn based on our only choice + if ctx.city_position is not None: + ctx.city_position.move_by_choice(self.get_position_symbol()) + ctx.system.display_position(ctx.city_position.x, ctx.city_position.y) + return True + + def navigation_direction_to_turn_type(self, direction: str): + if direction == '^': + return '^' + elif direction == '<': + return 'TL' + elif direction == '>': + return 'TR' + + def get_position_symbol(self): + if self.symbol == '^': + return '^' + elif self.symbol == 'TL': + return '<' + elif self.symbol == 'TR': + return '>' + + +class TurnAction(SensorMatchingAction): + def __init__(self, symbol: str, matching_sensor: int, direction: int): + super().__init__(symbol=symbol, matching_sensor=matching_sensor) + self.direction = direction + + def on_enter(self, ctx: Ctx): + super().on_enter(ctx) + ctx.wheels.move(speed_rad=ctx.behavior.turn_speed, rotation_rad=ctx.behavior.turn_arc_speed * self.direction) + ctx.system.display_speed(ctx.wheels.left.speed_pwm, ctx.fwd_speed_pwm_left_max, left=True) + ctx.system.display_speed(ctx.wheels.right.speed_pwm, ctx.fwd_speed_pwm_right_max, left=False) + ctx.state_action_cycle = 0 + + def on_update(self, ctx: Ctx): + ctx.state_action_cycle += 1 + + +class SideSeekTurnAction(TurnAction): + def __init__(self, symbol: str, matching_sensor: int, direction: int): + super().__init__(symbol=symbol, matching_sensor=matching_sensor, direction=direction) + + def on_enter(self, ctx: Ctx): + print("Turning to catch side line..") + super().on_enter(ctx) + ctx.wheels.move(speed_rad=ctx.behavior.turn_speed, rotation_rad=ctx.behavior.turn_arc_speed * self.direction) + ctx.system.display_speed(ctx.wheels.left.speed_pwm, ctx.fwd_speed_pwm_left_max, left=True) + ctx.system.display_speed(ctx.wheels.right.speed_pwm, ctx.fwd_speed_pwm_right_max, left=False) + ctx.state_action_cycle = 0 + + +class CenterSeekTurnAction(TurnAction): + def __init__(self, symbol: str, matching_sensor: int, direction: int): + super().__init__(symbol=symbol, matching_sensor=matching_sensor, direction=direction) + + def on_enter(self, ctx: Ctx): + print("Turning to catch center line..") + super().on_enter(ctx) + ctx.transition_to_state('LINE') + + +class NoCenterSeekTurnAction(TurnAction): + def __init__(self, symbol: str, matching_sensor: int, direction: int): + super().__init__(symbol=symbol, matching_sensor=matching_sensor, direction=direction) + + def on_enter(self, ctx: Ctx): + print("Turning off center line..") + super().on_enter(ctx) + ctx.wheels.move(speed_rad=ctx.behavior.turn_speed, rotation_rad=ctx.behavior.turn_arc_speed * self.direction) + ctx.system.display_speed(ctx.wheels.left.speed_pwm, ctx.fwd_speed_pwm_left_max, left=True) + ctx.system.display_speed(ctx.wheels.right.speed_pwm, ctx.fwd_speed_pwm_right_max, left=False) + ctx.state_action_cycle = 0 + + +class TurnState(State): + def __init__(self, symbol: str, actions: list[TurnAction], matchers: list[SensorHistoryStateMatcher]): + super().__init__(symbol=symbol, actions=actions, matchers=matchers) + self.actions = actions # type-cast for IDE support + self.action_idx = 0 # current action index + + def set_default_action(self, ctx: Ctx): + if ctx.behavior.navigation_directions is not None: + # we are in the navigation mode, we will turn based on the navigation directions + # but our only choice is driven by our first normal turn action's direction + self.action = CityNavigationTurnAction(symbol=self.symbol) + self.action_idx = -1 # we are not part of standard actions and need to make sure we move the index + else: + self.action_idx = 0 + self.action = self.actions[self.action_idx] + + def on_update(self, ctx: Ctx): + """Updates the current state: we just go through both actions we have and wait until we match the sensors.""" + if self.action_idx == -1: + # we are in the navigation mode, we will turn based on the navigation directions + if self.action.on_update(ctx): + self.action_idx += 1 + self.action = self.actions[self.action_idx] + self.action.on_enter(ctx) + else: + return + + # standard turn actions + if ctx.sensor != self.action.matching_sensor: + print( + f"Turning and seeking sensor {self.action.matching_sensor:05b}, current sensor={ctx.sensor:05b} (current state {self} action {self.action})") + ctx.state_out_of_bounds_cycle += 1 + if ctx.state_out_of_bounds_cycle > ctx.behavior.turn_cycle_tolerance: + print("Turn cycle tolerance exceeded, switching to error state") + ctx.transition_to_state('STOP') + return + else: + ctx.state_out_of_bounds_cycle = 0 + if self.action == self.actions[0]: + print("Transitioning state %s action %s to %s" % (self, self.action, self.actions[1])) + self.action.on_exit(ctx) + self.action = self.actions[1] + self.action.on_enter(ctx) + else: + print("Finished turning") + ctx.transition_to_state('LINE') + self.action.on_update(ctx) + + +class LeftTurnState(TurnState): + """Turns the car to the left from an empty space, first catching the left side sensor, then the center one.""" + + def __init__(self, symbol: str, matchers: list[SensorHistoryStateMatcher]): + actions = [ + SideSeekTurnAction(symbol=symbol, matching_sensor=0b100, direction=1), + CenterSeekTurnAction(symbol=symbol, matching_sensor=0b010, direction=1) + ] + super().__init__(symbol=symbol, actions=actions, matchers=matchers) + + +class RightTurnState(TurnState): + """Turns the car to the right from an empty space, first catching the right side sensor, then the center one.""" + + def __init__(self, symbol: str, matchers: list[SensorHistoryStateMatcher]): + actions = [ + SideSeekTurnAction(symbol=symbol, matching_sensor=0b001, direction=-1), + CenterSeekTurnAction(symbol=symbol, matching_sensor=0b010, direction=-1) + ] + super().__init__(symbol=symbol, actions=actions, matchers=matchers) + + +class OffLineLeftTurnState(TurnState): + """Turns the car to the left from a center line, first waiting for an empty space on all sensors, then left side sensor, then the center one.""" + + def __init__(self, symbol: str): + actions = [ + NoCenterSeekTurnAction(symbol=symbol, matching_sensor=0b000, direction=1), + SideSeekTurnAction(symbol=symbol, matching_sensor=0b100, direction=1), + CenterSeekTurnAction(symbol=symbol, matching_sensor=0b010, direction=1) + ] + super().__init__(symbol=symbol, actions=actions, matchers=[]) + + +class OffLineRightTurnState(TurnState): + """Turns the car to the right from a center line, first waiting for an empty space on all sensors, then right side sensor, then the center one.""" + + def __init__(self, symbol: str): + actions = [ + NoCenterSeekTurnAction(symbol=symbol, matching_sensor=0b000, direction=-1), + SideSeekTurnAction(symbol=symbol, matching_sensor=0b001, direction=-1), + CenterSeekTurnAction(symbol=symbol, matching_sensor=0b010, direction=-1) + ] + super().__init__(symbol=symbol, actions=actions, matchers=[]) + + +class ToLineLeftTurnState(TurnState): + """Turns the car to the left just waiting for the center sensor to catch the line (disregarding side sensor).""" + + def __init__(self, symbol: str): + actions = [ + CenterSeekTurnAction(symbol=symbol, matching_sensor=0b010, direction=1) + ] + super().__init__(symbol=symbol, actions=actions, matchers=[]) + + +class ToLineRightTurnState(TurnState): + """Turns the car to the right just waiting for the center sensor to catch the line (disregarding side sensor).""" + + def __init__(self, symbol: str): + actions = [ + CenterSeekTurnAction(symbol=symbol, matching_sensor=0b010, direction=-1) + ] + super().__init__(symbol=symbol, actions=actions, matchers=[]) diff --git a/final_assignment_1/system.py b/final_assignment_1/system.py new file mode 100644 index 0000000..1ca2412 --- /dev/null +++ b/final_assignment_1/system.py @@ -0,0 +1,181 @@ +class System: + """System class for the robot core system interface, + with platform-specific implementations separate classes.""" + SYS_MBIT = "Micro:Bit" + SYS_PICO = "Pico:Ed" + + I2C_FREQ = 100_000 + I2C_SENSOR_DEVICE = 0x38 + I2C_MOTOR_DEVICE = 0x70 + + MASK_LINE_LEFT = 0x04 + MASK_LINE_CENTER = 0x08 + MASK_LINE_RIGHT = 0x10 + MASK_IR_LEFT = 0x20 + MASK_IR_RIGHT = 0x40 + + SOUND_SPEED = 343 # m/s + + def __init__(self): + """Initializes the system.""" + pass + + def get_system_type(self): + """Returns the system type.""" + pass + + def ticks_us(self): + """Returns the current time in microseconds.""" + pass + + def ticks_diff(self, ticks1, ticks2): + """Returns the difference between two time values in microseconds.""" + pass + + def sleep_us(self, us): + """Sleeps for the given time in microseconds.""" + pass + + def i2c_scan(self) -> list[int]: + """Scans the I2C bus for devices and returns a list of addresses.""" + pass + + def i2c_read(self, addr: int, n: int) -> bytes: + """Reads 'n' amount of data from the I2C device into a buffer.""" + pass + + def i2c_write(self, addr: int, buf: bytes): + """Writes data to the I2C device from a buffer.""" + pass + + def i2c_write_motor(self, buf: bytes): + """Writes data to the I2C motor device from a buffer.""" + self.i2c_write(self.I2C_MOTOR_DEVICE, buf) + + def i2c_init_motor(self): + """Initializes the I2C motor device.""" + self.i2c_write_motor(b"\x00\x01") + self.i2c_write_motor(b"\xE8\xAA") + + def i2c_read_sensors(self): + """Returns the current sensor data byte.""" + return self.i2c_read(self.I2C_SENSOR_DEVICE, 1)[0] + + def get_sensors(self): + """Checks if line sensors (..., left, center, right) detected a line (true if line is present) + or obstacle sensors (left, right, ...) detect an obstacle (true if [white] reflection is present).""" + data = self.i2c_read_sensors() + li = bool(data & self.MASK_IR_LEFT) + ri = bool(data & self.MASK_IR_RIGHT) + ll = bool(data & self.MASK_LINE_LEFT) + lc = bool(data & self.MASK_LINE_CENTER) + lr = bool(data & self.MASK_LINE_RIGHT) + return not li, not ri, ll, lc, lr + + def pin_read_digital(self, pin): + """Reads the digital value of a pin.""" + pass + + def pin_write_digital(self, pin, value: int): + """Writes a digital value to the pin.""" + pass + + def set_sonar_angle_pwm(self, angle_pwm: int): + """Sets front sonar horizontal angle PWM value.""" + pass + + def get_sonar_echo_delay_us(self, timeout_us) -> int: + """Measures the delay it takes for the sonar echo to return. + Returns the delay in microseconds or a negative value if the timeout was reached.""" + pass + + def get_sonar_distance(self, max_distance=1.0) -> float: + """Returns the distance in meters measured by the sonar, + with the maximum time spent on detecting the echo based on the max distance we want to detect. + This is by default set to 1m as the reasonable maximum distance for the sonar balanced to max time spent.""" + timeout_us = int((2 * max_distance / self.SOUND_SPEED) * 1_000_000) + measured_time_us = self.get_sonar_echo_delay_us(timeout_us=timeout_us) + if measured_time_us < 0: + return measured_time_us + measured_time_sec = measured_time_us / 1_000_000 + return measured_time_sec * self.SOUND_SPEED / 2 + + def get_encoder_pin_left(self): + """Returns the pin object for the left encoder.""" + pass + + def get_encoder_pin_right(self): + """Returns the pin object for the right encoder.""" + pass + + def get_adc_value(self) -> int: + """Returns the current ADC value of the robot (0 - 1023).""" + pass + + def get_supply_voltage(self): + """Returns the current supply voltage of the robot.""" + adc = self.get_adc_value() # ADC value 0 - 1023 + # Convert ADC value to volts: 3.3 V / 1024 (max. voltage at ADC pin / ADC resolution) + voltage = 0.00322265625 * adc + # Multiply measured voltage by voltage divider ratio to calculate actual voltage + # (10 kOhm + 5,6 kOhm) / 5,6 kOhm [(R1 + R2) / R2, Voltage divider ratio] + return voltage * 2.7857142 + + def is_button_a_pressed(self): + """Returns whether button A is pressed.""" + pass + + def is_button_b_pressed(self): + """Returns whether button B is pressed.""" + pass + + def display_text(self, label): + """Sets a label on the robot display (prints in log, displays the first letter on the screen).""" + pass + + def display_sensors(self, il, ir, ll, lc, lr, y=4, lb=9, ib=5): + """Displays the sensors in top line of the display as pixels for each sensor. + Line sensors (left, center, right) are far left, center, far right, lb is line brightness 0-9, default 9. + IR sensors (left, right) are interlaced among them, ib is IR brightness 0-9, default 5.""" + pass + + def get_drive_mode_symbol_keys(self): + """Returns the keys of the drive mode symbols.""" + pass + + def display_drive_mode(self, mode: str): + """Displays the drive mode depicting the current situation we are in now. + The form of the displaying of the mode is platform-dependent. + Variable mode refers to the pictogram displayed, see each implementation (they should be in sync).""" + pass + + def display_choice(self, choice: str): + """Displays the choice depicting the current situation we are in now. + The form of the displaying of the choice is platform-dependent. + Variable mode refers to the pictogram displayed, see each implementation (they should be in sync).""" + pass + + def display_position(self, x: float, y: float): + """Displays the robot position on the display. The actual implementation depends on the platform.""" + pass + + def display_speed(self, speed_now, speed_max, left: bool): + """Displays one of the two current wheel speeds on the display. Position and form is platform-dependent.""" + pass + + def display_bitmap(self, x_pos: int, y_pos: int, width: int, lines: list[int]): + """Displays bitmap on display (0x0 = top left, max is platform-dependent: 5x5 on Micro:Bit, 27x7 Pico:Ed). + Bitwise, each line int is right-aligned.""" + pass + + def display_clear(self): + """Clears the display.""" + pass + + def display_on(self): + """Enables the display.""" + pass + + def display_off(self): + """Disables the display.""" + pass diff --git a/final_assignment_1/system_mbit.py b/final_assignment_1/system_mbit.py new file mode 100644 index 0000000..605cc73 --- /dev/null +++ b/final_assignment_1/system_mbit.py @@ -0,0 +1,155 @@ +from microbit import button_a, button_b, display, i2c, pin1, pin2, pin8, pin12, pin14, pin15 +from machine import time_pulse_us +from utime import ticks_us, ticks_diff, sleep + +from system import System as SystemBase + + +class System(SystemBase): + DRIVE_MODE_PICTOGRAMS = { + ' ': [0b000, 0b000, 0b000], + 'TL': [0b000, 0b110, 0b010], # sharp turn to left + 'TR': [0b000, 0b011, 0b010], # sharp turn to right + 'IT': [0b000, 0b111, 0b010], # intersection left-right (T) + 'IL': [0b010, 0b110, 0b010], # intersection left-straight (T to left) + 'IR': [0b010, 0b011, 0b010], # intersection right-straight (T to right) + 'IY': [0b101, 0b010, 0b010], # split in the road (Y) + 'I+': [0b010, 0b111, 0b010], # intersection all directions (+) + '<': [0b100, 0b111, 0b100], # interactive choice left + '^': [0b111, 0b010, 0b010], # interactive choice forward + '>': [0b001, 0b111, 0b001], # interactive choice right + '-': [0b000, 0b111, 0b000], + '_': [0b000, 0b000, 0b111], + '.': [0b000, 0b000, 0b010], + '|': [0b010, 0b010, 0b010], + '/': [0b001, 0b010, 0b100], + '\\': [0b100, 0b010, 0b001], + 's': [0b011, 0b010, 0b110], + 'x': [0b101, 0b010, 0b101], + } + + def __init__(self, i2c_freq=SystemBase.I2C_FREQ): + super().__init__() + i2c.init(freq=i2c_freq) + print("System %s initialized, voltage %sV" % (self.get_system_type(), self.get_supply_voltage())) + + def get_system_type(self): + return self.SYS_MBIT + + def ticks_us(self): + return ticks_us() + + def ticks_diff(self, ticks1, ticks2): + return ticks_diff(ticks1, ticks2) + + def sleep_us(self, us): + sleep(us / 1_000_000) + + def i2c_read(self, addr: int, n: int) -> bytes: + return i2c.read(addr, n) + + def i2c_write(self, addr: int, buf: bytes): + i2c.write(addr, buf) + + def i2c_scan(self) -> list[int]: + return i2c.scan() + + def pin_read_digital(self, pin): + return pin.read_digital() + + def pin_write_digital(self, pin, value: int): + pin.write_digital(value) + + def set_sonar_angle_pwm(self, angle_pwm: int): + pin1.write_analog(angle_pwm) + + def get_sonar_echo_delay_us(self, timeout_us) -> int: + self.pin_write_digital(pin8, 1) + self.pin_write_digital(pin8, 0) + return time_pulse_us(pin12, 1, timeout_us) + + def get_encoder_pin_left(self): + return pin14 + + def get_encoder_pin_right(self): + return pin15 + + def get_adc_value(self) -> int: + return pin2.read_analog() + + def is_button_a_pressed(self): + return button_a.is_pressed() + + def is_button_b_pressed(self): + return button_b.is_pressed() + + def display_text(self, label): + """Sets a label on the robot display (prints in log, displays the first letter on the screen).""" + print("Label: %s" % label) + display.show(label[0]) + + def display_sensors(self, il, ir, ll, lc, lr, y=4, lb=9, ib=5): + """Displays the sensors in top line of the display as pixels for each sensor. + Line sensors (left, center, right) are far left, center, far right, lb is line brightness 0-9, default 9. + IR sensors (left, right) are interlaced among them, ib is IR brightness 0-9, default 5.""" + display.set_pixel(4, y, lb if ll else 0) + display.set_pixel(2, y, lb if lc else 0) + display.set_pixel(0, y, lb if lr else 0) + display.set_pixel(3, y, ib if il else 0) + display.set_pixel(1, y, ib if ir else 0) + + def get_drive_mode_symbol_keys(self): + return list(self.DRIVE_MODE_PICTOGRAMS.keys()) + + def display_drive_mode(self, mode: str): + """Displays the drive mode in the center (3x3 pixels), overriding choice, supporting + all pictograms defined in DRIVE_MODE_PICTOGRAMS (other characters clear the area).""" + lines = self.DRIVE_MODE_PICTOGRAMS[mode if mode in self.DRIVE_MODE_PICTOGRAMS else ' '] + self.display_bitmap(1, 2, 3, lines) + + def display_choice(self, choice: str): + """Displays the choice in the center (3x3 pixels), overriding drive mode, supporting + all pictograms defined in DRIVE_MODE_PICTOGRAMS (other characters clear the area).""" + self.display_drive_mode(choice) + + def display_position(self, x: float, y: float): + """Displays the X and Y position of the robot on the left and right side of the display + as a bar of maximum 4 pixels. The position on the display is shared with speed indicator.""" + print("Position %s, %s" % (x, y)) + intensity = 5 + height_max = 4 + x_bar_x_pos = 4 + y_bar_x_pos = 0 + height_x = min(height_max, int(x)) + height_y = min(height_max, int(y)) + for y in range(4): + display.set_pixel(x_bar_x_pos, y, intensity if y <= height_x else 0) + display.set_pixel(y_bar_x_pos, y, intensity if y <= height_y else 0) + + def display_speed(self, speed_now, speed_max, left: bool): + """Displays the current speed on the display (represented as a 3-pixel bar) on the right side of display.""" + intensity = 3 + height_max = 4 + height = int(height_max * speed_now / speed_max) + x_pos = 4 if left else 0 + for y in range(height_max): + display.set_pixel(x_pos, y, intensity if y < height else 0) + + def display_bitmap(self, x_pos: int, y_pos: int, width: int, lines: list[int]): + """Displays the bitmap on the display (0x0 = top left, max 5x5). Bitwise, each line int is right-aligned.""" + for y in range(len(lines)): + for x in range(width): + display.set_pixel(x_pos + x, 4 - (y_pos + y), 9 if lines[y] & (1 << x) else 0) + + def display_clear(self): + """Clears the display.""" + display.clear() + + def display_on(self): + """Enables the display.""" + display.on() + display.clear() + + def display_off(self): + """Disables the display.""" + display.off() diff --git a/final_assignment_1/system_ped.py b/final_assignment_1/system_ped.py new file mode 100644 index 0000000..8c1a6c8 --- /dev/null +++ b/final_assignment_1/system_ped.py @@ -0,0 +1,224 @@ +from time import monotonic_ns, sleep + +from analogio import AnalogIn +from board import P1, P2, P8, P12, P14, P15 +from digitalio import DigitalInOut, Direction +from picoed import i2c, display, button_a, button_b +from pulseio import PulseIn +from pwmio import PWMOut + +from system import System as SystemBase + + +class System(SystemBase): + """Pico:Ed implementation of the System class. + The Pico:Ed board (I2C, pins, buttons, display) use library at https://github.com/elecfreaks/circuitpython_picoed. + Display uses IS31FL3731 library at https://github.com/adafruit/Adafruit_CircuitPython_IS31FL3731. + Light uses NeoPixel library at https://github.com/adafruit/Adafruit_CircuitPython_NeoPixel. + Sonar code is inspired by HC-SR04 library at https://github.com/adafruit/Adafruit_CircuitPython_HCSR04.""" + + DRIVE_MODE_PICTOGRAMS = { + ' ': [0b00000, 0b00000, 0b00000, 0b00000, 0b00000], + 'TL': [0b00000, 0b00000, 0b11100, 0b00100, 0b00100], # sharp turn to left + 'TR': [0b00000, 0b00000, 0b00111, 0b00100, 0b00100], # sharp turn to right + 'IT': [0b00000, 0b00000, 0b11111, 0b00100, 0b00100], # intersection left-right (T) + 'IL': [0b00100, 0b00100, 0b11100, 0b00100, 0b00100], # intersection left-straight (T to left) + 'IR': [0b00100, 0b00100, 0b00111, 0b00100, 0b00100], # intersection right-straight (T to right) + 'IY': [0b10001, 0b01010, 0b00100, 0b00100, 0b00100], # split in the road (Y) + 'I+': [0b00100, 0b00100, 0b11111, 0b00100, 0b00100], # intersection all directions (+) + '<': [0b00010, 0b00100, 0b01111, 0b00100, 0b00010], # interactive choice left + '^': [0b00000, 0b00100, 0b01110, 0b10101, 0b00100], # interactive choice forward + '>': [0b00100, 0b00010, 0b01111, 0b00010, 0b00100], # interactive choice right + '-': [0b00000, 0b00000, 0b11111, 0b00000, 0b00000], + '_': [0b00000, 0b00000, 0b00000, 0b00000, 0b11111], + '.': [0b00000, 0b00000, 0b00000, 0b00000, 0b00100], + '|': [0b00100, 0b00100, 0b00100, 0b00100, 0b00100], + '/': [0b00001, 0b00010, 0b00100, 0b01000, 0b10000], + '\\': [0b10000, 0b01000, 0b00100, 0b00010, 0b00001], + 's': [0b00111, 0b01000, 0b01110, 0b00010, 0b11100], + 'x': [0b10001, 0b01010, 0b00100, 0b01010, 0b10001], + '0': [0b01110, 0b10011, 0b10101, 0b11001, 0b01110], + '1': [0b00010, 0b00110, 0b01010, 0b00010, 0b00010], + '2': [0b01110, 0b00001, 0b01110, 0b10000, 0b01110], + '3': [0b11110, 0b00001, 0b01111, 0b00001, 0b11110], + '4': [0b10000, 0b10100, 0b01110, 0b00100, 0b00100], + '5': [0b11110, 0b10000, 0b11110, 0b00001, 0b01110], + '6': [0b01110, 0b10000, 0b11110, 0b10001, 0b01110], + '7': [0b11111, 0b00010, 0b00100, 0b01000, 0b01000], + '8': [0b01110, 0b10001, 0b01110, 0b10001, 0b01110], + '9': [0b01110, 0b10001, 0b01111, 0b00001, 0b01110], + } + + # position pictograms are smaller to not touch the edges and the middle character + # we still print the full 5x5 pictogram, but the actual character is 3x5 + POS_PICTOGRAMS = { + '0': [0b01110, 0b01010, 0b01010, 0b01010, 0b01110], + '1': [0b00100, 0b01100, 0b00100, 0b00100, 0b00100], + '2': [0b01110, 0b00010, 0b01110, 0b01000, 0b01110], + '3': [0b01110, 0b00010, 0b00110, 0b00010, 0b01110], + '4': [0b01010, 0b01010, 0b00110, 0b00010, 0b00010], + '5': [0b01110, 0b01000, 0b01110, 0b00010, 0b01110], + '6': [0b01110, 0b01000, 0b01110, 0b01010, 0b01110], + '7': [0b01110, 0b00010, 0b00010, 0b00010, 0b00010], + '8': [0b01110, 0b01010, 0b00100, 0b01010, 0b01110], + '9': [0b01110, 0b01010, 0b01110, 0b00010, 0b01110], + } + + def __init__(self): + super().__init__() + # Sonar servo + self.pin1 = PWMOut(P1, frequency=100) + # ADC + self.pin2 = AnalogIn(P2) + # Sonar trigger + self.pin8 = DigitalInOut(P8) + self.pin8.direction = Direction.OUTPUT + # Sonar echo + self.pin12 = PulseIn(P12) + # Encoder left + self.pin14 = DigitalInOut(P14) + self.pin14.direction = Direction.INPUT + # Encoder right + self.pin15 = DigitalInOut(P15) + self.pin15.direction = Direction.INPUT + print("System %s initialized, voltage %sV" % (self.get_system_type(), self.get_supply_voltage())) + + def get_system_type(self): + return self.SYS_PICO + + def ticks_us(self): + return monotonic_ns() // 1000 + + def ticks_diff(self, ticks1, ticks2): + return abs(ticks1 - ticks2) + + def sleep_us(self, us): + sleep(us / 1_000_000) + + def i2c_scan(self) -> list[int]: + while not i2c.try_lock(): + pass + ret = i2c.scan() + i2c.unlock() + return ret + + def i2c_read(self, addr: int, n: int) -> bytes: + while not i2c.try_lock(): + pass + buffer = bytearray(n) + i2c.readfrom_into(addr, buffer, start=0, end=n) + i2c.unlock() + return buffer + + def i2c_write(self, addr: int, buf: bytes): + while not i2c.try_lock(): + pass + i2c.writeto(addr, buf) + i2c.unlock() + + def pin_read_digital(self, pin): + return 1 if pin.value else 0 + + def pin_write_digital(self, pin, value: int): + pin.value = value != 1 + + def set_sonar_angle_pwm(self, angle_pwm: int): + scaled_value = int((angle_pwm / 128) * 16384) + self.pin1.duty_cycle = scaled_value + + def get_sonar_echo_delay_us(self, timeout_us) -> int: + # Trigger the sonar w/ 10ms pulse + self.pin8.value = True + self.sleep_us(10) + self.pin8.value = False + + self.pin12.clear() + self.pin12.resume() + start_time = monotonic_ns() + while not self.pin12: + if (monotonic_ns() - start_time) > timeout_us * 1000: + self.pin12.pause() + return -1 + self.pin12.pause() + return self.pin12[0] if len(self.pin12) > 0 else -1 + + def get_encoder_pin_left(self): + return self.pin14 + + def get_encoder_pin_right(self): + return self.pin15 + + def get_adc_value(self) -> int: + # scale ADC value to 10-bit value as expected by the caller + return self.pin2.value * 1024 // 16384 + + def is_button_a_pressed(self): + return button_a.is_pressed() + + def is_button_b_pressed(self): + return button_b.is_pressed() + + def display_text(self, label): + print("Label: %s" % label) + display.scroll(label[0:3]) + + def display_sensors(self, il, ir, ll, lc, lr, y=6, lb=32, ib=3): + """Displays the sensors in top line of the display as pixels for each sensor. + Line sensors (left, center, right) are far left, center, far right, lb is line brightness 0-9, default 9. + IR sensors (left, right) are interlaced among them, ib is IR brightness 0-9, default 5.""" + x_pos = 4 + stretch = 2 + display.pixel(x_pos + 4 * stretch, y, lb if ll else 0) + display.pixel(x_pos + 2 * stretch, y, lb if lc else 0) + display.pixel(x_pos + 0 * stretch, y, lb if lr else 0) + display.pixel(x_pos + 3 * stretch, y, ib if il else 0) + display.pixel(x_pos + 1 * stretch, y, ib if ir else 0) + + def get_drive_mode_symbol_keys(self): + return list(self.DRIVE_MODE_PICTOGRAMS.keys()) + + def display_drive_mode(self, mode: str): + """Displays the drive mode in the display center (5x5 pixels), supporting + all pictograms defined in DRIVE_MODE_PICTOGRAMS (other characters clear the area).""" + lines = self.DRIVE_MODE_PICTOGRAMS[mode if mode in self.DRIVE_MODE_PICTOGRAMS else ' '] + self.display_bitmap(6, 0, 5, lines) + + def display_choice(self, choice: str): + """Displays the choice next to the drive mode (5x5 pixels), supporting + all pictograms defined in DRIVE_MODE_PICTOGRAMS (other characters clear the area).""" + lines = self.DRIVE_MODE_PICTOGRAMS[choice if choice in self.DRIVE_MODE_PICTOGRAMS else ' '] + self.display_bitmap(1, 0, 5, lines) + + def display_position(self, x: float, y: float): + """Displays the X and Y position of the robot on the left and right side of the display, + each as a single digit using standard pictogram (5x5 pixels).""" + print("Position %s, %s" % (x, y)) + char = str(min(9, int(x))) + lines = self.POS_PICTOGRAMS[char] + self.display_bitmap(11, 0, 5, lines) + char = str(min(9, int(y))) + lines = self.POS_PICTOGRAMS[char] + self.display_bitmap(1, 0, 5, lines) + + def display_speed(self, speed_now, speed_max, left: bool): + """Displays the current speed as a horizontal bar on the left or right of the display.""" + intensity = 2 + height_max = 7 + height = int(height_max * speed_now / speed_max) + x_pos = 16 if left else 0 + for y in range(height_max): + display.pixel(x_pos, y, intensity if y < height else 0) + + def display_bitmap(self, x_pos: int, y_pos: int, width: int, lines: list[int]): + for y in range(len(lines)): + for x in range(width): + display.pixel(x_pos + width - x - 1, y_pos + width - y - 1, 9 if lines[y] & (1 << (width - x - 1)) else 0) + + def display_clear(self): + display.fill(0) + + def display_on(self): + pass + + def display_off(self): + self.display_clear() diff --git a/final_assignment_1/wheel.py b/final_assignment_1/wheel.py new file mode 100644 index 0000000..b2f7a09 --- /dev/null +++ b/final_assignment_1/wheel.py @@ -0,0 +1,143 @@ +from wheel_encoder import WheelEncoder +from system import System + + +class Wheel: + """Handles single wheel capable of moving forward or backward + with given (variable) speed and stop immediately or conditionally + based on distance and time.""" + + def __init__(self, system: System, name, motor_fwd_cmd, motor_rwd_cmd, sensor_pin, + pwm_min=60, pwm_max=255, pwm_multiplier=0, pwm_shift=0): + """Initializes the wheel.""" + self.system = system + self.name = name + self.motor_fwd_cmd = motor_fwd_cmd + self.motor_rwd_cmd = motor_rwd_cmd + self.distance_remain_ticks = -1 + self.distance_req_time_us = -1 + self.distance_start_time_us = 0 + self.speed_pwm = 0 + self.enc = WheelEncoder(system=system, sensor_pin=sensor_pin) + self.pwm_min = pwm_min + self.pwm_max = pwm_max + self.pwm_multiplier = pwm_multiplier + self.pwm_shift = pwm_shift + + def move_pwm(self, speed_pwm): + """Moves the wheel using given PWM speed (indefinite ticks, time). + The wheel will continue to move until stop() is called. + The PWM speed is a value between -255 and 255, where 0 means stop.""" + self.set_speed_pwm(speed_pwm) + self.distance_remain_ticks = -1 + self.distance_req_time_us = -1 + + def move_rad(self, speed_rad): + """Moves the wheel using given rad/sec speed (indefinite ticks, time). + The wheel will continue to move until stop() is called.""" + speed_pwm = self.radsec2pwm(speed_rad) + self.move_pwm(speed_pwm + self.pwm_min if speed_pwm > 0 else speed_pwm - self.pwm_min) + + def move_pwm_for_ticks(self, speed_pwm, distance_ticks): + """Moves the wheel forward using given PWM speed for the given distance + in sensor ticks. If the motor is already moving, the asked distance is added + to the remaining distance and the motor continues until no distance remains.""" + self.set_speed_pwm(speed_pwm) + print("Moving %s wheel with speed %d pwm for distance %f ticks" % (self.name, speed_pwm, distance_ticks)) + self.distance_remain_ticks += distance_ticks + + def move_pwm_for_time(self, speed_pwm, distance_time_us): + """Moves the wheel forward using given PWM speed for the given time. + If the motor is already moving, the distance in time is added to the current + distance and the motor continues to move until the total time is reached.""" + self.set_speed_pwm(speed_pwm) + self.distance_req_time_us += distance_time_us + if self.distance_start_time_us == 0: + self.distance_start_time_us = self.system.ticks_us() + + def move_pwm_for_distance(self, speed_pwm, distance): + """Moves the wheel forward using given PWM speed for given distance in meters.""" + distance_ticks = int(distance * self.enc.TICKS_PER_M) + self.move_pwm_for_ticks(speed_pwm, distance_ticks) + + def move_radsec_for_distance(self, radsec, distance): + """Moves the wheel using given rad/s speed for given distance in meters.""" + print("Moving %s wheel with speed %f rad/s for distance %f m" % (self.name, radsec, distance)) + speed_pwm = self.radsec2pwm(radsec) + distance_ticks = int(distance * self.enc.TICKS_PER_M) * 2 + self.move_pwm_for_ticks(speed_pwm, distance_ticks) + + def set_speed_pwm(self, speed_pwm): + """Sets the wheel PWM speed (and direction). Does not affect the remaining + distance or time previously set to perform. If the wheel was going + in the other direction, resets the H-bridge other direction first.""" + if speed_pwm == 0: + if self.speed_pwm != 0: + # print("Stopping %s wheel" % self.name) + self.system.i2c_write_motor(bytes([self.motor_fwd_cmd, 0])) + self.system.i2c_write_motor(bytes([self.motor_rwd_cmd, 0])) + self.speed_pwm = 0 + return + speed_pwm = int(max(-255, min(255, speed_pwm))) + if self.speed_pwm == speed_pwm: + return + self.enc.reset() + if (self.speed_pwm < 0 < speed_pwm) or (self.speed_pwm > 0 > speed_pwm): + # if we are changing the direction, we need to reset the motor first + motor_reset_cmd = (self.motor_rwd_cmd + if speed_pwm >= 0 else self.motor_fwd_cmd) + # print("Changing %s wheel direction" % self.name) + self.system.i2c_write_motor(bytes([motor_reset_cmd, 0])) + motor_set_cmd = self.motor_fwd_cmd if speed_pwm > 0 else self.motor_rwd_cmd + print("Setting %s wheel speed_pwm %d" % (self.name, speed_pwm)) + self.system.i2c_write_motor(bytes([motor_set_cmd, abs(speed_pwm)])) + self.speed_pwm = speed_pwm + + def radsec2pwm(self, radsec): + """Returns the PWM speed for the given rad/s speed. + We use the multiplier and shift values to calculate the PWM speed using formula: + rad_per_sec = pwm * multiplier + shift, for us: pwm = (rad_per_sec - shift) / multiplier.""" + if self.pwm_multiplier == 0: + print("error: wheel %s pwm_multiplier is 0" % self.name) + return 0 + direction = 1 if radsec >= 0 else -1 + return direction * int((abs(radsec) - self.pwm_shift) / self.pwm_multiplier) + + def msec2pwm(self, msec): + """Returns the PWM speed for the given m/s speed.""" + rad_per_sec = self.enc.m2rad(msec) + return self.radsec2pwm(rad_per_sec) + + def stop(self): + """Stops the wheel immediately.""" + self.set_speed_pwm(0) + self.distance_remain_ticks = -1 + self.distance_req_time_us = -1 + self.enc.reset() + + def stop_on_no_work(self): + """Stops the wheel if the remaining distance in ticks or time is reached.""" + stop_due_to_ticks = True + if self.distance_remain_ticks != 0: + stop_due_to_ticks = False + stop_due_to_time = True + if self.distance_req_time_us >= 0: + time_delta = self.system.ticks_diff(self.system.ticks_us(), self.distance_start_time_us) + if time_delta < self.distance_req_time_us: + stop_due_to_time = False + # we stop only if both conditions are met + # otherwise we keep the other condition finish as well + if stop_due_to_ticks and stop_due_to_time: + self.stop() + + def on_tick(self): + """Updates the wheel state based on a new tick, + checks the remaining distance in ticks.""" + if self.distance_remain_ticks > 0: + self.distance_remain_ticks -= 1 + + def update(self): + """Updates the encoder and general wheel state.""" + if self.enc.update() is True: + self.on_tick() + self.stop_on_no_work() diff --git a/final_assignment_1/wheel_driver.py b/final_assignment_1/wheel_driver.py new file mode 100644 index 0000000..70534db --- /dev/null +++ b/final_assignment_1/wheel_driver.py @@ -0,0 +1,66 @@ +from system import System +from wheel import Wheel + + +class WheelDriver: + """Handles the movement of the whole robot + (forward, backward, turning). Activities are either + indefinite or conditional based on ticks, time + or real speed measured by the encoder on wheel level.""" + + def __init__(self, system: System, + left_pwm_min=50, left_pwm_max=255, left_pwm_multiplier=0, left_pwm_shift=0, + right_pwm_min=50, right_pwm_max=255, right_pwm_multiplier=0, right_pwm_shift=0): + """Initializes the wheel driver.""" + self.system = system + self.system.i2c_init_motor() + self.left = Wheel(system=system, name="left", + motor_fwd_cmd=5, motor_rwd_cmd=4, sensor_pin=system.get_encoder_pin_left(), + pwm_min=left_pwm_min, pwm_max=left_pwm_max, + pwm_multiplier=left_pwm_multiplier, pwm_shift=left_pwm_shift) + self.right = Wheel(system=system, name="right", + motor_fwd_cmd=3, motor_rwd_cmd=2, sensor_pin=system.get_encoder_pin_right(), + pwm_min=right_pwm_min, pwm_max=right_pwm_max, + pwm_multiplier=right_pwm_multiplier, pwm_shift=right_pwm_shift) + self.speed_rad = -99 + self.rotation_rad = -99 + self.stop() + + # Please note: normally, we would have aggregate move...() methods here for both wheels, but + # these got removed in favor of smaller code memory footprint + we control each wheel separately anyway. + + def move(self, speed_rad, rotation_rad): + """Moves the robot with specific PWM on each wheel while applying rotational speed in radians. + Positive rotation is clockwise, negative rotation is counterclockwise. + We first need to calculate the speed of each wheel based on the desired speed and rotation.""" + if speed_rad == 0 and rotation_rad == 0: + self.stop() + return + reverse = speed_rad < 0 + if reverse: + rotation_rad = -rotation_rad + speed_rad = abs(speed_rad) + left_speed = speed_rad - self.left.enc.WHEEL_CIRCUMFERENCE_M * rotation_rad + right_speed = speed_rad + self.right.enc.WHEEL_CIRCUMFERENCE_M * rotation_rad + if reverse: + left_speed = -left_speed + right_speed = -right_speed + print("Moving with speed_rad %d, rotation_rad %f, left_pwm %s, right_pwm %s" % (speed_rad, rotation_rad, left_speed, right_speed)) + self.left.move_rad(left_speed) + self.right.move_rad(right_speed) + self.speed_rad = speed_rad + self.rotation_rad = rotation_rad + + def stop(self): + """Stops the robot.""" + if self.speed_rad == 0 and self.rotation_rad == 0: + return + self.left.stop() + self.right.stop() + self.speed_rad = 0 + self.rotation_rad = 0 + + def update(self): + """Updates the wheel driver, propagating the changes to the hardware.""" + self.left.update() + self.right.update() diff --git a/final_assignment_1/wheel_encoder.py b/final_assignment_1/wheel_encoder.py new file mode 100644 index 0000000..1450abc --- /dev/null +++ b/final_assignment_1/wheel_encoder.py @@ -0,0 +1,104 @@ +from system import System + +class WheelEncoder: + """Encoder is able to monitor the wheel movement precisely + and provide the actual wheel rotation speed over the ticks + measured for X consecutive constant time intervals.""" + USE_BOTH_EDGES = False + TICKS_PER_WHEEL = 40 if USE_BOTH_EDGES else 20 + RAD_PER_WHEEL = 2 * 3.14159265359 + RAD_PER_TICK = RAD_PER_WHEEL / TICKS_PER_WHEEL + WHEEL_CIRCUMFERENCE_M = 0.21 + WHEEL_RADIUS_M = WHEEL_CIRCUMFERENCE_M / RAD_PER_WHEEL + WHEEL_CENTER_DISTANCE = 0.075 + M_PER_WHEEL = RAD_PER_WHEEL * WHEEL_RADIUS_M + TICKS_PER_M = TICKS_PER_WHEEL / M_PER_WHEEL + MIN_TICK_TIME_US = 5_000 # minimum possible tick time (switch instability detection under this value) + MAX_TICK_TIME_US = 200_000 # maximum possible tick time (after which we consider speed to be zero) + AVG_TICK_COUNT = 3 + + def __init__(self, system: System, sensor_pin): + """Initializes the wheel encoder.""" + self.system = system + self.sensor_pin = sensor_pin + self.sensor_value = -1 + self.tick_last_time = -1 + self.tick_last_time_avg = -1 + self.update_count = 0 + self.tick_count = 0 + self.speed_radsec = 0 + self.speed_radsec_avg = 0 + self.calc_value = -1 + self.calc_tick = 0 + self.calc_update_count = -1 + + def reset(self): + """Resets the encoder state.""" + self.__init__(self.system, self.sensor_pin) + + def update(self): + """Updates the encoder state based on the ongoing command.""" + """Retrieves the sensor value, checks for change and updates the wheel state + based on the ongoing command.""" + self.update_count += 1 + time_now = self.system.ticks_us() + last_time_diff = self.system.ticks_diff(time_now, self.tick_last_time) + if self.tick_last_time != -1 and last_time_diff < self.MIN_TICK_TIME_US: + return False + sensor_value_now = self.system.pin_read_digital(self.sensor_pin) + if sensor_value_now == self.sensor_value: + if last_time_diff >= self.MAX_TICK_TIME_US: + self.speed_radsec = 0 + return False + self.sensor_value = sensor_value_now + if self.tick_last_time == -1: + self.tick_last_time = time_now + self.tick_last_time_avg = time_now + return False + if sensor_value_now == 1: + if self.USE_BOTH_EDGES: + # compensate for much shorter time when the sensor is down + last_time_diff *= 1.8 + else: + # we count just 1->0 change in this mode (to achieve uniformity between 0 and 1) + return False + self.tick_last_time = time_now + self.speed_radsec = self.RAD_PER_TICK / (last_time_diff / 1_000_000) + + # calculate average speed (simplistic, just accumulate last several ticks once in a while) + self.tick_count += 1 + if self.tick_count < self.AVG_TICK_COUNT: + if self.speed_radsec_avg == 0: + self.speed_radsec_avg = self.speed_radsec + else: + last_time_avg_diff = self.system.ticks_diff(time_now, self.tick_last_time_avg) + self.speed_radsec_avg = self.RAD_PER_TICK * self.AVG_TICK_COUNT / (last_time_avg_diff / 1_000_000) + self.tick_last_time_avg = time_now + self.tick_count = 0 + + self.calc_value = sensor_value_now + self.calc_tick = self.tick_count + self.calc_update_count = self.update_count + if self.speed_radsec > 0 and self.update_count < (4 if self.USE_BOTH_EDGES else 2): + print("Warning: wheel encoder updating slow, %s counts per change" % self.update_count) + self.update_count = 0 + return True + + def speed_msec(self): + """Returns the current wheel speed in m/s.""" + return self.M_PER_WHEEL * self.speed_radsec / self.RAD_PER_WHEEL + + def speed_msec_avg(self): + """Returns the current wheel speed in m/s.""" + return self.M_PER_WHEEL * self.speed_radsec_avg / self.RAD_PER_WHEEL + + def m2rad(self, m): + """Converts meters to radians.""" + return self.RAD_PER_WHEEL * m / self.M_PER_WHEEL + + def rad2m(self, rad): + """Converts radians to meters.""" + return self.M_PER_WHEEL * rad / self.RAD_PER_WHEEL + + def __str__(self): + return "speed: %f rad/s, %f m/s" % (self.speed_radsec, self.speed_msec())