
Rewrite automation service to use Temporal #35

Draft
malhotra5 wants to merge 9 commits into main from feat/temporal-rewrite

Conversation

Member

@malhotra5 malhotra5 commented Apr 3, 2026

Summary

This PR rewrites the automation service to use Temporal for durable workflow execution, replacing the legacy polling-based scheduler/dispatcher/watchdog system.

Key Changes

Removed (Legacy Code)

  • scheduler.py - Cron polling loop → replaced by Temporal Schedules
  • dispatcher.py - Pending run polling → replaced by Temporal Workflows
  • watchdog.py - Stale run detection → replaced by Temporal timeouts
  • execution.py - Sandbox lifecycle → moved to Temporal Activities
  • utils/run.py, utils/cron.py, utils/api_key.py - Legacy utilities

Added (Temporal Implementation)

| File | Purpose |
|---|---|
| temporal/types.py | Data classes for workflow inputs/outputs |
| temporal/activities.py | 6 activities for sandbox operations |
| temporal/workflows.py | AutomationWorkflow with crash-proof execution |
| temporal/schedules.py | Temporal Schedule management for cron jobs |
| temporal/client.py | Temporal client factory |
| temporal/worker.py | Worker setup |

Updated

  • app.py - FastAPI app now runs Temporal worker as background task
  • router.py - API routes use Temporal for workflow execution
  • config.py - Added Temporal connection settings

Benefits of Temporal

  1. Fault Tolerance: Workflows automatically resume after crashes
  2. Built-in Timeouts: No need for watchdog polling
  3. Schedules: First-class cron scheduling with ScheduleSpec
  4. Activity Retries: Automatic retries with configurable policies
  5. Visibility: Rich UI for monitoring workflow execution

Tests Added

| Test File | Description | Count |
|---|---|---|
| test_types.py | Data class tests | 12 |
| test_schedules.py | Schedule CRUD tests | 11 |
| test_activities.py | Activity tests with ActivityEnvironment | 12 |
| test_workflows.py | Workflow tests (skipped by default) | 4 |

Running Tests

# Unit tests (fast, no external deps)
uv run pytest tests/temporal/test_types.py tests/temporal/test_schedules.py -v

# Activity tests
uv run pytest tests/temporal/test_activities.py -v

# All Temporal tests (workflow tests are skipped by default)
uv run pytest tests/temporal/ -v

Deployment Notes

Requires Temporal server. For Kubernetes deployment, Temporal can be deployed as pods. Configuration via environment variables:

AUTOMATION_TEMPORAL_HOST=temporal.default.svc.cluster.local
AUTOMATION_TEMPORAL_PORT=7233
AUTOMATION_TEMPORAL_NAMESPACE=default
AUTOMATION_TEMPORAL_TASK_QUEUE=automations
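
As a rough illustration of how these variables could be consumed, the sketch below loads them into a small settings object and builds the `host:port` address a Temporal client connects to. The `TemporalSettings` class, the `load_temporal_settings` helper, and the `localhost` fallback are assumptions made for this example; the actual `config.py` in this PR may be structured differently.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class TemporalSettings:
    """Hypothetical settings holder; not the PR's actual config class."""

    host: str = "localhost"  # assumed fallback for local development
    port: int = 7233
    namespace: str = "default"
    task_queue: str = "automations"

    @property
    def target(self) -> str:
        # Temporal clients connect to a "host:port" address.
        return f"{self.host}:{self.port}"


def load_temporal_settings(env=None) -> TemporalSettings:
    """Read AUTOMATION_TEMPORAL_* variables, falling back to the defaults."""
    env = os.environ if env is None else env
    defaults = TemporalSettings()
    return TemporalSettings(
        host=env.get("AUTOMATION_TEMPORAL_HOST", defaults.host),
        port=int(env.get("AUTOMATION_TEMPORAL_PORT", defaults.port)),
        namespace=env.get("AUTOMATION_TEMPORAL_NAMESPACE", defaults.namespace),
        task_queue=env.get("AUTOMATION_TEMPORAL_TASK_QUEUE", defaults.task_queue),
    )
```

Passing a plain dict instead of `os.environ` keeps the loader easy to unit test.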

This PR was created by an AI assistant (OpenHands) on behalf of the user.

Replace legacy polling-based execution with Temporal workflows:

## Removed (Legacy Code)
- scheduler.py - Cron polling loop (replaced by Temporal Schedules)
- dispatcher.py - Pending run polling (replaced by Temporal Workflows)
- watchdog.py - Stale run detection (replaced by Temporal timeouts)
- execution.py - Sandbox lifecycle (moved to Temporal Activities)
- utils/run.py, utils/cron.py, utils/api_key.py - Legacy utilities

## Added (Temporal Implementation)
- temporal/types.py - Data classes for workflow inputs/outputs
- temporal/activities.py - 6 activities for sandbox operations
- temporal/workflows.py - AutomationWorkflow with crash-proof execution
- temporal/schedules.py - Temporal Schedule management for cron jobs
- temporal/client.py - Temporal client factory
- temporal/worker.py - Worker setup

## Updated
- app.py - FastAPI app now runs Temporal worker as background task
- router.py - API routes use Temporal for workflow execution
- config.py - Added Temporal connection settings

## Tests Added
- tests/temporal/test_types.py - Data class tests
- tests/temporal/test_schedules.py - Schedule CRUD tests
- tests/temporal/test_activities.py - Activity tests with ActivityEnvironment
- tests/temporal/test_workflows.py - Workflow tests (skipped by default)

Co-authored-by: openhands <openhands@all-hands.dev>

github-actions Bot commented Apr 3, 2026

🚀 Deploy Preview PR Created/Updated

A deploy preview has been created/updated for this PR.

Deploy PR: https://github.com/OpenHands/deploy/pull/3663
Automation SHA: a13e6a4b0a77eebdb8b6d06fa94439c5eb8f5fa2
Last updated: Apr 03, 2026, 10:45:55 PM ET

Once the deploy PR's CI passes, the automation service will be deployed to the feature environment.


github-actions Bot commented Apr 3, 2026

Coverage

Coverage Report
| File | Stmts | Miss | Cover | Missing |
|---|---|---|---|---|
| __init__.py | 1 | 0 | 100% | |
| app.py | 99 | 48 | 51% | 35, 38, 41, 47, 49, 52–55, 58–64, 69–71, 73–75, 79–80, 82, 84, 87–88, 91–100, 103, 106–107, 112–114, 178–181 |
| auth.py | 77 | 6 | 92% | 59, 92, 149, 200, 208–209 |
| config.py | 46 | 1 | 97% | 69 |
| constants.py | 12 | 0 | 100% | |
| db.py | 44 | 26 | 40% | 37–39, 48–49, 51–52, 54, 62, 69, 79, 82–83, 87–88, 96, 104, 109, 114, 117–123 |
| exceptions.py | 4 | 4 | 0% | 8, 18, 21, 29 |
| logger.py | 53 | 18 | 66% | 25–26, 36, 46–47, 49–55, 68, 88, 90–93 |
| models.py | 65 | 0 | 100% | |
| preset_router.py | 157 | 50 | 68% | 188–189, 194–201, 206, 209, 211–212, 222–225, 227–231, 236, 245, 374–375, 380–387, 392, 395, 397–398, 408–411, 413–417, 422, 432 |
| router.py | 137 | 87 | 36% | 57, 95, 98–102, 104–105, 110, 130, 132, 135, 137, 151, 165–167, 169–170, 172–173, 176–180, 182–185, 187, 199–201, 204, 223, 226, 231–233, 236, 248, 253, 261–263, 269, 275, 277–281, 286, 300, 303, 305, 312, 314, 338–339, 341, 346–348, 350–352, 354–355, 361, 365–368, 371, 379, 381–382, 387–388, 390, 411–413, 417 |
| schemas.py | 144 | 11 | 92% | 67–69, 71, 129, 153–154, 157, 162, 167, 173 |
| uploads.py | 107 | 59 | 44% | 138–141, 149–151, 157–158, 161, 170–171, 174–175, 183–184, 186–189, 192–195, 197, 199–201, 203–206, 208–209, 211, 226, 232–233, 236, 239, 242, 245, 247, 260–261, 275, 278–280, 282–283, 285, 291–292, 305, 313–315, 319 |
| presets/__init__.py | 0 | 0 | 100% | |
| storage/__init__.py | 5 | 0 | 100% | |
| storage/factory.py | 11 | 0 | 100% | |
| storage/file_store.py | 18 | 5 | 72% | 11, 20, 25, 30, 54 |
| storage/google_cloud.py | 75 | 10 | 86% | 103–108, 142–143, 196, 198 |
| storage/s3.py | 115 | 14 | 87% | 100, 102–103, 107, 109, 190, 213–215, 269–270, 275, 337–338 |
| temporal/__init__.py | 3 | 0 | 100% | |
| temporal/activities.py | 166 | 28 | 83% | 90, 104, 148, 156, 172–180, 182–183, 185, 285–286, 291, 405–406, 409, 412, 451, 454, 459–461 |
| temporal/client.py | 32 | 23 | 28% | 30–31, 33, 40–42, 44–47, 49, 53, 56–57, 59, 65–66, 83–85, 91, 95–96 |
| temporal/schedules.py | 88 | 16 | 81% | 89, 167–168, 189–190, 203, 207–208, 303, 305–308, 319–321 |
| temporal/types.py | 80 | 0 | 100% | |
| temporal/worker.py | 41 | 28 | 31% | 40–41, 43–44, 46, 53, 60, 78, 80, 82, 84, 87–89, 98, 100, 102–104, 106–108, 110–111, 113, 118, 120–121 |
| temporal/workflows.py | 64 | 38 | 40% | 64–65, 132, 141–143, 145, 147, 158, 161, 173, 176, 178, 180, 192–194, 196, 206, 218, 227–228, 231, 236, 239, 255, 264, 273–274, 281, 288–289, 293, 302–304, 315–316 |
| utils/__init__.py | 2 | 0 | 100% | |
| utils/sandbox.py | 108 | 108 | 0% | 7–8, 10–11, 14, 17–18, 21–25, 28, 32–37, 40, 51–52, 57–60, 62–64, 66–72, 75, 84, 86, 96–97, 99–101, 103–104, 107–108, 114, 120–122, 125, 132–133, 138–144, 147, 167–168, 170–174, 176–180, 183–184, 187–192, 195, 217–218, 220, 222–225, 230–231, 234, 236–237, 243–245, 250–252, 257–258, 266–268, 270 |
| utils/tarball_url.py | 22 | 1 | 95% | 69 |
| utils/tarball_validation.py | 29 | 0 | 100% | |
| utils/time.py | 3 | 0 | 100% | |
| TOTAL | 1808 | 581 | 67% | |

openhands-agent and others added 8 commits April 3, 2026 22:05
- Add mock_temporal_client fixture to conftest.py
- Override get_client dependency for async_client tests
- Add list_workflows mock for readiness check
- Update TestAuthIntegration tests to use mock_temporal_client
- Fix line length issues flagged by pre-commit

Co-authored-by: openhands <openhands@all-hands.dev>
- Mock response should return {items: [{exit_code, stdout, stderr}]}
- Mock asyncio.sleep to avoid test timeout
- These tests were timing out because they were waiting for exit_code

Co-authored-by: openhands <openhands@all-hands.dev>
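
The test fix described in the commit above (a mock response shaped like `{items: [{exit_code, stdout, stderr}]}` plus a patched `asyncio.sleep`) can be sketched roughly as follows. The polling helper `wait_for_exit_code` and its parameters are hypothetical stand-ins for the real activity code, which is not shown on this page; only the response shape and the idea of mocking `asyncio.sleep` come from the commit message.

```python
import asyncio
from unittest.mock import AsyncMock, patch


async def wait_for_exit_code(fetch_status, poll_interval: float = 5.0, max_polls: int = 10):
    """Hypothetical poller: wait until the first item carries an exit_code."""
    for _ in range(max_polls):
        response = await fetch_status()
        item = response["items"][0]
        if item.get("exit_code") is not None:
            return item
        await asyncio.sleep(poll_interval)
    raise TimeoutError("command did not finish")


async def run_polling_test():
    # The first response has no exit code yet; the second one is complete.
    fetch_status = AsyncMock(side_effect=[
        {"items": [{"exit_code": None, "stdout": "", "stderr": ""}]},
        {"items": [{"exit_code": 0, "stdout": "ok", "stderr": ""}]},
    ])
    # Patch asyncio.sleep so the poll interval costs no wall-clock time.
    with patch("asyncio.sleep", new_callable=AsyncMock) as fake_sleep:
        item = await wait_for_exit_code(fetch_status)
    return item, fake_sleep.await_count


item, sleep_calls = asyncio.run(run_polling_test())
```

Without the patch, the same test would wait a full poll interval for every response that lacks an exit code.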
- test_ready_endpoint_db_unavailable: Check 'errors' array instead of 'error'
- dispatch tests: Expect RUNNING status instead of PENDING
- dispatch tests: Expect started_at to be set (workflow starts immediately)
- Replace test_dispatch_updates_last_triggered_at with test_dispatch_creates_running_run
- Fix execute_entrypoint tests with proper response structure and mock asyncio.sleep

Co-authored-by: openhands <openhands@all-hands.dev>
…ctions

The Temporal workflow sandbox restricts imports like httpx and urllib.request.
The workflow was importing tarball_validation.py which transitively imported
fastapi, sqlalchemy, and httpx through its dependency chain.

This fix:
1. Creates tarball_url.py with pure URL parsing functions (no heavy deps)
2. Updates workflows.py to import from the lightweight module
3. Updates tarball_validation.py to re-export from tarball_url.py

This allows the workflow code to use is_http_url() and parse_internal_upload_id()
without triggering sandbox restriction errors.

Co-authored-by: openhands <openhands@all-hands.dev>
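
A dependency-free module of the kind this commit describes might look roughly like the sketch below. Only the function names `is_http_url()` and `parse_internal_upload_id()` come from the commit message; the `upload://` prefix and the exact matching rules are assumptions for illustration, since the real internal path format is not shown here.

```python
from typing import Optional

# Hypothetical scheme for internal uploads; the real format is not shown in this PR.
INTERNAL_PREFIX = "upload://"


def is_http_url(path: str) -> bool:
    """Return True for http:// or https:// URLs, ignoring the scheme's case."""
    return path.lower().startswith(("http://", "https://"))


def parse_internal_upload_id(path: str) -> Optional[str]:
    """Return the upload ID of an internal path, or None if it is not one."""
    if not path.startswith(INTERNAL_PREFIX):
        return None
    upload_id = path[len(INTERNAL_PREFIX):]
    return upload_id or None  # an empty ID counts as invalid
```

Because it uses only string operations and `typing`, a module like this has no import chain that could reach fastapi, sqlalchemy, or httpx from inside the workflow sandbox.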
When running with ddtrace-run, the import instrumentation conflicts with
Temporal's workflow sandbox, causing circular import errors with beartype.

This adds AUTOMATION_SKIP_WORKER config option that:
- When true, the API server skips creating an in-process Temporal worker
- Allows the API to run with ddtrace for HTTP tracing
- The separate worker deployment handles workflow execution without ddtrace

Co-authored-by: openhands <openhands@all-hands.dev>
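
One way to picture the `AUTOMATION_SKIP_WORKER` switch from the commit above is a lifespan helper that starts the worker as a background task only when the flag is unset. This is a sketch under assumptions: `worker_enabled`, `worker_lifespan`, and the set of accepted truthy values are invented for the example, and the real `app.py` may wire this differently.

```python
import asyncio
import contextlib
import os


def worker_enabled(env=None) -> bool:
    """Return False when AUTOMATION_SKIP_WORKER holds a truthy value."""
    env = os.environ if env is None else env
    raw = env.get("AUTOMATION_SKIP_WORKER", "false")
    return raw.strip().lower() not in {"1", "true", "yes"}


@contextlib.asynccontextmanager
async def worker_lifespan(run_worker, env=None):
    """Run `run_worker()` in the background unless the skip flag is set."""
    task = asyncio.create_task(run_worker()) if worker_enabled(env) else None
    try:
        yield task
    finally:
        if task is not None:
            # Stop the in-process worker when the API server shuts down.
            task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await task
```

With the flag set to `true`, the context manager yields `None` and the API process never imports or runs worker code paths at startup.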
The temporal package __init__.py was importing activities, workflows, and
worker modules at package level. When Temporal's workflow sandbox loads
the workflows module, it first loads the package's __init__.py, which
triggered httpx import inside the sandbox (via activities.py).

This fix removes those imports from __init__.py. All code already uses
direct module imports (e.g., from automation.temporal.activities import ...)
so this doesn't break any existing code.

Co-authored-by: openhands <openhands@all-hands.dev>
The schedules module imports automation.models which uses SQLAlchemy,
causing sandbox validation to fail with:
    TypeError: __annotations__ must be set to a dict object

Also added scripts/test_sandbox.py to verify sandbox validation locally
before deploying.

Co-authored-by: openhands <openhands@all-hands.dev>
When AUTOMATION_FAST_FAIL=true, all Temporal activity retry policies
use maximum_attempts=1 instead of the default 3-5 attempts with backoff.

This prevents long test durations when workflows fail - instead of
waiting minutes for retries to exhaust, tests fail immediately.

Production (default): Full retry policies for resilience
- API key: 5 attempts, up to 60s backoff
- Sandbox: 3 attempts, up to 2min backoff
- Tarball: 3 attempts, up to 60s backoff
- Cleanup: 3 attempts, up to 30s backoff

Fast-fail mode: No retries
- All policies: 1 attempt only

Co-authored-by: openhands <openhands@all-hands.dev>
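
The policy table in the commit above can be expressed as a small lookup that collapses every policy to a single attempt when `AUTOMATION_FAST_FAIL=true`. The sketch uses a plain dataclass, `RetrySpec`, as a stand-in for Temporal's retry policy type so that it runs without the SDK; `RetrySpec`, `PRODUCTION_POLICIES`, and `get_retry_spec` are hypothetical names, and only the attempt counts and backoff ceilings come from the commit message.

```python
import os
from dataclasses import dataclass, replace
from datetime import timedelta


@dataclass(frozen=True)
class RetrySpec:
    """Stand-in for a retry policy: attempt cap plus backoff ceiling."""

    maximum_attempts: int
    maximum_interval: timedelta


# Production defaults as listed in the commit message.
PRODUCTION_POLICIES = {
    "api_key": RetrySpec(5, timedelta(seconds=60)),
    "sandbox": RetrySpec(3, timedelta(minutes=2)),
    "tarball": RetrySpec(3, timedelta(seconds=60)),
    "cleanup": RetrySpec(3, timedelta(seconds=30)),
}


def get_retry_spec(kind: str, env=None) -> RetrySpec:
    """Return the policy for `kind`, reduced to one attempt in fast-fail mode."""
    env = os.environ if env is None else env
    spec = PRODUCTION_POLICIES[kind]
    if env.get("AUTOMATION_FAST_FAIL", "").strip().lower() == "true":
        return replace(spec, maximum_attempts=1)
    return spec
```

Keeping the production values in one table means fast-fail mode only overrides the attempt count and leaves the rest of each policy untouched.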


@workflow.defn
class AutomationWorkflow:
Member


Maybe call it AutomationExecutionWorkflow?

I'd imagine we will have some other workflows

Contributor

@jlav jlav left a comment


Strongly recommend adding a docker compose setup that has the temporal dev server set up (and any other dependencies like postgres) for ease of local development: https://learn.temporal.io/getting_started/python/dev_environment/

Having the temporal UI available locally for debugging is super useful.

logger.info("Downloading internal tarball: upload_id=%s", input.upload_id)

settings = get_settings()
engine_result = await create_engine(settings)
Contributor


You can share a database connection pool across all your activities. Creating an engine here creates a new pool with new connections every time.
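
A minimal sketch of the sharing pattern suggested here: create the engine once at worker startup and inject it into a class whose methods serve as the activities. `FakeEngine` and `UploadActivities` are hypothetical stand-ins so the example runs without a database or the Temporal SDK; in real code the method would carry the SDK's activity decorator and the engine would be the service's actual SQLAlchemy engine.

```python
import asyncio


class FakeEngine:
    """Stand-in for a database engine that owns a connection pool."""

    instances = 0  # counts how many engines (and therefore pools) exist

    def __init__(self) -> None:
        FakeEngine.instances += 1

    async def fetch_upload_row(self, upload_id: str) -> dict:
        return {"upload_id": upload_id, "status": "ready"}


class UploadActivities:
    """Activities as methods, so every call reuses the injected engine."""

    def __init__(self, engine: FakeEngine) -> None:
        self._engine = engine

    # In real code this method would be registered as a Temporal activity.
    async def get_upload(self, upload_id: str) -> dict:
        return await self._engine.fetch_upload_row(upload_id)


async def main() -> list:
    engine = FakeEngine()  # created once, at worker startup
    activities = UploadActivities(engine)
    return await asyncio.gather(
        activities.get_upload("a"),
        activities.get_upload("b"),
    )


rows = asyncio.run(main())
```

Both calls go through the same engine, so no new pool is opened per activity invocation.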


try:
    # 1. Get per-user API key
    api_key = await workflow.execute_activity(
Contributor


Activity output is stored in plain text in the temporal database, and is visible in the Temporal UI by default. Not a great place to store credentials.

Also, only return information from an activity that is guaranteed to be static until the end of the workflow execution. If these API keys are short lived and the workflow becomes long-lived, it's bound to fail permanently once the API Key expires.

Depending on the lifetime here, sometimes it's better to just re-retrieve info inside of individual activities. If it's long lived, you can look into data converters to encrypt the info: https://docs.temporal.io/default-custom-data-converters#default-data-converter
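
The "re-retrieve inside individual activities" option could look roughly like this: the workflow passes only a user ID, and each activity fetches the key itself and returns a result that carries no secret. `fetch_api_key`, `create_sandbox`, and `SandboxResult` are hypothetical names, and the key lookup is faked so the sketch runs standalone.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class SandboxResult:
    """Activity output: safe to persist because it carries no secrets."""

    user_id: str
    sandbox_id: str


async def fetch_api_key(user_id: str) -> str:
    """Hypothetical lookup; a real version would call the key service."""
    return f"key-for-{user_id}"


async def create_sandbox(user_id: str) -> SandboxResult:
    """Activity body: takes a user ID and obtains the key internally."""
    api_key = await fetch_api_key(user_id)  # fresh on every attempt, never returned
    sandbox_id = f"sbx-{len(api_key)}"  # placeholder for an authenticated API call
    return SandboxResult(user_id=user_id, sandbox_id=sandbox_id)


result = asyncio.run(create_sandbox("u1"))
```

Since the key never appears in an activity's input or output, it is not written to workflow history, and a retry after expiry simply fetches a new one.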

if upload_id is None:
    raise ValueError(f"Invalid tarball_path: {tarball_path}")

tarball_data = await workflow.execute_activity(
Contributor


The largest factor in workflow throughput is the size of the activity payloads. These tarballs are going to be stored directly in workflow history, inside of Postgres. Postgres isn't a good place to store large binary data.

Factor this out to store tarballs somewhere externally and have the activity retrieve the contents directly.

EXTERNAL_MAX_FILESIZE is 100MB right now. The max per-event payload size defaults to 4MB in temporal, and the max history is 50MB with warnings emitted at 10MB. Increasing those defaults cripples performance, and right now any file larger than ~4MB would just fail altogether.
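
To make the size argument concrete, the sketch below compares a full-size tarball against the per-event limit quoted above and shows the alternative of passing a small reference through the workflow. `TarballRef`, its `storage_key` field, and `fits_in_event` are hypothetical; the 4MB and 100MB figures are the ones cited in this comment, treated here as binary megabytes for simplicity.

```python
from dataclasses import dataclass

MB = 1024 * 1024
EVENT_PAYLOAD_LIMIT = 4 * MB      # per-event default cited in the comment above
EXTERNAL_MAX_FILESIZE = 100 * MB  # current upload limit cited in the comment above


@dataclass(frozen=True)
class TarballRef:
    """Small pointer passed through workflow history instead of the bytes."""

    storage_key: str  # hypothetical object-store key
    size_bytes: int


def fits_in_event(payload_size: int) -> bool:
    """Return True if a payload of this size stays within the per-event limit."""
    return payload_size <= EVENT_PAYLOAD_LIMIT


ref = TarballRef(storage_key="uploads/abc123.tar.gz", size_bytes=EXTERNAL_MAX_FILESIZE)
ref_size = len(repr(ref).encode())  # rough size of the reference itself
```

The activity that needs the archive would then download it from external storage using the key, so the history records only the reference.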

Contributor


Strongly recommend just biting the bullet and adding replay tests as early as possible. They'll catch non-determinism errors that you might introduce by modifying the workflow after it's already been deployed. When you change workflow code, any workflows that are still active at deploy time are likely to fail unless the change is backwards compatible.

You can read more here on replay tests: https://docs.temporal.io/develop/python/testing-suite#replay
