Graceful handling of single-range diff download

This commit is contained in:
Fedor Indutny 2022-04-07 19:14:41 -07:00 committed by GitHub
parent d8e6516fb9
commit 37d06ec7b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 129 additions and 37 deletions

0
fixtures/diff-empty.bin Normal file
View File

Binary file not shown.

View File

@ -60,6 +60,8 @@ describe('updater/differential', () => {
const oldFile = 'diff-original.bin';
const oldBlockFile = getBlockMapFileName(oldFile);
const emptyFile = 'diff-empty.bin';
const newFile = 'diff-modified.bin';
const newBlockFile = getBlockMapFileName(newFile);
const newHash =
@ -109,6 +111,20 @@ describe('updater/differential', () => {
return [parseInt(range[1], 10), parseInt(range[2], 10)];
});
if (ranges.length === 1) {
res.writeHead(200, {
'content-type': 'application/octet-stream',
});
if (shouldTimeout === 'response') {
res.flushHeaders();
return;
}
const [from, to] = ranges[0];
res.end(fullFile.slice(from, to + 1));
return;
}
const BOUNDARY = 'f8f254ce1ba37627';
res.writeHead(206, {
@ -278,6 +294,34 @@ describe('updater/differential', () => {
);
});
it('downloads the full file with a single range', async () => {
const data = await prepareDownload({
oldFile: path.join(FIXTURES, emptyFile),
newUrl: `${baseUrl}/${newFile}`,
sha512: newHash,
});
const outDir = await fs.mkdtemp(path.join(tmpdir(), 'signal-temp-'));
await fs.mkdir(outDir, { recursive: true });
const outFile = path.join(outDir, 'out.bin');
const chunks = new Array<number>();
await download(outFile, data, {
statusCallback(size) {
chunks.push(size);
},
});
const expected = await fs.readFile(path.join(FIXTURES, newFile));
const actual = await fs.readFile(outFile);
assert.isTrue(actual.equals(expected), 'Files do not match');
assert.isTrue(
chunks.length > 0,
'Expected multiple callback invocations'
);
});
it('handles response timeouts gracefully', async () => {
const data = await prepareDownload({
oldFile: path.join(FIXTURES, oldFile),

View File

@ -3,10 +3,11 @@
import type { FileHandle } from 'fs/promises';
import { readFile, open } from 'fs/promises';
import type { Readable } from 'stream';
import { promisify } from 'util';
import { gunzip as nativeGunzip } from 'zlib';
import got from 'got';
import { chunk as lodashChunk } from 'lodash';
import { chunk as lodashChunk, noop } from 'lodash';
import pMap from 'p-map';
import Dicer from 'dicer';
@ -197,7 +198,7 @@ export function computeDiff(
last.size += size;
}
return optimizedDiff;
return optimizedDiff.filter(({ size }) => size !== 0);
}
export async function prepareDownload({
@ -369,45 +370,52 @@ export async function downloadRanges(
const onPart = async (part: Dicer.PartStream): Promise<void> => {
const diff = await takeDiffFromPart(part, diffByRange);
let offset = 0;
for await (const chunk of part) {
strictAssert(
offset + chunk.length <= diff.size,
'Server returned more data than expected, ' +
`written=${offset} ` +
`newChunk=${chunk.length} ` +
`maxSize=${diff.size}`
);
if (abortSignal?.aborted) {
return;
}
await output.write(chunk, 0, chunk.length, offset + diff.writeOffset);
offset += chunk.length;
chunkStatusCallback(chunk.length);
}
strictAssert(
offset === diff.size,
`Not enough data to download from offset=${diff.readOffset} ` +
`size=${diff.size}`
);
await saveDiffStream({
diff,
stream: part,
abortSignal,
output,
chunkStatusCallback,
});
};
const [{ statusCode, headers }] = await wrapEventEmitterOnce(
stream,
'response'
);
strictAssert(statusCode === 206, `Invalid status code: ${statusCode}`);
let boundary: string;
try {
const [{ statusCode, headers }] = await wrapEventEmitterOnce(
stream,
'response'
);
const match = headers['content-type']?.match(
/^multipart\/byteranges;\s*boundary=([^\s;]+)/
);
strictAssert(match, `Invalid Content-Type: ${headers['content-type']}`);
// When the result is single range we might get 200 status code
if (ranges.length === 1 && statusCode === 200) {
await saveDiffStream({
diff: ranges[0],
stream,
abortSignal,
output,
chunkStatusCallback,
});
return;
}
const dicer = new Dicer({ boundary: match[1] });
strictAssert(statusCode === 206, `Invalid status code: ${statusCode}`);
const match = headers['content-type']?.match(
/^multipart\/byteranges;\s*boundary=([^\s;]+)/
);
strictAssert(match, `Invalid Content-Type: ${headers['content-type']}`);
// eslint-disable-next-line prefer-destructuring
boundary = match[1];
} catch (error) {
// Ignore further errors and destroy stream early
stream.on('error', noop);
stream.destroy();
throw error;
}
const dicer = new Dicer({ boundary });
const partPromises = new Array<Promise<void>>();
dicer.on('part', part => partPromises.push(onPart(part)));
@ -472,3 +480,43 @@ async function takeDiffFromPart(
return diff;
}
async function saveDiffStream({
diff,
stream,
output,
abortSignal,
chunkStatusCallback,
}: {
diff: DiffType;
stream: Readable;
output: FileHandle;
abortSignal?: AbortSignal;
chunkStatusCallback: (chunkSize: number) => void;
}): Promise<void> {
let offset = 0;
for await (const chunk of stream) {
strictAssert(
offset + chunk.length <= diff.size,
'Server returned more data than expected, ' +
`written=${offset} ` +
`newChunk=${chunk.length} ` +
`maxSize=${diff.size}`
);
if (abortSignal?.aborted) {
return;
}
await output.write(chunk, 0, chunk.length, offset + diff.writeOffset);
offset += chunk.length;
chunkStatusCallback(chunk.length);
}
strictAssert(
offset === diff.size,
`Not enough data to download from offset=${diff.readOffset} ` +
`size=${diff.size}`
);
}