Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/cherry-pick-patch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ on:
permissions:
contents: write
pull-requests: write
actions: write

jobs:
cherry-pick:
Expand Down Expand Up @@ -121,6 +122,7 @@ jobs:
if git cherry-pick $PICK_FLAGS $PICK_SHAS; then
git push origin "$RELEASE_BRANCH"
echo "✅ Cherry-picked PR #$PR_NUMBER onto $RELEASE_BRANCH"
gh workflow run integration-tests.yml --ref "$RELEASE_BRANCH" --repo "$GITHUB_REPOSITORY"
else
echo "⚠️ Cherry-pick conflict — creating resolution branch"
git cherry-pick --abort 2>/dev/null || true
Expand Down
13 changes: 13 additions & 0 deletions resources/blob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ class FileBackedBlob extends InstanceOfBlobWithNoConstructor {
}
const filePath = getFilePath(storageInfo);
let writeFinished: boolean;
let writeFinishedRetries = 0;
const readContents = async () => {
let rawBytes: Buffer;
let size = HEADER_SIZE;
Expand Down Expand Up @@ -192,6 +193,18 @@ class FileBackedBlob extends InstanceOfBlobWithNoConstructor {
const store = storageInfo.store;
const lockKey = storageInfo.fileId + ':blob';
if (writeFinished) {
// The lock was free but the file is still incomplete — the writer may have
// crashed mid-write (process restart clears in-memory lock state). Allow a
// few brief retries so that an in-progress new write (e.g. replication
// re-send) can acquire the lock before we give up.
if (writeFinishedRetries++ < 3) {
logger.trace?.(
`Incomplete blob after writer finished, retrying (attempt ${writeFinishedRetries})`,
filePath
);
writeFinished = false;
return new Promise((resolve) => setTimeout(() => resolve(readContents()), 100));
}
throw new Error(`Incomplete blob for ${filePath}`);
}
return new Promise((resolve) => {
Expand Down
33 changes: 32 additions & 1 deletion unitTests/resources/blob.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const {
isSaving,
cleanupOrphans,
} = require('#src/resources/blob');
const { existsSync } = require('fs');
const { existsSync, writeFileSync } = require('fs');
const { pack } = require('msgpackr');
const { randomBytes } = require('crypto');

Expand Down Expand Up @@ -395,6 +395,37 @@ describe('Blob test', () => {
let orphansDeleted = await cleanupOrphans(getDatabases().test);
assert.equal(orphansDeleted, 0);
});
it('bytes() retries up to 3x then rejects when lock is free but file is incomplete', async () => {
	// Reproduce the crashed-writer state: the blob file sits on disk holding a
	// DEFAULT_HEADER (UNKNOWN_SIZE) plus only a fragment of the content, and no
	// writer owns the lock. bytes() is expected to retry 3 times (100 ms apart)
	// and then reject.
	const payload = Buffer.alloc(9001, 0x61); // >8192 so it is file-backed
	// Feed a Readable stream so storageInfo.contentBuffer stays unset; with an
	// in-memory buffer present, bytes() would short-circuit and never read the
	// corrupted disk file at all.
	const blob = await createBlob(Readable.from([payload]));
	await BlobTest.put({ id: 901, blob });
	// Keep using the original blob object (not the decoded record.blob) so every
	// read goes to disk — a decoded blob can carry an in-memory contentBuffer in
	// some storage-engine / transaction-cache configurations.
	const blobPath = getFilePathForBlob(blob);
	assert(blobPath, 'blob should be file-backed for this test');
	// Wait for the blob stream pipeline, not just the DB write: BlobTest.put only
	// awaits the latter, so without this the pipeline could race us and replace
	// the corrupted file with the complete content.
	await isSaving(blob);
	assert(existsSync(blobPath), 'blob file should exist after save');

	// Corrupt the file the way a crashed write leaves it: DEFAULT_HEADER
	// (UNKNOWN_SIZE) followed by partial content. Byte[1] = 0 = UNCOMPRESSED_TYPE,
	// matching the DEFAULT_HEADER constant in blob.ts.
	const unknownSizeHeader = Buffer.of(0x00, 0x00, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff);
	const truncated = payload.subarray(0, 100);
	writeFileSync(blobPath, Buffer.concat([unknownSizeHeader, truncated]));

	const startedAt = Date.now();
	await assert.rejects(() => blob.bytes(), /incomplete blob/i);
	const elapsed = Date.now() - startedAt;
	// Three 100 ms retries should account for at least ~300 ms (allow timer slop).
	assert(elapsed >= 290, `expected ≥300 ms of retry delay, got ${elapsed} ms`);
});
afterEach(function () {
setAuditRetention(60000);
setDeletionDelay(50); // restore shorter, but need to have it happen for the last test
Expand Down
Loading