Adds Streams API support for networking task of PDF.js project.

network.js file moved to main thread and `PDFNetworkStream` implemented
at worker thread, that is used to ask for data whenever worker needs.
This commit is contained in:
Mukul Mishra 2017-06-30 23:29:52 +05:30
parent bd8c12119a
commit 109106794d
9 changed files with 481 additions and 238 deletions

View file

@ -1222,6 +1222,20 @@ function resolveCall(fn, args, thisArg = null) {
});
}
function wrapReason(reason) {
if (typeof reason !== 'object') {
return reason;
}
switch (reason.name) {
case 'MissingPDFException':
return new MissingPDFException(reason.message);
case 'UnexpectedResponseException':
return new UnexpectedResponseException(reason.message, reason.status);
default:
return new UnknownErrorException(reason.message, reason.details);
}
}
function resolveOrReject(capability, success, reason) {
if (success) {
capability.resolve();
@ -1431,13 +1445,14 @@ MessageHandler.prototype = {
let targetName = data.sourceName;
let capability = createPromiseCapability();
let sendStreamRequest = ({ stream, chunk, success, reason, }) => {
this.comObj.postMessage({ sourceName, targetName, stream, streamId,
chunk, success, reason, });
let sendStreamRequest = ({ stream, chunk, transfers,
success, reason, }) => {
this.postMessage({ sourceName, targetName, stream, streamId,
chunk, success, reason, }, transfers);
};
let streamSink = {
enqueue(chunk, size = 1) {
enqueue(chunk, size = 1, transfers) {
if (this.isCancelled) {
return;
}
@ -1450,7 +1465,7 @@ MessageHandler.prototype = {
this.sinkCapability = createPromiseCapability();
this.ready = this.sinkCapability.promise;
}
sendStreamRequest({ stream: 'enqueue', chunk, });
sendStreamRequest({ stream: 'enqueue', chunk, transfers, });
},
close() {
@ -1462,6 +1477,10 @@ MessageHandler.prototype = {
},
error(reason) {
if (this.isCancelled) {
return;
}
this.isCancelled = true;
sendStreamRequest({ stream: 'error', reason, });
},
@ -1510,11 +1529,11 @@ MessageHandler.prototype = {
switch (data.stream) {
case 'start_complete':
resolveOrReject(this.streamControllers[data.streamId].startCall,
data.success, data.reason);
data.success, wrapReason(data.reason));
break;
case 'pull_complete':
resolveOrReject(this.streamControllers[data.streamId].pullCall,
data.success, data.reason);
data.success, wrapReason(data.reason));
break;
case 'pull':
// Ignore any pull after close is called.
@ -1539,11 +1558,15 @@ MessageHandler.prototype = {
});
break;
case 'enqueue':
assert(this.streamControllers[data.streamId],
'enqueue should have stream controller');
if (!this.streamControllers[data.streamId].isClosed) {
this.streamControllers[data.streamId].controller.enqueue(data.chunk);
}
break;
case 'close':
assert(this.streamControllers[data.streamId],
'close should have stream controller');
if (this.streamControllers[data.streamId].isClosed) {
break;
}
@ -1552,12 +1575,15 @@ MessageHandler.prototype = {
deleteStreamController();
break;
case 'error':
this.streamControllers[data.streamId].controller.error(data.reason);
assert(this.streamControllers[data.streamId],
'error should have stream controller');
this.streamControllers[data.streamId].controller.
error(wrapReason(data.reason));
deleteStreamController();
break;
case 'cancel_complete':
resolveOrReject(this.streamControllers[data.streamId].cancelCall,
data.success, data.reason);
data.success, wrapReason(data.reason));
deleteStreamController();
break;
case 'cancel':
@ -1565,13 +1591,14 @@ MessageHandler.prototype = {
break;
}
resolveCall(this.streamSinks[data.streamId].onCancel,
[data.reason]).then(() => {
[wrapReason(data.reason)]).then(() => {
sendStreamResponse({ stream: 'cancel_complete', success: true, });
}, (reason) => {
sendStreamResponse({ stream: 'cancel_complete',
success: false, reason, });
});
this.streamSinks[data.streamId].sinkCapability.reject(data.reason);
this.streamSinks[data.streamId].sinkCapability.
reject(wrapReason(data.reason));
this.streamSinks[data.streamId].isCancelled = true;
delete this.streamSinks[data.streamId];
break;