Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 146 additions & 49 deletions __init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import os
import logging
import asyncio
import shutil
import folder_paths
from aiohttp import web
from server import PromptServer
Expand All @@ -24,31 +25,73 @@

API_PREFIX = "35b631e00fa2dbc173ee4a5f899cba8f"

# Save the original function before wrapping
original_get_filename_list = folder_paths.get_filename_list

# Wrapper for folder_paths.get_filename_list
def get_filename_list_wrapper(folder_name):
"""Wrapper for folder_paths.get_filename_list to get list of files in a folder"""
try:
# print("get_filename_list wrapper called for folder:", folder_name)
result = original_get_filename_list(folder_name)
# Prepend folder path entry for download directory
mapped_folder = folder_paths.map_legacy(folder_name)
if mapped_folder in folder_paths.folder_names_and_paths:
paths, _ = folder_paths.folder_names_and_paths[mapped_folder]
if paths and any("/models/" in path for path in paths): # Check if paths list is not empty and contains /models/
folder_entry = "__folder__path__" + folder_name
if not result:
result = [folder_entry]
else:
result = [folder_entry] + result
return result
except Exception as e:
logging.error(f"[ComfyUI-Downloader] Error getting file list for {folder_name}: {e}")
return []

folder_paths.get_filename_list = get_filename_list_wrapper
DISALLOWED_DOWNLOAD_FOLDERS = {
"custom_nodes",
"configs",
"input",
"output",
"temp",
"user",
}

# ComfyUI-Manager-compatible aliases for model directories.
MODEL_DIR_ALIASES = {
"checkpoints": "checkpoints",
"checkpoint": "checkpoints",
"unclip": "checkpoints",
"text_encoders": "text_encoders",
"clip": "text_encoders",
"vae": "vae",
"lora": "loras",
"t2i-adapter": "controlnet",
"t2i-style": "controlnet",
"controlnet": "controlnet",
"clip_vision": "clip_vision",
"gligen": "gligen",
"upscale": "upscale_models",
"embedding": "embeddings",
"embeddings": "embeddings",
"unet": "diffusion_models",
"diffusion_model": "diffusion_models",
}


def _normalize_for_match(path):
"""Normalize separators for reliable path checks across platforms."""
return os.path.normpath(path).replace("/", os.sep).replace("\\", os.sep).lower()


def _looks_like_model_path(path):
normalized = _normalize_for_match(path)
marker = f"{os.sep}models{os.sep}"
return marker in normalized


def _is_downloadable_folder(folder_name, paths):
"""Allow only model-related folders and block unsafe targets."""
if folder_name in DISALLOWED_DOWNLOAD_FOLDERS:
return False
if not paths:
return False
if folder_name in MODEL_DIR_ALIASES.values():
return True
return any(_looks_like_model_path(path) for path in paths)


def _resolve_output_dir_for_save_path(save_path):
"""Resolve and validate output directory for a given save_path."""
mapped_folder = folder_paths.map_legacy(save_path)
if mapped_folder not in folder_paths.folder_names_and_paths:
raise ValueError(f"Invalid save_path: {save_path} not found in folder_paths")

paths, _ = folder_paths.folder_names_and_paths[mapped_folder]
if not paths:
raise ValueError(f"No valid paths configured for {save_path}")

if not _is_downloadable_folder(mapped_folder, paths):
raise ValueError(f"Invalid save_path for download: {save_path}")

return os.path.abspath(paths[0]), mapped_folder


@PromptServer.instance.routes.post(f"/{API_PREFIX}/server_download/start")
Expand Down Expand Up @@ -86,36 +129,22 @@ async def start_download(request):
)

# Get the first path for this folder type from folder_paths
mapped_folder = folder_paths.map_legacy(save_path)
if mapped_folder not in folder_paths.folder_names_and_paths:
return web.json_response(
{"error": f"Invalid save_path: {save_path} not found in folder_paths"},
status=400
)

paths, _ = folder_paths.folder_names_and_paths[mapped_folder]
if not paths:
return web.json_response(
{"error": f"No valid paths configured for {save_path}"},
status=400
)

# Filter paths to only include those containing /models/
model_paths = [path for path in paths if "/models/" in path]
if not model_paths:
try:
output_dir, mapped_folder = _resolve_output_dir_for_save_path(save_path)
except ValueError as e:
return web.json_response(
{"error": f"No valid model paths (containing /models/) configured for {save_path}"},
{"error": str(e)},
status=400
)

# Use the first path from the configured paths
output_dir = model_paths[0]
output_path = os.path.join(output_dir, safe_filename)

# Final security check: ensure the resolved path is within the configured directory
output_path = os.path.abspath(output_path)
output_dir = os.path.abspath(output_dir)
if not output_path.startswith(output_dir + os.sep):
try:
common = os.path.commonpath([output_dir, output_path])
except ValueError:
common = ""
if common != output_dir:
return web.json_response(
{"error": "Security error: attempted directory escape"},
status=400
Expand Down Expand Up @@ -586,7 +615,7 @@ async def get_folder_names(request):
# Only return folders that have valid paths (non-empty paths list)
folder_names = []
for folder_name, (paths, _) in folder_paths.folder_names_and_paths.items():
if paths and any("/models/" in path for path in paths): # Check if paths list is not empty and contains /models/
if _is_downloadable_folder(folder_name, paths):
folder_names.append(folder_name)

return web.json_response({
Expand All @@ -601,6 +630,74 @@ async def get_folder_names(request):
)


@PromptServer.instance.routes.get(f"/{API_PREFIX}/available_files")
async def get_available_files(request):
"""Get available files grouped by downloadable folder name."""
try:
files_by_folder = {}

for folder_name, (paths, _) in folder_paths.folder_names_and_paths.items():
if not _is_downloadable_folder(folder_name, paths):
continue

try:
files = folder_paths.get_filename_list(folder_name)
except Exception as e:
logging.warning(f"[ComfyUI-Downloader] Failed to get file list for {folder_name}: {e}")
files = []

# Normalize slashes and keep only valid string entries.
normalized_files = [
f.replace("\\", "/")
for f in files
if isinstance(f, str) and not f.startswith("__folder__path__")
]
files_by_folder[folder_name] = normalized_files

return web.json_response({
"success": True,
"files": files_by_folder
})
except Exception as e:
logging.error(f"[ComfyUI-Downloader] Error getting available files: {e}")
return web.json_response(
{"error": str(e)},
status=500
)


@PromptServer.instance.routes.get(f"/{API_PREFIX}/disk_space")
async def get_disk_space(request):
"""Get disk usage details for models dir or a specific save_path target."""
try:
save_path = (request.query.get("save_path") or "").strip()

if save_path:
try:
target_path, mapped_folder = _resolve_output_dir_for_save_path(save_path)
except ValueError as e:
return web.json_response({"error": str(e)}, status=400)
else:
target_path = os.path.abspath(folder_paths.models_dir)
mapped_folder = None

usage = shutil.disk_usage(target_path)
return web.json_response({
"success": True,
"save_path": mapped_folder,
"path": target_path,
"total_bytes": usage.total,
"used_bytes": usage.used,
"free_bytes": usage.free
})
except Exception as e:
logging.error(f"[ComfyUI-Downloader] Error getting disk space: {e}")
return web.json_response(
{"error": str(e)},
status=500
)


# Add route to serve CSS
@PromptServer.instance.routes.get(f"/{API_PREFIX}/extensions/ComfyUI-Downloader/css/downloader.css")
async def get_css(request):
Expand Down
Loading