Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions common/materializer/deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,8 @@ echo "generate_dependent_dags.py completed successfully."
if [[ $(find generated_materializer_dag_files/*/*/task_dep_dags -type f 2> /dev/null | wc -l) -gt 0 ]]
then
echo "Copying DAG files to GCS bucket..."
echo "gsutil -m cp -r 'generated_materializer_dag_files/*' gs://${GCS_TGT_BUCKET}/dags/"
gsutil -m cp -r 'generated_materializer_dag_files/*' "gs://${GCS_TGT_BUCKET}/dags/"
echo "gcloud storage cp --recursive 'generated_materializer_dag_files/*' gs://${GCS_TGT_BUCKET}/dags/"
gcloud storage cp --recursive 'generated_materializer_dag_files/*' "gs://${GCS_TGT_BUCKET}/dags/"
else
echo "No task dependent DAG files to copy to GCS bucket!"
fi
Expand Down
13 changes: 10 additions & 3 deletions common/materializer/templates/airflow_dag_template_reporting.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
from datetime import timedelta

import airflow
from airflow import __version__ as airflow_version
from packaging.version import Version
from airflow.operators.empty import EmptyOperator
from airflow.providers.google.cloud.operators.bigquery import \
BigQueryInsertJobOperator
Expand All @@ -34,18 +36,23 @@

default_dag_args = {
"depends_on_past": False,
"start_date": datetime(${year}, ${month}, ${day}),
"start_date": datetime(int("${year}"), int("${month}"), int("${day}")),
"catchup": False,
"retries": 1,
"retry_delay": timedelta(minutes=30),
}

if Version(airflow_version) >= Version("2.4.0"):
schedule_kwarg = {"schedule": "${load_frequency}"}
else:
schedule_kwarg = {"schedule_interval": "${load_frequency}"}

with airflow.DAG("${dag_full_name}",
default_args=default_dag_args,
catchup=False,
max_active_runs=1,
schedule_interval="${load_frequency}",
tags=${tags}) as dag:
tags=ast.literal_eval("${tags}"),
**schedule_kwarg) as dag:
start_task = EmptyOperator(task_id="start")
refresh_table = BigQueryInsertJobOperator(
task_id="refresh_table",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
from datetime import timedelta

import airflow
from airflow import __version__ as airflow_version
from packaging.version import Version
from airflow.operators.empty import EmptyOperator
from airflow.providers.google.cloud.operators.bigquery import \
BigQueryInsertJobOperator
Expand All @@ -36,18 +38,23 @@

default_dag_args = {
"depends_on_past": False,
"start_date": datetime(${year}, ${month}, ${day}),
"start_date": datetime(int("${year}"), int("${month}"), int("${day}")),
"catchup": False,
"retries": 1,
"retry_delay": timedelta(minutes=30),
}

if Version(airflow_version) >= Version("2.4.0"):
schedule_kwarg = {"schedule": "${load_frequency}"}
else:
schedule_kwarg = {"schedule_interval": "${load_frequency}"}

with airflow.DAG("${dag_full_name}",
default_args=default_dag_args,
catchup=False,
max_active_runs=1,
schedule_interval="${load_frequency}",
tags=${tags}) as dag:
tags=ast.literal_eval("${tags}"),
**schedule_kwarg) as dag:

start_task = EmptyOperator(task_id="start")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ steps:
if [[ $(find generated_materializer_dag_files -type f 2> /dev/null | wc -l) -gt 0 ]]
then
echo "Copying DAG files to GCS bucket..."
echo "gsutil -m cp -r 'generated_materializer_dag_files/*' gs://${_GCS_TGT_BUCKET}/dags/"
gsutil -m cp -r 'generated_materializer_dag_files/*' gs://${_GCS_TGT_BUCKET}/dags/
echo "gcloud storage cp --recursive 'generated_materializer_dag_files/*' gs://${_GCS_TGT_BUCKET}/dags/"
gcloud storage cp --recursive 'generated_materializer_dag_files/*' gs://${_GCS_TGT_BUCKET}/dags/
else
echo "No files to copy to GCS bucket!"
fi
Expand Down
6 changes: 3 additions & 3 deletions common/py_libs/k9_deployer.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ def _simple_process_and_upload(k9_id: str, k9_dir: str, jinja_dict: dict,
if "__init__.py" not in [str(p.relative_to(k9_dir)) for p in k9_files]:
with open(f"{tmp_dir}/__init__.py", "w", encoding="utf-8") as f:
f.writelines([
"import os",
"import sys",
"import os\n",
"import sys\n",
("sys.path.append("
"os.path.dirname(os.path.realpath(__file__)))")
"os.path.dirname(os.path.realpath(__file__)))\n")
])

