From b79d5b26daf5b239e12b7fa1c1f7224a67ace834 Mon Sep 17 00:00:00 2001 From: David Sehnal <david.sehnal@gmail.com> Date: Sat, 9 Dec 2017 17:13:13 +0100 Subject: [PATCH] mol-task abort logic --- src/mol-task/execution/observable.ts | 61 ++++++++++++++++------------ 1 file changed, 36 insertions(+), 25 deletions(-) diff --git a/src/mol-task/execution/observable.ts b/src/mol-task/execution/observable.ts index ced11df50..66139e564 100644 --- a/src/mol-task/execution/observable.ts +++ b/src/mol-task/execution/observable.ts @@ -13,7 +13,7 @@ import { Scheduler } from '../util/scheduler' function ExecuteObservable<T>(task: Task<T>, observer: Progress.Observer, updateRateMs = 250) { const info = ProgressInfo(task, observer, updateRateMs); const ctx = new ObservableRuntimeContext(info, info.root); - return runRoot(task, ctx); + return execute(task, ctx); } namespace ExecuteObservable { @@ -38,14 +38,14 @@ interface ProgressInfo { lastNotified: number, observer: Progress.Observer, - abortToken: { abortRequested: boolean, reason: string }, + abortToken: { abortRequested: boolean, treeAborted: boolean, reason: string }, taskId: number; root: Progress.Node; tryAbort: (reason?: string) => void; } function ProgressInfo(task: Task<any>, observer: Progress.Observer, updateRateMs: number): ProgressInfo { - const abortToken: ProgressInfo['abortToken'] = { abortRequested: false, reason: '' }; + const abortToken: ProgressInfo['abortToken'] = { abortRequested: false, treeAborted: false, reason: '' }; return { updateRateMs, @@ -77,39 +77,42 @@ function snapshotProgress(info: ProgressInfo): Progress { return { root: cloneTree(info.root), canAbort: canAbort(info.root), requestAbort: info.tryAbort }; } -async function runRoot<T>(task: Task<T>, ctx: ObservableRuntimeContext) { +async function execute<T>(task: Task<T>, ctx: ObservableRuntimeContext) { ctx.node.progress.startedTime = now(); - if (!task.__onAbort) return task.__f(ctx); try { const ret = await task.__f(ctx); - // if (ctx.info.abortToken.abortRequested) { - // task.__onAbort(); - // } + if (ctx.info.abortToken.abortRequested) abort(ctx.info, ctx.node); return ret; } catch (e) { - // TODO: track cancellation - if (Task.isAborted(e)) task.__onAbort(); + if (Task.isAborted(e)) { + // wait for all child computations to go thru the abort phase. + if (ctx.node.children.length > 0) { + await new Promise(res => { ctx.onChildrenFinished = res; }); + } + if (task.__onAbort) task.__onAbort(); + } if (ExecuteObservable.PRINT_ERRORS_TO_STD_ERR) console.error(e); throw e; } } -async function run<T>(task: Task<T>, ctx: ObservableRuntimeContext) { - ctx.node.progress.startedTime = now(); - if (!task.__onAbort) return task.__f(ctx); - try { - const ret = await task.__f(ctx); - // if (ctx.info.abortToken.abortRequested) { - // task.__onAbort(); - // } - return ret; - } catch (e) { - if (Task.isAborted(e)) task.__onAbort(); - if (ExecuteObservable.PRINT_ERRORS_TO_STD_ERR) console.error(e); - throw e; +function abort(info: ProgressInfo, node: Progress.Node) { + if (!info.abortToken.treeAborted) { + info.abortToken.treeAborted = true; + abortTree(info.root); + notifyObserver(info, now()); } + + throw Task.Aborted(info.abortToken.reason); } +function abortTree(root: Progress.Node) { + const progress = root.progress; + progress.isIndeterminate = true; + progress.canAbort = false; + progress.message = 'Aborting...'; + for (const c of root.children) abortTree(c); +} function shouldNotify(info: ProgressInfo, time: number) { return time - info.lastNotified > info.updateRateMs; @@ -128,9 +131,12 @@ class ObservableRuntimeContext implements RuntimeContext { node: Progress.Node; info: ProgressInfo; + // used for waiting for cancelled computation trees + onChildrenFinished?: () => void = void 0; + private checkAborted() { if (this.info.abortToken.abortRequested) { - throw Task.Aborted(this.info.abortToken.reason); + abort(this.info, this.node); } } @@ -168,6 +174,10 @@ class ObservableRuntimeContext implements RuntimeContext { if (!!dontNotify || !shouldNotify(this.info, this.lastUpdatedTime)) return; notifyObserver(this.info, this.lastUpdatedTime); + + // The computation could have been aborted during the notifycation phase. + this.checkAborted(); + return Scheduler.immediatePromise(); } @@ -182,7 +192,7 @@ class ObservableRuntimeContext implements RuntimeContext { children.push(node); const ctx = new ObservableRuntimeContext(this.info, node); try { - return await run(task, ctx); + return await execute(task, ctx); } finally { // remove the progress node after the computation has finished. const idx = children.indexOf(node); @@ -192,6 +202,7 @@ class ObservableRuntimeContext implements RuntimeContext { } children.pop(); } + if (children.length === 0 && this.onChildrenFinished) this.onChildrenFinished(); } } -- GitLab