Skip to content
8 changes: 1 addition & 7 deletions packages/core/src/crawlers/statistics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -330,14 +330,8 @@ export class Statistics {

this.log.debug('Persisting state', { persistStateKey: this.persistStateKey });

// use half the interval of `persistState` to avoid race conditions
const persistStateIntervalMillis = serviceLocator.getConfiguration().persistStateIntervalMillis;
const timeoutSecs = persistStateIntervalMillis / 2_000;
await this.keyValueStore
.setValue(this.persistStateKey, this.toJSON(), {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a drastic change, but as part of #3652, we will unify and re-introduce the timeout handling.

timeoutSecs,
doNotRetryTimeouts: true,
})
.setValue(this.persistStateKey, this.toJSON())
.catch((error) =>
this.log.warning(`Failed to persist the statistics to ${this.persistStateKey}`, { error }),
);
Expand Down
8 changes: 1 addition & 7 deletions packages/core/src/session_pool/session_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -389,14 +389,8 @@ export class SessionPool extends EventEmitter {
persistStateKey: this.persistStateKey,
});

// use half the interval of `persistState` to avoid race conditions
const persistStateIntervalMillis = serviceLocator.getConfiguration().persistStateIntervalMillis;
const timeoutSecs = persistStateIntervalMillis / 2_000;
await this.keyValueStore
?.setValue(this.persistStateKey, await this.getState(), {
timeoutSecs,
doNotRetryTimeouts: true,
})
?.setValue(this.persistStateKey, await this.getState())
.catch((error) =>
this.log.warning(`Failed to persist the session pool stats to ${this.persistStateKey}`, { error }),
);
Expand Down
173 changes: 83 additions & 90 deletions packages/core/src/storages/dataset.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ import type { DatasetClient, DatasetInfo, Dictionary, PaginatedList } from '@cra
import { stringify } from 'csv-stringify/sync';
import ow from 'ow';

import { MAX_PAYLOAD_SIZE_BYTES } from '@apify/consts';

import { Configuration } from '../configuration.js';
import type { CrawleeLogger } from '../log.js';
import { serviceLocator } from '../service_locator.js';
Expand All @@ -13,41 +11,33 @@ import { KeyValueStore } from './key_value_store.js';
import type { StorageIdentifier } from './storage_instance_manager.js';
import type { StorageOpenOptions } from './utils.js';
import { resolveStorageIdentifier } from './storage_instance_manager.js';
import { purgeDefaultStorages } from './utils.js';
import { createDualIterable, purgeDefaultStorages } from './utils.js';

/** @internal */
export const DATASET_ITERATORS_DEFAULT_LIMIT = 10000;

const SAFETY_BUFFER_PERCENT = 0.01 / 100; // 0.01%