if data_source == "k9":
Expand Down
2 changes: 1 addition & 1 deletion common/py_libs/resource_validation_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def validate_resources(
if isinstance(ex, NotFound):
logging.error("🛑 Storage bucket `%s` doesn't exist. 🛑",
bucket.name)
elif isinstance(ex, Unauthorized, Forbidden):
elif isinstance(ex, (Unauthorized, Forbidden)):
if checking_on_writing:
logging.error("🛑 Storage bucket `%s` "
"is not writable. 🛑", bucket.name)
Expand Down
13 changes: 11 additions & 2 deletions local_k9/costcenter_hierarchy/costcenter_hierarchy.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
from airflow import DAG
from datetime import datetime, timedelta

from airflow import __version__ as airflow_version
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.operators.empty import EmptyOperator
from packaging.version import Version

# BigQuery Job Labels - converts generated string to dict
# If string is empty, assigns empty dict
Expand All @@ -30,16 +32,23 @@
"retry_delay": timedelta(minutes=5),
}

load_frequency = "@monthly"

if Version(airflow_version) >= Version("2.4.0"):
schedule_kwarg = {"schedule": load_frequency}
else:
schedule_kwarg = {"schedule_interval": load_frequency}

# This DAG creates following two tables
# and deletes hierarchy from a specific node(if needed).
# 1-flattened cost center table.
# 2-cost center and node mapping.
with DAG(dag_id="cost_center",
default_args=default_args,
schedule_interval="@monthly",
start_date=datetime(2023, 11, 27),
catchup=False,
max_active_runs=1) as dag:
max_active_runs=1,
**schedule_kwarg) as dag:

start_task = EmptyOperator(task_id="start")

Expand Down
13 changes: 11 additions & 2 deletions local_k9/currency_conversion/currency_conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
from airflow import DAG
from datetime import datetime, timedelta

from airflow import __version__ as airflow_version
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.operators.empty import EmptyOperator
from packaging.version import Version

# BigQuery Job Labels - converts generated string to dict
# If string is empty, assigns empty dict
Expand All @@ -33,16 +35,23 @@
"retry_delay": timedelta(minutes=5),
}

load_frequency = "@daily"

if Version(airflow_version) >= Version("2.4.0"):
schedule_kwarg = {"schedule": load_frequency}
else:
schedule_kwarg = {"schedule_interval": load_frequency}

# This DAG creates two table:
# 1-currency_conversion for storing the exchange rate and other columns.
# 2-currency_decimal to fix the decimal place of amounts
# for non-decimal-based currencies.
with DAG(dag_id="currency_conversion",
default_args=default_args,
schedule_interval="@daily",
start_date=datetime(2022, 8, 11),
catchup=False,
max_active_runs=1) as dag:
max_active_runs=1,
**schedule_kwarg) as dag:

start_task = EmptyOperator(task_id="start")

Expand Down
12 changes: 10 additions & 2 deletions local_k9/financial_statement/financial_statement_initial_load.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
from airflow import DAG
from datetime import datetime, timedelta

from airflow import __version__ as airflow_version
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.operators.empty import EmptyOperator
from packaging.version import Version

# BigQuery Job Labels - converts generated string to dict
# If string is empty, assigns empty dict
Expand All @@ -30,13 +32,19 @@
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
load_frequency = None

if Version(airflow_version) >= Version("2.4.0"):
schedule_kwarg = {"schedule": load_frequency}
else:
schedule_kwarg = {"schedule_interval": load_frequency}

with DAG(dag_id="financial_statement_initial_load",
default_args=default_args,
schedule_interval=None,
start_date=datetime(2023, 8, 30),
catchup=False,
max_active_runs=1) as dag:
max_active_runs=1,
**schedule_kwarg) as dag:

start_task = EmptyOperator(task_id="start")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
from airflow import DAG
from datetime import datetime, timedelta

from airflow import __version__ as airflow_version
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.operators.empty import EmptyOperator
from packaging.version import Version

# BigQuery Job Labels - converts generated string to dict
# If string is empty, assigns empty dict
Expand All @@ -31,12 +33,19 @@
"retry_delay": timedelta(minutes=5),
}

load_frequency = "@monthly"

if Version(airflow_version) >= Version("2.4.0"):
schedule_kwarg = {"schedule": load_frequency}
else:
schedule_kwarg = {"schedule_interval": load_frequency}

