Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions packages/apify/src/actor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type {
EventTypeName,
IStorage,
RecordOptions,
StorageOpenOptions,
UseStateOptions,
} from '@crawlee/core';
import {
Expand All @@ -13,7 +14,6 @@ import {
purgeDefaultStorages,
RequestQueue,
serviceLocator,
StorageManager,
} from '@crawlee/core';
import type {
Awaitable,
Expand Down Expand Up @@ -43,6 +43,7 @@ import { decryptInputSecrets } from '@apify/input_secrets';
import log from '@apify/log';
import { addTimeoutToPromise } from '@apify/timeout';

import { ApifyStorageClient } from './apify_storage_client.js';
import type { ChargeOptions, ChargeResult } from './charging.js';
import { ChargingManager } from './charging.js';
import type { ConfigurationOptions } from './configuration.js';
Expand Down Expand Up @@ -515,7 +516,9 @@ export class Actor<Data extends Dictionary = Dictionary> {
if (this.isAtHome()) {
// availableMemoryRatio and disableBrowserSandbox are now set via
// conditional defaults in the Configuration constructor (isAtHome check)
serviceLocator.setStorageClient(this.apifyClient);
serviceLocator.setStorageClient(
new ApifyStorageClient(this.apifyClient),
);
serviceLocator.setEventManager(this.eventManager);
} else if (options.storage) {
serviceLocator.setStorageClient(options.storage);
Expand Down Expand Up @@ -1330,7 +1333,7 @@ export class Actor<Data extends Dictionary = Dictionary> {

// eslint-disable-next-line dot-notation
queue['initialCount'] =
(await queue.client.get())?.totalRequestCount ?? 0;
(await queue.client.getMetadata())?.totalRequestCount ?? 0;

return queue;
}
Expand Down Expand Up @@ -2251,17 +2254,16 @@ export class Actor<Data extends Dictionary = Dictionary> {
}

private async _openStorage<T extends IStorage>(
storageClass: Constructor<T>,
storageClass: Constructor<T> & {
open(id?: string | null, options?: StorageOpenOptions): Promise<T>;
},
id?: string,
options: OpenStorageOptions = {},
) {
const client = options.forceCloud ? this.apifyClient : undefined;
return StorageManager.openStorage<T>(
storageClass,
id,
client,
this.config,
);
const storageClient = options.forceCloud
? new ApifyStorageClient(this.apifyClient)
: undefined;
return storageClass.open(id ?? null, { storageClient });
}

private _ensureActorInit(methodCalled: string) {
Expand Down
92 changes: 92 additions & 0 deletions packages/apify/src/apify_storage_client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import type {
CreateDatasetClientOptions,
CreateKeyValueStoreClientOptions,
CreateRequestQueueClientOptions,
DatasetClient,
KeyValueStoreClient,
RequestQueueClient,
StorageClient,
} from '@crawlee/types';
import type { ApifyClient } from 'apify-client';

type StorageType = 'Dataset' | 'KeyValueStore' | 'RequestQueue';

/**
 * Adapter that exposes an `apify-client` instance through crawlee v4's
 * `StorageClient` interface.
 *
 * `apify-client` hands out resource clients synchronously and by id only
 * (`dataset(id)`, `keyValueStore(id)`, `requestQueue(id, options?)`), whereas
 * crawlee v4 expects async factory methods that accept either an `id` or a
 * `name`. This class bridges the two.
 *
 * `storageExists()` is implemented so that `Dataset.open(idOrName)` and friends
 * resolve a string argument to an id first (when a storage with that id exists
 * on the platform) and fall back to a name otherwise — without this, crawlee's
 * `resolveStorageIdentifier` would treat every string as a name and the SDK
 * would silently create a brand-new storage whose name equals the passed-in id.
 *
 * When only a `name` is provided to a `create*Client` method, it is resolved
 * to a concrete id via `getOrCreate(name)` — the same behaviour the SDK relied
 * on in v3.
 */
export class ApifyStorageClient implements StorageClient {
    /**
     * @param client The `apify-client` instance all storage calls are delegated to.
     */
    constructor(private readonly client: ApifyClient) {}

    /**
     * Tells whether a storage of the given `type` exists whose **id** is `id`.
     *
     * @param id Candidate storage id.
     * @param type Which kind of storage to look up.
     * @returns `true` only when the fetched storage reports exactly this id.
     */
    async storageExists(id: string, type: StorageType): Promise<boolean> {
        // Apify's `GET /v2/{kind}/{idOrName}` endpoint matches by either id or
        // name. Confirm it was an *id* match — otherwise crawlee should fall
        // through to the `{ name }` branch.
        const info = await this.resourceClient(id, type).get();
        return info?.id === id;
    }

    /**
     * Creates a dataset client for the storage identified by `options.id`, or
     * by `options.name` (resolved through `getOrCreate`).
     *
     * @throws Error when neither an `id` nor a `name` is provided.
     */
    async createDatasetClient(
        options?: CreateDatasetClientOptions,
    ): Promise<DatasetClient> {
        const id = await this.resolveId(options, 'Dataset');
        // apify-client's resource clients overlap with `@crawlee/types`' shapes
        // but don't yet implement the v4-added members (`getMetadata`,
        // `getRecordPublicUrl`). Cast through for now; a follow-up should
        // bring apify-client into structural alignment.
        return this.client.dataset(id) as unknown as DatasetClient;
    }

    /**
     * Creates a key-value store client for the storage identified by
     * `options.id`, or by `options.name` (resolved through `getOrCreate`).
     *
     * @throws Error when neither an `id` nor a `name` is provided.
     */
    async createKeyValueStoreClient(
        options?: CreateKeyValueStoreClientOptions,
    ): Promise<KeyValueStoreClient> {
        const id = await this.resolveId(options, 'KeyValueStore');
        // Same structural-mismatch cast as in `createDatasetClient`.
        return this.client.keyValueStore(id) as unknown as KeyValueStoreClient;
    }

    /**
     * Creates a request queue client for the storage identified by
     * `options.id`, or by `options.name` (resolved through `getOrCreate`).
     * A truthy `options.clientKey` is forwarded to `apify-client`.
     *
     * @throws Error when neither an `id` nor a `name` is provided.
     */
    async createRequestQueueClient(
        options?: CreateRequestQueueClientOptions,
    ): Promise<RequestQueueClient> {
        const id = await this.resolveId(options, 'RequestQueue');
        // Only pass an options object when there is a client key to forward.
        const queueOptions = options?.clientKey
            ? { clientKey: options.clientKey }
            : undefined;
        return this.client.requestQueue(
            id,
            queueOptions,
        ) as unknown as RequestQueueClient;
    }

    /**
     * Turns `{ id }` / `{ name }` into a concrete storage id.
     *
     * An explicit `id` wins; otherwise `name` is resolved (and the storage
     * created when missing) via the matching collection's `getOrCreate`.
     *
     * @throws Error when neither an `id` nor a `name` is provided.
     */
    private async resolveId(
        options: { id?: string; name?: string } | undefined,
        type: StorageType,
    ): Promise<string> {
        if (options?.id) return options.id;
        if (options?.name) {
            const storage = await this.collectionClient(type).getOrCreate(
                options.name,
            );
            return storage.id;
        }
        // Fail fast: an empty id is never a valid storage identifier, and
        // passing one on would only surface further down as a less helpful
        // error (or a malformed request).
        throw new Error(
            `Cannot create a ${type} client: neither a storage "id" nor a "name" was provided.`,
        );
    }

    /** Returns the `apify-client` resource client for one storage of `type`. */
    private resourceClient(id: string, type: StorageType) {
        if (type === 'Dataset') return this.client.dataset(id);
        if (type === 'KeyValueStore') return this.client.keyValueStore(id);
        return this.client.requestQueue(id);
    }

    /** Returns the `apify-client` collection client for storages of `type`. */
    private collectionClient(type: StorageType) {
        if (type === 'Dataset') return this.client.datasets();
        if (type === 'KeyValueStore') return this.client.keyValueStores();
        return this.client.requestQueues();
    }
}
44 changes: 29 additions & 15 deletions packages/apify/src/key_value_store.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
import type { StorageManagerOptions } from '@crawlee/core';
import type { StorageOpenOptions } from '@crawlee/core';
import { KeyValueStore as CoreKeyValueStore } from '@crawlee/core';
import type { KeyValueStoreInfo } from '@crawlee/types';

import { createHmacSignature } from '@apify/utilities';

import type { Configuration } from './configuration.js';

// @ts-ignore newer crawlee versions already declare this method in core
const { getPublicUrl } = CoreKeyValueStore.prototype;
// crawlee v4 dropped the `storageObject` cache from `KeyValueStore`, so the
// per-store `urlSigningSecretKey` (which is part of the platform's metadata
// response but not declared on `@crawlee/types`' `KeyValueStoreInfo`) has to
// be fetched on demand and accessed through a structural-typed augmentation.
type ApifyKeyValueStoreInfo = KeyValueStoreInfo & {
urlSigningSecretKey?: string;
};

/**
* @inheritDoc
Expand All @@ -15,24 +21,35 @@ export class KeyValueStore extends CoreKeyValueStore {
/**
* Returns a URL for the given key that may be used to publicly
* access the value in the remote key-value store.
*
* On the Apify platform the URL is signed with the store's
* `urlSigningSecretKey` so that anyone with the URL can read the record
* without authentication. Locally we delegate to crawlee's default
* implementation (which produces a `file://` URL or returns `undefined`).
*/
override getPublicUrl(key: string): string {
override async getPublicUrl(key: string): Promise<string | undefined> {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this change closes this issue - I'm not sure if the renaming was a requirement, or a way to make this BC.

const config = this.config as Configuration;
if (!config.isAtHome && getPublicUrl) {
return getPublicUrl.call(this, key);
if (!config.isAtHome) {
return super.getPublicUrl(key);
}

const publicUrl = new URL(
`${config.apiPublicBaseUrl}/v2/key-value-stores/${this.id}/records/${key}`,
);

if (this.storageObject?.urlSigningSecretKey) {
// `client` is `private` on `CoreKeyValueStore`; bypass the visibility
// check to fetch the per-store secret. There is no public crawlee API
// surface for this yet — track upstream exposure as a follow-up.
Comment on lines +40 to +42
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, please create the issue in Crawlee (I see that Dataset and RequestProvider already both have client public, so it's weirdly asymmetrical now).

const metadata = (await (
this as unknown as {
client: { getMetadata(): Promise<KeyValueStoreInfo> };
}
).client.getMetadata()) as ApifyKeyValueStoreInfo;

if (metadata?.urlSigningSecretKey) {
publicUrl.searchParams.append(
'signature',
createHmacSignature(
this.storageObject.urlSigningSecretKey as string,
key,
),
createHmacSignature(metadata.urlSigningSecretKey, key),
);
}

Expand All @@ -44,11 +61,8 @@ export class KeyValueStore extends CoreKeyValueStore {
*/
static override async open(
storeIdOrName?: string | null,
options: StorageManagerOptions = {},
options: StorageOpenOptions = {},
): Promise<KeyValueStore> {
return super.open(storeIdOrName, options) as unknown as KeyValueStore;
}
}

// @ts-ignore newer crawlee versions already declare this method in core
CoreKeyValueStore.prototype.getPublicUrl = KeyValueStore.prototype.getPublicUrl;
14 changes: 9 additions & 5 deletions test/MemoryStorageEmulator.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import { rm } from 'node:fs/promises';
import { resolve } from 'node:path';

import { StorageManager } from '@crawlee/core';
import { serviceLocator } from '@crawlee/core';
import { MemoryStorage } from '@crawlee/memory-storage';
import { Configuration } from 'apify';
import { ensureDir } from 'fs-extra';

import log from '@apify/log';
import { cryptoRandomObjectId } from '@apify/utilities';

import { resetGlobalState } from './resetGlobalState.js';

const LOCAL_EMULATION_DIR = resolve(
__dirname,
'..',
Expand All @@ -20,15 +21,18 @@ export class MemoryStorageEmulator {
protected localStorageDirectories: string[] = [];

async init(dirName = cryptoRandomObjectId(10)) {
StorageManager.clearCache();
// crawlee v4 dropped `StorageManager.clearCache()` and
// `Configuration.useStorageClient()`; reset the global state and
// re-register the in-memory client instead.
resetGlobalState();
const localStorageDir = resolve(LOCAL_EMULATION_DIR, dirName);
this.localStorageDirectories.push(localStorageDir);
await ensureDir(localStorageDir);

const storage = new MemoryStorage({
localDataDirectory: localStorageDir,
});
Configuration.getGlobalConfig().useStorageClient(storage);
serviceLocator.setStorageClient(storage);
log.debug(
`Initialized emulated memory storage in folder ${localStorageDir}`,
);
Expand All @@ -40,7 +44,7 @@ export class MemoryStorageEmulator {
});

await Promise.all(promises);
StorageManager.clearCache();
resetGlobalState();
}

static toString() {
Expand Down
49 changes: 32 additions & 17 deletions test/apify/actor.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { createPublicKey } from 'node:crypto';

import { EventType, serviceLocator, StorageManager } from '@crawlee/core';
import { EventType, RequestQueue, serviceLocator } from '@crawlee/core';
import { sleep } from '@crawlee/utils';
import type { ApifyEnv } from 'apify';
import {
Expand Down Expand Up @@ -759,19 +759,31 @@ describe('Actor', () => {
test('openRequestQueue should open storage', async () => {
const queueId = 'abc';
const options = { forceCloud: true };
const openStorageSpy = vitest.spyOn(
StorageManager.prototype,
'openStorage',
);
const openSpy = vitest.spyOn(RequestQueue, 'open');

// crawlee v4's `RequestQueueClient` exposes metadata via
// `getMetadata()` (the v3 `get()` was dropped).
const mockRQ = {
client: { get: () => ({ totalRequestCount: 10 }) },
client: {
getMetadata: async () => ({ totalRequestCount: 10 }),
},
};

openStorageSpy.mockImplementationOnce(async () => mockRQ);
openSpy.mockImplementationOnce(async () => mockRQ as any);
const queue = await sdk.openRequestQueue(queueId, options);
expect(openStorageSpy).toBeCalledWith(queueId, sdk.apifyClient);
expect(openStorageSpy).toBeCalledTimes(1);
// `forceCloud: true` routes through an `ApifyStorageClient`
// adapter that satisfies crawlee v4's `StorageClient` interface.
expect(openSpy).toBeCalledWith(
queueId,
expect.objectContaining({
storageClient: expect.objectContaining({
createDatasetClient: expect.any(Function),
createKeyValueStoreClient: expect.any(Function),
createRequestQueueClient: expect.any(Function),
}),
}),
);
expect(openSpy).toBeCalledTimes(1);

// @ts-expect-error private prop
expect(queue.initialCount).toBe(10);
Expand All @@ -780,16 +792,19 @@ describe('Actor', () => {
test('openDataset should open storage', async () => {
const datasetName = 'abc';
const options = { forceCloud: true };
const mockOpenStorage = vitest.spyOn(
StorageManager.prototype,
'openStorage',
);
mockOpenStorage.mockResolvedValueOnce(vitest.fn());
const openSpy = vitest.spyOn(Dataset, 'open');
openSpy.mockResolvedValueOnce(vitest.fn() as any);
const ds = await sdk.openDataset(datasetName, options);
expect(mockOpenStorage).toBeCalledTimes(1);
expect(mockOpenStorage).toBeCalledWith(
expect(openSpy).toBeCalledTimes(1);
expect(openSpy).toBeCalledWith(
datasetName,
sdk.apifyClient,
expect.objectContaining({
storageClient: expect.objectContaining({
createDatasetClient: expect.any(Function),
createKeyValueStoreClient: expect.any(Function),
createRequestQueueClient: expect.any(Function),
}),
}),
);
});
});
Expand Down