diff --git a/lesson_14/build.gradle b/lesson_14/build.gradle new file mode 100644 index 0000000..995fd50 --- /dev/null +++ b/lesson_14/build.gradle @@ -0,0 +1,366 @@ +/** + * Builds minified merged Python main.py and uploads it to a Micro:Bit or CircuitPython device (Pico:Ed, ...). + * All major OSes are supported, the following python packages need to be installed: + * - for minification - python-minifier (command 'pyminify' on path) + * - for Micro:bit - microfs (command 'ufs' on path) + * - for CircuitPython - circup (command 'circup' on path) - for auto-loading library support (todo) + * + * Tasks: + * clean .. cleans build/ directory + * mini .. minifies all sources into build/mini/ + * buildMini .. builds the merged minified source into build/mini/main.py + * build .. builds the merged (non-minified) source into build/main.py + calls buildMini + * upload .. uploads merged main.py file of choice to the detected platform (see upload options below) + * + * Generic options: + * -Pmain={file.py} .. specifies main.py file (last during merging), optional (autodetected), + * needed just when __main__ block is in multiple source files + * -Phw={file.py} .. specifies hardware-specific classes, default: hw.py | system.py + * -Phwcp={file.py} .. specifies CircuitPython-specific class, default: {hw}_cp.py or {hw}_ped.py + * -Phwmb={file.py} .. specifies Micro:Bit-specific class, default: {hw}_mb.py or {hw}_mbit.py + * -Pinclude={file1,file2,..} .. adds extra files outside of project dir (all *.py files in project are included automatically) + * -Pexclude={file1,file2,..} .. removes files from the source file list + * + * Multiple hardware implementations: + * When working with multiple platforms, -Phw, -Phwcp and -Phwmb properties will enable automatic class replacement + * based on the inserted device. The code is expected to just depend on base class defined in -Phw. During merging + * and minification, the base hardware class is kept with suffix *Base. It is expected the hardware-specific + * classes import and reference it as such in the original code while other classes just reference the original base + * class. The effect is that the specific hardware classes substitute it in the resulting code and none is the wiser. + * This way, the advanced editors will not have issues during programming, while we can have different implementation + * classes on resulting device, a bit like dependency injection. + * + * Upload options: + * -Pmini .. uploads minified version + * -Psudo .. uses supervisor permissions to operate (Unix/MacOS only, sometimes needed to gain port access on Linux for device uploads) + * -Pcp .. CircuitPython (Pico:Ed, ..) - autodetected on Ubuntu, detection left to 'circup' otherwise + * -Pmb .. Micro:Bit - fallback (default) option, uses 'ufs' and its port detection mechanism + * + * CircuitPython device environment variables (when defined, used for calling 'circup'): + * CP_PATH .. path to a mounted device (not needed for Ubuntu, but 'pmount' apt OS package needed if not mounted) + * CP_HOST .. CircuitPython host to contact in the form of (host}:{port}#{password} (*) - (todo) + * (*) Each CP device supports access over Wifi, see https://docs.circuitpython.org/en/latest/docs/workflows.html + * + * Example build commands: + * + * Basic non-minified build and upload if sudo is not needed, all sources are in local dir + there's just one __main__ block: + * ./gradlew clean build upload + * + * Minified build with sudo upload and added source file from other directory: + * ./gradlew -Pmini -Psudo -Pinclude=../cely_projekt/cely_projekt.py clean build upload + */ +import java.nio.file.Files +import java.util.stream.Collectors +import java.util.stream.Stream + +ext { + onWindows = System.getProperty("os.name").toLowerCase().contains("win") + useSudo = project.hasProperty('sudo') + useMini = project.hasProperty('mini') + cpPath = System.getenv('CP_PATH') != null ? System.getenv('CP_PATH') : + (!project.hasProperty('mb') ? detectCircuitPythonPath() : null) + cpHost = System.getenv('CP_HOST') + useCP = !project.hasProperty('mb') && (project.hasProperty('cp') || cpPath != null || cpHost != null) + useMB = project.hasProperty('mb') || !useCP + hwPrefixes = List.of('system', 'hardware', 'hw') + hwMBSuffixes = List.of('_mb', '_mbit') + hwCPSuffixes = List.of('_cp', '_ped') + hwType = useCP ? 'cp' : 'mb' + hwBase = project.hasProperty('hw') ? project.getProperty('hw') : ( + hwPrefixes.stream() + .map { prefix -> new File(projectDir, prefix + '.py') } + .filter { f -> f.exists() } + .findFirst().orElse(null) + ) + hwClasses = hwBase == null ? List.of() : + Stream.concat(hwMBSuffixes.stream(), hwCPSuffixes.stream()) + .map(suffix -> new File(projectDir, hwBase.getName().replaceAll("\\.py\$", suffix + '.py'))) + .filter { f -> f.exists() } + .collect(Collectors.toList()) + hwClass = project.hasProperty('hw' + hwType) ? project.getProperty('hw' + hwType) : ( + hwBase == null ? null : + (useMB ? hwMBSuffixes : hwCPSuffixes).stream() + .map(suffix -> new File(projectDir, hwBase.getName().replaceAll("\\.py\$", suffix + '.py'))) + .filter { f -> f.exists() } + .findFirst().orElse(null) + ) + miniDir = "${buildDir}/mini" + includedFiles = propertyCsvToFileList('include') + excludedFiles = propertyCsvToFileList('exclude') + mainFile = getMainFileName(includedFiles, excludedFiles) + // we need all local *.py sources, but we will need to exclude the indicated ones + // if hardware-specific override is configured, we need to exclude hw classes not matching the correct one + // we also need to exclude main.py (or whatever was specified as main) and put it at the very end explicitly + sourceFiles = Stream.concat( + Stream.concat( + includedFiles.stream(), + Arrays.stream(projectDir.listFiles()) + .filter { f -> f.name.endsWith('.py') } + .sorted { f1, f2 -> f1.name.compareTo(f2.name) } + ) + .filter { f -> !f.equals(mainFile) } + .filter { f -> !excludedFiles.contains(f) } + .filter { f -> hwClass == null || !hwClasses.contains(f) || f.equals(hwClass) }, + Stream.of(mainFile) + ).toList() +} + +if (hwBase != null && hwClass != null) { + logger.lifecycle("Enabling hardware-specific class replacement: suffix *Base added to ${hwBase.getName()}, real hardware in ${hwClass.getName()}") +} + +task clean { + doLast { + logger.lifecycle("Cleaning ..") + delete "${buildDir}" + } +} + +task minify { + doLast { + mkdir miniDir + sourceFiles.each { file -> + // our main.py is a merged file, we have to avoid name clash + def fileOut = file.name.replaceAll('^main\\.py$', 'main_.py') + exec { + workingDir projectDir + commandLine = [ + 'pyminify', file.getPath(), + '--remove-literal-statements', + '--remove-class-attribute-annotations', '--no-rename-locals', '--no-hoist-literals', + '--no-remove-explicit-return-none', '--no-remove-return-annotations', + '--output', "${miniDir}/${fileOut}" + ] + } + } + } +} + +def mergeSource(BufferedWriter writer, File source, Set imports, Map> importsFrom) { + logger.lifecycle("Merging ${source}") + writer.newLine() + writer.newLine() + try (BufferedReader reader = new BufferedReader(new FileReader(source))) { + boolean header = true; + while (reader.ready()) { + String line = reader.readLine() + if (header) { + if (line.startsWith("class") || line.startsWith("def") || line.startsWith("if")) { + header = false + // if we are dealing with a hardware class base, we will be suffixing with Base + // this will avoid conflict with actual inheriting implementation class + if (source.getName().equals(hwBase.getName()) && line.startsWith("class")) { + line = line.replaceAll(":\$", "Base:") + } + } else if (line.startsWith("import")) { + line = line.replaceFirst("import\\s+", "") + imports.addAll(Arrays.asList(line.split(",\\s*"))) + } else if (line.startsWith("from")) { + String importSource = line.replaceAll("from (\\S+)\\s?.*\$", "\$1") + String objects = line.replaceFirst("from\\s+(\\S+)\\s+import\\s+", "") + importsFrom.computeIfAbsent(importSource, value -> new LinkedHashSet<>()) + .addAll(Arrays.asList(objects.split(",\\s*"))) + } + } + if (!header) { + writer.write(line) + writer.newLine() + } + } + } +} + +def mergeAll(File target, List sourceFiles) { + def imports = new LinkedHashSet() + def importsFrom = new LinkedHashMap>() + String body + try (StringWriter bodyWriter = new StringWriter()) { + try (BufferedWriter writer = new BufferedWriter((bodyWriter))) { + sourceFiles.each { source -> mergeSource(writer, source, imports, importsFrom) } + } + body = bodyWriter.toString() + } + + List classSources = sourceFiles.stream() + .map { it.name.replaceAll("\\.py", "").replaceAll(".*/", "") } + .collect(Collectors.toList()) + try (BufferedWriter writer = new BufferedWriter((new FileWriter(target, false)))) { + logger.lifecycle("Merging all into ${target}") + String importsCsv = imports.stream() + .filter { !classSources.contains(it) } + .collect(Collectors.joining(", ")) + if (!importsCsv.trim().isBlank()) { + writer.write("import ${importsCsv}") + writer.newLine() + } + importsFrom.entrySet().forEach { entry -> + if (!classSources.contains(entry.getKey())) { + def objects = String.join(", ", entry.getValue()) + writer.write("from ${entry.getKey()} import ${objects}") + writer.newLine() + } + } + writer.write(body) + } +} + +task buildMini { + dependsOn minify + doLast { + List files = sourceFiles.stream() + .map { file -> + // when we were minifying, we had to minify into an alternate name + // to avoid target file name clash, here we need to use it as a new source + new File(miniDir, file.name.replaceAll('^main\\.py$', 'main_.py')) + } + .toList() + mergeAll(new File(miniDir, "main.py"), files) + } +} + +task build { + dependsOn buildMini + doLast { + mergeAll(new File(buildDir, "main.py"), sourceFiles) + } +} + +task upload { + shouldRunAfter build + doLast { + def uploadFile = new File(useMini ? miniDir : buildDir, "main.py") + if (cpHost != null) { + throw new Exception("CP_HOST environment variable not supported yet") + } + if (!useMB) { + if (cpPath == null) { + cpPath = detectCircuitPythonPath() + } + useCP |= cpPath != null + } + if (useCP) { + if (cpPath == null) { + throw new Exception("CIRCUITPY device not found, please mount it or set CP_PATH environment variable") + } + def cmd = useSudo ? + ['sudo', 'cp', "${uploadFile}", "${cpPath}/code.py"] : + ['cp', "${uploadFile}", "${cpPath}/code.py"] + logger.lifecycle("Copying ${useSudo ? "(using sudo) " : ""}merged ${useMini ? "minified " : ""}${uploadFile} to CircuitPython device path ${cpPath}/code.py") + exec { + workingDir "${projectDir}" + commandLine = cmd + } + if (!onWindows) { + logger.lifecycle("Syncing CircuitPython device ..") + exec { + workingDir "${projectDir}" + commandLine = ['sync'] + } + } + } else if (useMB) { + def cmd = useSudo ? + ['sudo', 'ufs', 'put', "${uploadFile}"] : + ['ufs', 'put', "${uploadFile}"] + logger.lifecycle("Uploading ${useSudo ? "(using sudo) " : ""}merged ${useMini ? "minified " : ""}${uploadFile} to Micro:Bit device") + exec { + workingDir "${projectDir}" + commandLine = cmd + } + } else { + throw new Exception("Unknown platform, please specify -Pcp or -Pmb") + } + } +} + +List propertyCsvToFileList(String property) { + List files = new ArrayList<>() + if (project.hasProperty(property)) { + List patterns = Arrays.asList(project.getProperty(property).split(",")) + files.addAll( + Arrays. stream(projectDir.listFiles()) + .filter { f -> f.name.endsWith('.py') } + .filter { f -> patterns.stream().anyMatch { p -> f.name.matches(p) } } + .toList() + ) + } + return files +} + +/** + * Returns main file name from property (if defined), use main.py (if exists) or scan for content to find __main__. + */ +File getMainFileName(List includedFiles, List excludedFiles) { + if (project.hasProperty('main')) { + def mainFile = new File(projectDir, project.getProperty('main')) + if (!mainFile.exists()) { + throw new Exception("Indicated main file ${mainFile.getPath()} does not exist") + } + // any other main files in the directory are excluded automatically + excludedFiles.addAll( + Arrays. stream(projectDir.listFiles()) + .filter { f -> f.name.endsWith('.py') } + .filter { f -> f.name.startsWith('main_') } + .filter { f -> !f.equals(mainFile) } + .toList() + ) + return mainFile + } else if (new File(projectDir, 'main.py').exists()) { + return new File(projectDir, 'main.py') + } else { + def mainFiles = + Stream.concat( + includedFiles.stream(), + Arrays.stream(projectDir.listFiles()) + ) + .filter { f -> f.name.endsWith('.py') } + .filter { f -> !excludedFiles.contains(f) } + .filter { f -> Files.readAllLines(f.toPath()).contains('if __name__ == "__main__":') } + .toList() + if (mainFiles.isEmpty()) { + throw new Exception("No *.py files found in the project directory looking like a main code block") + } else if (mainFiles.size() > 1) { + excludedFiles.forEach { logger.lifecycle("Excluded: ${it.getAbsolutePath()}") } + logger.lifecycle("Excluded files: ${excludedFiles}") + throw new Exception("Multiple *.py files found in the project directory looking like a main code block: $mainFiles, please specify -Pmain={file}") + } + logger.lifecycle("Detected main code block in file ${mainFiles.getFirst()}") + return mainFiles.getFirst() + } +} + +def getDiskMountPoint(String diskLabel) { + def out = new ByteArrayOutputStream() + exec { + workingDir "${projectDir}" + commandLine = ['df'] + standardOutput = out + } + def line = out.toString().split("\n").stream().filter { it.contains(diskLabel) }.findFirst() + if (line.isEmpty()) { + return null + } + def mountPoint = line.get().replaceAll(".*\\s+", "") + logger.lifecycle("Detected disk ${diskLabel} mounted at ${mountPoint}") + return mountPoint +} + +// for Unix-like systems we can use 'df' to check the possible location of CIRCUITPY +def detectCircuitPythonPath() { + def cpPath = null + if (!onWindows) { + cpPath = getDiskMountPoint('CIRCUITPY') + if (cpPath == null) { + // if we still have nothing we can try to permissive-mount the disk by label (supported by Linux) + File cpDevFile = new File("/dev/disk/by-label/CIRCUITPY") + if (cpDevFile.exists()) { + logger.lifecycle("Detected CircuitPython device ${cpDevFile.getCanonicalPath()}, permissive-mounting ..") + exec { + workingDir "${projectDir}" + commandLine = ['pmount', cpDevFile.getCanonicalPath(), 'CIRCUITPY'] + } + cpPath = getDiskMountPoint('CIRCUITPY') + } + } + } + return cpPath +} diff --git a/lesson_14/build_line_navigator b/lesson_14/build_line_navigator new file mode 100755 index 0000000..e5ccf4f --- /dev/null +++ b/lesson_14/build_line_navigator @@ -0,0 +1,2 @@ +#!/bin/bash +./gradlew -Pmain=main_line_navigator.py -Pexclude=light.*,sonar.*,wheel_calibrator.* clean build upload diff --git a/lesson_14/gradle/wrapper/gradle-wrapper.jar b/lesson_14/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000..a4b76b9 Binary files /dev/null and b/lesson_14/gradle/wrapper/gradle-wrapper.jar differ diff --git a/lesson_14/gradle/wrapper/gradle-wrapper.properties b/lesson_14/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000..1ed247e --- /dev/null +++ b/lesson_14/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,7 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-8.10.1-all.zip +networkTimeout=10000 +validateDistributionUrl=true +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/lesson_14/gradlew b/lesson_14/gradlew new file mode 100755 index 0000000..f5feea6 --- /dev/null +++ b/lesson_14/gradlew @@ -0,0 +1,252 @@ +#!/bin/sh + +# +# Copyright © 2015-2021 the original authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# + +############################################################################## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/HEAD/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# +############################################################################## + +# Attempt to set APP_HOME + +# Resolve links: $0 may be a link +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac +done + +# This is normally unused +# shellcheck disable=SC2034 +APP_BASE_NAME=${0##*/} +# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036) +APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s +' "$PWD" ) || exit + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD=maximum + +warn () { + echo "$*" +} >&2 + +die () { + echo + echo "$*" + echo + exit 1 +} >&2 + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD=$JAVA_HOME/jre/sh/java + else + JAVACMD=$JAVA_HOME/bin/java + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD=java + if ! command -v java >/dev/null 2>&1 + then + die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +fi + +# Increase the maximum file descriptors if we can. +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC2039,SC3045 + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC2039,SC3045 + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac +fi + +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. + +# For Cygwin or MSYS, switch paths to Windows format before running java +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + + # Now convert the arguments - kludge to limit ourselves to /bin/sh + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) + fi + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg + done +fi + + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Collect all arguments for the java command: +# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments, +# and any embedded shellness will be escaped. +# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be +# treated as '${Hostname}' itself on the command line. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ + "$@" + +# Stop when "xargs" is not available. +if ! command -v xargs >/dev/null 2>&1 +then + die "xargs is not available" +fi + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. +# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' + +exec "$JAVACMD" "$@" diff --git a/lesson_14/gradlew.bat b/lesson_14/gradlew.bat new file mode 100644 index 0000000..9b42019 --- /dev/null +++ b/lesson_14/gradlew.bat @@ -0,0 +1,94 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem +@rem SPDX-License-Identifier: Apache-2.0 +@rem + +@if "%DEBUG%"=="" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%"=="" set DIRNAME=. +@rem This is normally unused +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if %ERRORLEVEL% equ 0 goto execute + +echo. 1>&2 +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. 1>&2 +echo. 1>&2 +echo Please set the JAVA_HOME variable in your environment to match the 1>&2 +echo location of your Java installation. 1>&2 + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. 1>&2 +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% 1>&2 +echo. 1>&2 +echo Please set the JAVA_HOME variable in your environment to match the 1>&2 +echo location of your Java installation. 1>&2 + +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if %ERRORLEVEL% equ 0 goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +set EXIT_CODE=%ERRORLEVEL% +if %EXIT_CODE% equ 0 set EXIT_CODE=1 +if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE% +exit /b %EXIT_CODE% + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/lesson_14/main_line_navigator.py b/lesson_14/main_line_navigator.py new file mode 100644 index 0000000..f5f67e4 --- /dev/null +++ b/lesson_14/main_line_navigator.py @@ -0,0 +1,60 @@ +from state import Behavior, Ctx +from state_map import StateMap +from system import System +from wheel_driver import WheelDriver + +if __name__ == "__main__": + # Navigates across lines, turns at sharp turns and stops at intersections and asks for user input where to go + system = System() + wheels = WheelDriver( + system=system, + left_pwm_min=60, left_pwm_multiplier=0.09, left_pwm_shift=-2.5, + right_pwm_min=60, right_pwm_multiplier=0.09, right_pwm_shift=-2.5 + ) + + # see Behavior class for the robot behavior definition and parameter characteristics + behavior = Behavior( + fwd_speed=2, + side_speed_dec=2, side_speed_min=1, + side_arc_min=1, side_arc_inc=3, side_arc_max=6, + # 25ms per cycle x 75 = 1.875s outside the valid line action rules (i.e., searching for line) + line_cycle_tolerance=75, + turn_speed=0.2, turn_arc_speed=8, + # 25ms per cycle x 100 = 2.5s outside the valid turn action rules (i.e., searching for 90° turned line) + turn_cycle_tolerance=100, + fast_sensor_change_dropped_below_cycle_count=4, + cycle_duration_us=25_000 + ) + state_map = StateMap(behavior=behavior, stop_on_non_line_sensors=False, turns=True, intersections=True) + ctx = Ctx(system=system, wheels=wheels, behavior=behavior, + states=state_map.states, transitions=state_map.transitions, state_key='START') + + try: + state_cycle_start = system.ticks_us() + while not system.is_button_a_pressed(): + wheels.update() + # reads the sensors and builds their history + ctx.update_sensors() + + time_now = system.ticks_us() + if system.ticks_diff(time_now, state_cycle_start) > ctx.behavior.cycle_duration_us: + state_cycle_start = time_now + + # finalize context before evaluation + # (costly operations are done here like sensor history extension with current sensor) + ctx.before_evaluation() + + # if our context situation matches one of the states, we switch to that state + # (this is typically based on sensor history match, but other matchers are possible) + ctx.switch_to_state_matching_ctx_situation() + + # while being in the state we update it + ctx.state.on_update(ctx) + + finally: + try: + ctx.state.on_exit(ctx) + finally: + wheels.stop() + system.display_off() + print("Finished") diff --git a/lesson_14/state.py b/lesson_14/state.py new file mode 100644 index 0000000..95a1c54 --- /dev/null +++ b/lesson_14/state.py @@ -0,0 +1,307 @@ +from system import System +from wheel_driver import WheelDriver + + +class Behavior: + """The behavior of the robot.""" + + def __init__(self, + # base forward speed (rad) + fwd_speed: float, + # how much to decrement each cycle when on side sensor (too low = lazy reaction) + side_speed_dec: float, + # the minimum rotation speed to maintain to not go too low + # and unnecessarily slow down turning (it turns no matter what as long as rotation speeds are correct) + side_speed_min: float, + # starting arc speed (rotation) when side sensor picks up the line instead of center one + side_arc_min: float, + # how fast we'll be increasing arc speed each cycle we are out of center + # this continues even if we are out of side sensor due to tolerance cycles (see below) + side_arc_inc: float, + # maximum arc speed we can perform (to not get too crazy and take our time when turning) + side_arc_max: float, + # tolerance before declaring we're out of line (time-dependent) + # (if turning too slow, we might not catch the line again if too low) + line_cycle_tolerance: int, + # base turn speed (rad) + turn_speed: float, + # base turn arc speed (rad) + turn_arc_speed: float, + # tolerance before declaring we're out of turn (time-dependent) + # (if turning too slow, we might not catch the line again if too low) + turn_cycle_tolerance: int, + # we disregard sensor transitions which last very short time + fast_sensor_change_dropped_below_cycle_count: int, + # how long the cycle is (in microseconds) + cycle_duration_us: int + ): + self.fwd_speed = fwd_speed + self.side_speed_dec = side_speed_dec + self.side_speed_min = side_speed_min + self.side_arc_min = side_arc_min + self.side_arc_inc = side_arc_inc + self.side_arc_max = side_arc_max + self.line_cycle_tolerance = line_cycle_tolerance + self.turn_speed = turn_speed + self.turn_arc_speed = turn_arc_speed + self.turn_cycle_tolerance = turn_cycle_tolerance + self.fast_sensor_change_dropped_below_cycle_count = fast_sensor_change_dropped_below_cycle_count + self.cycle_duration_us = cycle_duration_us + + +class SensorHasCount: + """A sensor and how many times it needs to match.""" + + def __init__(self, sensor, min_count, max_count=None, optional=False): + self.sensor = sensor + self.min_count = min_count + self.max_count = max_count + self.optional = optional + + def matches(self, sensor, count): + if sensor != self.sensor: + return False + if self.max_count is not None: + return self.min_count <= count <= self.max_count + return count >= self.min_count + + # let's print binary representation of the sensor and how many times it needs to match + def __str__(self): + return f"{self.sensor:0{5}b} ({self.min_count}x)" + + +class Ctx: + """Carries the current operating context of the robot.""" + + def __init__(self, system: System, wheels: WheelDriver, behavior: Behavior, + states: dict[str, 'State'], transitions: dict[str, list[str]], state_key: str): + self.system = system + self.wheels = wheels + self.behavior = behavior + self.states = states + self.transitions = transitions + self.state_key = state_key + self.state = states[state_key] + self.sensor = None + self.sensor_count = 0 + self.sensor_last = None + self.state_action_cycle = 0 + self.state_out_of_bounds_cycle = 0 + # history of sensor changes: [(lcr, count), ...] + # we keep track of the last several sensor changes and use that to determine situation underneath us + # this feeds to a sudden main state change if we detect different behavior than expected + # each state has the ability to override other states if it thinks it should rule the car + self.sensor_history = [] + # the history with extra element pertaining the current sensor and cycle count (updated just before state eval) + self.sensor_history_with_current = [] + # Our history needs to be able to house at least 4 transitions + # (going to intersection might be preceded by a single sensor if the car is going sideways) + self.sensor_history_length = 5 + # carries max speed for each wheel (for display purposes) + # will be updated on forward to correct values + self.fwd_speed_pwm_left_max = 255 + self.fwd_speed_pwm_right_max = 255 + # initialize the state + self.state.on_enter(self) + self.state.set_default_action(self) + + def update_sensors(self): + """Updates sensor readings and counts.""" + self.sensor_last = self.sensor + li, ri, ll, lc, lr = self.system.get_sensors() + self.sensor = li << 4 | ri << 3 | ll << 2 | lc << 1 | lr + if self.sensor != self.sensor_last: + if self.sensor_count > 0: + # we eliminate fluke transitions (short ones) from the history + if self.sensor_count >= self.behavior.fast_sensor_change_dropped_below_cycle_count: + # print(f"Last sensor into history: {self.sensor_last:05b} ({self.sensor_count}x) -> change to {self.sensor:05b}") + self.sensor_history.append((self.sensor_last, self.sensor_count)) + if len(self.sensor_history) >= self.sensor_history_length: + self.sensor_history.pop(0) + else: + print(f"Fluke transition eliminated: {self.sensor_last:05b} ({self.sensor_count}x)") + self.system.display_sensors(li, ri, ll, lc, lr) + self.sensor_count = 1 + else: + self.sensor_count += 1 + + def transition_to_state(self, state_key): + """Transitions to state, a one-liner helper for main code.""" + if state_key != self.state_key: + action_now = self.state.action + state_new = self.states[state_key] + state_new.set_default_action(ctx=self) + action_new = state_new.action + self.state.on_exit(ctx=self) + print("Transitioning: state %s (action %s) -> state %s (action %s)" + % (self.state, action_now, state_new, action_new)) + self.system.display_drive_mode(action_new.symbol) + self.state = state_new + self.state_key = state_key + # reset the sensor history to start fresh within the state and not bring historical baggage + self.sensor_history = [] + self.state.on_enter(ctx=self) + + def switch_to_state_matching_ctx_situation(self): + """Switches the state matching the sensor history + current sensor state (car behavior in the recent past).""" + state_transitions = self.transitions.get(self.state_key) + if state_transitions is None: + print("ERROR: No state transitions for state %s" % self.state_key) + return + for state_key in state_transitions: + state = self.states[state_key] + if state == self.state or state.matchers is None: + continue + for matcher in state.matchers: + if matcher.matches(self): + print("Switching to state %s (%s) due to match of state matcher, sensor history %s" + % (state_key, state, Ctx._str_history(self.sensor_history_with_current))) + self.transition_to_state(state_key) + return + + def add_sensor_to_history(self, sensor): + if len(self.sensor_history) >= self.sensor_history_length: + self.sensor_history.pop(0) + self.sensor_history.append(sensor) + + def before_evaluation(self): + """Called when the context changes are finished, before all evaluations.""" + # we need to update the sensor history with the current sensor and count + self.sensor_history_with_current = self.sensor_history.copy() + self.sensor_history_with_current.append((self.sensor, self.sensor_count)) + + @staticmethod + def _str_history(sensor_history): + """Converts the sensor history to a string.""" + return ", ".join([str(SensorHasCount(s, c)) for s, c in sensor_history]) + + +class Action: + def __init__(self, symbol: str): + self.name = type(self).__name__.replace('Action', '') + self.symbol = symbol + + def __str__(self): + return self.name + + def matches(self, ctx: Ctx) -> bool: + """Returns True if the action matches the context. Used for intra-state transitions.""" + return False + + def on_enter(self, ctx: Ctx): + """Called when the action is entered. Can return new action or indicate state switch.""" + ctx.system.display_drive_mode(self.symbol) + + def on_update(self, ctx: Ctx): + """Called when the action is updated. Can return new action or indicate state switch.""" + pass + + def on_exit(self, ctx: Ctx): + """Called when the action is exited. Can return new action or indicate state switch.""" + pass + + +class SensorMatchingAction(Action): + def __init__(self, symbol: str, matching_sensor: int): + super().__init__(symbol=symbol) + self.matching_sensor = matching_sensor + + def matches(self, ctx: Ctx) -> bool: + return ctx.sensor == self.matching_sensor + + +class StateMatcher: + def __init__(self): + self.name = type(self).__name__.replace('StateMatcher', '') + + def __str__(self): + return self.name + + def matches(self, ctx: Ctx) -> bool: + """Returns True if the state matches the current context situation.""" + pass + + +class SensorHistoryStateMatcher(StateMatcher): + def __init__(self, steps: list[SensorHasCount]): + super().__init__() + self.steps = steps + + def __str__(self): + return f"{self.name}(steps=[{', '.join([str(step) for step in self.steps])}])" + + def matches(self, ctx: Ctx): + # we go through sensor history from the end to the beginning + # (we are interested in the last steps, not the first ones) + # we match the sensor history to all steps (in reverse), disregarding optional steps if missing + sensor_history_view = ctx.sensor_history_with_current + step_index = 0 + history_index = len(sensor_history_view) - 1 + + while step_index < len(self.steps) and history_index >= 0: + step = self.steps[step_index] + sensor, count = sensor_history_view[history_index] + if step.matches(sensor, count): + step_index += 1 + history_index -= 1 + elif step.optional: + step_index += 1 + else: + return False + + # Ensure all non-optional steps are matched + while step_index < len(self.steps): + if not self.steps[step_index].optional: + return False + step_index += 1 + + return True + +class State: + def __init__(self, symbol: str, actions: list[Action], matchers: list[StateMatcher] = None): + # Our name is the class name without the State suffix + self.name = type(self).__name__.replace('State', '') + self.symbol = symbol + self.matchers = matchers + self.actions = actions + self.action = None + + def __str__(self): + return self.name + + def str_full(self): + matchers = ', '.join([str(matcher) for matcher in self.matchers]) if self.matchers else 'None' + actions = ', '.join([str(action) for action in self.actions]) + return f"{self.name}(matchers=[{matchers}], actions=[{actions}])" + + def on_enter(self, ctx: Ctx): + """Called when the state is entered.""" + print("Entering state %s" % self) + ctx.system.display_drive_mode(self.symbol) + if self.action is not None: + self.action.on_enter(ctx) + + def on_update(self, ctx: Ctx): + """Called when the state is updated.""" + if self.action is not None: + self.action.on_update(ctx) + + def on_exit(self, ctx: Ctx): + """Called when the state is exited.""" + if self.action is not None: + self.action.on_exit(ctx) + + def set_default_action(self, ctx: Ctx): + self.action = self.actions[0] + ctx.system.display_drive_mode(self.action.symbol) + self.action.on_enter(ctx) + + +class NoOpState(State): + def __init__(self, symbol: str, matchers: list[StateMatcher] = None): + super().__init__(symbol=symbol, actions=[], matchers=matchers) + + def on_enter(self, ctx: Ctx): + print("NoOpState entered - stopping the robot") + ctx.wheels.stop() + super().on_enter(ctx=ctx) diff --git a/lesson_14/state_generic.py b/lesson_14/state_generic.py new file mode 100644 index 0000000..5c85698 --- /dev/null +++ b/lesson_14/state_generic.py @@ -0,0 +1,54 @@ +from state import Ctx, Action, State + + +class GenericAction(Action): + def __init__(self, symbol): + super().__init__(symbol) + + +class StartAction(GenericAction): + """Starts the robot. An action used by start state to wait for a button to be pressed to start moving. + The action also stops the robot to allow comfortable pressing of the button.""" + + def __init__(self): + super().__init__(symbol='s') + + def on_enter(self, ctx: Ctx): + ctx.system.display_on() + ctx.wheels.stop() + ctx.system.display_speed(0, ctx.fwd_speed_pwm_left_max, left=True) + ctx.system.display_speed(0, ctx.fwd_speed_pwm_right_max, left=False) + + def on_update(self, ctx: Ctx): + if ctx.system.is_button_b_pressed(): + print("B pressed, starting") + ctx.transition_to_state("LINE") + + +class StopAction(GenericAction): + """Stops the robot. No further action will be taken.""" + + def __init__(self): + super().__init__(symbol='.') + + def on_enter(self, ctx: Ctx): + ctx.wheels.stop() + + def on_update(self, ctx: Ctx): + if ctx.system.is_button_b_pressed(): + ctx.transition_to_state("START") + + +class StartState(State): + def __init__(self, symbol: str, matchers=None): + super().__init__(symbol=symbol, actions=[StartAction()], matchers=matchers) + + +class StopState(State): + def __init__(self, symbol: str, matchers=None): + super().__init__(symbol=symbol, actions=[StopAction()], matchers=matchers) + + +class ErrorState(State): + def __init__(self, symbol: str, matchers=None): + super().__init__(symbol=symbol, actions=[StopAction()], matchers=matchers) diff --git a/lesson_14/state_intersection.py b/lesson_14/state_intersection.py new file mode 100644 index 0000000..b4c7d2e --- /dev/null +++ b/lesson_14/state_intersection.py @@ -0,0 +1,93 @@ +from state import Ctx, Action, State, SensorHistoryStateMatcher + + +class Choice: + def __init__(self, symbol: str, target_state: str): + self.symbol = symbol + self.target_state = target_state + + +class InteractiveIntersectAction(Action): + def __init__(self, symbol: str, choices: list[Choice]): + super().__init__(symbol) + self.choices = choices + self.choice_idx = -1 + self.button_press_count = 0 + + def on_enter(self, ctx: Ctx): + ctx.wheels.stop() + ctx.system.display_speed(0, ctx.behavior.fwd_speed, left=True) + ctx.system.display_speed(0, ctx.behavior.fwd_speed, left=False) + ctx.system.display_drive_mode(self.symbol) + self.choice_idx = -1 + self.button_press_count = 0 + + def on_update(self, ctx: Ctx): + # we will be switching to the next choice after button press ends + if ctx.system.is_button_b_pressed(): + self.button_press_count += 1 + elif self.button_press_count > 0: + # we want at least half a second for long press + min_cycle_count_for_long_press = 500_000 // ctx.behavior.cycle_duration_us + if self.button_press_count >= min_cycle_count_for_long_press: + ctx.system.display_choice(' ') + ctx.transition_to_state(self.choices[self.choice_idx].target_state) + else: + self.choice_idx += 1 + if self.choice_idx >= len(self.choices): + self.choice_idx = 0 + ctx.system.display_choice(self.choices[self.choice_idx].symbol) + self.button_press_count = 0 + + +class InteractiveIntersectState(State): + """User-interactive intersection state. + The state stops the car, displays the intersection type and asks the user for the choice. + User can cycle through the options (an arrow will be displayed in that direction), + then confirms the choice with long-pressing 'B' button.""" + + +class IntersectXState(State): + def __init__(self, symbol: str, matchers: list[SensorHistoryStateMatcher]): + actions = [InteractiveIntersectAction(symbol=symbol, choices=[ + Choice(symbol='<', target_state='TURN_L_OFF_LINE'), + Choice(symbol='^', target_state='LINE'), + Choice(symbol='>', target_state='TURN_R_OFF_LINE'), + ])] + super().__init__(symbol=symbol, actions=actions, matchers=matchers) + + +class IntersectYState(State): + def __init__(self, symbol: str, matchers: list[SensorHistoryStateMatcher]): + actions = [InteractiveIntersectAction(symbol=symbol, choices=[ + Choice(symbol='<', target_state='TURN_L_TO_LINE'), + Choice(symbol='>', target_state='TURN_R_TO_LINE'), + ])] + super().__init__(symbol=symbol, actions=actions, matchers=matchers) + + +class IntersectLState(State): + def __init__(self, symbol: str, matchers: list[SensorHistoryStateMatcher]): + actions = [InteractiveIntersectAction(symbol=symbol, choices=[ + Choice(symbol='<', target_state='TURN_L_OFF_LINE'), + Choice(symbol='^', target_state='LINE'), + ])] + super().__init__(symbol=symbol, actions=actions, matchers=matchers) + + +class IntersectRState(State): + def __init__(self, symbol: str, matchers: list[SensorHistoryStateMatcher]): + actions = [InteractiveIntersectAction(symbol=symbol, choices=[ + Choice(symbol='^', target_state='LINE'), + Choice(symbol='>', target_state='TURN_R_OFF_LINE'), + ])] + super().__init__(symbol=symbol, actions=actions, matchers=matchers) + + +class IntersectTState(State): + def __init__(self, symbol: str, matchers: list[SensorHistoryStateMatcher]): + actions = [InteractiveIntersectAction(symbol=symbol, choices=[ + Choice(symbol='<', target_state='TURN_L'), + Choice(symbol='>', target_state='TURN_R'), + ])] + super().__init__(symbol=symbol, actions=actions, matchers=matchers) diff --git a/lesson_14/state_line.py b/lesson_14/state_line.py new file mode 100644 index 0000000..01e7305 --- /dev/null +++ b/lesson_14/state_line.py @@ -0,0 +1,112 @@ +from state import Ctx, SensorMatchingAction, State + + +class LineAction(SensorMatchingAction): + def __init__(self, symbol: str, matching_sensor: int): + super().__init__(symbol=symbol, matching_sensor=matching_sensor) + + +class FwdLineAction(LineAction): + """Moves the robot forward.""" + + def __init__(self, symbol: str, matching_sensor: int): + super().__init__(symbol=symbol, matching_sensor=matching_sensor) + + def on_enter(self, ctx: Ctx): + super().on_enter(ctx) + ctx.wheels.move(speed_rad=ctx.behavior.fwd_speed, rotation_rad=0) + ctx.fwd_speed_pwm_left = ctx.wheels.left.speed_pwm + ctx.fwd_speed_pwm_right = ctx.wheels.right.speed_pwm + ctx.system.display_speed(ctx.behavior.fwd_speed, ctx.behavior.fwd_speed, left=True) + ctx.system.display_speed(ctx.behavior.fwd_speed, ctx.behavior.fwd_speed, left=False) + ctx.state_action_cycle = 0 + + def on_update(self, ctx: Ctx): + """When going just forward, we don't do any recalculations.""" + ctx.state_action_cycle += 1 + + +class SideFwdLineAction(LineAction): + """Moves the robot forward with slight turn to the side on which we captured the non-center sensor.""" + + def __init__(self, symbol: str, matching_sensor: int, direction: int): + super().__init__(symbol=symbol, matching_sensor=matching_sensor) + self.direction = direction + + def on_enter(self, ctx: Ctx): + super().on_enter(ctx) + ctx.wheels.move(speed_rad=ctx.behavior.fwd_speed, rotation_rad=-ctx.behavior.side_arc_max) + ctx.fwd_speed_pwm_left = ctx.wheels.left.speed_pwm + ctx.fwd_speed_pwm_right = ctx.wheels.right.speed_pwm + ctx.system.display_speed(ctx.wheels.left.speed_pwm, ctx.behavior.fwd_speed, left=True) + ctx.system.display_speed(ctx.wheels.left.speed_pwm, ctx.behavior.fwd_speed, left=False) + ctx.state_action_cycle = 0 + + def on_update(self, ctx: Ctx): + """When side-stepping line while going forward, we are slowing down progressively while increasing arc speed.""" + ctx.state_action_cycle += 1 + rotation_rad = ctx.behavior.side_arc_min + ctx.behavior.side_arc_inc * ctx.state_action_cycle + rotation_rad = min(rotation_rad, ctx.behavior.side_arc_max) + align_speed = ctx.behavior.fwd_speed - ctx.state_action_cycle * ctx.behavior.side_speed_dec + align_speed = max(align_speed, ctx.behavior.side_speed_min) + ctx.wheels.move(speed_rad=align_speed, rotation_rad=rotation_rad * self.direction) + ctx.system.display_speed(ctx.wheels.left.speed_pwm, ctx.fwd_speed_pwm_left_max, left=True) + ctx.system.display_speed(ctx.wheels.right.speed_pwm, ctx.fwd_speed_pwm_right_max, left=False) + print( + "%s, rotation_rad %d, init %s + inc_per_cycle %s * cycle %s" % + (self, rotation_rad, ctx.behavior.side_arc_min, ctx.behavior.side_arc_inc, ctx.state_action_cycle)) + + +class LeftFwdLineAction(SideFwdLineAction): + """Moves the robot forward with slight turn to the left when left sensor is captured.""" + + def __init__(self, symbol: str, matching_sensor: int): + super().__init__(symbol=symbol, matching_sensor=matching_sensor, direction=1) + + +class RightFwdLineAction(SideFwdLineAction): + """Moves the robot forward with slight turn to the right when right sensor is captured.""" + + def __init__(self, symbol: str, matching_sensor: int): + super().__init__(symbol=symbol, matching_sensor=matching_sensor, direction=-1) + + +class LineState(State): + def __init__(self, symbol: str, matchers=None): + actions: list[LineAction] = [ + FwdLineAction(symbol='|', matching_sensor=0b010), + LeftFwdLineAction(symbol='\\', matching_sensor=0b100), + RightFwdLineAction(symbol='/', matching_sensor=0b001) + ] + super().__init__(symbol=symbol, actions=actions, matchers=matchers) + self.actions = actions # type-cast for IDE support + + def transition_action(self, ctx: Ctx): + """Transitions from action to action within the state based on the line sensor readings.""" + # print("Trans state %s action %s, %s" % (state, action_now, bin(lcr))) + for action in self.actions: + if action.matching_sensor == ctx.sensor: + print("Transitioning state %s action %s to %s" % (self, self.action, action)) + self.action.on_exit(ctx) + self.action = action + action.on_enter(ctx) + return True + return False + + def on_update(self, ctx: Ctx): + """Updates the current state.""" + if ctx.sensor != self.action.matching_sensor: + # print(f"Transitioning Line: sensor={ctx.sensor:05b} no longer matches while_sensor={self.action.matching_sensor:05b} (current state {self} action {self.action})") + ctx.state_out_of_bounds_cycle += 1 + if self.transition_action(ctx): + ctx.state_out_of_bounds_cycle = 0 + else: + if ctx.state_out_of_bounds_cycle > ctx.behavior.line_cycle_tolerance: + print("Line cycle tolerance exceeded, switching to error state") + ctx.transition_to_state("STOP") + return + else: + if ctx.state_out_of_bounds_cycle > 0: + ctx.state_out_of_bounds_cycle = 0 + print("Back in state") + self.action.on_update(ctx) diff --git a/lesson_14/state_map.py b/lesson_14/state_map.py new file mode 100644 index 0000000..a16ac61 --- /dev/null +++ b/lesson_14/state_map.py @@ -0,0 +1,165 @@ +from state import Behavior, SensorHistoryStateMatcher, SensorHasCount +from state_generic import StartState, StopState, ErrorState +from state_intersection import IntersectXState, IntersectYState, IntersectTState, IntersectLState, IntersectRState +from state_line import LineState +from state_turn import LeftTurnState, RightTurnState, OffLineLeftTurnState, OffLineRightTurnState +from state_turn import ToLineLeftTurnState, ToLineRightTurnState + + +class StateMap: + """States of the robot. + Each state is defined declaratively, indicating: + - sensor history matchers for entering the state + - supported transitions to other states + """ + + def __init__(self, behavior: Behavior, stop_on_non_line_sensors=False, turns=False, intersections=False): + # if we want basic scenario, we will transition to stop on any non-line sensor change + stop_matchers = None if not stop_on_non_line_sensors else [ + SensorHistoryStateMatcher(steps=[SensorHasCount(sensor=0b111, min_count=5)]), + SensorHistoryStateMatcher(steps=[SensorHasCount(sensor=0b011, min_count=5)]), + SensorHistoryStateMatcher(steps=[SensorHasCount(sensor=0b110, min_count=5)]), + ] + self.states = { + # Generic states + 'START': StartState(symbol='s', matchers=[ + # move to start when line is detected after border states (i.e., after stop) + SensorHistoryStateMatcher(steps=[SensorHasCount(sensor=0b010, min_count=10)]), + ]), + 'STOP': StopState(symbol='.', matchers=stop_matchers), + 'ERROR': ErrorState(symbol='x'), + + # Follows the line (no declarative matchers enabled, we transition in and out using advanced conditions) + 'LINE': LineState(symbol='|'), + } + line_transitions = ['STOP'] + self.transitions = { + 'START': ['LINE', 'STOP'], + 'LINE': line_transitions, + 'STOP': ['START'], + } + + if turns: + self.states.update({ + # detects a turn to the left + 'TURN_L': LeftTurnState( + symbol='TL', matchers=[ + # we are detecting disappearing line while last match shows it turning to the left + SensorHistoryStateMatcher(steps=[ + SensorHasCount(sensor=0b000, min_count=20), + SensorHasCount(sensor=0b110, min_count=4), + SensorHasCount(sensor=0b010, min_count=10) + ]) + ] + ), + # detects a turn to the right + 'TURN_R': RightTurnState( + symbol='TR', matchers=[ + # we are detecting disappearing line while last match shows it turning to the right + SensorHistoryStateMatcher(steps=[ + SensorHasCount(sensor=0b000, min_count=20), + SensorHasCount(sensor=0b011, min_count=4), + SensorHasCount(sensor=0b010, min_count=10) + ]) + ] + ), + }) + line_transitions.extend(['TURN_L', 'TURN_R']) + self.transitions.update({ + 'TURN_L': ['STOP'], + 'TURN_R': ['STOP'], + }) + + if intersections: + vertical_min_count = 10 + horizontal_min_count = behavior.fast_sensor_change_dropped_below_cycle_count + self.states.update({ + # detects a full intersection (+) + 'INTERSECT_X': IntersectXState( + symbol='I+', matchers=[ + # we will be detecting normal line, then a full intersection, then normal line again + # we also need to account for the fact that we might be slightly off the line + SensorHistoryStateMatcher(steps=[ + SensorHasCount(sensor=0b010, min_count=vertical_min_count), + SensorHasCount(sensor=0b110, min_count=horizontal_min_count, optional=True), + SensorHasCount(sensor=0b011, min_count=horizontal_min_count, optional=True), + SensorHasCount(sensor=0b111, min_count=horizontal_min_count), + SensorHasCount(sensor=0b110, min_count=horizontal_min_count, optional=True), + SensorHasCount(sensor=0b011, min_count=horizontal_min_count, optional=True), + SensorHasCount(sensor=0b010, min_count=vertical_min_count) + ]) + ] + ), + # detects an intersection slight to the left and right, not forward (i.e., 'Y') + 'INTERSECT_Y': IntersectYState( + symbol='IY', matchers=[ + SensorHistoryStateMatcher(steps=[ + SensorHasCount(sensor=0b101, min_count=vertical_min_count), + SensorHasCount(sensor=0b110, min_count=horizontal_min_count, optional=True), + SensorHasCount(sensor=0b011, min_count=horizontal_min_count, optional=True), + SensorHasCount(sensor=0b010, min_count=horizontal_min_count), + ]) + ] + ), + # detects an intersection to the left and right, not forward (i.e., 'T') + 'INTERSECT_T': IntersectTState( + symbol='IT', matchers=[ + SensorHistoryStateMatcher(steps=[ + SensorHasCount(sensor=0b000, min_count=vertical_min_count), + SensorHasCount(sensor=0b111, min_count=horizontal_min_count), + SensorHasCount(sensor=0b110, min_count=horizontal_min_count, optional=True), + SensorHasCount(sensor=0b011, min_count=horizontal_min_count, optional=True), + SensorHasCount(sensor=0b010, min_count=vertical_min_count) + ]) + ] + ), + # detects an intersection to the left + 'INTERSECT_L': IntersectLState( + symbol='IL', matchers=[ + # we are detecting a blip on the right sensor, it has to last for some time (speed-dependent) + SensorHistoryStateMatcher(steps=[ + SensorHasCount(sensor=0b010, min_count=vertical_min_count), + SensorHasCount(sensor=0b110, min_count=horizontal_min_count), + SensorHasCount(sensor=0b010, min_count=vertical_min_count) + ]) + ] + ), + # detects an intersection to the right + 'INTERSECT_R': IntersectRState( + symbol='IR', matchers=[ + # we are detecting a blip on the right sensor, it has to last for some time (speed-dependent) + SensorHistoryStateMatcher(steps=[ + SensorHasCount(sensor=0b010, min_count=vertical_min_count), + SensorHasCount(sensor=0b011, min_count=horizontal_min_count), + SensorHasCount(sensor=0b010, min_count=vertical_min_count) + ]) + ] + ), + # extra turn operations needed to go off the intersection in the right sensor order + 'TURN_L_OFF_LINE': OffLineLeftTurnState(symbol='TL'), + 'TURN_R_OFF_LINE': OffLineRightTurnState(symbol='TR'), + 'TURN_L_TO_LINE': ToLineLeftTurnState(symbol='TL'), + 'TURN_R_TO_LINE': ToLineRightTurnState(symbol='TR'), + }) + line_transitions.extend(['INTERSECT_X', 'INTERSECT_Y', 'INTERSECT_T', 'INTERSECT_L', 'INTERSECT_R']) + self.transitions.update({ + 'INTERSECT_X': ['STOP'], + 'INTERSECT_Y': ['STOP'], + 'INTERSECT_R': ['STOP'], + 'INTERSECT_L': ['STOP'], + 'INTERSECT_T': ['STOP'], + 'TURN_L_OFF_LINE': ['STOP'], + 'TURN_R_OFF_LINE': ['STOP'], + 'TURN_L_TO_LINE': ['STOP'], + 'TURN_R_TO_LINE': ['STOP'], + }) + + print("Working with states:") + for state in self.states.values(): + print("* " + state.str_full()) + print("Enabled implicit state-to-state transitions:") + for state, transitions in self.transitions.items(): + print(f"* {state} -> {transitions}") + + def __str__(self): + return "StateMap(%s)" % self.states diff --git a/lesson_14/state_turn.py b/lesson_14/state_turn.py new file mode 100644 index 0000000..586b8ce --- /dev/null +++ b/lesson_14/state_turn.py @@ -0,0 +1,144 @@ +from state import Ctx, SensorMatchingAction, State, SensorHistoryStateMatcher + + +class TurnAction(SensorMatchingAction): + def __init__(self, symbol: str, matching_sensor: int, direction: int): + super().__init__(symbol=symbol, matching_sensor=matching_sensor) + self.direction = direction + + def on_enter(self, ctx: Ctx): + super().on_enter(ctx) + ctx.wheels.move(speed_rad=ctx.behavior.turn_speed, rotation_rad=ctx.behavior.turn_arc_speed * self.direction) + ctx.system.display_speed(ctx.wheels.left.speed_pwm, ctx.fwd_speed_pwm_left_max, left=True) + ctx.system.display_speed(ctx.wheels.right.speed_pwm, ctx.fwd_speed_pwm_right_max, left=False) + ctx.state_action_cycle = 0 + + def on_update(self, ctx: Ctx): + ctx.state_action_cycle += 1 + + +class SideSeekTurnAction(TurnAction): + def __init__(self, symbol: str, matching_sensor: int, direction: int): + super().__init__(symbol=symbol, matching_sensor=matching_sensor, direction=direction) + + def on_enter(self, ctx: Ctx): + print("Turning to catch side line..") + super().on_enter(ctx) + ctx.wheels.move(speed_rad=ctx.behavior.turn_speed, rotation_rad=ctx.behavior.turn_arc_speed * self.direction) + ctx.system.display_speed(ctx.wheels.left.speed_pwm, ctx.fwd_speed_pwm_left_max, left=True) + ctx.system.display_speed(ctx.wheels.right.speed_pwm, ctx.fwd_speed_pwm_right_max, left=False) + ctx.state_action_cycle = 0 + + +class CenterSeekTurnAction(TurnAction): + def __init__(self, symbol: str, matching_sensor: int, direction: int): + super().__init__(symbol=symbol, matching_sensor=matching_sensor, direction=direction) + + def on_enter(self, ctx: Ctx): + print("Turning to catch center line..") + super().on_enter(ctx) + ctx.transition_to_state('LINE') + + +class NoCenterSeekTurnAction(TurnAction): + def __init__(self, symbol: str, matching_sensor: int, direction: int): + super().__init__(symbol=symbol, matching_sensor=matching_sensor, direction=direction) + + def on_enter(self, ctx: Ctx): + print("Turning off center line..") + super().on_enter(ctx) + ctx.wheels.move(speed_rad=ctx.behavior.turn_speed, rotation_rad=ctx.behavior.turn_arc_speed * self.direction) + ctx.system.display_speed(ctx.wheels.left.speed_pwm, ctx.fwd_speed_pwm_left_max, left=True) + ctx.system.display_speed(ctx.wheels.right.speed_pwm, ctx.fwd_speed_pwm_right_max, left=False) + ctx.state_action_cycle = 0 + + +class TurnState(State): + def __init__(self, symbol: str, actions: list[TurnAction], matchers: list[SensorHistoryStateMatcher]): + super().__init__(symbol=symbol, actions=actions, matchers=matchers) + self.actions = actions # type-cast for IDE support + + def set_default_action(self, ctx: Ctx): + self.action = self.actions[0] + self.action.on_enter(ctx) + + def on_update(self, ctx: Ctx): + """Updates the current state: we just go through both actions we have and wait until we match the sensors.""" + if ctx.sensor != self.action.matching_sensor: + print( + f"Turning and seeking sensor {self.action.matching_sensor:05b}, current sensor={ctx.sensor:05b} (current state {self} action {self.action})") + ctx.state_out_of_bounds_cycle += 1 + if ctx.state_out_of_bounds_cycle > ctx.behavior.turn_cycle_tolerance: + print("Turn cycle tolerance exceeded, switching to error state") + ctx.transition_to_state('STOP') + return + else: + ctx.state_out_of_bounds_cycle = 0 + if self.action == self.actions[0]: + print("Transitioning state %s action %s to %s" % (self, self.action, self.actions[1])) + self.action.on_exit(ctx) + self.action = self.actions[1] + self.action.on_enter(ctx) + else: + print("Finished turning") + ctx.transition_to_state('LINE') + self.action.on_update(ctx) + + +class LeftTurnState(TurnState): + """Turns the car to the left from an empty space, first catching the left side sensor, then the center one.""" + def __init__(self, symbol: str, matchers: list[SensorHistoryStateMatcher]): + actions = [ + SideSeekTurnAction(symbol=symbol, matching_sensor=0b100, direction=1), + CenterSeekTurnAction(symbol=symbol, matching_sensor=0b010, direction=1) + ] + super().__init__(symbol=symbol, actions=actions, matchers=matchers) + + +class RightTurnState(TurnState): + """Turns the car to the right from an empty space, first catching the right side sensor, then the center one.""" + def __init__(self, symbol: str, matchers: list[SensorHistoryStateMatcher]): + actions = [ + SideSeekTurnAction(symbol=symbol, matching_sensor=0b001, direction=-1), + CenterSeekTurnAction(symbol=symbol, matching_sensor=0b010, direction=-1) + ] + super().__init__(symbol=symbol, actions=actions, matchers=matchers) + + +class OffLineLeftTurnState(TurnState): + """Turns the car to the left from a center line, first waiting for an empty space on all sensors, then left side sensor, then the center one.""" + def __init__(self, symbol: str): + actions = [ + NoCenterSeekTurnAction(symbol=symbol, matching_sensor=0b000, direction=1), + SideSeekTurnAction(symbol=symbol, matching_sensor=0b100, direction=1), + CenterSeekTurnAction(symbol=symbol, matching_sensor=0b010, direction=1) + ] + super().__init__(symbol=symbol, actions=actions, matchers=[]) + + +class OffLineRightTurnState(TurnState): + """Turns the car to the right from a center line, first waiting for an empty space on all sensors, then right side sensor, then the center one.""" + def __init__(self, symbol: str): + actions = [ + NoCenterSeekTurnAction(symbol=symbol, matching_sensor=0b000, direction=-1), + SideSeekTurnAction(symbol=symbol, matching_sensor=0b001, direction=-1), + CenterSeekTurnAction(symbol=symbol, matching_sensor=0b010, direction=-1) + ] + super().__init__(symbol=symbol, actions=actions, matchers=[]) + +class ToLineLeftTurnState(TurnState): + """Turns the car to the left just waiting for the center sensor to catch the line (disregarding side sensor).""" + def __init__(self, symbol: str): + actions = [ + CenterSeekTurnAction(symbol=symbol, matching_sensor=0b010, direction=1) + ] + super().__init__(symbol=symbol, actions=actions, matchers=[]) + + +class ToLineRightTurnState(TurnState): + """Turns the car to the right just waiting for the center sensor to catch the line (disregarding side sensor).""" + def __init__(self, symbol: str): + actions = [ + CenterSeekTurnAction(symbol=symbol, matching_sensor=0b010, direction=-1) + ] + super().__init__(symbol=symbol, actions=actions, matchers=[]) diff --git a/lesson_14/system.py b/lesson_14/system.py new file mode 100644 index 0000000..b308ae0 --- /dev/null +++ b/lesson_14/system.py @@ -0,0 +1,177 @@ +class System: + """System class for the robot core system interface, + with platform-specific implementations separate classes.""" + SYS_MBIT = "Micro:Bit" + SYS_PICO = "Pico:Ed" + + I2C_FREQ = 100_000 + I2C_SENSOR_DEVICE = 0x38 + I2C_MOTOR_DEVICE = 0x70 + + MASK_LINE_LEFT = 0x04 + MASK_LINE_CENTER = 0x08 + MASK_LINE_RIGHT = 0x10 + MASK_IR_LEFT = 0x20 + MASK_IR_RIGHT = 0x40 + + SOUND_SPEED = 343 # m/s + + def __init__(self): + """Initializes the system.""" + pass + + def get_system_type(self): + """Returns the system type.""" + pass + + def ticks_us(self): + """Returns the current time in microseconds.""" + pass + + def ticks_diff(self, ticks1, ticks2): + """Returns the difference between two time values in microseconds.""" + pass + + def sleep_us(self, us): + """Sleeps for the given time in microseconds.""" + pass + + def i2c_scan(self) -> list[int]: + """Scans the I2C bus for devices and returns a list of addresses.""" + pass + + def i2c_read(self, addr: int, n: int) -> bytes: + """Reads 'n' amount of data from the I2C device into a buffer.""" + pass + + def i2c_write(self, addr: int, buf: bytes): + """Writes data to the I2C device from a buffer.""" + pass + + def i2c_write_motor(self, buf: bytes): + """Writes data to the I2C motor device from a buffer.""" + self.i2c_write(self.I2C_MOTOR_DEVICE, buf) + + def i2c_init_motor(self): + """Initializes the I2C motor device.""" + self.i2c_write_motor(b"\x00\x01") + self.i2c_write_motor(b"\xE8\xAA") + + def i2c_read_sensors(self): + """Returns the current sensor data byte.""" + return self.i2c_read(self.I2C_SENSOR_DEVICE, 1)[0] + + def get_sensors(self): + """Checks if line sensors (..., left, center, right) detected a line (true if line is present) + or obstacle sensors (left, right, ...) detect an obstacle (true if [white] reflection is present).""" + data = self.i2c_read_sensors() + li = bool(data & self.MASK_IR_LEFT) + ri = bool(data & self.MASK_IR_RIGHT) + ll = bool(data & self.MASK_LINE_LEFT) + lc = bool(data & self.MASK_LINE_CENTER) + lr = bool(data & self.MASK_LINE_RIGHT) + return not li, not ri, ll, lc, lr + + def pin_read_digital(self, pin): + """Reads the digital value of a pin.""" + pass + + def pin_write_digital(self, pin, value: int): + """Writes a digital value to the pin.""" + pass + + def set_sonar_angle_pwm(self, angle_pwm: int): + """Sets front sonar horizontal angle PWM value.""" + pass + + def get_sonar_echo_delay_us(self, timeout_us) -> int: + """Measures the delay it takes for the sonar echo to return. + Returns the delay in microseconds or a negative value if the timeout was reached.""" + pass + + def get_sonar_distance(self, max_distance=1.0) -> float: + """Returns the distance in meters measured by the sonar, + with the maximum time spent on detecting the echo based on the max distance we want to detect. + This is by default set to 1m as the reasonable maximum distance for the sonar balanced to max time spent.""" + timeout_us = int((2 * max_distance / self.SOUND_SPEED) * 1_000_000) + measured_time_us = self.get_sonar_echo_delay_us(timeout_us=timeout_us) + if measured_time_us < 0: + return measured_time_us + measured_time_sec = measured_time_us / 1_000_000 + return measured_time_sec * self.SOUND_SPEED / 2 + + def get_encoder_pin_left(self): + """Returns the pin object for the left encoder.""" + pass + + def get_encoder_pin_right(self): + """Returns the pin object for the right encoder.""" + pass + + def get_adc_value(self) -> int: + """Returns the current ADC value of the robot (0 - 1023).""" + pass + + def get_supply_voltage(self): + """Returns the current supply voltage of the robot.""" + adc = self.get_adc_value() # ADC value 0 - 1023 + # Convert ADC value to volts: 3.3 V / 1024 (max. voltage at ADC pin / ADC resolution) + voltage = 0.00322265625 * adc + # Multiply measured voltage by voltage divider ratio to calculate actual voltage + # (10 kOhm + 5,6 kOhm) / 5,6 kOhm [(R1 + R2) / R2, Voltage divider ratio] + return voltage * 2.7857142 + + def is_button_a_pressed(self): + """Returns whether button A is pressed.""" + pass + + def is_button_b_pressed(self): + """Returns whether button B is pressed.""" + pass + + def display_text(self, label): + """Sets a label on the robot display (prints in log, displays the first letter on the screen).""" + pass + + def display_sensors(self, il, ir, ll, lc, lr, y=4, lb=9, ib=5): + """Displays the sensors in top line of the display as pixels for each sensor. + Line sensors (left, center, right) are far left, center, far right, lb is line brightness 0-9, default 9. + IR sensors (left, right) are interlaced among them, ib is IR brightness 0-9, default 5.""" + pass + + def display_drive_mode(self, mode: str): + """Displays the drive mode depicting the current situation we are in now. + The form of the displaying of the mode is platform-dependent. + Variable mode refers to the pictogram displayed, see each implementation (they should be in sync).""" + pass + + def display_choice(self, choice: str): + """Displays the choice depicting the current situation we are in now. + The form of the displaying of the choice is platform-dependent. + Variable mode refers to the pictogram displayed, see each implementation (they should be in sync).""" + pass + + def get_drive_mode_symbol_keys(self): + """Returns the keys of the drive mode symbols.""" + pass + + def display_speed(self, speed_now, speed_max, left: bool): + """Displays one of the two current wheel speeds on the display. Position and form is platform-dependent.""" + pass + + def display_bitmap(self, x_pos: int, y_pos: int, width: int, lines: list[int]): + """Displays bitmap on display (0x0 = top left, max is platform-dependent: 5x5 on Micro:Bit, 27x7 Pico:Ed). + Bitwise, each line int is right-aligned.""" + pass + + def display_clear(self): + """Clears the display.""" + pass + + def display_on(self): + """Enables the display.""" + pass + + def display_off(self): + """Disables the display.""" + pass diff --git a/lesson_14/system_mbit.py b/lesson_14/system_mbit.py new file mode 100644 index 0000000..dfb1f62 --- /dev/null +++ b/lesson_14/system_mbit.py @@ -0,0 +1,138 @@ +from microbit import button_a, button_b, display, i2c, pin1, pin2, pin8, pin12, pin14, pin15 +from machine import time_pulse_us +from utime import ticks_us, ticks_diff, sleep + +from system import System as SystemBase + + +class System(SystemBase): + DRIVE_MODE_PICTOGRAMS = { + ' ': [0b000, 0b000, 0b000], + 'TL': [0b000, 0b110, 0b010], # sharp turn to left + 'TR': [0b000, 0b011, 0b010], # sharp turn to right + 'IT': [0b000, 0b111, 0b010], # intersection left-right (T) + 'IL': [0b010, 0b110, 0b010], # intersection left-straight (T to left) + 'IR': [0b010, 0b011, 0b010], # intersection right-straight (T to right) + 'IY': [0b101, 0b010, 0b010], # split in the road (Y) + 'I+': [0b010, 0b111, 0b010], # intersection all directions (+) + '-': [0b000, 0b111, 0b000], + '_': [0b000, 0b000, 0b111], + '.': [0b000, 0b000, 0b010], + '|': [0b010, 0b010, 0b010], + '/': [0b001, 0b010, 0b100], + '\\': [0b100, 0b010, 0b001], + 's': [0b011, 0b010, 0b110], + 'x': [0b101, 0b010, 0b101], + } + + def __init__(self, i2c_freq=SystemBase.I2C_FREQ): + super().__init__() + i2c.init(freq=i2c_freq) + print("System %s initialized, voltage %sV" % (self.get_system_type(), self.get_supply_voltage())) + + def get_system_type(self): + return self.SYS_MBIT + + def ticks_us(self): + return ticks_us() + + def ticks_diff(self, ticks1, ticks2): + return ticks_diff(ticks1, ticks2) + + def sleep_us(self, us): + sleep(us / 1_000_000) + + def i2c_read(self, addr: int, n: int) -> bytes: + return i2c.read(addr, n) + + def i2c_write(self, addr: int, buf: bytes): + i2c.write(addr, buf) + + def i2c_scan(self) -> list[int]: + return i2c.scan() + + def pin_read_digital(self, pin): + return pin.read_digital() + + def pin_write_digital(self, pin, value: int): + pin.write_digital(value) + + def set_sonar_angle_pwm(self, angle_pwm: int): + pin1.write_analog(angle_pwm) + + def get_sonar_echo_delay_us(self, timeout_us) -> int: + self.pin_write_digital(pin8, 1) + self.pin_write_digital(pin8, 0) + return time_pulse_us(pin12, 1, timeout_us) + + def get_encoder_pin_left(self): + return pin14 + + def get_encoder_pin_right(self): + return pin15 + + def get_adc_value(self) -> int: + return pin2.read_analog() + + def is_button_a_pressed(self): + return button_a.is_pressed() + + def is_button_b_pressed(self): + return button_b.is_pressed() + + def display_text(self, label): + """Sets a label on the robot display (prints in log, displays the first letter on the screen).""" + print("Label: %s" % label) + display.show(label[0]) + + def display_sensors(self, il, ir, ll, lc, lr, y=4, lb=9, ib=5): + """Displays the sensors in top line of the display as pixels for each sensor. + Line sensors (left, center, right) are far left, center, far right, lb is line brightness 0-9, default 9. + IR sensors (left, right) are interlaced among them, ib is IR brightness 0-9, default 5.""" + display.set_pixel(4, y, lb if ll else 0) + display.set_pixel(2, y, lb if lc else 0) + display.set_pixel(0, y, lb if lr else 0) + display.set_pixel(3, y, ib if il else 0) + display.set_pixel(1, y, ib if ir else 0) + + def display_drive_mode(self, mode: str): + """Displays the drive mode in the center (3x3 pixels), overriding choice, supporting + all pictograms defined in DRIVE_MODE_PICTOGRAMS (other characters clear the area).""" + lines = self.DRIVE_MODE_PICTOGRAMS[mode if mode in self.DRIVE_MODE_PICTOGRAMS else ' '] + self.display_bitmap(1, 2, 3, lines) + + def display_choice(self, choice: str): + """Displays the choice in the center (3x3 pixels), overriding drive mode, supporting + all pictograms defined in DRIVE_MODE_PICTOGRAMS (other characters clear the area).""" + self.display_drive_mode(choice) + + def get_drive_mode_symbol_keys(self): + return list(self.DRIVE_MODE_PICTOGRAMS.keys()) + + def display_speed(self, speed_now, speed_max, left: bool): + """Displays the current speed on the display (represented as a 3-pixel bar) on the right side of display.""" + intensity = 3 + height_max = 4 + height = int(height_max * speed_now / speed_max) + x_pos = 4 if left else 0 + for y in range(height_max): + display.set_pixel(x_pos, y, intensity if y < height else 0) + + def display_bitmap(self, x_pos: int, y_pos: int, width: int, lines: list[int]): + """Displays the bitmap on the display (0x0 = top left, max 5x5). Bitwise, each line int is right-aligned.""" + for y in range(len(lines)): + for x in range(width): + display.set_pixel(x_pos + x, 4 - (y_pos + y), 9 if lines[y] & (1 << x) else 0) + + def display_clear(self): + """Clears the display.""" + display.clear() + + def display_on(self): + """Enables the display.""" + display.on() + display.clear() + + def display_off(self): + """Disables the display.""" + display.off() diff --git a/lesson_14/system_ped.py b/lesson_14/system_ped.py new file mode 100644 index 0000000..fdb8e0d --- /dev/null +++ b/lesson_14/system_ped.py @@ -0,0 +1,188 @@ +from time import monotonic_ns, sleep + +from analogio import AnalogIn +from board import P1, P2, P8, P12, P14, P15 +from digitalio import DigitalInOut, Direction +from picoed import i2c, display, button_a, button_b +from pulseio import PulseIn +from pwmio import PWMOut + +from system import System as SystemBase + + +class System(SystemBase): + """Pico:Ed implementation of the System class. + The Pico:Ed board (I2C, pins, buttons, display) use library at https://github.com/elecfreaks/circuitpython_picoed. + Display uses IS31FL3731 library at https://github.com/adafruit/Adafruit_CircuitPython_IS31FL3731. + Light uses NeoPixel library at https://github.com/adafruit/Adafruit_CircuitPython_NeoPixel. + Sonar code is inspired by HC-SR04 library at https://github.com/adafruit/Adafruit_CircuitPython_HCSR04.""" + + DRIVE_MODE_PICTOGRAMS = { + ' ': [0b00000, 0b00000, 0b00000, 0b00000, 0b00000], + 'TL': [0b00000, 0b00000, 0b11100, 0b00100, 0b00100], # sharp turn to left + 'TR': [0b00000, 0b00000, 0b00111, 0b00100, 0b00100], # sharp turn to right + 'IT': [0b00000, 0b00000, 0b11111, 0b00100, 0b00100], # intersection left-right (T) + 'IL': [0b00100, 0b00100, 0b11100, 0b00100, 0b00100], # intersection left-straight (T to left) + 'IR': [0b00100, 0b00100, 0b00111, 0b00100, 0b00100], # intersection right-straight (T to right) + 'IY': [0b10001, 0b01010, 0b00100, 0b00100, 0b00100], # split in the road (Y) + 'I+': [0b00100, 0b00100, 0b11111, 0b00100, 0b00100], # intersection all directions (+) + '<': [0b00010, 0b00100, 0b01111, 0b00100, 0b00010], # interactive choice left + '^': [0b00000, 0b00100, 0b01110, 0b10101, 0b00100], # interactive choice forward + '>': [0b00100, 0b00010, 0b01111, 0b00010, 0b00100], # interactive choice right + '-': [0b00000, 0b00000, 0b11111, 0b00000, 0b00000], + '_': [0b00000, 0b00000, 0b00000, 0b00000, 0b11111], + '.': [0b00000, 0b00000, 0b00000, 0b00000, 0b00100], + '|': [0b00100, 0b00100, 0b00100, 0b00100, 0b00100], + '/': [0b00001, 0b00010, 0b00100, 0b01000, 0b10000], + '\\': [0b10000, 0b01000, 0b00100, 0b00010, 0b00001], + 's': [0b00111, 0b01000, 0b01110, 0b00010, 0b11100], + 'x': [0b10001, 0b01010, 0b00100, 0b01010, 0b10001], + } + + def __init__(self): + super().__init__() + # Sonar servo + self.pin1 = PWMOut(P1, frequency=100) + # ADC + self.pin2 = AnalogIn(P2) + # Sonar trigger + self.pin8 = DigitalInOut(P8) + self.pin8.direction = Direction.OUTPUT + # Sonar echo + self.pin12 = PulseIn(P12) + # Encoder left + self.pin14 = DigitalInOut(P14) + self.pin14.direction = Direction.INPUT + # Encoder right + self.pin15 = DigitalInOut(P15) + self.pin15.direction = Direction.INPUT + print("System %s initialized, voltage %sV" % (self.get_system_type(), self.get_supply_voltage())) + + def get_system_type(self): + return self.SYS_PICO + + def ticks_us(self): + return monotonic_ns() // 1000 + + def ticks_diff(self, ticks1, ticks2): + return abs(ticks1 - ticks2) + + def sleep_us(self, us): + sleep(us / 1_000_000) + + def i2c_scan(self) -> list[int]: + while not i2c.try_lock(): + pass + ret = i2c.scan() + i2c.unlock() + return ret + + def i2c_read(self, addr: int, n: int) -> bytes: + while not i2c.try_lock(): + pass + buffer = bytearray(n) + i2c.readfrom_into(addr, buffer, start=0, end=n) + i2c.unlock() + return buffer + + def i2c_write(self, addr: int, buf: bytes): + while not i2c.try_lock(): + pass + i2c.writeto(addr, buf) + i2c.unlock() + + def pin_read_digital(self, pin): + return 1 if pin.value else 0 + + def pin_write_digital(self, pin, value: int): + pin.value = value != 1 + + def set_sonar_angle_pwm(self, angle_pwm: int): + scaled_value = int((angle_pwm / 128) * 16384) + self.pin1.duty_cycle = scaled_value + + def get_sonar_echo_delay_us(self, timeout_us) -> int: + # Trigger the sonar w/ 10ms pulse + self.pin8.value = True + self.sleep_us(10) + self.pin8.value = False + + self.pin12.clear() + self.pin12.resume() + start_time = monotonic_ns() + while not self.pin12: + if (monotonic_ns() - start_time) > timeout_us * 1000: + self.pin12.pause() + return -1 + self.pin12.pause() + return self.pin12[0] if len(self.pin12) > 0 else -1 + + def get_encoder_pin_left(self): + return self.pin14 + + def get_encoder_pin_right(self): + return self.pin15 + + def get_adc_value(self) -> int: + # scale ADC value to 10-bit value as expected by the caller + return self.pin2.value * 1024 // 16384 + + def is_button_a_pressed(self): + return button_a.is_pressed() + + def is_button_b_pressed(self): + return button_b.is_pressed() + + def display_text(self, label): + print("Label: %s" % label) + display.scroll(label[0:3]) + + def display_sensors(self, il, ir, ll, lc, lr, y=6, lb=32, ib=3): + """Displays the sensors in top line of the display as pixels for each sensor. + Line sensors (left, center, right) are far left, center, far right, lb is line brightness 0-9, default 9. + IR sensors (left, right) are interlaced among them, ib is IR brightness 0-9, default 5.""" + x_pos = 4 + stretch = 2 + display.pixel(x_pos + 4 * stretch, y, lb if ll else 0) + display.pixel(x_pos + 2 * stretch, y, lb if lc else 0) + display.pixel(x_pos + 0 * stretch, y, lb if lr else 0) + display.pixel(x_pos + 3 * stretch, y, ib if il else 0) + display.pixel(x_pos + 1 * stretch, y, ib if ir else 0) + + def display_drive_mode(self, mode: str): + """Displays the drive mode in the display center (5x5 pixels), supporting + all pictograms defined in DRIVE_MODE_PICTOGRAMS (other characters clear the area).""" + lines = self.DRIVE_MODE_PICTOGRAMS[mode if mode in self.DRIVE_MODE_PICTOGRAMS else ' '] + self.display_bitmap(6, 0, 5, lines) + + def display_choice(self, choice: str): + """Displays the choice next to the drive mode (5x5 pixels), supporting + all pictograms defined in DRIVE_MODE_PICTOGRAMS (other characters clear the area).""" + lines = self.DRIVE_MODE_PICTOGRAMS[choice if choice in self.DRIVE_MODE_PICTOGRAMS else ' '] + self.display_bitmap(1, 0, 5, lines) + + def get_drive_mode_symbol_keys(self): + return list(self.DRIVE_MODE_PICTOGRAMS.keys()) + + def display_speed(self, speed_now, speed_max, left: bool): + """Displays the current speed as a horizontal bar on the left or right of the display.""" + intensity = 2 + height_max = 7 + height = int(height_max * speed_now / speed_max) + x_pos = 16 if left else 0 + for y in range(height_max): + display.pixel(x_pos, y, intensity if y < height else 0) + + def display_bitmap(self, x_pos: int, y_pos: int, width: int, lines: list[int]): + for y in range(len(lines)): + for x in range(width): + display.pixel(x_pos + width - x - 1, y_pos + width - y - 1, 9 if lines[y] & (1 << (width - x - 1)) else 0) + + def display_clear(self): + display.fill(0) + + def display_on(self): + pass + + def display_off(self): + self.display_clear() diff --git a/lesson_14/wheel.py b/lesson_14/wheel.py new file mode 100644 index 0000000..b2f7a09 --- /dev/null +++ b/lesson_14/wheel.py @@ -0,0 +1,143 @@ +from wheel_encoder import WheelEncoder +from system import System + + +class Wheel: + """Handles single wheel capable of moving forward or backward + with given (variable) speed and stop immediately or conditionally + based on distance and time.""" + + def __init__(self, system: System, name, motor_fwd_cmd, motor_rwd_cmd, sensor_pin, + pwm_min=60, pwm_max=255, pwm_multiplier=0, pwm_shift=0): + """Initializes the wheel.""" + self.system = system + self.name = name + self.motor_fwd_cmd = motor_fwd_cmd + self.motor_rwd_cmd = motor_rwd_cmd + self.distance_remain_ticks = -1 + self.distance_req_time_us = -1 + self.distance_start_time_us = 0 + self.speed_pwm = 0 + self.enc = WheelEncoder(system=system, sensor_pin=sensor_pin) + self.pwm_min = pwm_min + self.pwm_max = pwm_max + self.pwm_multiplier = pwm_multiplier + self.pwm_shift = pwm_shift + + def move_pwm(self, speed_pwm): + """Moves the wheel using given PWM speed (indefinite ticks, time). + The wheel will continue to move until stop() is called. + The PWM speed is a value between -255 and 255, where 0 means stop.""" + self.set_speed_pwm(speed_pwm) + self.distance_remain_ticks = -1 + self.distance_req_time_us = -1 + + def move_rad(self, speed_rad): + """Moves the wheel using given rad/sec speed (indefinite ticks, time). + The wheel will continue to move until stop() is called.""" + speed_pwm = self.radsec2pwm(speed_rad) + self.move_pwm(speed_pwm + self.pwm_min if speed_pwm > 0 else speed_pwm - self.pwm_min) + + def move_pwm_for_ticks(self, speed_pwm, distance_ticks): + """Moves the wheel forward using given PWM speed for the given distance + in sensor ticks. If the motor is already moving, the asked distance is added + to the remaining distance and the motor continues until no distance remains.""" + self.set_speed_pwm(speed_pwm) + print("Moving %s wheel with speed %d pwm for distance %f ticks" % (self.name, speed_pwm, distance_ticks)) + self.distance_remain_ticks += distance_ticks + + def move_pwm_for_time(self, speed_pwm, distance_time_us): + """Moves the wheel forward using given PWM speed for the given time. + If the motor is already moving, the distance in time is added to the current + distance and the motor continues to move until the total time is reached.""" + self.set_speed_pwm(speed_pwm) + self.distance_req_time_us += distance_time_us + if self.distance_start_time_us == 0: + self.distance_start_time_us = self.system.ticks_us() + + def move_pwm_for_distance(self, speed_pwm, distance): + """Moves the wheel forward using given PWM speed for given distance in meters.""" + distance_ticks = int(distance * self.enc.TICKS_PER_M) + self.move_pwm_for_ticks(speed_pwm, distance_ticks) + + def move_radsec_for_distance(self, radsec, distance): + """Moves the wheel using given rad/s speed for given distance in meters.""" + print("Moving %s wheel with speed %f rad/s for distance %f m" % (self.name, radsec, distance)) + speed_pwm = self.radsec2pwm(radsec) + distance_ticks = int(distance * self.enc.TICKS_PER_M) * 2 + self.move_pwm_for_ticks(speed_pwm, distance_ticks) + + def set_speed_pwm(self, speed_pwm): + """Sets the wheel PWM speed (and direction). Does not affect the remaining + distance or time previously set to perform. If the wheel was going + in the other direction, resets the H-bridge other direction first.""" + if speed_pwm == 0: + if self.speed_pwm != 0: + # print("Stopping %s wheel" % self.name) + self.system.i2c_write_motor(bytes([self.motor_fwd_cmd, 0])) + self.system.i2c_write_motor(bytes([self.motor_rwd_cmd, 0])) + self.speed_pwm = 0 + return + speed_pwm = int(max(-255, min(255, speed_pwm))) + if self.speed_pwm == speed_pwm: + return + self.enc.reset() + if (self.speed_pwm < 0 < speed_pwm) or (self.speed_pwm > 0 > speed_pwm): + # if we are changing the direction, we need to reset the motor first + motor_reset_cmd = (self.motor_rwd_cmd + if speed_pwm >= 0 else self.motor_fwd_cmd) + # print("Changing %s wheel direction" % self.name) + self.system.i2c_write_motor(bytes([motor_reset_cmd, 0])) + motor_set_cmd = self.motor_fwd_cmd if speed_pwm > 0 else self.motor_rwd_cmd + print("Setting %s wheel speed_pwm %d" % (self.name, speed_pwm)) + self.system.i2c_write_motor(bytes([motor_set_cmd, abs(speed_pwm)])) + self.speed_pwm = speed_pwm + + def radsec2pwm(self, radsec): + """Returns the PWM speed for the given rad/s speed. + We use the multiplier and shift values to calculate the PWM speed using formula: + rad_per_sec = pwm * multiplier + shift, for us: pwm = (rad_per_sec - shift) / multiplier.""" + if self.pwm_multiplier == 0: + print("error: wheel %s pwm_multiplier is 0" % self.name) + return 0 + direction = 1 if radsec >= 0 else -1 + return direction * int((abs(radsec) - self.pwm_shift) / self.pwm_multiplier) + + def msec2pwm(self, msec): + """Returns the PWM speed for the given m/s speed.""" + rad_per_sec = self.enc.m2rad(msec) + return self.radsec2pwm(rad_per_sec) + + def stop(self): + """Stops the wheel immediately.""" + self.set_speed_pwm(0) + self.distance_remain_ticks = -1 + self.distance_req_time_us = -1 + self.enc.reset() + + def stop_on_no_work(self): + """Stops the wheel if the remaining distance in ticks or time is reached.""" + stop_due_to_ticks = True + if self.distance_remain_ticks != 0: + stop_due_to_ticks = False + stop_due_to_time = True + if self.distance_req_time_us >= 0: + time_delta = self.system.ticks_diff(self.system.ticks_us(), self.distance_start_time_us) + if time_delta < self.distance_req_time_us: + stop_due_to_time = False + # we stop only if both conditions are met + # otherwise we keep the other condition finish as well + if stop_due_to_ticks and stop_due_to_time: + self.stop() + + def on_tick(self): + """Updates the wheel state based on a new tick, + checks the remaining distance in ticks.""" + if self.distance_remain_ticks > 0: + self.distance_remain_ticks -= 1 + + def update(self): + """Updates the encoder and general wheel state.""" + if self.enc.update() is True: + self.on_tick() + self.stop_on_no_work() diff --git a/lesson_14/wheel_calibrator.py b/lesson_14/wheel_calibrator.py new file mode 100644 index 0000000..b45cdc5 --- /dev/null +++ b/lesson_14/wheel_calibrator.py @@ -0,0 +1,120 @@ +from wheel import Wheel +from system import System + + +class WheelCalibrator: + def __init__(self, system: System, wheel: Wheel): + """Initializes the wheel calibrator.""" + self.system = system + self.wheel = wheel + self.p2m2radsec = [0.0 for _ in range(wheel.pwm_min, wheel.pwm_max + 1)] + self.pwm_min = 0 + self.pwm_max = 0 + self.conversion_table_available = False + + def gather_pwm_to_real_speed_table_full(self): + """Gathers the real forward and reverse speeds for a desired PWM range + based on the encoder readings. Each value is scanned after half a second. + The calibration fills the conversion table between PWM and rad/s.""" + self.pwm_min = 0.0 + for speed_pwm in range(self.wheel.pwm_min, self.wheel.pwm_max + 1): + self.calibrate_pwm(speed_pwm=speed_pwm, gather_min=True) + self.calculate_pwm_value_multiplier_and_shift() + self.conversion_table_available = True + + def gather_pwm_to_real_speed_table_approx(self): + """Gathers the approximate forward and reverse speeds for a desired PWM range + based on the encoder readings. The range is sampled from bottom to top + in 10 PWM step intervals to find the lowest moving speed. As a follow-up step, + the top speed is scanned as well to find the highest moving speed. + The calibration fills the conversion table between PWM and rad/s using a linear + approximation. Multiplier (a) and shift (b) are calculated for the equation + y = a * x + b.""" + # minimum speed + speed_pwm = self.wheel.pwm_min + while speed_pwm <= self.wheel.pwm_max: + radsec, msec = self.gather_rad_msec_for_pwm(speed_pwm) + self.p2m2radsec[speed_pwm - self.wheel.pwm_min] = radsec + if radsec > 0.0: + self.pwm_min = speed_pwm + break + speed_pwm += 10 + if self.pwm_min == 0: + print("No movement detected when going through pwm_min .. pwm_max range, %s wheel calibration failed" + % self.wheel.name) + return -1 + if self.pwm_min > self.wheel.pwm_min: + print("Altering %s wheel pwm_min from %s to %s" % (self.wheel.name, self.wheel.pwm_min, self.pwm_min)) + self.wheel.pwm_min = self.pwm_min + + # maximum speed + speed_pwm = self.wheel.pwm_max + radsec, msec = self.gather_rad_msec_for_pwm(speed_pwm) + self.p2m2radsec[speed_pwm - self.wheel.pwm_min] = radsec + if radsec > 0.0: + self.pwm_max = speed_pwm + else: + print("No movement detected on pwm_max speed, %s wheel calibration failed" % self.wheel.name) + return -1 + + # linear approximation in radians per second + rad_speed_delta = (self.p2m2radsec[self.pwm_max - self.wheel.pwm_min] + - self.p2m2radsec[0]) + a = rad_speed_delta / (self.pwm_max - self.pwm_min) + b = self.p2m2radsec[0] - a * self.pwm_min + for speed_pwm in range(self.wheel.pwm_min, self.wheel.pwm_max + 1): + speed_write = a * speed_pwm + b + self.p2m2radsec[speed_pwm - self.wheel.pwm_min] = speed_write + self.calculate_pwm_value_multiplier_and_shift() + self.conversion_table_available = True + + def calibrate_pwm(self, speed_pwm, gather_min): + radsec, msec = self.gather_rad_msec_for_pwm(speed_pwm) + self.p2m2radsec[speed_pwm - self.wheel.pwm_min] = radsec + if gather_min and radsec > 0.0 and self.pwm_min == 0: + self.pwm_min = speed_pwm + + def gather_rad_msec_for_pwm(self, speed_pwm): + """Moves the wheel forward using given PWM speed for half a second, + returns the speed in rad/s and m/s.""" + start_time = self.system.ticks_us() + self.wheel.move_pwm(speed_pwm) + while self.system.ticks_diff(self.system.ticks_us(), start_time) <= 500_000: + self.wheel.update() + radsec = self.wheel.enc.speed_radsec + msec = self.wheel.enc.speed_msec() + self.wheel.stop() + return radsec, msec + + def calculate_pwm_value_multiplier_and_shift(self): + """ + Calculates pwm multiplier and shift using least squares regression (from valid pwm range). + multiplier = (count()*sum(pwd*rad) - sum(pwm)*sum(rad)) / (count()*sum(pwm*pwm) - sum(pwm)*sum(pwm)) + shift = (sum(rad) - multiplier*sum(pwm)) / count() + """ + count = self.wheel.pwm_max - self.wheel.pwm_min + 1 + sum_pwm = 0 + sum_pwm_mult_pwm = 0 + sum_rad = 0 + sum_pwm_mult_rad = 0 + for pwm in range(self.wheel.pwm_min, self.wheel.pwm_max + 1): + sum_pwm += pwm + sum_pwm_mult_pwm += pwm * pwm + sum_rad += self.p2m2radsec[pwm - self.wheel.pwm_min] + sum_pwm_mult_rad += pwm * self.p2m2radsec[pwm - self.wheel.pwm_min] + self.wheel.pwm_multiplier = (count * sum_pwm_mult_rad - sum_pwm * sum_rad) / \ + (count * sum_pwm_mult_pwm - sum_pwm * sum_pwm) + self.wheel.pwm_shift = (sum_rad - self.wheel.pwm_multiplier * sum_pwm) / count + print("Altering %s wheel multiplier: %s, shift: %s" % + (self.wheel.name, self.wheel.pwm_multiplier, self.wheel.pwm_shift)) + + def calibration_table_to_csv(self): + print("CSV calibration data for %s wheel (pwm_min = %s, multiplier = %s, shift = %s):" + % (self.wheel.name, self.wheel.pwm_min, self.wheel.pwm_multiplier, self.wheel.pwm_shift)) + print("pwm,radsec,msec") + for pwm in range(self.wheel.pwm_min, self.wheel.pwm_max + 1): + # We just print the calibration data for the border values (min 5, max 5) + if pwm < self.wheel.pwm_min + 5 or pwm > self.wheel.pwm_max - 5: + radsec = self.p2m2radsec[pwm - self.wheel.pwm_min] + msec = self.wheel.enc.rad2m(radsec) + print("%d,%f,%f" % (pwm, radsec, msec)) diff --git a/lesson_14/wheel_driver.py b/lesson_14/wheel_driver.py new file mode 100644 index 0000000..70534db --- /dev/null +++ b/lesson_14/wheel_driver.py @@ -0,0 +1,66 @@ +from system import System +from wheel import Wheel + + +class WheelDriver: + """Handles the movement of the whole robot + (forward, backward, turning). Activities are either + indefinite or conditional based on ticks, time + or real speed measured by the encoder on wheel level.""" + + def __init__(self, system: System, + left_pwm_min=50, left_pwm_max=255, left_pwm_multiplier=0, left_pwm_shift=0, + right_pwm_min=50, right_pwm_max=255, right_pwm_multiplier=0, right_pwm_shift=0): + """Initializes the wheel driver.""" + self.system = system + self.system.i2c_init_motor() + self.left = Wheel(system=system, name="left", + motor_fwd_cmd=5, motor_rwd_cmd=4, sensor_pin=system.get_encoder_pin_left(), + pwm_min=left_pwm_min, pwm_max=left_pwm_max, + pwm_multiplier=left_pwm_multiplier, pwm_shift=left_pwm_shift) + self.right = Wheel(system=system, name="right", + motor_fwd_cmd=3, motor_rwd_cmd=2, sensor_pin=system.get_encoder_pin_right(), + pwm_min=right_pwm_min, pwm_max=right_pwm_max, + pwm_multiplier=right_pwm_multiplier, pwm_shift=right_pwm_shift) + self.speed_rad = -99 + self.rotation_rad = -99 + self.stop() + + # Please note: normally, we would have aggregate move...() methods here for both wheels, but + # these got removed in favor of smaller code memory footprint + we control each wheel separately anyway. + + def move(self, speed_rad, rotation_rad): + """Moves the robot with specific PWM on each wheel while applying rotational speed in radians. + Positive rotation is clockwise, negative rotation is counterclockwise. + We first need to calculate the speed of each wheel based on the desired speed and rotation.""" + if speed_rad == 0 and rotation_rad == 0: + self.stop() + return + reverse = speed_rad < 0 + if reverse: + rotation_rad = -rotation_rad + speed_rad = abs(speed_rad) + left_speed = speed_rad - self.left.enc.WHEEL_CIRCUMFERENCE_M * rotation_rad + right_speed = speed_rad + self.right.enc.WHEEL_CIRCUMFERENCE_M * rotation_rad + if reverse: + left_speed = -left_speed + right_speed = -right_speed + print("Moving with speed_rad %d, rotation_rad %f, left_pwm %s, right_pwm %s" % (speed_rad, rotation_rad, left_speed, right_speed)) + self.left.move_rad(left_speed) + self.right.move_rad(right_speed) + self.speed_rad = speed_rad + self.rotation_rad = rotation_rad + + def stop(self): + """Stops the robot.""" + if self.speed_rad == 0 and self.rotation_rad == 0: + return + self.left.stop() + self.right.stop() + self.speed_rad = 0 + self.rotation_rad = 0 + + def update(self): + """Updates the wheel driver, propagating the changes to the hardware.""" + self.left.update() + self.right.update() diff --git a/lesson_14/wheel_encoder.py b/lesson_14/wheel_encoder.py new file mode 100644 index 0000000..1450abc --- /dev/null +++ b/lesson_14/wheel_encoder.py @@ -0,0 +1,104 @@ +from system import System + +class WheelEncoder: + """Encoder is able to monitor the wheel movement precisely + and provide the actual wheel rotation speed over the ticks + measured for X consecutive constant time intervals.""" + USE_BOTH_EDGES = False + TICKS_PER_WHEEL = 40 if USE_BOTH_EDGES else 20 + RAD_PER_WHEEL = 2 * 3.14159265359 + RAD_PER_TICK = RAD_PER_WHEEL / TICKS_PER_WHEEL + WHEEL_CIRCUMFERENCE_M = 0.21 + WHEEL_RADIUS_M = WHEEL_CIRCUMFERENCE_M / RAD_PER_WHEEL + WHEEL_CENTER_DISTANCE = 0.075 + M_PER_WHEEL = RAD_PER_WHEEL * WHEEL_RADIUS_M + TICKS_PER_M = TICKS_PER_WHEEL / M_PER_WHEEL + MIN_TICK_TIME_US = 5_000 # minimum possible tick time (switch instability detection under this value) + MAX_TICK_TIME_US = 200_000 # maximum possible tick time (after which we consider speed to be zero) + AVG_TICK_COUNT = 3 + + def __init__(self, system: System, sensor_pin): + """Initializes the wheel encoder.""" + self.system = system + self.sensor_pin = sensor_pin + self.sensor_value = -1 + self.tick_last_time = -1 + self.tick_last_time_avg = -1 + self.update_count = 0 + self.tick_count = 0 + self.speed_radsec = 0 + self.speed_radsec_avg = 0 + self.calc_value = -1 + self.calc_tick = 0 + self.calc_update_count = -1 + + def reset(self): + """Resets the encoder state.""" + self.__init__(self.system, self.sensor_pin) + + def update(self): + """Updates the encoder state based on the ongoing command.""" + """Retrieves the sensor value, checks for change and updates the wheel state + based on the ongoing command.""" + self.update_count += 1 + time_now = self.system.ticks_us() + last_time_diff = self.system.ticks_diff(time_now, self.tick_last_time) + if self.tick_last_time != -1 and last_time_diff < self.MIN_TICK_TIME_US: + return False + sensor_value_now = self.system.pin_read_digital(self.sensor_pin) + if sensor_value_now == self.sensor_value: + if last_time_diff >= self.MAX_TICK_TIME_US: + self.speed_radsec = 0 + return False + self.sensor_value = sensor_value_now + if self.tick_last_time == -1: + self.tick_last_time = time_now + self.tick_last_time_avg = time_now + return False + if sensor_value_now == 1: + if self.USE_BOTH_EDGES: + # compensate for much shorter time when the sensor is down + last_time_diff *= 1.8 + else: + # we count just 1->0 change in this mode (to achieve uniformity between 0 and 1) + return False + self.tick_last_time = time_now + self.speed_radsec = self.RAD_PER_TICK / (last_time_diff / 1_000_000) + + # calculate average speed (simplistic, just accumulate last several ticks once in a while) + self.tick_count += 1 + if self.tick_count < self.AVG_TICK_COUNT: + if self.speed_radsec_avg == 0: + self.speed_radsec_avg = self.speed_radsec + else: + last_time_avg_diff = self.system.ticks_diff(time_now, self.tick_last_time_avg) + self.speed_radsec_avg = self.RAD_PER_TICK * self.AVG_TICK_COUNT / (last_time_avg_diff / 1_000_000) + self.tick_last_time_avg = time_now + self.tick_count = 0 + + self.calc_value = sensor_value_now + self.calc_tick = self.tick_count + self.calc_update_count = self.update_count + if self.speed_radsec > 0 and self.update_count < (4 if self.USE_BOTH_EDGES else 2): + print("Warning: wheel encoder updating slow, %s counts per change" % self.update_count) + self.update_count = 0 + return True + + def speed_msec(self): + """Returns the current wheel speed in m/s.""" + return self.M_PER_WHEEL * self.speed_radsec / self.RAD_PER_WHEEL + + def speed_msec_avg(self): + """Returns the current wheel speed in m/s.""" + return self.M_PER_WHEEL * self.speed_radsec_avg / self.RAD_PER_WHEEL + + def m2rad(self, m): + """Converts meters to radians.""" + return self.RAD_PER_WHEEL * m / self.M_PER_WHEEL + + def rad2m(self, rad): + """Converts radians to meters.""" + return self.M_PER_WHEEL * rad / self.RAD_PER_WHEEL + + def __str__(self): + return "speed: %f rad/s, %f m/s" % (self.speed_radsec, self.speed_msec())