Skip to content
Snippets Groups Projects
Commit 5a0ef763 authored by David Sehnal's avatar David Sehnal
Browse files

mol-task aborting

parent b79d5b26
No related branches found
No related tags found
No related merge requests found
...@@ -4,9 +4,9 @@ ...@@ -4,9 +4,9 @@
* @author David Sehnal <david.sehnal@gmail.com> * @author David Sehnal <david.sehnal@gmail.com>
*/ */
import { Task, Run, Progress, Scheduler } from 'mol-task' import { Task, Run, Progress, Scheduler, now } from 'mol-task'
async function test() { export async function test1() {
const t = Task.create('test', async () => 1); const t = Task.create('test', async () => 1);
const r = await Run(t); const r = await Run(t);
console.log(r); console.log(r);
...@@ -26,11 +26,20 @@ function createTask<T>(delayMs: number, r: T): Task<T> { ...@@ -26,11 +26,20 @@ function createTask<T>(delayMs: number, r: T): Task<T> {
await Scheduler.delay(delayMs); await Scheduler.delay(delayMs);
if (ctx.shouldUpdate) await ctx.update({ message: 'hello from delayed... ' }); if (ctx.shouldUpdate) await ctx.update({ message: 'hello from delayed... ' });
return r; return r;
}, () => console.log('On abort called ' + r));
}
export function abortAfter(delay: number) {
return Task.create('abort after ' + delay, async ctx => {
await Scheduler.delay(delay);
throw Task.Aborted('test');
//if (ctx.shouldUpdate) await ctx.update({ message: 'hello from delayed... ' });
//return r;
}); });
} }
async function testObs() { function testTree() {
const t = Task.create('test o', async ctx => { return Task.create('test o', async ctx => {
await Scheduler.delay(250); await Scheduler.delay(250);
if (ctx.shouldUpdate) await ctx.update({ message: 'hi! 1' }); if (ctx.shouldUpdate) await ctx.update({ message: 'hi! 1' });
await Scheduler.delay(125); await Scheduler.delay(125);
...@@ -38,17 +47,35 @@ async function testObs() { ...@@ -38,17 +47,35 @@ async function testObs() {
await Scheduler.delay(250); await Scheduler.delay(250);
if (ctx.shouldUpdate) await ctx.update('hi! 3'); if (ctx.shouldUpdate) await ctx.update('hi! 3');
ctx.update('Running children...', true); // ctx.update('Running children...', true);
const c1 = ctx.runChild(createTask(250, 1)); const c1 = ctx.runChild(createTask(250, 1));
const c2 = ctx.runChild(createTask(500, 2)); const c2 = ctx.runChild(createTask(500, 2));
const c3 = ctx.runChild(createTask(750, 3)); const c3 = ctx.runChild(createTask(750, 3));
//await ctx.runChild(abortAfter(350));
const r = await c1 + await c2 + await c3; const r = await c1 + await c2 + await c3;
if (ctx.shouldUpdate) await ctx.update({ message: 'Almost done...' }); if (ctx.shouldUpdate) await ctx.update({ message: 'Almost done...' });
return r + 1; return r + 1;
}); });
const r = await Run(t, p => console.log(messageTree(p.root)), 250); }
console.log(r);
export function abortingObserver(p: Progress) {
console.log(messageTree(p.root));
if (now() - p.root.progress.startedTime > 1000) {
p.requestAbort('test');
}
}
async function test() {
try {
//const r = await Run(testTree(), p => console.log(messageTree(p.root)), 250);
const r = await Run(testTree(), abortingObserver, 250);
console.log(r);
} catch (e) {
console.error(e);
}
} }
test(); test();
testObs(); //testObs();
\ No newline at end of file \ No newline at end of file
...@@ -81,10 +81,12 @@ async function execute<T>(task: Task<T>, ctx: ObservableRuntimeContext) { ...@@ -81,10 +81,12 @@ async function execute<T>(task: Task<T>, ctx: ObservableRuntimeContext) {
ctx.node.progress.startedTime = now(); ctx.node.progress.startedTime = now();
try { try {
const ret = await task.__f(ctx); const ret = await task.__f(ctx);
if (ctx.info.abortToken.abortRequested) abort(ctx.info, ctx.node); if (ctx.info.abortToken.abortRequested) {
abort(ctx.info, ctx.node);
}
return ret; return ret;
} catch (e) { } catch (e) {
if (Task.isAborted(e)) { if (Task.isAbort(e)) {
// wait for all child computations to go thru the abort phase. // wait for all child computations to go thru the abort phase.
if (ctx.node.children.length > 0) { if (ctx.node.children.length > 0) {
await new Promise(res => { ctx.onChildrenFinished = res; }); await new Promise(res => { ctx.onChildrenFinished = res; });
...@@ -193,6 +195,14 @@ class ObservableRuntimeContext implements RuntimeContext { ...@@ -193,6 +195,14 @@ class ObservableRuntimeContext implements RuntimeContext {
const ctx = new ObservableRuntimeContext(this.info, node); const ctx = new ObservableRuntimeContext(this.info, node);
try { try {
return await execute(task, ctx); return await execute(task, ctx);
} catch (e) {
if (Task.isAbort(e)) {
// need to catch the error here because otherwise
// promises for running child tasks in a tree-like computation
// will get orphaned and cause "uncaught error in Promise".
return void 0 as any;
}
throw e;
} finally { } finally {
// remove the progress node after the computation has finished. // remove the progress node after the computation has finished.
const idx = children.indexOf(node); const idx = children.indexOf(node);
......
...@@ -19,7 +19,7 @@ interface Task<T> { ...@@ -19,7 +19,7 @@ interface Task<T> {
namespace Task { namespace Task {
export interface Aborted { isAborted: true, reason: string } export interface Aborted { isAborted: true, reason: string }
export function isAborted(e: any): e is Aborted { return !!e && !!e.isAborted; } export function isAbort(e: any): e is Aborted { return !!e && !!e.isAborted; }
export function Aborted(reason: string): Aborted { return { isAborted: true, reason }; } export function Aborted(reason: string): Aborted { return { isAborted: true, reason }; }
export function create<T>(name: string, f: (ctx: RuntimeContext) => Promise<T>, onAbort?: () => void): Task<T> { export function create<T>(name: string, f: (ctx: RuntimeContext) => Promise<T>, onAbort?: () => void): Task<T> {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment