Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sagemaker-core/src/sagemaker/core/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,9 @@ def _normalize_outputs(self, outputs=None):
# If the output's s3_uri is not an s3_uri, create one.
parse_result = urlparse(output.s3_output.s3_uri)
if parse_result.scheme != "s3":
if getattr(self.sagemaker_session, "local_mode", False) and parse_result.scheme == "file":
normalized_outputs.append(output)
continue
if _pipeline_config:
s3_uri = Join(
on="/",
Expand Down
255 changes: 255 additions & 0 deletions sagemaker-core/tests/unit/test_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,261 @@ def test_normalize_outputs_invalid_type(self, mock_session):
processor._normalize_outputs(["invalid"])




class TestBugConditionFileUriReplacedInLocalMode:
    """Bug condition exploration test: file:// URIs should be preserved in local mode.

    **Validates: Requirements 1.1, 1.2, 2.1, 2.2**

    EXPECTED TO FAIL on unfixed code — failure confirms the bug exists.
    The bug is that _normalize_outputs() replaces file:// URIs with s3:// paths
    even when the session is a LocalSession (local_mode=True).
    """

    @pytest.fixture
    def local_mock_session(self):
        """Mock of a local-mode session; local_mode=True is the attribute under test."""
        session = Mock()
        session.boto_session = Mock()
        session.boto_session.region_name = "us-west-2"
        session.sagemaker_client = Mock()
        session.sagemaker_config = {}
        session.default_bucket = Mock(return_value="default-bucket")
        session.default_bucket_prefix = "prefix"
        session.expand_role = Mock(side_effect=lambda role: role)
        session.local_mode = True
        return session

    @pytest.mark.parametrize(
        "file_uri",
        [
            "file:///tmp/output",
            "file:///home/user/results",
            "file:///data/processed",
        ],
    )
    def test_normalize_outputs_preserves_file_uri_in_local_mode(self, local_mock_session, file_uri):
        """file:// URIs must be preserved when local_mode=True.

        On unfixed code, _normalize_outputs replaces file:// URIs with
        s3://default-bucket/prefix/job-name/output/output-1, which is the bug.
        """
        processor = Processor(
            role="arn:aws:iam::123456789012:role/SageMakerRole",
            image_uri="test-image:latest",
            instance_count=1,
            instance_type="ml.m5.xlarge",
            sagemaker_session=local_mock_session,
        )
        processor._current_job_name = "test-job"

        # Single output whose destination is a local file:// URI.
        outputs = [
            ProcessingOutput(
                output_name="my-output",
                s3_output=ProcessingS3Output(
                    s3_uri=file_uri,
                    local_path="/opt/ml/processing/output",
                    s3_upload_mode="EndOfJob",
                ),
            )
        ]

        with patch("sagemaker.core.workflow.utilities._pipeline_config", None):
            result = processor._normalize_outputs(outputs)

        assert len(result) == 1
        assert result[0].s3_output.s3_uri == file_uri, (
            f"Expected file:// URI to be preserved as '{file_uri}' in local mode, "
            f"but got '{result[0].s3_output.s3_uri}'"
        )


class TestPreservationNonLocalFileBehavior:
    """Preservation property tests: Non-local-file behavior must remain unchanged.

    **Validates: Requirements 3.1, 3.2, 3.3, 3.4**

    These tests capture baseline behavior on UNFIXED code. They MUST PASS on both
    unfixed and fixed code, confirming no regressions are introduced by the fix.
    """

    @staticmethod
    def _make_session(local_mode):
        """Build a mock session; only ``local_mode`` varies between the two fixtures."""
        session = Mock()
        session.boto_session = Mock()
        session.boto_session.region_name = "us-west-2"
        session.sagemaker_client = Mock()
        session.default_bucket = Mock(return_value="default-bucket")
        session.default_bucket_prefix = "prefix"
        session.expand_role = Mock(side_effect=lambda x: x)
        session.sagemaker_config = {}
        session.local_mode = local_mode
        return session

    @pytest.fixture
    def session_local_mode_true(self):
        """Mock session reporting local_mode=True."""
        return self._make_session(local_mode=True)

    @pytest.fixture
    def session_local_mode_false(self):
        """Mock session reporting local_mode=False."""
        return self._make_session(local_mode=False)

    def _make_processor(self, session):
        """Build a Processor wired to ``session`` with a fixed job name."""
        processor = Processor(
            role="arn:aws:iam::123456789012:role/SageMakerRole",
            image_uri="test-image:latest",
            instance_count=1,
            instance_type="ml.m5.xlarge",
            sagemaker_session=session,
        )
        processor._current_job_name = "test-job"
        return processor

    # --- Requirement 3.1: S3 URIs pass through unchanged regardless of local_mode ---

    @pytest.mark.parametrize(
        "s3_uri,local_mode_fixture",
        [
            ("s3://my-bucket/path", "session_local_mode_true"),
            ("s3://my-bucket/path", "session_local_mode_false"),
            ("s3://another-bucket/deep/nested/path", "session_local_mode_true"),
            ("s3://another-bucket/deep/nested/path", "session_local_mode_false"),
        ],
    )
    def test_s3_uri_preserved_regardless_of_local_mode(self, s3_uri, local_mode_fixture, request):
        """S3 URIs must pass through unchanged regardless of local_mode setting.

        **Validates: Requirements 3.1**
        """
        session = request.getfixturevalue(local_mode_fixture)
        processor = self._make_processor(session)

        s3_output = ProcessingS3Output(
            s3_uri=s3_uri,
            local_path="/opt/ml/processing/output",
            s3_upload_mode="EndOfJob",
        )
        outputs = [ProcessingOutput(output_name="my-output", s3_output=s3_output)]

        with patch("sagemaker.core.workflow.utilities._pipeline_config", None):
            result = processor._normalize_outputs(outputs)

        assert len(result) == 1
        assert result[0].s3_output.s3_uri == s3_uri

    # --- Requirement 3.2: Non-S3 URIs with local_mode=False replaced with S3 paths ---

    @pytest.mark.parametrize(
        "non_s3_uri",
        [
            "/local/output/path",
            "http://example.com/output",
            "ftp://server/output",
        ],
    )
    def test_non_s3_uri_replaced_when_not_local_mode(self, non_s3_uri, session_local_mode_false):
        """Non-S3 URIs in non-local sessions are replaced with auto-generated S3 paths.

        **Validates: Requirements 3.2**
        """
        processor = self._make_processor(session_local_mode_false)

        s3_output = ProcessingS3Output(
            s3_uri=non_s3_uri,
            local_path="/opt/ml/processing/output",
            s3_upload_mode="EndOfJob",
        )
        outputs = [ProcessingOutput(output_name="output-1", s3_output=s3_output)]

        with patch("sagemaker.core.workflow.utilities._pipeline_config", None):
            result = processor._normalize_outputs(outputs)

        assert len(result) == 1
        assert result[0].s3_output.s3_uri.startswith("s3://default-bucket/")

    # --- Requirement 3.3: Pipeline variable URIs skip normalization ---

    def test_pipeline_variable_uri_skips_normalization(self, session_local_mode_false):
        """Pipeline variable URIs skip normalization entirely.

        **Validates: Requirements 3.3**
        """
        processor = self._make_processor(session_local_mode_false)

        s3_output = ProcessingS3Output(
            s3_uri="s3://bucket/output",
            local_path="/opt/ml/processing/output",
            s3_upload_mode="EndOfJob",
        )
        outputs = [ProcessingOutput(output_name="output-1", s3_output=s3_output)]

        with patch("sagemaker.core.processing.is_pipeline_variable", return_value=True):
            result = processor._normalize_outputs(outputs)

        assert len(result) == 1
        # Pipeline variable outputs are appended as-is without URI modification
        assert result[0].s3_output.s3_uri == "s3://bucket/output"

    # --- Requirement 3.4: Non-ProcessingOutput objects raise TypeError ---

    @pytest.mark.parametrize(
        "invalid_output",
        [
            ["a string"],
            [42],
            [{"key": "value"}],
        ],
    )
    def test_non_processing_output_raises_type_error(self, invalid_output, session_local_mode_false):
        """Non-ProcessingOutput objects must raise TypeError.

        **Validates: Requirements 3.4**
        """
        processor = self._make_processor(session_local_mode_false)

        with pytest.raises(TypeError, match="must be provided as ProcessingOutput objects"):
            processor._normalize_outputs(invalid_output)

    # --- Multiple outputs with explicit names and S3 URIs are preserved ---

    def test_multiple_outputs_with_s3_uris_preserved(self, session_local_mode_false):
        """Multiple outputs with S3 URIs are all preserved unchanged.

        **Validates: Requirements 3.1, 3.2**
        """
        processor = self._make_processor(session_local_mode_false)

        outputs = [
            ProcessingOutput(
                output_name="first-output",
                s3_output=ProcessingS3Output(
                    s3_uri="s3://my-bucket/first",
                    local_path="/opt/ml/processing/output1",
                    s3_upload_mode="EndOfJob",
                ),
            ),
            ProcessingOutput(
                output_name="second-output",
                s3_output=ProcessingS3Output(
                    s3_uri="s3://my-bucket/second",
                    local_path="/opt/ml/processing/output2",
                    s3_upload_mode="EndOfJob",
                ),
            ),
        ]

        with patch("sagemaker.core.workflow.utilities._pipeline_config", None):
            result = processor._normalize_outputs(outputs)

        assert len(result) == 2
        assert result[0].output_name == "first-output"
        assert result[1].output_name == "second-output"
        # S3 URIs should be preserved since they already have s3:// scheme
        assert result[0].s3_output.s3_uri == "s3://my-bucket/first"
        assert result[1].s3_output.s3_uri == "s3://my-bucket/second"


class TestProcessorStartNew:
def test_start_new_with_pipeline_session(self, mock_session):
from sagemaker.core.workflow.pipeline_context import PipelineSession
Expand Down
24 changes: 23 additions & 1 deletion sagemaker-serve/src/sagemaker/serve/mode/local_container_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
from __future__ import absolute_import
from pathlib import Path
import logging
import os
from datetime import datetime, timedelta
from typing import Dict, Type
import base64
import time
import subprocess
import docker

from sagemaker.core.local.utils import check_for_studio

from sagemaker.serve.model_server.tensorflow_serving.server import LocalTensorflowServing
from sagemaker.serve.spec.inference_spec import InferenceSpec
from sagemaker.serve.builder.schema_builder import SchemaBuilder
Expand All @@ -33,6 +36,25 @@
+ "Please increase container_timeout_seconds or review your inference code."
)

