From 11f7de53e8bf9a474e9d8c4208fad906d3aaec49 Mon Sep 17 00:00:00 2001 From: Malay Dewangan Date: Fri, 10 Apr 2026 11:02:25 +0530 Subject: [PATCH 1/9] fix: plugin v2 compatibility --- .github/workflows/main.yml | 1 + build.gradle | 2 +- gradle.properties | 2 +- .../plugin/aws/cloudwatch/TriggerTest.java | 5 ++-- .../plugin/aws/kinesis/TriggerTest.java | 4 ++-- .../io/kestra/plugin/aws/s3/TriggerTest.java | 24 +++++++++---------- 6 files changed, 20 insertions(+), 18 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 2d9fc98c..d509c2d0 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -46,4 +46,5 @@ jobs: with: skip-test: ${{ github.event.inputs.skip-test == 'true' }} kestra-version: ${{ github.event.inputs.kestra-version }} + java-version: '25' secrets: inherit diff --git a/build.gradle b/build.gradle index 1c48fc27..29b64957 100644 --- a/build.gradle +++ b/build.gradle @@ -22,7 +22,7 @@ repositories { } } -final targetJavaVersion = JavaVersion.VERSION_21 +final targetJavaVersion = JavaVersion.VERSION_25 java { sourceCompatibility = targetJavaVersion diff --git a/gradle.properties b/gradle.properties index cfdab34b..03ae9836 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ version=2.4.1-SNAPSHOT -kestraVersion=1.3.13 +kestraVersion=2.0.0-SNAPSHOT diff --git a/src/test/java/io/kestra/plugin/aws/cloudwatch/TriggerTest.java b/src/test/java/io/kestra/plugin/aws/cloudwatch/TriggerTest.java index 7be9cd4b..b8932f06 100644 --- a/src/test/java/io/kestra/plugin/aws/cloudwatch/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/cloudwatch/TriggerTest.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.mockConstruction; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import io.kestra.core.models.conditions.ConditionContext; @KestraTest class TriggerTest { @@ -59,8 +60,8 @@ void evaluate() throws Exception { Query.class, (mock, context) -> when(mock.run(any())).thenReturn(output) ) ) { - var conditionContext = io.kestra.core.utils.TestsUtils.mockTrigger(runContextFactory, trigger); - var execution = trigger.evaluate(conditionContext.getKey(), conditionContext.getValue()); + Map.Entry conditionContext = io.kestra.core.utils.TestsUtils.mockTrigger(runContextFactory, trigger); + var execution = trigger.evaluate(conditionContext.getKey(), conditionContext.getValue().context()); assertThat(execution.isPresent(), is(true)); assertThat(mockedQuery.constructed(), hasSize(1)); diff --git a/src/test/java/io/kestra/plugin/aws/kinesis/TriggerTest.java b/src/test/java/io/kestra/plugin/aws/kinesis/TriggerTest.java index fae6535d..ef4bd028 100644 --- a/src/test/java/io/kestra/plugin/aws/kinesis/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/kinesis/TriggerTest.java @@ -58,8 +58,8 @@ void evaluate() throws Exception { .endpointOverride(Property.ofValue(localstack.getEndpoint().toString())) .build(); - Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); - Optional execution = trigger.evaluate(context.getKey(), context.getValue()); + Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); + Optional execution = trigger.evaluate(context.getKey(), context.getValue().context()); assertThat(execution.isPresent(), is(true)); } diff --git a/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java b/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java index d80dfc2f..27941a6e 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java @@ -169,9 +169,9 @@ void shouldExecuteOnCreate() throws Exception { upload("trigger/on-create", bucket); - Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); + Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); - Optional execution = trigger.evaluate(context.getKey(), context.getValue()); + Optional execution = trigger.evaluate(context.getKey(), context.getValue().context()); assertThat(execution.isPresent(), is(true)); } @@ -196,14 +196,14 @@ void shouldExecuteOnUpdate() throws Exception { .interval(Duration.ofSeconds(10)) .build(); - Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); + Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); - trigger.evaluate(context.getKey(), context.getValue()); + trigger.evaluate(context.getKey(), context.getValue().context()); update(key, bucket); Thread.sleep(2000); - Optional execution = trigger.evaluate(context.getKey(), context.getValue()); + Optional execution = trigger.evaluate(context.getKey(), context.getValue().context()); assertThat(execution.isPresent(), is(true)); } @@ -227,15 +227,15 @@ void shouldExecuteOnCreateOrUpdate() throws Exception { var key = upload("trigger/on-create-or-update", bucket); - Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); + Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); - Optional createExecution = trigger.evaluate(context.getKey(), context.getValue()); + Optional createExecution = trigger.evaluate(context.getKey(), context.getValue().context()); assertThat("Trigger should fire on CREATE", createExecution.isPresent(), is(true)); update(key, bucket); Thread.sleep(2000); - Optional updateExecution = trigger.evaluate(context.getKey(), context.getValue()); + Optional updateExecution = trigger.evaluate(context.getKey(), context.getValue().context()); assertThat(updateExecution.isPresent(), is(true)); } @@ -264,9 +264,9 @@ void maxFilesExceeded() throws Exception { .interval(Duration.ofSeconds(10)) .build(); - Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); + Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); - Optional execution = trigger.evaluate(context.getKey(), context.getValue()); + Optional execution = trigger.evaluate(context.getKey(), context.getValue().context()); // When maxFiles exceeded, List returns first 3 files, so Trigger should fire assertThat(execution.isPresent(), is(true)); } @@ -296,9 +296,9 @@ void maxFilesNotExceeded() throws Exception { .interval(Duration.ofSeconds(10)) .build(); - Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); + Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); - Optional execution = trigger.evaluate(context.getKey(), context.getValue()); + Optional execution = trigger.evaluate(context.getKey(), context.getValue().context()); assertThat(execution.isPresent(), is(true)); } } From 63dfd7524b7785dad957916676b89556b89eb40a Mon Sep 17 00:00:00 2001 From: Malay Dewangan Date: Sun, 12 Apr 2026 00:55:42 +0530 Subject: [PATCH 2/9] fix: v2 compatibility --- build.gradle | 1 + .../aws/kinesis/RealtimeTriggerTest.java | 139 +++++++------ .../io/kestra/plugin/aws/s3/TriggerTest.java | 195 +++++++++++------- .../plugin/aws/sqs/RealtimeTriggerTest.java | 93 ++++++--- .../io/kestra/plugin/aws/sqs/TriggerTest.java | 99 +++++---- src/test/resources/application.yml | 7 + 6 files changed, 332 insertions(+), 202 deletions(-) diff --git a/build.gradle b/build.gradle index 29b64957..eb597bad 100644 --- a/build.gradle +++ b/build.gradle @@ -127,6 +127,7 @@ dependencies { testImplementation group: "io.kestra", name: "repository-memory" testImplementation group: "io.kestra", name: "scheduler" testImplementation group: "io.kestra", name: "worker" + testImplementation group: "io.kestra", name: "indexer", version: kestraVersion // testcontainers testImplementation "org.testcontainers:testcontainers:2.0.5" diff --git a/src/test/java/io/kestra/plugin/aws/kinesis/RealtimeTriggerTest.java b/src/test/java/io/kestra/plugin/aws/kinesis/RealtimeTriggerTest.java index c2565a8e..592e87b3 100644 --- a/src/test/java/io/kestra/plugin/aws/kinesis/RealtimeTriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/kinesis/RealtimeTriggerTest.java @@ -3,32 +3,39 @@ import java.io.File; import java.nio.file.Files; import java.util.List; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.*; -import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.property.Property; -import io.kestra.core.queues.QueueFactoryInterface; -import io.kestra.core.queues.QueueInterface; +import io.kestra.core.queues.DispatchQueueInterface; import io.kestra.core.repositories.LocalFlowRepositoryLoader; -import io.kestra.core.utils.TestsUtils; +import io.kestra.core.runners.FlowListeners; +import io.kestra.jdbc.runner.JdbcScheduler; import io.kestra.plugin.aws.kinesis.model.Record; +import io.kestra.scheduler.AbstractScheduler; +import io.kestra.worker.DefaultWorker; + +import io.micronaut.context.ApplicationContext; import jakarta.inject.Inject; -import jakarta.inject.Named; -import reactor.core.publisher.Flux; import software.amazon.awssdk.services.kinesis.model.*; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.*; -@KestraTest(startRunner = true, startScheduler = true) class RealtimeTriggerTest extends AbstractKinesisTest { @Inject - @Named(QueueFactoryInterface.EXECUTION_NAMED) - QueueInterface executionQueue; + ApplicationContext applicationContext; + + @Inject + FlowListeners flowListeners; + + @Inject + DispatchQueueInterface executionQueue; @Inject LocalFlowRepositoryLoader repositoryLoader; @@ -37,56 +44,66 @@ class RealtimeTriggerTest extends AbstractKinesisTest { void evaluate() throws Exception { String consumerArn = registerConsumer(); CountDownLatch latch = new CountDownLatch(1); - - Flux received = TestsUtils.receive(executionQueue, e -> latch.countDown()); - - String yaml = """ - id: realtime - namespace: company.team - - tasks: - - id: log - type: io.kestra.plugin.core.log.Log - message: "{{ trigger.data }}" - - triggers: - - id: realtime - type: io.kestra.plugin.aws.kinesis.RealtimeTrigger - streamName: "%s" - consumerArn: "%s" - region: "us-east-1" - accessKeyId: "test" - secretKeyId: "test" - endpointOverride: "http://localhost:4566" - iteratorType: TRIM_HORIZON - """ - .formatted(streamName, consumerArn); - - File tempFlow = File.createTempFile("kinesis-realtime", ".yaml"); - Files.writeString(tempFlow.toPath(), yaml); - - repositoryLoader.load(tempFlow); - - Record record = Record.builder() - .partitionKey("pk") - .data("hello") - .build(); - - var put = PutRecords.builder() - .endpointOverride(Property.ofValue(localstack.getEndpoint().toString())) - .region(Property.ofValue(localstack.getRegion())) - .accessKeyId(Property.ofValue(localstack.getAccessKey())) - .secretKeyId(Property.ofValue(localstack.getSecretKey())) - .streamName(Property.ofValue(streamName)) - .records(List.of(record)) - .build(); - - put.run(runContextFactory.of()); - - boolean done = latch.await(30, TimeUnit.SECONDS); - assertThat(done, is(true)); - - Execution exec = received.blockLast(); - assertThat(exec.getTrigger().getVariables().get("data"), is("hello")); + AtomicReference lastExecution = new AtomicReference<>(); + + executionQueue.addListener(e -> { + lastExecution.set(e); + latch.countDown(); + }); + + DefaultWorker worker = applicationContext.createBean(DefaultWorker.class, UUID.randomUUID().toString(), 8, null); + try (AbstractScheduler scheduler = new JdbcScheduler(applicationContext, flowListeners)) { + + worker.run(); + scheduler.run(); + + String yaml = """ + id: realtime + namespace: company.team + + tasks: + - id: log + type: io.kestra.plugin.core.log.Log + message: "{{ trigger.data }}" + + triggers: + - id: realtime + type: io.kestra.plugin.aws.kinesis.RealtimeTrigger + streamName: "%s" + consumerArn: "%s" + region: "us-east-1" + accessKeyId: "test" + secretKeyId: "test" + endpointOverride: "http://localhost:4566" + iteratorType: TRIM_HORIZON + """ + .formatted(streamName, consumerArn); + + File tempFlow = File.createTempFile("kinesis-realtime", ".yaml"); + Files.writeString(tempFlow.toPath(), yaml); + + repositoryLoader.load(tempFlow); + + Record record = Record.builder() + .partitionKey("pk") + .data("hello") + .build(); + + var put = PutRecords.builder() + .endpointOverride(Property.ofValue(localstack.getEndpoint().toString())) + .region(Property.ofValue(localstack.getRegion())) + .accessKeyId(Property.ofValue(localstack.getAccessKey())) + .secretKeyId(Property.ofValue(localstack.getSecretKey())) + .streamName(Property.ofValue(streamName)) + .records(List.of(record)) + .build(); + + put.run(runContextFactory.of()); + + boolean done = latch.await(30, TimeUnit.SECONDS); + assertThat(done, is(true)); + + assertThat(lastExecution.get().getTrigger().getVariables().get("data"), is("hello")); + } } -} +} \ No newline at end of file diff --git a/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java b/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java index 27941a6e..900add6d 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java @@ -4,6 +4,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -11,29 +12,35 @@ import org.junit.jupiter.api.Test; import org.testcontainers.containers.localstack.LocalStackContainer; -import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.models.conditions.ConditionContext; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.property.Property; import io.kestra.core.models.triggers.StatefulTriggerInterface; -import io.kestra.core.queues.QueueFactoryInterface; -import io.kestra.core.queues.QueueInterface; +import io.kestra.core.queues.DispatchQueueInterface; import io.kestra.core.repositories.LocalFlowRepositoryLoader; +import io.kestra.core.runners.FlowListeners; import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.TestsUtils; +import io.kestra.jdbc.runner.JdbcScheduler; import io.kestra.plugin.aws.s3.models.S3Object; +import io.kestra.scheduler.AbstractScheduler; +import io.kestra.worker.DefaultWorker; + +import io.micronaut.context.ApplicationContext; import jakarta.inject.Inject; -import jakarta.inject.Named; -import reactor.core.publisher.Flux; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; -@KestraTest(startRunner = true, startScheduler = true) class TriggerTest extends AbstractTest { @Inject - @Named(QueueFactoryInterface.EXECUTION_NAMED) - private QueueInterface executionQueue; + private ApplicationContext applicationContext; + + @Inject + private FlowListeners flowListenersService; + + @Inject + private DispatchQueueInterface executionQueue; @Inject protected LocalFlowRepositoryLoader repositoryLoader; @@ -43,35 +50,52 @@ void deleteAction() throws Exception { String bucket = "trigger-test"; this.createBucket(bucket); List listTask = list().bucket(Property.ofValue(bucket)).build(); + + // mock flow listeners CountDownLatch queueCount = new CountDownLatch(1); - AtomicReference last = new AtomicReference<>(); - Flux receive = TestsUtils.receive(executionQueue, executionWithError -> { - Execution execution = executionWithError.getLeft(); - if (execution.getFlowId().equals("s3-listen")) { - last.set(execution); - queueCount.countDown(); + // scheduler + DefaultWorker worker = applicationContext.createBean(DefaultWorker.class, UUID.randomUUID().toString(), 8, null); + try ( + AbstractScheduler scheduler = new JdbcScheduler( + this.applicationContext, + this.flowListenersService + ) + ) { + AtomicReference last = new AtomicReference<>(); + + // wait for execution + executionQueue.addListener(execution -> { + if (execution.getFlowId().equals("s3-listen")) { + last.set(execution); + queueCount.countDown(); + } + }); + + upload("trigger/s3", bucket); + upload("trigger/s3", bucket); + + worker.run(); + scheduler.run(); + repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen.yaml"))); + + boolean await = queueCount.await(10, TimeUnit.SECONDS); + try { + assertThat(await, is(true)); + } finally { + worker.shutdown(); } - }); - - upload("trigger/s3", bucket); - upload("trigger/s3", bucket); - repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen.yaml"))); + @SuppressWarnings("unchecked") + java.util.List trigger = (java.util.List) last.get().getTrigger().getVariables().get("objects"); - boolean await = queueCount.await(10, TimeUnit.SECONDS); - assertThat(await, is(true)); - receive.blockLast(); + assertThat(trigger.size(), is(2)); - @SuppressWarnings("unchecked") - java.util.List trigger = (java.util.List) last.get().getTrigger().getVariables().get("objects"); - - assertThat(trigger.size(), is(2)); - - int remainingFilesOnBucket = listTask.run(runContext(listTask)) - .getObjects() - .size(); - assertThat(remainingFilesOnBucket, is(0)); + int remainingFilesOnBucket = listTask.run(runContext(listTask)) + .getObjects() + .size(); + assertThat(remainingFilesOnBucket, is(0)); + } } @Test @@ -80,35 +104,48 @@ void noneAction() throws Exception { this.createBucket(bucket); List listTask = list().bucket(Property.ofValue(bucket)).build(); + // wait for execution CountDownLatch queueCount = new CountDownLatch(1); AtomicReference last = new AtomicReference<>(); - Flux receive = TestsUtils.receive(executionQueue, executionWithError -> { - Execution execution = executionWithError.getLeft(); - + executionQueue.addListener(execution -> { if (execution.getFlowId().equals("s3-listen-none-action")) { last.set(execution); queueCount.countDown(); } }); - upload("trigger/s3", bucket); - upload("trigger/s3", bucket); - - repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen-none-action.yaml"))); - - boolean await = queueCount.await(10, TimeUnit.SECONDS); - assertThat(await, is(true)); - receive.blockLast(); + // scheduler + DefaultWorker worker = applicationContext.createBean(DefaultWorker.class, UUID.randomUUID().toString(), 8, null); + try ( + AbstractScheduler scheduler = new JdbcScheduler( + this.applicationContext, + this.flowListenersService + ) + ) { + upload("trigger/s3", bucket); + upload("trigger/s3", bucket); + + worker.run(); + scheduler.run(); + repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen-none-action.yaml"))); + + boolean await = queueCount.await(10, TimeUnit.SECONDS); + try { + assertThat(await, is(true)); + } finally { + worker.shutdown(); + } - @SuppressWarnings("unchecked") - java.util.List trigger = (java.util.List) last.get().getTrigger().getVariables().get("objects"); + @SuppressWarnings("unchecked") + java.util.List trigger = (java.util.List) last.get().getTrigger().getVariables().get("objects"); - assertThat(trigger.size(), is(2)); + assertThat(trigger.size(), is(2)); - int remainingFilesOnBucket = listTask.run(runContext(listTask)) - .getObjects() - .size(); - assertThat(remainingFilesOnBucket, is(2)); + int remainingFilesOnBucket = listTask.run(runContext(listTask)) + .getObjects() + .size(); + assertThat(remainingFilesOnBucket, is(2)); + } } @Test @@ -118,34 +155,48 @@ void forcePathStyleWithSimpleLocalhost() throws Exception { List listTask = list().bucket(Property.ofValue(bucket)).build(); CountDownLatch queueCount = new CountDownLatch(1); - AtomicReference last = new AtomicReference<>(); - Flux receive = TestsUtils.receive(executionQueue, executionWithError -> { - Execution execution = executionWithError.getLeft(); - if (execution.getFlowId().equals("s3-listen-localhost-force-path-style")) { - last.set(execution); - queueCount.countDown(); + // scheduler + DefaultWorker worker = applicationContext.createBean(DefaultWorker.class, UUID.randomUUID().toString(), 8, null); + try ( + AbstractScheduler scheduler = new JdbcScheduler( + this.applicationContext, + this.flowListenersService + ) + ) { + AtomicReference last = new AtomicReference<>(); + + executionQueue.addListener(execution -> { + if (execution.getFlowId().equals("s3-listen-localhost-force-path-style")) { + last.set(execution); + queueCount.countDown(); + } + }); + + upload("trigger/s3", bucket); + upload("trigger/s3", bucket); + + worker.run(); + scheduler.run(); + repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen-localhost-force-path-style.yaml"))); + + boolean await = queueCount.await(15, TimeUnit.SECONDS); + try { + assertThat("trigger should work with localhost endpoint + forcePathStyle", await, is(true)); + } finally { + worker.shutdown(); } - }); - - upload("trigger/s3", bucket); - upload("trigger/s3", bucket); - repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen-localhost-force-path-style.yaml"))); + @SuppressWarnings("unchecked") + java.util.List trigger = (java.util.List) last.get().getTrigger().getVariables().get("objects"); - boolean await = queueCount.await(15, TimeUnit.SECONDS); - assertThat("trigger should work with localhost endpoint + forcePathStyle", await, is(true)); - receive.blockLast(); + assertThat(trigger.size(), is(2)); - @SuppressWarnings("unchecked") - java.util.List trigger = (java.util.List) last.get().getTrigger().getVariables().get("objects"); - - assertThat(trigger.size(), is(2)); - - int remainingFilesOnBucket = listTask.run(runContext(listTask)) - .getObjects() - .size(); - assertThat(remainingFilesOnBucket, is(0)); + int remainingFilesOnBucket = listTask.run(runContext(listTask)) + .getObjects() + .size(); + assertThat(remainingFilesOnBucket, is(0)); + } } @Test diff --git a/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java b/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java index cba5a81e..85c9753b 100644 --- a/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java @@ -2,67 +2,94 @@ import java.util.List; import java.util.Objects; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Test; import org.testcontainers.containers.localstack.LocalStackContainer; -import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.property.Property; -import io.kestra.core.queues.QueueFactoryInterface; -import io.kestra.core.queues.QueueInterface; +import io.kestra.core.queues.DispatchQueueInterface; import io.kestra.core.repositories.LocalFlowRepositoryLoader; -import io.kestra.core.utils.TestsUtils; +import io.kestra.core.runners.FlowListeners; +import io.kestra.jdbc.runner.JdbcScheduler; import io.kestra.plugin.aws.sqs.model.Message; +import io.kestra.scheduler.AbstractScheduler; +import io.kestra.worker.DefaultWorker; + +import io.micronaut.context.ApplicationContext; import jakarta.inject.Inject; -import jakarta.inject.Named; -import reactor.core.publisher.Flux; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; -@KestraTest(startRunner = true, startScheduler = true) class RealtimeTriggerTest extends AbstractSqsTest { @Inject - @Named(QueueFactoryInterface.EXECUTION_NAMED) - private QueueInterface executionQueue; + private ApplicationContext applicationContext; + + @Inject + private FlowListeners flowListenersService; + + @Inject + private DispatchQueueInterface executionQueue; @Inject protected LocalFlowRepositoryLoader repositoryLoader; @Test void flow() throws Exception { + // mock flow listeners CountDownLatch queueCount = new CountDownLatch(1); - Flux receive = TestsUtils.receive(executionQueue, execution -> { - queueCount.countDown(); - assertThat(execution.getLeft().getFlowId(), is("realtime")); - }); - - repositoryLoader.load(Objects.requireNonNull(RealtimeTriggerTest.class.getClassLoader().getResource("flows/sqs/realtime.yaml"))); - - Publish task = Publish.builder() - .endpointOverride(Property.ofValue(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString())) - .queueUrl(Property.ofValue(queueUrl())) - .region(Property.ofValue(localstack.getRegion())) - .accessKeyId(Property.ofValue(localstack.getAccessKey())) - .secretKeyId(Property.ofValue(localstack.getSecretKey())) - .from( - List.of( - Message.builder().data("Hello World").build() - ) + + // scheduler + DefaultWorker worker = applicationContext.createBean(DefaultWorker.class, UUID.randomUUID().toString(), 8, null); + try ( + AbstractScheduler scheduler = new JdbcScheduler( + this.applicationContext, + this.flowListenersService ) - .build(); + ) { + // wait for execution + AtomicReference lastExecution = new AtomicReference<>(); + executionQueue.addListener(execution -> { + lastExecution.set(execution); + queueCount.countDown(); + assertThat(execution.getFlowId(), is("realtime")); + }); + + worker.run(); + scheduler.run(); + + repositoryLoader.load(Objects.requireNonNull(RealtimeTriggerTest.class.getClassLoader().getResource("flows/sqs/realtime.yaml"))); + + // publish two messages to trigger the flow + Publish task = Publish.builder() + .endpointOverride(Property.ofValue(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString())) + .queueUrl(Property.ofValue(queueUrl())) + .region(Property.ofValue(localstack.getRegion())) + .accessKeyId(Property.ofValue(localstack.getAccessKey())) + .secretKeyId(Property.ofValue(localstack.getSecretKey())) + .from( + List.of( + Message.builder().data("Hello World").build() + ) + ) + .build(); + + var runContext = runContextFactory.of(); - task.run(runContextFactory.of()); + task.run(runContext); - boolean await = queueCount.await(1, TimeUnit.MINUTES); - assertThat(await, is(true)); + boolean await = queueCount.await(1, TimeUnit.MINUTES); + assertThat(await, is(true)); - Execution last = receive.blockLast(); - assertThat(last.getTrigger().getVariables().size(), is(1)); - assertThat(last.getTrigger().getVariables().get("data"), is("Hello World")); + Execution last = lastExecution.get(); + assertThat(last.getTrigger().getVariables().size(), is(1)); + assertThat(last.getTrigger().getVariables().get("data"), is("Hello World")); + } } } diff --git a/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java b/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java index 41b8b01d..043d0736 100644 --- a/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java @@ -2,34 +2,41 @@ import java.util.List; import java.util.Objects; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Test; import org.testcontainers.containers.localstack.LocalStackContainer; -import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.property.Property; -import io.kestra.core.queues.QueueFactoryInterface; -import io.kestra.core.queues.QueueInterface; +import io.kestra.core.queues.DispatchQueueInterface; import io.kestra.core.repositories.LocalFlowRepositoryLoader; +import io.kestra.core.runners.FlowListeners; import io.kestra.core.runners.RunContextFactory; -import io.kestra.core.utils.TestsUtils; +import io.kestra.jdbc.runner.JdbcScheduler; import io.kestra.plugin.aws.sqs.model.Message; +import io.kestra.scheduler.AbstractScheduler; +import io.kestra.worker.DefaultWorker; + +import io.micronaut.context.ApplicationContext; import jakarta.inject.Inject; -import jakarta.inject.Named; -import reactor.core.publisher.Flux; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; -@KestraTest(startRunner = true, startScheduler = true) class TriggerTest extends AbstractSqsTest { @Inject - @Named(QueueFactoryInterface.EXECUTION_NAMED) - private QueueInterface executionQueue; + private ApplicationContext applicationContext; + + @Inject + private FlowListeners flowListenersService; + + @Inject + private DispatchQueueInterface executionQueue; @Inject protected LocalFlowRepositoryLoader repositoryLoader; @@ -39,38 +46,58 @@ class TriggerTest extends AbstractSqsTest { @Test void flow() throws Exception { + // mock flow listeners CountDownLatch queueCount = new CountDownLatch(1); - Flux receive = TestsUtils.receive(executionQueue, execution -> { - queueCount.countDown(); - assertThat(execution.getLeft().getFlowId(), is("sqs-listen")); - }); - - repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/sqs/sqs-listen.yaml"))); - - Publish task = Publish.builder() - .endpointOverride(Property.ofValue(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString())) - .queueUrl(Property.ofValue(queueUrl())) - .region(Property.ofValue(localstack.getRegion())) - .accessKeyId(Property.ofValue(localstack.getAccessKey())) - .secretKeyId(Property.ofValue(localstack.getSecretKey())) - .from( - List.of( - Message.builder().data("Hello World").build(), - Message.builder().data("Hello Kestra").delaySeconds(5).build() - ) + + // scheduler + DefaultWorker worker = applicationContext.createBean(DefaultWorker.class, UUID.randomUUID().toString(), 8, null); + try ( + AbstractScheduler scheduler = new JdbcScheduler( + this.applicationContext, + this.flowListenersService ) - .build(); + ) { + // wait for execution + AtomicReference lastExecution = new AtomicReference<>(); + executionQueue.addListener(execution -> { + lastExecution.set(execution); + queueCount.countDown(); + assertThat(execution.getFlowId(), is("sqs-listen")); + }); + + worker.run(); + scheduler.run(); + + repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/sqs/sqs-listen.yaml"))); + + // publish two messages to trigger the flow + Publish task = Publish.builder() + .endpointOverride(Property.ofValue(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString())) + .queueUrl(Property.ofValue(queueUrl())) + .region(Property.ofValue(localstack.getRegion())) + .accessKeyId(Property.ofValue(localstack.getAccessKey())) + .secretKeyId(Property.ofValue(localstack.getSecretKey())) + .from( + List.of( + Message.builder().data("Hello World").build(), + Message.builder().data("Hello Kestra").delaySeconds(5).build() + ) + ) + .build(); + + var runContext = runContextFactory.of(); - task.run(runContextFactory.of()); + task.run(runContext); - boolean await = queueCount.await(1, TimeUnit.MINUTES); - assertThat(await, is(true)); + boolean await = queueCount.await(1, TimeUnit.MINUTES); + assertThat(await, is(true)); - Execution last = receive.blockLast(); - var count = (Integer) last.getTrigger().getVariables().get("count"); - var uri = (String) last.getTrigger().getVariables().get("uri"); - assertThat(count, is(2)); - assertThat(uri, is(notNullValue())); + Execution last = lastExecution.get(); + var count = (Integer) last.getTrigger().getVariables().get("count"); + var uri = (String) last.getTrigger().getVariables().get("uri"); + assertThat(count, is(2)); + assertThat(uri, is(notNullValue())); + } } } diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml index f1bb4dd5..32ccf5a7 100644 --- a/src/test/resources/application.yml +++ b/src/test/resources/application.yml @@ -9,3 +9,10 @@ kestra: type: memory repository: type: memory + +worker: + controllers: + type: STATIC + static: + endpoints: + - host: localhost From 6d67b9879fad593a59fa27da78d3a4a82fd61850 Mon Sep 17 00:00:00 2001 From: Malay Dewangan Date: Sun, 12 Apr 2026 01:52:42 +0530 Subject: [PATCH 3/9] fix: v2 compatibility --- build.gradle | 1 - 1 file changed, 1 deletion(-) diff --git a/build.gradle b/build.gradle index eb597bad..29b64957 100644 --- a/build.gradle +++ b/build.gradle @@ -127,7 +127,6 @@ dependencies { testImplementation group: "io.kestra", name: "repository-memory" testImplementation group: "io.kestra", name: "scheduler" testImplementation group: "io.kestra", name: "worker" - testImplementation group: "io.kestra", name: "indexer", version: kestraVersion // testcontainers testImplementation "org.testcontainers:testcontainers:2.0.5" From e5728067aa7156da7a7887dccb6ada9d6757fa24 Mon Sep 17 00:00:00 2001 From: Malay Dewangan Date: Sun, 12 Apr 2026 02:24:36 +0530 Subject: [PATCH 4/9] fix: v2 compatibility --- .../aws/kinesis/RealtimeTriggerTest.java | 24 +-- .../io/kestra/plugin/aws/s3/TriggerTest.java | 177 ++++++------------ .../plugin/aws/sqs/RealtimeTriggerTest.java | 85 +++------ .../io/kestra/plugin/aws/sqs/TriggerTest.java | 91 ++++----- 4 files changed, 122 insertions(+), 255 deletions(-) diff --git a/src/test/java/io/kestra/plugin/aws/kinesis/RealtimeTriggerTest.java b/src/test/java/io/kestra/plugin/aws/kinesis/RealtimeTriggerTest.java index 592e87b3..468354dd 100644 --- a/src/test/java/io/kestra/plugin/aws/kinesis/RealtimeTriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/kinesis/RealtimeTriggerTest.java @@ -3,37 +3,26 @@ import java.io.File; import java.nio.file.Files; import java.util.List; -import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.*; +import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.property.Property; import io.kestra.core.queues.DispatchQueueInterface; import io.kestra.core.repositories.LocalFlowRepositoryLoader; -import io.kestra.core.runners.FlowListeners; -import io.kestra.jdbc.runner.JdbcScheduler; import io.kestra.plugin.aws.kinesis.model.Record; -import io.kestra.scheduler.AbstractScheduler; -import io.kestra.worker.DefaultWorker; - -import io.micronaut.context.ApplicationContext; import jakarta.inject.Inject; import software.amazon.awssdk.services.kinesis.model.*; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.*; +@KestraTest(startRunner = true, startScheduler = true) class RealtimeTriggerTest extends AbstractKinesisTest { - @Inject - ApplicationContext applicationContext; - - @Inject - FlowListeners flowListeners; - @Inject DispatchQueueInterface executionQueue; @@ -51,12 +40,6 @@ void evaluate() throws Exception { latch.countDown(); }); - DefaultWorker worker = applicationContext.createBean(DefaultWorker.class, UUID.randomUUID().toString(), 8, null); - try (AbstractScheduler scheduler = new JdbcScheduler(applicationContext, flowListeners)) { - - worker.run(); - scheduler.run(); - String yaml = """ id: realtime namespace: company.team @@ -104,6 +87,5 @@ void evaluate() throws Exception { assertThat(done, is(true)); assertThat(lastExecution.get().getTrigger().getVariables().get("data"), is("hello")); - } } -} \ No newline at end of file +} diff --git a/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java b/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java index 900add6d..198c3441 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java @@ -4,7 +4,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -12,33 +11,23 @@ import org.junit.jupiter.api.Test; import org.testcontainers.containers.localstack.LocalStackContainer; +import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.models.conditions.ConditionContext; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.property.Property; import io.kestra.core.models.triggers.StatefulTriggerInterface; import io.kestra.core.queues.DispatchQueueInterface; import io.kestra.core.repositories.LocalFlowRepositoryLoader; -import io.kestra.core.runners.FlowListeners; import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.TestsUtils; -import io.kestra.jdbc.runner.JdbcScheduler; import io.kestra.plugin.aws.s3.models.S3Object; -import io.kestra.scheduler.AbstractScheduler; -import io.kestra.worker.DefaultWorker; - -import io.micronaut.context.ApplicationContext; import jakarta.inject.Inject; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; +@KestraTest(startRunner = true, startScheduler = true) class TriggerTest extends AbstractTest { - @Inject - private ApplicationContext applicationContext; - - @Inject - private FlowListeners flowListenersService; - @Inject private DispatchQueueInterface executionQueue; @@ -51,51 +40,33 @@ void deleteAction() throws Exception { this.createBucket(bucket); List listTask = list().bucket(Property.ofValue(bucket)).build(); - // mock flow listeners CountDownLatch queueCount = new CountDownLatch(1); + AtomicReference last = new AtomicReference<>(); - // scheduler - DefaultWorker worker = applicationContext.createBean(DefaultWorker.class, UUID.randomUUID().toString(), 8, null); - try ( - AbstractScheduler scheduler = new JdbcScheduler( - this.applicationContext, - this.flowListenersService - ) - ) { - AtomicReference last = new AtomicReference<>(); - - // wait for execution - executionQueue.addListener(execution -> { - if (execution.getFlowId().equals("s3-listen")) { - last.set(execution); - queueCount.countDown(); - } - }); - - upload("trigger/s3", bucket); - upload("trigger/s3", bucket); - - worker.run(); - scheduler.run(); - repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen.yaml"))); - - boolean await = queueCount.await(10, TimeUnit.SECONDS); - try { - assertThat(await, is(true)); - } finally { - worker.shutdown(); + executionQueue.addListener(execution -> { + if (execution.getFlowId().equals("s3-listen")) { + last.set(execution); + queueCount.countDown(); } + }); - @SuppressWarnings("unchecked") - java.util.List trigger = (java.util.List) last.get().getTrigger().getVariables().get("objects"); + upload("trigger/s3", bucket); + upload("trigger/s3", bucket); - assertThat(trigger.size(), is(2)); + repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen.yaml"))); - int remainingFilesOnBucket = listTask.run(runContext(listTask)) - .getObjects() - .size(); - assertThat(remainingFilesOnBucket, is(0)); - } + boolean await = queueCount.await(10, TimeUnit.SECONDS); + assertThat(await, is(true)); + + @SuppressWarnings("unchecked") + java.util.List trigger = (java.util.List) last.get().getTrigger().getVariables().get("objects"); + + assertThat(trigger.size(), is(2)); + + int remainingFilesOnBucket = listTask.run(runContext(listTask)) + .getObjects() + .size(); + assertThat(remainingFilesOnBucket, is(0)); } @Test @@ -104,7 +75,6 @@ void noneAction() throws Exception { this.createBucket(bucket); List listTask = list().bucket(Property.ofValue(bucket)).build(); - // wait for execution CountDownLatch queueCount = new CountDownLatch(1); AtomicReference last = new AtomicReference<>(); executionQueue.addListener(execution -> { @@ -114,38 +84,23 @@ void noneAction() throws Exception { } }); - // scheduler - DefaultWorker worker = applicationContext.createBean(DefaultWorker.class, UUID.randomUUID().toString(), 8, null); - try ( - AbstractScheduler scheduler = new JdbcScheduler( - this.applicationContext, - this.flowListenersService - ) - ) { - upload("trigger/s3", bucket); - upload("trigger/s3", bucket); - - worker.run(); - scheduler.run(); - repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen-none-action.yaml"))); - - boolean await = queueCount.await(10, TimeUnit.SECONDS); - try { - assertThat(await, is(true)); - } finally { - worker.shutdown(); - } + upload("trigger/s3", bucket); + upload("trigger/s3", bucket); - @SuppressWarnings("unchecked") - java.util.List trigger = (java.util.List) last.get().getTrigger().getVariables().get("objects"); + repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen-none-action.yaml"))); - assertThat(trigger.size(), is(2)); + boolean await = queueCount.await(10, TimeUnit.SECONDS); + assertThat(await, is(true)); - int remainingFilesOnBucket = listTask.run(runContext(listTask)) - .getObjects() - .size(); - assertThat(remainingFilesOnBucket, is(2)); - } + @SuppressWarnings("unchecked") + java.util.List trigger = (java.util.List) last.get().getTrigger().getVariables().get("objects"); + + assertThat(trigger.size(), is(2)); + + int remainingFilesOnBucket = listTask.run(runContext(listTask)) + .getObjects() + .size(); + assertThat(remainingFilesOnBucket, is(2)); } @Test @@ -155,48 +110,32 @@ void forcePathStyleWithSimpleLocalhost() throws Exception { List listTask = list().bucket(Property.ofValue(bucket)).build(); CountDownLatch queueCount = new CountDownLatch(1); + AtomicReference last = new AtomicReference<>(); - // scheduler - DefaultWorker worker = applicationContext.createBean(DefaultWorker.class, UUID.randomUUID().toString(), 8, null); - try ( - AbstractScheduler scheduler = new JdbcScheduler( - this.applicationContext, - this.flowListenersService - ) - ) { - AtomicReference last = new AtomicReference<>(); - - executionQueue.addListener(execution -> { - if (execution.getFlowId().equals("s3-listen-localhost-force-path-style")) { - last.set(execution); - queueCount.countDown(); - } - }); - - upload("trigger/s3", bucket); - upload("trigger/s3", bucket); - - worker.run(); - scheduler.run(); - repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen-localhost-force-path-style.yaml"))); - - boolean await = queueCount.await(15, TimeUnit.SECONDS); - try { - assertThat("trigger should work with localhost endpoint + forcePathStyle", await, is(true)); - } finally { - worker.shutdown(); + executionQueue.addListener(execution -> { + if (execution.getFlowId().equals("s3-listen-localhost-force-path-style")) { + last.set(execution); + queueCount.countDown(); } + }); - @SuppressWarnings("unchecked") - java.util.List trigger = (java.util.List) last.get().getTrigger().getVariables().get("objects"); + upload("trigger/s3", bucket); + upload("trigger/s3", bucket); - assertThat(trigger.size(), is(2)); + repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen-localhost-force-path-style.yaml"))); - int remainingFilesOnBucket = listTask.run(runContext(listTask)) - .getObjects() - .size(); - assertThat(remainingFilesOnBucket, is(0)); - } + boolean await = queueCount.await(15, TimeUnit.SECONDS); + assertThat("trigger should work with localhost endpoint + forcePathStyle", await, is(true)); + + @SuppressWarnings("unchecked") + java.util.List trigger = (java.util.List) last.get().getTrigger().getVariables().get("objects"); + + assertThat(trigger.size(), is(2)); + + int remainingFilesOnBucket = listTask.run(runContext(listTask)) + .getObjects() + .size(); + assertThat(remainingFilesOnBucket, is(0)); } @Test diff --git a/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java b/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java index 85c9753b..70123a24 100644 --- a/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java @@ -2,7 +2,6 @@ import java.util.List; import java.util.Objects; -import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -10,29 +9,19 @@ import org.junit.jupiter.api.Test; import org.testcontainers.containers.localstack.LocalStackContainer; +import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.property.Property; import io.kestra.core.queues.DispatchQueueInterface; import io.kestra.core.repositories.LocalFlowRepositoryLoader; -import io.kestra.core.runners.FlowListeners; -import io.kestra.jdbc.runner.JdbcScheduler; import io.kestra.plugin.aws.sqs.model.Message; -import io.kestra.scheduler.AbstractScheduler; -import io.kestra.worker.DefaultWorker; - -import io.micronaut.context.ApplicationContext; import jakarta.inject.Inject; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; +@KestraTest(startRunner = true, startScheduler = true) class RealtimeTriggerTest extends AbstractSqsTest { - @Inject - private ApplicationContext applicationContext; - - @Inject - private FlowListeners flowListenersService; - @Inject private DispatchQueueInterface executionQueue; @@ -41,55 +30,39 @@ class RealtimeTriggerTest extends AbstractSqsTest { @Test void flow() throws Exception { - // mock flow listeners CountDownLatch queueCount = new CountDownLatch(1); - - // scheduler - DefaultWorker worker = applicationContext.createBean(DefaultWorker.class, UUID.randomUUID().toString(), 8, null); - try ( - AbstractScheduler scheduler = new JdbcScheduler( - this.applicationContext, - this.flowListenersService - ) - ) { - // wait for execution - AtomicReference lastExecution = new AtomicReference<>(); - executionQueue.addListener(execution -> { - lastExecution.set(execution); - queueCount.countDown(); - assertThat(execution.getFlowId(), is("realtime")); - }); - - worker.run(); - scheduler.run(); - - repositoryLoader.load(Objects.requireNonNull(RealtimeTriggerTest.class.getClassLoader().getResource("flows/sqs/realtime.yaml"))); - - // publish two messages to trigger the flow - Publish task = Publish.builder() - .endpointOverride(Property.ofValue(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString())) - .queueUrl(Property.ofValue(queueUrl())) - .region(Property.ofValue(localstack.getRegion())) - .accessKeyId(Property.ofValue(localstack.getAccessKey())) - .secretKeyId(Property.ofValue(localstack.getSecretKey())) - .from( - List.of( - Message.builder().data("Hello World").build() - ) + AtomicReference lastExecution = new AtomicReference<>(); + executionQueue.addListener(execution -> { + lastExecution.set(execution); + queueCount.countDown(); + assertThat(execution.getFlowId(), is("realtime")); + }); + + repositoryLoader.load(Objects.requireNonNull(RealtimeTriggerTest.class.getClassLoader().getResource("flows/sqs/realtime.yaml"))); + + Publish task = Publish.builder() + .endpointOverride(Property.ofValue(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString())) + .queueUrl(Property.ofValue(queueUrl())) + .region(Property.ofValue(localstack.getRegion())) + .accessKeyId(Property.ofValue(localstack.getAccessKey())) + .secretKeyId(Property.ofValue(localstack.getSecretKey())) + .from( + List.of( + Message.builder().data("Hello World").build() ) - .build(); + ) + .build(); - var runContext = runContextFactory.of(); + var runContext = runContextFactory.of(); - task.run(runContext); + task.run(runContext); - boolean await = queueCount.await(1, TimeUnit.MINUTES); - assertThat(await, is(true)); + boolean await = queueCount.await(1, TimeUnit.MINUTES); + assertThat(await, is(true)); - Execution last = lastExecution.get(); - assertThat(last.getTrigger().getVariables().size(), is(1)); - assertThat(last.getTrigger().getVariables().get("data"), is("Hello World")); - } + Execution last = lastExecution.get(); + assertThat(last.getTrigger().getVariables().size(), is(1)); + assertThat(last.getTrigger().getVariables().get("data"), is("Hello World")); } } diff --git a/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java b/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java index 043d0736..2e260d4e 100644 --- a/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java @@ -2,7 +2,6 @@ import java.util.List; import java.util.Objects; -import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -10,31 +9,21 @@ import org.junit.jupiter.api.Test; import org.testcontainers.containers.localstack.LocalStackContainer; +import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.property.Property; import io.kestra.core.queues.DispatchQueueInterface; import io.kestra.core.repositories.LocalFlowRepositoryLoader; -import io.kestra.core.runners.FlowListeners; import io.kestra.core.runners.RunContextFactory; -import io.kestra.jdbc.runner.JdbcScheduler; import io.kestra.plugin.aws.sqs.model.Message; -import io.kestra.scheduler.AbstractScheduler; -import io.kestra.worker.DefaultWorker; - -import io.micronaut.context.ApplicationContext; import jakarta.inject.Inject; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; +@KestraTest(startRunner = true, startScheduler = true) class TriggerTest extends AbstractSqsTest { - @Inject - private ApplicationContext applicationContext; - - @Inject - private FlowListeners flowListenersService; - @Inject private DispatchQueueInterface executionQueue; @@ -46,58 +35,42 @@ class TriggerTest extends AbstractSqsTest { @Test void flow() throws Exception { - // mock flow listeners CountDownLatch queueCount = new CountDownLatch(1); - - // scheduler - DefaultWorker worker = applicationContext.createBean(DefaultWorker.class, UUID.randomUUID().toString(), 8, null); - try ( - AbstractScheduler scheduler = new JdbcScheduler( - this.applicationContext, - this.flowListenersService - ) - ) { - // wait for execution - AtomicReference lastExecution = new AtomicReference<>(); - executionQueue.addListener(execution -> { - lastExecution.set(execution); - queueCount.countDown(); - assertThat(execution.getFlowId(), is("sqs-listen")); - }); - - worker.run(); - scheduler.run(); - - repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/sqs/sqs-listen.yaml"))); - - // publish two messages to trigger the flow - Publish task = Publish.builder() - .endpointOverride(Property.ofValue(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString())) - .queueUrl(Property.ofValue(queueUrl())) - .region(Property.ofValue(localstack.getRegion())) - .accessKeyId(Property.ofValue(localstack.getAccessKey())) - .secretKeyId(Property.ofValue(localstack.getSecretKey())) - .from( - List.of( - Message.builder().data("Hello World").build(), - Message.builder().data("Hello Kestra").delaySeconds(5).build() - ) + AtomicReference lastExecution = new AtomicReference<>(); + executionQueue.addListener(execution -> { + lastExecution.set(execution); + queueCount.countDown(); + assertThat(execution.getFlowId(), is("sqs-listen")); + }); + + repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/sqs/sqs-listen.yaml"))); + + Publish task = Publish.builder() + .endpointOverride(Property.ofValue(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString())) + .queueUrl(Property.ofValue(queueUrl())) + .region(Property.ofValue(localstack.getRegion())) + .accessKeyId(Property.ofValue(localstack.getAccessKey())) + .secretKeyId(Property.ofValue(localstack.getSecretKey())) + .from( + List.of( + Message.builder().data("Hello World").build(), + Message.builder().data("Hello Kestra").delaySeconds(5).build() ) - .build(); + ) + .build(); - var runContext = runContextFactory.of(); + var runContext = runContextFactory.of(); - task.run(runContext); + task.run(runContext); - boolean await = queueCount.await(1, TimeUnit.MINUTES); - assertThat(await, is(true)); + boolean await = queueCount.await(1, TimeUnit.MINUTES); + assertThat(await, is(true)); - Execution last = lastExecution.get(); - var count = (Integer) last.getTrigger().getVariables().get("count"); - var uri = (String) last.getTrigger().getVariables().get("uri"); - assertThat(count, is(2)); - assertThat(uri, is(notNullValue())); - } + Execution last = lastExecution.get(); + var count = (Integer) last.getTrigger().getVariables().get("count"); + var uri = (String) last.getTrigger().getVariables().get("uri"); + assertThat(count, is(2)); + assertThat(uri, is(notNullValue())); } } From 50e7d82122ccd2990c729aee67e8cfec6935e3c5 Mon Sep 17 00:00:00 2001 From: Malay Dewangan Date: Thu, 7 May 2026 01:32:01 +0530 Subject: [PATCH 5/9] fix: v2 compatibility --- .../java/io/kestra/plugin/aws/cloudwatch/Trigger.java | 4 ++-- src/main/java/io/kestra/plugin/aws/s3/Trigger.java | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/main/java/io/kestra/plugin/aws/cloudwatch/Trigger.java b/src/main/java/io/kestra/plugin/aws/cloudwatch/Trigger.java index edbb7edc..9588fc9c 100644 --- a/src/main/java/io/kestra/plugin/aws/cloudwatch/Trigger.java +++ b/src/main/java/io/kestra/plugin/aws/cloudwatch/Trigger.java @@ -37,12 +37,12 @@ namespace: company.team tasks: - id: each - type: io.kestra.plugin.core.flow.ForEach + type: io.kestra.plugin.core.flow.Loop values: "{{ trigger.series }}" tasks: - id: log type: io.kestra.plugin.core.log.Log - message: "Datapoint: {{ json(taskrun.value) }}" + message: "Datapoint: {{ json(item.value) }}" triggers: - id: watch diff --git a/src/main/java/io/kestra/plugin/aws/s3/Trigger.java b/src/main/java/io/kestra/plugin/aws/s3/Trigger.java index 4bc07569..2ebe2584 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/Trigger.java +++ b/src/main/java/io/kestra/plugin/aws/s3/Trigger.java @@ -46,12 +46,12 @@ tasks: - id: each - type: io.kestra.plugin.core.flow.ForEach + type: io.kestra.plugin.core.flow.Loop values: "{{ trigger.objects | jq('.[].uri') }}" tasks: - id: return type: io.kestra.plugin.core.debug.Return - format: "{{ taskrun.value }}" + format: "{{ item.value }}" triggers: - id: watch @@ -77,12 +77,12 @@ tasks: - id: each - type: io.kestra.plugin.core.flow.ForEach + type: io.kestra.plugin.core.flow.Loop values: "{{ trigger.objects | jq('.[].key') }}" tasks: - id: return type: io.kestra.plugin.core.debug.Return - format: "{{ taskrun.value }}" + format: "{{ item.value }}" - id: delete type: io.kestra.plugin.aws.s3.Delete @@ -90,7 +90,7 @@ secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}" region: "eu-central-1" bucket: "my-bucket" - key: "{{ taskrun.value }}" + key: "{{ item.value }}" triggers: - id: watch From a857d284290c1a025ecd7b1820d16fd4d0a9b1a9 Mon Sep 17 00:00:00 2001 From: Malay Dewangan Date: Thu, 7 May 2026 14:48:41 +0530 Subject: [PATCH 6/9] fix: v2 compatibility --- src/test/resources/application.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml index 32ccf5a7..ad450b21 100644 --- a/src/test/resources/application.yml +++ b/src/test/resources/application.yml @@ -10,9 +10,9 @@ kestra: repository: type: memory -worker: - controllers: - type: STATIC - static: - endpoints: - - host: localhost + worker: + controllers: + type: STATIC + static: + endpoints: + - host: localhost \ No newline at end of file From eb7dc59bcb992aab110aea83989a60fb45fe76e0 Mon Sep 17 00:00:00 2001 From: Malay Dewangan Date: Thu, 7 May 2026 17:12:00 +0530 Subject: [PATCH 7/9] fix: v2 compatibility --- .../io/kestra/plugin/aws/s3/TriggerTest.java | 28 +++++++++++-------- .../io/kestra/plugin/aws/sqs/TriggerTest.java | 13 +++++---- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java b/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java index 198c3441..798ddc12 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java @@ -2,22 +2,23 @@ import java.time.Duration; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; import org.testcontainers.containers.localstack.LocalStackContainer; import io.kestra.core.junit.annotations.KestraTest; +import io.kestra.core.junit.annotations.LoadFlows; import io.kestra.core.models.conditions.ConditionContext; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.property.Property; import io.kestra.core.models.triggers.StatefulTriggerInterface; import io.kestra.core.queues.DispatchQueueInterface; -import io.kestra.core.repositories.LocalFlowRepositoryLoader; +import io.kestra.core.runners.Scheduler; import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.TestsUtils; import io.kestra.plugin.aws.s3.models.S3Object; @@ -32,10 +33,13 @@ class TriggerTest extends AbstractTest { private DispatchQueueInterface executionQueue; @Inject - protected LocalFlowRepositoryLoader repositoryLoader; + protected Scheduler scheduler; @Test + @LoadFlows({"flows/s3/s3-listen.yaml"}) void deleteAction() throws Exception { + Awaitility.await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofMillis(100)).until(() -> scheduler.isActive()); + String bucket = "trigger-test"; this.createBucket(bucket); List listTask = list().bucket(Property.ofValue(bucket)).build(); @@ -53,9 +57,7 @@ void deleteAction() throws Exception { upload("trigger/s3", bucket); upload("trigger/s3", bucket); - repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen.yaml"))); - - boolean await = queueCount.await(10, TimeUnit.SECONDS); + boolean await = queueCount.await(1, TimeUnit.MINUTES); assertThat(await, is(true)); @SuppressWarnings("unchecked") @@ -70,7 +72,10 @@ void deleteAction() throws Exception { } @Test + @LoadFlows({"flows/s3/s3-listen-none-action.yaml"}) void noneAction() throws Exception { + Awaitility.await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofMillis(100)).until(() -> scheduler.isActive()); + String bucket = "trigger-none-action-test"; this.createBucket(bucket); List listTask = list().bucket(Property.ofValue(bucket)).build(); @@ -87,9 +92,7 @@ void noneAction() throws Exception { upload("trigger/s3", bucket); upload("trigger/s3", bucket); - repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen-none-action.yaml"))); - - boolean await = queueCount.await(10, TimeUnit.SECONDS); + boolean await = queueCount.await(1, TimeUnit.MINUTES); assertThat(await, is(true)); @SuppressWarnings("unchecked") @@ -104,7 +107,10 @@ void noneAction() throws Exception { } @Test + @LoadFlows({"flows/s3/s3-listen-localhost-force-path-style.yaml"}) void forcePathStyleWithSimpleLocalhost() throws Exception { + Awaitility.await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofMillis(100)).until(() -> scheduler.isActive()); + String bucket = "trigger-force-path-style-test"; this.createBucket(bucket); List listTask = list().bucket(Property.ofValue(bucket)).build(); @@ -122,9 +128,7 @@ void forcePathStyleWithSimpleLocalhost() throws Exception { upload("trigger/s3", bucket); upload("trigger/s3", bucket); - repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen-localhost-force-path-style.yaml"))); - - boolean await = queueCount.await(15, TimeUnit.SECONDS); + boolean await = queueCount.await(1, TimeUnit.MINUTES); assertThat("trigger should work with localhost endpoint + forcePathStyle", await, is(true)); @SuppressWarnings("unchecked") diff --git a/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java b/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java index 2e260d4e..7587ab79 100644 --- a/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java @@ -1,20 +1,22 @@ package io.kestra.plugin.aws.sqs; +import java.time.Duration; import java.util.List; -import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; import org.testcontainers.containers.localstack.LocalStackContainer; import io.kestra.core.junit.annotations.KestraTest; +import io.kestra.core.junit.annotations.LoadFlows; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.property.Property; import io.kestra.core.queues.DispatchQueueInterface; -import io.kestra.core.repositories.LocalFlowRepositoryLoader; import io.kestra.core.runners.RunContextFactory; +import io.kestra.core.runners.Scheduler; import io.kestra.plugin.aws.sqs.model.Message; import jakarta.inject.Inject; @@ -28,13 +30,16 @@ class TriggerTest extends AbstractSqsTest { private DispatchQueueInterface executionQueue; @Inject - protected LocalFlowRepositoryLoader repositoryLoader; + protected Scheduler scheduler; @Inject protected RunContextFactory runContextFactory; @Test + @LoadFlows({"flows/sqs/sqs-listen.yaml"}) void flow() throws Exception { + Awaitility.await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofMillis(100)).until(() -> scheduler.isActive()); + CountDownLatch queueCount = new CountDownLatch(1); AtomicReference lastExecution = new AtomicReference<>(); executionQueue.addListener(execution -> { @@ -43,8 +48,6 @@ void flow() throws Exception { assertThat(execution.getFlowId(), is("sqs-listen")); }); - repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/sqs/sqs-listen.yaml"))); - Publish task = Publish.builder() .endpointOverride(Property.ofValue(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString())) .queueUrl(Property.ofValue(queueUrl())) From ae352e909601cb7acd9f21519eb5d68d869eac96 Mon Sep 17 00:00:00 2001 From: Malay Dewangan Date: Thu, 7 May 2026 17:28:46 +0530 Subject: [PATCH 8/9] fix: v2 compatibility --- src/main/java/io/kestra/plugin/aws/cloudwatch/Trigger.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/kestra/plugin/aws/cloudwatch/Trigger.java b/src/main/java/io/kestra/plugin/aws/cloudwatch/Trigger.java index 9588fc9c..3a516c0a 100644 --- a/src/main/java/io/kestra/plugin/aws/cloudwatch/Trigger.java +++ b/src/main/java/io/kestra/plugin/aws/cloudwatch/Trigger.java @@ -42,7 +42,7 @@ tasks: - id: log type: io.kestra.plugin.core.log.Log - message: "Datapoint: {{ json(item.value) }}" + message: "Datapoint: {{ fromJson(item.value) }}" triggers: - id: watch From 3cb91669c05992fbcf3b1f87e25ee6fec6497bd1 Mon Sep 17 00:00:00 2001 From: Malay Dewangan Date: Sun, 10 May 2026 14:38:54 +0530 Subject: [PATCH 9/9] fix: v2 compatibility --- src/test/java/io/kestra/plugin/aws/cli/AwsCLITest.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/test/java/io/kestra/plugin/aws/cli/AwsCLITest.java b/src/test/java/io/kestra/plugin/aws/cli/AwsCLITest.java index 1ca3b621..e88e4671 100644 --- a/src/test/java/io/kestra/plugin/aws/cli/AwsCLITest.java +++ b/src/test/java/io/kestra/plugin/aws/cli/AwsCLITest.java @@ -14,8 +14,8 @@ import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.TestsUtils; import io.kestra.plugin.aws.AbstractLocalStackTest; -import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions; import io.kestra.plugin.scripts.exec.scripts.models.ScriptOutput; +import io.kestra.plugin.scripts.runner.docker.Docker; import jakarta.inject.Inject; @@ -36,8 +36,9 @@ void run() throws Exception { AwsCLI execute = AwsCLI.builder() .id(IdUtils.create()) .type(AwsCLI.class.getName()) - .docker( - DockerOptions.builder() + .taskRunner( + Docker.builder() + .type(Docker.class.getName()) // needed to be able to reach localstack from inside the container .networkMode("host") .image("amazon/aws-cli")