with DAG(dag_id="financial_statement_periodical_load",
default_args=default_args,
schedule_interval="@monthly",
start_date=datetime(2023, 8, 30),
catchup=False,
max_active_runs=1) as dag:
max_active_runs=1,
**schedule_kwarg) as dag:

start_task = EmptyOperator(task_id="start")

Expand Down
12 changes: 10 additions & 2 deletions local_k9/fsv_hierarchy/financial_statement_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
from airflow import DAG
from datetime import datetime, timedelta

from airflow import __version__ as airflow_version
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.operators.empty import EmptyOperator
from packaging.version import Version

# BigQuery Job Labels - converts generated string to dict
# If string is empty, assigns empty dict
Expand All @@ -31,17 +33,23 @@
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
load_frequency = "@monthly"

if Version(airflow_version) >= Version("2.4.0"):
schedule_kwarg = {"schedule": load_frequency}
else:
schedule_kwarg = {"schedule_interval": load_frequency}

# This DAG creates following two tables
# and deletes hierarchy from a specific node(if needed).
# 1-flattened fsv table.
# 2-glaccount and fsv node mapping.
with DAG(dag_id="financial_statement_version",
default_args=default_args,
schedule_interval="@monthly",
start_date=datetime(2023, 8, 4),
catchup=False,
max_active_runs=1) as dag:
max_active_runs=1,
**schedule_kwarg) as dag:

start_task = EmptyOperator(task_id="start")

Expand Down
12 changes: 10 additions & 2 deletions local_k9/inventory_snapshots/stock_monthly_snapshots_initial.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
from airflow import DAG
from datetime import datetime, timedelta

from airflow import __version__ as airflow_version
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.operators.empty import EmptyOperator
from packaging.version import Version

# BigQuery Job Labels - converts generated string to dict
# If string is empty, assigns empty dict
Expand All @@ -41,14 +43,20 @@
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
load_frequency = "@once"

if Version(airflow_version) >= Version("2.4.0"):
schedule_kwarg = {"schedule": load_frequency}
else:
schedule_kwarg = {"schedule_interval": load_frequency}

with DAG(dag_id="Stock_Monthly_Snapshots_Initial",
default_args=default_args,
description="Initial creation of monthly inventory snapshot.",
schedule_interval="@once",
start_date=datetime(2023, 2, 13),
catchup=False,
max_active_runs=1) as dag:
max_active_runs=1,
**schedule_kwarg) as dag:

start_task = EmptyOperator(task_id="start")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
from airflow import DAG
from datetime import datetime, timedelta

from airflow import __version__ as airflow_version
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.operators.empty import EmptyOperator
from packaging.version import Version

# BigQuery Job Labels - converts generated string to dict
# If string is empty, assigns empty dict
Expand All @@ -42,15 +44,21 @@
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
load_frequency = "@monthly"

if Version(airflow_version) >= Version("2.4.0"):
schedule_kwarg = {"schedule": load_frequency}
else:
schedule_kwarg = {"schedule_interval": load_frequency}

with DAG(dag_id="Stock_Monthly_Snapshots_Periodical_Update",
template_searchpath=["/home/airflow/gcs/data/bq_data_replication/"],
default_args=default_args,
description="Update monthly inventory snapshot every month.",
schedule_interval="@monthly",
start_date=datetime(2023, 2, 13),
catchup=False,
max_active_runs=1) as dag:
max_active_runs=1,
**schedule_kwarg) as dag:

start_task = EmptyOperator(task_id="start")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
from airflow import DAG
from datetime import datetime, timedelta

from airflow import __version__ as airflow_version
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.operators.empty import EmptyOperator
from packaging.version import Version

# BigQuery Job Labels - converts generated string to dict
# If string is empty, assigns empty dict
Expand All @@ -42,14 +44,20 @@
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
load_frequency = "@daily"

if Version(airflow_version) >= Version("2.4.0"):
schedule_kwarg = {"schedule": load_frequency}
else:
schedule_kwarg = {"schedule_interval": load_frequency}

with DAG(dag_id="Stock_Monthly_Snapshots_Daily_Update",
default_args=default_args,
description="Update monthly snapshot with new data everyday",
schedule_interval="@daily",
start_date=datetime(2023, 2, 13),
catchup=False,
max_active_runs=1) as dag:
max_active_runs=1,
**schedule_kwarg) as dag:

start_task = EmptyOperator(task_id="start")

Expand Down
Loading