diff --git a/src/main/java/com/teragrep/functions/dpf_02/BatchCollect.java b/src/main/java/com/teragrep/functions/dpf_02/BatchCollect.java index 1af66d6..4368803 100644 --- a/src/main/java/com/teragrep/functions/dpf_02/BatchCollect.java +++ b/src/main/java/com/teragrep/functions/dpf_02/BatchCollect.java @@ -132,13 +132,14 @@ public void collect(Dataset batchDF, Long batchId, List postB if (!skipLimiting && limit > 0) { orderedDs = orderedDs.limit(limit); } - + List collected = orderedDs.collectAsList(); + Dataset createdDsFromCollected = SparkSession.builder().getOrCreate().createDataFrame(collected, this.inputSchema); Dataset current; if (this.savedDs == null) { - current = orderedDs; + current = createdDsFromCollected; } else { - current = savedDs.union(orderedDs); + current = savedDs.union(createdDsFromCollected); } current = orderDataset(current);