Skip to content
Snippets Groups Projects
Commit b79d5b26 authored by David Sehnal's avatar David Sehnal
Browse files

mol-task abort logic

parent 70116624
No related branches found
No related tags found
No related merge requests found
...@@ -13,7 +13,7 @@ import { Scheduler } from '../util/scheduler' ...@@ -13,7 +13,7 @@ import { Scheduler } from '../util/scheduler'
function ExecuteObservable<T>(task: Task<T>, observer: Progress.Observer, updateRateMs = 250) { function ExecuteObservable<T>(task: Task<T>, observer: Progress.Observer, updateRateMs = 250) {
const info = ProgressInfo(task, observer, updateRateMs); const info = ProgressInfo(task, observer, updateRateMs);
const ctx = new ObservableRuntimeContext(info, info.root); const ctx = new ObservableRuntimeContext(info, info.root);
return runRoot(task, ctx); return execute(task, ctx);
} }
namespace ExecuteObservable { namespace ExecuteObservable {
...@@ -38,14 +38,14 @@ interface ProgressInfo { ...@@ -38,14 +38,14 @@ interface ProgressInfo {
lastNotified: number, lastNotified: number,
observer: Progress.Observer, observer: Progress.Observer,
abortToken: { abortRequested: boolean, reason: string }, abortToken: { abortRequested: boolean, treeAborted: boolean, reason: string },
taskId: number; taskId: number;
root: Progress.Node; root: Progress.Node;
tryAbort: (reason?: string) => void; tryAbort: (reason?: string) => void;
} }
function ProgressInfo(task: Task<any>, observer: Progress.Observer, updateRateMs: number): ProgressInfo { function ProgressInfo(task: Task<any>, observer: Progress.Observer, updateRateMs: number): ProgressInfo {
const abortToken: ProgressInfo['abortToken'] = { abortRequested: false, reason: '' }; const abortToken: ProgressInfo['abortToken'] = { abortRequested: false, treeAborted: false, reason: '' };
return { return {
updateRateMs, updateRateMs,
...@@ -77,39 +77,42 @@ function snapshotProgress(info: ProgressInfo): Progress { ...@@ -77,39 +77,42 @@ function snapshotProgress(info: ProgressInfo): Progress {
return { root: cloneTree(info.root), canAbort: canAbort(info.root), requestAbort: info.tryAbort }; return { root: cloneTree(info.root), canAbort: canAbort(info.root), requestAbort: info.tryAbort };
} }
async function runRoot<T>(task: Task<T>, ctx: ObservableRuntimeContext) { async function execute<T>(task: Task<T>, ctx: ObservableRuntimeContext) {
ctx.node.progress.startedTime = now(); ctx.node.progress.startedTime = now();
if (!task.__onAbort) return task.__f(ctx);
try { try {
const ret = await task.__f(ctx); const ret = await task.__f(ctx);
// if (ctx.info.abortToken.abortRequested) { if (ctx.info.abortToken.abortRequested) abort(ctx.info, ctx.node);
// task.__onAbort();
// }
return ret; return ret;
} catch (e) { } catch (e) {
// TODO: track cancellation if (Task.isAborted(e)) {
if (Task.isAborted(e)) task.__onAbort(); // wait for all child computations to go thru the abort phase.
if (ctx.node.children.length > 0) {
await new Promise(res => { ctx.onChildrenFinished = res; });
}
if (task.__onAbort) task.__onAbort();
}
if (ExecuteObservable.PRINT_ERRORS_TO_STD_ERR) console.error(e); if (ExecuteObservable.PRINT_ERRORS_TO_STD_ERR) console.error(e);
throw e; throw e;
} }
} }
async function run<T>(task: Task<T>, ctx: ObservableRuntimeContext) { function abort(info: ProgressInfo, node: Progress.Node) {
ctx.node.progress.startedTime = now(); if (!info.abortToken.treeAborted) {
if (!task.__onAbort) return task.__f(ctx); info.abortToken.treeAborted = true;
try { abortTree(info.root);
const ret = await task.__f(ctx); notifyObserver(info, now());
// if (ctx.info.abortToken.abortRequested) {
// task.__onAbort();
// }
return ret;
} catch (e) {
if (Task.isAborted(e)) task.__onAbort();
if (ExecuteObservable.PRINT_ERRORS_TO_STD_ERR) console.error(e);
throw e;
} }
throw Task.Aborted(info.abortToken.reason);
} }
function abortTree(root: Progress.Node) {
const progress = root.progress;
progress.isIndeterminate = true;
progress.canAbort = false;
progress.message = 'Aborting...';
for (const c of root.children) abortTree(c);
}
function shouldNotify(info: ProgressInfo, time: number) { function shouldNotify(info: ProgressInfo, time: number) {
return time - info.lastNotified > info.updateRateMs; return time - info.lastNotified > info.updateRateMs;
...@@ -128,9 +131,12 @@ class ObservableRuntimeContext implements RuntimeContext { ...@@ -128,9 +131,12 @@ class ObservableRuntimeContext implements RuntimeContext {
node: Progress.Node; node: Progress.Node;
info: ProgressInfo; info: ProgressInfo;
// used for waiting for cancelled computation trees
onChildrenFinished?: () => void = void 0;
private checkAborted() { private checkAborted() {
if (this.info.abortToken.abortRequested) { if (this.info.abortToken.abortRequested) {
throw Task.Aborted(this.info.abortToken.reason); abort(this.info, this.node);
} }
} }
...@@ -168,6 +174,10 @@ class ObservableRuntimeContext implements RuntimeContext { ...@@ -168,6 +174,10 @@ class ObservableRuntimeContext implements RuntimeContext {
if (!!dontNotify || !shouldNotify(this.info, this.lastUpdatedTime)) return; if (!!dontNotify || !shouldNotify(this.info, this.lastUpdatedTime)) return;
notifyObserver(this.info, this.lastUpdatedTime); notifyObserver(this.info, this.lastUpdatedTime);
// The computation could have been aborted during the notifycation phase.
this.checkAborted();
return Scheduler.immediatePromise(); return Scheduler.immediatePromise();
} }
...@@ -182,7 +192,7 @@ class ObservableRuntimeContext implements RuntimeContext { ...@@ -182,7 +192,7 @@ class ObservableRuntimeContext implements RuntimeContext {
children.push(node); children.push(node);
const ctx = new ObservableRuntimeContext(this.info, node); const ctx = new ObservableRuntimeContext(this.info, node);
try { try {
return await run(task, ctx); return await execute(task, ctx);
} finally { } finally {
// remove the progress node after the computation has finished. // remove the progress node after the computation has finished.
const idx = children.indexOf(node); const idx = children.indexOf(node);
...@@ -192,6 +202,7 @@ class ObservableRuntimeContext implements RuntimeContext { ...@@ -192,6 +202,7 @@ class ObservableRuntimeContext implements RuntimeContext {
} }
children.pop(); children.pop();
} }
if (children.length === 0 && this.onChildrenFinished) this.onChildrenFinished();
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment