Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
56eef2d
Feature store v3 (#5490)
Aditi2424 Feb 4, 2026
d5fd783
feat: feature_processor v3
bhhalim Feb 19, 2026
e91d8d2
integ tests
bhhalim Feb 20, 2026
4079a5d
fix
bhhalim Feb 25, 2026
e6721ce
chore(docs): Add API docs
bhhalim Feb 25, 2026
611f577
fix: Fix flaky integ tests
bhhalim Feb 26, 2026
6f4f490
fix diff
bhhalim Feb 27, 2026
09abb01
chore: rename parameter + cleanup comments
bhhalim Feb 27, 2026
a841d7d
Merge branch 'master' into feature-processor-v3
mollyheamazon Feb 27, 2026
aa654e4
Feature store v3 (#5490)
Aditi2424 Feb 4, 2026
7cdd077
add pyspark to test deps
bhhalim Feb 27, 2026
101824f
add test deps
bhhalim Feb 28, 2026
1e2b5fe
fix unit test deps
bhhalim Mar 2, 2026
bbf1a3f
pin setuptools<82 for feature-processor and unit tests
bhhalim Mar 2, 2026
5ca86dc
Set JAVA_HOME for integ tests which requires java
bhhalim Mar 2, 2026
beb078d
fix spark session bug
bhhalim Mar 2, 2026
7a659af
fix(feature-processor): Fix Spark session config and Ivy cache race c…
bhhalim Mar 2, 2026
59b68e8
revert previous change + create different ivy cache per test to fix c…
bhhalim Mar 3, 2026
fa59f5e
Merge branch 'master' into feature-processor-v3
BassemHalim Mar 3, 2026
a1b1bc3
revert changes to sagemaker-core
bhhalim Mar 3, 2026
3cd4686
refactor(feature-processor): Migrate to FeatureGroup resource API
bhhalim Mar 3, 2026
39f1be2
add `build` to test_requirements
bhhalim Mar 3, 2026
4797633
add upper bounds for test dependencies
bhhalim Mar 3, 2026
e2e179f
move feature-processor config to sagemaker-mlops optional deps
bhhalim Mar 4, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/api/sagemaker_mlops.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,12 @@ Local Development
:members:
:undoc-members:
:show-inheritance:


Feature Store
-------------

.. automodule:: sagemaker.mlops.feature_store
:members:
:undoc-members:
:show-inheritance:
3 changes: 2 additions & 1 deletion requirements/extras/test_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ scipy
omegaconf
graphene
typing_extensions>=4.9.0
tensorflow>=2.16.2,<=2.19.0
tensorflow>=2.16.2,<=2.19.0
build
11 changes: 11 additions & 0 deletions sagemaker-mlops/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,21 @@ dependencies = [
]

[project.optional-dependencies]
feature-processor = [
"pyspark==3.3.2",
"sagemaker-feature-store-pyspark-3.3",
"setuptools<82",
]

test = [
"pytest",
"pytest-cov",
"mock",
"setuptools<82",
"pyspark==3.3.2",
"sagemaker-feature-store-pyspark-3.3",
"pandas<3.0",
"numpy<3.0",
]
dev = [
"pytest",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Exported classes for the sagemaker.mlops.feature_store.feature_processor module."""
from __future__ import absolute_import

from sagemaker.mlops.feature_store.feature_processor._data_source import ( # noqa: F401
CSVDataSource,
FeatureGroupDataSource,
ParquetDataSource,
BaseDataSource,
PySparkDataSource,
)
from sagemaker.mlops.feature_store.feature_processor._exceptions import ( # noqa: F401
IngestionError,
)
from sagemaker.mlops.feature_store.feature_processor.feature_processor import ( # noqa: F401
feature_processor,
)
from sagemaker.mlops.feature_store.feature_processor.feature_scheduler import ( # noqa: F401
to_pipeline,
schedule,
describe,
put_trigger,
delete_trigger,
enable_trigger,
disable_trigger,
delete_schedule,
list_pipelines,
execute,
TransformationCode,
FeatureProcessorPipelineEvents,
)
from sagemaker.mlops.feature_store.feature_processor._enums import ( # noqa: F401
FeatureProcessorPipelineExecutionStatus,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Contains classes for preparing and uploading configs for a scheduled feature processor."""
from __future__ import absolute_import
from typing import Callable, Dict, Optional, Tuple, List, Union

import attr

from sagemaker.core.helper.session_helper import Session
from sagemaker.mlops.feature_store.feature_processor._constants import (
SPARK_JAR_FILES_PATH,
SPARK_PY_FILES_PATH,
SPARK_FILES_PATH,
S3_DATA_DISTRIBUTION_TYPE,
)
from sagemaker.core.inputs import TrainingInput
from sagemaker.core.shapes import Channel, DataSource, S3DataSource
from sagemaker.core.remote_function.core.stored_function import StoredFunction
from sagemaker.core.remote_function.job import (
_prepare_and_upload_workspace,
_prepare_and_upload_runtime_scripts,
_JobSettings,
RUNTIME_SCRIPTS_CHANNEL_NAME,
REMOTE_FUNCTION_WORKSPACE,
SPARK_CONF_CHANNEL_NAME,
_prepare_and_upload_spark_dependent_files,
)
from sagemaker.core.remote_function.runtime_environment.runtime_environment_manager import (
RuntimeEnvironmentManager,
)
from sagemaker.core.remote_function.spark_config import SparkConfig
from sagemaker.core.remote_function.custom_file_filter import CustomFileFilter
from sagemaker.core.s3 import s3_path_join


@attr.s
class ConfigUploader:
    """Prepares and uploads customer provided configs to S3"""

    # Settings taken from the remote-function decorator: S3 base/KMS key,
    # Spark configuration, dependency lists, workspace options.
    remote_decorator_config: _JobSettings = attr.ib()
    # Snapshots the local runtime environment (dependency files) for upload.
    runtime_env_manager: RuntimeEnvironmentManager = attr.ib()

    def prepare_step_input_channel_for_spark_mode(
        self, func: Callable, s3_base_uri: str, sagemaker_session: Session
    ) -> Tuple[List[Channel], Dict]:
        """Prepares input channels for SageMaker Pipeline Step.

        Uploads the callable, bootstrap runtime scripts, the local workspace
        (if any) and the Spark dependent files to S3, then assembles the
        corresponding training input channels.

        Returns:
            Tuple of (List[Channel], spark_dependency_paths dict)
        """
        config = self.remote_decorator_config

        self._prepare_and_upload_callable(func, s3_base_uri, sagemaker_session)

        bootstrap_scripts_uri = self._prepare_and_upload_runtime_scripts(
            config.spark_config,
            s3_base_uri,
            config.s3_kms_key,
            sagemaker_session,
        )

        local_dependencies_path = self.runtime_env_manager.snapshot(config.dependencies)
        workspace_uri = self._prepare_and_upload_workspace(
            local_dependencies_path,
            config.include_local_workdir,
            config.pre_execution_commands,
            config.pre_execution_script,
            s3_base_uri,
            config.s3_kms_key,
            sagemaker_session,
            config.custom_file_filter,
        )

        (
            jars_s3_paths,
            py_files_s3_paths,
            files_s3_path,
            conf_file_s3_uri,
        ) = self._prepare_and_upload_spark_dependent_files(
            config.spark_config,
            s3_base_uri,
            config.s3_kms_key,
            sagemaker_session,
        )

        def _channel(name: str, uri: str) -> Channel:
            # Every channel shares the same S3Prefix/File shape; only the
            # channel name and the S3 location differ.
            return Channel(
                channel_name=name,
                data_source=DataSource(
                    s3_data_source=S3DataSource(
                        s3_uri=uri,
                        s3_data_type="S3Prefix",
                        s3_data_distribution_type=S3_DATA_DISTRIBUTION_TYPE,
                    )
                ),
                input_mode="File",
            )

        channels = [_channel(RUNTIME_SCRIPTS_CHANNEL_NAME, bootstrap_scripts_uri)]

        if workspace_uri:
            # Workspace archive lives under a fixed prefix below the base URI.
            channels.append(
                _channel(
                    REMOTE_FUNCTION_WORKSPACE,
                    s3_path_join(s3_base_uri, REMOTE_FUNCTION_WORKSPACE),
                )
            )

        if conf_file_s3_uri:
            channels.append(_channel(SPARK_CONF_CHANNEL_NAME, conf_file_s3_uri))

        spark_dependency_paths = {
            SPARK_JAR_FILES_PATH: jars_s3_paths,
            SPARK_PY_FILES_PATH: py_files_s3_paths,
            SPARK_FILES_PATH: files_s3_path,
        }
        return channels, spark_dependency_paths

    def _prepare_and_upload_callable(
        self, func: Callable, s3_base_uri: str, sagemaker_session: Session
    ) -> None:
        """Prepares and uploads callable to S3"""
        StoredFunction(
            sagemaker_session=sagemaker_session,
            s3_base_uri=s3_base_uri,
            s3_kms_key=self.remote_decorator_config.s3_kms_key,
        ).save(func)

    def _prepare_and_upload_workspace(
        self,
        local_dependencies_path: str,
        include_local_workdir: bool,
        pre_execution_commands: List[str],
        pre_execution_script_local_path: str,
        s3_base_uri: str,
        s3_kms_key: str,
        sagemaker_session: Session,
        custom_file_filter: Optional[Union[Callable[[str, List], List], CustomFileFilter]] = None,
    ) -> str:
        """Upload the training step dependencies to S3 if present"""
        # Thin delegation to the shared remote-function helper.
        return _prepare_and_upload_workspace(
            local_dependencies_path=local_dependencies_path,
            include_local_workdir=include_local_workdir,
            pre_execution_commands=pre_execution_commands,
            pre_execution_script_local_path=pre_execution_script_local_path,
            s3_base_uri=s3_base_uri,
            s3_kms_key=s3_kms_key,
            sagemaker_session=sagemaker_session,
            custom_file_filter=custom_file_filter,
        )

    def _prepare_and_upload_runtime_scripts(
        self,
        spark_config: SparkConfig,
        s3_base_uri: str,
        s3_kms_key: str,
        sagemaker_session: Session,
    ) -> str:
        """Copy runtime scripts to a folder and upload to S3"""
        # Thin delegation to the shared remote-function helper.
        return _prepare_and_upload_runtime_scripts(
            spark_config=spark_config,
            s3_base_uri=s3_base_uri,
            s3_kms_key=s3_kms_key,
            sagemaker_session=sagemaker_session,
        )

    def _prepare_and_upload_spark_dependent_files(
        self,
        spark_config: SparkConfig,
        s3_base_uri: str,
        s3_kms_key: str,
        sagemaker_session: Session,
    ) -> Tuple:
        """Upload the spark dependencies to S3 if present"""
        # No Spark config means nothing to upload; keep the 4-tuple shape.
        if not spark_config:
            return None, None, None, None

        return _prepare_and_upload_spark_dependent_files(
            spark_config=spark_config,
            s3_base_uri=s3_base_uri,
            s3_kms_key=s3_kms_key,
            sagemaker_session=sagemaker_session,
        )
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Module containing constants for feature_processor and feature_scheduler module."""
from __future__ import absolute_import

from sagemaker.core.workflow.parameters import Parameter, ParameterTypeEnum

# Default compute and state settings for feature-processor pipelines/schedules.
DEFAULT_INSTANCE_TYPE = "ml.m5.xlarge"
DEFAULT_SCHEDULE_STATE = "ENABLED"
DEFAULT_TRIGGER_STATE = "ENABLED"
UNDERSCORE = "_"
# AWS error codes matched when a described resource does not exist.
RESOURCE_NOT_FOUND_EXCEPTION = "ResourceNotFoundException"
RESOURCE_NOT_FOUND = "ResourceNotFound"
# Name of the pipeline parameter carrying the scheduled execution time.
EXECUTION_TIME_PIPELINE_PARAMETER = "scheduled_time"
VALIDATION_EXCEPTION = "ValidationException"
# EventBridge Scheduler placeholder substituted with the invocation time.
EVENT_BRIDGE_INVOCATION_TIME = "<aws.scheduler.scheduled-time>"
# Pipeline parameter definition for the scheduled execution time.
SCHEDULED_TIME_PIPELINE_PARAMETER = Parameter(
    name=EXECUTION_TIME_PIPELINE_PARAMETER, parameter_type=ParameterTypeEnum.STRING
)
EXECUTION_TIME_PIPELINE_PARAMETER_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # 2023-01-01T07:00:00Z
# EventBridge Scheduler FlexibleTimeWindow payload disabling flexible windows.
NO_FLEXIBLE_TIME_WINDOW = dict(Mode="OFF")
PIPELINE_NAME_MAXIMUM_LENGTH = 80
PIPELINE_CONTEXT_TYPE = "FeatureEngineeringPipeline"
# Dict keys used for the Spark dependency S3 paths returned by ConfigUploader.
SPARK_JAR_FILES_PATH = "submit_jars_s3_paths"
SPARK_PY_FILES_PATH = "submit_py_files_s3_paths"
SPARK_FILES_PATH = "submit_files_s3_path"
# Tag applied to pipelines created via feature_processor's to_pipeline.
FEATURE_PROCESSOR_TAG_KEY = "sm-fs-fe:created-from"
FEATURE_PROCESSOR_TAG_VALUE = "fp-to-pipeline"
# ARN patterns; capture groups: partition, region, account, resource name.
FEATURE_GROUP_ARN_REGEX_PATTERN = r"arn:(.*?):sagemaker:(.*?):(.*?):feature-group/(.*?)$"
PIPELINE_ARN_REGEX_PATTERN = r"arn:(.*?):sagemaker:(.*?):(.*?):pipeline/(.*?)$"
EVENTBRIDGE_RULE_ARN_REGEX_PATTERN = r"arn:(.*?):events:(.*?):(.*?):rule/(.*?)$"
# NOTE(review): points at a private-beta bucket and a dev wheel version —
# confirm this is still reachable/intended before release.
SAGEMAKER_WHL_FILE_S3_PATH = "s3://ada-private-beta/sagemaker-2.151.1.dev0-py2.py3-none-any.whl"
S3_DATA_DISTRIBUTION_TYPE = "FullyReplicated"
# Tags linking a pipeline to its lineage-tracking context names.
PIPELINE_CONTEXT_NAME_TAG_KEY = "sm-fs-fe:feature-engineering-pipeline-context-name"
PIPELINE_VERSION_CONTEXT_NAME_TAG_KEY = "sm-fs-fe:feature-engineering-pipeline-version-context-name"
# Tag keys managed by to_pipeline; user-supplied tags must not use these.
TO_PIPELINE_RESERVED_TAG_KEYS = [
    FEATURE_PROCESSOR_TAG_KEY,
    PIPELINE_CONTEXT_NAME_TAG_KEY,
    PIPELINE_VERSION_CONTEXT_NAME_TAG_KEY,
]
# Skeleton EventBridge event pattern for SageMaker pipeline state changes;
# callers fill in the status and pipelineArn lists.
BASE_EVENT_PATTERN = {
    "source": ["aws.sagemaker"],
    "detail": {"currentPipelineExecutionStatus": [], "pipelineArn": []},
}
Loading
Loading