diff --git a/pyatlan/client/common/asset.py b/pyatlan/client/common/asset.py index ca265a7e1..257f7d320 100644 --- a/pyatlan/client/common/asset.py +++ b/pyatlan/client/common/asset.py @@ -675,6 +675,44 @@ def process_direct_api_response( class Save: + @staticmethod + def _process_atlan_tag_semantics(asset: Asset) -> None: + """ + Process Atlan tags with semantics and populate appropriate fields. + + Tags with APPEND semantic go to add_or_update_classifications. + Tags with REMOVE semantic go to remove_classifications. + Tags with REPLACE semantic or None (backward compatibility) stay in atlan_tags. + + :param asset: the asset to process + """ + if not asset.atlan_tags: + return + + append_tags = [] + remove_tags = [] + replace_tags = [] + + for tag in asset.atlan_tags: + if tag.semantic == SaveSemantic.APPEND: + append_tags.append(tag) + elif tag.semantic == SaveSemantic.REMOVE: + remove_tags.append(tag) + else: + # REPLACE or None (backward compatibility) + replace_tags.append(tag) + + # Update asset fields based on processed tags + if append_tags: + asset.add_or_update_classifications = append_tags + if remove_tags: + asset.remove_classifications = remove_tags + if replace_tags: + asset.atlan_tags = replace_tags + elif append_tags or remove_tags: + # If we only have append/remove tags, clear atlan_tags to avoid conflicts + asset.atlan_tags = None + @staticmethod def prepare_request( entity: Union[Asset, List[Asset]], @@ -695,13 +733,6 @@ def prepare_request( :param client: the Atlan client instance for flushing custom metadata :returns: tuple of (query_params, bulk_request) """ - query_params = { - "replaceTags": replace_atlan_tags, - "appendTags": append_atlan_tags, - "replaceBusinessAttributes": replace_custom_metadata, - "overwriteBusinessAttributes": overwrite_custom_metadata, - } - entities: List[Asset] = [] if isinstance(entity, list): entities.extend(entity) @@ -712,6 +743,50 @@ def prepare_request( raise ValueError( "AtlanClient instance must be provided to validate and flush cm for assets." ) + + # Process Atlan tags with semantics for each asset + has_semantic_tags = False + has_replace_semantic = False + for asset in entities: + if asset.atlan_tags: + # Check if any tags have semantics + if any(tag.semantic is not None for tag in asset.atlan_tags): + has_semantic_tags = True + # Check if any tags have REPLACE semantic before processing + if any( + tag.semantic == SaveSemantic.REPLACE for tag in asset.atlan_tags + ): + has_replace_semantic = True + Save._process_atlan_tag_semantics(asset) + + # Determine query parameters based on semantic usage + if has_semantic_tags: + # If tags have semantics, override the parameters + if has_replace_semantic: + # If any asset has REPLACE semantic, use replaceTags + query_params = { + "replaceTags": True, + "appendTags": False, + "replaceBusinessAttributes": replace_custom_metadata, + "overwriteBusinessAttributes": overwrite_custom_metadata, + } + else: + # If only APPEND/REMOVE semantics, use appendTags + query_params = { + "replaceTags": False, + "appendTags": True, + "replaceBusinessAttributes": replace_custom_metadata, + "overwriteBusinessAttributes": overwrite_custom_metadata, + } + else: + # Backward compatibility: use provided parameters + query_params = { + "replaceTags": replace_atlan_tags, + "appendTags": append_atlan_tags, + "replaceBusinessAttributes": replace_custom_metadata, + "overwriteBusinessAttributes": overwrite_custom_metadata, + } + # Validate and flush entities BEFORE creating the BulkRequest Save.validate_and_flush_entities(entities, client) return query_params, BulkRequest[Asset](entities=entities) @@ -736,13 +811,6 @@ async def prepare_request_async( :param client: Optional[AsyncAtlanClient] = None, :returns: tuple of (query_params, bulk_request) """ - query_params = { - "replaceTags": replace_atlan_tags, - "appendTags": append_atlan_tags, - "replaceBusinessAttributes": replace_custom_metadata, - "overwriteBusinessAttributes": overwrite_custom_metadata, - } - entities: List[Asset] = [] if isinstance(entity, list): entities.extend(entity) @@ -753,6 +821,50 @@ async def prepare_request_async( raise ValueError( "AsyncAtlanClient instance must be provided to validate and flush cm for assets." ) + + # Process Atlan tags with semantics for each asset + has_semantic_tags = False + has_replace_semantic = False + for asset in entities: + if asset.atlan_tags: + # Check if any tags have semantics + if any(tag.semantic is not None for tag in asset.atlan_tags): + has_semantic_tags = True + # Check if any tags have REPLACE semantic before processing + if any( + tag.semantic == SaveSemantic.REPLACE for tag in asset.atlan_tags + ): + has_replace_semantic = True + Save._process_atlan_tag_semantics(asset) + + # Determine query parameters based on semantic usage + if has_semantic_tags: + # If tags have semantics, override the parameters + if has_replace_semantic: + # If any asset has REPLACE semantic, use replaceTags + query_params = { + "replaceTags": True, + "appendTags": False, + "replaceBusinessAttributes": replace_custom_metadata, + "overwriteBusinessAttributes": overwrite_custom_metadata, + } + else: + # If only APPEND/REMOVE semantics, use appendTags + query_params = { + "replaceTags": False, + "appendTags": True, + "replaceBusinessAttributes": replace_custom_metadata, + "overwriteBusinessAttributes": overwrite_custom_metadata, + } + else: + # Backward compatibility: use provided parameters + query_params = { + "replaceTags": replace_atlan_tags, + "appendTags": append_atlan_tags, + "replaceBusinessAttributes": replace_custom_metadata, + "overwriteBusinessAttributes": overwrite_custom_metadata, + } + # Validate and flush entities BEFORE creating the BulkRequest await Save.validate_and_flush_entities_async(entities, client) return query_params, BulkRequest[Asset](entities=entities) diff --git a/pyatlan/model/core.py b/pyatlan/model/core.py index 0a4b89da0..046b9008b 100644 --- a/pyatlan/model/core.py +++ b/pyatlan/model/core.py @@ -323,6 +323,15 @@ class Config: source_tag_attachments: List[SourceTagAttachment] = Field( default_factory=list, exclude=True ) + semantic: Optional[SaveSemantic] = Field( + default=None, + exclude=True, + description=( + "Semantic for how this Atlan tag should be saved, " + "if used in an asset request on which `.save()` is called. " + "Options are: APPEND (add/update tag), REMOVE (remove tag), or REPLACE (replace all tags)." + ), + ) attributes: Optional[Dict[str, Any]] = None tag_id: Optional[str] = Field(default=None, exclude=True) @@ -334,6 +343,7 @@ def of( entity_guid: Optional[str] = None, source_tag_attachment: Optional[SourceTagAttachment] = None, client: Optional[AtlanClient] = None, + semantic: Optional[SaveSemantic] = None, ) -> AtlanTag: """ Construct an Atlan tag assignment for a specific entity. @@ -342,6 +352,7 @@ def of( :param entity_guid: unique identifier (GUID) of the entity to which the Atlan tag is to be assigned :param source_tag_attachment: (optional) source-specific details for the tag :param client: (optional) client instance used for translating source-specific details + :param semantic: (optional) semantic for how this tag should be saved (APPEND, REMOVE, or REPLACE) :return: an Atlan tag assignment with default settings for propagation and a specific entity assignment :raises InvalidRequestError: if client is not provided and source_tag_attachment is specified """ @@ -358,6 +369,8 @@ def of( ) tag.attributes = {source_tag_attr_id: [source_tag_attachment]} # type: ignore[dict-item] tag.source_tag_attachments.append(source_tag_attachment) + if semantic is not None: + tag.semantic = semantic return tag @classmethod @@ -367,6 +380,7 @@ async def of_async( entity_guid: Optional[str] = None, source_tag_attachment: Optional[SourceTagAttachment] = None, client: Optional[AsyncAtlanClient] = None, + semantic: Optional[SaveSemantic] = None, ) -> AtlanTag: """ Async version of AtlanTag.of() for use with AsyncAtlanClient. @@ -377,6 +391,7 @@ async def of_async( :param entity_guid: unique identifier (GUID) of the entity to which the Atlan tag is to be assigned :param source_tag_attachment: (optional) source-specific details for the tag :param client: (optional) async client instance used for translating source-specific details + :param semantic: (optional) semantic for how this tag should be saved (APPEND, REMOVE, or REPLACE) :return: an Atlan tag assignment with default settings for propagation and a specific entity assignment :raises InvalidRequestError: if client is not provided and source_tag_attachment is specified """ @@ -393,6 +408,8 @@ async def of_async( ) tag.attributes = {source_tag_attr_id: [source_tag_attachment]} # type: ignore[dict-item] tag.source_tag_attachments.append(source_tag_attachment) + if semantic is not None: + tag.semantic = semantic return tag