diff --git a/package-lock.json b/package-lock.json index 445a602d57c899a8e98f013ba74f05382363076a..047413021a6fd3733756bae612ac80c1eb74a9cd 100644 Binary files a/package-lock.json and b/package-lock.json differ diff --git a/package.json b/package.json index 3185d9554462f5b524b2e682a7c2708e9e305a68..59f858186b753055f38774f3514554ec03fe15ee 100644 --- a/package.json +++ b/package.json @@ -33,24 +33,24 @@ "author": "", "license": "MIT", "devDependencies": { - "@types/benchmark": "^1.0.30", + "@types/benchmark": "^1.0.31", "@types/express": "^4.0.39", - "@types/jest": "^21.1.5", - "@types/node": "^8.0.47", + "@types/jest": "^21.1.8", + "@types/node": "^8.0.56", "@types/node-fetch": "^1.6.7", "benchmark": "^2.1.4", "download-cli": "^1.0.5", "jest": "^21.2.1", - "rollup": "^0.50.0", + "rollup": "^0.50.1", "rollup-plugin-buble": "^0.16.0", "rollup-plugin-commonjs": "^8.2.6", "rollup-plugin-json": "^2.3.0", "rollup-plugin-node-resolve": "^3.0.0", "rollup-watch": "^4.3.1", - "ts-jest": "^21.1.4", + "ts-jest": "^21.2.4", "tslint": "^5.8.0", - "typescript": "^2.6.1", - "uglify-js": "^3.1.7", + "typescript": "^2.6.2", + "uglify-js": "^3.2.1", "util.promisify": "^1.0.0" }, "dependencies": { diff --git a/src/examples/computation.ts b/src/examples/computation.ts index 0ffdd02fcbce683e436c0030ffe0517135c6ceda..8068bff3191764eacce29e8cc12b56377410daee 100644 --- a/src/examples/computation.ts +++ b/src/examples/computation.ts @@ -1 +1,81 @@ -// TODO \ No newline at end of file +/** + * Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info. + * + * @author David Sehnal <david.sehnal@gmail.com> + */ + +import { Task, Run, Progress, Scheduler, now } from 'mol-task' + +export async function test1() { + const t = Task.create('test', async () => 1); + const r = await Run(t); + console.log(r); +} + +function messageTree(root: Progress.Node, prefix = ''): string { + if (!root.children.length) return `${prefix}${root.progress.taskName}: ${root.progress.message}`; + + const newPrefix = prefix + ' |_ '; + const subTree = root.children.map(c => messageTree(c, newPrefix)); + return `${prefix}${root.progress.taskName}: ${root.progress.message}\n${subTree.join('\n')}`; +} + +function createTask<T>(delayMs: number, r: T): Task<T> { + return Task.create('delayed value ' + r, async ctx => { + ctx.update('Processing delayed... ' + r, true); + await Scheduler.delay(delayMs); + if (ctx.shouldUpdate) await ctx.update({ message: 'hello from delayed... ' }); + return r; + }, () => console.log('On abort called ' + r)); +} + +export function abortAfter(delay: number) { + return Task.create('abort after ' + delay, async ctx => { + await Scheduler.delay(delay); + throw Task.Aborted('test'); + //if (ctx.shouldUpdate) await ctx.update({ message: 'hello from delayed... ' }); + //return r; + }); +} + +function testTree() { + return Task.create('test o', async ctx => { + await Scheduler.delay(250); + if (ctx.shouldUpdate) await ctx.update({ message: 'hi! 1' }); + await Scheduler.delay(125); + if (ctx.shouldUpdate) await ctx.update({ message: 'hi! 2' }); + await Scheduler.delay(250); + if (ctx.shouldUpdate) await ctx.update('hi! 3'); + + // ctx.update('Running children...', true); + const c1 = ctx.runChild(createTask(250, 1)); + const c2 = ctx.runChild(createTask(500, 2)); + const c3 = ctx.runChild(createTask(750, 3)); + + //await ctx.runChild(abortAfter(350)); + + const r = await c1 + await c2 + await c3; + if (ctx.shouldUpdate) await ctx.update({ message: 'Almost done...' }); + return r + 1; + }); +} + +export function abortingObserver(p: Progress) { + console.log(messageTree(p.root)); + if (now() - p.root.progress.startedTime > 1000) { + p.requestAbort('test'); + } +} + +async function test() { + try { + //const r = await Run(testTree(), p => console.log(messageTree(p.root)), 250); + const r = await Run(testTree(), abortingObserver, 250); + console.log(r); + } catch (e) { + console.error(e); + } +} + +test(); +//testObs(); \ No newline at end of file diff --git a/src/mol-data/generic.ts b/src/mol-data/generic.ts new file mode 100644 index 0000000000000000000000000000000000000000..d837d1060ef77cafd9a0cf7c493a05f90b2a03b2 --- /dev/null +++ b/src/mol-data/generic.ts @@ -0,0 +1,9 @@ +/** + * Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info. + * + * @author David Sehnal <david.sehnal@gmail.com> + */ + +export * from './generic/hash-set' +export * from './generic/linked-list' +export * from './generic/unique-array' \ No newline at end of file diff --git a/src/mol-data/generic/_spec/linked-list.spec.ts b/src/mol-data/generic/_spec/linked-list.spec.ts new file mode 100644 index 0000000000000000000000000000000000000000..cb439b3da78117e85734fa39ffd040f86cc260a5 --- /dev/null +++ b/src/mol-data/generic/_spec/linked-list.spec.ts @@ -0,0 +1,63 @@ +/** + * Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info. + * + * @author David Sehnal <david.sehnal@gmail.com> + */ + +import { LinkedList } from '../linked-list' + +describe('linked list', () => { + + function toArray<T>(list: LinkedList<T>) { + const ret: T[] = []; + for (let t = list.first; !!t; t = t.next) { + ret[ret.length] = t.value; + } + return ret; + } + + function create<T>(xs: T[]) { + const list = LinkedList<T>(); + for (const x of xs) list.addLast(x); + return list; + } + + it('add', () => { + const list = LinkedList<number>(); + list.addFirst(1); + list.addLast(2); + list.addFirst(3); + list.addFirst(4); + list.addLast(5); + expect(toArray(list)).toEqual([4, 3, 1, 2, 5]); + expect(list.count).toBe(5); + }); + + it ('remove', () => { + const list = create([1, 2, 3, 4]); + let fst = list.removeFirst(); + expect(fst).toBe(1); + expect(list.last!.value).toBe(4); + expect(list.count).toBe(3); + expect(toArray(list)).toEqual([2, 3, 4]); + + let last = list.removeLast(); + expect(last).toBe(4); + expect(list.last!.value).toBe(3); + expect(list.count).toBe(2); + expect(toArray(list)).toEqual([2, 3]); + + let n3 = list.find(3)!; + list.remove(n3); + expect(list.first!.value).toBe(2); + expect(list.last!.value).toBe(2); + expect(list.count).toBe(1); + expect(toArray(list)).toEqual([2]); + + list.removeFirst(); + expect(list.first).toBe(null); + expect(list.last).toBe(null); + expect(list.count).toBe(0); + expect(toArray(list)).toEqual([]); + }) +}); \ No newline at end of file diff --git a/src/mol-data/util/hash-set.ts b/src/mol-data/generic/hash-set.ts similarity index 100% rename from src/mol-data/util/hash-set.ts rename to src/mol-data/generic/hash-set.ts diff --git a/src/mol-data/generic/linked-list.ts b/src/mol-data/generic/linked-list.ts new file mode 100644 index 0000000000000000000000000000000000000000..3f54226999dec6cb4b33ac447e0342ace9a05abf --- /dev/null +++ b/src/mol-data/generic/linked-list.ts @@ -0,0 +1,119 @@ +/** + * Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info. + * + * @author David Sehnal <david.sehnal@gmail.com> + */ + +interface LinkedList<T> { + readonly count: number, + readonly first: LinkedList.Node<T> | null, + readonly last: LinkedList.Node<T> | null, + addFirst(value: T): LinkedList.Node<T>, + addLast(value: T): LinkedList.Node<T>, + remove(node: LinkedList.Node<T>): void, + removeFirst(): T | undefined, + removeLast(): T | undefined, + find(value: T): LinkedList.Node<T> | undefined +} + +function LinkedList<T>(): LinkedList<T> { + return new LinkedListImpl(); +} + +namespace LinkedList { + export interface Node<T> { + previous: Node<T> | null, + next: Node<T> | null, + inList: boolean, + value: T + } +} + +function createListNode<T>(value: T): LinkedList.Node<T> { + return { previous: null, next: null, inList: true, value }; +} + +class LinkedListImpl<T> implements LinkedList<T> { + count: number = 0; + first: LinkedList.Node<T> | null = null; + last: LinkedList.Node<T> | null = null; + + addFirst(value: T) { + const node = createListNode(value); + node.inList = true; + if (this.first) this.first.previous = node; + node.next = this.first; + this.first = node; + this.count++; + if (!this.last) this.last = node; + return node; + } + + addLast(value: T) { + const node = createListNode(value); + if (this.last !== null) { + this.last.next = node; + } + node.previous = this.last; + this.last = node; + if (this.first === null) { + this.first = node; + } + node.inList = true; + this.count++; + return node; + } + + removeFirst(): T | undefined { + const fst = this.first; + if (fst) { + this.remove(fst); + return fst.value; + } + return void 0; + } + + removeLast(): T | undefined { + const last = this.last; + if (last) { + this.remove(last); + return last.value; + } + return void 0; + } + + remove(node: LinkedList.Node<T>) { + if (!node.inList) return; + + node.inList = false; + + if (node.previous !== null) { + node.previous.next = node.next; + } + else if (/*first == item*/ node.previous === null) { + this.first = node.next; + } + + if (node.next !== null) { + node.next.previous = node.previous; + } + else if (/*last == item*/ node.next === null) { + this.last = node.previous; + } + + node.next = null; + node.previous = null; + this.count--; + } + + find(value: T): LinkedList.Node<T> | undefined { + let current = this.first; + while (current !== null) { + if (current.value === value) return current; + current = current.next; + } + return void 0; + } +} + +export { LinkedList } \ No newline at end of file diff --git a/src/mol-data/util/unique-array.ts b/src/mol-data/generic/unique-array.ts similarity index 100% rename from src/mol-data/util/unique-array.ts rename to src/mol-data/generic/unique-array.ts diff --git a/src/mol-data/index.ts b/src/mol-data/index.ts index 1db90c26e324d3098c0a50b16b3ec2f957f37f4e..b1d2500848e1069a962d90d252e76406d47a9441 100644 --- a/src/mol-data/index.ts +++ b/src/mol-data/index.ts @@ -8,5 +8,6 @@ import * as DB from './db' import * as Int from './int' import Iterator from './iterator' import * as Util from './util' +import * as Generic from './generic' -export { DB, Int, Iterator, Util } \ No newline at end of file +export { DB, Int, Iterator, Util, Generic } \ No newline at end of file diff --git a/src/mol-data/util.ts b/src/mol-data/util.ts index 629b3b517d6f1ad80151a734f369083529c00ba5..c541a1e9e9dc71678081bc32c4f158fda3b18b39 100644 --- a/src/mol-data/util.ts +++ b/src/mol-data/util.ts @@ -5,8 +5,6 @@ */ export * from './util/chunked-array' -export * from './util/unique-array' -export * from './util/hash-set' export * from './util/equivalence-classes' export * from './util/hash-functions' export * from './util/sort' diff --git a/src/mol-data/util/array.ts b/src/mol-data/util/array.ts index 664138327b5b67fab885efd6e0ec035f6efaf895..93707e7978d657144d2b487a4c665c5f8eaf0198 100644 --- a/src/mol-data/util/array.ts +++ b/src/mol-data/util/array.ts @@ -14,7 +14,7 @@ export function arrayFind<T>(array: ArrayLike<T>, f: (v: T) => boolean): T | und export function iterableToArray<T>(it: IterableIterator<T>): T[] { if (Array.from) return Array.from(it); - + const ret = []; while (true) { const { done, value } = it.next(); diff --git a/src/mol-model/structure/query/selection.ts b/src/mol-model/structure/query/selection.ts index 2d8467e0764f13993077c5d6a49761ec66874acc..5b317240a2e1d02530c36f4c0ecf3e130965a96c 100644 --- a/src/mol-model/structure/query/selection.ts +++ b/src/mol-model/structure/query/selection.ts @@ -4,7 +4,7 @@ * @author David Sehnal <david.sehnal@gmail.com> */ -import { HashSet } from 'mol-data/util' +import { HashSet } from 'mol-data/generic' import { Structure, AtomSet } from '../structure' // A selection is a pair of a Structure and a sequence of unique AtomSets diff --git a/src/mol-model/structure/structure/structure.ts b/src/mol-model/structure/structure/structure.ts index 6cac5a7b8c4e05f9fdfc96abb12e902f68ae7bda..dfd764e4750977b8fb576f252ebbd4f838a5ef74 100644 --- a/src/mol-model/structure/structure/structure.ts +++ b/src/mol-model/structure/structure/structure.ts @@ -5,7 +5,7 @@ */ import { OrderedSet, Iterator } from 'mol-data/int' -import { UniqueArray } from 'mol-data/util' +import { UniqueArray } from 'mol-data/generic' import SymmetryOperator from 'mol-math/geometry/symmetry-operator' import { Model, Format } from '../model' import Unit from './unit' diff --git a/src/mol-task/computation.ts b/src/mol-task/computation.ts deleted file mode 100644 index 3d956de70054e16b858a96c080393ac582517ab7..0000000000000000000000000000000000000000 --- a/src/mol-task/computation.ts +++ /dev/null @@ -1,283 +0,0 @@ -/** - * Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info. - * - * Adapted from https://github.com/dsehnal/LiteMol - * @author David Sehnal <david.sehnal@gmail.com> - */ - -import Scheduler from './scheduler' -import timeNow from './time' - -interface Computation<A> { - (ctx?: Computation.Context): Promise<A> -} - -namespace Computation { - export let PRINT_ERRORS_TO_CONSOLE = false; - - export function create<A>(computation: (ctx: Context) => Promise<A>) { - return ComputationImpl(computation); - } - - export function resolve<A>(a: A) { - return create<A>(_ => Promise.resolve(a)); - } - - export function reject<A>(reason: any) { - return create<A>(_ => Promise.reject(reason)); - } - - export interface Params { - updateRateMs?: number, - observer?: ProgressObserver - } - - export const Aborted = 'Aborted'; - - export interface Progress { - message: string, - isIndeterminate: boolean, - current: number, - max: number, - elapsedMs: number, - requestAbort?: () => void - } - - export interface ProgressUpdate { - message?: string, - abort?: boolean | (() => void), - current?: number, - max?: number - } - - export interface Context { - readonly isSynchronous: boolean, - /** Also checks if the computation was aborted. If so, throws. */ - readonly requiresUpdate: boolean, - requestAbort(): void, - - subscribe(onProgress: ProgressObserver): { dispose: () => void }, - /** Also checks if the computation was aborted. If so, throws. */ - update(info: ProgressUpdate): Promise<void> | void - } - - export type ProgressObserver = (progress: Readonly<Progress>) => void; - - const emptyDisposer = { dispose: () => { } } - - /** A context without updates. */ - export const synchronous: Context = { - isSynchronous: true, - requiresUpdate: false, - requestAbort() { }, - subscribe(onProgress) { return emptyDisposer; }, - update(info) { } - } - - export function observable(params?: Partial<Params>) { - const ret = new ObservableContext(params && params.updateRateMs); - if (params && params.observer) ret.subscribe(params.observer); - return ret; - } - - export const now = timeNow; - - /** A utility for splitting large computations into smaller parts. */ - export interface Chunker { - setNextChunkSize(size: number): void, - /** nextChunk must return the number of actually processed chunks. */ - process(nextChunk: (chunkSize: number) => number, update: (updater: Context['update']) => void, nextChunkSize?: number): Promise<void> - } - - export function chunker(ctx: Context, nextChunkSize: number): Chunker { - return new ChunkerImpl(ctx, nextChunkSize); - } -} - -const DefaulUpdateRateMs = 150; - -function ComputationImpl<A>(computation: (ctx: Computation.Context) => Promise<A>): Computation<A> { - return (ctx?: Computation.Context) => { - const context: ObservableContext = ctx ? ctx : Computation.synchronous as any; - return new Promise<A>(async (resolve, reject) => { - try { - if (context.started) context.started(); - const result = await computation(context); - resolve(result); - } catch (e) { - if (Computation.PRINT_ERRORS_TO_CONSOLE) console.error(e); - reject(e); - } finally { - if (context.finished) context.finished(); - } - }); - } -} - -class ObservableContext implements Computation.Context { - readonly updateRate: number; - readonly isSynchronous: boolean = false; - private level = 0; - private startedTime = 0; - private abortRequested = false; - private lastUpdated = 0; - private observers: Computation.ProgressObserver[] | undefined = void 0; - private progress: Computation.Progress = { message: 'Working...', current: 0, max: 0, elapsedMs: 0, isIndeterminate: true, requestAbort: void 0 }; - - private checkAborted() { - if (this.abortRequested) throw Computation.Aborted; - } - - private abortRequester = () => { this.abortRequested = true }; - - subscribe = (obs: Computation.ProgressObserver) => { - if (!this.observers) this.observers = []; - this.observers.push(obs); - return { - dispose: () => { - if (!this.observers) return; - for (let i = 0; i < this.observers.length; i++) { - if (this.observers[i] === obs) { - this.observers[i] = this.observers[this.observers.length - 1]; - this.observers.pop(); - return; - } - } - } - }; - } - - requestAbort() { - try { - if (this.abortRequester) { - this.abortRequester.call(null); - } - } catch (e) { } - } - - update({ message, abort, current, max }: Computation.ProgressUpdate): Promise<void> | void { - this.checkAborted(); - - const time = Computation.now(); - - if (typeof abort === 'boolean') { - this.progress.requestAbort = abort ? this.abortRequester : void 0; - } else { - if (abort) this.abortRequester = abort; - this.progress.requestAbort = abort ? this.abortRequester : void 0; - } - - if (typeof message !== 'undefined') this.progress.message = message; - this.progress.elapsedMs = time - this.startedTime; - if (isNaN(current!)) { - this.progress.isIndeterminate = true; - } else { - this.progress.isIndeterminate = false; - this.progress.current = current!; - if (!isNaN(max!)) this.progress.max = max!; - } - - if (this.observers) { - const p = { ...this.progress }; - for (let i = 0, _i = this.observers.length; i < _i; i++) { - Scheduler.immediate(this.observers[i], p); - } - } - - this.lastUpdated = time; - - return Scheduler.immediatePromise(); - } - - get requiresUpdate() { - this.checkAborted(); - if (this.isSynchronous) return false; - return Computation.now() - this.lastUpdated > this.updateRate; - } - - started() { - if (!this.level) { - this.startedTime = Computation.now(); - this.lastUpdated = this.startedTime; - } - this.level++; - } - - finished() { - this.level--; - if (this.level < 0) { - throw new Error('Bug in code somewhere, Computation.resolve/reject called too many times.'); - } - if (!this.level) this.observers = void 0; - } - - constructor(updateRate?: number) { - this.updateRate = updateRate || DefaulUpdateRateMs; - } -} - -class ChunkerImpl implements Computation.Chunker { - private processedSinceUpdate = 0; - private updater: Computation.Context['update']; - - private computeChunkSize(delta: number) { - if (!delta) { - this.processedSinceUpdate = 0; - return this.nextChunkSize; - } - const rate = (this.context as ObservableContext).updateRate || DefaulUpdateRateMs; - const ret = Math.round(this.processedSinceUpdate * rate / delta + 1); - this.processedSinceUpdate = 0; - return ret; - } - - private getNextChunkSize() { - const ctx = this.context as ObservableContext; - // be smart if the computation is synchronous and process the whole chunk at once. - if (ctx.isSynchronous) return Number.MAX_SAFE_INTEGER; - return this.nextChunkSize; - } - - setNextChunkSize(size: number) { - this.nextChunkSize = size; - } - - async process(nextChunk: (size: number) => number, update: (updater: Computation.Context['update']) => Promise<void> | void, nextChunkSize?: number) { - if (typeof nextChunkSize !== 'undefined') this.setNextChunkSize(nextChunkSize); - this.processedSinceUpdate = 0; - - // track time for the actual computation and exclude the "update time" - let chunkStart = Computation.now(); - let lastChunkSize: number; - let chunkCount = 0; - let totalSize = 0; - let updateCount = 0; - while ((lastChunkSize = nextChunk(this.getNextChunkSize())) > 0) { - chunkCount++; - this.processedSinceUpdate += lastChunkSize; - totalSize += lastChunkSize; - if (this.context.requiresUpdate) { - let time = Computation.now(); - await update(this.updater); - this.nextChunkSize = updateCount > 0 - ? Math.round((totalSize + this.computeChunkSize(time - chunkStart)) / (chunkCount + 1)) - : this.computeChunkSize(time - chunkStart) - updateCount++; - chunkStart = Computation.now(); - } - } - if (this.context.requiresUpdate) { - let time = Computation.now(); - await update(this.updater); - this.nextChunkSize = updateCount > 0 - ? Math.round((totalSize + this.computeChunkSize(time - chunkStart)) / (chunkCount + 1)) - : this.computeChunkSize(time - chunkStart) - } - } - - constructor(public context: Computation.Context, private nextChunkSize: number) { - this.updater = this.context.update.bind(this.context); - } -} - -export default Computation; \ No newline at end of file diff --git a/src/mol-task/execution/observable.ts b/src/mol-task/execution/observable.ts new file mode 100644 index 0000000000000000000000000000000000000000..8c184c180944438d788b63a0000311d596a8c9f8 --- /dev/null +++ b/src/mol-task/execution/observable.ts @@ -0,0 +1,225 @@ +/** + * Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info. + * + * @author David Sehnal <david.sehnal@gmail.com> + */ + +import { Task } from '../task' +import { RuntimeContext } from './runtime-context' +import { Progress } from './progress' +import { now } from '../util/now' +import { Scheduler } from '../util/scheduler' + +function ExecuteObservable<T>(task: Task<T>, observer: Progress.Observer, updateRateMs = 250) { + const info = ProgressInfo(task, observer, updateRateMs); + const ctx = new ObservableRuntimeContext(info, info.root); + return execute(task, ctx); +} + +namespace ExecuteObservable { + export let PRINT_ERRORS_TO_STD_ERR = false; +} + +function defaultProgress(task: Task<any>): Task.Progress { + return { + taskId: task.id, + taskName: task.name, + message: '', + startedTime: 0, + canAbort: true, + isIndeterminate: true, + current: 0, + max: 0 + }; +} + +interface ProgressInfo { + updateRateMs: number, + lastNotified: number, + observer: Progress.Observer, + + abortToken: { abortRequested: boolean, treeAborted: boolean, reason: string }, + taskId: number; + root: Progress.Node; + tryAbort: (reason?: string) => void; +} + +function ProgressInfo(task: Task<any>, observer: Progress.Observer, updateRateMs: number): ProgressInfo { + const abortToken: ProgressInfo['abortToken'] = { abortRequested: false, treeAborted: false, reason: '' }; + + return { + updateRateMs, + lastNotified: now(), + observer, + abortToken, + taskId: task.id, + root: { progress: defaultProgress(task), children: [] }, + tryAbort: createAbortFunction(abortToken) + }; +} + +function createAbortFunction(token: ProgressInfo['abortToken']) { + return (reason?: string) => { + token.abortRequested = true; + token.reason = reason || token.reason; + }; +} + +function cloneTree(root: Progress.Node): Progress.Node { + return { progress: { ...root.progress }, children: root.children.map(cloneTree) }; +} + +function canAbort(root: Progress.Node): boolean { + return root.progress.canAbort && root.children.every(canAbort); +} + +function snapshotProgress(info: ProgressInfo): Progress { + return { root: cloneTree(info.root), canAbort: canAbort(info.root), requestAbort: info.tryAbort }; +} + +async function execute<T>(task: Task<T>, ctx: ObservableRuntimeContext) { + ctx.node.progress.startedTime = now(); + try { + const ret = await task.__f(ctx); + if (ctx.info.abortToken.abortRequested) { + abort(ctx.info, ctx.node); + } + return ret; + } catch (e) { + if (Task.isAbort(e)) { + // wait for all child computations to go thru the abort phase. + if (ctx.node.children.length > 0) { + await new Promise(res => { ctx.onChildrenFinished = res; }); + } + if (task.__onAbort) task.__onAbort(); + } + if (ExecuteObservable.PRINT_ERRORS_TO_STD_ERR) console.error(e); + throw e; + } +} + +function abort(info: ProgressInfo, node: Progress.Node) { + if (!info.abortToken.treeAborted) { + info.abortToken.treeAborted = true; + abortTree(info.root); + notifyObserver(info, now()); + } + + throw Task.Aborted(info.abortToken.reason); +} + +function abortTree(root: Progress.Node) { + const progress = root.progress; + progress.isIndeterminate = true; + progress.canAbort = false; + progress.message = 'Aborting...'; + for (const c of root.children) abortTree(c); +} + +function shouldNotify(info: ProgressInfo, time: number) { + return time - info.lastNotified > info.updateRateMs; +} + +function notifyObserver(info: ProgressInfo, time: number) { + info.lastNotified = time; + const snapshot = snapshotProgress(info); + info.observer(snapshot); +} + +class ObservableRuntimeContext implements RuntimeContext { + isExecuting = true; + lastUpdatedTime = 0; + + node: Progress.Node; + info: ProgressInfo; + + // used for waiting for cancelled computation trees + onChildrenFinished?: () => void = void 0; + + private checkAborted() { + if (this.info.abortToken.abortRequested) { + abort(this.info, this.node); + } + } + + get shouldUpdate(): boolean { + this.checkAborted(); + return now() - this.lastUpdatedTime > this.info.updateRateMs; + } + + private updateProgress(update?: string | Partial<RuntimeContext.ProgressUpdate>) { + this.checkAborted(); + + if (!update) return; + + const progress = this.node.progress; + if (typeof update === 'string') { + progress.message = update; + } else { + if (typeof update.canAbort !== 'undefined') progress.canAbort = update.canAbort; + if (typeof update.current !== 'undefined') progress.current = update.current; + if (typeof update.isIndeterminate !== 'undefined') progress.isIndeterminate = update.isIndeterminate; + if (typeof update.max !== 'undefined') progress.max = update.max; + if (typeof update.message !== 'undefined') progress.message = update.message; + } + } + + update(progress?: string | Partial<RuntimeContext.ProgressUpdate>, dontNotify?: boolean): Promise<void> | void { + // The progress tracking and observer notification are separated + // because the computation can have a tree structure. + // All nodes of the tree should be regualarly updated at the specified frequency, + // however, the notification should only be invoked once per the whole tree. + + this.lastUpdatedTime = now(); + this.updateProgress(progress); + + if (!!dontNotify || !shouldNotify(this.info, this.lastUpdatedTime)) return; + + notifyObserver(this.info, this.lastUpdatedTime); + + // The computation could have been aborted during the notifycation phase. + this.checkAborted(); + + return Scheduler.immediatePromise(); + } + + async runChild<T>(task: Task<T>, progress?: string | Partial<RuntimeContext.ProgressUpdate>): Promise<T> { + this.updateProgress(progress); + + // Create a new child context and add it to the progress tree. + // When the child task finishes, remove the tree node. + + const node: Progress.Node = { progress: defaultProgress(task), children: [] }; + const children = this.node.children as Progress.Node[]; + children.push(node); + const ctx = new ObservableRuntimeContext(this.info, node); + try { + return await execute(task, ctx); + } catch (e) { + if (Task.isAbort(e)) { + // need to catch the error here because otherwise + // promises for running child tasks in a tree-like computation + // will get orphaned and cause "uncaught error in Promise". + return void 0 as any; + } + throw e; + } finally { + // remove the progress node after the computation has finished. + const idx = children.indexOf(node); + if (idx >= 0) { + for (let i = idx, _i = children.length - 1; i < _i; i++) { + children[i] = children[i + 1]; + } + children.pop(); + } + if (children.length === 0 && this.onChildrenFinished) this.onChildrenFinished(); + } + } + + constructor(info: ProgressInfo, node: Progress.Node) { + this.node = node; + this.info = info; + } +} + +export { ExecuteObservable } \ No newline at end of file diff --git a/src/mol-task/execution/progress.ts b/src/mol-task/execution/progress.ts new file mode 100644 index 0000000000000000000000000000000000000000..513c262c67574879b6bf17f952f2569f00d8ebd1 --- /dev/null +++ b/src/mol-task/execution/progress.ts @@ -0,0 +1,24 @@ +/** + * Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info. + * + * @author David Sehnal <david.sehnal@gmail.com> + */ + +import { Task } from '../task' + +interface Progress { + root: Progress.Node, + canAbort: boolean, + requestAbort: (reason?: string) => void +} + +namespace Progress { + export interface Node { + readonly progress: Task.Progress, + readonly children: ReadonlyArray<Node> + } + + export interface Observer { (progress: Progress): void } +} + +export { Progress } \ No newline at end of file diff --git a/src/mol-task/execution/runtime-context.ts b/src/mol-task/execution/runtime-context.ts new file mode 100644 index 0000000000000000000000000000000000000000..e8812c55f733fbbd70779ec98bc5664f656024f8 --- /dev/null +++ b/src/mol-task/execution/runtime-context.ts @@ -0,0 +1,36 @@ +/** + * Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info. + * + * @author David Sehnal <david.sehnal@gmail.com> + */ + +import { Task } from '../task' + +interface RuntimeContext { + readonly shouldUpdate: boolean, + + // Idiomatic usage: + // if (ctx.needsYield) await ctx.yield({ ... }); + // + // Alternatively, progress can be updated without notifying (and yielding) using update(progress, true). + // This is useful for nested tasks. + update(progress?: string | Partial<RuntimeContext.ProgressUpdate>, dontNotify?: boolean): Promise<void> | void, + + // Run a child task that adds a new node to the progress tree. + // Allow to pass the progress so that the progress tree can be kept in a "good state" without having to separately call update. + runChild<T>(task: Task<T>, progress?: string | Partial<RuntimeContext.ProgressUpdate>): Promise<T> +} + +namespace RuntimeContext { + export interface AbortToken { isAborted: boolean } + + export interface ProgressUpdate { + message: string, + isIndeterminate: boolean, + current: number, + max: number, + canAbort: boolean + } +} + +export { RuntimeContext } \ No newline at end of file diff --git a/src/mol-task/execution/synchronous.ts b/src/mol-task/execution/synchronous.ts new file mode 100644 index 0000000000000000000000000000000000000000..74b1180b51a7274afa5887aef5d9e3e6eeba243a --- /dev/null +++ b/src/mol-task/execution/synchronous.ts @@ -0,0 +1,22 @@ +/** + * Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info. + * + * @author David Sehnal <david.sehnal@gmail.com> + */ + +import { Task } from '../task' +import { RuntimeContext } from './runtime-context' + +class SynchronousRuntimeContext implements RuntimeContext { + shouldUpdate: boolean = false; + update(progress: string | Partial<RuntimeContext.ProgressUpdate>, dontNotify?: boolean): Promise<void> | void { } + runChild<T>(task: Task<T>, progress?: string | Partial<RuntimeContext.ProgressUpdate>): Promise<T> { return ExecuteSynchronous(task); } +} + +const SyncRuntimeInstance = new SynchronousRuntimeContext(); + +function ExecuteSynchronous<T>(task: Task<T>) { + return task.__f(SyncRuntimeInstance); +} + +export { ExecuteSynchronous } \ No newline at end of file diff --git a/src/mol-task/index.ts b/src/mol-task/index.ts index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..66449f0fb16c3e632963118f26e69d1604e83eed 100644 --- a/src/mol-task/index.ts +++ b/src/mol-task/index.ts @@ -0,0 +1,24 @@ +/** + * Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info. + * + * @author David Sehnal <david.sehnal@gmail.com> + */ + +import { Task } from './task' +import { RuntimeContext } from './execution/runtime-context' +import { ExecuteSynchronous } from './execution/synchronous' +import { ExecuteObservable } from './execution/observable' +import { Progress } from './execution/progress' +import { now } from './util/now' +import { Scheduler } from './util/scheduler' + +// Run the task without the ability to observe its progress. +function Run<T>(task: Task<T>): Promise<T>; +// Run the task with the ability to observe its progress and request cancellation. +function Run<T>(task: Task<T>, observer: Progress.Observer, updateRateMs?: number): Promise<T>; +function Run<T>(task: Task<T>, observer?: Progress.Observer, updateRateMs?: number): Promise<T> { + if (observer) return ExecuteObservable(task, observer, updateRateMs || 250); + return ExecuteSynchronous(task); +} + +export { Task, RuntimeContext, Progress, Run, now, Scheduler } \ No newline at end of file diff --git a/src/mol-task/task.ts b/src/mol-task/task.ts new file mode 100644 index 0000000000000000000000000000000000000000..0ce5ca40bf65ef59de655b3ecdf39d4a2b12abec --- /dev/null +++ b/src/mol-task/task.ts @@ -0,0 +1,51 @@ +/** + * Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info. + * + * @author David Sehnal <david.sehnal@gmail.com> + */ + +import { RuntimeContext } from './execution/runtime-context' + +// A "named function wrapper" with built in "computation tree progress tracking". +// Use Run(t, ?observer, ?updateRate) to execute +interface Task<T> { + readonly id: number, + readonly name: string, + // Do not call this directly, use Run. + readonly __f: (ctx: RuntimeContext) => Promise<T>, + // Do not call this directly, use Run. + readonly __onAbort: (() => void) | undefined +} + +namespace Task { + export interface Aborted { isAborted: true, reason: string } + export function isAbort(e: any): e is Aborted { return !!e && !!e.isAborted; } + export function Aborted(reason: string): Aborted { return { isAborted: true, reason }; } + + export function create<T>(name: string, f: (ctx: RuntimeContext) => Promise<T>, onAbort?: () => void): Task<T> { + return { id: nextId(), name, __f: f, __onAbort: onAbort }; + } + + export function constant<T>(name: string, value: T): Task<T> { return create(name, async ctx => value); } + export function fail(name: string, reason: string): Task<any> { return create(name, async ctx => { throw new Error(reason); }); } + + export interface Progress { + taskId: number, + taskName: string, + startedTime: number, + message: string, + canAbort: boolean, + isIndeterminate: boolean, + current: number, + max: number + } + + let _id = 0; + function nextId() { + const ret = _id; + _id = (_id + 1) % 0x3fffffff; + return ret; + } +} + +export { Task } \ No newline at end of file diff --git a/src/mol-task/util.ts b/src/mol-task/util.ts index 17cbfea5e130cf35a660d02e2929f7eae1dfc6a1..a3ce20ff33975a7f205ce180c2f377ab65e4715b 100644 --- a/src/mol-task/util.ts +++ b/src/mol-task/util.ts @@ -1,9 +1,34 @@ +// enum TaskState { +// Pending, +// Running, +// Aborted, +// Completed, +// Failed +// } +interface TaskState { + +} + +namespace TaskState { + export interface Pending { kind: 'Pending' } + export interface Running { kind: 'Running', } + + export interface Progress { + message: string, + isIndeterminate: boolean, + current: number, + max: number, + elapsedMs: number + } +} type ExecutionContext = { run<T>(c: Computation<T>, params?: { updateRateMs: number }): Promise<T>, - subscribe(o: (p: string, compId: number) => void): void + subscribe(o: (p: string, compId: number) => void): void, + + requestAbort(compId: number): void } namespace ExecutionContext { @@ -17,12 +42,13 @@ namespace ExecutionContext { export const Sync: ExecutionContext = 0 as any; } -interface RuntimeContext extends ExecutionContext { +interface RuntimeContext { + run<T>(c: Computation<T>, params?: { updateRateMs: number }): Promise<T>, yield(name: string): Promise<void> | void } // if no context is specified, use the synchronous one. -type Computation<T> = { (ctx?: RuntimeContext): Promise<T>, _id: number } +interface Computation<T> { (ctx: RuntimeContext): Promise<T>, _id: number } function create<T>(c: (ctx: RuntimeContext) => Promise<T>): Computation<T> { return 0 as any; } function constant<T>(c: T) { return create(async ctx => c); } @@ -33,7 +59,6 @@ function MultistepComputation<P, T>(name: string, steps: string[], f: MultistepF return params => create(async ctx => f(params, n => ctx.yield(steps[n]), ctx)); } - // if total count is specified, could automatically provide percentage type UniformlyChunkedFn<S> = (chunkSize: number, state: S, totalCount?: number) => number type UniformlyChunkedProvider<S> = (ctx: RuntimeContext, state: S) => Promise<S> @@ -58,7 +83,7 @@ function readLines(str: string): Computation<string[]> { const prependHiToLines = MultistepComputation('Hi prepend', ['Parse input', 'Prepend Hi'], async (p: string, step, ctx) => { await step(0); - const lines = await ctx.run(readLines(p)); + const lines = await readLines(p)(ctx); await step(1); const ret = lines.map(l => 'Hi ' + l); return ret; diff --git a/src/mol-task/context.ts b/src/mol-task/util/chunked.ts similarity index 100% rename from src/mol-task/context.ts rename to src/mol-task/util/chunked.ts diff --git a/src/mol-task/util/multistep.ts b/src/mol-task/util/multistep.ts new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/mol-task/time.ts b/src/mol-task/util/now.ts similarity index 89% rename from src/mol-task/time.ts rename to src/mol-task/util/now.ts index 9f0d233adf92d20d0ab3cc5a27c03f26a1b42f73..f4961d217bf3570878d2d0f687c1c19a2f11f8b0 100644 --- a/src/mol-task/time.ts +++ b/src/mol-task/util/now.ts @@ -16,9 +16,11 @@ const now: () => number = (function () { const t = process.hrtime(); return t[0] * 1000 + t[1] / 1000000; }; + } else if (Date.now) { + return () => Date.now(); } else { return () => +new Date(); } }()); -export default now; \ No newline at end of file +export { now } \ No newline at end of file diff --git a/src/mol-task/scheduler.ts b/src/mol-task/util/scheduler.ts similarity index 79% rename from src/mol-task/scheduler.ts rename to src/mol-task/util/scheduler.ts index e118ec97cbf221e6b21d7126687f39bcc78753e7..87a787332318588acec187fa2ab4e6f9b836d6a5 100644 --- a/src/mol-task/scheduler.ts +++ b/src/mol-task/util/scheduler.ts @@ -10,14 +10,21 @@ * MIT license. */ +declare var WorkerGlobalScope: any; function createImmediateActions() { + const global: any = (function () { + const _window = typeof window !== 'undefined' && window; + const _self = typeof self !== 'undefined' && typeof WorkerGlobalScope !== 'undefined' && self instanceof WorkerGlobalScope && self; + const _global = typeof global !== 'undefined' && global; + return _window || _global || _self; + })(); + type Callback = (...args: any[]) => void; type Task = { callback: Callback, args: any[] } const tasksByHandle: { [handle: number]: Task } = { }; const doc = typeof document !== 'undefined' ? document : void 0; - let currentlyRunningATask = false; let nextHandle = 1; // Spec says greater than zero let registerImmediate: ((handle: number) => void); @@ -60,24 +67,9 @@ function createImmediateActions() { } function runIfPresent(handle: number) { - // From the spec: 'Wait until any invocations of this algorithm started before this one have completed.' - // So if we're currently running a task, we'll need to delay this invocation. - if (currentlyRunningATask) { - // Delay by doing a setTimeout. setImmediate was tried instead, but in Firefox 7 it generated a - // 'too much recursion' error. - setTimeout(runIfPresent, 0, handle); - } else { - const task = tasksByHandle[handle]; - if (task) { - currentlyRunningATask = true; - try { - run(task); - } finally { - clearImmediate(handle); - currentlyRunningATask = false; - } - } - } + const task = tasksByHandle[handle]; + clearImmediate(handle); + run(task); } function installNextTickImplementation() { @@ -87,9 +79,6 @@ function createImmediateActions() { } function canUsePostMessage() { - // The test against `importScripts` prevents this implementation from being installed inside a web worker, - // where `global.postMessage` means something completely different and can't be used for this purpose. - const global = typeof window !== 'undefined' ? window as any : void 0; if (global && global.postMessage && !global.importScripts) { let postMessageIsAsynchronous = true; const oldOnMessage = global.onmessage; @@ -108,7 +97,6 @@ function createImmediateActions() { // * http://www.whatwg.org/specs/web-apps/current-work/multipage/comms.html#crossDocumentMessages const messagePrefix = 'setImmediate$' + Math.random() + '$'; - const global = typeof window !== 'undefined' ? window as any : void 0; const onGlobalMessage = function(event: any) { if (event.source === global && typeof event.data === 'string' && @@ -189,8 +177,13 @@ function createImmediateActions() { const immediateActions = (function () { if (typeof setImmediate !== 'undefined') { if (typeof window !== 'undefined') { - return { setImmediate: (handler: any, ...args: any[]) => window.setImmediate(handler, ...args as any), clearImmediate: (handle: any) => window.clearImmediate(handle) }; - } else return { setImmediate, clearImmediate } + return { + setImmediate: (handler: any, ...args: any[]) => window.setImmediate(handler, ...args as any), + clearImmediate: (handle: any) => window.clearImmediate(handle) + }; + } else { + return { setImmediate, clearImmediate } + } } return createImmediateActions(); }()); @@ -199,9 +192,12 @@ function resolveImmediate(res: () => void) { immediateActions.setImmediate(res); } -export default { - immediate: immediateActions.setImmediate, +const Scheduler = { + setImmediate: immediateActions.setImmediate, clearImmediate: immediateActions.clearImmediate, + immediatePromise() { return new Promise<void>(resolveImmediate); }, + + delay<T>(timeout: number, value: T | undefined = void 0): Promise<T> { return new Promise(r => setTimeout(r, timeout, value)) } +} - immediatePromise() { return new Promise<void>(resolveImmediate); } -}; +export { Scheduler } diff --git a/src/perf-tests/tasks.ts b/src/perf-tests/tasks.ts new file mode 100644 index 0000000000000000000000000000000000000000..c1fa6c9c9d8111cadf33caf600c056e566f3926c --- /dev/null +++ b/src/perf-tests/tasks.ts @@ -0,0 +1,129 @@ +import * as B from 'benchmark' +import { now } from 'mol-task/util/now' +import { Scheduler } from 'mol-task/util/scheduler' + +export namespace Tasks { + export class Yielding { + lastUpdated = 0; + yield(): Promise<void> | void { + const t = now(); + if (t - this.lastUpdated < 250) return; + this.lastUpdated = t; + return Scheduler.immediatePromise(); + } + } + + export class CheckYielding { + lastUpdated = 0; + + get needsYield() { + return now() - this.lastUpdated > 250; + } + + yield(): Promise<void> { + this.lastUpdated = now(); + return Scheduler.immediatePromise(); + } + } + + export async function yielding() { + console.time('yielding'); + const y = new Yielding(); + let ret = 0; + for (let i = 0; i < 1000000; i++) { + ret += +(i.toString() + i.toString()); + if (i % 10000 === 0) await y.yield(); + } + console.timeEnd('yielding'); + console.log(ret); + return ret; + } + + export async function yielding1() { + console.time('yielding1'); + const y = new Yielding(); + let ret = 0; + let yy: any; + for (let i = 0; i < 1000000; i++) { + ret += +(i.toString() + i.toString()); + if (i % 10000 === 0 && (yy = y.yield())) await yy; + } + console.timeEnd('yielding1'); + console.log(ret); + return ret; + } + + export async function testYielding() { + console.time('check yielding'); + const y = new CheckYielding(); + let ret = 0; + for (let i = 0; i < 1000000; i++) { + ret += +(i.toString() + i.toString()); + if (i % 10000 === 0 && y.needsYield) await y.yield(); + } + console.timeEnd('check yielding'); + console.log(ret); + return ret; + } + + export async function baseline() { + console.time('baseline'); + let ret = 0; + for (let i = 0; i < 1000000; i++) { + ret += +(i.toString() + i.toString()); + } + console.timeEnd('baseline'); + console.log(ret); + return ret; + } + + export async function testImmediate() { + console.time('immediate'); + let ret = 0; + const y = new CheckYielding(); + for (let i = 0; i < 1000000; i++) { + //ret += +(i.toString() + i.toString()); + if (i % 10000 === 0) await y.yield(); + } + console.timeEnd('immediate'); + console.log(ret); + return ret; + } + + export function run() { + const suite = new B.Suite(); + suite + .add(`yielding`, async () => { return await yielding() }) + //.add(`test yielding`, () => testYielding().then(() => { })) + .on('cycle', (e: any) => console.log(String(e.target))) + .run(); + } +} + +(async function() { + // await Tasks.testImmediate(); + // await Tasks.testImmediate(); + + await Tasks.baseline(); + await Tasks.yielding(); + await Tasks.yielding1(); + await Tasks.testYielding(); + await Tasks.baseline(); + await Tasks.yielding(); + await Tasks.yielding1(); + await Tasks.testYielding(); +}()) + +// console.time('test') +// Tasks.yielding(); +// console.timeEnd('test') +// console.time('test') +// Tasks.yielding(); +// console.timeEnd('test') + +// console.time('test') +// Tasks.testYielding(); +// console.timeEnd('test') +// console.time('test') +// Tasks.testYielding(); +// console.timeEnd('test') \ No newline at end of file diff --git a/tsconfig.json b/tsconfig.json index 2a05ce80fe9c8ec062c73e278f08c50915a92e5e..980f66ade2ab27a39b776b800d612ca4c9d64822 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -13,6 +13,7 @@ "outDir": "build/node_modules", "baseUrl": "src", "paths": { + "mol-task": ["./mol-task", "./mol-task/index.ts"], "mol-comp": ["./mol-comp", "./mol-comp/index.ts"], "mol-util": ["./mol-util", "./mol-util/index.ts"], "mol-data": ["./mol-data", "./mol-data/index.ts"],