Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 128 additions & 77 deletions taskvine/src/manager/vine_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -3127,8 +3127,9 @@ static vine_result_code_t commit_task_group_to_worker(struct vine_manager *q, st
return result;
}

/* 1 if task resubmitted, 0 otherwise */
static int resubmit_task_on_exhaustion(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t)
/* Returns true if task should be resubmitted due to resource exhaustion. */

static int should_resubmit_task_on_exhaustion(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t)
{
if (t->result != VINE_RESULT_RESOURCE_EXHAUSTION) {
return 0;
Expand Down Expand Up @@ -3158,15 +3159,15 @@ static int resubmit_task_on_exhaustion(struct vine_manager *q, struct vine_worke
} else {
debug(D_VINE, "Task %d resubmitted using new resource allocation.\n", t->task_id);
t->resource_request = next;
change_task_state(q, t, VINE_TASK_READY);
return 1;
}

return 0;
}

/* 1 if task resubmitted, 0 otherwise */
static int resubmit_task_on_sandbox_exhaustion(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t)
/* Returns true if task should be resubmitted due to sandbox disk exhaustion. */

static int should_resubmit_task_on_sandbox_exhaustion(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t)
{
if (t->result != VINE_RESULT_SANDBOX_EXHAUSTION) {
return 0;
Expand All @@ -3191,12 +3192,12 @@ static int resubmit_task_on_sandbox_exhaustion(struct vine_manager *q, struct vi
return 0;
}

change_task_state(q, t, VINE_TASK_READY);

return 1;
}

static int resubmit_if_needed(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t)
/* Returns true if this completed task should be resubmitted into the READY state. */

static int should_resubmit_task(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t)
{
/* in this function, any change_task_state should only be to VINE_TASK_READY */
if (t->result == VINE_RESULT_FORSAKEN) {
Expand All @@ -3206,7 +3207,6 @@ static int resubmit_if_needed(struct vine_manager *q, struct vine_worker_info *w

/* forsaken tasks get a retry back as they are victims of circumstance */
t->try_count -= 1;
change_task_state(q, t, VINE_TASK_READY);
return 1;
}

Expand All @@ -3219,10 +3219,10 @@ static int resubmit_if_needed(struct vine_manager *q, struct vine_worker_info *w
* have not reached max_retries. */
switch (t->result) {
case VINE_RESULT_RESOURCE_EXHAUSTION:
return resubmit_task_on_exhaustion(q, w, t);
return should_resubmit_task_on_exhaustion(q, w, t);
break;
case VINE_RESULT_SANDBOX_EXHAUSTION:
return resubmit_task_on_sandbox_exhaustion(q, w, t);
return should_resubmit_task_on_sandbox_exhaustion(q, w, t);
break;
default:
/* by default tasks are not resubmitted */
Expand Down Expand Up @@ -3292,17 +3292,13 @@ static void reap_task_from_worker(struct vine_manager *q, struct vine_worker_inf
switch (t->type) {
case VINE_TASK_TYPE_STANDARD:
case VINE_TASK_TYPE_RECOVERY:
if (new_state != VINE_TASK_RETRIEVED || !resubmit_if_needed(q, w, t)) {
if (new_state == VINE_TASK_RETRIEVED && should_resubmit_task(q, w, t)) {
change_task_state(q, t, VINE_TASK_READY);
} else {
change_task_state(q, t, new_state);
}
break;
case VINE_TASK_TYPE_LIBRARY_INSTANCE:
change_task_state(q, t, VINE_TASK_RETRIEVED);
break;
return;

case VINE_TASK_TYPE_LIBRARY_TEMPLATE:
/* A library template should not be scheduled... */
case VINE_TASK_TYPE_LIBRARY:
change_task_state(q, t, VINE_TASK_RETRIEVED);
break;
return;
Expand Down Expand Up @@ -4389,11 +4385,21 @@ static void delete_task_at_exit(struct vine_task *t)
return;
}

/* Each task in q->tasks has one reference that was added by the vine_manager. */
vine_task_delete(t);

if (t->type == VINE_TASK_TYPE_LIBRARY_INSTANCE) {
/* manager created this task, so it is not the API caller's responsibility. */
vine_task_delete(t);
switch(t->type) {
case VINE_TASK_TYPE_STANDARD:
/* The user created, and the user must delete. */
break;
case VINE_TASK_TYPE_RECOVERY:
/* The manager dropped the primary reference at create time. */
/* This task will go away when all referring files are deleted. */
break;
case VINE_TASK_TYPE_LIBRARY:
/* The manager created the task, and so the manager must delete it. */
vine_task_delete(t);
break;
}
}

Expand Down Expand Up @@ -4698,7 +4704,7 @@ static vine_task_state_t change_task_state(struct vine_manager *q, struct vine_t
break;
case VINE_TASK_RETRIEVED:
/* Library task can be set to RETRIEVED when it failed or was removed intentionally */
if (t->type == VINE_TASK_TYPE_LIBRARY_INSTANCE) {
if (t->type == VINE_TASK_TYPE_LIBRARY) {
vine_task_set_result(t, VINE_RESULT_LIBRARY_EXIT);
}
list_push_head(q->retrieved_list, t);
Expand All @@ -4709,8 +4715,6 @@ static vine_task_state_t change_task_state(struct vine_manager *q, struct vine_t
q->fixed_location_in_queue--;
}
vine_taskgraph_log_write_task(q, t);
itable_remove(q->tasks, t->task_id);
vine_task_delete(t);
break;
}

Expand Down Expand Up @@ -4901,7 +4905,6 @@ struct vine_task *send_library_to_worker(struct vine_manager *q, struct vine_wor

/* Duplicate the original task */
struct vine_task *t = vine_task_copy(original);
t->type = VINE_TASK_TYPE_LIBRARY_INSTANCE;

/* Give it a unique taskid if library fits the worker. */
t->task_id = q->next_task_id++;
Expand Down Expand Up @@ -4938,7 +4941,7 @@ struct vine_task *send_library_to_worker(struct vine_manager *q, struct vine_wor

void vine_manager_install_library(struct vine_manager *q, struct vine_task *t, const char *name)
{
t->type = VINE_TASK_TYPE_LIBRARY_TEMPLATE;
t->type = VINE_TASK_TYPE_LIBRARY;
t->library_failed_count = 0;
t->task_id = -1;
vine_task_set_library_provided(t, name);
Expand Down Expand Up @@ -5133,73 +5136,122 @@ static int connect_new_workers(struct vine_manager *q, int stoptime, int max_new
return new_workers;
}

struct vine_task *find_task_to_return(struct vine_manager *q, const char *tag, int task_id)
/*
Select a completed (or nonstandard) task in the retrieved
list that should either be returned to the user, or consumed
silently by the system. If either the tag or task_id parameters
are given, then select a task that matches those criteria.
*/

static struct vine_task *find_complete_or_nonstandard_task(struct vine_manager *q, const char *tag, int task_id)
{
while (1) {
struct vine_task *t = NULL;

if (tag) {
struct vine_task *temp = NULL;
int tasks_to_consider = list_size(q->retrieved_list);
while (tasks_to_consider > 0) {
tasks_to_consider--;
temp = list_peek_head(q->retrieved_list);
// a small hack, if task is not standard we accepted it so it can be deleted below.
if (temp->type != VINE_TASK_TYPE_STANDARD || task_tag_comparator(temp, tag)) {
// temp points to head of list
t = list_pop_head(q->retrieved_list);
break;
} else {
list_rotate(q->retrieved_list);
}
}
} else if (task_id >= 0) {
// XXX: library tasks are never removed!
struct vine_task *temp = itable_lookup(q->tasks, task_id);
if (!temp || temp->state != VINE_TASK_RETRIEVED) {
break;
struct vine_task *t = NULL;

if (tag) {
int tasks_to_consider = list_size(q->retrieved_list);
while (tasks_to_consider > 0) {
tasks_to_consider--;
t = list_peek_head(q->retrieved_list);
/* If the tag matches OR it is non standard, return it. */
if (task_tag_comparator(t, tag) || t->type != VINE_TASK_TYPE_STANDARD) {
/* task is at the top of the list */
return list_pop_head(q->retrieved_list);
} else {
list_rotate(q->retrieved_list);
}
t = temp;
}
return 0;
} else if (task_id >= 0) {
/* First check that the requested task exists and is retrieved. */
struct vine_task *t = itable_lookup(q->tasks, task_id);
if (t && t->state == VINE_TASK_RETRIEVED) {
/* Return it regardless of type, to be consumed below. */
list_remove(q->retrieved_list, t);
} else if (list_size(q->retrieved_list) > 0) {
t = list_pop_head(q->retrieved_list);
return t;
} else {
/* If not, return zero to indicate no such task is complete. */
return 0;
}
} else if (list_size(q->retrieved_list) > 0) {
/* Process the first available retrieved task, regardless of type. */
return list_pop_head(q->retrieved_list);
} else {
return 0;
}
}

if (!t) {
/* didn't find a retrieved task to return */
return NULL;
}
/*
Find a task to return to the user based on the caller's criteria.
If we find a regular task, that is given back to the user.
If we find a non-standard task, that is consumed quietly.
This allows us to treat non-standard tasks as normally as
possible through their lifetime, until the moment
at which they would (otherwise) be returned to the user.
*/

struct vine_task *find_task_to_return(struct vine_manager *q, const char *tag, int task_id)
{
while (1) {
/* Find one task that matches the criteria *or* is nonstandard. */
struct vine_task *t = find_complete_or_nonstandard_task(q, tag, task_id);

// Save task type as task may be freed in change_task_state
vine_task_type_t task_type = t->type;
/* If null, then there are no completed tasks available to return. */
if (!t)
return 0;

change_task_state(q, t, VINE_TASK_DONE);

if (t->result != VINE_RESULT_SUCCESS) {
q->stats->tasks_failed++;
}

switch (task_type) {
/* Grab the type *before* deleting the task object. */
vine_task_type_t type = t->type;

/* The task was given a reference when added to the table, so delete a reference on removal. */
itable_remove(q->tasks, t->task_id);
vine_task_delete(t);

/* CAREFUL: a non-standard task *may* no longer exist following vine_task_delete! */

switch (type) {
case VINE_TASK_TYPE_STANDARD:
/* if this is a standard task type, then break and return it to the user. */

/* If this is a standard task type, give it back to the user. */
return t;
break;

case VINE_TASK_TYPE_RECOVERY:
/* do nothing and let vine_manager_consider_recovery_task do its job */

/*
If this is a recovery task, it is owned by the manager,
and should not be given back to the user. If the vine_file
objects that it produces still exist, then the task still
exists in the DONE state by virtue of those references,
and may be resubmitted later by vine_manager_consider_recovery_task.
If there are no such output files, then the task no longer exists!
Either way, the user should not get it back.

Go around again and find another task to return or consume.
*/

t = 0;
break;
case VINE_TASK_TYPE_LIBRARY_INSTANCE:
/* silently delete the task, since it was created by the manager.
* note: other functions may still hold references to this library task.
* those references will be released once the functions complete.

case VINE_TASK_TYPE_LIBRARY:

/*
* If this is a library instance task, then it was created by the manager
* as needed to deploy a function call task on a particular node.
* Such a task should not normally exit, but if it did and reached the DONE
* state, then the manager should delete it, because the manager created it.
* We also do not return this task type to the user.
*
* change_task_state above internally removes the reference from q->tasks,
* and this following call drops the manager's own reference.
* remaining references will be released gradually upon the completion of relevant
* function tasks, and the task will be automatically freed once no
* references remain, regardless of whether the functions complete successfully. */
* Go around again and find another task to return or consume.
*/

vine_task_delete(t);
break;
case VINE_TASK_TYPE_LIBRARY_TEMPLATE:
/* A template shouldn't be scheduled. It's deleted when template table is deleted.*/
t = 0;
break;
}
}
Expand Down Expand Up @@ -5717,14 +5769,13 @@ int vine_cancel_all_by_tag(struct vine_manager *q, const char *tag)
default:
switch (t->type) {
case VINE_TASK_TYPE_STANDARD:
case VINE_TASK_TYPE_LIBRARY_INSTANCE:
case VINE_TASK_TYPE_LIBRARY:
if (task_tag_comparator(t, tag)) {
vine_cancel_by_task_id(q, task_id);
count++;
}
break;
case VINE_TASK_TYPE_RECOVERY:
case VINE_TASK_TYPE_LIBRARY_TEMPLATE:
/* recovery tasks should not be canceled (unless explicitly by task id)
* as there are temporary files that the workflow already considers done. */
continue;
Expand Down
3 changes: 1 addition & 2 deletions taskvine/src/manager/vine_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ End user may only use the API described in taskvine.h
typedef enum {
VINE_TASK_TYPE_STANDARD, /**< A normal task that should be returned to the user. */
VINE_TASK_TYPE_RECOVERY, /**< An internally-created recovery task that should not be returned to the user. */
VINE_TASK_TYPE_LIBRARY_TEMPLATE, /**< An internally-created library task that should not be returned to the user. */
VINE_TASK_TYPE_LIBRARY_INSTANCE, /**< An internally-created library task that should not be returned to the user. */
VINE_TASK_TYPE_LIBRARY, /**< An internally-created library task that should not be returned to the user. */
} vine_task_type_t;

typedef enum {
Expand Down
2 changes: 0 additions & 2 deletions taskvine/src/tools/vine_benchmark.c
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,6 @@ int main(int argc, char *argv[])
}
}

vine_set_runtime_info_path("vine_benchmark_info");

struct vine_manager *q = vine_create(port);
if(!q) fatal("couldn't listen on any port!");

Expand Down