-
Notifications
You must be signed in to change notification settings - Fork 1.4k
refactor!: Align DatasetClient and KeyValueStoreClient interfaces with their Python counterparts
#3627
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: v4
Are you sure you want to change the base?
refactor!: Align DatasetClient and KeyValueStoreClient interfaces with their Python counterparts
#3627
Changes from all commits
1e349f5
f724635
1786bcd
b999174
07d6793
76c2543
1d6129b
9469b2e
10ee5a4
6e70953
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,8 +2,6 @@ import type { DatasetClient, DatasetInfo, Dictionary, PaginatedList } from '@cra | |
| import { stringify } from 'csv-stringify/sync'; | ||
| import ow from 'ow'; | ||
|
|
||
| import { MAX_PAYLOAD_SIZE_BYTES } from '@apify/consts'; | ||
|
|
||
| import { Configuration } from '../configuration.js'; | ||
| import type { CrawleeLogger } from '../log.js'; | ||
| import { serviceLocator } from '../service_locator.js'; | ||
|
|
@@ -13,41 +11,33 @@ import { KeyValueStore } from './key_value_store.js'; | |
| import type { StorageIdentifier } from './storage_instance_manager.js'; | ||
| import type { StorageOpenOptions } from './utils.js'; | ||
| import { resolveStorageIdentifier } from './storage_instance_manager.js'; | ||
| import { purgeDefaultStorages } from './utils.js'; | ||
| import { createDualIterable, purgeDefaultStorages } from './utils.js'; | ||
|
|
||
| /** @internal */ | ||
| export const DATASET_ITERATORS_DEFAULT_LIMIT = 10000; | ||
|
|
||
| const SAFETY_BUFFER_PERCENT = 0.01 / 100; // 0.01% | ||
|
|
||
| /** | ||
| * Accepts a JSON serializable object as an input, validates its serializability, | ||
| * and validates its serialized size against limitBytes. Optionally accepts its index | ||
| * in an array to provide better error messages. Returns serialized object. | ||
| * Validates that the given value is a plain JSON-serializable object | ||
| * (not an array, not a primitive, not circular). | ||
| * | ||
| * @param item The value to validate. | ||
| * @param index Optional index for error messages when validating inside an array. | ||
| * @ignore | ||
| */ | ||
| export function checkAndSerialize<T>(item: T, limitBytes: number, index?: number): string { | ||
| export function assertJsonSerializable<T>(item: T, index?: number): void { | ||
| const s = typeof index === 'number' ? ` at index ${index} ` : ' '; | ||
| const isItemObject = item && typeof item === 'object' && !Array.isArray(item); | ||
|
|
||
| if (!isItemObject) { | ||
| throw new Error(`Data item${s}is not an object. You can push only objects into a dataset.`); | ||
| } | ||
|
|
||
| let payload; | ||
| try { | ||
| payload = JSON.stringify(item); | ||
| JSON.stringify(item); | ||
| } catch (e) { | ||
| const err = e as Error; | ||
| throw new Error(`Data item${s}is not serializable to JSON.\nCause: ${err.message}`); | ||
| } | ||
|
|
||
| const bytes = Buffer.byteLength(payload); | ||
| if (bytes > limitBytes) { | ||
| throw new Error(`Data item${s}is too large (size: ${bytes} bytes, limit: ${limitBytes} bytes)`); | ||
| } | ||
|
|
||
| return payload; | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -260,44 +250,21 @@ export class Dataset<Data extends Dictionary = Dictionary> { | |
| * **IMPORTANT**: Make sure to use the `await` keyword when calling `pushData()`, | ||
| * otherwise the crawler process might finish before the data is stored! | ||
| * | ||
| * The size of the data is limited by the receiving API and therefore `pushData()` will only | ||
| * allow objects whose JSON representation is smaller than 9MB. When an array is passed, | ||
| * none of the included objects | ||
| * may be larger than 9MB, but the array itself may be of any size. | ||
| * | ||
| * The function internally | ||
| * chunks the array into separate items and pushes them sequentially. | ||
| * The chunking process is stable (keeps order of data), but it does not provide a transaction | ||
| * safety mechanism. Therefore, in the event of an uploading error (after several automatic retries), | ||
| * the function's Promise will reject and the dataset will be left in a state where some of | ||
| * the items have already been saved to the dataset while other items from the source array were not. | ||
| * To overcome this limitation, the developer may, for example, read the last item saved in the dataset | ||
| * and re-attempt the save of the data from this item onwards to prevent duplicates. | ||
| * @param data Object or array of objects containing data to be stored in the default dataset. | ||
| * The objects must be serializable to JSON and the JSON representation of each object must be smaller than 9MB. | ||
| * The objects must be serializable to JSON. | ||
| */ | ||
| async pushData(data: Data | Data[]): Promise<void> { | ||
| checkStorageAccess(); | ||
|
|
||
| ow(data, 'data', ow.object); | ||
| const dispatch = async (payload: string) => this.client.pushItems(payload); | ||
| const limit = MAX_PAYLOAD_SIZE_BYTES - Math.ceil(MAX_PAYLOAD_SIZE_BYTES * SAFETY_BUFFER_PERCENT); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Handling of oversize payloads will be reintroduced in SDK as a part of apify/apify-sdk-js#603 |
||
|
|
||
| // Handle singular Objects | ||
| if (!Array.isArray(data)) { | ||
| const payload = checkAndSerialize(data, limit); | ||
| await dispatch(payload); | ||
| return; | ||
| } | ||
|
|
||
| // Handle Arrays | ||
| const payloads = data.map((item, index) => checkAndSerialize(item, limit, index)); | ||
| const chunks = chunkBySize(payloads, limit); | ||
|
|
||
| // Invoke client in series to preserve order of data | ||
| for (const chunk of chunks) { | ||
| await dispatch(chunk); | ||
| // Normalize to array and validate each item | ||
| const items = Array.isArray(data) ? data : [data]; | ||
| for (let i = 0; i < items.length; i++) { | ||
| assertJsonSerializable(items[i], i); | ||
| } | ||
|
|
||
| await this.client.pushData(items); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -307,7 +274,7 @@ export class Dataset<Data extends Dictionary = Dictionary> { | |
| checkStorageAccess(); | ||
|
|
||
| try { | ||
| return await this.client.listItems(options); | ||
| return await this.client.getData(options); | ||
| } catch (e) { | ||
| const error = e as Error; | ||
| if (error.message.includes('Cannot create a string longer than')) { | ||
|
|
@@ -328,22 +295,9 @@ export class Dataset<Data extends Dictionary = Dictionary> { | |
|
|
||
| const items: Data[] = []; | ||
|
|
||
| const fetchNextChunk = async (offset = 0): Promise<void> => { | ||
| const limit = 1000; | ||
| const value = await this.client.listItems({ offset, limit, ...options }); | ||
|
|
||
| if (value.count === 0) { | ||
| return; | ||
| } | ||
|
|
||
| items.push(...value.items); | ||
|
|
||
| if (value.total > offset + value.count) { | ||
| await fetchNextChunk(offset + value.count); | ||
| } | ||
| }; | ||
|
|
||
| await fetchNextChunk(); | ||
| for await (const page of this.fetchPages(options)) { | ||
| items.push(...page.items); | ||
| } | ||
|
|
||
| return items; | ||
| } | ||
|
|
@@ -602,60 +556,99 @@ export class Dataset<Data extends Dictionary = Dictionary> { | |
| return currentMemo; | ||
| } | ||
|
|
||
| private async *fetchEntryPages(options: DatasetIteratorOptions): AsyncGenerator<PaginatedList<[number, Data]>> { | ||
| let index = options.offset ?? 0; | ||
| for await (const page of this.fetchPages(options)) { | ||
| yield { | ||
| ...page, | ||
| items: page.items.map((item) => [index++, item] as [number, Data]), | ||
| }; | ||
| } | ||
| } | ||
|
|
||
| private async *fetchPages( | ||
| options: DatasetIteratorOptions, | ||
| pageSize = DATASET_ITERATORS_DEFAULT_LIMIT, | ||
| ): AsyncGenerator<PaginatedList<Data>> { | ||
| let offset = options.offset ?? 0; | ||
| const totalLimit = options.limit; | ||
| let yielded = 0; | ||
|
|
||
| while (true) { | ||
| const fetchLimit = totalLimit !== undefined ? Math.min(pageSize, totalLimit - yielded) : pageSize; | ||
| if (fetchLimit <= 0) break; | ||
|
|
||
| const page = await this.client.getData({ ...options, offset, limit: fetchLimit }); | ||
| yield page; | ||
|
|
||
| yielded += page.items.length; | ||
| if (page.items.length < fetchLimit || offset + page.items.length >= page.total) break; | ||
| offset += page.items.length; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Iterates over dataset items using an async generator, | ||
| * allowing the use of `for await...of` syntax. | ||
| * Returns dataset items. | ||
| * | ||
| * When awaited (`await dataset.values()`), returns all items as a flat `Data[]` array. | ||
| * When used as an async iterable (`for await...of`), streams all items across pages | ||
| * without buffering everything in memory. | ||
| * | ||
| * **Example usage:** | ||
| * ```javascript | ||
| * const dataset = await Dataset.open('my-results'); | ||
| * | ||
| * // Stream all items (memory-efficient for large datasets) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit:
This might be slightly confusing for some (we're async iterating over the items, but loading the actual items is "blocking" - you have to load the entire item from the storage before processing it) |
||
| * for await (const item of dataset.values()) { | ||
| * console.log(item); | ||
| * } | ||
| * | ||
| * // Or fetch all items at once | ||
| * const items = await dataset.values(); | ||
| * console.log(items); | ||
| * ``` | ||
| * | ||
| * @param options Options for the iteration. | ||
| */ | ||
| values(options: DatasetIteratorOptions = {}): AsyncIterable<Data> & Promise<PaginatedList<Data>> { | ||
| values(options: DatasetIteratorOptions = {}): AsyncIterable<Data> & Promise<Data[]> { | ||
| checkStorageAccess(); | ||
|
|
||
| const result = this.client.listItems(options) as AsyncIterable<Data> & Promise<PaginatedList<Data>>; | ||
|
|
||
| if (!(Symbol.asyncIterator in result)) { | ||
| Object.defineProperty(result, Symbol.asyncIterator, { | ||
| get() { | ||
| throw new Error('Resource client "listItems" method does not return an async iterable.'); | ||
| }, | ||
| }); | ||
| } | ||
|
|
||
| return result; | ||
| return createDualIterable({ | ||
| createPages: () => this.fetchPages(options), | ||
| extractItems: (page) => page.items, | ||
| }); | ||
|
Comment on lines
+616
to
+619
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This means that Let's assume a dataset of size, e.g., for await (const value of Dataset.values()) {
// iterates over all the 3000 items
}
for (const value of await Array.fromAsync(Dataset.values())) {
// blocks while fetching the entire dataset, then iterates over all the 3000 items
}
for (const value of await Dataset.values()) {
// explodes, returns a { items: [1000 items], limit: 1000, offset: 0, total: 3000 }
}This imo breaks the whole idea of the dual iterables. Fetching pages can be done (and has historically been done) with
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So, just to make sure what you're proposing:
Or did I miss the point completely?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I agree with all points except for the last one
Imo
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are two things that led me to this conclusion - the result of Which of these two do you consider worth breaking? 😁
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd break the first one, 100% 😄 Imo if a user drops tons of data in the KVS, then calls Imo pagination has its place in the lower abstraction levels (Apify API, AWS API, etc.), but Crawlee shouldn't leak this (
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, let's do it this way, accepting the footgun. |
||
| } | ||
|
|
||
| /** | ||
| * Iterates over dataset entries (index-value pairs) using an async generator, | ||
| * allowing the use of `for await...of` syntax. | ||
| * Returns dataset entries (index-value pairs). | ||
| * | ||
| * When awaited (`await dataset.entries()`), returns all entries as a flat `[index, item][]` array. | ||
| * When used as an async iterable (`for await...of`), streams all entries across pages | ||
| * without buffering everything in memory. | ||
| * | ||
| * **Example usage:** | ||
| * ```javascript | ||
| * const dataset = await Dataset.open('my-results'); | ||
| * | ||
| * // Stream all entries | ||
| * for await (const [index, item] of dataset.entries()) { | ||
| * console.log(`Item at ${index}: ${JSON.stringify(item)}`); | ||
| * } | ||
| * | ||
| * // Or fetch all at once | ||
| * const entries = await dataset.entries(); | ||
| * console.log(entries); | ||
| * ``` | ||
| * | ||
| * @param options Options for the iteration. | ||
| */ | ||
| entries( | ||
| options: DatasetIteratorOptions = {}, | ||
| ): AsyncIterable<[number, Data]> & Promise<PaginatedList<[number, Data]>> { | ||
| entries(options: DatasetIteratorOptions = {}): AsyncIterable<[number, Data]> & Promise<[number, Data][]> { | ||
| checkStorageAccess(); | ||
|
|
||
| if (!this.client.listEntries) { | ||
| throw new Error('Resource client is missing the "listEntries" method.'); | ||
| } | ||
|
|
||
| return this.client.listEntries(options); | ||
| return createDualIterable({ | ||
| createPages: () => this.fetchEntryPages(options), | ||
| extractItems: (page) => page.items, | ||
| }); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -681,7 +674,7 @@ export class Dataset<Data extends Dictionary = Dictionary> { | |
| async drop(): Promise<void> { | ||
| checkStorageAccess(); | ||
|
|
||
| await this.client.delete(); | ||
| await this.client.drop(); | ||
| serviceLocator.getStorageInstanceManager().removeFromCache(this); | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like a drastic change, but as part of #3652, we will unify and re-introduce the timeout handling.