Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
3467b7b
reapply change
sfc-gh-yuwang Apr 21, 2026
143f30a
fix test
sfc-gh-yuwang Apr 21, 2026
e45239a
avoid regress
sfc-gh-yuwang Apr 22, 2026
0692a09
dual mode of catalog
sfc-gh-yuwang Apr 27, 2026
f610835
init back end at catalog init
sfc-gh-yuwang Apr 27, 2026
bfcf249
Merge branch 'main' into SNOW-2267482
sfc-gh-yuwang Apr 28, 2026
674d831
push missed test fixture
sfc-gh-yuwang Apr 28, 2026
5933508
fix lint
sfc-gh-yuwang Apr 28, 2026
02ec3b8
remove changelog and use notimplementederror
sfc-gh-yuwang Apr 28, 2026
c78333c
move fixture back to test_catalog
sfc-gh-yuwang Apr 28, 2026
9e49c85
Merge branch 'main' into SNOW-2267482
sfc-gh-yuwang Apr 28, 2026
bc65acf
Merge branch 'main' into SNOW-2267482
sfc-gh-yuwang Apr 29, 2026
2afd738
fix test
sfc-gh-yuwang Apr 29, 2026
50d0c32
add limit 10000 in sql base(scos only)
sfc-gh-yuwang Apr 29, 2026
9e89ca4
remove wrong test
sfc-gh-yuwang Apr 29, 2026
4334c32
Merge branch 'main' into SNOW-2267482
sfc-gh-yuwang Apr 29, 2026
701c08b
Merge branch 'main' into SNOW-2267482
sfc-gh-yuwang Apr 30, 2026
ef16ee8
address comments
sfc-gh-yuwang Apr 30, 2026
d9dad80
restore comment
sfc-gh-yuwang Apr 30, 2026
68650b1
parameter protection
sfc-gh-yuwang May 1, 2026
1906d61
Merge branch 'main' into SNOW-2267482
sfc-gh-yuwang May 1, 2026
bb2ef8e
parameter protection
sfc-gh-yuwang May 1, 2026
e0097c9
add test
sfc-gh-yuwang May 1, 2026
1472367
fix test
sfc-gh-yuwang May 4, 2026
8a9bc66
rewrite parameter protection
sfc-gh-yuwang May 4, 2026
40ace28
revert session change
sfc-gh-yuwang May 4, 2026
e8cbe79
rename to _use_sql_base_catalog
sfc-gh-yuwang May 4, 2026
ff8a005
add test
sfc-gh-yuwang May 5, 2026
743620b
address comments
sfc-gh-yuwang May 6, 2026
1e3af12
address comments
sfc-gh-yuwang May 6, 2026
2ecc805
address comments
sfc-gh-yuwang May 6, 2026
0608e79
Merge branch 'main' into SNOW-2267482
sfc-gh-yuwang May 7, 2026
1823470
Merge branch 'main' into SNOW-2267482
sfc-gh-yuwang May 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
990 changes: 857 additions & 133 deletions src/snowflake/snowpark/catalog.py

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions src/snowflake/snowpark/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@

# This is an internal-only global flag, used to determine whether the api code which will be executed is compatible with snowflake.snowpark_connect
_is_snowpark_connect_compatible_mode = False

# Default backend selector for the Snowpark Catalog when running in
# Snowpark-Connect / SCOS compatible mode. True -> SQL-based backend,
# False -> legacy snowflake.core REST backend. Read live by Catalog.__init__.
_use_sql_base_catalog = True
# Internal-only global flag that enables improved SQL simplifier query flattening
# for filter, sort, select, and distinct. When True (default), the branch
# improvements are active regardless of _is_snowpark_connect_compatible_mode.
Expand Down
6 changes: 6 additions & 0 deletions src/snowflake/snowpark/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,3 +283,9 @@ class SnowparkInvalidObjectNameException(SnowparkGeneralException):
"""

pass


class _NotFoundError(SnowparkClientException):
"""Internal exception raised when a Snowpark catalog object is not found."""

pass
2 changes: 2 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from snowflake.snowpark._internal.utils import warning_dict
from .ast.conftest import default_unparser_path

pytest_plugins = ("tests.integ.test_catalog",)

logging.getLogger("snowflake.connector").setLevel(logging.ERROR)

excluded_frontend_files = [
Expand Down
211 changes: 18 additions & 193 deletions tests/integ/test_catalog.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,26 @@
#
# Copyright (c) 2012-2025 Snowflake Computing Inc. All rights reserved.
#
"""Catalog integration tests and shared fixtures.

