diff --git a/CHANGELOG.md b/CHANGELOG.md index b9fa1b14..9762d8c1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### Added +- Add git-sync support for multiple repositories ([#729]). - Add support for airflow 3.1.6 ([#742]). - Add operator versioning ([#725]). - GitSync considered for v1alpha1 and v1alpha2 @@ -21,6 +22,7 @@ - Prevent unnecessary Pod restarts when initially creating an AirflowCluster. This is achieved by applying the StatefulSet after all ConfigMaps and Secrets that it mounts ([#734]). +[#729]: https://github.com/stackabletech/airflow-operator/pull/729 [#725]: https://github.com/stackabletech/airflow-operator/pull/725 [#726]: https://github.com/stackabletech/airflow-operator/pull/726 [#727]: https://github.com/stackabletech/airflow-operator/pull/727 diff --git a/docs/modules/airflow/pages/usage-guide/mounting-dags.adoc b/docs/modules/airflow/pages/usage-guide/mounting-dags.adoc index cd03fc3b..58270633 100644 --- a/docs/modules/airflow/pages/usage-guide/mounting-dags.adoc +++ b/docs/modules/airflow/pages/usage-guide/mounting-dags.adoc @@ -42,7 +42,7 @@ For multiple DAGs, it is easier to expose them via `gitsync`, as shown below. == Via `git-sync` -{git-sync}[git-sync] is a command that pulls a git repository into a local directory and is supplied as a sidecar container for use within Kubernetes. +{git-sync}[git-sync] is a command that pulls a git repository into a local directory and is supplied as a sidecar container for use within Kubernetes. {git-sync}[git-sync] folders will be provided at `/stackable/app/allDAGs/current-i`, with i in {0,1,..,n-1}. 
The Stackable Airflow images already ship with git-sync included, and the operator takes care of calling the tool and mounting volumes, so that only the repository and synchronization details are required: .git-sync usage example: https @@ -82,3 +82,9 @@ include::example$example-airflow-gitsync-ssh.yaml[] NOTE: git-sync can be used with DAGs that make use of Python modules, as Python is configured to use the git-sync target folder as the "root" location when looking for referenced files. See the xref:usage-guide/applying-custom-resources.adoc[] example for more details. + +=== Multiple repositories via git-sync + +If you want to access multiple branches of a repository or load DAGs from multiple repositories, you can extend the list shown above. Those repositories get consolidated beneath `/stackable/app/allDAGs/current-i`, where `i` is the position in the list, with `i` in {0,1,..,n-1}. + +NOTE: Using DAGs from ConfigMaps requires you to either use the CeleryExecutor and mount them under `/stackable/app/allDAGs/` or use the KubernetesExecutor, mount them somewhere else, and change the PYTHONPATH as well as the `AIRFLOW\__CORE__DAGS_FOLDER` accordingly. 
diff --git a/rust/operator-binary/src/airflow_controller.rs b/rust/operator-binary/src/airflow_controller.rs index 4930c575..78f80057 100644 --- a/rust/operator-binary/src/airflow_controller.rs +++ b/rust/operator-binary/src/airflow_controller.rs @@ -83,9 +83,9 @@ use crate::{ config::{self, PYTHON_IMPORTS}, controller_commons::{self, CONFIG_VOLUME_NAME, LOG_CONFIG_VOLUME_NAME, LOG_VOLUME_NAME}, crd::{ - self, AIRFLOW_CONFIG_FILENAME, APP_NAME, AirflowClusterStatus, AirflowConfig, - AirflowConfigOptions, AirflowExecutor, AirflowRole, CONFIG_PATH, Container, ExecutorConfig, - ExecutorConfigFragment, HTTP_PORT, HTTP_PORT_NAME, LISTENER_VOLUME_DIR, + self, AIRFLOW_CONFIG_FILENAME, AIRFLOW_DAGS_FOLDER, APP_NAME, AirflowClusterStatus, + AirflowConfig, AirflowConfigOptions, AirflowExecutor, AirflowRole, CONFIG_PATH, Container, + ExecutorConfig, ExecutorConfigFragment, HTTP_PORT, HTTP_PORT_NAME, LISTENER_VOLUME_DIR, LISTENER_VOLUME_NAME, LOG_CONFIG_DIR, METRICS_PORT, METRICS_PORT_NAME, OPERATOR_NAME, STACKABLE_LOG_DIR, TEMPLATE_LOCATION, TEMPLATE_NAME, TEMPLATE_VOLUME_NAME, authentication::{ @@ -113,6 +113,7 @@ use crate::{ }, }; +pub const AIRFLOW_DAGS_INIT_VOLUME: &str = "all-dags-volume"; pub const AIRFLOW_CONTROLLER_NAME: &str = "airflowcluster"; pub const DOCKER_IMAGE_BASE_NAME: &str = "airflow"; pub const AIRFLOW_FULL_CONTROLLER_NAME: &str = @@ -1019,7 +1020,6 @@ fn build_server_rolegroup_statefulset( executor, authentication_config, authorization_config, - git_sync_resources, resolved_product_image, ) .context(BuildStatefulsetEnvVarsSnafu)?, @@ -1299,7 +1299,6 @@ fn build_executor_template_config_map( airflow, env_overrides, merged_executor_config, - git_sync_resources, resolved_product_image, )) .add_volume_mounts(airflow.volume_mounts()) @@ -1319,6 +1318,51 @@ fn build_executor_template_config_map( true, )?; + let mut dags_init_container = + ContainerBuilder::new("dags-init").context(InvalidContainerNameSnafu)?; + let mut dags_args = Vec::::new(); + let mut 
cp_commands = Vec::::new(); + for (i, _) in airflow.spec.cluster_config.dags_git_sync.iter().enumerate() { + cp_commands.push( + format!("cp -r /stackable/app/git-{i}/current/ {AIRFLOW_DAGS_FOLDER}/current-{i}") + .to_string(), + ); + } + dags_args.push(cp_commands.join(" && ")); + dags_init_container + .image_from_product_image(resolved_product_image) + .args(dags_args) + .add_env_vars(build_airflow_template_envs( + airflow, + env_overrides, + merged_executor_config, + resolved_product_image, + )) + .command(vec![ + "/bin/bash".to_string(), + "-x".to_string(), + "-euo".to_string(), + "pipefail".to_string(), + "-c".to_string(), + ]) + .add_volume_mounts(git_sync_resources.git_content_volume_mounts.clone()) + .context(AddVolumeMountSnafu)? + .add_volume_mount( + AIRFLOW_DAGS_INIT_VOLUME.to_owned(), + AIRFLOW_DAGS_FOLDER.to_owned(), + ) + .context(AddVolumeMountSnafu)?; + + airflow_container + .add_volume_mount( + AIRFLOW_DAGS_INIT_VOLUME.to_owned(), + AIRFLOW_DAGS_FOLDER.to_owned(), + ) + .context(AddVolumeMountSnafu)?; + + pb.add_init_container(dags_init_container.build()); + pb.add_volume(VolumeBuilder::new(AIRFLOW_DAGS_INIT_VOLUME.to_owned()).build()) + .context(AddVolumeSnafu)?; pb.add_container(airflow_container.build()); pb.add_volumes(airflow.volumes().clone()) .context(AddVolumeSnafu)?; diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index 3d82d123..1f93850f 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -71,6 +71,7 @@ pub const STACKABLE_LOG_DIR: &str = "/stackable/log"; pub const LOG_CONFIG_DIR: &str = "/stackable/app/log_config"; pub const AIRFLOW_HOME: &str = "/stackable/airflow"; pub const AIRFLOW_CONFIG_FILENAME: &str = "webserver_config.py"; +pub const AIRFLOW_DAGS_FOLDER: &str = "/stackable/app/allDAGs"; pub const TEMPLATE_VOLUME_NAME: &str = "airflow-executor-pod-template"; pub const TEMPLATE_LOCATION: &str = "/templates"; @@ -439,6 +440,23 @@ impl 
v1alpha2::AirflowCluster { fragment::validate(conf_rolegroup).context(FragmentValidationFailureSnafu) } + pub fn create_gitsync_links(&self) -> Vec { + let mut symlinks = Vec::::new(); + for (i, _) in self.spec.cluster_config.dags_git_sync.iter().enumerate() { + symlinks.push(format!("{AIRFLOW_DAGS_FOLDER}/current-{i}").to_string()) + } + symlinks + } + + pub fn create_python_path_links(&self) -> Vec { + let mut python_path = Vec::::new(); + for (i, git_sync) in self.spec.cluster_config.dags_git_sync.iter().enumerate() { + let folder = &git_sync.git_folder.display(); + python_path.push(format!("{AIRFLOW_DAGS_FOLDER}/current-{i}/{folder}").to_string()) + } + python_path + } + /// Retrieve and merge resource configs for the executor template pub fn merged_executor_config( &self, @@ -600,11 +618,20 @@ impl AirflowRole { format!( "cp -RL {CONFIG_PATH}/{AIRFLOW_CONFIG_FILENAME} {AIRFLOW_HOME}/{AIRFLOW_CONFIG_FILENAME}" ), + // Adding cm as dags within the same AIRFLOW_DAGS_FOLDER leads to problems, thus checking if exists + format!("mkdir -p {AIRFLOW_DAGS_FOLDER}"), // graceful shutdown part COMMON_BASH_TRAP_FUNCTIONS.to_string(), remove_vector_shutdown_file_command(STACKABLE_LOG_DIR), ]; + let symlinks = airflow.create_gitsync_links(); + + for (i, _) in airflow.spec.cluster_config.dags_git_sync.iter().enumerate() { + command + .push(format!("ln -s /stackable/app/git-{i}/current {:?}", symlinks[i]).to_string()) + } + if resolved_product_image.product_version.starts_with("3.") { // Start-up commands have changed in 3.x. 
// See https://airflow.apache.org/docs/apache-airflow/3.0.1/installation/upgrading_to_airflow3.html#step-6-changes-to-your-startup-scripts and @@ -663,6 +690,7 @@ impl AirflowRole { container_debug_command(), "airflow triggerer &".to_string(), ]), + // This essentially doesn't work for KubernetesExecutor AirflowRole::Worker => command.extend(vec![ "prepare_signal_handlers".to_string(), container_debug_command(), diff --git a/rust/operator-binary/src/env_vars.rs b/rust/operator-binary/src/env_vars.rs index d67c1c33..b6fca8ca 100644 --- a/rust/operator-binary/src/env_vars.rs +++ b/rust/operator-binary/src/env_vars.rs @@ -6,17 +6,15 @@ use std::{ use product_config::types::PropertyNameKind; use snafu::Snafu; use stackable_operator::{ - commons::product_image_selection::ResolvedProductImage, - crd::{authentication::oidc, git_sync}, - k8s_openapi::api::core::v1::EnvVar, - kube::ResourceExt, + commons::product_image_selection::ResolvedProductImage, crd::authentication::oidc, + k8s_openapi::api::core::v1::EnvVar, kube::ResourceExt, product_logging::framework::create_vector_shutdown_file_command, }; use crate::{ crd::{ - AirflowExecutor, AirflowRole, ExecutorConfig, LOG_CONFIG_DIR, STACKABLE_LOG_DIR, - TEMPLATE_LOCATION, TEMPLATE_NAME, + AIRFLOW_DAGS_FOLDER, AirflowExecutor, AirflowRole, ExecutorConfig, LOG_CONFIG_DIR, + STACKABLE_LOG_DIR, TEMPLATE_LOCATION, TEMPLATE_NAME, authentication::{ AirflowAuthenticationClassResolved, AirflowClientAuthenticationDetailsResolved, }, @@ -80,14 +78,13 @@ pub fn build_airflow_statefulset_envs( executor: &AirflowExecutor, auth_config: &AirflowClientAuthenticationDetailsResolved, authorization_config: &AirflowAuthorizationResolved, - git_sync_resources: &git_sync::v1alpha2::GitSyncResources, resolved_product_image: &ResolvedProductImage, ) -> Result, Error> { let mut env: BTreeMap = BTreeMap::new(); let secret = airflow.spec.cluster_config.credentials_secret.as_str(); let internal_secret_name = 
airflow.shared_internal_secret_secret_name(); - env.extend(static_envs(git_sync_resources)); + env.extend(static_envs(airflow)); // environment variables let env_vars = rolegroup_config.get(&PropertyNameKind::Env); @@ -154,12 +151,11 @@ pub fn build_airflow_statefulset_envs( ); } - let dags_folder = get_dags_folder(git_sync_resources); env.insert( AIRFLOW_CORE_DAGS_FOLDER.into(), EnvVar { name: AIRFLOW_CORE_DAGS_FOLDER.into(), - value: Some(dags_folder), + value: Some(AIRFLOW_DAGS_FOLDER.to_owned()), ..Default::default() }, ); @@ -288,34 +284,19 @@ pub fn build_airflow_statefulset_envs( Ok(transform_map_to_vec(env)) } -pub fn get_dags_folder(git_sync_resources: &git_sync::v1alpha2::GitSyncResources) -> String { - let git_sync_count = git_sync_resources.git_content_folders.len(); - if git_sync_count > 1 { - tracing::warn!( - "There are {git_sync_count} git-sync entries: Only the first one will be considered.", - ); - } +fn construct_python_path(airflow: &v1alpha2::AirflowCluster) -> String { + let mut python_path = format!("{LOG_CONFIG_DIR}:"); + let symlinks = airflow.create_python_path_links(); + python_path.push_str(symlinks.join(":").as_str()); - // If DAG provisioning via git-sync is not configured, set a default value - // so that PYTHONPATH can refer to it. N.B. nested variables need to be - // resolved, so that /stackable/airflow is used instead of $AIRFLOW_HOME. - // see https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#dags-folder - git_sync_resources - .git_content_folders_as_string() - .first() - .cloned() - .unwrap_or("/stackable/airflow/dags".to_string()) + python_path } // This set of environment variables is a standard set that is not dependent on any // conditional logic and should be applied to the statefulset or the executor template config map. 
-fn static_envs( - git_sync_resources: &git_sync::v1alpha2::GitSyncResources, -) -> BTreeMap { +fn static_envs(airflow: &v1alpha2::AirflowCluster) -> BTreeMap { let mut env: BTreeMap = BTreeMap::new(); - let dags_folder = get_dags_folder(git_sync_resources); - env.insert( PYTHONPATH.into(), EnvVar { @@ -323,7 +304,7 @@ fn static_envs( // dependencies can be found: this must be the actual path and not a variable. // Also include the airflow site-packages by default (for airflow and kubernetes classes etc.) name: PYTHONPATH.into(), - value: Some(format!("{LOG_CONFIG_DIR}:{dags_folder}")), + value: Some(construct_python_path(airflow)), ..Default::default() }, ); @@ -372,7 +353,6 @@ pub fn build_airflow_template_envs( airflow: &v1alpha2::AirflowCluster, env_overrides: &HashMap, config: &ExecutorConfig, - git_sync_resources: &git_sync::v1alpha2::GitSyncResources, resolved_product_image: &ResolvedProductImage, ) -> Vec { let mut env: BTreeMap = BTreeMap::new(); @@ -407,17 +387,16 @@ pub fn build_airflow_template_envs( // the config map also requires the dag-folder location as this will be passed on // to the pods started by airflow. - let dags_folder = get_dags_folder(git_sync_resources); env.insert( AIRFLOW_CORE_DAGS_FOLDER.into(), EnvVar { name: AIRFLOW_CORE_DAGS_FOLDER.into(), - value: Some(dags_folder), + value: Some(AIRFLOW_DAGS_FOLDER.to_owned()), ..Default::default() }, ); - env.extend(static_envs(git_sync_resources)); + env.extend(static_envs(airflow)); add_version_specific_env_vars( airflow,