refactor!: Align DatasetClient and KeyValueStoreClient interfaces with their Python counterparts#3627
refactor!: Align DatasetClient and KeyValueStoreClient interfaces with their Python counterparts#3627janbuchar wants to merge 10 commits into
DatasetClient and KeyValueStoreClient interfaces with their Python counterparts#3627Conversation
DatasetClient and KeyValueStoreClient interfaces with their Python counterparts
barjin
left a comment
There was a problem hiding this comment.
Thank you @janbuchar ! Few ideas below ⬇️
d998bc7 to
1e349f5
Compare
| const persistStateIntervalMillis = serviceLocator.getConfiguration().persistStateIntervalMillis; | ||
| const timeoutSecs = persistStateIntervalMillis / 2_000; | ||
| await this.keyValueStore | ||
| .setValue(this.persistStateKey, this.toJSON(), { |
There was a problem hiding this comment.
This seems like a drastic change, but as part of #3652, we will unify and re-introduce the timeout handling.
|
|
||
| ow(data, 'data', ow.object); | ||
| const dispatch = async (payload: string) => this.client.pushItems(payload); | ||
| const limit = MAX_PAYLOAD_SIZE_BYTES - Math.ceil(MAX_PAYLOAD_SIZE_BYTES * SAFETY_BUFFER_PERCENT); |
There was a problem hiding this comment.
Handling of oversize payloads will be reintroduced in SDK as a part of apify/apify-sdk-js#603
There was a problem hiding this comment.
The methods removed from this file were reincarnated in packages/core/src/storages/utils.ts
|
|
||
| test('can be awaited directly (backward compatibility)', async () => { | ||
| const result = await dataset.listItems({ limit: 10 }); | ||
| test('getData returns a paginated result', async () => { |
There was a problem hiding this comment.
The changes here deserve most attention - they define the interface for future storage implementations.
barjin
left a comment
There was a problem hiding this comment.
Thank you @janbuchar! I have a few more questions/ideas ⬇️
| return createDualIterable({ | ||
| createPages: () => this.fetchPages(options), | ||
| extractItems: (page) => page.items, | ||
| mapFirstPage: (page) => page, | ||
| }); |
There was a problem hiding this comment.
This means that await Dataset.values() will return the first page (of size dependent on the storage client implementation, e.g., 1000)
Let's assume a dataset of size, e.g., 3000
for await (const value of Dataset.values()) {
// iterates over all the 3000 items
}
for (const value of await Array.fromAsync(Dataset.values())) {
// blocks while fetching the entire dataset, then iterates over all the 3000 items
}
for (const value of await Dataset.values()) {
// explodes, returns a { items: [1000 items], limit: 1000, offset: 0, total: 3000 }
}This imo breaks the whole idea of the dual iterables. Fetching pages can be done (and has historically been done) with getData. imo the values / keys / entries methods should hide the backend pagination and allow the user to work directly with the data.
There was a problem hiding this comment.
So, just to make sure what you're proposing:
Dataset.getData- one page of backend-dependent length, in a wrapper container with some metadata, no dual iteratorDataset.export- all items in the dataset, as a simple raw arrayDataset.values- dual iterator, just items (Data[]), no metadataDataset.entries- same as the above, but with indexesKeyValueStore.keys,KeyValueStore.values,KeyValueStore.entries- dual iterator, no metadata- all dual iterators follow the "await to get one page, iterate to go through everything" paradigm
Or did I miss the point completely?
There was a problem hiding this comment.
Yes, I agree with all points except for the last one
all dual iterators follow the "await to get one page, iterate to go through everything" paradigm
Imo awaiting the dual iterators should return all the items (same as Dataset.export)... the need for pagination comes from Apify API, Crawlee users imo shouldn't care for this.
There was a problem hiding this comment.
There are two things that led me to this conclusion - the result of KeyValueStore.values() will easily be very big and allowing pulling it into memory with a single await feels like a major footgun to me. And if we make one dual iterator return a single page, I'd prefer to keep it consistent accross all of them.
Which of these two do you consider worth breaking? 😁
There was a problem hiding this comment.
I'd break the first one, 100% 😄
Imo if a user drops tons of data in the KVS, then calls console.log(JSON.stringify(await kvs.values()))... it's on them, and we shouldn't stop them from doing this. The performance flop will be fully their responsibility (and quite obvious to spot).
Imo pagination has its place in the lower abstraction levels (Apify API, AWS API, etc.), but Crawlee shouldn't leak this (Dataset.getData I'm willing to accept, as the name is rather technical.) Key-Value store and Map are well-known concepts, and I don't think many ppl connect these with pagination (which might be, therefore, surprising for them).
There was a problem hiding this comment.
OK, let's do it this way, accepting the footgun. 6e70953 (this PR)
barjin
left a comment
There was a problem hiding this comment.
a few more minuscule ideas, but I like where this is going!
I'd love a second (third) pair of eyes here, though; I'm getting tunnel vision already :)
| batchAddRequests(requests: RequestSchema[], options?: RequestOptions): Promise<BatchAddRequestsResult>; | ||
| getRequest(id: string): Promise<RequestOptions | undefined>; | ||
| updateRequest(request: UpdateRequestSchema, options?: RequestOptions): Promise<QueueOperationInfo>; | ||
| deleteRequest(id: string): Promise<unknown>; |
There was a problem hiding this comment.
Was this removal intentional? I see we have this method in API. What is the alternative?
There was a problem hiding this comment.
Yeah, the crawlers don't really need this method.
| /** Remove all items from the dataset but keep the dataset itself. */ | ||
| purge(): Promise<void>; |
There was a problem hiding this comment.
Is purge possible with the Apify Platform Dataset implementation?
I suppose we could drop + create in case of named datasets (but it will likely still change the dataset id), but you won't be able to do this with unnamed / default datasets imo.
There was a problem hiding this comment.
Actually, you should be able to drop any storage by ID and recreate it. And yes, it will change the ID. In the Python version, we log a warning if someone does this on a storage backend that doesn't have native support for purge
| * ```javascript | ||
| * const dataset = await Dataset.open('my-results'); | ||
| * | ||
| * // Stream all items (memory-efficient for large datasets) |
There was a problem hiding this comment.
nit:
"stream" all items
This might be slightly confusing for some (we're async iterating over the items, but loading the actual items is "blocking" - you have to load the entire item from the storage before processing it)
StorageClientinterface - this handles the "subclients"RequestQueueClientis left for Move Apify RequestQueue-specific logic to SDK #3328updatewere done here, the likes oflistHeadandfetchNextRequestwill have to waitAsyncIterator<T> | Promise<Page<T>>) for iterating dataset items and key-value store keys (values, key-value pairs) were left only in storage "frontends" so that individual storage clients do not need to implement this - they only have to provide a fetch-single-page method. The "awaitable" part of the dual iterable always returns a single page of the results - for key-value stores, fetching everything is potentially very heavy on memory and we want to keep the interface consistent.