From 701166247ecc99b6c4ab102fb0df506212de3936 Mon Sep 17 00:00:00 2001 From: David Sehnal <david.sehnal@gmail.com> Date: Sat, 9 Dec 2017 13:22:43 +0100 Subject: [PATCH] working on mol-task --- src/examples/computation.ts | 36 ++- src/mol-task/computation.ts | 283 ---------------------- src/mol-task/execution/observable.ts | 122 +++++----- src/mol-task/execution/progress.ts | 6 +- src/mol-task/execution/runtime-context.ts | 17 +- src/mol-task/execution/synchronous.ts | 13 +- src/mol-task/index.ts | 17 +- src/mol-task/scheduler/immediate.ts | 19 -- src/mol-task/task.ts | 24 +- src/mol-task/util.ts | 1 - src/mol-task/util/immediate.ts | 200 --------------- src/mol-task/util/now.ts | 2 +- src/mol-task/{ => util}/scheduler.ts | 54 ++--- src/perf-tests/tasks.ts | 8 +- 14 files changed, 153 insertions(+), 649 deletions(-) delete mode 100644 src/mol-task/computation.ts delete mode 100644 src/mol-task/scheduler/immediate.ts delete mode 100644 src/mol-task/util/immediate.ts rename src/mol-task/{ => util}/scheduler.ts (79%) diff --git a/src/examples/computation.ts b/src/examples/computation.ts index 00d2dd2f8..6c3a5e0fa 100644 --- a/src/examples/computation.ts +++ b/src/examples/computation.ts @@ -4,7 +4,7 @@ * @author David Sehnal <david.sehnal@gmail.com> */ -import { Task, Run, Progress } from 'mol-task' +import { Task, Run, Progress, Scheduler } from 'mol-task' async function test() { const t = Task.create('test', async () => 1); @@ -12,42 +12,38 @@ async function test() { console.log(r); } -function delay(ms: number) { - return new Promise(r => setTimeout(r, ms)); -} - function messageTree(root: Progress.Node, prefix = ''): string { - if (!root.children.length) return `${prefix}${root.progress.message}`; + if (!root.children.length) return `${prefix}${root.progress.taskName}: ${root.progress.message}`; const newPrefix = prefix + ' |_ '; const subTree = root.children.map(c => messageTree(c, newPrefix)); - return `${prefix}${root.progress.message}\n${subTree.join('\n')}`; + return `${prefix}${root.progress.taskName}: ${root.progress.message}\n${subTree.join('\n')}`; } function createTask<T>(delayMs: number, r: T): Task<T> { - return Task.create('delayed', async ctx => { - ctx.updateProgress('Processing delayed... ' + r); - await delay(delayMs); - if (ctx.needsYield) await ctx.yield({ message: 'hello from delayed... ' + r }); + return Task.create('delayed value ' + r, async ctx => { + ctx.update('Processing delayed... ' + r, true); + await Scheduler.delay(delayMs); + if (ctx.shouldUpdate) await ctx.update({ message: 'hello from delayed... ' }); return r; }); } async function testObs() { const t = Task.create('test o', async ctx => { - await delay(250); - if (ctx.needsYield) await ctx.yield({ message: 'hi! 1' }); - await delay(125); - if (ctx.needsYield) await ctx.yield({ message: 'hi! 2' }); - await delay(250); - if (ctx.needsYield) await ctx.yield('hi! 3'); - - ctx.updateProgress('Running children...'); + await Scheduler.delay(250); + if (ctx.shouldUpdate) await ctx.update({ message: 'hi! 1' }); + await Scheduler.delay(125); + if (ctx.shouldUpdate) await ctx.update({ message: 'hi! 2' }); + await Scheduler.delay(250); + if (ctx.shouldUpdate) await ctx.update('hi! 3'); + + ctx.update('Running children...', true); const c1 = ctx.runChild(createTask(250, 1)); const c2 = ctx.runChild(createTask(500, 2)); const c3 = ctx.runChild(createTask(750, 3)); const r = await c1 + await c2 + await c3; - if (ctx.needsYield) await ctx.yield({ message: 'Almost done...' }); + if (ctx.shouldUpdate) await ctx.update({ message: 'Almost done...' }); return r + 1; }); const r = await Run(t, p => console.log(messageTree(p.root)), 250); diff --git a/src/mol-task/computation.ts b/src/mol-task/computation.ts deleted file mode 100644 index b4b1c9d13..000000000 --- a/src/mol-task/computation.ts +++ /dev/null @@ -1,283 +0,0 @@ -/** - * Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info. - * - * Adapted from https://github.com/dsehnal/LiteMol - * @author David Sehnal <david.sehnal@gmail.com> - */ - -import Scheduler from './scheduler' -import timeNow from './util/now' - -interface Computation<A> { - (ctx?: Computation.Context): Promise<A> -} - -namespace Computation { - export let PRINT_ERRORS_TO_CONSOLE = false; - - export function create<A>(computation: (ctx: Context) => Promise<A>) { - return ComputationImpl(computation); - } - - export function resolve<A>(a: A) { - return create<A>(_ => Promise.resolve(a)); - } - - export function reject<A>(reason: any) { - return create<A>(_ => Promise.reject(reason)); - } - - export interface Params { - updateRateMs?: number, - observer?: ProgressObserver - } - - export const Aborted = 'Aborted'; - - export interface Progress { - message: string, - isIndeterminate: boolean, - current: number, - max: number, - elapsedMs: number, - requestAbort?: () => void - } - - export interface ProgressUpdate { - message?: string, - abort?: boolean | (() => void), - current?: number, - max?: number - } - - export interface Context { - readonly isSynchronous: boolean, - /** Also checks if the computation was aborted. If so, throws. */ - readonly requiresUpdate: boolean, - requestAbort(): void, - - subscribe(onProgress: ProgressObserver): { dispose: () => void }, - /** Also checks if the computation was aborted. If so, throws. */ - update(info: ProgressUpdate): Promise<void> | void - } - - export type ProgressObserver = (progress: Readonly<Progress>) => void; - - const emptyDisposer = { dispose: () => { } } - - /** A context without updates. */ - export const synchronous: Context = { - isSynchronous: true, - requiresUpdate: false, - requestAbort() { }, - subscribe(onProgress) { return emptyDisposer; }, - update(info) { } - } - - export function observable(params?: Partial<Params>) { - const ret = new ObservableContext(params && params.updateRateMs); - if (params && params.observer) ret.subscribe(params.observer); - return ret; - } - - export const now = timeNow; - - /** A utility for splitting large computations into smaller parts. */ - export interface Chunker { - setNextChunkSize(size: number): void, - /** nextChunk must return the number of actually processed chunks. */ - process(nextChunk: (chunkSize: number) => number, update: (updater: Context['update']) => void, nextChunkSize?: number): Promise<void> - } - - export function chunker(ctx: Context, nextChunkSize: number): Chunker { - return new ChunkerImpl(ctx, nextChunkSize); - } -} - -const DefaulUpdateRateMs = 150; - -function ComputationImpl<A>(computation: (ctx: Computation.Context) => Promise<A>): Computation<A> { - return (ctx?: Computation.Context) => { - const context: ObservableContext = ctx ? ctx : Computation.synchronous as any; - return new Promise<A>(async (resolve, reject) => { - try { - if (context.started) context.started(); - const result = await computation(context); - resolve(result); - } catch (e) { - if (Computation.PRINT_ERRORS_TO_CONSOLE) console.error(e); - reject(e); - } finally { - if (context.finished) context.finished(); - } - }); - } -} - -class ObservableContext implements Computation.Context { - readonly updateRate: number; - readonly isSynchronous: boolean = false; - private level = 0; - private startedTime = 0; - private abortRequested = false; - private lastUpdated = 0; - private observers: Computation.ProgressObserver[] | undefined = void 0; - private progress: Computation.Progress = { message: 'Working...', current: 0, max: 0, elapsedMs: 0, isIndeterminate: true, requestAbort: void 0 }; - - private checkAborted() { - if (this.abortRequested) throw Computation.Aborted; - } - - private abortRequester = () => { this.abortRequested = true }; - - subscribe = (obs: Computation.ProgressObserver) => { - if (!this.observers) this.observers = []; - this.observers.push(obs); - return { - dispose: () => { - if (!this.observers) return; - for (let i = 0; i < this.observers.length; i++) { - if (this.observers[i] === obs) { - this.observers[i] = this.observers[this.observers.length - 1]; - this.observers.pop(); - return; - } - } - } - }; - } - - requestAbort() { - try { - if (this.abortRequester) { - this.abortRequester.call(null); - } - } catch (e) { } - } - - update({ message, abort, current, max }: Computation.ProgressUpdate): Promise<void> | void { - this.checkAborted(); - - const time = Computation.now(); - - if (typeof abort === 'boolean') { - this.progress.requestAbort = abort ? this.abortRequester : void 0; - } else { - if (abort) this.abortRequester = abort; - this.progress.requestAbort = abort ? this.abortRequester : void 0; - } - - if (typeof message !== 'undefined') this.progress.message = message; - this.progress.elapsedMs = time - this.startedTime; - if (isNaN(current!)) { - this.progress.isIndeterminate = true; - } else { - this.progress.isIndeterminate = false; - this.progress.current = current!; - if (!isNaN(max!)) this.progress.max = max!; - } - - if (this.observers) { - const p = { ...this.progress }; - for (let i = 0, _i = this.observers.length; i < _i; i++) { - Scheduler.immediate(this.observers[i], p); - } - } - - this.lastUpdated = time; - - return Scheduler.immediatePromise(); - } - - get requiresUpdate() { - this.checkAborted(); - if (this.isSynchronous) return false; - return Computation.now() - this.lastUpdated > this.updateRate; - } - - started() { - if (!this.level) { - this.startedTime = Computation.now(); - this.lastUpdated = this.startedTime; - } - this.level++; - } - - finished() { - this.level--; - if (this.level < 0) { - throw new Error('Bug in code somewhere, Computation.resolve/reject called too many times.'); - } - if (!this.level) this.observers = void 0; - } - - constructor(updateRate?: number) { - this.updateRate = updateRate || DefaulUpdateRateMs; - } -} - -class ChunkerImpl implements Computation.Chunker { - private processedSinceUpdate = 0; - private updater: Computation.Context['update']; - - private computeChunkSize(delta: number) { - if (!delta) { - this.processedSinceUpdate = 0; - return this.nextChunkSize; - } - const rate = (this.context as ObservableContext).updateRate || DefaulUpdateRateMs; - const ret = Math.round(this.processedSinceUpdate * rate / delta + 1); - this.processedSinceUpdate = 0; - return ret; - } - - private getNextChunkSize() { - const ctx = this.context as ObservableContext; - // be smart if the computation is synchronous and process the whole chunk at once. - if (ctx.isSynchronous) return Number.MAX_SAFE_INTEGER; - return this.nextChunkSize; - } - - setNextChunkSize(size: number) { - this.nextChunkSize = size; - } - - async process(nextChunk: (size: number) => number, update: (updater: Computation.Context['update']) => Promise<void> | void, nextChunkSize?: number) { - if (typeof nextChunkSize !== 'undefined') this.setNextChunkSize(nextChunkSize); - this.processedSinceUpdate = 0; - - // track time for the actual computation and exclude the "update time" - let chunkStart = Computation.now(); - let lastChunkSize: number; - let chunkCount = 0; - let totalSize = 0; - let updateCount = 0; - while ((lastChunkSize = nextChunk(this.getNextChunkSize())) > 0) { - chunkCount++; - this.processedSinceUpdate += lastChunkSize; - totalSize += lastChunkSize; - if (this.context.requiresUpdate) { - let time = Computation.now(); - await update(this.updater); - this.nextChunkSize = updateCount > 0 - ? Math.round((totalSize + this.computeChunkSize(time - chunkStart)) / (chunkCount + 1)) - : this.computeChunkSize(time - chunkStart) - updateCount++; - chunkStart = Computation.now(); - } - } - if (this.context.requiresUpdate) { - let time = Computation.now(); - await update(this.updater); - this.nextChunkSize = updateCount > 0 - ? Math.round((totalSize + this.computeChunkSize(time - chunkStart)) / (chunkCount + 1)) - : this.computeChunkSize(time - chunkStart) - } - } - - constructor(public context: Computation.Context, private nextChunkSize: number) { - this.updater = this.context.update.bind(this.context); - } -} - -export default Computation; \ No newline at end of file diff --git a/src/mol-task/execution/observable.ts b/src/mol-task/execution/observable.ts index d2139c207..ced11df50 100644 --- a/src/mol-task/execution/observable.ts +++ b/src/mol-task/execution/observable.ts @@ -4,19 +4,28 @@ * @author David Sehnal <david.sehnal@gmail.com> */ -import Task from '../task' -import RuntimeContext from './runtime-context' -import Progress from './progress' -import now from '../util/now' -import ImmediateScheduler from '../scheduler/immediate' +import { Task } from '../task' +import { RuntimeContext } from './runtime-context' +import { Progress } from './progress' +import { now } from '../util/now' +import { Scheduler } from '../util/scheduler' -function defaultProgress(rootTaskId: number, task: Task<any>): Task.Progress { +function ExecuteObservable<T>(task: Task<T>, observer: Progress.Observer, updateRateMs = 250) { + const info = ProgressInfo(task, observer, updateRateMs); + const ctx = new ObservableRuntimeContext(info, info.root); + return runRoot(task, ctx); +} + +namespace ExecuteObservable { + export let PRINT_ERRORS_TO_STD_ERR = false; +} + +function defaultProgress(task: Task<any>): Task.Progress { return { - rootTaskId, taskId: task.id, taskName: task.name, - message: 'Running...', - elapsedMs: { real: 0, cpu: 0 }, + message: '', + startedTime: 0, canAbort: true, isIndeterminate: true, current: 0, @@ -26,7 +35,7 @@ function defaultProgress(rootTaskId: number, task: Task<any>): Task.Progress { interface ProgressInfo { updateRateMs: number, - lastUpdated: number, + lastNotified: number, observer: Progress.Observer, abortToken: { abortRequested: boolean, reason: string }, @@ -40,16 +49,16 @@ function ProgressInfo(task: Task<any>, observer: Progress.Observer, updateRateMs return { updateRateMs, - lastUpdated: now(), + lastNotified: now(), observer, abortToken, taskId: task.id, - root: { progress: defaultProgress(task.id, task), children: [] }, - tryAbort: abortFn(abortToken) + root: { progress: defaultProgress(task), children: [] }, + tryAbort: createAbortFunction(abortToken) }; } -function abortFn(token: ProgressInfo['abortToken']) { +function createAbortFunction(token: ProgressInfo['abortToken']) { return (reason?: string) => { token.abortRequested = true; token.reason = reason || token.reason; @@ -57,7 +66,7 @@ function abortFn(token: ProgressInfo['abortToken']) { } function cloneTree(root: Progress.Node): Progress.Node { - return { progress: { ...root.progress, elapsedMs: { ...root.progress.elapsedMs } }, children: root.children.map(cloneTree) }; + return { progress: { ...root.progress }, children: root.children.map(cloneTree) }; } function canAbort(root: Progress.Node): boolean { @@ -65,12 +74,11 @@ function canAbort(root: Progress.Node): boolean { } function snapshotProgress(info: ProgressInfo): Progress { - return { root: cloneTree(info.root), canAbort: canAbort(info.root), tryAbort: info.tryAbort }; + return { root: cloneTree(info.root), canAbort: canAbort(info.root), requestAbort: info.tryAbort }; } - async function runRoot<T>(task: Task<T>, ctx: ObservableRuntimeContext) { - ctx.started = now(); + ctx.node.progress.startedTime = now(); if (!task.__onAbort) return task.__f(ctx); try { const ret = await task.__f(ctx); @@ -79,6 +87,7 @@ async function runRoot<T>(task: Task<T>, ctx: ObservableRuntimeContext) { // } return ret; } catch (e) { + // TODO: track cancellation if (Task.isAborted(e)) task.__onAbort(); if (ExecuteObservable.PRINT_ERRORS_TO_STD_ERR) console.error(e); throw e; @@ -86,7 +95,7 @@ async function runRoot<T>(task: Task<T>, ctx: ObservableRuntimeContext) { } async function run<T>(task: Task<T>, ctx: ObservableRuntimeContext) { - ctx.started = now(); + ctx.node.progress.startedTime = now(); if (!task.__onAbort) return task.__f(ctx); try { const ret = await task.__f(ctx); @@ -101,12 +110,21 @@ async function run<T>(task: Task<T>, ctx: ObservableRuntimeContext) { } } + +function shouldNotify(info: ProgressInfo, time: number) { + return time - info.lastNotified > info.updateRateMs; +} + +function notifyObserver(info: ProgressInfo, time: number) { + info.lastNotified = time; + const snapshot = snapshotProgress(info); + info.observer(snapshot); +} + class ObservableRuntimeContext implements RuntimeContext { isExecuting = true; - elapsedCpuMs: number; - lastScheduledTime: number; + lastUpdatedTime = 0; - started: number = 0; node: Progress.Node; info: ProgressInfo; @@ -116,12 +134,12 @@ class ObservableRuntimeContext implements RuntimeContext { } } - get needsYield(): boolean { + get shouldUpdate(): boolean { this.checkAborted(); - return now() - this.info.lastUpdated > this.info.updateRateMs; + return now() - this.lastUpdatedTime > this.info.updateRateMs; } - private setProgress(update?: string | Partial<RuntimeContext.ProgressUpdate>) { + private updateProgress(update?: string | Partial<RuntimeContext.ProgressUpdate>) { this.checkAborted(); if (!update) return; @@ -138,57 +156,49 @@ class ObservableRuntimeContext implements RuntimeContext { } } - private resume = () => { - this.isExecuting = true; - this.lastScheduledTime = now(); - } + update(progress?: string | Partial<RuntimeContext.ProgressUpdate>, dontNotify?: boolean): Promise<void> | void { + // The progress tracking and observer notification are separated + // because the computation can have a tree structure. + // All nodes of the tree should be regualarly updated at the specified frequency, + // however, the notification should only be invoked once per the whole tree. - updateProgress(progress?: string | Partial<RuntimeContext.ProgressUpdate>) { - this.setProgress(progress); - } + this.lastUpdatedTime = now(); + this.updateProgress(progress); - yield(progress?: string | Partial<RuntimeContext.ProgressUpdate>): Promise<void> { - this.isExecuting = false; - this.setProgress(progress); - this.info.lastUpdated = now(); - const snapshot = snapshotProgress(this.info); - this.info.observer(snapshot); - return ImmediateScheduler.last(this.resume); + if (!!dontNotify || !shouldNotify(this.info, this.lastUpdatedTime)) return; + + notifyObserver(this.info, this.lastUpdatedTime); + return Scheduler.immediatePromise(); } async runChild<T>(task: Task<T>, progress?: string | Partial<RuntimeContext.ProgressUpdate>): Promise<T> { - this.setProgress(progress); - const node: Progress.Node = { progress: defaultProgress(this.info.taskId, task), children: [] }; + this.updateProgress(progress); + + // Create a new child context and add it to the progress tree. + // When the child task finishes, remove the tree node. + + const node: Progress.Node = { progress: defaultProgress(task), children: [] }; const children = this.node.children as Progress.Node[]; children.push(node); - const ctx = new ObservableRuntimeContext(task, this.info, node); + const ctx = new ObservableRuntimeContext(this.info, node); try { return await run(task, ctx); } finally { // remove the progress node after the computation has finished. const idx = children.indexOf(node); if (idx >= 0) { - children[idx] = children[children.length - 1]; + for (let i = idx, _i = children.length - 1; i < _i; i++) { + children[i] = children[i + 1]; + } children.pop(); } } } - constructor(task: Task<any>, info: ProgressInfo, node: Progress.Node) { - this.lastScheduledTime = this.started; + constructor(info: ProgressInfo, node: Progress.Node) { this.node = node; this.info = info; } } -function ExecuteObservable<T>(task: Task<T>, observer: Progress.Observer, updateRateMs = 250) { - const info = ProgressInfo(task, observer, updateRateMs); - const ctx = new ObservableRuntimeContext(task, info, info.root); - return runRoot(task, ctx); -} - -namespace ExecuteObservable { - export let PRINT_ERRORS_TO_STD_ERR = false; -} - -export default ExecuteObservable \ No newline at end of file +export { ExecuteObservable } \ No newline at end of file diff --git a/src/mol-task/execution/progress.ts b/src/mol-task/execution/progress.ts index 91c40d683..513c262c6 100644 --- a/src/mol-task/execution/progress.ts +++ b/src/mol-task/execution/progress.ts @@ -4,12 +4,12 @@ * @author David Sehnal <david.sehnal@gmail.com> */ -import Task from '../task' +import { Task } from '../task' interface Progress { root: Progress.Node, canAbort: boolean, - tryAbort: (reason?: string) => void + requestAbort: (reason?: string) => void } namespace Progress { @@ -21,4 +21,4 @@ namespace Progress { export interface Observer { (progress: Progress): void } } -export default Progress \ No newline at end of file +export { Progress } \ No newline at end of file diff --git a/src/mol-task/execution/runtime-context.ts b/src/mol-task/execution/runtime-context.ts index 754f18a0d..e8812c55f 100644 --- a/src/mol-task/execution/runtime-context.ts +++ b/src/mol-task/execution/runtime-context.ts @@ -4,15 +4,20 @@ * @author David Sehnal <david.sehnal@gmail.com> */ -import Task from '../task' +import { Task } from '../task' interface RuntimeContext { - readonly needsYield: boolean, - updateProgress(progress: string | Partial<RuntimeContext.ProgressUpdate>): void, + readonly shouldUpdate: boolean, + // Idiomatic usage: // if (ctx.needsYield) await ctx.yield({ ... }); - yield(progress?: string | Partial<RuntimeContext.ProgressUpdate>): Promise<void>, - // Force the user to pass the progress so that the progress tree can be kept in a "good state". + // + // Alternatively, progress can be updated without notifying (and yielding) using update(progress, true). + // This is useful for nested tasks. + update(progress?: string | Partial<RuntimeContext.ProgressUpdate>, dontNotify?: boolean): Promise<void> | void, + + // Run a child task that adds a new node to the progress tree. + // Allow to pass the progress so that the progress tree can be kept in a "good state" without having to separately call update. runChild<T>(task: Task<T>, progress?: string | Partial<RuntimeContext.ProgressUpdate>): Promise<T> } @@ -28,4 +33,4 @@ namespace RuntimeContext { } } -export default RuntimeContext \ No newline at end of file +export { RuntimeContext } \ No newline at end of file diff --git a/src/mol-task/execution/synchronous.ts b/src/mol-task/execution/synchronous.ts index 0267a36ea..74b1180b5 100644 --- a/src/mol-task/execution/synchronous.ts +++ b/src/mol-task/execution/synchronous.ts @@ -4,15 +4,12 @@ * @author David Sehnal <david.sehnal@gmail.com> */ -import Task from '../task' -import RuntimeContext from './runtime-context' - -const voidPromise = Promise.resolve(void 0); +import { Task } from '../task' +import { RuntimeContext } from './runtime-context' class SynchronousRuntimeContext implements RuntimeContext { - needsYield: boolean = false; - updateProgress(progress: string | Partial<RuntimeContext.ProgressUpdate>): void { } - yield(progress?: string | Partial<RuntimeContext.ProgressUpdate>): Promise<void> { return voidPromise; } + shouldUpdate: boolean = false; + update(progress: string | Partial<RuntimeContext.ProgressUpdate>, dontNotify?: boolean): Promise<void> | void { } runChild<T>(task: Task<T>, progress?: string | Partial<RuntimeContext.ProgressUpdate>): Promise<T> { return ExecuteSynchronous(task); } } @@ -22,4 +19,4 @@ function ExecuteSynchronous<T>(task: Task<T>) { return task.__f(SyncRuntimeInstance); } -export default ExecuteSynchronous \ No newline at end of file +export { ExecuteSynchronous } \ No newline at end of file diff --git a/src/mol-task/index.ts b/src/mol-task/index.ts index 6367cb572..66449f0fb 100644 --- a/src/mol-task/index.ts +++ b/src/mol-task/index.ts @@ -4,18 +4,21 @@ * @author David Sehnal <david.sehnal@gmail.com> */ -import Task from './task' -import RuntimeContext from './execution/runtime-context' -import ExecuteSynchronous from './execution/synchronous' -import ExecuteObservable from './execution/observable' -import Progress from './execution/progress' -import now from './util/now' +import { Task } from './task' +import { RuntimeContext } from './execution/runtime-context' +import { ExecuteSynchronous } from './execution/synchronous' +import { ExecuteObservable } from './execution/observable' +import { Progress } from './execution/progress' +import { now } from './util/now' +import { Scheduler } from './util/scheduler' +// Run the task without the ability to observe its progress. function Run<T>(task: Task<T>): Promise<T>; +// Run the task with the ability to observe its progress and request cancellation. function Run<T>(task: Task<T>, observer: Progress.Observer, updateRateMs?: number): Promise<T>; function Run<T>(task: Task<T>, observer?: Progress.Observer, updateRateMs?: number): Promise<T> { if (observer) return ExecuteObservable(task, observer, updateRateMs || 250); return ExecuteSynchronous(task); } -export { Task, RuntimeContext, Progress, Run, now } \ No newline at end of file +export { Task, RuntimeContext, Progress, Run, now, Scheduler } \ No newline at end of file diff --git a/src/mol-task/scheduler/immediate.ts b/src/mol-task/scheduler/immediate.ts deleted file mode 100644 index 678dae7c2..000000000 --- a/src/mol-task/scheduler/immediate.ts +++ /dev/null @@ -1,19 +0,0 @@ -/** - * Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info. - * - * @author David Sehnal <david.sehnal@gmail.com> - */ - -namespace ImmediateScheduler { - // Adds the function to the start of the "immediate queue" - export async function first<T>(f: () => T): Promise<T> { - return f(); - } - - // Adds the function to the end of the "immediate queue" - export async function last<T>(f: () => T): Promise<T> { - return f(); - } -} - -export default ImmediateScheduler \ No newline at end of file diff --git a/src/mol-task/task.ts b/src/mol-task/task.ts index 8f6a700c3..64933d25f 100644 --- a/src/mol-task/task.ts +++ b/src/mol-task/task.ts @@ -4,9 +4,10 @@ * @author David Sehnal <david.sehnal@gmail.com> */ -import RuntimeContext from './execution/runtime-context' +import { RuntimeContext } from './execution/runtime-context' -// Run(t, ?observer, ?updateRate) to execute +// A "named function wrapper" with built in "computation tree progress tracking". +// Use Run(t, ?observer, ?updateRate) to execute interface Task<T> { readonly id: number, readonly name: string, @@ -28,24 +29,23 @@ namespace Task { export function constant<T>(name: string, value: T): Task<T> { return create(name, async ctx => value); } export function fail(name: string, reason: string): Task<any> { return create(name, async ctx => { throw new Error(reason); }); } - let _id = 0; - function nextId() { - const ret = _id; - _id = (_id + 1) % 0x3fffffff; - return ret; - } - export interface Progress { - rootTaskId: number, taskId: number, taskName: string, + startedTime: number, message: string, - elapsedMs: { real: number, cpu: number }, canAbort: boolean, isIndeterminate: boolean, current: number, max: number } + + let _id = 0; + function nextId() { + const ret = _id; + _id = (_id + 1) % 0x3fffffff; + return ret; + } } -export default Task \ No newline at end of file +export { Task } \ No newline at end of file diff --git a/src/mol-task/util.ts b/src/mol-task/util.ts index 35a421848..a3ce20ff3 100644 --- a/src/mol-task/util.ts +++ b/src/mol-task/util.ts @@ -59,7 +59,6 @@ function MultistepComputation<P, T>(name: string, steps: string[], f: MultistepF return params => create(async ctx => f(params, n => ctx.yield(steps[n]), ctx)); } - // if total count is specified, could automatically provide percentage type UniformlyChunkedFn<S> = (chunkSize: number, state: S, totalCount?: number) => number type UniformlyChunkedProvider<S> = (ctx: RuntimeContext, state: S) => Promise<S> diff --git a/src/mol-task/util/immediate.ts b/src/mol-task/util/immediate.ts deleted file mode 100644 index df8b2f764..000000000 --- a/src/mol-task/util/immediate.ts +++ /dev/null @@ -1,200 +0,0 @@ -/** - * Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info. - * - * @author David Sehnal <david.sehnal@gmail.com> - */ - -/** - * setImmediate polyfill adapted from https://github.com/YuzuJS/setImmediate - * Copyright (c) 2012 Barnesandnoble.com, llc, Donavon West, and Domenic Denicola - * MIT license. - */ - -declare var WorkerGlobalScope: any; -function createImmediateActions() { - const global: any = (function () { - const _window = typeof window !== 'undefined' && window; - const _self = typeof self !== 'undefined' && typeof WorkerGlobalScope !== 'undefined' && self instanceof WorkerGlobalScope && self; - const _global = typeof global !== 'undefined' && global; - return _window || _global || _self; - })(); - - type Callback = (...args: any[]) => void; - type Task = { callback: Callback, args: any[] } - - const tasksByHandle: { [handle: number]: Task } = { }; - const doc = typeof document !== 'undefined' ? document : void 0; - - let nextHandle = 1; // Spec says greater than zero - let registerImmediate: ((handle: number) => void); - - function setImmediate(callback: Callback, ...args: any[]) { - // Callback can either be a function or a string - if (typeof callback !== 'function') { - callback = new Function('' + callback) as Callback; - } - // Store and register the task - const task = { callback: callback, args: args }; - tasksByHandle[nextHandle] = task; - registerImmediate(nextHandle); - return nextHandle++; - } - - function clearImmediate(handle: number) { - delete tasksByHandle[handle]; - } - - function run(task: Task) { - const callback = task.callback; - const args = task.args; - switch (args.length) { - case 0: - callback(); - break; - case 1: - callback(args[0]); - break; - case 2: - callback(args[0], args[1]); - break; - case 3: - callback(args[0], args[1], args[2]); - break; - default: - callback.apply(undefined, args); - break; - } - } - - function runIfPresent(handle: number) { - const task = tasksByHandle[handle]; - clearImmediate(handle); - run(task); - } - - function installNextTickImplementation() { - registerImmediate = function(handle) { - process.nextTick(function () { runIfPresent(handle); }); - }; - } - - function canUsePostMessage() { - if (global && global.postMessage && !global.importScripts) { - let postMessageIsAsynchronous = true; - const oldOnMessage = global.onmessage; - global.onmessage = function() { - postMessageIsAsynchronous = false; - }; - global.postMessage('', '*'); - global.onmessage = oldOnMessage; - return postMessageIsAsynchronous; - } - } - - function installPostMessageImplementation() { - // Installs an event handler on `global` for the `message` event: see - // * https://developer.mozilla.org/en/DOM/window.postMessage - // * http://www.whatwg.org/specs/web-apps/current-work/multipage/comms.html#crossDocumentMessages - - const messagePrefix = 'setImmediate$' + Math.random() + '$'; - const onGlobalMessage = function(event: any) { - if (event.source === global && - typeof event.data === 'string' && - event.data.indexOf(messagePrefix) === 0) { - runIfPresent(+event.data.slice(messagePrefix.length)); - } - }; - - if (window.addEventListener) { - window.addEventListener('message', onGlobalMessage, false); - } else { - (window as any).attachEvent('onmessage', onGlobalMessage); - } - - registerImmediate = function(handle) { - window.postMessage(messagePrefix + handle, '*'); - }; - } - - function installMessageChannelImplementation() { - const channel = new MessageChannel(); - channel.port1.onmessage = function(event) { - const handle = event.data; - runIfPresent(handle); - }; - - registerImmediate = function(handle) { - channel.port2.postMessage(handle); - }; - } - - function installReadyStateChangeImplementation() { - const html = doc!.documentElement; - registerImmediate = function(handle) { - // Create a <script> element; its readystatechange event will be fired asynchronously once it is inserted - // into the document. Do so, thus queuing up the task. Remember to clean up once it's been called. - let script = doc!.createElement('script') as any; - script.onreadystatechange = function () { - runIfPresent(handle); - script.onreadystatechange = null; - html.removeChild(script); - script = null; - }; - html.appendChild(script); - }; - } - - function installSetTimeoutImplementation() { - registerImmediate = function(handle) { - setTimeout(runIfPresent, 0, handle); - }; - } - - // Don't get fooled by e.g. browserify environments. - if (typeof process !== 'undefined' && {}.toString.call(process) === '[object process]') { - // For Node.js before 0.9 - installNextTickImplementation(); - } else if (canUsePostMessage()) { - // For non-IE10 modern browsers - installPostMessageImplementation(); - } else if (typeof MessageChannel !== 'undefined') { - // For web workers, where supported - installMessageChannelImplementation(); - } else if (doc && 'onreadystatechange' in doc.createElement('script')) { - // For IE 6–8 - installReadyStateChangeImplementation(); - } else { - // For older browsers - installSetTimeoutImplementation(); - } - - return { - setImmediate, - clearImmediate - }; -} - -const immediateActions = (function () { - if (typeof setImmediate !== 'undefined') { - if (typeof window !== 'undefined') { - return { - setImmediate: (handler: any, ...args: any[]) => window.setImmediate(handler, ...args as any), - clearImmediate: (handle: any) => window.clearImmediate(handle) - }; - } else { - return { setImmediate, clearImmediate } - } - } - return createImmediateActions(); -}()); - -function resolveImmediate(res: () => void) { - immediateActions.setImmediate(res); -} - -export default { - immediate: immediateActions.setImmediate, - clearImmediate: immediateActions.clearImmediate, - - immediatePromise() { return new Promise<void>(resolveImmediate); } -}; diff --git a/src/mol-task/util/now.ts b/src/mol-task/util/now.ts index 1b29d548d..f4961d217 100644 --- a/src/mol-task/util/now.ts +++ b/src/mol-task/util/now.ts @@ -23,4 +23,4 @@ const now: () => number = (function () { } }()); -export default now; \ No newline at end of file +export { now } \ No newline at end of file diff --git a/src/mol-task/scheduler.ts b/src/mol-task/util/scheduler.ts similarity index 79% rename from src/mol-task/scheduler.ts rename to src/mol-task/util/scheduler.ts index e118ec97c..87a787332 100644 --- a/src/mol-task/scheduler.ts +++ b/src/mol-task/util/scheduler.ts @@ -10,14 +10,21 @@ * MIT license. */ +declare var WorkerGlobalScope: any; function createImmediateActions() { + const global: any = (function () { + const _window = typeof window !== 'undefined' && window; + const _self = typeof self !== 'undefined' && typeof WorkerGlobalScope !== 'undefined' && self instanceof WorkerGlobalScope && self; + const _global = typeof global !== 'undefined' && global; + return _window || _global || _self; + })(); + type Callback = (...args: any[]) => void; type Task = { callback: Callback, args: any[] } const tasksByHandle: { [handle: number]: Task } = { }; const doc = typeof document !== 'undefined' ? document : void 0; - let currentlyRunningATask = false; let nextHandle = 1; // Spec says greater than zero let registerImmediate: ((handle: number) => void); @@ -60,24 +67,9 @@ function createImmediateActions() { } function runIfPresent(handle: number) { - // From the spec: 'Wait until any invocations of this algorithm started before this one have completed.' - // So if we're currently running a task, we'll need to delay this invocation. - if (currentlyRunningATask) { - // Delay by doing a setTimeout. setImmediate was tried instead, but in Firefox 7 it generated a - // 'too much recursion' error. - setTimeout(runIfPresent, 0, handle); - } else { - const task = tasksByHandle[handle]; - if (task) { - currentlyRunningATask = true; - try { - run(task); - } finally { - clearImmediate(handle); - currentlyRunningATask = false; - } - } - } + const task = tasksByHandle[handle]; + clearImmediate(handle); + run(task); } function installNextTickImplementation() { @@ -87,9 +79,6 @@ function createImmediateActions() { } function canUsePostMessage() { - // The test against `importScripts` prevents this implementation from being installed inside a web worker, - // where `global.postMessage` means something completely different and can't be used for this purpose. - const global = typeof window !== 'undefined' ? window as any : void 0; if (global && global.postMessage && !global.importScripts) { let postMessageIsAsynchronous = true; const oldOnMessage = global.onmessage; @@ -108,7 +97,6 @@ function createImmediateActions() { // * http://www.whatwg.org/specs/web-apps/current-work/multipage/comms.html#crossDocumentMessages const messagePrefix = 'setImmediate$' + Math.random() + '$'; - const global = typeof window !== 'undefined' ? window as any : void 0; const onGlobalMessage = function(event: any) { if (event.source === global && typeof event.data === 'string' && @@ -189,8 +177,13 @@ function createImmediateActions() { const immediateActions = (function () { if (typeof setImmediate !== 'undefined') { if (typeof window !== 'undefined') { - return { setImmediate: (handler: any, ...args: any[]) => window.setImmediate(handler, ...args as any), clearImmediate: (handle: any) => window.clearImmediate(handle) }; - } else return { setImmediate, clearImmediate } + return { + setImmediate: (handler: any, ...args: any[]) => window.setImmediate(handler, ...args as any), + clearImmediate: (handle: any) => window.clearImmediate(handle) + }; + } else { + return { setImmediate, clearImmediate } + } } return createImmediateActions(); }()); @@ -199,9 +192,12 @@ function resolveImmediate(res: () => void) { immediateActions.setImmediate(res); } -export default { - immediate: immediateActions.setImmediate, +const Scheduler = { + setImmediate: immediateActions.setImmediate, clearImmediate: immediateActions.clearImmediate, + immediatePromise() { return new Promise<void>(resolveImmediate); }, + + delay<T>(timeout: number, value: T | undefined = void 0): Promise<T> { return new Promise(r => setTimeout(r, timeout, value)) } +} - immediatePromise() { return new Promise<void>(resolveImmediate); } -}; +export { Scheduler } diff --git a/src/perf-tests/tasks.ts b/src/perf-tests/tasks.ts index 38c8b9885..c1fa6c9c9 100644 --- a/src/perf-tests/tasks.ts +++ b/src/perf-tests/tasks.ts @@ -1,6 +1,6 @@ import * as B from 'benchmark' -import now from 'mol-task/util/now' -import Immedite from 'mol-task/util/immediate' +import { now } from 'mol-task/util/now' +import { Scheduler } from 'mol-task/util/scheduler' export namespace Tasks { export class Yielding { @@ -9,7 +9,7 @@ export namespace Tasks { const t = now(); if (t - this.lastUpdated < 250) return; this.lastUpdated = t; - return Immedite.immediatePromise(); + return Scheduler.immediatePromise(); } } @@ -22,7 +22,7 @@ export namespace Tasks { yield(): Promise<void> { this.lastUpdated = now(); - return Immedite.immediatePromise(); + return Scheduler.immediatePromise(); } } -- GitLab