Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,40 @@ package-dir = {"" = "cli-tools/gpu-dev-cli"}

[tool.setuptools.package-data]
gpu_dev_cli = ["py.typed"]

[project.optional-dependencies]
test = [
"pytest>=8.0.0",
"pytest-cov>=4.1.0",
"pytest-asyncio>=0.23.0",
"pytest-timeout>=2.2.0",
"moto[all]>=5.0.0",
"freezegun>=1.2.0",
"responses>=0.25.0",
]

[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
addopts = "-v --tb=short"
markers = [
"unit: Unit tests (fast, mocked)",
"e2e: End-to-end tests (require AWS dev cluster)",
"slow: Slow tests that can be skipped",
]
filterwarnings = [
"ignore::DeprecationWarning",
]

[tool.coverage.run]
source = ["cli-tools/gpu-dev-cli/gpu_dev_cli", "terraform-gpu-devservers/lambda"]
omit = ["*/tests/*", "*/__pycache__/*"]

[tool.coverage.report]
exclude_lines = [
"pragma: no cover",
"if TYPE_CHECKING:",
"raise NotImplementedError",
]
6 changes: 3 additions & 3 deletions terraform-gpu-devservers/availability.tf
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,10 @@ resource "null_resource" "availability_updater_build" {
rm -rf package *.zip
mkdir -p package

# Install dependencies if requirements.txt exists
# Install dependencies using Docker for Linux x86_64 compatibility
if [ -f requirements.txt ]; then
python3 -m pip install --upgrade pip
python3 -m pip install -r requirements.txt --target package/ --force-reinstall
docker run --rm --platform linux/amd64 --entrypoint pip -v "$(pwd):/var/task" -w /var/task public.ecr.aws/lambda/python:3.13 \
install -r requirements.txt --target package/ --upgrade
fi

# Copy source code and shared modules
Expand Down
6 changes: 3 additions & 3 deletions terraform-gpu-devservers/expiry.tf
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,9 @@ resource "null_resource" "reservation_expiry_build" {
rm -rf package *.zip
mkdir -p package

# Install dependencies with specific Python version
python3 -m pip install --upgrade pip
python3 -m pip install -r requirements.txt --target package/ --force-reinstall
# Install dependencies using Docker for Linux x86_64 compatibility
docker run --rm --platform linux/amd64 --entrypoint pip -v "$(pwd):/var/task" -w /var/task public.ecr.aws/lambda/python:3.13 \
install -r requirements.txt --target package/ --upgrade

# Copy source code and shared modules
cp index.py package/
Expand Down
7 changes: 4 additions & 3 deletions terraform-gpu-devservers/lambda.tf
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,10 @@ resource "null_resource" "reservation_processor_build" {
rm -rf package *.zip
mkdir -p package

# Install dependencies with specific Python version
python3 -m pip install --upgrade pip
python3 -m pip install -r requirements.txt --target package/ --force-reinstall
# Install dependencies using Docker for Linux x86_64 compatibility
# This ensures native extensions (cryptography) are built for Lambda's Linux environment
docker run --rm --platform linux/amd64 --entrypoint pip -v "$(pwd):/var/task" -w /var/task public.ecr.aws/lambda/python:3.13 \
install -r requirements.txt --target package/ --upgrade

# Copy source code and shared modules
cp index.py package/
Expand Down
129 changes: 128 additions & 1 deletion terraform-gpu-devservers/lambda/reservation_expiry/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,15 @@ def handler(event, context):
logger.error(f"Error cleaning up soft-deleted snapshots: {e}")
deleted_snapshot_count = 0

# Clean up orphaned disks (in_use=True but pod/reservation gone)
# This handles cases where nodes went offline unexpectedly
try:
orphaned_disks_cleaned = cleanup_orphaned_disks()
logger.info(f"Cleaned up {orphaned_disks_cleaned} orphaned disks")
except Exception as e:
logger.error(f"Error cleaning up orphaned disks: {e}")
orphaned_disks_cleaned = 0

return {
"statusCode": 200,
"body": json.dumps(
Expand All @@ -873,6 +882,7 @@ def handler(event, context):
"deleted_snapshots": deleted_snapshot_count,
"tagged_snapshots": tagged_snapshot_count,
"synced_disks": synced_disk_count,
"orphaned_disks_cleaned": orphaned_disks_cleaned,
}
),
}
Expand Down Expand Up @@ -1017,6 +1027,108 @@ def find_disk_by_reservation(user_id: str, reservation_id: str) -> str | None:
return None


def cleanup_orphaned_disks() -> int:
"""
Find and clean up disks marked as in_use where the associated pod/reservation is gone.
This handles cases where a node went offline and the pod was never properly cleaned up.
Returns count of disks cleaned up.
"""
cleaned_count = 0

try:
disks_table = dynamodb.Table(DISKS_TABLE)
reservations_table = dynamodb.Table(RESERVATIONS_TABLE)

# Scan for disks marked as in_use
response = disks_table.scan(
FilterExpression="in_use = :true",
ExpressionAttributeValues={":true": True}
)

in_use_disks = response.get('Items', [])

# Handle pagination
while 'LastEvaluatedKey' in response:
response = disks_table.scan(
FilterExpression="in_use = :true",
ExpressionAttributeValues={":true": True},
ExclusiveStartKey=response['LastEvaluatedKey']
)
in_use_disks.extend(response.get('Items', []))

if not in_use_disks:
logger.debug("No disks marked as in_use found")
return 0

logger.info(f"Found {len(in_use_disks)} disks marked as in_use, checking for orphans")

for disk in in_use_disks:
user_id = disk.get('user_id')
disk_name = disk.get('disk_name')
attached_reservation = disk.get('attached_to_reservation')

if not user_id or not disk_name:
continue

# If no reservation attached, it's orphaned - clean it up
if not attached_reservation:
logger.info(f"Disk '{disk_name}' for user {user_id} has no attached reservation - cleaning up")
try:
mark_disk_not_in_use(user_id, disk_name)
cleaned_count += 1
except Exception as e:
logger.warning(f"Failed to clean up orphaned disk '{disk_name}': {e}")
continue

# Check if the attached reservation still exists and is active
try:
res_response = reservations_table.get_item(
Key={'reservation_id': attached_reservation}
)

if 'Item' not in res_response:
# Reservation doesn't exist - disk is orphaned
logger.info(f"Disk '{disk_name}' attached to non-existent reservation {attached_reservation[:8]} - cleaning up")
mark_disk_not_in_use(user_id, disk_name)
cleaned_count += 1
continue

reservation = res_response['Item']
status = reservation.get('status', '')

# If reservation is in a terminal state, clean up the disk
if status in ['expired', 'cancelled', 'failed']:
logger.info(f"Disk '{disk_name}' attached to {status} reservation {attached_reservation[:8]} - cleaning up")
mark_disk_not_in_use(user_id, disk_name)
cleaned_count += 1
continue

# If reservation is active/preparing, check if pod actually exists
if status in ['active', 'preparing']:
pod_name = reservation.get('pod_name')
if pod_name and not check_pod_exists(pod_name):
# Pod is gone but reservation shows active - node likely went offline
logger.info(f"Disk '{disk_name}' attached to reservation {attached_reservation[:8]} with missing pod {pod_name} - cleaning up")
mark_disk_not_in_use(user_id, disk_name)
cleaned_count += 1

# Also expire the reservation since pod is gone
try:
expire_reservation_due_to_missing_pod(reservation)
logger.info(f"Also expired reservation {attached_reservation[:8]} due to missing pod")
except Exception as expire_error:
logger.warning(f"Failed to expire reservation {attached_reservation[:8]}: {expire_error}")

except Exception as res_error:
logger.warning(f"Error checking reservation for disk '{disk_name}': {res_error}")

return cleaned_count

except Exception as e:
logger.error(f"Error in cleanup_orphaned_disks: {e}")
return cleaned_count


def handle_oom_event(reservation: dict, oom_info: dict) -> bool:
"""
Handle an OOM event for a reservation.
Expand Down Expand Up @@ -1193,9 +1305,10 @@ def warn_user_expiring(reservation: dict[str, Any], warning_minutes: int) -> Non


def expire_reservation_due_to_missing_pod(reservation: dict[str, Any]) -> None:
"""Mark reservation as expired when pod is missing (likely manually deleted)"""
"""Mark reservation as expired when pod is missing (likely manually deleted or node went offline)"""
try:
reservation_id = reservation["reservation_id"]
user_id = reservation.get("user_id")

logger.info(
f"Marking reservation {reservation_id} as expired due to missing pod"
Expand All @@ -1220,6 +1333,20 @@ def expire_reservation_due_to_missing_pod(reservation: dict[str, Any]) -> None:
f"Successfully marked reservation {reservation_id} as expired due to missing pod"
)

# Clean up disk in_use flag immediately (don't wait for next expiry run)
disk_name = reservation.get("disk_name")

# Fallback: if disk_name not in reservation, look it up from disks table
if user_id and not disk_name:
disk_name = find_disk_by_reservation(user_id, reservation_id)

if user_id and disk_name:
try:
mark_disk_not_in_use(user_id, disk_name)
logger.info(f"Cleared disk '{disk_name}' in_use flag for missing-pod reservation {reservation_id[:8]}")
except Exception as disk_error:
logger.warning(f"Failed to clear disk in_use flag for {reservation_id[:8]}: {disk_error}")

except Exception as e:
logger.error(
f"Error marking reservation {reservation.get('reservation_id')} as expired: {str(e)}"
Expand Down
14 changes: 10 additions & 4 deletions terraform-gpu-devservers/lambda/reservation_processor/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -3417,12 +3417,18 @@ def get_pod_resource_requests(gpu_count: int, gpu_type: str, is_multinode: bool


def _pod_uses_efa(gpu_count: int, gpu_type: str, is_multinode: bool = False) -> bool:
"""Check if pod will use EFA based on configuration"""
"""Check if pod will use EFA based on configuration.

EFA is enabled for full-node allocations on high-end GPU types (H100, H200, B200, A100)
that have EFA hardware available. This enables RDMA/high-bandwidth networking for
both single-node and multi-node workloads.
"""
config = GPU_CONFIG.get(gpu_type, GPU_CONFIG_DEFAULT)
# GPU types that support EFA (have EFA hardware on their instance types)
efa_supported_types = {"h100", "h200", "b200", "a100"}
return (
gpu_type != "t4-small" and
is_multinode and
gpu_count == config["max_gpus"]
gpu_type in efa_supported_types and
gpu_count == config["max_gpus"] # Full node allocation only
)


Expand Down
5 changes: 2 additions & 3 deletions terraform-gpu-devservers/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,7 @@ locals {
{ id = null, instance_count = 1 } # A100 on-demand (1 instance)
]
h100 = [
{ id = "cr-0a7caa7414866615a", instance_count = 4 }, # H100 reservation us-east-2c (p5.48xlarge)
{ id = null, instance_count = 2 } # H100 on-demand (2 instances)
{ id = "cr-0a0a39a4c51068e30", instance_count = 4 }, # H100 reservation us-east-2c (p5.48xlarge)
]
h200 = [
{ id = "cr-0f6d0766f5d3339e6", instance_count = 2 }, # H200 reservation us-east-2c (p5e.48xlarge)
Expand Down Expand Up @@ -319,7 +318,7 @@ locals {
"cr-0f6d0766f5d3339e6" = "tertiary" # us-east-2c (p5e.48xlarge)
"cr-06c9c978dea756a26" = "tertiary" # us-east-2c
# H100 capacity reservation
"cr-0a7caa7414866615a" = "tertiary" # us-east-2c (p5.48xlarge)
"cr-0a0a39a4c51068e30" = "tertiary" # us-east-2c (p5.48xlarge)
# A100 capacity reservation
"cr-01cc0f00f28b095af" = "primary" # us-east-2a
}
Expand Down