14 changes: 1 addition & 13 deletions .github/workflows/durabletask.yml
@@ -57,19 +57,7 @@ jobs:
- name: Pytest unit tests
working-directory: tests/durabletask
run: |
pytest -m "not e2e and not dts" --verbose
# Sidecar for running e2e tests requires Go SDK
- name: Install Go SDK
uses: actions/setup-go@v5
with:
go-version: 'stable'
# Install and run the durabletask-go sidecar for running e2e tests
- name: Pytest e2e tests
working-directory: tests/durabletask
run: |
go install github.com/microsoft/durabletask-go@main
durabletask-go --port 4001 &
pytest -m "e2e and not dts" --verbose
pytest -m "not dts" --verbose

publish-release:
if: startsWith(github.ref, 'refs/tags/v') # Only run if a matching tag is pushed
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

ADDED:

- Added `durabletask.testing` module with `InMemoryOrchestrationBackend` for testing orchestrations without a sidecar process

FIXED:

- Fix unbound variable in entity V1 processing
9 changes: 3 additions & 6 deletions Makefile
@@ -1,11 +1,8 @@
init:
pip3 install -r requirements.txt

test-unit:
pytest -m "not e2e" --verbose

test-e2e:
pytest -m e2e --verbose
test:
pytest --verbose

install:
python3 -m pip install .
@@ -16,4 +13,4 @@ gen-proto:
python3 -m grpc_tools.protoc --proto_path=. --python_out=. --pyi_out=. --grpc_python_out=. ./durabletask/internal/orchestrator_service.proto
rm durabletask/internal/*.proto

.PHONY: init test-unit test-e2e gen-proto install
.PHONY: init test gen-proto install
21 changes: 3 additions & 18 deletions docs/development.md
@@ -11,25 +11,10 @@ make gen-proto

This will download the `orchestrator_service.proto` from the `microsoft/durabletask-protobuf` repo and compile it using `grpcio-tools`. The version of the source proto file that was downloaded can be found in the file `durabletask/internal/PROTO_SOURCE_COMMIT_HASH`.

### Running unit tests
### Running tests

Unit tests can be run using the following command from the project root. Unit tests _don't_ require a sidecar process to be running.
Tests can be run using the following command from the project root.

```sh
make test-unit
```

### Running E2E tests

The E2E (end-to-end) tests require a sidecar process to be running. You can use the Durable Task test sidecar using the following `docker` command:

```sh
go install github.com/microsoft/durabletask-go@main
durabletask-go --port 4001
```

To run the E2E tests, run the following command from the project root:

```sh
make test-e2e
make test
```
293 changes: 293 additions & 0 deletions durabletask/testing/README.md
@@ -0,0 +1,293 @@
# Durable Task Testing Utilities

This package provides testing utilities for the Durable Task Python SDK,
including an in-memory backend that eliminates the need for external
dependencies during testing.

## In-Memory Backend

The `InMemoryOrchestrationBackend` is a lightweight, in-memory implementation
of the Durable Task backend that runs as a gRPC server. It's designed for
testing scenarios where you want to test orchestrations without requiring a
sidecar process or external storage.

### Features

- **In-memory state storage**: All orchestration state is stored in memory
- **Full gRPC compatibility**: Implements the same gRPC interface as the production backend
- **Thread-safe**: Safe for concurrent access from multiple threads
- **Work item streaming**: Supports streaming work items to workers
- **Event handling**: Supports raising events, timers, and sub-orchestrations
- **Entity support**: Supports function-based and class-based entities
- **Lifecycle management**: Supports suspend, resume, terminate, and restart operations
- **State waiting**: Built-in support for waiting on orchestration state changes

### Quick Start

```python
import pytest
from durabletask.testing import create_test_backend
from durabletask.client import TaskHubGrpcClient, OrchestrationStatus
from durabletask.worker import TaskHubGrpcWorker

@pytest.fixture
def backend():
"""Create an in-memory backend for testing."""
backend = create_test_backend(port=50051)
yield backend
backend.stop()
backend.reset()

def test_simple_orchestration(backend):
# Create client and worker
client = TaskHubGrpcClient(host_address="localhost:50051")
worker = TaskHubGrpcWorker(host_address="localhost:50051")

# Define orchestrator and activity
def hello_orchestrator(ctx, _):
result = yield ctx.call_activity(say_hello, input="World")
return result

def say_hello(ctx, name: str):
return f"Hello, {name}!"

# Register orchestrator and activity with the worker
worker.add_orchestrator(hello_orchestrator)
worker.add_activity(say_hello)

# Start worker
worker.start()

try:
# Schedule orchestration
instance_id = client.schedule_new_orchestration(hello_orchestrator)

# Wait for completion
state = client.wait_for_orchestration_completion(instance_id, timeout=10)

# Verify results
assert state.runtime_status == OrchestrationStatus.COMPLETED
assert state.serialized_output == '"Hello, World!"'
finally:
worker.stop()
```

### Advanced Usage

#### Testing with Multiple Ports

```python
import random
import pytest
from durabletask.testing import create_test_backend
from durabletask.client import TaskHubGrpcClient
from durabletask.worker import TaskHubGrpcWorker

@pytest.fixture
def backend():
# Use a random port to avoid conflicts
port = random.randint(50000, 60000)
backend = create_test_backend(port=port)
yield backend, port
backend.stop()
backend.reset()

def test_orchestration(backend):
backend_instance, port = backend
client = TaskHubGrpcClient(host_address=f"localhost:{port}")
worker = TaskHubGrpcWorker(host_address=f"localhost:{port}")
# ...
```

#### Testing Event Handling

```python
def test_external_events(backend):
client = TaskHubGrpcClient(host_address="localhost:50051")
worker = TaskHubGrpcWorker(host_address="localhost:50051")

def wait_for_event_orchestrator(ctx, _):
event_data = yield ctx.wait_for_external_event("approval")
return event_data

worker.add_orchestrator(wait_for_event_orchestrator)
worker.start()

try:
instance_id = client.schedule_new_orchestration(wait_for_event_orchestrator)

# Wait for orchestration to start
client.wait_for_orchestration_start(instance_id, timeout=5)

# Raise an event
client.raise_orchestration_event(instance_id, "approval", data="approved")

# Wait for completion
state = client.wait_for_orchestration_completion(instance_id, timeout=10)

assert state.runtime_status == OrchestrationStatus.COMPLETED
assert state.serialized_output == '"approved"'
finally:
worker.stop()
```

#### Testing Sub-Orchestrations

```python
def test_sub_orchestrations(backend):
client = TaskHubGrpcClient(host_address="localhost:50051")
worker = TaskHubGrpcWorker(host_address="localhost:50051")

def parent_orchestrator(ctx, _):
result1 = yield ctx.call_sub_orchestrator(child_orchestrator, input=1)
result2 = yield ctx.call_sub_orchestrator(child_orchestrator, input=2)
return result1 + result2

def child_orchestrator(ctx, input: int):
return input * 2

worker.add_orchestrator(parent_orchestrator)
worker.add_orchestrator(child_orchestrator)
worker.start()

try:
instance_id = client.schedule_new_orchestration(parent_orchestrator)
state = client.wait_for_orchestration_completion(instance_id, timeout=10)

assert state.runtime_status == OrchestrationStatus.COMPLETED
assert state.serialized_output == "6" # (1*2) + (2*2)
finally:
worker.stop()
```

#### Testing Timers

```python
def test_durable_timers(backend):
import time
from datetime import timedelta

client = TaskHubGrpcClient(host_address="localhost:50051")
worker = TaskHubGrpcWorker(host_address="localhost:50051")

def timer_orchestrator(ctx, _):
fire_at = ctx.current_utc_datetime + timedelta(seconds=1)
yield ctx.create_timer(fire_at)
return "timer_fired"

worker.add_orchestrator(timer_orchestrator)
worker.start()

try:
start_time = time.time()
instance_id = client.schedule_new_orchestration(timer_orchestrator)
state = client.wait_for_orchestration_completion(instance_id, timeout=10)
elapsed = time.time() - start_time

assert state.runtime_status == OrchestrationStatus.COMPLETED
assert elapsed >= 1.0 # Timer should have waited at least 1 second
finally:
worker.stop()
```

#### Testing Termination

```python
def test_orchestration_termination(backend):
client = TaskHubGrpcClient(host_address="localhost:50051")
worker = TaskHubGrpcWorker(host_address="localhost:50051")

def long_running_orchestrator(ctx, _):
yield ctx.wait_for_external_event("never_happens")
return "completed"

worker.add_orchestrator(long_running_orchestrator)
worker.start()

try:
instance_id = client.schedule_new_orchestration(long_running_orchestrator)

# Wait for it to start
client.wait_for_orchestration_start(instance_id, timeout=5)

# Terminate it
client.terminate_orchestration(instance_id, output="terminated_by_test")

# Verify termination
state = client.wait_for_orchestration_completion(instance_id, timeout=10)

assert state.runtime_status == OrchestrationStatus.TERMINATED
finally:
worker.stop()
```

### Configuration Options

The `InMemoryOrchestrationBackend` supports the following configuration options:

- **port** (int): Port to listen on for gRPC connections (default: 50051)
- **max_history_size** (int): Maximum number of history events per orchestration (default: 10000)

```python
backend = InMemoryOrchestrationBackend(
port=50051,
max_history_size=100000 # Support larger orchestrations
)
backend.start()
```

Or use the convenience factory, which starts the server automatically:

```python
backend = create_test_backend(port=50051, max_history_size=10000)
```

### Thread Safety

The in-memory backend is thread-safe and can be safely accessed from
multiple threads. All state mutations are protected by locks to ensure
consistency.

### Limitations

The in-memory backend is designed for testing and has some limitations compared to production backends:

1. **No persistence**: All state is lost when the backend is stopped
2. **No distributed execution**: Runs in a single process
3. **No history streaming**: StreamInstanceHistory is not implemented
4. **No rewind**: RewindInstance is not implemented
5. **No recursive termination**: Terminating an orchestration does not terminate its sub-orchestrations

### Best Practices

1. **Use fixtures**: Create pytest fixtures to manage backend lifecycle
2. **Reset between tests**: Call `backend.reset()` to clear state between tests
3. **Use random ports**: When running tests in parallel, use random port assignments
4. **Set appropriate timeouts**: Use reasonable timeout values in wait operations
5. **Clean up workers**: Always stop workers in finally blocks to prevent resource leaks
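
For port selection, an alternative to `random.randint` (which can still collide) is to ask the OS for a free port before starting the backend. A small stdlib-only helper; the helper name and its use in a fixture are illustrative, not part of this package:

```python
import socket


def find_free_port() -> int:
    """Bind to port 0 so the OS assigns an unused port, then release it."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]
```

A fixture can then call `create_test_backend(port=find_free_port())`. Note there is a small race window between releasing the port and the backend binding it, which is usually acceptable in tests.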

### Troubleshooting

#### Connection Errors

If you see connection errors:

- Ensure the backend is started before creating clients/workers
- Verify the port is not already in use
- Check that the host address matches the backend port

#### Timeouts

If tests timeout:

- Increase timeout values in `wait_for_orchestration_completion`
- Check that workers are started and processing work items
- Verify orchestrators and activities are registered correctly

#### State Not Found

If orchestration state is not found:

- Ensure you're using the correct instance ID
- Verify the orchestration was successfully scheduled
- Check that the backend wasn't reset between operations
14 changes: 14 additions & 0 deletions durabletask/testing/__init__.py
@@ -0,0 +1,14 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""Testing utilities for the Durable Task Python SDK."""

from durabletask.testing.in_memory_backend import (
InMemoryOrchestrationBackend,
create_test_backend,
)

__all__ = [
"InMemoryOrchestrationBackend",
"create_test_backend",
]