Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cosmotech/coal/aws/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ def file_prefix(self):
return self._configuration.bucket_prefix
return ""

@file_prefix.setter
def file_prefix(self, value):
self._configuration.bucket_prefix = value

@property
def use_ssl(self):
if "use_ssl" in self._configuration:
Expand Down
23 changes: 23 additions & 0 deletions cosmotech/coal/azure/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,26 @@ def data_upload(data_stream: BytesIO, file_name: str):
T("coal.common.data_transfer.sending_table").format(table_name=table_name, output_type=output_type)
)
data_upload(_data_stream, _file_name)


def delete_azure_blobs(configuration: Configuration = Configuration()) -> None:
container_client = BlobServiceClient(
account_url=f"https://{configuration.azure.account_name}.blob.core.windows.net/",
credential=ClientSecretCredential(
tenant_id=configuration.azure.tenant_id,
client_id=configuration.azure.client_id,
client_secret=configuration.azure.client_secret,
),
).get_container_client(configuration.azure.container_name)
Comment on lines +106 to +114
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change absolutely nothing, parameter value is set/create a function call


file_prefix = configuration.safe_get("azure.file_prefix", default="")
# List and delete blobs in batches of 256 as the limit is specified in the doc.
# https://learn.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.aio.containerclient?view=azure-python#azure-storage-blob-aio-containerclient-delete-blobs
blob_names_batch: list[str] = []
for blob in container_client.list_blobs(name_starts_with=file_prefix):
blob_names_batch.append(blob.name)
if len(blob_names_batch) >= 256:
container_client.delete_blobs(*blob_names_batch)
blob_names_batch = []
if blob_names_batch:
container_client.delete_blobs(*blob_names_batch)
7 changes: 4 additions & 3 deletions cosmotech/coal/postgresql/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,11 @@ def remove_runner_metadata_from_postgresql(
with dbapi.connect(_psql.full_uri, autocommit=True) as conn:
with conn.cursor() as curs:
schema_table = f"{_psql.db_schema}.{_psql.table_prefix}RunnerMetadata"
last_run_id = runner.get("lastRunInfo").get("lastRunId")
sql_delete_from_metatable = f"""
DELETE FROM {schema_table}
WHERE last_csm_run_id={runner.get("lastRunId")};
WHERE last_csm_run_id= $1;
"""
curs.execute(sql_delete_from_metatable)
curs.execute(sql_delete_from_metatable, (last_run_id,))
conn.commit()
return runner.get("lastRunId")
return runner.get("lastRunInfo").get("lastRunId")
Comment on lines 105 to +113
12 changes: 4 additions & 8 deletions cosmotech/coal/store/output/aws_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,24 @@
from cosmotech.coal.aws import S3
from cosmotech.coal.store.output.channel_interface import (
ChannelInterface,
MissingChannelConfigError,
)
from cosmotech.coal.store.store import Store
from cosmotech.coal.utils.configuration import Configuration, Dotdict
from cosmotech.coal.utils.configuration import Dotdict
from cosmotech.coal.utils.logger import LOGGER


class AwsChannel(ChannelInterface):
required_keys = {
"coal": ["store"],
"cosmotech": ["runner_id"],
"s3": ["access_key_id", "endpoint_url", "secret_access_key"],
}
requirement_string = required_keys

def __init__(self, dct: Dotdict = None):
super().__init__(dct)
self._s3 = S3(self.configuration)
self._s3.file_prefix = self.configuration.cosmotech.runner_id + "/" + self._s3.file_prefix

def send(self, filter: Optional[list[str]] = None) -> bool:

Expand All @@ -36,12 +37,7 @@ def send(self, filter: Optional[list[str]] = None) -> bool:

if self._s3.output_type == "sqlite":
_file_path = _s._database_path
_file_name = "db.sqlite"
_uploaded_file_name = self.configuration.s3.bucket_prefix + _file_name
LOGGER.info(
T("coal.common.data_transfer.file_sent").format(file_path=_file_path, uploaded_name=_uploaded_file_name)
)
self._s3.upload_file(_file_path, _uploaded_file_name)
self._s3.upload_file(_file_path)
else:
tables = list(_s.list_tables())
if filter:
Expand Down
15 changes: 11 additions & 4 deletions cosmotech/coal/store/output/az_storage_channel.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
from typing import Optional

from cosmotech.coal.azure.blob import dump_store_to_azure
from cosmotech.coal.azure.blob import delete_azure_blobs, dump_store_to_azure
from cosmotech.coal.store.output.channel_interface import (
ChannelInterface,
MissingChannelConfigError,
)
from cosmotech.coal.utils.configuration import Configuration, Dotdict
from cosmotech.coal.utils.configuration import Dotdict


class AzureStorageChannel(ChannelInterface):
required_keys = {
"coal": ["store"],
"cosmotech": ["runner_id"],
"azure": [
"account_name",
"container_name",
Expand All @@ -23,11 +23,18 @@ class AzureStorageChannel(ChannelInterface):
}
requirement_string = required_keys

def __init__(self, dct: Dotdict = None):
super().__init__(dct)
runner_id = self.configuration.cosmotech.runner_id
prefix = self.configuration.azure.file_prefix
if not prefix.startswith(runner_id):
self.configuration.azure.file_prefix = runner_id + "/" + prefix

def send(self, filter: Optional[list[str]] = None) -> bool:
dump_store_to_azure(
self.configuration,
selected_tables=filter,
)

def delete(self):
pass
delete_azure_blobs(self.configuration)
2 changes: 1 addition & 1 deletion cosmotech/coal/store/output/channel_spliter.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def send(self, filter: Optional[list[str]] = None) -> bool:
raise
return any_ok

def delete(self, filter: Optional[list[str]] = None) -> bool:
def delete(self) -> bool:
any_ok = False
for i in self.targets:
try:
Expand Down
24 changes: 24 additions & 0 deletions cosmotech/csm_data/commands/store/delete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Copyright (C) - 2023 - 2025 - Cosmo Tech
# This document and all information contained herein is the exclusive property -
# including all intellectual property rights pertaining thereto - of Cosmo Tech.
# Any use, reproduction, translation, broadcasting, transmission, distribution,
# etc., to any person is prohibited unless it has been previously and
# specifically authorized by written means by Cosmo Tech.

from cosmotech.csm_data.utils.click import click
from cosmotech.csm_data.utils.decorators import translate_help, web_help


@click.command()
@web_help("csm-data/store/delete")
@translate_help("csm_data.commands.store.delete.description")
def delete():
# Import the function at the start of the command
from cosmotech.coal.store.output import channel_spliter
from cosmotech.coal.utils.configuration import Configuration

try:
_cs = channel_spliter.ChannelSpliter(Configuration())
_cs.delete()
except ValueError as e:
raise click.Abort() from e
2 changes: 2 additions & 0 deletions cosmotech/csm_data/commands/store/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# etc., to any person is prohibited unless it has been previously and
# specifically authorized by written means by Cosmo Tech.

from cosmotech.csm_data.commands.store.delete import delete
from cosmotech.csm_data.commands.store.dump_to_azure import dump_to_azure
from cosmotech.csm_data.commands.store.dump_to_postgresql import dump_to_postgresql
from cosmotech.csm_data.commands.store.dump_to_s3 import dump_to_s3
Expand Down Expand Up @@ -34,3 +35,4 @@ def store():
store.add_command(dump_to_s3, "dump-to-s3")
store.add_command(dump_to_azure, "dump-to-azure")
store.add_command(output, "output")
store.add_command(delete, "delete")
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
description: |
Delete simulation result from where the output command pushed data.

This delete is base on the configuration.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
description: |
Dump simulation from the coal store to the configured output.

This output is base on the configuration. Default dconfiguration will try to output in postgres.
This output is base on the configuration.

parameters:
filter: names of the tables to output
2 changes: 1 addition & 1 deletion tests/unit/coal/test_postgresql/test_postgresql_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def test_remove_runner_metadata_to_postgresql(self, mock_connect, mock_postgres_
mock_runner = {
"id": "test-runner-id",
"name": "Test Runner",
"lastRunId": "test-run-id",
"lastRunInfo": {"lastRunId": "test-run-id"},
"runTemplateId": "test-template-id",
}

Expand Down
1 change: 0 additions & 1 deletion tests/unit/coal/test_postgresql/test_postgresql_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ def test_get_postgresql_table_schema_not_found(self, mock_connect, base_configur
# Arrange
target_table_name = "test_table"
_psql = PostgresUtils(base_configuration)
print(base_configuration.postgres)

# Mock connection and cursor
mock_conn = MagicMock()
Expand Down
16 changes: 11 additions & 5 deletions tests/unit/coal/test_store/test_output/test_aws_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
def base_aws_config():
return {
"coal": {"store": "$cosmotech.parameters_absolute_path"},
"cosmotech": {"dataset_absolute_path": "/path/to/dataset", "parameters_absolute_path": "/path/to/params"},
"cosmotech": {
"runner_id": "runner-789",
"dataset_absolute_path": "/path/to/dataset",
"parameters_absolute_path": "/path/to/params",
},
"s3": {
"access_key_id": "test_key",
"endpoint_url": "http://test.url",
Expand All @@ -30,11 +34,11 @@ def base_aws_config():
class TestAwsChannel:
"""Tests for the AwsChannel class."""

def test_init_with_configuration(self, base_aws_config):
@patch("cosmotech.coal.store.output.aws_channel.S3")
def test_init_with_configuration(self, mock_s3_class, base_aws_config):
"""Test AwsChannel initialization with configuration."""
# Act
with patch("cosmotech.coal.store.output.aws_channel.S3"):
channel = AwsChannel(base_aws_config)
channel = AwsChannel(base_aws_config)

# Assert
assert channel.configuration is not None
Expand All @@ -44,6 +48,8 @@ def test_required_keys(self):
# Assert
assert "coal" in AwsChannel.required_keys
assert "s3" in AwsChannel.required_keys
assert "cosmotech" in AwsChannel.required_keys
assert "runner_id" in AwsChannel.required_keys["cosmotech"]
assert "store" in AwsChannel.required_keys["coal"]
assert "access_key_id" in AwsChannel.required_keys["s3"]
assert "endpoint_url" in AwsChannel.required_keys["s3"]
Expand All @@ -68,7 +74,7 @@ def test_send_sqlite(self, mock_s3_class, mock_store_class, base_aws_config):
channel.send()

# Assert
mock_s3.upload_file.assert_called_once_with("/path/to/db.sqlite", "prefix/db.sqlite")
mock_s3.upload_file.assert_called_once_with("/path/to/db.sqlite")

@patch("cosmotech.coal.store.output.aws_channel.Store")
@patch("cosmotech.coal.store.output.aws_channel.S3")
Expand Down
35 changes: 27 additions & 8 deletions tests/unit/coal/test_store/test_output/test_az_storage_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ def base_azure_storage_config():
return Configuration(
{
"coal": {"store": "$cosmotech.parameters_absolute_path"},
"cosmotech": {"dataset_absolute_path": "/path/to/dataset", "parameters_absolute_path": "/path/to/params"},
"cosmotech": {
"runner_id": "r-789",
"dataset_absolute_path": "/path/to/dataset",
"parameters_absolute_path": "/path/to/params",
},
"azure": {
"account_name": "test_account",
"container_name": "test_container",
Expand Down Expand Up @@ -62,39 +66,54 @@ def test_required_keys(self):
@patch("cosmotech.coal.azure.blob.ClientSecretCredential")
def test_send_without_filter(self, mock_client_secret, mock_store, mock_dump, base_azure_storage_config):
"""Test sending data without table filter."""
# Arrange
channel = AzureStorageChannel(base_azure_storage_config)

# Act
channel.send()

# Assert
mock_dump.assert_called_once_with(base_azure_storage_config, selected_tables=None)
expected_config = base_azure_storage_config
expected_config.azure.file_prefix = (
base_azure_storage_config.cosmotech.runner_id + "/" + base_azure_storage_config.azure.file_prefix
)
mock_dump.assert_called_once_with(expected_config, selected_tables=None)
Comment on lines +76 to +80
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the channel init function create a new configuration object non prepend is happenning


@patch("cosmotech.coal.store.output.az_storage_channel.dump_store_to_azure")
@patch("cosmotech.coal.azure.blob.Store")
@patch("cosmotech.coal.azure.blob.ClientSecretCredential")
def test_send_with_filter(self, mock_client_secret, mock_store, mock_dump, base_azure_storage_config):
"""Test sending data with table filter."""

# Arrange
channel = AzureStorageChannel(base_azure_storage_config)
tables_filter = ["table1", "table2"]

# Act
channel.send(filter=tables_filter)

# Assert
expected_config = base_azure_storage_config
expected_config.azure.file_prefix = (
base_azure_storage_config.cosmotech.runner_id + "/" + base_azure_storage_config.azure.file_prefix
)
mock_dump.assert_called_once_with(
base_azure_storage_config,
expected_config,
selected_tables=["table1", "table2"],
)
Comment on lines +95 to 102
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still, channel init function create a new configuration object no prepend is happening


def test_delete(self, base_azure_storage_config):
@patch("cosmotech.coal.store.output.az_storage_channel.delete_azure_blobs")
@patch("cosmotech.coal.azure.blob.Store")
@patch("cosmotech.coal.azure.blob.ClientSecretCredential")
def test_delete(self, mock_client_secret, mock_store, mock_delete, base_azure_storage_config):
"""Test delete method (should do nothing)."""
channel = AzureStorageChannel(base_azure_storage_config)

# Act
result = channel.delete()
channel.delete()

# Assert
# Should not raise any exception and return None
assert result is None
expected_config = base_azure_storage_config
expected_config.azure.file_prefix = (
base_azure_storage_config.cosmotech.runner_id + "/" + base_azure_storage_config.azure.file_prefix
)
mock_delete.assert_called_once_with(expected_config)
Comment on lines 108 to +119
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re non

Loading