Skip to content

Commit 519f29e

Browse files
committed
Merge: Propagate workflow inputs as additional task inputs
2 parents 04c0010 + 43493d3 commit 519f29e

2 files changed

Lines changed: 29 additions & 2 deletions

File tree

internal/workflow/choice_task.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,16 @@ func (c *ChoiceTask) Evaluate(input *TaskData, r *Request) (TaskId, error) {
3636

3737
// simply evaluate the Conditions and set the matching one
3838
matchedCondition := -1
39+
var extendedInputs = make(map[string]interface{})
40+
for k, v := range r.Params {
41+
extendedInputs[k] = v
42+
}
43+
for k, v := range input.Data {
44+
extendedInputs[k] = v
45+
}
46+
3947
for i, condition := range c.Conditions {
40-
ok, err := condition.Evaluate(input.Data)
48+
ok, err := condition.Evaluate(extendedInputs)
4149
if err != nil {
4250
return "", fmt.Errorf("error while testing condition: %v", err)
4351
}

internal/workflow/function_task.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7-
"github.com/serverledge-faas/serverledge/internal/node"
87
"time"
98

9+
"github.com/serverledge-faas/serverledge/internal/node"
10+
1011
"github.com/lithammer/shortuuid"
1112
"github.com/serverledge-faas/serverledge/internal/function"
1213
"github.com/serverledge-faas/serverledge/internal/scheduling"
@@ -37,6 +38,24 @@ func (s *FunctionTask) SetNext(nextTask Task) error {
3738

3839
func (s *FunctionTask) execute(input *TaskData, r *Request) (map[string]interface{}, error) {
3940

41+
// FIXME: We are adding additional inputs from r.Params to match the function signature. This workaround should be
42+
// dropped when we introduce the possibility to specify additional parameters for functions in a workflow.
43+
44+
funct, exists := function.GetFunction(s.Func)
45+
if !exists {
46+
return nil, fmt.Errorf("funtion %s doesn't exists", s.Func)
47+
}
48+
if funct.Signature == nil {
49+
return nil, fmt.Errorf("signature of function %s is nil. Recreate the function with a valid signature.\n", funct.Name)
50+
}
51+
for _, inputDef := range funct.Signature.GetInputs() {
52+
v, found := r.Params[inputDef.Name]
53+
_, alreadyDefined := input.Data[inputDef.Name]
54+
if found && !alreadyDefined {
55+
input.Data[inputDef.Name] = v
56+
}
57+
}
58+
4059
err := s.CheckInput(input.Data)
4160
if err != nil {
4261
return nil, err

0 commit comments

Comments
 (0)