/**
* Accepts a JSON serializable object as an input, validates its serializability,
* and validates its serialized size against limitBytes. Optionally accepts its index
* in an array to provide better error messages. Returns serialized object.
* Validates that the given value is a plain JSON-serializable object
* (not an array, not a primitive, not circular).
*
* @param item The value to validate.
* @param index Optional index for error messages when validating inside an array.
* @ignore
*/
export function checkAndSerialize<T>(item: T, limitBytes: number, index?: number): string {
export function assertJsonSerializable<T>(item: T, index?: number): void {
const s = typeof index === 'number' ? ` at index ${index} ` : ' ';
const isItemObject = item && typeof item === 'object' && !Array.isArray(item);

if (!isItemObject) {
throw new Error(`Data item${s}is not an object. You can push only objects into a dataset.`);
}

let payload;
try {
payload = JSON.stringify(item);
JSON.stringify(item);
} catch (e) {
const err = e as Error;
throw new Error(`Data item${s}is not serializable to JSON.\nCause: ${err.message}`);
}

const bytes = Buffer.byteLength(payload);
if (bytes > limitBytes) {
throw new Error(`Data item${s}is too large (size: ${bytes} bytes, limit: ${limitBytes} bytes)`);
}

return payload;
}

/**
Expand Down Expand Up @@ -260,44 +250,21 @@ export class Dataset<Data extends Dictionary = Dictionary> {
* **IMPORTANT**: Make sure to use the `await` keyword when calling `pushData()`,
* otherwise the crawler process might finish before the data is stored!
*
* The size of the data is limited by the receiving API and therefore `pushData()` will only
* allow objects whose JSON representation is smaller than 9MB. When an array is passed,
* none of the included objects
* may be larger than 9MB, but the array itself may be of any size.
*
* The function internally
* chunks the array into separate items and pushes them sequentially.
* The chunking process is stable (keeps order of data), but it does not provide a transaction
* safety mechanism. Therefore, in the event of an uploading error (after several automatic retries),
* the function's Promise will reject and the dataset will be left in a state where some of
* the items have already been saved to the dataset while other items from the source array were not.
* To overcome this limitation, the developer may, for example, read the last item saved in the dataset
* and re-attempt the save of the data from this item onwards to prevent duplicates.
* @param data Object or array of objects containing data to be stored in the default dataset.
* The objects must be serializable to JSON and the JSON representation of each object must be smaller than 9MB.
* The objects must be serializable to JSON.
*/
async pushData(data: Data | Data[]): Promise<void> {
checkStorageAccess();

ow(data, 'data', ow.object);
const dispatch = async (payload: string) => this.client.pushItems(payload);
const limit = MAX_PAYLOAD_SIZE_BYTES - Math.ceil(MAX_PAYLOAD_SIZE_BYTES * SAFETY_BUFFER_PERCENT);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handling of oversize payloads will be reintroduced in SDK as a part of apify/apify-sdk-js#603


// Handle singular Objects
if (!Array.isArray(data)) {
const payload = checkAndSerialize(data, limit);
await dispatch(payload);
return;
}

// Handle Arrays
const payloads = data.map((item, index) => checkAndSerialize(item, limit, index));
const chunks = chunkBySize(payloads, limit);

// Invoke client in series to preserve order of data
for (const chunk of chunks) {
await dispatch(chunk);
// Normalize to array and validate each item
const items = Array.isArray(data) ? data : [data];
for (let i = 0; i < items.length; i++) {
assertJsonSerializable(items[i], i);
}

await this.client.pushData(items);
}

/**
Expand All @@ -307,7 +274,7 @@ export class Dataset<Data extends Dictionary = Dictionary> {
checkStorageAccess();

try {
return await this.client.listItems(options);
return await this.client.getData(options);
} catch (e) {
const error = e as Error;
if (error.message.includes('Cannot create a string longer than')) {
Expand All @@ -328,22 +295,9 @@ export class Dataset<Data extends Dictionary = Dictionary> {

const items: Data[] = [];

const fetchNextChunk = async (offset = 0): Promise<void> => {
const limit = 1000;
const value = await this.client.listItems({ offset, limit, ...options });

if (value.count === 0) {
return;
}

items.push(...value.items);

if (value.total > offset + value.count) {
await fetchNextChunk(offset + value.count);
}
};

await fetchNextChunk();
for await (const page of this.fetchPages(options)) {
items.push(...page.items);
}

return items;
}
Expand Down Expand Up @@ -602,60 +556,99 @@ export class Dataset<Data extends Dictionary = Dictionary> {
return currentMemo;
}

private async *fetchEntryPages(options: DatasetIteratorOptions): AsyncGenerator<PaginatedList<[number, Data]>> {
let index = options.offset ?? 0;
for await (const page of this.fetchPages(options)) {
yield {
...page,
items: page.items.map((item) => [index++, item] as [number, Data]),
};
}
}

private async *fetchPages(
options: DatasetIteratorOptions,
pageSize = DATASET_ITERATORS_DEFAULT_LIMIT,
): AsyncGenerator<PaginatedList<Data>> {
let offset = options.offset ?? 0;
const totalLimit = options.limit;
let yielded = 0;

while (true) {
const fetchLimit = totalLimit !== undefined ? Math.min(pageSize, totalLimit - yielded) : pageSize;
if (fetchLimit <= 0) break;

const page = await this.client.getData({ ...options, offset, limit: fetchLimit });
yield page;

yielded += page.items.length;
if (page.items.length < fetchLimit || offset + page.items.length >= page.total) break;
offset += page.items.length;
}
}

/**
* Iterates over dataset items using an async generator,
* allowing the use of `for await...of` syntax.
* Returns dataset items.
*
* When awaited (`await dataset.values()`), returns all items as a flat `Data[]` array.
* When used as an async iterable (`for await...of`), streams all items across pages
* without buffering everything in memory.
*
* **Example usage:**
* ```javascript
* const dataset = await Dataset.open('my-results');
*
* // Stream all items (memory-efficient for large datasets)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

"stream" all items

This might be slightly confusing for some (we're async iterating over the items, but loading the actual items is "blocking" - you have to load the entire item from the storage before processing it)

* for await (const item of dataset.values()) {
* console.log(item);
* }
*
* // Or fetch all items at once
* const items = await dataset.values();
* console.log(items);
* ```
*
* @param options Options for the iteration.
*/
values(options: DatasetIteratorOptions = {}): AsyncIterable<Data> & Promise<PaginatedList<Data>> {
values(options: DatasetIteratorOptions = {}): AsyncIterable<Data> & Promise<Data[]> {
checkStorageAccess();

const result = this.client.listItems(options) as AsyncIterable<Data> & Promise<PaginatedList<Data>>;

if (!(Symbol.asyncIterator in result)) {
Object.defineProperty(result, Symbol.asyncIterator, {
get() {
throw new Error('Resource client "listItems" method does not return an async iterable.');
},
});
}

return result;
return createDualIterable({
createPages: () => this.fetchPages(options),
extractItems: (page) => page.items,
});
Comment on lines +616 to +619
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means that await Dataset.values() will return the first page (of size dependent on the storage client implementation, e.g., 1000)

Let's assume a dataset of size, e.g., 3000

for await (const value of Dataset.values()) {
     // iterates over all the 3000 items 
}

for (const value of await Array.fromAsync(Dataset.values())) {
     // blocks while fetching the entire dataset, then iterates over all the 3000 items 
}

for (const value of await Dataset.values()) {
    // explodes, returns a { items: [1000 items], limit: 1000, offset: 0, total: 3000 }
}

This imo breaks the whole idea of the dual iterables. Fetching pages can be done (and has historically been done) with getData. imo the values / keys / entries methods should hide the backend pagination and allow the user to work directly with the data.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, just to make sure what you're proposing:

  • Dataset.getData - one page of backend-dependent length, in a wrapper container with some metadata, no dual iterator
  • Dataset.export - all items in the dataset, as a simple raw array
  • Dataset.values - dual iterator, just items (Data[]), no metadata
  • Dataset.entries - same as the above, but with indexes
  • KeyValueStore.keys, KeyValueStore.values, KeyValueStore.entries - dual iterator, no metadata
  • all dual iterators follow the "await to get one page, iterate to go through everything" paradigm

Or did I miss the point completely?

Copy link
Copy Markdown
Member

@barjin barjin May 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree with all points except for the last one

all dual iterators follow the "await to get one page, iterate to go through everything" paradigm

Imo awaiting the dual iterators should return all the items (same as Dataset.export)... the need for pagination comes from Apify API, Crawlee users imo shouldn't care for this.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two things that led me to this conclusion - the result of KeyValueStore.values() will easily be very big and allowing pulling it into memory with a single await feels like a major footgun to me. And if we make one dual iterator return a single page, I'd prefer to keep it consistent accross all of them.

Which of these two do you consider worth breaking? 😁

Copy link
Copy Markdown
Member

@barjin barjin May 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd break the first one, 100% 😄

Imo if a user drops tons of data in the KVS, then calls console.log(JSON.stringify(await kvs.values()))... it's on them, and we shouldn't stop them from doing this. The performance flop will be fully their responsibility (and quite obvious to spot).

Imo pagination has its place in the lower abstraction levels (Apify API, AWS API, etc.), but Crawlee shouldn't leak this (Dataset.getData I'm willing to accept, as the name is rather technical.) Key-Value store and Map are well-known concepts, and I don't think many ppl connect these with pagination (which might be, therefore, surprising for them).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, let's do it this way, accepting the footgun. 6e70953 (this PR)

}

/**
* Iterates over dataset entries (index-value pairs) using an async generator,
* allowing the use of `for await...of` syntax.
* Returns dataset entries (index-value pairs).
*
* When awaited (`await dataset.entries()`), returns all entries as a flat `[index, item][]` array.
* When used as an async iterable (`for await...of`), streams all entries across pages
* without buffering everything in memory.
*
* **Example usage:**
* ```javascript
* const dataset = await Dataset.open('my-results');
*
* // Stream all entries
* for await (const [index, item] of dataset.entries()) {
* console.log(`Item at ${index}: ${JSON.stringify(item)}`);
* }
*
* // Or fetch all at once
* const entries = await dataset.entries();
* console.log(entries);
* ```
*
* @param options Options for the iteration.
*/
entries(
options: DatasetIteratorOptions = {},
): AsyncIterable<[number, Data]> & Promise<PaginatedList<[number, Data]>> {
entries(options: DatasetIteratorOptions = {}): AsyncIterable<[number, Data]> & Promise<[number, Data][]> {
checkStorageAccess();

if (!this.client.listEntries) {
throw new Error('Resource client is missing the "listEntries" method.');
}

return this.client.listEntries(options);
return createDualIterable({
createPages: () => this.fetchEntryPages(options),
extractItems: (page) => page.items,
});
}

/**
Expand All @@ -681,7 +674,7 @@ export class Dataset<Data extends Dictionary = Dictionary> {
async drop(): Promise<void> {
checkStorageAccess();

await this.client.delete();
await this.client.drop();
serviceLocator.getStorageInstanceManager().removeFromCache(this);
}

Expand Down
Loading
Loading