Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 126 additions & 14 deletions pyatlan/client/common/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,44 @@ def process_direct_api_response(


class Save:
@staticmethod
def _process_atlan_tag_semantics(asset: Asset) -> None:
"""
Process Atlan tags with semantics and populate appropriate fields.

Tags with APPEND semantic go to add_or_update_classifications.
Tags with REMOVE semantic go to remove_classifications.
Tags with REPLACE semantic or None (backward compatibility) stay in atlan_tags.

:param asset: the asset to process
"""
if not asset.atlan_tags:
return

append_tags = []
remove_tags = []
replace_tags = []

for tag in asset.atlan_tags:
if tag.semantic == SaveSemantic.APPEND:
append_tags.append(tag)
elif tag.semantic == SaveSemantic.REMOVE:
remove_tags.append(tag)
else:
# REPLACE or None (backward compatibility)
replace_tags.append(tag)

# Update asset fields based on processed tags
if append_tags:
asset.add_or_update_classifications = append_tags
if remove_tags:
asset.remove_classifications = remove_tags
if replace_tags:
asset.atlan_tags = replace_tags
elif append_tags or remove_tags:
# If we only have append/remove tags, clear atlan_tags to avoid conflicts
asset.atlan_tags = None

@staticmethod
def prepare_request(
entity: Union[Asset, List[Asset]],
Expand All @@ -695,13 +733,6 @@ def prepare_request(
:param client: the Atlan client instance for flushing custom metadata
:returns: tuple of (query_params, bulk_request)
"""
query_params = {
"replaceTags": replace_atlan_tags,
"appendTags": append_atlan_tags,
"replaceBusinessAttributes": replace_custom_metadata,
"overwriteBusinessAttributes": overwrite_custom_metadata,
}

entities: List[Asset] = []
if isinstance(entity, list):
entities.extend(entity)
Expand All @@ -712,6 +743,50 @@ def prepare_request(
raise ValueError(
"AtlanClient instance must be provided to validate and flush cm for assets."
)

# Process Atlan tags with semantics for each asset
has_semantic_tags = False
has_replace_semantic = False
for asset in entities:
if asset.atlan_tags:
# Check if any tags have semantics
if any(tag.semantic is not None for tag in asset.atlan_tags):
has_semantic_tags = True
# Check if any tags have REPLACE semantic before processing
if any(
tag.semantic == SaveSemantic.REPLACE for tag in asset.atlan_tags
):
has_replace_semantic = True
Save._process_atlan_tag_semantics(asset)

# Determine query parameters based on semantic usage
if has_semantic_tags:
# If tags have semantics, override the parameters
if has_replace_semantic:
# If any asset has REPLACE semantic, use replaceTags
query_params = {
"replaceTags": True,
"appendTags": False,
"replaceBusinessAttributes": replace_custom_metadata,
"overwriteBusinessAttributes": overwrite_custom_metadata,
}
else:
# If only APPEND/REMOVE semantics, use appendTags
query_params = {
"replaceTags": False,
"appendTags": True,
"replaceBusinessAttributes": replace_custom_metadata,
"overwriteBusinessAttributes": overwrite_custom_metadata,
}
else:
# Backward compatibility: use provided parameters
query_params = {
"replaceTags": replace_atlan_tags,
"appendTags": append_atlan_tags,
"replaceBusinessAttributes": replace_custom_metadata,
"overwriteBusinessAttributes": overwrite_custom_metadata,
}

# Validate and flush entities BEFORE creating the BulkRequest
Save.validate_and_flush_entities(entities, client)
return query_params, BulkRequest[Asset](entities=entities)
Expand All @@ -736,13 +811,6 @@ async def prepare_request_async(
:param client: Optional[AsyncAtlanClient] = None,
:returns: tuple of (query_params, bulk_request)
"""
query_params = {
"replaceTags": replace_atlan_tags,
"appendTags": append_atlan_tags,
"replaceBusinessAttributes": replace_custom_metadata,
"overwriteBusinessAttributes": overwrite_custom_metadata,
}

entities: List[Asset] = []
if isinstance(entity, list):
entities.extend(entity)
Expand All @@ -753,6 +821,50 @@ async def prepare_request_async(
raise ValueError(
"AsyncAtlanClient instance must be provided to validate and flush cm for assets."
)

# Process Atlan tags with semantics for each asset
has_semantic_tags = False
has_replace_semantic = False
for asset in entities:
if asset.atlan_tags:
# Check if any tags have semantics
if any(tag.semantic is not None for tag in asset.atlan_tags):
has_semantic_tags = True
# Check if any tags have REPLACE semantic before processing
if any(
tag.semantic == SaveSemantic.REPLACE for tag in asset.atlan_tags
):
has_replace_semantic = True
Save._process_atlan_tag_semantics(asset)

# Determine query parameters based on semantic usage
if has_semantic_tags:
# If tags have semantics, override the parameters
if has_replace_semantic:
# If any asset has REPLACE semantic, use replaceTags
query_params = {
"replaceTags": True,
"appendTags": False,
"replaceBusinessAttributes": replace_custom_metadata,
"overwriteBusinessAttributes": overwrite_custom_metadata,
}
else:
# If only APPEND/REMOVE semantics, use appendTags
query_params = {
"replaceTags": False,
"appendTags": True,
"replaceBusinessAttributes": replace_custom_metadata,
"overwriteBusinessAttributes": overwrite_custom_metadata,
}
else:
# Backward compatibility: use provided parameters
query_params = {
"replaceTags": replace_atlan_tags,
"appendTags": append_atlan_tags,
"replaceBusinessAttributes": replace_custom_metadata,
"overwriteBusinessAttributes": overwrite_custom_metadata,
}

# Validate and flush entities BEFORE creating the BulkRequest
await Save.validate_and_flush_entities_async(entities, client)
return query_params, BulkRequest[Asset](entities=entities)
Expand Down
17 changes: 17 additions & 0 deletions pyatlan/model/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,15 @@ class Config:
source_tag_attachments: List[SourceTagAttachment] = Field(
default_factory=list, exclude=True
)
semantic: Optional[SaveSemantic] = Field(
default=None,
exclude=True,
description=(
"Semantic for how this Atlan tag should be saved, "
"if used in an asset request on which `.save()` is called. "
"Options are: APPEND (add/update tag), REMOVE (remove tag), or REPLACE (replace all tags)."
),
)

attributes: Optional[Dict[str, Any]] = None
tag_id: Optional[str] = Field(default=None, exclude=True)
Expand All @@ -334,6 +343,7 @@ def of(
entity_guid: Optional[str] = None,
source_tag_attachment: Optional[SourceTagAttachment] = None,
client: Optional[AtlanClient] = None,
semantic: Optional[SaveSemantic] = None,
) -> AtlanTag:
"""
Construct an Atlan tag assignment for a specific entity.
Expand All @@ -342,6 +352,7 @@ def of(
:param entity_guid: unique identifier (GUID) of the entity to which the Atlan tag is to be assigned
:param source_tag_attachment: (optional) source-specific details for the tag
:param client: (optional) client instance used for translating source-specific details
:param semantic: (optional) semantic for how this tag should be saved (APPEND, REMOVE, or REPLACE)
:return: an Atlan tag assignment with default settings for propagation and a specific entity assignment
:raises InvalidRequestError: if client is not provided and source_tag_attachment is specified
"""
Expand All @@ -358,6 +369,8 @@ def of(
)
tag.attributes = {source_tag_attr_id: [source_tag_attachment]} # type: ignore[dict-item]
tag.source_tag_attachments.append(source_tag_attachment)
if semantic is not None:
tag.semantic = semantic
return tag

@classmethod
Expand All @@ -367,6 +380,7 @@ async def of_async(
entity_guid: Optional[str] = None,
source_tag_attachment: Optional[SourceTagAttachment] = None,
client: Optional[AsyncAtlanClient] = None,
semantic: Optional[SaveSemantic] = None,
) -> AtlanTag:
"""
Async version of AtlanTag.of() for use with AsyncAtlanClient.
Expand All @@ -377,6 +391,7 @@ async def of_async(
:param entity_guid: unique identifier (GUID) of the entity to which the Atlan tag is to be assigned
:param source_tag_attachment: (optional) source-specific details for the tag
:param client: (optional) async client instance used for translating source-specific details
:param semantic: (optional) semantic for how this tag should be saved (APPEND, REMOVE, or REPLACE)
:return: an Atlan tag assignment with default settings for propagation and a specific entity assignment
:raises InvalidRequestError: if client is not provided and source_tag_attachment is specified
"""
Expand All @@ -393,6 +408,8 @@ async def of_async(
)
tag.attributes = {source_tag_attr_id: [source_tag_attachment]} # type: ignore[dict-item]
tag.source_tag_attachments.append(source_tag_attachment)
if semantic is not None:
tag.semantic = semantic
return tag


Expand Down
Loading