diff --git a/src/examples/computation.ts b/src/examples/computation.ts index 053ccb514e280272abe277ff9fc839ebf0a122c1..6e4d33d9d4590e968b3f3a822128eda980782803 100644 --- a/src/examples/computation.ts +++ b/src/examples/computation.ts @@ -4,7 +4,7 @@ * @author David Sehnal <david.sehnal@gmail.com> */ -import { Task, Run } from 'mol-task' +import { Task, Run, Progress } from 'mol-task' async function test() { const t = Task.create('test', async () => 1); @@ -12,4 +12,38 @@ async function test() { console.log(r); } -test(); \ No newline at end of file +function messageTree(root: Progress.Node, prefix = ''): string { + if (!root.children.length) return `${prefix}${root.progress.message}`; + + const newPrefix = prefix + ' |_ '; + const subTree = root.children.map(c => messageTree(c, newPrefix)); + return `${prefix}${root.progress.message}\n${subTree.join('\n')}`; +} + +function createTask<T>(delay: number, r: T): Task<T> { + return Task.create('delayed', async ctx => { + await new Promise(r => setTimeout(r, delay)); + if (ctx.requiresUpdate) await ctx.update({ message: 'hello from delayed...' }); + return r; + }); +} + +async function testObs() { + const t = Task.create('test o', async ctx => { + await new Promise(r => setTimeout(r, 250)); + if (ctx.requiresUpdate) await ctx.update({ message: 'hi! 1' }); + await new Promise(r => setTimeout(r, 125)); + if (ctx.requiresUpdate) await ctx.update({ message: 'hi! 2' }); + await new Promise(r => setTimeout(r, 250)); + if (ctx.requiresUpdate) await ctx.update({ message: 'hi! 3' }); + + const r = await ctx.runChild({ message: 'Running child!' }, createTask(250, 100)); + if (ctx.requiresUpdate) await ctx.update({ message: 'Almost done...' }); + return r + 1; + }); + const r = await Run(t, p => console.log(messageTree(p.root)), 250); + console.log(r); +} + +test(); +testObs(); \ No newline at end of file diff --git a/src/mol-task/execution/observable.ts b/src/mol-task/execution/observable.ts index 44fa6517005f7f643a477a90adde8e2155a6d05b..ef594fd840ec0e30124b42b7c23bc4dda21beb56 100644 --- a/src/mol-task/execution/observable.ts +++ b/src/mol-task/execution/observable.ts @@ -8,6 +8,7 @@ import Task from '../task' import RuntimeContext from './runtime-context' import Progress from './progress' import now from '../util/now' +import ImmediateScheduler from '../scheduler/immediate' function defaultProgress(rootTaskId: number, task: Task<any>): Task.Progress { return { @@ -23,75 +24,161 @@ function defaultProgress(rootTaskId: number, task: Task<any>): Task.Progress { }; } -class ProgressInfo { +interface ProgressInfo { + updateRateMs: number, + lastUpdated: number, + observer: Progress.Observer, + + abortToken: { abortRequested: boolean, reason: string }, taskId: number; - elapsedMs: { real: number, cpu: number }; - tree: Progress.Node; - tryAbort?: (reason?: string) => void; + root: Progress.Node; + tryAbort: (reason?: string) => void; +} - snapshot(): Progress { - return 0 as any; - } +function ProgressInfo(task: Task<any>, observer: Progress.Observer, updateRateMs: number): ProgressInfo { + const abortToken: ProgressInfo['abortToken'] = { abortRequested: false, reason: '' }; + + return { + updateRateMs, + lastUpdated: now(), + observer, + abortToken, + taskId: task.id, + root: { progress: defaultProgress(task.id, task), children: [] }, + tryAbort: abortFn(abortToken) + }; } -class ObservableExecutor { - progressInfo: ProgressInfo; +function abortFn(token: ProgressInfo['abortToken']) { + return (reason?: string) => { + token.abortRequested = true; + token.reason = reason || token.reason; + }; +} - async run<T>(task: Task<T>): Promise<T> { - const ctx = new ObservableRuntimeContext(task.id, task, 0); - if (!task.__onAbort) return task.__f(ctx); +function cloneTree(root: Progress.Node): Progress.Node { + return { progress: { ...root.progress, elapsedMs: { ...root.progress.elapsedMs } }, children: root.children.map(cloneTree) }; +} + +function canAbort(root: Progress.Node): boolean { + return root.progress.canAbort && root.children.every(canAbort); +} + +function snapshotProgress(info: ProgressInfo): Progress { + return { root: cloneTree(info.root), canAbort: canAbort(info.root), tryAbort: info.tryAbort }; +} - try { - return await task.__f(ctx); - } catch (e) { - if (Task.isAborted(e)) task.__onAbort(); - throw e; - } - } - constructor(observer: Progress.Observer, updateRateMs: number) { +async function runRoot<T>(task: Task<T>, ctx: ObservableRuntimeContext) { + ctx.started = now(); + if (!task.__onAbort) return task.__f(ctx); + try { + const ret = await task.__f(ctx); + // if (ctx.info.abortToken.abortRequested) { + // task.__onAbort(); + // } + return ret; + } catch (e) { + if (Task.isAborted(e)) task.__onAbort(); + if (ExecuteObservable.PRINT_ERRORS_TO_STD_ERR) console.error(e); + throw e; + } +} +async function run<T>(task: Task<T>, ctx: ObservableRuntimeContext) { + ctx.started = now(); + if (!task.__onAbort) return task.__f(ctx); + try { + const ret = await task.__f(ctx); + // if (ctx.info.abortToken.abortRequested) { + // task.__onAbort(); + // } + return ret; + } catch (e) { + if (Task.isAborted(e)) task.__onAbort(); + if (ExecuteObservable.PRINT_ERRORS_TO_STD_ERR) console.error(e); + throw e; } } class ObservableRuntimeContext implements RuntimeContext { + isExecuting = true; elapsedCpuMs: number; lastScheduledTime: number; - started: number; - taskId: number; - taskName: string; - progress: Task.Progress; - updateRateMs: number; + started: number = 0; + node: Progress.Node; + info: ProgressInfo; + + private checkAborted() { + if (this.info.abortToken.abortRequested) { + throw Task.Aborted(this.info.abortToken.reason); + } + } get requiresUpdate(): boolean { - return now() - this.started > this.updateRateMs; + this.checkAborted(); + return now() - this.info.lastUpdated > this.info.updateRateMs; + } + + private setProgress(update: Partial<RuntimeContext.ProgressUpdate>) { + this.checkAborted(); + + const progress = this.node.progress; + if (typeof update.canAbort !== 'undefined') progress.canAbort = update.canAbort; + if (typeof update.current !== 'undefined') progress.current = update.current; + if (typeof update.isIndeterminate !== 'undefined') progress.isIndeterminate = update.isIndeterminate; + if (typeof update.max !== 'undefined') progress.max = update.max; + if (typeof update.message !== 'undefined') progress.message = update.message; + } + + private resume = () => { + this.isExecuting = true; + this.lastScheduledTime = now(); } update(progress: Partial<RuntimeContext.ProgressUpdate>): Promise<void> { - return 0 as any; + this.isExecuting = false; + this.setProgress(progress); + this.info.lastUpdated = now(); + const snapshot = snapshotProgress(this.info); + this.info.observer(snapshot); + return ImmediateScheduler.last(this.resume); } - runChild<T>(progress: Partial<RuntimeContext.ProgressUpdate>, task: Task<T>): Promise<T> { - return 0 as any; + async runChild<T>(progress: Partial<RuntimeContext.ProgressUpdate>, task: Task<T>): Promise<T> { + this.setProgress(progress); + const node: Progress.Node = { progress: defaultProgress(this.info.taskId, task), children: [] }; + const children = this.node.children as Progress.Node[]; + children.push(node); + const ctx = new ObservableRuntimeContext(task, this.info, node); + try { + return await run(task, ctx); + } finally { + // remove the progress node after the computation has finished. + const idx = children.indexOf(node); + if (idx >= 0) { + children[idx] = children[children.length - 1]; + children.pop(); + } + } } - constructor(parentId: number, task: Task<any>, updateRateMs: number) { - this.started = now(); + constructor(task: Task<any>, info: ProgressInfo, node: Progress.Node) { this.lastScheduledTime = this.started; - this.taskId = task.id; - this.taskName = task.name; - this.progress = defaultProgress(parentId, task); - this.updateRateMs = updateRateMs; + this.node = node; + this.info = info; } } function ExecuteObservable<T>(task: Task<T>, observer: Progress.Observer, updateRateMs = 250) { - return new ObservableExecutor(observer, updateRateMs).run(task); + const info = ProgressInfo(task, observer, updateRateMs); + const ctx = new ObservableRuntimeContext(task, info, info.root); + return runRoot(task, ctx); } namespace ExecuteObservable { - export let PRINT_ERRORS_TO_CONSOLE = false; + export let PRINT_ERRORS_TO_STD_ERR = false; } export default ExecuteObservable \ No newline at end of file diff --git a/src/mol-task/execution/progress.ts b/src/mol-task/execution/progress.ts index cda2cf15ebeff6a12ecaf4cfb6297761e2583bc2..91c40d683557da24af1eaa2be6c0a921bd6abc8b 100644 --- a/src/mol-task/execution/progress.ts +++ b/src/mol-task/execution/progress.ts @@ -7,10 +7,9 @@ import Task from '../task' interface Progress { - taskId: number, - elapsedMs: { real: number, cpu: number }, - tree: Progress.Node, - tryAbort?: (reason?: string) => void + root: Progress.Node, + canAbort: boolean, + tryAbort: (reason?: string) => void } namespace Progress { diff --git a/src/mol-task/scheduler/immediate.ts b/src/mol-task/scheduler/immediate.ts new file mode 100644 index 0000000000000000000000000000000000000000..678dae7c25c07b61284c683b82d8ebdecfd6d07b --- /dev/null +++ b/src/mol-task/scheduler/immediate.ts @@ -0,0 +1,19 @@ +/** + * Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info. + * + * @author David Sehnal <david.sehnal@gmail.com> + */ + +namespace ImmediateScheduler { + // Adds the function to the start of the "immediate queue" + export async function first<T>(f: () => T): Promise<T> { + return f(); + } + + // Adds the function to the end of the "immediate queue" + export async function last<T>(f: () => T): Promise<T> { + return f(); + } +} + +export default ImmediateScheduler \ No newline at end of file