Mode-agnostic tests (same behavior for SQL and REST catalog backends) live in
this module. Backend-specific tests are in ``test_catalog_sql_mode.py`` and
``test_catalog_rest_mode.py``, which reuse the fixtures defined here via
``pytest_plugins`` in ``conftest.py``.
"""

from unittest.mock import patch
import uuid
from unittest.mock import patch

import pytest

from snowflake.snowpark._internal.analyzer.analyzer_utils import unquote_if_quoted
from snowflake.snowpark.catalog import Catalog
from snowflake.snowpark.context import _DEFAULT_ARTIFACT_REPOSITORY
from snowflake.snowpark.session import Session
from snowflake.snowpark.types import IntegerType
from snowflake.core.exceptions import APIError
from snowflake.snowpark.context import _DEFAULT_ARTIFACT_REPOSITORY


pytestmark = [
pytest.mark.xfail(
"config.getoption('local_testing_mode', default=False)",
reason="deepcopy is not supported and required by local testing",
run=False,
),
pytest.mark.xfail(
raises=APIError,
reason="Failure due to warehouse overload",
),
]

CATALOG_TEMP_OBJECT_PREFIX = "SP_CATALOG_TEMP"
DOES_NOT_EXIST_PATTERN = "does_not_exist_.*"


def get_temp_name(type: str) -> str:
Expand Down Expand Up @@ -186,34 +180,13 @@ def temp_udf2(session, temp_db1, temp_schema1):
)


DOES_NOT_EXIST_PATTERN = "does_not_exist_.*"


def test_list_db(session, temp_db1, temp_db2):
catalog: Catalog = session.catalog
db_list = catalog.list_databases(pattern=f"{CATALOG_TEMP_OBJECT_PREFIX}_DB_*")
assert {db.name for db in db_list} >= {temp_db1, temp_db2}

db_list = catalog.list_databases(like=f"{CATALOG_TEMP_OBJECT_PREFIX}_DB_%")
assert {db.name for db in db_list} >= {temp_db1, temp_db2}


def test_list_schema(session, temp_db1, temp_schema1, temp_schema2):
catalog: Catalog = session.catalog
assert (
len(catalog.list_databases(pattern=f"{CATALOG_TEMP_OBJECT_PREFIX}_SCHEMA_.*"))
== 0
)

schema_list = catalog.list_schemas(
pattern=f"{CATALOG_TEMP_OBJECT_PREFIX}_SCHEMA_.*", database=temp_db1
)
assert {schema.name for schema in schema_list} >= {temp_schema1, temp_schema2}

schema_list = catalog.list_schemas(
like=f"{CATALOG_TEMP_OBJECT_PREFIX}_SCHEMA_%", database=temp_db1
)
assert {schema.name for schema in schema_list} >= {temp_schema1, temp_schema2}
pytestmark = [
pytest.mark.xfail(
"config.getoption('local_testing_mode', default=False)",
reason="deepcopy is not supported and required by local testing",
run=False,
),
]


def test_list_tables(session, temp_db1, temp_schema1, temp_table1, temp_table2):
Expand Down Expand Up @@ -344,48 +317,6 @@ def test_list_udfs(session, temp_db1, temp_schema1, temp_udf1, temp_udf2):
assert {udf.name for udf in udf_list} >= {temp_udf1, temp_udf2}


def test_get_db_schema(session):
catalog: Catalog = session.catalog
current_db = session.get_current_database()
current_schema = session.get_current_schema()
assert catalog.get_database(current_db).name == unquote_if_quoted(current_db)
assert catalog.get_schema(current_schema).name == unquote_if_quoted(current_schema)


def test_get_table_view(session, temp_db1, temp_schema1, temp_table1, temp_view1):
catalog: Catalog = session.catalog
table = catalog.get_table(temp_table1, database=temp_db1, schema=temp_schema1)
assert table.name == temp_table1
assert table.database_name == temp_db1
assert table.schema_name == temp_schema1

view = catalog.get_view(temp_view1, database=temp_db1, schema=temp_schema1)
assert view.name == temp_view1
assert view.database_name == temp_db1
assert view.schema_name == temp_schema1


@pytest.mark.udf
def test_get_function_procedure_udf(
session, temp_db1, temp_schema1, temp_procedure1, temp_udf1
):
catalog: Catalog = session.catalog

procedure = catalog.get_procedure(
temp_procedure1, [IntegerType()], database=temp_db1, schema=temp_schema1
)
assert procedure.name == temp_procedure1
assert procedure.database_name == temp_db1
assert procedure.schema_name == temp_schema1

udf = catalog.get_user_defined_function(
temp_udf1, [IntegerType()], database=temp_db1, schema=temp_schema1
)
assert udf.name == temp_udf1
assert udf.database_name == temp_db1
assert udf.schema_name == temp_schema1


def test_set_db_schema(session, temp_db1, temp_db2, temp_schema1, temp_schema2):
catalog = session.catalog

Expand All @@ -407,112 +338,6 @@ def test_set_db_schema(session, temp_db1, temp_db2, temp_schema1, temp_schema2):
session.use_schema(original_schema)


def test_exists_db_schema(session, temp_db1, temp_schema1):
catalog = session.catalog
assert catalog.database_exists(temp_db1)
assert not catalog.database_exists("does_not_exist")

assert catalog.schema_exists(temp_schema1, database=temp_db1)
assert not catalog.schema_exists(temp_schema1, database="does_not_exist")


def test_exists_table_view(session, temp_db1, temp_schema1, temp_table1, temp_view1):
catalog = session.catalog
db1_obj = catalog._root.databases[temp_db1].fetch()
schema1_obj = catalog._root.databases[temp_db1].schemas[temp_schema1].fetch()

assert catalog.table_exists(temp_table1, database=temp_db1, schema=temp_schema1)
assert catalog.table_exists(temp_table1, database=db1_obj, schema=schema1_obj)
table = catalog.get_table(temp_table1, database=temp_db1, schema=temp_schema1)
assert catalog.table_exists(table)
assert not catalog.table_exists(
"does_not_exist", database=temp_db1, schema=temp_schema1
)

assert catalog.view_exists(temp_view1, database=temp_db1, schema=temp_schema1)
assert catalog.view_exists(temp_view1, database=db1_obj, schema=schema1_obj)
view = catalog.get_view(temp_view1, database=temp_db1, schema=temp_schema1)
assert catalog.view_exists(view)
assert not catalog.view_exists(
"does_not_exist", database=temp_db1, schema=temp_schema1
)


@pytest.mark.udf
def test_exists_function_procedure_udf(
session, temp_db1, temp_schema1, temp_procedure1, temp_udf1
):
catalog = session.catalog
db1_obj = catalog._root.databases[temp_db1].fetch()
schema1_obj = catalog._root.databases[temp_db1].schemas[temp_schema1].fetch()

assert catalog.procedure_exists(
temp_procedure1, [IntegerType()], database=temp_db1, schema=temp_schema1
)
assert catalog.procedure_exists(
temp_procedure1, [IntegerType()], database=db1_obj, schema=schema1_obj
)
proc = catalog.get_procedure(
temp_procedure1, [IntegerType()], database=temp_db1, schema=temp_schema1
)
assert catalog.procedure_exists(proc)
assert not catalog.procedure_exists(
"does_not_exist", [], database=temp_db1, schema=temp_schema1
)

assert catalog.user_defined_function_exists(
temp_udf1, [IntegerType()], database=temp_db1, schema=temp_schema1
)
assert catalog.user_defined_function_exists(
temp_udf1, [IntegerType()], database=db1_obj, schema=schema1_obj
)
udf = catalog.get_user_defined_function(
temp_udf1, [IntegerType()], database=temp_db1, schema=temp_schema1
)
assert catalog.user_defined_function_exists(udf)
assert not catalog.user_defined_function_exists(
"does_not_exist", [], database=temp_db1, schema=temp_schema1
)


@pytest.mark.parametrize("use_object", [True, False])
def test_drop(session, use_object):
catalog = session.catalog

original_db = session.get_current_database()
original_schema = session.get_current_schema()
try:
temp_db = create_temp_db(session)
temp_schema = create_temp_schema(session, temp_db)
temp_table = create_temp_table(session, temp_db, temp_schema)
temp_view = create_temp_view(session, temp_db, temp_schema)
if use_object:
temp_schema = catalog._root.databases[temp_db].schemas[temp_schema].fetch()
temp_db = catalog._root.databases[temp_db].fetch()

assert catalog.database_exists(temp_db)
assert catalog.schema_exists(temp_schema, database=temp_db)
assert catalog.table_exists(temp_table, database=temp_db, schema=temp_schema)
assert catalog.view_exists(temp_view, database=temp_db, schema=temp_schema)

catalog.drop_table(temp_table, database=temp_db, schema=temp_schema)
catalog.drop_view(temp_view, database=temp_db, schema=temp_schema)

assert not catalog.table_exists(
temp_table, database=temp_db, schema=temp_schema
)
assert not catalog.view_exists(temp_view, database=temp_db, schema=temp_schema)

catalog.drop_schema(temp_schema, database=temp_db)
assert not catalog.schema_exists(temp_schema, database=temp_db)

catalog.drop_database(temp_db)
assert not catalog.database_exists(temp_db)
finally:
session.use_database(original_db)
session.use_schema(original_schema)


def test_parse_names_negative(session):
catalog = session.catalog
with pytest.raises(
Expand Down
Loading
Loading