diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 2d9fc98c..d509c2d0 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -46,4 +46,5 @@ jobs: with: skip-test: ${{ github.event.inputs.skip-test == 'true' }} kestra-version: ${{ github.event.inputs.kestra-version }} + java-version: '25' secrets: inherit diff --git a/build.gradle b/build.gradle index 1c48fc27..29b64957 100644 --- a/build.gradle +++ b/build.gradle @@ -22,7 +22,7 @@ repositories { } } -final targetJavaVersion = JavaVersion.VERSION_21 +final targetJavaVersion = JavaVersion.VERSION_25 java { sourceCompatibility = targetJavaVersion diff --git a/gradle.properties b/gradle.properties index cfdab34b..03ae9836 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ version=2.4.1-SNAPSHOT -kestraVersion=1.3.13 +kestraVersion=2.0.0-SNAPSHOT diff --git a/src/main/java/io/kestra/plugin/aws/cloudwatch/Trigger.java b/src/main/java/io/kestra/plugin/aws/cloudwatch/Trigger.java index edbb7edc..3a516c0a 100644 --- a/src/main/java/io/kestra/plugin/aws/cloudwatch/Trigger.java +++ b/src/main/java/io/kestra/plugin/aws/cloudwatch/Trigger.java @@ -37,12 +37,12 @@ namespace: company.team tasks: - id: each - type: io.kestra.plugin.core.flow.ForEach + type: io.kestra.plugin.core.flow.Loop values: "{{ trigger.series }}" tasks: - id: log type: io.kestra.plugin.core.log.Log - message: "Datapoint: {{ json(taskrun.value) }}" + message: "Datapoint: {{ fromJson(item.value) }}" triggers: - id: watch diff --git a/src/main/java/io/kestra/plugin/aws/s3/Trigger.java b/src/main/java/io/kestra/plugin/aws/s3/Trigger.java index 4bc07569..2ebe2584 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/Trigger.java +++ b/src/main/java/io/kestra/plugin/aws/s3/Trigger.java @@ -46,12 +46,12 @@ tasks: - id: each - type: io.kestra.plugin.core.flow.ForEach + type: io.kestra.plugin.core.flow.Loop values: "{{ trigger.objects | jq('.[].uri') }}" tasks: - id: return type: io.kestra.plugin.core.debug.Return - format: "{{ taskrun.value }}" + format: "{{ item.value }}" triggers: - id: watch @@ -77,12 +77,12 @@ tasks: - id: each - type: io.kestra.plugin.core.flow.ForEach + type: io.kestra.plugin.core.flow.Loop values: "{{ trigger.objects | jq('.[].key') }}" tasks: - id: return type: io.kestra.plugin.core.debug.Return - format: "{{ taskrun.value }}" + format: "{{ item.value }}" - id: delete type: io.kestra.plugin.aws.s3.Delete @@ -90,7 +90,7 @@ secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}" region: "eu-central-1" bucket: "my-bucket" - key: "{{ taskrun.value }}" + key: "{{ item.value }}" triggers: - id: watch diff --git a/src/test/java/io/kestra/plugin/aws/cli/AwsCLITest.java b/src/test/java/io/kestra/plugin/aws/cli/AwsCLITest.java index 1ca3b621..e88e4671 100644 --- a/src/test/java/io/kestra/plugin/aws/cli/AwsCLITest.java +++ b/src/test/java/io/kestra/plugin/aws/cli/AwsCLITest.java @@ -14,8 +14,8 @@ import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.TestsUtils; import io.kestra.plugin.aws.AbstractLocalStackTest; -import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions; import io.kestra.plugin.scripts.exec.scripts.models.ScriptOutput; +import io.kestra.plugin.scripts.runner.docker.Docker; import jakarta.inject.Inject; @@ -36,8 +36,9 @@ void run() throws Exception { AwsCLI execute = AwsCLI.builder() .id(IdUtils.create()) .type(AwsCLI.class.getName()) - .docker( - DockerOptions.builder() + .taskRunner( + Docker.builder() + .type(Docker.class.getName()) // needed to be able to reach localstack from inside the container .networkMode("host") .image("amazon/aws-cli") diff --git a/src/test/java/io/kestra/plugin/aws/cloudwatch/TriggerTest.java b/src/test/java/io/kestra/plugin/aws/cloudwatch/TriggerTest.java index 7be9cd4b..b8932f06 100644 --- a/src/test/java/io/kestra/plugin/aws/cloudwatch/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/cloudwatch/TriggerTest.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.mockConstruction; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import io.kestra.core.models.conditions.ConditionContext; @KestraTest class TriggerTest { @@ -59,8 +60,8 @@ void evaluate() throws Exception { Query.class, (mock, context) -> when(mock.run(any())).thenReturn(output) ) ) { - var conditionContext = io.kestra.core.utils.TestsUtils.mockTrigger(runContextFactory, trigger); - var execution = trigger.evaluate(conditionContext.getKey(), conditionContext.getValue()); + Map.Entry conditionContext = io.kestra.core.utils.TestsUtils.mockTrigger(runContextFactory, trigger); + var execution = trigger.evaluate(conditionContext.getKey(), conditionContext.getValue().context()); assertThat(execution.isPresent(), is(true)); assertThat(mockedQuery.constructed(), hasSize(1)); diff --git a/src/test/java/io/kestra/plugin/aws/kinesis/RealtimeTriggerTest.java b/src/test/java/io/kestra/plugin/aws/kinesis/RealtimeTriggerTest.java index c2565a8e..468354dd 100644 --- a/src/test/java/io/kestra/plugin/aws/kinesis/RealtimeTriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/kinesis/RealtimeTriggerTest.java @@ -5,20 +5,17 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.*; import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.property.Property; -import io.kestra.core.queues.QueueFactoryInterface; -import io.kestra.core.queues.QueueInterface; +import io.kestra.core.queues.DispatchQueueInterface; import io.kestra.core.repositories.LocalFlowRepositoryLoader; -import io.kestra.core.utils.TestsUtils; import io.kestra.plugin.aws.kinesis.model.Record; import jakarta.inject.Inject; -import jakarta.inject.Named; -import reactor.core.publisher.Flux; import software.amazon.awssdk.services.kinesis.model.*; import static org.hamcrest.MatcherAssert.assertThat; @@ -27,8 +24,7 @@ @KestraTest(startRunner = true, startScheduler = true) class RealtimeTriggerTest extends AbstractKinesisTest { @Inject - @Named(QueueFactoryInterface.EXECUTION_NAMED) - QueueInterface executionQueue; + DispatchQueueInterface executionQueue; @Inject LocalFlowRepositoryLoader repositoryLoader; @@ -37,56 +33,59 @@ class RealtimeTriggerTest extends AbstractKinesisTest { void evaluate() throws Exception { String consumerArn = registerConsumer(); CountDownLatch latch = new CountDownLatch(1); - - Flux received = TestsUtils.receive(executionQueue, e -> latch.countDown()); - - String yaml = """ - id: realtime - namespace: company.team - - tasks: - - id: log - type: io.kestra.plugin.core.log.Log - message: "{{ trigger.data }}" - - triggers: - - id: realtime - type: io.kestra.plugin.aws.kinesis.RealtimeTrigger - streamName: "%s" - consumerArn: "%s" - region: "us-east-1" - accessKeyId: "test" - secretKeyId: "test" - endpointOverride: "http://localhost:4566" - iteratorType: TRIM_HORIZON - """ - .formatted(streamName, consumerArn); - - File tempFlow = File.createTempFile("kinesis-realtime", ".yaml"); - Files.writeString(tempFlow.toPath(), yaml); - - repositoryLoader.load(tempFlow); - - Record record = Record.builder() - .partitionKey("pk") - .data("hello") - .build(); - - var put = PutRecords.builder() - .endpointOverride(Property.ofValue(localstack.getEndpoint().toString())) - .region(Property.ofValue(localstack.getRegion())) - .accessKeyId(Property.ofValue(localstack.getAccessKey())) - .secretKeyId(Property.ofValue(localstack.getSecretKey())) - .streamName(Property.ofValue(streamName)) - .records(List.of(record)) - .build(); - - put.run(runContextFactory.of()); - - boolean done = latch.await(30, TimeUnit.SECONDS); - assertThat(done, is(true)); - - Execution exec = received.blockLast(); - assertThat(exec.getTrigger().getVariables().get("data"), is("hello")); + AtomicReference lastExecution = new AtomicReference<>(); + + executionQueue.addListener(e -> { + lastExecution.set(e); + latch.countDown(); + }); + + String yaml = """ + id: realtime + namespace: company.team + + tasks: + - id: log + type: io.kestra.plugin.core.log.Log + message: "{{ trigger.data }}" + + triggers: + - id: realtime + type: io.kestra.plugin.aws.kinesis.RealtimeTrigger + streamName: "%s" + consumerArn: "%s" + region: "us-east-1" + accessKeyId: "test" + secretKeyId: "test" + endpointOverride: "http://localhost:4566" + iteratorType: TRIM_HORIZON + """ + .formatted(streamName, consumerArn); + + File tempFlow = File.createTempFile("kinesis-realtime", ".yaml"); + Files.writeString(tempFlow.toPath(), yaml); + + repositoryLoader.load(tempFlow); + + Record record = Record.builder() + .partitionKey("pk") + .data("hello") + .build(); + + var put = PutRecords.builder() + .endpointOverride(Property.ofValue(localstack.getEndpoint().toString())) + .region(Property.ofValue(localstack.getRegion())) + .accessKeyId(Property.ofValue(localstack.getAccessKey())) + .secretKeyId(Property.ofValue(localstack.getSecretKey())) + .streamName(Property.ofValue(streamName)) + .records(List.of(record)) + .build(); + + put.run(runContextFactory.of()); + + boolean done = latch.await(30, TimeUnit.SECONDS); + assertThat(done, is(true)); + + assertThat(lastExecution.get().getTrigger().getVariables().get("data"), is("hello")); } } diff --git a/src/test/java/io/kestra/plugin/aws/kinesis/TriggerTest.java b/src/test/java/io/kestra/plugin/aws/kinesis/TriggerTest.java index fae6535d..ef4bd028 100644 --- a/src/test/java/io/kestra/plugin/aws/kinesis/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/kinesis/TriggerTest.java @@ -58,8 +58,8 @@ void evaluate() throws Exception { .endpointOverride(Property.ofValue(localstack.getEndpoint().toString())) .build(); - Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); - Optional execution = trigger.evaluate(context.getKey(), context.getValue()); + Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); + Optional execution = trigger.evaluate(context.getKey(), context.getValue().context()); assertThat(execution.isPresent(), is(true)); } diff --git a/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java b/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java index d80dfc2f..798ddc12 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java @@ -2,29 +2,27 @@ import java.time.Duration; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; import org.testcontainers.containers.localstack.LocalStackContainer; import io.kestra.core.junit.annotations.KestraTest; +import io.kestra.core.junit.annotations.LoadFlows; import io.kestra.core.models.conditions.ConditionContext; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.property.Property; import io.kestra.core.models.triggers.StatefulTriggerInterface; -import io.kestra.core.queues.QueueFactoryInterface; -import io.kestra.core.queues.QueueInterface; -import io.kestra.core.repositories.LocalFlowRepositoryLoader; +import io.kestra.core.queues.DispatchQueueInterface; +import io.kestra.core.runners.Scheduler; import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.TestsUtils; import io.kestra.plugin.aws.s3.models.S3Object; import jakarta.inject.Inject; -import jakarta.inject.Named; -import reactor.core.publisher.Flux; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -32,22 +30,24 @@ @KestraTest(startRunner = true, startScheduler = true) class TriggerTest extends AbstractTest { @Inject - @Named(QueueFactoryInterface.EXECUTION_NAMED) - private QueueInterface executionQueue; + private DispatchQueueInterface executionQueue; @Inject - protected LocalFlowRepositoryLoader repositoryLoader; + protected Scheduler scheduler; @Test + @LoadFlows({"flows/s3/s3-listen.yaml"}) void deleteAction() throws Exception { + Awaitility.await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofMillis(100)).until(() -> scheduler.isActive()); + String bucket = "trigger-test"; this.createBucket(bucket); List listTask = list().bucket(Property.ofValue(bucket)).build(); + CountDownLatch queueCount = new CountDownLatch(1); AtomicReference last = new AtomicReference<>(); - Flux receive = TestsUtils.receive(executionQueue, executionWithError -> { - Execution execution = executionWithError.getLeft(); + executionQueue.addListener(execution -> { if (execution.getFlowId().equals("s3-listen")) { last.set(execution); queueCount.countDown(); @@ -57,11 +57,8 @@ void deleteAction() throws Exception { upload("trigger/s3", bucket); upload("trigger/s3", bucket); - repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen.yaml"))); - - boolean await = queueCount.await(10, TimeUnit.SECONDS); + boolean await = queueCount.await(1, TimeUnit.MINUTES); assertThat(await, is(true)); - receive.blockLast(); @SuppressWarnings("unchecked") java.util.List trigger = (java.util.List) last.get().getTrigger().getVariables().get("objects"); @@ -75,16 +72,17 @@ void deleteAction() throws Exception { } @Test + @LoadFlows({"flows/s3/s3-listen-none-action.yaml"}) void noneAction() throws Exception { + Awaitility.await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofMillis(100)).until(() -> scheduler.isActive()); + String bucket = "trigger-none-action-test"; this.createBucket(bucket); List listTask = list().bucket(Property.ofValue(bucket)).build(); CountDownLatch queueCount = new CountDownLatch(1); AtomicReference last = new AtomicReference<>(); - Flux receive = TestsUtils.receive(executionQueue, executionWithError -> { - Execution execution = executionWithError.getLeft(); - + executionQueue.addListener(execution -> { if (execution.getFlowId().equals("s3-listen-none-action")) { last.set(execution); queueCount.countDown(); @@ -94,11 +92,8 @@ void noneAction() throws Exception { upload("trigger/s3", bucket); upload("trigger/s3", bucket); - repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen-none-action.yaml"))); - - boolean await = queueCount.await(10, TimeUnit.SECONDS); + boolean await = queueCount.await(1, TimeUnit.MINUTES); assertThat(await, is(true)); - receive.blockLast(); @SuppressWarnings("unchecked") java.util.List trigger = (java.util.List) last.get().getTrigger().getVariables().get("objects"); @@ -112,16 +107,18 @@ void noneAction() throws Exception { } @Test + @LoadFlows({"flows/s3/s3-listen-localhost-force-path-style.yaml"}) void forcePathStyleWithSimpleLocalhost() throws Exception { + Awaitility.await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofMillis(100)).until(() -> scheduler.isActive()); + String bucket = "trigger-force-path-style-test"; this.createBucket(bucket); List listTask = list().bucket(Property.ofValue(bucket)).build(); CountDownLatch queueCount = new CountDownLatch(1); AtomicReference last = new AtomicReference<>(); - Flux receive = TestsUtils.receive(executionQueue, executionWithError -> { - Execution execution = executionWithError.getLeft(); + executionQueue.addListener(execution -> { if (execution.getFlowId().equals("s3-listen-localhost-force-path-style")) { last.set(execution); queueCount.countDown(); @@ -131,11 +128,8 @@ void forcePathStyleWithSimpleLocalhost() throws Exception { upload("trigger/s3", bucket); upload("trigger/s3", bucket); - repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen-localhost-force-path-style.yaml"))); - - boolean await = queueCount.await(15, TimeUnit.SECONDS); + boolean await = queueCount.await(1, TimeUnit.MINUTES); assertThat("trigger should work with localhost endpoint + forcePathStyle", await, is(true)); - receive.blockLast(); @SuppressWarnings("unchecked") java.util.List trigger = (java.util.List) last.get().getTrigger().getVariables().get("objects"); @@ -169,9 +163,9 @@ void shouldExecuteOnCreate() throws Exception { upload("trigger/on-create", bucket); - Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); + Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); - Optional execution = trigger.evaluate(context.getKey(), context.getValue()); + Optional execution = trigger.evaluate(context.getKey(), context.getValue().context()); assertThat(execution.isPresent(), is(true)); } @@ -196,14 +190,14 @@ void shouldExecuteOnUpdate() throws Exception { .interval(Duration.ofSeconds(10)) .build(); - Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); + Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); - trigger.evaluate(context.getKey(), context.getValue()); + trigger.evaluate(context.getKey(), context.getValue().context()); update(key, bucket); Thread.sleep(2000); - Optional execution = trigger.evaluate(context.getKey(), context.getValue()); + Optional execution = trigger.evaluate(context.getKey(), context.getValue().context()); assertThat(execution.isPresent(), is(true)); } @@ -227,15 +221,15 @@ void shouldExecuteOnCreateOrUpdate() throws Exception { var key = upload("trigger/on-create-or-update", bucket); - Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); + Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); - Optional createExecution = trigger.evaluate(context.getKey(), context.getValue()); + Optional createExecution = trigger.evaluate(context.getKey(), context.getValue().context()); assertThat("Trigger should fire on CREATE", createExecution.isPresent(), is(true)); update(key, bucket); Thread.sleep(2000); - Optional updateExecution = trigger.evaluate(context.getKey(), context.getValue()); + Optional updateExecution = trigger.evaluate(context.getKey(), context.getValue().context()); assertThat(updateExecution.isPresent(), is(true)); } @@ -264,9 +258,9 @@ void maxFilesExceeded() throws Exception { .interval(Duration.ofSeconds(10)) .build(); - Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); + Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); - Optional execution = trigger.evaluate(context.getKey(), context.getValue()); + Optional execution = trigger.evaluate(context.getKey(), context.getValue().context()); // When maxFiles exceeded, List returns first 3 files, so Trigger should fire assertThat(execution.isPresent(), is(true)); } @@ -296,9 +290,9 @@ void maxFilesNotExceeded() throws Exception { .interval(Duration.ofSeconds(10)) .build(); - Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); + Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); - Optional execution = trigger.evaluate(context.getKey(), context.getValue()); + Optional execution = trigger.evaluate(context.getKey(), context.getValue().context()); assertThat(execution.isPresent(), is(true)); } } diff --git a/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java b/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java index cba5a81e..70123a24 100644 --- a/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java @@ -4,6 +4,7 @@ import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Test; import org.testcontainers.containers.localstack.LocalStackContainer; @@ -11,14 +12,10 @@ import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.property.Property; -import io.kestra.core.queues.QueueFactoryInterface; -import io.kestra.core.queues.QueueInterface; +import io.kestra.core.queues.DispatchQueueInterface; import io.kestra.core.repositories.LocalFlowRepositoryLoader; -import io.kestra.core.utils.TestsUtils; import io.kestra.plugin.aws.sqs.model.Message; import jakarta.inject.Inject; -import jakarta.inject.Named; -import reactor.core.publisher.Flux; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -26,8 +23,7 @@ @KestraTest(startRunner = true, startScheduler = true) class RealtimeTriggerTest extends AbstractSqsTest { @Inject - @Named(QueueFactoryInterface.EXECUTION_NAMED) - private QueueInterface executionQueue; + private DispatchQueueInterface executionQueue; @Inject protected LocalFlowRepositoryLoader repositoryLoader; @@ -35,9 +31,11 @@ class RealtimeTriggerTest extends AbstractSqsTest { @Test void flow() throws Exception { CountDownLatch queueCount = new CountDownLatch(1); - Flux receive = TestsUtils.receive(executionQueue, execution -> { + AtomicReference lastExecution = new AtomicReference<>(); + executionQueue.addListener(execution -> { + lastExecution.set(execution); queueCount.countDown(); - assertThat(execution.getLeft().getFlowId(), is("realtime")); + assertThat(execution.getFlowId(), is("realtime")); }); repositoryLoader.load(Objects.requireNonNull(RealtimeTriggerTest.class.getClassLoader().getResource("flows/sqs/realtime.yaml"))); @@ -55,12 +53,14 @@ void flow() throws Exception { ) .build(); - task.run(runContextFactory.of()); + var runContext = runContextFactory.of(); + + task.run(runContext); boolean await = queueCount.await(1, TimeUnit.MINUTES); assertThat(await, is(true)); - Execution last = receive.blockLast(); + Execution last = lastExecution.get(); assertThat(last.getTrigger().getVariables().size(), is(1)); assertThat(last.getTrigger().getVariables().get("data"), is("Hello World")); } diff --git a/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java b/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java index 41b8b01d..7587ab79 100644 --- a/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java @@ -1,25 +1,24 @@ package io.kestra.plugin.aws.sqs; +import java.time.Duration; import java.util.List; -import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; import org.testcontainers.containers.localstack.LocalStackContainer; import io.kestra.core.junit.annotations.KestraTest; +import io.kestra.core.junit.annotations.LoadFlows; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.property.Property; -import io.kestra.core.queues.QueueFactoryInterface; -import io.kestra.core.queues.QueueInterface; -import io.kestra.core.repositories.LocalFlowRepositoryLoader; +import io.kestra.core.queues.DispatchQueueInterface; import io.kestra.core.runners.RunContextFactory; -import io.kestra.core.utils.TestsUtils; +import io.kestra.core.runners.Scheduler; import io.kestra.plugin.aws.sqs.model.Message; import jakarta.inject.Inject; -import jakarta.inject.Named; -import reactor.core.publisher.Flux; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -28,25 +27,27 @@ @KestraTest(startRunner = true, startScheduler = true) class TriggerTest extends AbstractSqsTest { @Inject - @Named(QueueFactoryInterface.EXECUTION_NAMED) - private QueueInterface executionQueue; + private DispatchQueueInterface executionQueue; @Inject - protected LocalFlowRepositoryLoader repositoryLoader; + protected Scheduler scheduler; @Inject protected RunContextFactory runContextFactory; @Test + @LoadFlows({"flows/sqs/sqs-listen.yaml"}) void flow() throws Exception { + Awaitility.await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofMillis(100)).until(() -> scheduler.isActive()); + CountDownLatch queueCount = new CountDownLatch(1); - Flux receive = TestsUtils.receive(executionQueue, execution -> { + AtomicReference lastExecution = new AtomicReference<>(); + executionQueue.addListener(execution -> { + lastExecution.set(execution); queueCount.countDown(); - assertThat(execution.getLeft().getFlowId(), is("sqs-listen")); + assertThat(execution.getFlowId(), is("sqs-listen")); }); - repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/sqs/sqs-listen.yaml"))); - Publish task = Publish.builder() .endpointOverride(Property.ofValue(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString())) .queueUrl(Property.ofValue(queueUrl())) @@ -61,12 +62,14 @@ void flow() throws Exception { ) .build(); - task.run(runContextFactory.of()); + var runContext = runContextFactory.of(); + + task.run(runContext); boolean await = queueCount.await(1, TimeUnit.MINUTES); assertThat(await, is(true)); - Execution last = receive.blockLast(); + Execution last = lastExecution.get(); var count = (Integer) last.getTrigger().getVariables().get("count"); var uri = (String) last.getTrigger().getVariables().get("uri"); assertThat(count, is(2)); diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml index f1bb4dd5..ad450b21 100644 --- a/src/test/resources/application.yml +++ b/src/test/resources/application.yml @@ -9,3 +9,10 @@ kestra: type: memory repository: type: memory + + worker: + controllers: + type: STATIC + static: + endpoints: + - host: localhost \ No newline at end of file