# Candidate Docker socket locations inside SageMaker Studio, probed in order.
STUDIO_DOCKER_SOCKET_PATHS = [
    "/docker/proxy/docker.sock",
    "/var/run/docker.sock",
]


def _get_docker_client():
    """Get a Docker client, handling SageMaker Studio's non-standard socket path.

    Resolution order:
      1. If ``DOCKER_HOST`` is set, honor it via ``docker.from_env()``.
      2. If running inside SageMaker Studio, connect directly to the first
         known socket path that exists on disk.
      3. Otherwise — or if Studio detection fails for any reason — fall back
         to ``docker.from_env()``.

    Returns:
        docker.DockerClient: a client connected per the resolution order above.
    """
    if os.environ.get("DOCKER_HOST"):
        return docker.from_env()
    try:
        if check_for_studio():
            for socket_path in STUDIO_DOCKER_SOCKET_PATHS:
                if os.path.exists(socket_path):
                    return docker.DockerClient(base_url=f"unix://{socket_path}")
    # The original caught (NotImplementedError, Exception); Exception already
    # subsumes NotImplementedError. Studio detection is best-effort, so any
    # failure here intentionally falls through to the default client.
    except Exception:
        pass
    return docker.from_env()


class LocalContainerMode(
LocalTorchServe,
Expand Down Expand Up @@ -212,7 +234,7 @@ def _pull_image(self, image: str):

# Check if Docker is available first
try:
self.client = docker.from_env()
self.client = _get_docker_client()
self.client.ping() # Test Docker connection
except Exception as e:
raise RuntimeError(
Expand Down
13 changes: 11 additions & 2 deletions sagemaker-serve/src/sagemaker/serve/model_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@
ENDPOINT_CONFIG_ASYNC_KMS_KEY_ID_PATH,
MODEL_CONTAINERS_PATH,
)
from sagemaker.serve.constants import SUPPORTED_MODEL_SERVERS, Framework
from sagemaker.serve.constants import LOCAL_MODES, SUPPORTED_MODEL_SERVERS, Framework
from sagemaker.core.workflow.pipeline_context import PipelineSession, runnable_by_pipeline
from sagemaker.core import fw_utils
from sagemaker.core.helper.session_helper import container_def
Expand Down Expand Up @@ -1287,7 +1287,16 @@ def _build_for_passthrough(self) -> Model:
if not self.image_uri:
raise ValueError("image_uri is required for pass-through cases")

self.s3_upload_path = None
self.secret_key = ""

if self.model_path and self.model_path.startswith("s3://"):
self.s3_upload_path = self.model_path
else:
self.s3_upload_path = None

if self.mode in LOCAL_MODES:
self._prepare_for_mode()

return self._create_model()

def _build_default_async_inference_config(self, async_inference_config):
Expand Down
Loading
Loading