diff --git a/pyproject.toml b/pyproject.toml index f4ff622..5e01e04 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "Node-Pod-Orchestration" -version = "0.5.0" +version = "0.5.1" description = "" authors = ["Alexander Röhl ", "David Hieber "] license = "Apache 2.0" diff --git a/src/k8s/kubernetes.py b/src/k8s/kubernetes.py index 5aec805..259185a 100644 --- a/src/k8s/kubernetes.py +++ b/src/k8s/kubernetes.py @@ -101,6 +101,10 @@ def create_analysis_deployment(name: str, if env is not None else []) containers.append(container) + net_stats_container = _build_net_stats_container(name) + if net_stats_container is not None: + containers.append(net_stats_container) + labels = {'app': name, 'component': "flame-analysis"} depl_metadata = client.V1ObjectMeta(name=name, namespace=namespace, labels=labels) depl_pod_metadata = client.V1ObjectMeta(labels=labels) @@ -241,6 +245,42 @@ def get_pod_status(deployment_name: str, namespace: str = 'default') -> Optional return None +def _build_net_stats_container(analysis_name: str) -> Optional[client.V1Container]: + """Build the net-stats sidecar container spec, or return None if disabled. + + Controlled by the ``NET_STATS_ENABLED`` env var. Image is read from + ``NET_STATS_IMAGE``. Emits a single cumulative log on SIGTERM. 
+ """ + if os.getenv('NET_STATS_ENABLED', '').lower() not in ('1', 'true'): + return None + + _NET_STATS_SCRIPT = """\ + iface=$(grep -v -e lo -e 'Inter' -e 'face' /proc/net/dev | awk -F: '{print $1}' | tr -d ' ' | head -1) + line=$(grep "${iface}:" /proc/net/dev | tr -s ' ') + start_rx=$(echo $line | cut -d' ' -f2) + start_tx=$(echo $line | cut -d' ' -f10) + + handle_term() { + line=$(grep "${iface}:" /proc/net/dev | tr -s ' ') + rx=$(echo $line | cut -d' ' -f2) + tx=$(echo $line | cut -d' ' -f10) + printf '{"level":"info","message":"network_stats","bytes_in":%d,"bytes_out":%d,"interface":"%s","event_name":"netstats.analysis.traffic"}\\n' $((rx - start_rx)) $((tx - start_tx)) "$iface" + sleep 5 + exit 0 + } + trap handle_term TERM INT + + while true; do sleep 3600 & wait $!; done + """ + + return client.V1Container( + name=f'net-stats-{analysis_name}', + image=os.getenv('NET_STATS_IMAGE', 'busybox:1.37'), + image_pull_policy='IfNotPresent', + command=['/bin/sh', '-c', _NET_STATS_SCRIPT], + ) + + def _create_analysis_nginx_deployment(analysis_name: str, analysis_service_name: str, analysis_env: Optional[dict[str, str]] = None, diff --git a/src/resources/log/entity.py b/src/resources/log/entity.py index 5ac3369..1de489a 100644 --- a/src/resources/log/entity.py +++ b/src/resources/log/entity.py @@ -76,7 +76,7 @@ def __init__(self, elif error_type == "slow": log = (f"[flame -- POAPI: ANALYSISSTARTUPERROR -- " f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}] " - f"Error: The analysis took to long during startup and was restarted " + f"Error: The analysis took too long during startup and was restarted " f"[restart {restart_num} of {_MAX_RESTARTS}].{term_msg}") elif error_type == "k8s": log = (f"[flame -- POAPI: ANALYSISSTARTUPERROR -- " diff --git a/src/resources/utils.py b/src/resources/utils.py index 06723ab..c5c2c4a 100644 --- a/src/resources/utils.py +++ b/src/resources/utils.py @@ -271,9 +271,21 @@ def unstuck_analysis_deployments(analysis_id: str, 
database: Database) -> None: """ if database.get_latest_deployment(analysis_id) is not None: stop_analysis(analysis_id, database) - time.sleep(10) # wait for k8s to update status - create_analysis(analysis_id, database) - database.delete_old_deployments_from_db(analysis_id) + success = False + for i in range(_MAX_UNSTUCK_REATTEMPTS): + try: + time.sleep(10) # wait for k8s to update status + create_analysis(analysis_id, database) + database.delete_old_deployments_from_db(analysis_id) + success = True + break + except Exception as e: + logger.warning(f"Failed to restart analysis {analysis_id} ({repr(e)}) " + f"-> Reattempting unstuck ({i + 1} of {_MAX_UNSTUCK_REATTEMPTS})") + if not success: + logger.error(f"Failed to unstuck analysis {analysis_id} after max reattempts.") + database.update_deployment_status(database.get_latest_deployment(analysis_id).deployment_name, AnalysisStatus.FAILED.value) + stop_analysis(analysis_id, database) def cleanup(cleanup_type: str,