Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 104 additions & 6 deletions taskvine/src/bindings/python3/ndcctools/taskvine/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
set_port_range,
get_c_constant,
)
from . import vine_cache as vc

import atexit
import errno
Expand All @@ -44,6 +45,7 @@
import sys
import tempfile
import time
import uuid


##
Expand Down Expand Up @@ -90,6 +92,9 @@ def __init__(self,
self._stats_hierarchy = None
self._task_table = {}
self._library_table = {} # A table of all libraries known to the manager
# Task result cache (opt-in via enable_tasks_cache())
self._tasks_cache = None # TasksCache instance, or None if disabled
self._cached_queue = [] # CachedTaskResult objects pending return from wait()
self._info_widget = None
self._using_ssl = False
if staging_path:
Expand Down Expand Up @@ -466,7 +471,7 @@ def set_draining_by_hostname(self, hostname, drain_mode=True):
#
# @param self Reference to the current manager object.
def empty(self):
    # The manager counts as empty only when no cached results are still
    # waiting to be handed out by wait() AND the C runtime reports that it
    # has nothing left. The Python-side queue is tested first because it is
    # the cheaper check and, when it is non-empty, the answer is already known.
    return not self._cached_queue and cvine.vine_empty(self._taskvine)

##
# Determine whether the manager can support more tasks.
Expand Down Expand Up @@ -892,6 +897,70 @@ def set_keepalive_timeout(self, timeout):
def tune(self, name, value):
    # Forward the named tunable and its value to the C runtime for this
    # manager, and hand back whatever the runtime call returns.
    outcome = cvine.vine_tune(self._taskvine, name, value)
    return outcome

##
# Turn on caching of task results (caching is off until this is called).
#
# Tasks whose fingerprint matches a previously completed task are returned
# from cache without re-execution. Cache state persists across manager
# restarts via a JSON transaction log.
#
# @param self      Reference to the current manager object.
# @param cache_dir Directory to store cached output files (created if absent).
# @param log_file  Path to the JSON transaction log.
def enable_tasks_cache(self, cache_dir="vine-cache-outputs", log_file="vine-cache.txlog"):
    # Build the cache first, then publish it on the manager.
    tasks_cache = vc.TasksCache(cache_dir=cache_dir, log_file=log_file)
    self._tasks_cache = tasks_cache

# Consult the task result cache on behalf of submit().
#
# On a cache hit, a CachedTaskResult is appended to self._cached_queue and a
# freshly generated id string is returned. Returns None when caching is
# disabled, when the task consumes an output of a task that is still pending,
# or when the cache has no entry for the task's fingerprint.
def _submit_cache_check(self, task):
    # None is the "caching disabled" sentinel. Compare against it explicitly so
    # the decision never depends on the truthiness of the cache object itself.
    cache = self._tasks_cache
    if cache is None:
        return None

    # Pre-compute stable hash data that requires _fn_def (PythonTask only).
    # Must run BEFORE submit_finalize() because submit_finalize() clears _fn_def.
    cache.prepare(task)

    # Check if any input is an output of a currently pending task (like vine_rewind).
    # Uses _task_table directly, so no separate inflight dict is needed.
    # Files are matched by object identity; the ids are collected once so each
    # pending output costs a single set lookup instead of a scan of the inputs.
    input_ids = {
        id(input_file)
        for (input_file, _iname) in getattr(task, '_tracked_inputs', [])
    }
    if input_ids:
        for pending in self._task_table.values():
            for (output_file, _oname, _flags) in getattr(pending, '_tracked_outputs', []):
                if id(output_file) in input_ids:
                    return None

    task_hash = cache.fingerprint(task)
    cached_metadata = cache.lookup(task_hash)
    if not cached_metadata:
        return None

    # Re-declare the recorded output file only if it still exists on disk.
    # NOTE(review): when the recorded path is missing, the hit is still queued
    # with no output file object -- confirm CachedTaskResult tolerates that.
    python_output_file = cached_metadata.get('python_output_file')
    output_file_obj = None
    if python_output_file and os.path.exists(python_output_file):
        output_file_obj = self.declare_file(python_output_file, cache=False)

    cached_id = str(uuid.uuid4())
    self._cached_queue.append(
        vc.CachedTaskResult(cached_id, task, cached_metadata, output_file_obj)
    )
    return cached_id

# Remove and return a CachedTaskResult from the cached queue, or None.
#
# With tag None the oldest queued result is returned; otherwise the first
# queued result whose .tag equals the given tag. Returns None when caching is
# disabled, the queue is empty, or no queued result carries the tag.
def _pop_cached_result(self, tag):
    # None is the "caching disabled" sentinel; test for it explicitly rather
    # than relying on the truthiness of the cache object.
    if self._tasks_cache is None or not self._cached_queue:
        return None
    if tag is None:
        return self._cached_queue.pop(0)
    for i, cached in enumerate(self._cached_queue):
        if cached.tag == tag:
            return self._cached_queue.pop(i)
    return None

# Record a completed task in the result cache, if caching is enabled.
#
# Nothing is stored when caching is disabled, when the task was not successful,
# or when the cache reports that the task still has pending inputs.
def _store_task_in_cache(self, task):
    # None is the "caching disabled" sentinel; test for it explicitly rather
    # than relying on the truthiness of the cache object.
    cache = self._tasks_cache
    if cache is None:
        return
    if not task.successful() or cache.has_pending_inputs(task):
        return
    # Fingerprint first: the hash is used both for the output copy placed in
    # the cache and for the record written afterwards.
    final_hash = cache.fingerprint(task)
    python_output_path = cache.copy_output_to_cache(task, final_hash)
    cache.record(final_hash, task, python_output_path)

##
# Submit a task to the manager.
#
Expand All @@ -901,13 +970,19 @@ def tune(self, name, value):
# @param task A task description created from @ref ndcctools.taskvine.task.Task.
def submit(self, task):
    task.manager = self

    # Ask the result cache first. On a hit the task is not sent to the C
    # runtime at all; the id returned by the cache check is passed straight
    # back to the caller.
    cached_id = self._submit_cache_check(task)
    if cached_id:
        return cached_id

    # Not a cache hit: finalize the task and register its deferred outputs
    # before handing it to the C runtime.
    task.submit_finalize()
    task._finalize_outputs()
    task_id = cvine.vine_submit(self._taskvine, task._task)
    if task_id == 0:
        raise ValueError("invalid task description")

    # Remember the Python task object under the id it was submitted with.
    self._task_table[task_id] = task
    return task_id

##
# Submit a library to install on all connected workers
Expand Down Expand Up @@ -1130,13 +1205,25 @@ def wait_for_tag(self, tag, timeout="wait_forever"):

self._update_status_display()

# Drain cached queue before blocking on C runtime.
cached = self._pop_cached_result(tag)
if cached:
return cached

task_pointer = cvine.vine_wait_for_tag(self._taskvine, tag, timeout)
if task_pointer:
task_id = cvine.vine_task_get_id(task_pointer)
if self.empty():
# if last task in queue, update display
self._update_status_display(force=True)
task = self._task_table[cvine.vine_task_get_id(task_pointer)]
del self._task_table[cvine.vine_task_get_id(task_pointer)]
task = self._task_table[task_id]
del self._task_table[task_id]

# Recompute fingerprint now that all input files exist (like vine_rewind).
# _core_hash is stored on the task object so fingerprint() works even though
# submit_finalize() already cleared _fn_def.
self._store_task_in_cache(task)

return task
return None

Expand Down Expand Up @@ -1565,13 +1652,24 @@ def undeclare_function(self, fn):
except KeyError:
pass

# When caching is enabled, declare a real file in the cache dir instead of a
# VINE_TEMP, so the output can be copied to the cache on task completion.
#
# Returns the declared file object, or None when caching is disabled.
def _declare_temp_for_cache(self):
    # None is the "caching disabled" sentinel; test for it explicitly rather
    # than relying on the truthiness of the cache object.
    cache = self._tasks_cache
    if cache is None:
        return None
    # A random UUID gives every declared file its own name inside the cache dir.
    filepath = os.path.join(cache._cache_dir, f"{uuid.uuid4()}.output")
    return self.declare_file(filepath, cache=False, unlink_when_done=True)

##
# Declare an anonymous file that has no initial content, but is created as the
# output of a task, and may be consumed by other tasks.
#
# @param self The manager to register this file
# @return A file object to use in @ref ndcctools.taskvine.task.Task.add_input or @ref ndcctools.taskvine.task.Task.add_output
def declare_temp(self):
    # Give the cache helper the first chance to supply the file; only when it
    # supplies nothing is a temp file requested from the C runtime.
    cache_backed = self._declare_temp_for_cache()
    if not cache_backed:
        return File(cvine.vine_declare_temp(self._taskvine))
    return cache_backed

Expand Down
31 changes: 28 additions & 3 deletions taskvine/src/bindings/python3/ndcctools/taskvine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ def __init__(self, command, **task_info):
if not self._task:
raise Exception("Unable to create internal Task structure")

# Cache support: track inputs/outputs for fingerprinting and deferred registration
self._tracked_inputs = [] # list of (file_obj, remote_name)
self._tracked_outputs = [] # list of (file_obj, remote_name, flags)
self._outputs_finalized = False

attributes = [
"library_required",
"library_provided",
Expand Down Expand Up @@ -93,7 +98,6 @@ def __init__(self, command, **task_info):
parameters = {"remote_name": value}
else:
raise TypeError(f"{value} is not a str or dict")
print(f"self.add_output({key}, {parameters})")
self.add_output(key, **parameters)

if "env" in task_info:
Expand Down Expand Up @@ -355,6 +359,8 @@ def add_input(self, file, remote_name, strict_input=False, mount_symlink=False):
if not isinstance(remote_name, str):
raise TypeError(f"remote_name {remote_name} is not a str")

self._tracked_inputs.append((file, remote_name)) # track for cache fingerprinting

flags = Task._determine_mount_flags(strict_input=strict_input, mount_symlink=mount_symlink)

if cvine.vine_task_add_input(self._task, file._file, remote_name, flags) == 0:
Expand All @@ -381,8 +387,23 @@ def add_output(self, file, remote_name, watch=False, failure_only=None, success_
raise TypeError(f"remote_name {remote_name} is not a str")

flags = Task._determine_mount_flags(watch, failure_only, success_only)
if cvine.vine_task_add_output(self._task, file._file, remote_name, flags) == 0:
raise ValueError("invalid file description")
# Defer C registration to _finalize_outputs(), called by Manager.submit().
# This prevents PENDING state from being set for tasks that end up as cache hits.
self._tracked_outputs.append((file, remote_name, flags))

##
# Hand every deferred output over to the C task structure.
#
# Manager.submit() calls this right before cvine.vine_submit() for each task
# that is actually going to run (i.e. not a cache hit). Once it has completed,
# further calls do nothing.
def _finalize_outputs(self):
    if not self._outputs_finalized:
        for output_file, remote_name, mount_flags in self._tracked_outputs:
            status = cvine.vine_task_add_output(self._task, output_file._file, remote_name, mount_flags)
            # The runtime call signals a rejected output with a zero status.
            if status == 0:
                raise ValueError(f"invalid output file description for {remote_name}")
        # Set only after every output registered without raising.
        self._outputs_finalized = True

##
# When monitoring, indicates a json-encoded file that instructs the monitor
Expand Down Expand Up @@ -882,6 +903,10 @@ def __init__(self, func, *args, **kwargs):
self._tmp_output_enabled = False
self._cache_output = False

# Cache support: stable core hash computed by TasksCache.prepare() before
# submit_finalize() clears _fn_def. None until prepare() is called.
self._core_hash = None

# vine File object that will contain the output of this function
self._output_file = None

Expand Down
Loading
Loading