diff --git a/cosmotech/coal/aws/s3.py b/cosmotech/coal/aws/s3.py index 69626076..22f09202 100644 --- a/cosmotech/coal/aws/s3.py +++ b/cosmotech/coal/aws/s3.py @@ -33,6 +33,10 @@ def file_prefix(self): return self._configuration.bucket_prefix return "" + @file_prefix.setter + def file_prefix(self, value): + self._configuration.bucket_prefix = value + @property def use_ssl(self): if "use_ssl" in self._configuration: diff --git a/cosmotech/coal/azure/blob.py b/cosmotech/coal/azure/blob.py index 1bada51a..7958097c 100644 --- a/cosmotech/coal/azure/blob.py +++ b/cosmotech/coal/azure/blob.py @@ -101,3 +101,26 @@ def data_upload(data_stream: BytesIO, file_name: str): T("coal.common.data_transfer.sending_table").format(table_name=table_name, output_type=output_type) ) data_upload(_data_stream, _file_name) + + +def delete_azure_blobs(configuration: Configuration = Configuration()) -> None: + container_client = BlobServiceClient( + account_url=f"https://{configuration.azure.account_name}.blob.core.windows.net/", + credential=ClientSecretCredential( + tenant_id=configuration.azure.tenant_id, + client_id=configuration.azure.client_id, + client_secret=configuration.azure.client_secret, + ), + ).get_container_client(configuration.azure.container_name) + + file_prefix = configuration.safe_get("azure.file_prefix", default="") + # List and delete blobs in batches of 256 as the limit is specified in the doc. + # https://learn.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.aio.containerclient?view=azure-python#azure-storage-blob-aio-containerclient-delete-blobs + blob_names_batch: list[str] = [] + for blob in container_client.list_blobs(name_starts_with=file_prefix): + blob_names_batch.append(blob.name) + if len(blob_names_batch) >= 256: + container_client.delete_blobs(*blob_names_batch) + blob_names_batch = [] + if blob_names_batch: + container_client.delete_blobs(*blob_names_batch) diff --git a/cosmotech/coal/postgresql/runner.py b/cosmotech/coal/postgresql/runner.py index 683628bb..49d1638f 100644 --- a/cosmotech/coal/postgresql/runner.py +++ b/cosmotech/coal/postgresql/runner.py @@ -103,10 +103,11 @@ def remove_runner_metadata_from_postgresql( with dbapi.connect(_psql.full_uri, autocommit=True) as conn: with conn.cursor() as curs: schema_table = f"{_psql.db_schema}.{_psql.table_prefix}RunnerMetadata" + last_run_id = runner.get("lastRunInfo").get("lastRunId") sql_delete_from_metatable = f""" DELETE FROM {schema_table} - WHERE last_csm_run_id={runner.get("lastRunId")}; + WHERE last_csm_run_id= $1; """ - curs.execute(sql_delete_from_metatable) + curs.execute(sql_delete_from_metatable, (last_run_id,)) conn.commit() - return runner.get("lastRunId") + return runner.get("lastRunInfo").get("lastRunId") diff --git a/cosmotech/coal/store/output/aws_channel.py b/cosmotech/coal/store/output/aws_channel.py index c5a24428..13590683 100644 --- a/cosmotech/coal/store/output/aws_channel.py +++ b/cosmotech/coal/store/output/aws_channel.py @@ -8,16 +8,16 @@ from cosmotech.coal.aws import S3 from cosmotech.coal.store.output.channel_interface import ( ChannelInterface, - MissingChannelConfigError, ) from cosmotech.coal.store.store import Store -from cosmotech.coal.utils.configuration import Configuration, Dotdict +from cosmotech.coal.utils.configuration import Dotdict from cosmotech.coal.utils.logger import LOGGER class AwsChannel(ChannelInterface): required_keys = { "coal": ["store"], + "cosmotech": ["runner_id"], "s3": ["access_key_id", "endpoint_url", "secret_access_key"], } requirement_string = required_keys @@ -25,6 +25,7 @@ class AwsChannel(ChannelInterface): def __init__(self, dct: Dotdict = None): super().__init__(dct) self._s3 = S3(self.configuration) + self._s3.file_prefix = self.configuration.cosmotech.runner_id + "/" + self._s3.file_prefix def send(self, filter: Optional[list[str]] = None) -> bool: @@ -36,12 +37,7 @@ def send(self, filter: Optional[list[str]] = None) -> bool: if self._s3.output_type == "sqlite": _file_path = _s._database_path - _file_name = "db.sqlite" - _uploaded_file_name = self.configuration.s3.bucket_prefix + _file_name - LOGGER.info( - T("coal.common.data_transfer.file_sent").format(file_path=_file_path, uploaded_name=_uploaded_file_name) - ) - self._s3.upload_file(_file_path, _uploaded_file_name) + self._s3.upload_file(_file_path) else: tables = list(_s.list_tables()) if filter: diff --git a/cosmotech/coal/store/output/az_storage_channel.py b/cosmotech/coal/store/output/az_storage_channel.py index a51eec8d..a4ee9a8c 100644 --- a/cosmotech/coal/store/output/az_storage_channel.py +++ b/cosmotech/coal/store/output/az_storage_channel.py @@ -1,16 +1,16 @@ from typing import Optional -from cosmotech.coal.azure.blob import dump_store_to_azure +from cosmotech.coal.azure.blob import delete_azure_blobs, dump_store_to_azure from cosmotech.coal.store.output.channel_interface import ( ChannelInterface, - MissingChannelConfigError, ) -from cosmotech.coal.utils.configuration import Configuration, Dotdict +from cosmotech.coal.utils.configuration import Dotdict class AzureStorageChannel(ChannelInterface): required_keys = { "coal": ["store"], + "cosmotech": ["runner_id"], "azure": [ "account_name", "container_name", @@ -23,6 +23,13 @@ class AzureStorageChannel(ChannelInterface): } requirement_string = required_keys + def __init__(self, dct: Dotdict = None): + super().__init__(dct) + runner_id = self.configuration.cosmotech.runner_id + prefix = self.configuration.azure.file_prefix + if not prefix.startswith(runner_id): + self.configuration.azure.file_prefix = runner_id + "/" + prefix + def send(self, filter: Optional[list[str]] = None) -> bool: dump_store_to_azure( self.configuration, @@ -30,4 +37,4 @@ def send(self, filter: Optional[list[str]] = None) -> bool: ) def delete(self): - pass + delete_azure_blobs(self.configuration) diff --git a/cosmotech/coal/store/output/channel_spliter.py b/cosmotech/coal/store/output/channel_spliter.py index 98d934d6..e5ff6739 100644 --- a/cosmotech/coal/store/output/channel_spliter.py +++ b/cosmotech/coal/store/output/channel_spliter.py @@ -49,7 +49,7 @@ def send(self, filter: Optional[list[str]] = None) -> bool: raise return any_ok - def delete(self, filter: Optional[list[str]] = None) -> bool: + def delete(self) -> bool: any_ok = False for i in self.targets: try: diff --git a/cosmotech/csm_data/commands/store/delete.py b/cosmotech/csm_data/commands/store/delete.py new file mode 100644 index 00000000..cb99e624 --- /dev/null +++ b/cosmotech/csm_data/commands/store/delete.py @@ -0,0 +1,24 @@ +# Copyright (C) - 2023 - 2025 - Cosmo Tech +# This document and all information contained herein is the exclusive property - +# including all intellectual property rights pertaining thereto - of Cosmo Tech. +# Any use, reproduction, translation, broadcasting, transmission, distribution, +# etc., to any person is prohibited unless it has been previously and +# specifically authorized by written means by Cosmo Tech. + +from cosmotech.csm_data.utils.click import click +from cosmotech.csm_data.utils.decorators import translate_help, web_help + + +@click.command() +@web_help("csm-data/store/delete") +@translate_help("csm_data.commands.store.delete.description") +def delete(): + # Import the function at the start of the command + from cosmotech.coal.store.output import channel_spliter + from cosmotech.coal.utils.configuration import Configuration + + try: + _cs = channel_spliter.ChannelSpliter(Configuration()) + _cs.delete() + except ValueError as e: + raise click.Abort() from e diff --git a/cosmotech/csm_data/commands/store/store.py b/cosmotech/csm_data/commands/store/store.py index ec361152..47be8957 100644 --- a/cosmotech/csm_data/commands/store/store.py +++ b/cosmotech/csm_data/commands/store/store.py @@ -5,6 +5,7 @@ # etc., to any person is prohibited unless it has been previously and # specifically authorized by written means by Cosmo Tech. +from cosmotech.csm_data.commands.store.delete import delete from cosmotech.csm_data.commands.store.dump_to_azure import dump_to_azure from cosmotech.csm_data.commands.store.dump_to_postgresql import dump_to_postgresql from cosmotech.csm_data.commands.store.dump_to_s3 import dump_to_s3 @@ -34,3 +35,4 @@ def store(): store.add_command(dump_to_s3, "dump-to-s3") store.add_command(dump_to_azure, "dump-to-azure") store.add_command(output, "output") +store.add_command(delete, "delete") diff --git a/cosmotech/translation/csm_data/en-US/csm_data/commands/store/delete.yml b/cosmotech/translation/csm_data/en-US/csm_data/commands/store/delete.yml new file mode 100644 index 00000000..cc219bc6 --- /dev/null +++ b/cosmotech/translation/csm_data/en-US/csm_data/commands/store/delete.yml @@ -0,0 +1,4 @@ +description: | + Delete simulation result from where the output command pushed data. + + This delete is base on the configuration. diff --git a/cosmotech/translation/csm_data/en-US/csm_data/commands/store/output.yml b/cosmotech/translation/csm_data/en-US/csm_data/commands/store/output.yml index df538a64..0bb960e3 100644 --- a/cosmotech/translation/csm_data/en-US/csm_data/commands/store/output.yml +++ b/cosmotech/translation/csm_data/en-US/csm_data/commands/store/output.yml @@ -1,7 +1,7 @@ description: | Dump simulation from the coal store to the configured output. - This output is base on the configuration. Default dconfiguration will try to output in postgres. + This output is base on the configuration. parameters: filter: names of the tables to output diff --git a/tests/unit/coal/test_postgresql/test_postgresql_runner.py b/tests/unit/coal/test_postgresql/test_postgresql_runner.py index d50ff91e..711099af 100644 --- a/tests/unit/coal/test_postgresql/test_postgresql_runner.py +++ b/tests/unit/coal/test_postgresql/test_postgresql_runner.py @@ -112,7 +112,7 @@ def test_remove_runner_metadata_to_postgresql(self, mock_connect, mock_postgres_ mock_runner = { "id": "test-runner-id", "name": "Test Runner", - "lastRunId": "test-run-id", + "lastRunInfo": {"lastRunId": "test-run-id"}, "runTemplateId": "test-template-id", } diff --git a/tests/unit/coal/test_postgresql/test_postgresql_utils.py b/tests/unit/coal/test_postgresql/test_postgresql_utils.py index 2c3eafc0..adb03444 100644 --- a/tests/unit/coal/test_postgresql/test_postgresql_utils.py +++ b/tests/unit/coal/test_postgresql/test_postgresql_utils.py @@ -110,7 +110,6 @@ def test_get_postgresql_table_schema_not_found(self, mock_connect, base_configur # Arrange target_table_name = "test_table" _psql = PostgresUtils(base_configuration) - print(base_configuration.postgres) # Mock connection and cursor mock_conn = MagicMock() diff --git a/tests/unit/coal/test_store/test_output/test_aws_channel.py b/tests/unit/coal/test_store/test_output/test_aws_channel.py index 33e83a1f..13ba9d72 100644 --- a/tests/unit/coal/test_store/test_output/test_aws_channel.py +++ b/tests/unit/coal/test_store/test_output/test_aws_channel.py @@ -17,7 +17,11 @@ def base_aws_config(): return { "coal": {"store": "$cosmotech.parameters_absolute_path"}, - "cosmotech": {"dataset_absolute_path": "/path/to/dataset", "parameters_absolute_path": "/path/to/params"}, + "cosmotech": { + "runner_id": "runner-789", + "dataset_absolute_path": "/path/to/dataset", + "parameters_absolute_path": "/path/to/params", + }, "s3": { "access_key_id": "test_key", "endpoint_url": "http://test.url", @@ -30,11 +34,11 @@ def base_aws_config(): class TestAwsChannel: """Tests for the AwsChannel class.""" - def test_init_with_configuration(self, base_aws_config): + @patch("cosmotech.coal.store.output.aws_channel.S3") + def test_init_with_configuration(self, mock_s3_class, base_aws_config): """Test AwsChannel initialization with configuration.""" # Act - with patch("cosmotech.coal.store.output.aws_channel.S3"): - channel = AwsChannel(base_aws_config) + channel = AwsChannel(base_aws_config) # Assert assert channel.configuration is not None @@ -44,6 +48,8 @@ def test_required_keys(self): # Assert assert "coal" in AwsChannel.required_keys assert "s3" in AwsChannel.required_keys + assert "cosmotech" in AwsChannel.required_keys + assert "runner_id" in AwsChannel.required_keys["cosmotech"] assert "store" in AwsChannel.required_keys["coal"] assert "access_key_id" in AwsChannel.required_keys["s3"] assert "endpoint_url" in AwsChannel.required_keys["s3"] @@ -68,7 +74,7 @@ def test_send_sqlite(self, mock_s3_class, mock_store_class, base_aws_config): channel.send() # Assert - mock_s3.upload_file.assert_called_once_with("/path/to/db.sqlite", "prefix/db.sqlite") + mock_s3.upload_file.assert_called_once_with("/path/to/db.sqlite") @patch("cosmotech.coal.store.output.aws_channel.Store") @patch("cosmotech.coal.store.output.aws_channel.S3") diff --git a/tests/unit/coal/test_store/test_output/test_az_storage_channel.py b/tests/unit/coal/test_store/test_output/test_az_storage_channel.py index fad986d2..3f15a222 100644 --- a/tests/unit/coal/test_store/test_output/test_az_storage_channel.py +++ b/tests/unit/coal/test_store/test_output/test_az_storage_channel.py @@ -18,7 +18,11 @@ def base_azure_storage_config(): return Configuration( { "coal": {"store": "$cosmotech.parameters_absolute_path"}, - "cosmotech": {"dataset_absolute_path": "/path/to/dataset", "parameters_absolute_path": "/path/to/params"}, + "cosmotech": { + "runner_id": "r-789", + "dataset_absolute_path": "/path/to/dataset", + "parameters_absolute_path": "/path/to/params", + }, "azure": { "account_name": "test_account", "container_name": "test_container", @@ -62,20 +66,25 @@ def test_required_keys(self): @patch("cosmotech.coal.azure.blob.ClientSecretCredential") def test_send_without_filter(self, mock_client_secret, mock_store, mock_dump, base_azure_storage_config): """Test sending data without table filter.""" + # Arrange channel = AzureStorageChannel(base_azure_storage_config) # Act channel.send() # Assert - mock_dump.assert_called_once_with(base_azure_storage_config, selected_tables=None) + expected_config = base_azure_storage_config + expected_config.azure.file_prefix = ( + base_azure_storage_config.cosmotech.runner_id + "/" + base_azure_storage_config.azure.file_prefix + ) + mock_dump.assert_called_once_with(expected_config, selected_tables=None) @patch("cosmotech.coal.store.output.az_storage_channel.dump_store_to_azure") @patch("cosmotech.coal.azure.blob.Store") @patch("cosmotech.coal.azure.blob.ClientSecretCredential") def test_send_with_filter(self, mock_client_secret, mock_store, mock_dump, base_azure_storage_config): """Test sending data with table filter.""" - + # Arrange channel = AzureStorageChannel(base_azure_storage_config) tables_filter = ["table1", "table2"] @@ -83,18 +92,28 @@ def test_send_with_filter(self, mock_client_secret, mock_store, mock_dump, base_ channel.send(filter=tables_filter) # Assert + expected_config = base_azure_storage_config + expected_config.azure.file_prefix = ( + base_azure_storage_config.cosmotech.runner_id + "/" + base_azure_storage_config.azure.file_prefix + ) mock_dump.assert_called_once_with( - base_azure_storage_config, + expected_config, selected_tables=["table1", "table2"], ) - def test_delete(self, base_azure_storage_config): + @patch("cosmotech.coal.store.output.az_storage_channel.delete_azure_blobs") + @patch("cosmotech.coal.azure.blob.Store") + @patch("cosmotech.coal.azure.blob.ClientSecretCredential") + def test_delete(self, mock_client_secret, mock_store, mock_delete, base_azure_storage_config): """Test delete method (should do nothing).""" channel = AzureStorageChannel(base_azure_storage_config) # Act - result = channel.delete() + channel.delete() # Assert - # Should not raise any exception and return None - assert result is None + expected_config = base_azure_storage_config + expected_config.azure.file_prefix = ( + base_azure_storage_config.cosmotech.runner_id + "/" + base_azure_storage_config.azure.file_prefix + ) + mock_delete.assert_called_once_with(expected_config)