Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
version=2.5.1-SNAPSHOT
kestraVersion=1.3.13
kestraVersion=1.3.19
4 changes: 2 additions & 2 deletions src/main/java/io/kestra/plugin/aws/athena/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Metric;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Output;
Expand All @@ -34,7 +35,6 @@
import reactor.core.publisher.Flux;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.athena.model.*;
import io.kestra.core.models.annotations.PluginProperty;

/**
* This Query task is built with the Athena SDK, more info can be found here: https://docs.aws.amazon.com/athena/latest/ug/code-samples.html.
Expand Down Expand Up @@ -323,7 +323,7 @@ private Pair<URI, Long> store(List<ColumnInfo> columnInfo, List<Row> results, Ru

File tempFile = runContext.workingDir().createTempFile(".ion").toFile();

try (var output = new BufferedWriter(new FileWriter(tempFile), FileSerde.BUFFER_SIZE)) {
try (var output = new BufferedOutputStream(new FileOutputStream(tempFile), FileSerde.BUFFER_SIZE)) {
Long count = FileSerde.writeAll(output, Flux.fromIterable(results).mapNotNull(row -> map(columnInfo, row))).block();

return Pair.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.commons.lang3.tuple.Pair;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.common.FetchOutput;
Expand All @@ -32,7 +33,6 @@
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

import static io.kestra.core.utils.Rethrow.throwConsumer;
import io.kestra.core.models.annotations.PluginProperty;

@SuperBuilder
@ToString
Expand Down Expand Up @@ -154,7 +154,7 @@ protected FetchOutput fetchOutputs(List<Map<String, AttributeValue>> items, Fetc
private Pair<URI, Long> store(RunContext runContext, List<Map<String, AttributeValue>> items) throws IOException {
File tempFile = runContext.workingDir().createTempFile(".ion").toFile();

try (var output = new BufferedWriter(new FileWriter(tempFile), FileSerde.BUFFER_SIZE)) {
try (var output = new BufferedOutputStream(new FileOutputStream(tempFile), FileSerde.BUFFER_SIZE)) {
var flux = Flux.fromIterable(items).map(attributes -> objectMapFrom(attributes));
Long count = FileSerde.writeAll(output, flux).block();
return Pair.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ private List<Entry> readEntryList(RunContext runContext, Object entries) throws
if (!from.getScheme().equals("kestra")) {
throw new IllegalArgumentException("Invalid entries parameter, must be a Kestra internal storage URI, or a list of entries.");
}
try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)))) {
try (var inputStream = new BufferedInputStream(runContext.storage().getFile(from), FileSerde.BUFFER_SIZE)) {
return FileSerde.readAll(inputStream, Entry.class)
.collectList().block();
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/kestra/plugin/aws/kinesis/PutRecords.java
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ private List<Record> getRecordList(Object records, RunContext runContext) throws
if (!from.getScheme().equals("kestra")) {
throw new IllegalArgumentException("Invalid records parameter, must be a Kestra internal storage URI, or a list of records.");
}
try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)))) {
try (var inputStream = new BufferedInputStream(runContext.storage().getFile(from), FileSerde.BUFFER_SIZE)) {
return FileSerde.readAll(inputStream, Record.class)
.collectList().block();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package io.kestra.plugin.aws.eventbridge;

import java.io.BufferedReader;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -43,7 +42,7 @@ private static List<PutEvents.OutputEntry> getOutputEntries(PutEvents put, RunCo
if (!from.getScheme().equals("kestra")) {
throw new IllegalArgumentException("Invalid entries parameter, must be a Kestra internal storage URI, or a list of entry.");
}
try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)))) {
try (var inputStream = new BufferedInputStream(runContext.storage().getFile(from), FileSerde.BUFFER_SIZE)) {
outputEntries = FileSerde.readAll(inputStream, PutEvents.OutputEntry.class).collectList().block();
}
return outputEntries;
Expand Down
7 changes: 3 additions & 4 deletions src/test/java/io/kestra/plugin/aws/kinesis/ConsumeTest.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.kestra.plugin.aws.kinesis;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.BufferedInputStream;
import java.net.URI;
import java.time.Instant;
import java.util.List;
Expand All @@ -20,8 +19,8 @@

class ConsumeTest extends AbstractKinesisTest {
private static List<Consume.ConsumedRecord> loadOutput(RunContext ctx, URI uri) throws Exception {
try (BufferedReader r = new BufferedReader(new InputStreamReader(ctx.storage().getFile(uri)))) {
return FileSerde.readAll(r, Consume.ConsumedRecord.class).collectList().block();
try (var inputStream = new BufferedInputStream(ctx.storage().getFile(uri), FileSerde.BUFFER_SIZE)) {
return FileSerde.readAll(inputStream, Consume.ConsumedRecord.class).collectList().block();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package io.kestra.plugin.aws.kinesis;

import java.io.BufferedReader;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.List;

Expand Down Expand Up @@ -85,7 +84,7 @@ private static List<PutRecords.OutputEntry> getOutputEntries(PutRecords put, Run
if (!from.getScheme().equals("kestra")) {
throw new IllegalArgumentException("Invalid entries parameter, must be a Kestra internal storage URI, or a list of entry.");
}
try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)))) {
try (var inputStream = new BufferedInputStream(runContext.storage().getFile(from), FileSerde.BUFFER_SIZE)) {
outputEntries = FileSerde.readAll(inputStream, PutRecords.OutputEntry.class).collectList().block();
}
return outputEntries;
Expand Down
Loading