From bccb9aa817d5d83f38d306c5732fed6617fce16f Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Wed, 4 Feb 2026 13:47:32 +0000 Subject: [PATCH 1/3] Add support for in-bulk asset tag management using SaveSemantic - Added semantic field to AtlanTag class with APPEND, REMOVE, and REPLACE support - Updated AtlanTag.of() and AtlanTag.of_async() methods to accept semantic parameter - Implemented tag semantic handling logic in Save.prepare_request() and prepare_request_async() - Tags with APPEND semantic are added to add_or_update_classifications - Tags with REMOVE semantic are added to remove_classifications - Tags with REPLACE semantic or None (backward compatibility) remain in atlan_tags - Query parameters are automatically set based on semantic usage - Maintains full backward compatibility with existing functionality Resolves LINTEST-345 --- pyatlan/client/common/asset.py | 136 +++++++++++++++++++++++++++++---- pyatlan/model/core.py | 17 +++++ 2 files changed, 139 insertions(+), 14 deletions(-) diff --git a/pyatlan/client/common/asset.py b/pyatlan/client/common/asset.py index ca265a7e1..b04c43960 100644 --- a/pyatlan/client/common/asset.py +++ b/pyatlan/client/common/asset.py @@ -675,6 +675,44 @@ def process_direct_api_response( class Save: + @staticmethod + def _process_atlan_tag_semantics(asset: Asset) -> None: + """ + Process Atlan tags with semantics and populate appropriate fields. + + Tags with APPEND semantic go to add_or_update_classifications. + Tags with REMOVE semantic go to remove_classifications. + Tags with REPLACE semantic or None (backward compatibility) stay in atlan_tags. + + :param asset: the asset to process + """ + if not asset.atlan_tags: + return + + append_tags = [] + remove_tags = [] + replace_tags = [] + + for tag in asset.atlan_tags: + if tag.semantic == SaveSemantic.APPEND: + append_tags.append(tag) + elif tag.semantic == SaveSemantic.REMOVE: + remove_tags.append(tag) + else: + # REPLACE or None (backward compatibility) + replace_tags.append(tag) + + # Update asset fields based on processed tags + if append_tags: + asset.add_or_update_classifications = append_tags + if remove_tags: + asset.remove_classifications = remove_tags + if replace_tags: + asset.atlan_tags = replace_tags + elif append_tags or remove_tags: + # If we only have append/remove tags, clear atlan_tags to avoid conflicts + asset.atlan_tags = None + @staticmethod def prepare_request( entity: Union[Asset, List[Asset]], @@ -695,13 +733,6 @@ def prepare_request( :param client: the Atlan client instance for flushing custom metadata :returns: tuple of (query_params, bulk_request) """ - query_params = { - "replaceTags": replace_atlan_tags, - "appendTags": append_atlan_tags, - "replaceBusinessAttributes": replace_custom_metadata, - "overwriteBusinessAttributes": overwrite_custom_metadata, - } - entities: List[Asset] = [] if isinstance(entity, list): entities.extend(entity) @@ -712,6 +743,48 @@ def prepare_request( raise ValueError( "AtlanClient instance must be provided to validate and flush cm for assets." ) + + # Process Atlan tags with semantics for each asset + has_semantic_tags = False + has_replace_semantic = False + for asset in entities: + if asset.atlan_tags: + # Check if any tags have semantics + if any(tag.semantic is not None for tag in asset.atlan_tags): + has_semantic_tags = True + # Check if any tags have REPLACE semantic before processing + if any(tag.semantic == SaveSemantic.REPLACE for tag in asset.atlan_tags): + has_replace_semantic = True + Save._process_atlan_tag_semantics(asset) + + # Determine query parameters based on semantic usage + if has_semantic_tags: + # If tags have semantics, override the parameters + if has_replace_semantic: + # If any asset has REPLACE semantic, use replaceTags + query_params = { + "replaceTags": True, + "appendTags": False, + "replaceBusinessAttributes": replace_custom_metadata, + "overwriteBusinessAttributes": overwrite_custom_metadata, + } + else: + # If only APPEND/REMOVE semantics, use appendTags + query_params = { + "replaceTags": False, + "appendTags": True, + "replaceBusinessAttributes": replace_custom_metadata, + "overwriteBusinessAttributes": overwrite_custom_metadata, + } + else: + # Backward compatibility: use provided parameters + query_params = { + "replaceTags": replace_atlan_tags, + "appendTags": append_atlan_tags, + "replaceBusinessAttributes": replace_custom_metadata, + "overwriteBusinessAttributes": overwrite_custom_metadata, + } + # Validate and flush entities BEFORE creating the BulkRequest Save.validate_and_flush_entities(entities, client) return query_params, BulkRequest[Asset](entities=entities) @@ -736,13 +809,6 @@ async def prepare_request_async( :param client: Optional[AsyncAtlanClient] = None, :returns: tuple of (query_params, bulk_request) """ - query_params = { - "replaceTags": replace_atlan_tags, - "appendTags": append_atlan_tags, - "replaceBusinessAttributes": replace_custom_metadata, - "overwriteBusinessAttributes": overwrite_custom_metadata, - } - entities: List[Asset] = [] if isinstance(entity, list): entities.extend(entity) @@ -753,6 +819,48 @@ async def prepare_request_async( raise ValueError( "AsyncAtlanClient instance must be provided to validate and flush cm for assets." ) + + # Process Atlan tags with semantics for each asset + has_semantic_tags = False + has_replace_semantic = False + for asset in entities: + if asset.atlan_tags: + # Check if any tags have semantics + if any(tag.semantic is not None for tag in asset.atlan_tags): + has_semantic_tags = True + # Check if any tags have REPLACE semantic before processing + if any(tag.semantic == SaveSemantic.REPLACE for tag in asset.atlan_tags): + has_replace_semantic = True + Save._process_atlan_tag_semantics(asset) + + # Determine query parameters based on semantic usage + if has_semantic_tags: + # If tags have semantics, override the parameters + if has_replace_semantic: + # If any asset has REPLACE semantic, use replaceTags + query_params = { + "replaceTags": True, + "appendTags": False, + "replaceBusinessAttributes": replace_custom_metadata, + "overwriteBusinessAttributes": overwrite_custom_metadata, + } + else: + # If only APPEND/REMOVE semantics, use appendTags + query_params = { + "replaceTags": False, + "appendTags": True, + "replaceBusinessAttributes": replace_custom_metadata, + "overwriteBusinessAttributes": overwrite_custom_metadata, + } + else: + # Backward compatibility: use provided parameters + query_params = { + "replaceTags": replace_atlan_tags, + "appendTags": append_atlan_tags, + "replaceBusinessAttributes": replace_custom_metadata, + "overwriteBusinessAttributes": overwrite_custom_metadata, + } + # Validate and flush entities BEFORE creating the BulkRequest await Save.validate_and_flush_entities_async(entities, client) return query_params, BulkRequest[Asset](entities=entities) diff --git a/pyatlan/model/core.py b/pyatlan/model/core.py index 0a4b89da0..046b9008b 100644 --- a/pyatlan/model/core.py +++ b/pyatlan/model/core.py @@ -323,6 +323,15 @@ class Config: source_tag_attachments: List[SourceTagAttachment] = Field( default_factory=list, exclude=True ) + semantic: Optional[SaveSemantic] = Field( + default=None, + exclude=True, + description=( + "Semantic for how this Atlan tag should be saved, " + "if used in an asset request on which `.save()` is called. " + "Options are: APPEND (add/update tag), REMOVE (remove tag), or REPLACE (replace all tags)." + ), + ) attributes: Optional[Dict[str, Any]] = None tag_id: Optional[str] = Field(default=None, exclude=True) @@ -334,6 +343,7 @@ def of( entity_guid: Optional[str] = None, source_tag_attachment: Optional[SourceTagAttachment] = None, client: Optional[AtlanClient] = None, + semantic: Optional[SaveSemantic] = None, ) -> AtlanTag: """ Construct an Atlan tag assignment for a specific entity. @@ -342,6 +352,7 @@ def of( :param entity_guid: unique identifier (GUID) of the entity to which the Atlan tag is to be assigned :param source_tag_attachment: (optional) source-specific details for the tag :param client: (optional) client instance used for translating source-specific details + :param semantic: (optional) semantic for how this tag should be saved (APPEND, REMOVE, or REPLACE) :return: an Atlan tag assignment with default settings for propagation and a specific entity assignment :raises InvalidRequestError: if client is not provided and source_tag_attachment is specified """ @@ -358,6 +369,8 @@ def of( ) tag.attributes = {source_tag_attr_id: [source_tag_attachment]} # type: ignore[dict-item] tag.source_tag_attachments.append(source_tag_attachment) + if semantic is not None: + tag.semantic = semantic return tag @classmethod @@ -367,6 +380,7 @@ async def of_async( entity_guid: Optional[str] = None, source_tag_attachment: Optional[SourceTagAttachment] = None, client: Optional[AsyncAtlanClient] = None, + semantic: Optional[SaveSemantic] = None, ) -> AtlanTag: """ Async version of AtlanTag.of() for use with AsyncAtlanClient. @@ -377,6 +391,7 @@ async def of_async( :param entity_guid: unique identifier (GUID) of the entity to which the Atlan tag is to be assigned :param source_tag_attachment: (optional) source-specific details for the tag :param client: (optional) async client instance used for translating source-specific details + :param semantic: (optional) semantic for how this tag should be saved (APPEND, REMOVE, or REPLACE) :return: an Atlan tag assignment with default settings for propagation and a specific entity assignment :raises InvalidRequestError: if client is not provided and source_tag_attachment is specified """ @@ -393,6 +408,8 @@ async def of_async( ) tag.attributes = {source_tag_attr_id: [source_tag_attachment]} # type: ignore[dict-item] tag.source_tag_attachments.append(source_tag_attachment) + if semantic is not None: + tag.semantic = semantic return tag From 77b05c046eca3a21cb750b80905ef89be3f4cd37 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Wed, 4 Feb 2026 13:54:11 +0000 Subject: [PATCH 2/3] Fix ruff formatting issues - Remove trailing whitespace from docstrings and comments - Split long line to meet 88 character limit - Ensure proper blank line spacing per ruff format requirements --- pyatlan/client/common/asset.py | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/pyatlan/client/common/asset.py b/pyatlan/client/common/asset.py index b04c43960..5b6944c0b 100644 --- a/pyatlan/client/common/asset.py +++ b/pyatlan/client/common/asset.py @@ -679,20 +679,20 @@ class Save: def _process_atlan_tag_semantics(asset: Asset) -> None: """ Process Atlan tags with semantics and populate appropriate fields. - + Tags with APPEND semantic go to add_or_update_classifications. Tags with REMOVE semantic go to remove_classifications. Tags with REPLACE semantic or None (backward compatibility) stay in atlan_tags. - + :param asset: the asset to process """ if not asset.atlan_tags: return - + append_tags = [] remove_tags = [] replace_tags = [] - + for tag in asset.atlan_tags: if tag.semantic == SaveSemantic.APPEND: append_tags.append(tag) @@ -701,7 +701,7 @@ def _process_atlan_tag_semantics(asset: Asset) -> None: else: # REPLACE or None (backward compatibility) replace_tags.append(tag) - + # Update asset fields based on processed tags if append_tags: asset.add_or_update_classifications = append_tags @@ -743,7 +743,7 @@ def prepare_request( raise ValueError( "AtlanClient instance must be provided to validate and flush cm for assets." ) - + # Process Atlan tags with semantics for each asset has_semantic_tags = False has_replace_semantic = False @@ -753,10 +753,13 @@ def prepare_request( if any(tag.semantic is not None for tag in asset.atlan_tags): has_semantic_tags = True # Check if any tags have REPLACE semantic before processing - if any(tag.semantic == SaveSemantic.REPLACE for tag in asset.atlan_tags): + if any( + tag.semantic == SaveSemantic.REPLACE + for tag in asset.atlan_tags + ): has_replace_semantic = True Save._process_atlan_tag_semantics(asset) - + # Determine query parameters based on semantic usage if has_semantic_tags: # If tags have semantics, override the parameters @@ -784,7 +787,7 @@ def prepare_request( "replaceBusinessAttributes": replace_custom_metadata, "overwriteBusinessAttributes": overwrite_custom_metadata, } - + # Validate and flush entities BEFORE creating the BulkRequest Save.validate_and_flush_entities(entities, client) return query_params, BulkRequest[Asset](entities=entities) @@ -819,7 +822,7 @@ async def prepare_request_async( raise ValueError( "AsyncAtlanClient instance must be provided to validate and flush cm for assets." ) - + # Process Atlan tags with semantics for each asset has_semantic_tags = False has_replace_semantic = False @@ -829,10 +832,13 @@ async def prepare_request_async( if any(tag.semantic is not None for tag in asset.atlan_tags): has_semantic_tags = True # Check if any tags have REPLACE semantic before processing - if any(tag.semantic == SaveSemantic.REPLACE for tag in asset.atlan_tags): + if any( + tag.semantic == SaveSemantic.REPLACE + for tag in asset.atlan_tags + ): has_replace_semantic = True Save._process_atlan_tag_semantics(asset) - + # Determine query parameters based on semantic usage if has_semantic_tags: # If tags have semantics, override the parameters @@ -860,7 +866,7 @@ async def prepare_request_async( "replaceBusinessAttributes": replace_custom_metadata, "overwriteBusinessAttributes": overwrite_custom_metadata, } - + # Validate and flush entities BEFORE creating the BulkRequest await Save.validate_and_flush_entities_async(entities, client) return query_params, BulkRequest[Asset](entities=entities) From 4dcd2c10370c828cbe264170583ac9416bbfc0d1 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Wed, 4 Feb 2026 13:59:44 +0000 Subject: [PATCH 3/3] Apply ruff formatting to generator expressions - Keep generator expression in any() on single line per ruff standards --- pyatlan/client/common/asset.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pyatlan/client/common/asset.py b/pyatlan/client/common/asset.py index 5b6944c0b..257f7d320 100644 --- a/pyatlan/client/common/asset.py +++ b/pyatlan/client/common/asset.py @@ -754,8 +754,7 @@ def prepare_request( has_semantic_tags = True # Check if any tags have REPLACE semantic before processing if any( - tag.semantic == SaveSemantic.REPLACE - for tag in asset.atlan_tags + tag.semantic == SaveSemantic.REPLACE for tag in asset.atlan_tags ): has_replace_semantic = True Save._process_atlan_tag_semantics(asset) @@ -833,8 +832,7 @@ async def prepare_request_async( has_semantic_tags = True # Check if any tags have REPLACE semantic before processing if any( - tag.semantic == SaveSemantic.REPLACE - for tag in asset.atlan_tags + tag.semantic == SaveSemantic.REPLACE for tag in asset.atlan_tags ): has_replace_semantic = True Save._process_atlan_tag_semantics(asset)