Implement CrateDownloader actor for crates.io artifact retrieval #36

@rrrodzilla

Summary

Implement the core download functionality for the CrateDownloader actor to enable downloading, validating, and storing crate archives from crates.io following the event-driven pub/sub architecture. This actor is a stateless event broadcaster that downloads crates and broadcasts results without knowing about subscribers.

Priority

🔴 HIGH - Blocks core value proposition and downstream processing pipeline

Type

✨ Feature - New functionality

Current State

The CrateDownloader actor exists as a stub implementation with message handling infrastructure but no download logic (see src/actors/crate_downloader.rs:119-125).

Architecture Pattern

Stateless Event Broadcaster (Pattern 1)

Following .agents/important-info/actor-architecture-philosophy.md:

  • ✅ Zero internal state about "what crates are being processed"
  • ✅ Zero knowledge of subscribers or next pipeline stages
  • ✅ Pure message transformation: trigger → download → broadcast result
  • ✅ Isolated I/O concern (network downloads)
  • ✅ Uses act_on for parallel execution (multiple downloads concurrently)
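The pattern above can be illustrated without any actor framework. The following is a minimal sketch using only std channels; `Broker`, `DownloadEvent`, and `handle_trigger` are hypothetical names for illustration and are not the acton API. It shows the worker emitting one event per trigger while the subscriber list lives entirely in the broker.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical event type standing in for CrateDownloaded / CrateDownloadFailed.
#[derive(Clone, Debug, PartialEq)]
pub enum DownloadEvent {
    Downloaded { name: String, path: String },
    Failed { name: String, error: String },
}

/// Minimal stand-in for a broker: it owns the subscriber list, the worker does not.
#[derive(Default)]
pub struct Broker {
    subscribers: Vec<Sender<DownloadEvent>>,
}

impl Broker {
    /// Register a subscriber and hand back its receiving end.
    pub fn subscribe(&mut self) -> Receiver<DownloadEvent> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Send a clone of the event to every subscriber; dropped receivers are ignored.
    pub fn broadcast(&self, event: DownloadEvent) {
        for tx in &self.subscribers {
            let _ = tx.send(event.clone());
        }
    }
}

/// Stateless worker step: trigger in, exactly one event out.
/// `fetch` stands in for the real download and returns the archive path or an error.
pub fn handle_trigger<F>(broker: &Broker, name: &str, fetch: F)
where
    F: FnOnce(&str) -> Result<String, String>,
{
    let event = match fetch(name) {
        Ok(path) => DownloadEvent::Downloaded { name: name.to_string(), path },
        Err(error) => DownloadEvent::Failed { name: name.to_string(), error },
    };
    broker.broadcast(event);
}
```

Adding a third subscriber here requires no change to `handle_trigger`, which is the property the checklist is after.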

Implementation Details

1. Actor Structure (Stateless)

#[acton_actor]
pub struct CrateDownloader;  // ✅ No state - stateless worker

impl CrateDownloader {
    pub async fn spawn(
        runtime: &mut AgentRuntime,
        config: PipelineConfig,
    ) -> anyhow::Result<AgentHandle> {
        let mut builder = runtime.new_agent::<CrateDownloader>().await;
        
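        // Clone outside the handler so the `move` closure owns its own copy
        // instead of borrowing `config`, which does not outlive `spawn`.
        let cache_dir = config.cache_dir.clone();

        // ✅ Use act_on for parallel execution (no state mutation)
        builder.act_on::<CrateReceived>(move |agent, envelope| {
            let specifier = envelope.message().specifier.clone();
            let features = envelope.message().features.clone();
            let broker = agent.broker().clone();  // ✅ Get broker, not actor handles
            let cache_dir = cache_dir.clone();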
            
            AgentReply::from_async(async move {
                // 1. Check cache for existing download
                if let Some(cached_path) = check_cache(&cache_dir, &specifier).await {
                    broker.broadcast(CrateDownloaded {
                        specifier,
                        path: cached_path,
                        features,
                    }).await;
                    return;
                }
                
                // 2. Download from crates.io
                match download_and_validate(&specifier, &cache_dir).await {
                    Ok(path) => {
                        // ✅ Broadcast success - multiple subscribers react:
                        //   - FileReaderActor starts reading (next stage)
                        //   - CrateCoordinatorActor updates state
                        //   - Console displays progress
                        //   - DatabaseActor persists metadata
                        broker.broadcast(CrateDownloaded {
                            specifier,
                            path,
                            features,
                        }).await;
                    }
                    Err(e) => {
                        // ✅ Broadcast failure - observers react independently
                        broker.broadcast(CrateDownloadFailed {
                            specifier,
                            error: e.to_string(),
                        }).await;
                    }
                }
            })
        });
        
        // ✅ Subscribe to trigger event
        builder.handle().subscribe::<CrateReceived>().await;
        
        Ok(builder.start().await)
    }
}

2. HTTP Download Client

Dependencies:

reqwest = { version = "0.12", features = ["json", "rustls-tls"] }
sha2 = "0.10"  # For checksum verification

Implementation:

  • Use reqwest client with connection pooling for efficiency
  • Download from crates.io API: https://crates.io/api/v1/crates/{name}/{version}/download
  • Handle 302 redirects to actual download URLs (typically static.crates.io)
  • Implement retry logic with exponential backoff (3 retries, 1s/2s/4s delays)
  • Set appropriate User-Agent header: crately/{version} (roland@govcraft.ai)
  • Stream downloads to disk to handle large crates efficiently

3. XDG-Compliant Storage

Cache Directory Structure:

$XDG_CACHE_HOME/crately/
├── downloads/
│   ├── {crate_name}/
│   │   └── {version}/
│   │       ├── {crate_name}-{version}.crate  # Downloaded archive
│   │       └── metadata.json                  # Download metadata

Implementation:

  • Use existing xdg crate dependency (already in Cargo.toml)
  • Create cache directories with proper permissions (0o755)
  • Store metadata alongside archives (timestamp, checksum, size)
  • Handle concurrent downloads with file locking
  • Check for existing downloads before attempting re-download
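To make the directory layout concrete, here is a small std-only sketch of the path construction. The issue proposes using the xdg crate for this; the hand-rolled `cache_root` below is only an illustration of the fallback rule from the XDG Base Directory spec (unset or empty `XDG_CACHE_HOME` means `$HOME/.cache`), and all function names are hypothetical.

```rust
use std::path::{Path, PathBuf};

/// Resolve the crately cache root from the two environment values.
/// An unset or empty XDG_CACHE_HOME falls back to $HOME/.cache.
pub fn cache_root(xdg_cache_home: Option<&str>, home: &str) -> PathBuf {
    let base = match xdg_cache_home {
        Some(dir) if !dir.is_empty() => PathBuf::from(dir),
        _ => Path::new(home).join(".cache"),
    };
    base.join("crately")
}

/// Directory holding one crate version: downloads/{crate_name}/{version}/
pub fn version_dir(root: &Path, name: &str, version: &str) -> PathBuf {
    root.join("downloads").join(name).join(version)
}

/// Path of the downloaded archive: {crate_name}-{version}.crate
pub fn archive_path(root: &Path, name: &str, version: &str) -> PathBuf {
    version_dir(root, name, version).join(format!("{}-{}.crate", name, version))
}

/// Path of the metadata file stored next to the archive.
pub fn metadata_path(root: &Path, name: &str, version: &str) -> PathBuf {
    version_dir(root, name, version).join("metadata.json")
}
```

Keeping these as pure functions of their inputs (rather than reading the environment inside them) makes the "cache path generation" unit test straightforward.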

4. Archive Validation

Checksum Verification:

  • Fetch checksum from crates.io API: https://crates.io/api/v1/crates/{name}/{version}
  • Verify SHA-256 hash of downloaded archive matches expected value
  • Broadcast CrateDownloadFailed if verification fails
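The comparison step of checksum verification can be sketched independently of the hashing itself. The sketch below assumes the digest bytes have already been computed (for example with the sha2 crate listed in the dependencies) and that the expected value arrives as a hex string; `to_hex` and `verify_digest` are hypothetical helper names.

```rust
/// Render digest bytes as lowercase hex.
pub fn to_hex(bytes: &[u8]) -> String {
    let mut out = String::with_capacity(bytes.len() * 2);
    for b in bytes {
        out.push_str(&format!("{:02x}", b));
    }
    out
}

/// Compare a computed digest against the expected hex string.
/// The comparison ignores ASCII case and surrounding whitespace.
pub fn verify_digest(actual: &[u8], expected_hex: &str) -> Result<(), String> {
    let expected = expected_hex.trim();
    let actual_hex = to_hex(actual);
    if actual_hex.eq_ignore_ascii_case(expected) {
        Ok(())
    } else {
        Err(format!(
            "checksum mismatch: expected {}, got {}",
            expected, actual_hex
        ))
    }
}
```

The `Err` string is the kind of message that would end up in the `error` field of `CrateDownloadFailed`.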

Archive Integrity:

  • Verify archive is valid gzip-compressed tar format
  • Basic header validation before marking download complete
  • Quarantine corrupted downloads in separate directory
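A basic header check can be done on the first bytes of the file before any decompression. A gzip stream starts with the magic bytes 0x1f 0x8b followed by the compression method byte (0x08 for deflate). The sketch below checks only that prefix; it does not validate the tar structure inside. The `quarantine` directory name is an assumption, since the issue does not name it.

```rust
use std::path::{Path, PathBuf};

/// True when the buffer starts with a gzip header using deflate compression.
/// This is a cheap sanity check, not a full integrity check of the archive.
pub fn looks_like_gzip(header: &[u8]) -> bool {
    matches!(header, [0x1f, 0x8b, 0x08, ..])
}

/// Where a corrupted archive would be moved. The "quarantine" directory name
/// is hypothetical; the issue only asks for "a separate directory".
pub fn quarantine_path(cache_root: &Path, file_name: &str) -> PathBuf {
    cache_root.join("quarantine").join(file_name)
}
```

In practice the caller would read the first few bytes of the downloaded file, run `looks_like_gzip`, and rename the file to `quarantine_path(...)` on failure.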

5. Event Broadcasting (Zero Coupling)

Events to Broadcast:

// ✅ Success - multiple subscribers react
broker.broadcast(CrateDownloaded {
    specifier: CrateSpecifier,
    path: PathBuf,
    features: Vec<String>,
}).await;

// ✅ Failure - observers handle independently
broker.broadcast(CrateDownloadFailed {
    specifier: CrateSpecifier,
    error: String,
}).await;

// Optional: Progress updates during download
broker.broadcast(DownloadProgress {
    specifier: CrateSpecifier,
    bytes_downloaded: u64,
    total_bytes: Option<u64>,
}).await;
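The three broadcasts above imply the following message shapes. This is a sketch with plain derives so that it compiles with std alone; the real types would additionally need whatever message trait or attribute the actor framework requires, and `CrateSpecifier` here is a simplified stand-in for the existing type. The `fraction` helper is an illustrative addition, not part of the issue.

```rust
use std::path::PathBuf;

/// Simplified stand-in for the project's existing CrateSpecifier.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct CrateSpecifier {
    pub name: String,
    pub version: String,
}

/// Broadcast when the archive is on disk and validated.
#[derive(Clone, Debug)]
pub struct CrateDownloaded {
    pub specifier: CrateSpecifier,
    pub path: PathBuf,
    pub features: Vec<String>,
}

/// Broadcast when download or validation fails.
#[derive(Clone, Debug)]
pub struct CrateDownloadFailed {
    pub specifier: CrateSpecifier,
    pub error: String,
}

/// Optional progress update; `total_bytes` is None when the size is unknown.
#[derive(Clone, Debug)]
pub struct DownloadProgress {
    pub specifier: CrateSpecifier,
    pub bytes_downloaded: u64,
    pub total_bytes: Option<u64>,
}

impl DownloadProgress {
    /// Fraction complete in 0.0..=1.0, or None when the total is unknown or zero.
    pub fn fraction(&self) -> Option<f64> {
        match self.total_bytes {
            Some(total) if total > 0 => {
                Some((self.bytes_downloaded as f64 / total as f64).min(1.0))
            }
            _ => None,
        }
    }
}
```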

Who Subscribes (Worker Doesn't Know or Care):

  • FileReaderActor - Starts reading extracted files (next pipeline stage)
  • CrateCoordinatorActor - Updates state: Received → Downloaded
  • Console - Displays "Downloaded: {name}@{version}"
  • DatabaseActor - Persists download metadata and timestamp

6. Error Handling Strategy

Comprehensive Error Types:

  • Network errors (timeout, connection refused, DNS failure)
  • HTTP errors (404 not found, 403 forbidden, 500 server error)
  • Validation errors (checksum mismatch, corrupt archive)
  • Filesystem errors (disk full, permission denied, path too long)
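One way to model the four error categories above is a single enum. The sketch below is an assumption about shape only: the issue's helper signatures use `anyhow::Result`, so a typed enum like this would be an internal detail, and `DownloadError` and its variant payloads are hypothetical.

```rust
use std::fmt;

/// One variant per error category listed above.
#[derive(Debug)]
pub enum DownloadError {
    /// Timeout, connection refused, DNS failure.
    Network(String),
    /// Non-success HTTP status code.
    Http(u16),
    /// Checksum mismatch or corrupt archive.
    Validation(String),
    /// Disk full, permission denied, path too long.
    Filesystem(std::io::Error),
}

impl fmt::Display for DownloadError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            DownloadError::Network(msg) => write!(f, "network error: {}", msg),
            DownloadError::Http(status) => write!(f, "HTTP error: status {}", status),
            DownloadError::Validation(msg) => write!(f, "validation error: {}", msg),
            DownloadError::Filesystem(err) => write!(f, "filesystem error: {}", err),
        }
    }
}

impl std::error::Error for DownloadError {
    /// Expose the underlying I/O error so callers can inspect its kind.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            DownloadError::Filesystem(err) => Some(err),
            _ => None,
        }
    }
}

impl From<std::io::Error> for DownloadError {
    fn from(err: std::io::Error) -> Self {
        DownloadError::Filesystem(err)
    }
}
```

Because the enum implements `std::error::Error`, it converts into `anyhow::Error` with `?`, and `to_string()` yields the text for `CrateDownloadFailed.error`.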

Retry Strategy:

  • Network transients: 3 retries with exponential backoff
  • HTTP 429 (rate limit): Respect Retry-After header
  • HTTP 5xx: 2 retries with backoff
  • HTTP 4xx (other than 429): No retry, immediate failure
  • Validation failures: No retry, mark as failed
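The retry rules above amount to a small decision table, which can be written as a pure function and tested without any I/O. In this sketch `Failure`, `RetryDecision`, and `decide` are hypothetical names. Two behaviours are assumptions not stated in the list: a 429 without a Retry-After header falls back to backoff, and statuses outside 4xx/5xx are not retried.

```rust
use std::time::Duration;

/// What the caller should do after a failed attempt.
#[derive(Debug, PartialEq, Eq)]
pub enum RetryDecision {
    /// Retry with exponential backoff, up to `max_retries` retries.
    Backoff { max_retries: u32 },
    /// Wait for the server-provided Retry-After duration, then retry.
    After(Duration),
    /// Do not retry; broadcast the failure.
    GiveUp,
}

/// Simplified classification of a failed attempt.
pub enum Failure {
    NetworkTransient,
    Http { status: u16, retry_after: Option<Duration> },
    Validation,
}

/// Map a failure to a retry decision following the list above.
pub fn decide(failure: &Failure) -> RetryDecision {
    match failure {
        Failure::NetworkTransient => RetryDecision::Backoff { max_retries: 3 },
        // 429 is checked before the general 4xx rule.
        Failure::Http { status: 429, retry_after } => match retry_after {
            Some(delay) => RetryDecision::After(*delay),
            // Assumption: with no Retry-After header, fall back to backoff.
            None => RetryDecision::Backoff { max_retries: 3 },
        },
        Failure::Http { status: 500..=599, .. } => RetryDecision::Backoff { max_retries: 2 },
        // Remaining 4xx, plus any other status (assumption): fail immediately.
        Failure::Http { .. } => RetryDecision::GiveUp,
        Failure::Validation => RetryDecision::GiveUp,
    }
}
```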

All errors broadcast as events for observers to react:

broker.broadcast(CrateDownloadFailed {
    specifier,
    error: format!("Network timeout after 3 retries: {}", e),
}).await;

7. Helper Functions (Pure Logic)

// Free functions - no actor state, so concurrent handlers can call them safely
async fn download_and_validate(
    specifier: &CrateSpecifier,
    cache_dir: &Path,
) -> anyhow::Result<PathBuf> {
    // Download logic
    todo!()
}

async fn check_cache(
    cache_dir: &Path,
    specifier: &CrateSpecifier,
) -> Option<PathBuf> {
    // Cache check logic
    todo!()
}

fn verify_checksum(path: &Path, expected: &str) -> anyhow::Result<()> {
    // Validation logic
    todo!()
}
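As an illustration of what the cache check could look like, here is a synchronous, std-only sketch. It differs from the signature above in two labeled ways: it is not async, and it takes the crate name and version as plain strings instead of a `CrateSpecifier`. Treating an empty file as a miss is also an assumption, intended to avoid reusing an interrupted download.

```rust
use std::fs;
use std::path::{Path, PathBuf};

/// Return the archive path if a non-empty archive already exists in the cache.
/// Hypothetical synchronous variant of `check_cache`.
pub fn check_cache_sync(cache_dir: &Path, name: &str, version: &str) -> Option<PathBuf> {
    let path = cache_dir
        .join("downloads")
        .join(name)
        .join(version)
        .join(format!("{}-{}.crate", name, version));

    // A missing file, a directory, or a zero-length file all count as a miss.
    match fs::metadata(&path) {
        Ok(meta) if meta.is_file() && meta.len() > 0 => Some(path),
        _ => None,
    }
}
```

A fuller version would likely also compare the stored checksum in metadata.json before reporting a hit.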

Acceptance Criteria

  • CrateDownloader is stateless (pub struct CrateDownloader;)
  • Uses act_on handler for parallel execution
  • Broadcasts CrateDownloaded on success (doesn't send to specific actors)
  • Broadcasts CrateDownloadFailed on failure
  • Zero knowledge of subscribers (no Console/Database actor lookups)
  • Downloads crate archives from crates.io
  • Stores in XDG-compliant cache directory
  • SHA-256 checksum verification implemented
  • Retry logic handles transient failures gracefully
  • Concurrent downloads work correctly (parallel execution)
  • Existing cached downloads detected and reused
  • No println! or direct console output
  • Unit test coverage ≥90% for new code
  • Integration tests verify event broadcast
  • All clippy lints resolved (zero warnings)
  • Documentation complete with pub/sub pattern examples

Testing Requirements

Unit Tests:

  • ✅ Download URL construction for various crate names/versions
  • ✅ Checksum verification logic
  • ✅ Cache path generation following XDG spec
  • ✅ Error handling for all failure modes
  • ✅ Retry logic and backoff calculation

Integration Tests:

  • ✅ Download small test crate (e.g., serde@1.0.0)
  • ✅ Verify cache directory structure created correctly
  • ✅ Test concurrent downloads execute in parallel
  • ✅ Verify event broadcast (not direct sends)
  • ✅ Test graceful degradation on network failures
  • ✅ Verify multiple subscribers receive events

Event Flow Testing:

#[tokio::test]
async fn test_download_broadcasts_success() -> anyhow::Result<()> {
    let mut runtime = ActonApp::launch();
    let broker = runtime.broker();

    // Create test subscriber
    let mut receiver = broker.subscribe::<CrateDownloaded>();

    // Spawn downloader (`config` is a PipelineConfig pointing at a temporary cache directory)
    let _downloader = CrateDownloader::spawn(&mut runtime, config).await?;

    // Trigger download
    broker.broadcast(CrateReceived { /* ... */ }).await;

    // Verify event broadcast (not direct send)
    let event = receiver.recv().await?;
    assert_eq!(event.specifier.name(), "test-crate");

    // The test returns a Result so that `?` above compiles
    Ok(())
}

Dependencies

Blocking:

  • Message types: CrateReceived, CrateDownloaded, CrateDownloadFailed
  • Pipeline configuration (PipelineConfig in Config struct)

New Crate Dependencies:

  • reqwest@0.12 with json and rustls-tls features
  • sha2@0.10 for checksum verification

Files to Modify

  • src/actors/crate_downloader.rs - Main implementation (replace stub)
  • src/messages/crate_downloaded.rs - Success event (create)
  • src/messages/crate_download_failed.rs - Failure event (create)
  • src/messages/download_progress.rs - Optional progress event (create)
  • src/messages/mod.rs - Export new message types
  • Cargo.toml - Add new dependencies

Estimated Effort

Complexity: Medium-High
Time Estimate: 9-10 hours

  • HTTP client implementation: 2 hours
  • Storage and validation: 2 hours
  • Event broadcasting pattern: 1 hour
  • Error handling and retries: 1.5 hours
  • Testing (including event flow): 2-3 hours
  • Documentation: 0.5 hour

Anti-Patterns to Avoid

Don't do this (tight coupling):

let console = agent.get_actor("console");
let database = agent.get_actor("database");
console.send(PrintSuccess { ... }).await;
database.send(PersistCrate { ... }).await;

Do this (pub/sub decoupling):

let broker = agent.broker().clone();
broker.broadcast(CrateDownloaded { ... }).await;
// Multiple subscribers react independently

Don't maintain processing state:

struct CrateDownloader {
    current_downloads: HashMap<CrateSpecifier, Status>,  // ❌ No!
}

Stay stateless:

pub struct CrateDownloader;  // ✅ Yes!

References

  • .agents/important-info/actor-architecture-philosophy.md (Pattern 1: lines 116-149)
  • Code example: lines 294-338 (same file)
  • act_on vs mutate_on: lines 42-110 (same file)
  • Zero coupling principle: lines 31-38 (same file)
  • Existing pattern: src/actors/server_actor.rs:268 (ServerStarted broadcast)
