diff --git a/Cargo.lock b/Cargo.lock index 0c1bd888..3d4a27f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -491,7 +491,7 @@ dependencies = [ [[package]] name = "config" -version = "0.3.52" +version = "0.3.53" dependencies = [ "base64", "chrono", @@ -598,7 +598,7 @@ checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" [[package]] name = "crypto" -version = "0.3.52" +version = "0.3.53" dependencies = [ "aes-gcm", "base64", @@ -3316,7 +3316,7 @@ dependencies = [ [[package]] name = "testutils" -version = "0.3.52" +version = "0.3.53" dependencies = [ "pem", "rsa", @@ -3586,7 +3586,7 @@ checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801" [[package]] name = "tower" -version = "0.3.52" +version = "0.3.53" dependencies = [ "config", "pyo3", @@ -3614,7 +3614,7 @@ dependencies = [ [[package]] name = "tower-api" -version = "0.3.52" +version = "0.3.53" dependencies = [ "reqwest", "serde", @@ -3626,7 +3626,7 @@ dependencies = [ [[package]] name = "tower-cmd" -version = "0.3.52" +version = "0.3.53" dependencies = [ "axum", "bytes", @@ -3696,7 +3696,7 @@ checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" [[package]] name = "tower-package" -version = "0.3.52" +version = "0.3.53" dependencies = [ "async-compression", "config", @@ -3714,7 +3714,7 @@ dependencies = [ [[package]] name = "tower-runtime" -version = "0.3.52" +version = "0.3.53" dependencies = [ "async-trait", "chrono", @@ -3737,7 +3737,7 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tower-telemetry" -version = "0.3.52" +version = "0.3.53" dependencies = [ "tracing", "tracing-appender", @@ -3746,7 +3746,7 @@ dependencies = [ [[package]] name = "tower-uv" -version = "0.3.52" +version = "0.3.53" dependencies = [ "async-compression", "async_zip", @@ -3764,7 +3764,7 @@ dependencies = [ [[package]] name = "tower-version" -version = "0.3.52" +version = "0.3.53" dependencies = [ "anyhow", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 9ed835ae..1613ed30 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ resolver = "2" [workspace.package] edition = "2021" -version = "0.3.52" +version = "0.3.53" description = "Tower is the best way to host Python data apps in production" rust-version = "1.81" authors = ["Brad Heller "] diff --git a/crates/tower-cmd/src/apps.rs b/crates/tower-cmd/src/apps.rs index 1fd816f0..ddb530af 100644 --- a/crates/tower-cmd/src/apps.rs +++ b/crates/tower-cmd/src/apps.rs @@ -277,6 +277,7 @@ const FOLLOW_BACKOFF_MAX: Duration = Duration::from_secs(5); const LOG_DRAIN_DURATION: Duration = Duration::from_secs(5); const RUN_START_POLL_INTERVAL: Duration = Duration::from_millis(500); const RUN_START_MESSAGE_DELAY: Duration = Duration::from_secs(3); +const RUN_START_TIMEOUT: Duration = Duration::from_secs(30); async fn follow_logs(config: Config, name: String, seq: i64) { let enable_ctrl_c = !output::get_output_mode().is_mcp(); @@ -304,6 +305,12 @@ async fn follow_logs(config: Config, name: String, seq: i64) { let mut notified = false; loop { sleep(RUN_START_POLL_INTERVAL).await; + + if wait_started.elapsed() > RUN_START_TIMEOUT { + output::error("Timed out waiting for run to start. The runner may be unavailable."); + return; + } + // Avoid blank output on slow starts while keeping fast starts quiet. if should_notify_run_wait(notified, wait_started.elapsed()) { output::write("Waiting for run to start...\n"); diff --git a/crates/tower-cmd/src/error.rs b/crates/tower-cmd/src/error.rs index 777deec5..6fbfc8e1 100644 --- a/crates/tower-cmd/src/error.rs +++ b/crates/tower-cmd/src/error.rs @@ -38,6 +38,9 @@ pub enum Error { #[snafu(display("Run was cancelled"))] RunCancelled, + #[snafu(display("Timed out waiting for run to start. The runner may be unavailable."))] + RunStartTimeout, + #[snafu(display("App crashed during local execution"))] AppCrashed, diff --git a/crates/tower-cmd/src/output.rs b/crates/tower-cmd/src/output.rs index 2dfcd709..36eace9b 100644 --- a/crates/tower-cmd/src/output.rs +++ b/crates/tower-cmd/src/output.rs @@ -164,6 +164,9 @@ pub fn package_error(err: tower_package::Error) { "There was a problem determining exactly where your Towerfile was stored on disk" .to_string() } + tower_package::Error::InvalidGlob { message } => { + format!("Invalid file glob pattern: {}", message) + } }; let line = format!("{} {}\n", "Package error:".red(), msg); diff --git a/crates/tower-cmd/src/run.rs b/crates/tower-cmd/src/run.rs index 03a10e5c..631fa6eb 100644 --- a/crates/tower-cmd/src/run.rs +++ b/crates/tower-cmd/src/run.rs @@ -740,21 +740,24 @@ fn create_pyiceberg_catalog_property_name(catalog_name: &str, property_name: &st format!("PYICEBERG_CATALOG__{}__{}", catalog_name, property_name) } +const RUN_START_TIMEOUT: Duration = Duration::from_secs(30); + /// wait_for_run_start waits for the run to enter a "running" state. It polls the API every 500ms to see /// if it's started yet. async fn wait_for_run_start(config: &Config, run: &Run) -> Result<(), Error> { - loop { - let res = api::describe_run(config, &run.app_name, run.number).await?; + timeout(RUN_START_TIMEOUT, async { + loop { + let res = api::describe_run(config, &run.app_name, run.number).await?; + + if is_run_started(&res.run)? { + return Ok(()); + } - if is_run_started(&res.run)? { - break; - } else { - // Wait half a second to to try again. sleep(Duration::from_millis(500)).await; } - } - - Ok(()) + }) + .await + .map_err(|_| Error::RunStartTimeout)? } /// wait_for_run_completion waits for the run to enter an terminal state. It polls the API every diff --git a/crates/tower-package/src/error.rs b/crates/tower-package/src/error.rs index 97cd1e77..dde66067 100644 --- a/crates/tower-package/src/error.rs +++ b/crates/tower-package/src/error.rs @@ -11,6 +11,9 @@ pub enum Error { #[snafu(display("Invalid path"))] InvalidPath, + + #[snafu(display("Invalid glob pattern: {message}"))] + InvalidGlob { message: String }, } impl From for Error { diff --git a/crates/tower-package/src/lib.rs b/crates/tower-package/src/lib.rs index 59cd539c..04c7d007 100644 --- a/crates/tower-package/src/lib.rs +++ b/crates/tower-package/src/lib.rs @@ -224,6 +224,15 @@ impl Package { // less. let base_dir = spec.base_dir.canonicalize()?; + // Canonicalize import paths upfront so the resolver can whitelist files within them. + let canonical_import_paths: Vec = spec + .import_paths + .iter() + .map(|p| base_dir.join(p).canonicalize()) + .collect::, _>>()?; + + let resolver = FileResolver::new(base_dir.clone(), canonical_import_paths.clone()); + let tmp_dir = TmpDir::new("tower-package").await?; let package_path = tmp_dir.to_path_buf().join("package.tar"); debug!("building package at: {:?}", package_path); @@ -253,7 +262,7 @@ impl Package { for file_glob in file_globs { let path = base_dir.join(file_glob); - resolve_glob_path(path, &base_dir, &mut file_paths).await; + resolver.resolve_glob(path, &mut file_paths).await?; } // App code lives in the app dir @@ -277,13 +286,17 @@ impl Package { let mut import_paths = vec![]; // Now we need to package up all the modules to include in the code base too. - for import_path in &spec.import_paths { - // The import_path should always be relative to the base_path. - let import_path = base_dir.join(import_path).canonicalize()?; - let parent = import_path.parent().unwrap(); + for import_path in &canonical_import_paths { let mut file_paths = HashMap::new(); - resolve_path(&import_path, parent, &mut file_paths).await; + resolver.resolve_path(&import_path, &mut file_paths).await; + + // Resolve module files relative to the import path's parent so that the + // directory structure inside the package matches the manifest entry. Without + // this, an import path that lives inside base_dir (e.g. libs/shared) would be + // resolved relative to base_dir by logical_path(), producing + // modules/libs/shared/... while the manifest entry is modules/shared. + let import_parent = import_path.parent().unwrap_or(import_path.as_path()); // The file_name should constitute the logical path let import_path = import_path.file_name().unwrap(); @@ -292,8 +305,11 @@ impl Package { import_paths.push(import_path_str); // Now we write all of these paths to the modules directory. - for (physical_path, logical_path) in file_paths { - let logical_path = module_dir.join(logical_path); + for (physical_path, _) in file_paths { + let logical_path = match physical_path.strip_prefix(import_parent) { + Ok(p) => module_dir.join(p), + Err(_) => continue, + }; let hash = compute_sha256_file(&physical_path).await?; path_hashes.insert(logical_path.clone(), hash); @@ -440,73 +456,6 @@ async fn unpack_archive>( Ok(()) } -async fn resolve_glob_path( - path: PathBuf, - base_dir: &PathBuf, - file_paths: &mut HashMap, -) { - let path_str = extract_glob_path(path); - debug!("resolving glob pattern: {}", path_str); - - for entry in glob(&path_str).unwrap() { - resolve_path(&entry.unwrap(), base_dir, file_paths).await; - } -} - -async fn resolve_path(path: &PathBuf, base_dir: &Path, file_paths: &mut HashMap) { - let mut queue = VecDeque::new(); - queue.push_back(path.to_path_buf()); - - while let Some(current_path) = queue.pop_front() { - let canonical_path = current_path.canonicalize(); - - if canonical_path.is_err() { - debug!( - " - skipping path {}: {}", - current_path.display(), - canonical_path.unwrap_err() - ); - continue; - } - - // We can safely unwrap this because we understand that it's not going to fail at this - // point. - let physical_path = canonical_path.unwrap(); - - if physical_path.is_dir() { - let mut entries = tokio::fs::read_dir(&physical_path).await.unwrap(); - - while let Some(entry) = entries.next_entry().await.unwrap() { - queue.push_back(entry.path()); - } - } else { - if !should_ignore_file(&physical_path) { - let cp = physical_path.clone(); - - match cp.strip_prefix(base_dir) { - Err(err) => { - debug!( - " - skipping file {}: not in base directory {}: {:?}", - physical_path.display(), - base_dir.display(), - err - ); - continue; - } - Ok(logical_path) => { - debug!( - " - resolved path {} to logical path {}", - physical_path.display(), - logical_path.display() - ); - file_paths.insert(physical_path, logical_path.to_path_buf()); - } - } - } - } - } -} - fn is_in_dir(p: &PathBuf, dir: &str) -> bool { let mut comps = p.components(); comps.any(|comp| { @@ -526,37 +475,154 @@ fn is_file(p: &PathBuf, name: &str) -> bool { } } -fn should_ignore_file(p: &PathBuf) -> bool { - // Ignore anything that is compiled python - if p.ends_with(".pyc") { - return true; - } +struct FileResolver { + // base_dir is the directory from which logical paths are computed. + base_dir: PathBuf, - if is_file(p, "Towerfile") { - return true; - } + // import_paths are canonicalized paths to imported directories. Files within these directories + // are also allowed, with logical paths computed relative to each import path's parent. + import_paths: Vec, +} - // Ignore a .gitignore file - if is_file(p, ".gitignore") { - return true; +impl FileResolver { + fn new(base_dir: PathBuf, import_paths: Vec) -> Self { + Self { + base_dir, + import_paths, + } } - // Remove anything thats __pycache__ - if is_in_dir(p, "__pycache__") { - return true; + fn should_ignore(&self, p: &PathBuf) -> bool { + // Ignore anything that is compiled python + if p.extension().map(|ext| ext == "pyc").unwrap_or(false) { + return true; + } + + // Only exclude the root Towerfile (base_dir/Towerfile). Since base_dir is already + // canonicalized, we can derive this path directly. Towerfiles in sub-directories are + // legitimate app content and must be preserved. + if p == &self.base_dir.join("Towerfile") { + return true; + } + + // Ignore a .gitignore file + if is_file(p, ".gitignore") { + return true; + } + + // Remove anything thats __pycache__ + if is_in_dir(p, "__pycache__") { + return true; + } + + // Ignore anything that lives within a .git directory + if is_in_dir(p, ".git") { + return true; + } + + // Ignore anything that's in a virtualenv, too + if is_in_dir(p, ".venv") { + return true; + } + + false } - // Ignore anything that lives within a .git directory - if is_in_dir(p, ".git") { - return true; + fn logical_path<'a>(&self, physical_path: &'a Path) -> Option<&'a Path> { + if let Ok(p) = physical_path.strip_prefix(&self.base_dir) { + return Some(p); + } + + // Try each import path's parent as a prefix. This allows files within import paths + // (which may live outside base_dir) to be resolved with logical paths that preserve + // the import directory name (e.g. "shared_lib/foo.py"). + for import_path in &self.import_paths { + if let Some(parent) = import_path.parent() { + if let Ok(p) = physical_path.strip_prefix(parent) { + return Some(p); + } + } + } + + None } - // Ignore anything that's in a virtualenv, too - if is_in_dir(p, ".venv") { - return true; + async fn resolve_glob( + &self, + path: PathBuf, + file_paths: &mut HashMap, + ) -> Result<(), Error> { + let path_str = extract_glob_path(path); + debug!("resolving glob pattern: {}", path_str); + + let entries = glob(&path_str).map_err(|e| Error::InvalidGlob { + message: format!("{}: {}", path_str, e), + })?; + + for entry in entries { + match entry { + Ok(path) => self.resolve_path(&path, file_paths).await, + Err(e) => { + debug!("skipping glob entry: {}", e); + } + } + } + + Ok(()) } - return false; + async fn resolve_path(&self, path: &PathBuf, file_paths: &mut HashMap) { + let mut queue = VecDeque::new(); + queue.push_back(path.to_path_buf()); + + while let Some(current_path) = queue.pop_front() { + let canonical_path = current_path.canonicalize(); + + if canonical_path.is_err() { + debug!( + " - skipping path {}: {}", + current_path.display(), + canonical_path.unwrap_err() + ); + continue; + } + + // We can safely unwrap this because we understand that it's not going to fail at this + // point. + let physical_path = canonical_path.unwrap(); + + if physical_path.is_dir() { + let mut entries = tokio::fs::read_dir(&physical_path).await.unwrap(); + + while let Some(entry) = entries.next_entry().await.unwrap() { + queue.push_back(entry.path()); + } + } else { + if !self.should_ignore(&physical_path) { + let cp = physical_path.clone(); + + match self.logical_path(&cp) { + None => { + debug!( + " - skipping file {}: not in base directory {}: ...", + physical_path.display(), + self.base_dir.display(), + ); + continue; + } + Some(logical_path) => { + debug!( + " - resolved path {} to logical path {}", + physical_path.display(), + logical_path.display() + ); + file_paths.insert(physical_path, logical_path.to_path_buf()); + } + } + } + } + } + } } // normalize_path converts a Path to a normalized string with forward slashes as separators. @@ -650,9 +716,22 @@ pub async fn compute_sha256_file(file_path: &PathBuf) -> Result { #[cfg(test)] mod test { use super::*; - use std::collections::HashMap; use std::path::PathBuf; + #[test] + fn test_should_ignore_pyc_files() { + let resolver = FileResolver::new(PathBuf::from("/project"), vec![]); + + // A .pyc file should be ignored + assert!(resolver.should_ignore(&PathBuf::from("/project/module.pyc"))); + + // A .pyc file in a subdirectory should be ignored + assert!(resolver.should_ignore(&PathBuf::from("/project/sub/module.pyc"))); + + // A .py file should not be ignored + assert!(!resolver.should_ignore(&PathBuf::from("/project/module.py"))); + } + #[tokio::test] async fn test_normalize_path() { let path = PathBuf::from(".") diff --git a/crates/tower-package/tests/package_test.rs b/crates/tower-package/tests/package_test.rs index 725f8c8a..c201e2c5 100644 --- a/crates/tower-package/tests/package_test.rs +++ b/crates/tower-package/tests/package_test.rs @@ -306,6 +306,63 @@ async fn it_packages_import_paths() { ); } +#[tokio::test] +async fn it_packages_import_paths_nested_within_base_dir() { + // When an import path lives inside base_dir (e.g. libs/shared), module files must + // still be placed under modules//... (not modules/libs/shared/...) so that + // the package structure matches the manifest's PYTHONPATH entry. + let tmp_dir = TmpDir::new("nested-import") + .await + .expect("Failed to create temp dir"); + create_test_file(tmp_dir.to_path_buf(), "Towerfile", "").await; + create_test_file(tmp_dir.to_path_buf(), "main.py", "print('Hello')").await; + create_test_file(tmp_dir.to_path_buf(), "libs/shared/__init__.py", "").await; + create_test_file(tmp_dir.to_path_buf(), "libs/shared/util.py", "# util").await; + + let spec = PackageSpec { + invoke: "main.py".to_string(), + base_dir: tmp_dir.to_path_buf(), + towerfile_path: tmp_dir.to_path_buf().join("Towerfile"), + file_globs: vec!["main.py".to_string()], + parameters: vec![], + schedule: None, + import_paths: vec!["libs/shared".to_string()], + }; + + let package = Package::build(spec).await.expect("Failed to build package"); + let files = read_package_files(package).await; + + // Module files should be under modules/shared/..., NOT modules/libs/shared/... + assert!( + files.contains_key(make_path!("modules", "shared", "__init__.py")), + "files {:?} was missing modules/shared/__init__.py", + files + ); + assert!( + files.contains_key(make_path!("modules", "shared", "util.py")), + "files {:?} was missing modules/shared/util.py", + files + ); + assert!( + !files.contains_key(make_path!("modules", "libs", "shared", "__init__.py")), + "files {:?} should NOT contain modules/libs/shared/__init__.py", + files + ); + + // Verify the manifest import_paths entry matches the actual package structure. + let manifest = Manifest::from_json(files.get("MANIFEST").unwrap()) + .await + .expect("Manifest was not valid JSON"); + + assert!( + manifest + .import_paths + .contains(make_path!("modules", "shared")), + "Import paths {:?} did not contain expected path modules/shared", + manifest.import_paths + ); +} + #[tokio::test] async fn it_excludes_various_content_that_should_not_be_there() { let tmp_dir = TmpDir::new("example") @@ -385,6 +442,70 @@ async fn building_package_spec_from_towerfile() { assert_eq!(spec.schedule, Some("0 0 * * *".to_string())); } +#[tokio::test] +async fn it_includes_subapp_towerfiles_but_excludes_root_towerfile() { + // When a project contains sub-apps with their own Towerfiles, only the root Towerfile (the + // one used to build the package) should be excluded. Towerfiles belonging to sub-apps must + // be included so those apps can function correctly. + let tmp_dir = TmpDir::new("subapp-towerfile") + .await + .expect("Failed to create temp dir"); + + // Root app files + create_test_file(tmp_dir.to_path_buf(), "Towerfile", "[app]\nname = \"root\"").await; + create_test_file(tmp_dir.to_path_buf(), "main.py", "print('Hello, world!')").await; + + // Sub-app with its own Towerfile + create_test_file(tmp_dir.to_path_buf(), "subapp/Towerfile", "[app]\nname = \"subapp\"").await; + create_test_file(tmp_dir.to_path_buf(), "subapp/main.py", "print('subapp')").await; + + let spec = PackageSpec { + invoke: "main.py".to_string(), + base_dir: tmp_dir.to_path_buf(), + towerfile_path: tmp_dir.to_path_buf().join("Towerfile"), + file_globs: vec![], + parameters: vec![], + schedule: None, + import_paths: vec![], + }; + + let package = Package::build(spec).await.expect("Failed to build package"); + let files = read_package_files(package).await; + + // Root Towerfile should NOT be in the app directory (it's added separately as "Towerfile") + assert!( + !files.contains_key("app/Towerfile"), + "files {:?} should not contain the root Towerfile under app/", + files + ); + + // The root Towerfile is still bundled at the top level for reference + assert!( + files.contains_key("Towerfile"), + "files {:?} should contain the root Towerfile at the top level", + files + ); + + // Sub-app's Towerfile MUST be included + assert!( + files.contains_key("app/subapp/Towerfile"), + "files {:?} should contain the sub-app Towerfile", + files + ); + + // Other files should be present + assert!( + files.contains_key("app/main.py"), + "files {:?} was missing main.py", + files + ); + assert!( + files.contains_key("app/subapp/main.py"), + "files {:?} was missing subapp/main.py", + files + ); +} + #[tokio::test] async fn it_includes_hidden_parameters_in_manifest() { let tmp_dir = TmpDir::new("hidden-params") diff --git a/pyproject.toml b/pyproject.toml index fdf30a6e..7b165c9f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "maturin" [project] name = "tower" -version = "0.3.52" +version = "0.3.53" description = "Tower CLI and runtime environment for Tower." authors = [{ name = "Tower Computing Inc.", email = "brad@tower.dev" }] readme = "README.md" diff --git a/uv.lock b/uv.lock index 7ded35b2..4fe7e815 100644 --- a/uv.lock +++ b/uv.lock @@ -2488,7 +2488,7 @@ wheels = [ [[package]] name = "tower" -version = "0.3.52" +version = "0.3.53" source = { editable = "." } dependencies = [ { name = "attrs" },