diff --git a/gradle.properties b/gradle.properties index 5f27b3ac..f2c081dc 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ version=2.5.1-SNAPSHOT -kestraVersion=1.3.13 +kestraVersion=1.3.19 diff --git a/src/main/java/io/kestra/plugin/aws/athena/Query.java b/src/main/java/io/kestra/plugin/aws/athena/Query.java index f719d7e7..d4a0cc84 100644 --- a/src/main/java/io/kestra/plugin/aws/athena/Query.java +++ b/src/main/java/io/kestra/plugin/aws/athena/Query.java @@ -17,6 +17,7 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Metric; import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.executions.metrics.Counter; import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.Output; @@ -34,7 +35,6 @@ import reactor.core.publisher.Flux; import software.amazon.awssdk.services.athena.AthenaClient; import software.amazon.awssdk.services.athena.model.*; -import io.kestra.core.models.annotations.PluginProperty; /** * This Query task is built with the Athena SDK, more info can be found here: https://docs.aws.amazon.com/athena/latest/ug/code-samples.html. @@ -323,7 +323,7 @@ private Pair store(List columnInfo, List results, Ru File tempFile = runContext.workingDir().createTempFile(".ion").toFile(); - try (var output = new BufferedWriter(new FileWriter(tempFile), FileSerde.BUFFER_SIZE)) { + try (var output = new BufferedOutputStream(new FileOutputStream(tempFile), FileSerde.BUFFER_SIZE)) { Long count = FileSerde.writeAll(output, Flux.fromIterable(results).mapNotNull(row -> map(columnInfo, row))).block(); return Pair.of( diff --git a/src/main/java/io/kestra/plugin/aws/dynamodb/AbstractDynamoDb.java b/src/main/java/io/kestra/plugin/aws/dynamodb/AbstractDynamoDb.java index 7c0a9a9a..d95257b1 100644 --- a/src/main/java/io/kestra/plugin/aws/dynamodb/AbstractDynamoDb.java +++ b/src/main/java/io/kestra/plugin/aws/dynamodb/AbstractDynamoDb.java @@ -11,6 +11,7 @@ import org.apache.commons.lang3.tuple.Pair; import io.kestra.core.exceptions.IllegalVariableEvaluationException; +import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.executions.metrics.Counter; import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.common.FetchOutput; @@ -32,7 +33,6 @@ import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import static io.kestra.core.utils.Rethrow.throwConsumer; -import io.kestra.core.models.annotations.PluginProperty; @SuperBuilder @ToString @@ -154,7 +154,7 @@ protected FetchOutput fetchOutputs(List> items, Fetc private Pair store(RunContext runContext, List> items) throws IOException { File tempFile = runContext.workingDir().createTempFile(".ion").toFile(); - try (var output = new BufferedWriter(new FileWriter(tempFile), FileSerde.BUFFER_SIZE)) { + try (var output = new BufferedOutputStream(new FileOutputStream(tempFile), FileSerde.BUFFER_SIZE)) { var flux = Flux.fromIterable(items).map(attributes -> objectMapFrom(attributes)); Long count = FileSerde.writeAll(output, flux).block(); return Pair.of( diff --git a/src/main/java/io/kestra/plugin/aws/eventbridge/PutEvents.java b/src/main/java/io/kestra/plugin/aws/eventbridge/PutEvents.java index d9fa8377..45b5f039 100644 --- a/src/main/java/io/kestra/plugin/aws/eventbridge/PutEvents.java +++ b/src/main/java/io/kestra/plugin/aws/eventbridge/PutEvents.java @@ -218,7 +218,7 @@ private List readEntryList(RunContext runContext, Object entries) throws if (!from.getScheme().equals("kestra")) { throw new IllegalArgumentException("Invalid entries parameter, must be a Kestra internal storage URI, or a list of entries."); } - try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)))) { + try (var inputStream = new BufferedInputStream(runContext.storage().getFile(from), FileSerde.BUFFER_SIZE)) { return FileSerde.readAll(inputStream, Entry.class) .collectList().block(); } diff --git a/src/main/java/io/kestra/plugin/aws/kinesis/PutRecords.java b/src/main/java/io/kestra/plugin/aws/kinesis/PutRecords.java index 7a499db0..8c454751 100644 --- a/src/main/java/io/kestra/plugin/aws/kinesis/PutRecords.java +++ b/src/main/java/io/kestra/plugin/aws/kinesis/PutRecords.java @@ -214,7 +214,7 @@ private List getRecordList(Object records, RunContext runContext) throws if (!from.getScheme().equals("kestra")) { throw new IllegalArgumentException("Invalid records parameter, must be a Kestra internal storage URI, or a list of records."); } - try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)))) { + try (var inputStream = new BufferedInputStream(runContext.storage().getFile(from), FileSerde.BUFFER_SIZE)) { return FileSerde.readAll(inputStream, Record.class) .collectList().block(); } diff --git a/src/test/java/io/kestra/plugin/aws/eventbridge/PutEventsTest.java b/src/test/java/io/kestra/plugin/aws/eventbridge/PutEventsTest.java index 06b78b1c..00c87ad1 100644 --- a/src/test/java/io/kestra/plugin/aws/eventbridge/PutEventsTest.java +++ b/src/test/java/io/kestra/plugin/aws/eventbridge/PutEventsTest.java @@ -1,9 +1,8 @@ package io.kestra.plugin.aws.eventbridge; -import java.io.BufferedReader; +import java.io.BufferedInputStream; import java.io.File; import java.io.FileOutputStream; -import java.io.InputStreamReader; import java.net.URI; import java.util.List; import java.util.Map; @@ -43,7 +42,7 @@ private static List getOutputEntries(PutEvents put, RunCo if (!from.getScheme().equals("kestra")) { throw new IllegalArgumentException("Invalid entries parameter, must be a Kestra internal storage URI, or a list of entry."); } - try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)))) { + try (var inputStream = new BufferedInputStream(runContext.storage().getFile(from), FileSerde.BUFFER_SIZE)) { outputEntries = FileSerde.readAll(inputStream, PutEvents.OutputEntry.class).collectList().block(); } return outputEntries; diff --git a/src/test/java/io/kestra/plugin/aws/kinesis/ConsumeTest.java b/src/test/java/io/kestra/plugin/aws/kinesis/ConsumeTest.java index 7e642787..9f668e14 100644 --- a/src/test/java/io/kestra/plugin/aws/kinesis/ConsumeTest.java +++ b/src/test/java/io/kestra/plugin/aws/kinesis/ConsumeTest.java @@ -1,7 +1,6 @@ package io.kestra.plugin.aws.kinesis; -import java.io.BufferedReader; -import java.io.InputStreamReader; +import java.io.BufferedInputStream; import java.net.URI; import java.time.Instant; import java.util.List; @@ -20,8 +19,8 @@ class ConsumeTest extends AbstractKinesisTest { private static List loadOutput(RunContext ctx, URI uri) throws Exception { - try (BufferedReader r = new BufferedReader(new InputStreamReader(ctx.storage().getFile(uri)))) { - return FileSerde.readAll(r, Consume.ConsumedRecord.class).collectList().block(); + try (var inputStream = new BufferedInputStream(ctx.storage().getFile(uri), FileSerde.BUFFER_SIZE)) { + return FileSerde.readAll(inputStream, Consume.ConsumedRecord.class).collectList().block(); } } diff --git a/src/test/java/io/kestra/plugin/aws/kinesis/PutRecordsTest.java b/src/test/java/io/kestra/plugin/aws/kinesis/PutRecordsTest.java index e674ee0b..a985c24d 100644 --- a/src/test/java/io/kestra/plugin/aws/kinesis/PutRecordsTest.java +++ b/src/test/java/io/kestra/plugin/aws/kinesis/PutRecordsTest.java @@ -1,9 +1,8 @@ package io.kestra.plugin.aws.kinesis; -import java.io.BufferedReader; +import java.io.BufferedInputStream; import java.io.File; import java.io.FileOutputStream; -import java.io.InputStreamReader; import java.net.URI; import java.util.List; @@ -85,7 +84,7 @@ private static List getOutputEntries(PutRecords put, Run if (!from.getScheme().equals("kestra")) { throw new IllegalArgumentException("Invalid entries parameter, must be a Kestra internal storage URI, or a list of entry."); } - try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)))) { + try (var inputStream = new BufferedInputStream(runContext.storage().getFile(from), FileSerde.BUFFER_SIZE)) { outputEntries = FileSerde.readAll(inputStream, PutRecords.OutputEntry.class).collectList().block(); } return outputEntries;