Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### Added

- Add git-sync support for multiple repositories ([#729]).
- Add support for airflow 3.1.6 ([#742]).
- Add operator versioning ([#725]).
- GitSync considered for v1alpha1 and v1alpha2
Expand All @@ -21,6 +22,7 @@
- Prevent unnecessary Pod restarts when initially creating an AirflowCluster.
This is achieved by applying the StatefulSet after all ConfigMaps and Secrets that it mounts ([#734]).

[#729]: https://github.com/stackabletech/airflow-operator/pull/729
[#725]: https://github.com/stackabletech/airflow-operator/pull/725
[#726]: https://github.com/stackabletech/airflow-operator/pull/726
[#727]: https://github.com/stackabletech/airflow-operator/pull/727
Expand Down
8 changes: 7 additions & 1 deletion docs/modules/airflow/pages/usage-guide/mounting-dags.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ For multiple DAGs, it is easier to expose them via `gitsync`, as shown below.

== Via `git-sync`

{git-sync}[git-sync] is a command that pulls a git repository into a local directory and is supplied as a sidecar container for use within Kubernetes.
{git-sync}[git-sync] is a command that pulls a git repository into a local directory and is supplied as a sidecar container for use within Kubernetes. The synchronized repositories are made available at `/stackable/app/allDAGs/current-i`, where `i` is the zero-based position of the repository in the configured list (`0` to `n-1`).
The Stackable Airflow images already ship with git-sync included, and the operator takes care of calling the tool and mounting volumes, so that only the repository and synchronization details are required:

.git-sync usage example: https
Expand Down Expand Up @@ -82,3 +82,9 @@ include::example$example-airflow-gitsync-ssh.yaml[]

NOTE: git-sync can be used with DAGs that make use of Python modules, as Python is configured to use the git-sync target folder as the "root" location when looking for referenced files.
See the xref:usage-guide/applying-custom-resources.adoc[] example for more details.

=== Multiple repositories via git-sync

If you want to access multiple branches of a repository, or load DAGs from multiple repositories, you can extend the list shown above. The repositories are consolidated beneath `/stackable/app/allDAGs/current-i`, where `i` is the zero-based position of the repository in the list (`0` to `n-1`).

NOTE: Using DAGs from ConfigMaps requires you to either use the Celery executor and mount them under `/stackable/app/allDAGs/<name>`, or use the Kubernetes executor, mount them somewhere else, and adjust the PYTHONPATH as well as the `AIRFLOW\__CORE__DAGS_FOLDER` accordingly.
54 changes: 49 additions & 5 deletions rust/operator-binary/src/airflow_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ use crate::{
config::{self, PYTHON_IMPORTS},
controller_commons::{self, CONFIG_VOLUME_NAME, LOG_CONFIG_VOLUME_NAME, LOG_VOLUME_NAME},
crd::{
self, AIRFLOW_CONFIG_FILENAME, APP_NAME, AirflowClusterStatus, AirflowConfig,
AirflowConfigOptions, AirflowExecutor, AirflowRole, CONFIG_PATH, Container, ExecutorConfig,
ExecutorConfigFragment, HTTP_PORT, HTTP_PORT_NAME, LISTENER_VOLUME_DIR,
self, AIRFLOW_CONFIG_FILENAME, AIRFLOW_DAGS_FOLDER, APP_NAME, AirflowClusterStatus,
AirflowConfig, AirflowConfigOptions, AirflowExecutor, AirflowRole, CONFIG_PATH, Container,
ExecutorConfig, ExecutorConfigFragment, HTTP_PORT, HTTP_PORT_NAME, LISTENER_VOLUME_DIR,
LISTENER_VOLUME_NAME, LOG_CONFIG_DIR, METRICS_PORT, METRICS_PORT_NAME, OPERATOR_NAME,
STACKABLE_LOG_DIR, TEMPLATE_LOCATION, TEMPLATE_NAME, TEMPLATE_VOLUME_NAME,
authentication::{
Expand Down Expand Up @@ -113,6 +113,7 @@ use crate::{
},
};

pub const AIRFLOW_DAGS_INIT_VOLUME: &str = "all-dags-volume";
pub const AIRFLOW_CONTROLLER_NAME: &str = "airflowcluster";
pub const DOCKER_IMAGE_BASE_NAME: &str = "airflow";
pub const AIRFLOW_FULL_CONTROLLER_NAME: &str =
Expand Down Expand Up @@ -1019,7 +1020,6 @@ fn build_server_rolegroup_statefulset(
executor,
authentication_config,
authorization_config,
git_sync_resources,
resolved_product_image,
)
.context(BuildStatefulsetEnvVarsSnafu)?,
Expand Down Expand Up @@ -1299,7 +1299,6 @@ fn build_executor_template_config_map(
airflow,
env_overrides,
merged_executor_config,
git_sync_resources,
resolved_product_image,
))
.add_volume_mounts(airflow.volume_mounts())
Expand All @@ -1319,6 +1318,51 @@ fn build_executor_template_config_map(
true,
)?;

let mut dags_init_container =
ContainerBuilder::new("dags-init").context(InvalidContainerNameSnafu)?;
let mut dags_args = Vec::<String>::new();
let mut cp_commands = Vec::<String>::new();
for (i, _) in airflow.spec.cluster_config.dags_git_sync.iter().enumerate() {
cp_commands.push(
format!("cp -r /stackable/app/git-{i}/current/ {AIRFLOW_DAGS_FOLDER}/current-{i}")
.to_string(),
);
}
dags_args.push(cp_commands.join(" && "));
dags_init_container
.image_from_product_image(resolved_product_image)
.args(dags_args)
.add_env_vars(build_airflow_template_envs(
airflow,
env_overrides,
merged_executor_config,
resolved_product_image,
))
.command(vec![
"/bin/bash".to_string(),
"-x".to_string(),
"-euo".to_string(),
"pipefail".to_string(),
"-c".to_string(),
])
.add_volume_mounts(git_sync_resources.git_content_volume_mounts.clone())
.context(AddVolumeMountSnafu)?
.add_volume_mount(
AIRFLOW_DAGS_INIT_VOLUME.to_owned(),
AIRFLOW_DAGS_FOLDER.to_owned(),
)
.context(AddVolumeMountSnafu)?;

airflow_container
.add_volume_mount(
AIRFLOW_DAGS_INIT_VOLUME.to_owned(),
AIRFLOW_DAGS_FOLDER.to_owned(),
)
.context(AddVolumeMountSnafu)?;

pb.add_init_container(dags_init_container.build());
pb.add_volume(VolumeBuilder::new(AIRFLOW_DAGS_INIT_VOLUME.to_owned()).build())
.context(AddVolumeSnafu)?;
pb.add_container(airflow_container.build());
pb.add_volumes(airflow.volumes().clone())
.context(AddVolumeSnafu)?;
Expand Down
28 changes: 28 additions & 0 deletions rust/operator-binary/src/crd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub const STACKABLE_LOG_DIR: &str = "/stackable/log";
pub const LOG_CONFIG_DIR: &str = "/stackable/app/log_config";
pub const AIRFLOW_HOME: &str = "/stackable/airflow";
pub const AIRFLOW_CONFIG_FILENAME: &str = "webserver_config.py";
pub const AIRFLOW_DAGS_FOLDER: &str = "/stackable/app/allDAGs";

pub const TEMPLATE_VOLUME_NAME: &str = "airflow-executor-pod-template";
pub const TEMPLATE_LOCATION: &str = "/templates";
Expand Down Expand Up @@ -439,6 +440,23 @@ impl v1alpha2::AirflowCluster {
fragment::validate(conf_rolegroup).context(FragmentValidationFailureSnafu)
}

/// Returns the consolidated DAG folder paths (`<AIRFLOW_DAGS_FOLDER>/current-<i>`),
/// one per configured git-sync repository, in list order.
///
/// These paths are used as symlink locations so that all synchronized
/// repositories appear beneath a single shared DAGs folder.
pub fn create_gitsync_links(&self) -> Vec<String> {
    // Only the position in the list matters here, so iterate over indices.
    (0..self.spec.cluster_config.dags_git_sync.len())
        .map(|i| format!("{AIRFLOW_DAGS_FOLDER}/current-{i}"))
        .collect()
}

/// Returns one PYTHONPATH entry per configured git-sync repository:
/// `<AIRFLOW_DAGS_FOLDER>/current-<i>/<git_folder>`.
///
/// The repository's configured `git_folder` sub-path is appended so that
/// Python resolves imports relative to the folder actually containing the DAGs.
pub fn create_python_path_links(&self) -> Vec<String> {
    self.spec
        .cluster_config
        .dags_git_sync
        .iter()
        .enumerate()
        .map(|(i, git_sync)| {
            format!(
                "{AIRFLOW_DAGS_FOLDER}/current-{i}/{}",
                git_sync.git_folder.display()
            )
        })
        .collect()
}

/// Retrieve and merge resource configs for the executor template
pub fn merged_executor_config(
&self,
Expand Down Expand Up @@ -600,11 +618,20 @@ impl AirflowRole {
format!(
"cp -RL {CONFIG_PATH}/{AIRFLOW_CONFIG_FILENAME} {AIRFLOW_HOME}/{AIRFLOW_CONFIG_FILENAME}"
),
// Mounting ConfigMap-provided DAGs directly into AIRFLOW_DAGS_FOLDER can clash with
// git-sync content, so create the folder with `mkdir -p` (a no-op if it already exists).
format!("mkdir -p {AIRFLOW_DAGS_FOLDER}"),
// graceful shutdown part
COMMON_BASH_TRAP_FUNCTIONS.to_string(),
remove_vector_shutdown_file_command(STACKABLE_LOG_DIR),
];

let symlinks = airflow.create_gitsync_links();

for (i, _) in airflow.spec.cluster_config.dags_git_sync.iter().enumerate() {
command
.push(format!("ln -s /stackable/app/git-{i}/current {:?}", symlinks[i]).to_string())
}

if resolved_product_image.product_version.starts_with("3.") {
// Start-up commands have changed in 3.x.
// See https://airflow.apache.org/docs/apache-airflow/3.0.1/installation/upgrading_to_airflow3.html#step-6-changes-to-your-startup-scripts and
Expand Down Expand Up @@ -663,6 +690,7 @@ impl AirflowRole {
container_debug_command(),
"airflow triggerer &".to_string(),
]),
// This essentially doesn't work for KubernetesExecutor
AirflowRole::Worker => command.extend(vec![
"prepare_signal_handlers".to_string(),
container_debug_command(),
Expand Down
51 changes: 15 additions & 36 deletions rust/operator-binary/src/env_vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@ use std::{
use product_config::types::PropertyNameKind;
use snafu::Snafu;
use stackable_operator::{
commons::product_image_selection::ResolvedProductImage,
crd::{authentication::oidc, git_sync},
k8s_openapi::api::core::v1::EnvVar,
kube::ResourceExt,
commons::product_image_selection::ResolvedProductImage, crd::authentication::oidc,
k8s_openapi::api::core::v1::EnvVar, kube::ResourceExt,
product_logging::framework::create_vector_shutdown_file_command,
};

use crate::{
crd::{
AirflowExecutor, AirflowRole, ExecutorConfig, LOG_CONFIG_DIR, STACKABLE_LOG_DIR,
TEMPLATE_LOCATION, TEMPLATE_NAME,
AIRFLOW_DAGS_FOLDER, AirflowExecutor, AirflowRole, ExecutorConfig, LOG_CONFIG_DIR,
STACKABLE_LOG_DIR, TEMPLATE_LOCATION, TEMPLATE_NAME,
authentication::{
AirflowAuthenticationClassResolved, AirflowClientAuthenticationDetailsResolved,
},
Expand Down Expand Up @@ -80,14 +78,13 @@ pub fn build_airflow_statefulset_envs(
executor: &AirflowExecutor,
auth_config: &AirflowClientAuthenticationDetailsResolved,
authorization_config: &AirflowAuthorizationResolved,
git_sync_resources: &git_sync::v1alpha2::GitSyncResources,
resolved_product_image: &ResolvedProductImage,
) -> Result<Vec<EnvVar>, Error> {
let mut env: BTreeMap<String, EnvVar> = BTreeMap::new();
let secret = airflow.spec.cluster_config.credentials_secret.as_str();
let internal_secret_name = airflow.shared_internal_secret_secret_name();

env.extend(static_envs(git_sync_resources));
env.extend(static_envs(airflow));

// environment variables
let env_vars = rolegroup_config.get(&PropertyNameKind::Env);
Expand Down Expand Up @@ -154,12 +151,11 @@ pub fn build_airflow_statefulset_envs(
);
}

let dags_folder = get_dags_folder(git_sync_resources);
env.insert(
AIRFLOW_CORE_DAGS_FOLDER.into(),
EnvVar {
name: AIRFLOW_CORE_DAGS_FOLDER.into(),
value: Some(dags_folder),
value: Some(AIRFLOW_DAGS_FOLDER.to_owned()),
..Default::default()
},
);
Expand Down Expand Up @@ -288,42 +284,27 @@ pub fn build_airflow_statefulset_envs(
Ok(transform_map_to_vec(env))
}

pub fn get_dags_folder(git_sync_resources: &git_sync::v1alpha2::GitSyncResources) -> String {
let git_sync_count = git_sync_resources.git_content_folders.len();
if git_sync_count > 1 {
tracing::warn!(
"There are {git_sync_count} git-sync entries: Only the first one will be considered.",
);
}
fn construct_python_path(airflow: &v1alpha2::AirflowCluster) -> String {
let mut python_path = format!("{LOG_CONFIG_DIR}:");
let symlinks = airflow.create_python_path_links();
python_path.push_str(symlinks.join(":").as_str());

// If DAG provisioning via git-sync is not configured, set a default value
// so that PYTHONPATH can refer to it. N.B. nested variables need to be
// resolved, so that /stackable/airflow is used instead of $AIRFLOW_HOME.
// see https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#dags-folder
git_sync_resources
.git_content_folders_as_string()
.first()
.cloned()
.unwrap_or("/stackable/airflow/dags".to_string())
python_path
}

// This set of environment variables is a standard set that is not dependent on any
// conditional logic and should be applied to the statefulset or the executor template config map.
fn static_envs(
git_sync_resources: &git_sync::v1alpha2::GitSyncResources,
) -> BTreeMap<String, EnvVar> {
fn static_envs(airflow: &v1alpha2::AirflowCluster) -> BTreeMap<String, EnvVar> {
let mut env: BTreeMap<String, EnvVar> = BTreeMap::new();

let dags_folder = get_dags_folder(git_sync_resources);

env.insert(
PYTHONPATH.into(),
EnvVar {
// PYTHONPATH must be extended to include the dags folder so that dag
// dependencies can be found: this must be the actual path and not a variable.
// Also include the airflow site-packages by default (for airflow and kubernetes classes etc.)
name: PYTHONPATH.into(),
value: Some(format!("{LOG_CONFIG_DIR}:{dags_folder}")),
value: Some(construct_python_path(airflow)),
..Default::default()
},
);
Expand Down Expand Up @@ -372,7 +353,6 @@ pub fn build_airflow_template_envs(
airflow: &v1alpha2::AirflowCluster,
env_overrides: &HashMap<String, String>,
config: &ExecutorConfig,
git_sync_resources: &git_sync::v1alpha2::GitSyncResources,
resolved_product_image: &ResolvedProductImage,
) -> Vec<EnvVar> {
let mut env: BTreeMap<String, EnvVar> = BTreeMap::new();
Expand Down Expand Up @@ -407,17 +387,16 @@ pub fn build_airflow_template_envs(

// the config map also requires the dag-folder location as this will be passed on
// to the pods started by airflow.
let dags_folder = get_dags_folder(git_sync_resources);
env.insert(
AIRFLOW_CORE_DAGS_FOLDER.into(),
EnvVar {
name: AIRFLOW_CORE_DAGS_FOLDER.into(),
value: Some(dags_folder),
value: Some(AIRFLOW_DAGS_FOLDER.to_owned()),
..Default::default()
},
);

env.extend(static_envs(git_sync_resources));
env.extend(static_envs(airflow));

add_version_specific_env_vars(
airflow,
Expand Down