diff --git a/README.md b/README.md index 30294cdc052b6b23891dfe55d9c0666bf3b541d4..cdd07586a73e4fe45a7b945006a3b6ff1f3e1027 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,16 @@ - extending on the ideas of the CIFTools.js library +## Module Overview + +- `mol-task` Computation abstraction with progress tracking and cancellation support. +- `mol-data` Collections (integer based sets, interface to columns/tables, etc.) +- `mol-math` Math related (loosely) algorithms and data structures. +- `mol-io` Parsing library. Each format is parsed into an interface that corresponds to the data stored by it. +- `mol-model` Data structures and algorithms (such as querying) for representing molecular data. +- `mol-ql` Mapping of `mol-model` to the MolQL query language spec. +- `mol-util` Useful things that do not fit elsewhere. + ## Building & Running ### Build: diff --git a/src/examples/computation.ts b/src/examples/computation.ts new file mode 100644 index 0000000000000000000000000000000000000000..0ffdd02fcbce683e436c0030ffe0517135c6ceda --- /dev/null +++ b/src/examples/computation.ts @@ -0,0 +1 @@ +// TODO \ No newline at end of file diff --git a/src/mol-task/computation.ts b/src/mol-task/computation.ts new file mode 100644 index 0000000000000000000000000000000000000000..3d956de70054e16b858a96c080393ac582517ab7 --- /dev/null +++ b/src/mol-task/computation.ts @@ -0,0 +1,283 @@ +/** + * Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info. 
/**
 * Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info.
 *
 * Adapted from https://github.com/dsehnal/LiteMol
 * @author David Sehnal <david.sehnal@gmail.com>
 */

import Scheduler from './scheduler'
import timeNow from './time'

/**
 * A deferred unit of work. Calling it, optionally with a context, runs the
 * wrapped function and returns a promise of its result.
 */
interface Computation<A> {
    (ctx?: Computation.Context): Promise<A>
}

namespace Computation {
    /** When true, an error raised by a computation is passed to console.error before the promise rejects. */
    export let PRINT_ERRORS_TO_CONSOLE = false;

    /** Wraps a context-taking async function into a Computation. */
    export function create<A>(computation: (ctx: Context) => Promise<A>) {
        return ComputationImpl(computation);
    }

    /** A computation that resolves to the given value. */
    export function resolve<A>(a: A) {
        return create<A>(_ => Promise.resolve(a));
    }

    /** A computation that rejects with the given reason. */
    export function reject<A>(reason: any) {
        return create<A>(_ => Promise.reject(reason));
    }

    /** Options accepted by observable(). */
    export interface Params {
        updateRateMs?: number,
        observer?: ProgressObserver
    }

    /** Value thrown by an observable context once an abort has been requested. */
    export const Aborted = 'Aborted';

    /** Progress snapshot handed to observers. */
    export interface Progress {
        message: string,
        isIndeterminate: boolean,
        current: number,
        max: number,
        elapsedMs: number,
        requestAbort?: () => void
    }

    /** Partial progress information supplied by a running computation. */
    export interface ProgressUpdate {
        message?: string,
        abort?: boolean | (() => void),
        current?: number,
        max?: number
    }

    export interface Context {
        readonly isSynchronous: boolean,
        /** Also checks if the computation was aborted. If so, throws. */
        readonly requiresUpdate: boolean,
        requestAbort(): void,

        subscribe(onProgress: ProgressObserver): { dispose: () => void },
        /** Also checks if the computation was aborted. If so, throws. */
        update(info: ProgressUpdate): Promise<void> | void
    }

    export type ProgressObserver = (progress: Readonly<Progress>) => void;

    const emptyDisposer = { dispose: () => { } }

    /** A context without updates. */
    export const synchronous: Context = {
        isSynchronous: true,
        requiresUpdate: false,
        requestAbort() { },
        subscribe(onProgress) { return emptyDisposer; },
        update(info) { }
    }

    /** Creates an observable context; a supplied observer is subscribed right away. */
    export function observable(params?: Partial<Params>) {
        const ret = new ObservableContext(params && params.updateRateMs);
        if (params && params.observer) ret.subscribe(params.observer);
        return ret;
    }

    export const now = timeNow;

    /** A utility for splitting large computations into smaller parts. */
    export interface Chunker {
        setNextChunkSize(size: number): void,
        /**
         * nextChunk must return the amount it actually processed; a result
         * that is not greater than zero ends the loop. The update callback
         * may be asynchronous: its result is awaited before the next chunk.
         */
        process(nextChunk: (chunkSize: number) => number, update: (updater: Context['update']) => Promise<void> | void, nextChunkSize?: number): Promise<void>
    }

    export function chunker(ctx: Context, nextChunkSize: number): Chunker {
        return new ChunkerImpl(ctx, nextChunkSize);
    }
}

const DefaulUpdateRateMs = 150;

/**
 * Turns a context-taking async function into a Computation.
 *
 * Implemented as a plain async function rather than `new Promise(async ...)`:
 * with an async executor, an exception thrown by `finished()` in the `finally`
 * block rejects only the executor's own, unobserved promise. Here it rejects
 * the promise handed to the caller.
 */
function ComputationImpl<A>(computation: (ctx: Computation.Context) => Promise<A>): Computation<A> {
    return async (ctx?: Computation.Context) => {
        // Without a context the shared synchronous one is used. The cast lets the
        // optional started/finished hooks be probed on any kind of context.
        const context: ObservableContext = ctx ? ctx : Computation.synchronous as any;
        try {
            if (context.started) context.started();
            return await computation(context);
        } catch (e) {
            if (Computation.PRINT_ERRORS_TO_CONSOLE) console.error(e);
            throw e;
        } finally {
            if (context.finished) context.finished();
        }
    };
}
/**
 * Context that tracks elapsed time, forwards progress snapshots to subscribed
 * observers and throws Computation.Aborted once an abort has been requested.
 */
class ObservableContext implements Computation.Context {
    readonly updateRate: number;
    readonly isSynchronous: boolean = false;
    private level = 0;
    private startedTime = 0;
    private abortRequested = false;
    private lastUpdated = 0;
    private observers: Computation.ProgressObserver[] | undefined = void 0;
    private progress: Computation.Progress = { message: 'Working...', current: 0, max: 0, elapsedMs: 0, isIndeterminate: true, requestAbort: void 0 };

    constructor(updateRate?: number) {
        this.updateRate = updateRate || DefaulUpdateRateMs;
    }

    /** Throws Computation.Aborted if an abort was requested. */
    private checkAborted() {
        if (this.abortRequested) throw Computation.Aborted;
    }

    /** Default abort handler; may be replaced by a handler passed to update(). */
    private abortRequester = () => { this.abortRequested = true };

    /** Registers an observer and returns a handle that removes it again. */
    subscribe = (obs: Computation.ProgressObserver) => {
        if (!this.observers) this.observers = [];
        this.observers.push(obs);

        const dispose = () => {
            const registered = this.observers;
            if (!registered) return;
            const at = registered.indexOf(obs);
            if (at < 0) return;
            // swap-remove: the last observer takes the freed slot
            registered[at] = registered[registered.length - 1];
            registered.pop();
        };
        return { dispose };
    }

    /** Invokes the current abort handler; anything it throws is ignored. */
    requestAbort() {
        const requester = this.abortRequester;
        if (!requester) return;
        try {
            requester.call(null);
        } catch (e) { }
    }

    /**
     * Merges the given update into the progress state and schedules every
     * observer with a copy of it. Throws if an abort was requested.
     */
    update({ message, abort, current, max }: Computation.ProgressUpdate): Promise<void> | void {
        this.checkAborted();

        const time = Computation.now();

        // a non-boolean truthy `abort` is a custom handler and replaces the current one
        if (typeof abort !== 'boolean' && abort) this.abortRequester = abort;
        this.progress.requestAbort = abort ? this.abortRequester : void 0;

        if (typeof message !== 'undefined') this.progress.message = message;
        this.progress.elapsedMs = time - this.startedTime;

        const indeterminate = isNaN(current!);
        this.progress.isIndeterminate = indeterminate;
        if (!indeterminate) {
            this.progress.current = current!;
            if (!isNaN(max!)) this.progress.max = max!;
        }

        const targets = this.observers;
        if (targets) {
            const snapshot = { ...this.progress };
            for (let k = 0, count = targets.length; k < count; k++) {
                Scheduler.immediate(targets[k], snapshot);
            }
        }

        this.lastUpdated = time;

        return Scheduler.immediatePromise();
    }

    /** True once more than updateRate ms passed since the last update. Throws if an abort was requested. */
    get requiresUpdate() {
        this.checkAborted();
        if (this.isSynchronous) return false;
        const sinceLastUpdate = Computation.now() - this.lastUpdated;
        return sinceLastUpdate > this.updateRate;
    }

    /** Enters one nesting level; the outermost call resets the clocks. */
    started() {
        if (this.level === 0) {
            const t = Computation.now();
            this.startedTime = t;
            this.lastUpdated = t;
        }
        this.level++;
    }

    /** Leaves one nesting level; leaving the outermost one drops all observers. */
    finished() {
        this.level--;
        if (this.level < 0) {
            throw new Error('Bug in code somewhere, Computation.resolve/reject called too many times.');
        }
        if (this.level === 0) this.observers = void 0;
    }
}

/**
 * Default Chunker: repeatedly calls nextChunk and, whenever the context asks
 * for an update, reports progress and re-estimates the chunk size.
 */
class ChunkerImpl implements Computation.Chunker {
    private processedSinceUpdate = 0;
    private updater: Computation.Context['update'];

    constructor(public context: Computation.Context, private nextChunkSize: number) {
        this.updater = this.context.update.bind(this.context);
    }

    /** Estimates a chunk size from the amount processed in `delta` ms and resets the counter. */
    private computeChunkSize(delta: number) {
        let size: number;
        if (!delta) {
            size = this.nextChunkSize;
        } else {
            const rate = (this.context as ObservableContext).updateRate || DefaulUpdateRateMs;
            size = Math.round(this.processedSinceUpdate * rate / delta + 1);
        }
        this.processedSinceUpdate = 0;
        return size;
    }

    private getNextChunkSize() {
        const ctx = this.context as ObservableContext;
        // be smart if the computation is synchronous and process the whole chunk at once.
        return ctx.isSynchronous ? Number.MAX_SAFE_INTEGER : this.nextChunkSize;
    }

    /** Runs the update callback, then derives the next chunk size from the time spent computing. */
    private async reportAndRetune(
        update: (updater: Computation.Context['update']) => Promise<void> | void,
        chunkStart: number, totalSize: number, chunkCount: number, hasUpdatedBefore: boolean
    ) {
        // taken before the update so that the "update time" is excluded
        const time = Computation.now();
        await update(this.updater);
        const estimate = this.computeChunkSize(time - chunkStart);
        this.nextChunkSize = hasUpdatedBefore
            ? Math.round((totalSize + estimate) / (chunkCount + 1))
            : estimate;
    }

    setNextChunkSize(size: number) {
        this.nextChunkSize = size;
    }

    async process(nextChunk: (size: number) => number, update: (updater: Computation.Context['update']) => Promise<void> | void, nextChunkSize?: number) {
        if (typeof nextChunkSize !== 'undefined') this.setNextChunkSize(nextChunkSize);
        this.processedSinceUpdate = 0;

        // track time for the actual computation and exclude the "update time"
        let chunkStart = Computation.now();
        let chunkCount = 0;
        let totalSize = 0;
        let updateCount = 0;

        for (;;) {
            const lastChunkSize = nextChunk(this.getNextChunkSize());
            if (!(lastChunkSize > 0)) break;

            chunkCount++;
            this.processedSinceUpdate += lastChunkSize;
            totalSize += lastChunkSize;

            if (!this.context.requiresUpdate) continue;
            await this.reportAndRetune(update, chunkStart, totalSize, chunkCount, updateCount > 0);
            updateCount++;
            chunkStart = Computation.now();
        }

        if (this.context.requiresUpdate) {
            await this.reportAndRetune(update, chunkStart, totalSize, chunkCount, updateCount > 0);
        }
    }
}

export default Computation;
/**
 * Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info.
 *
 * @author David Sehnal <david.sehnal@gmail.com>
 */

/**
 * setImmediate polyfill adapted from https://github.com/YuzuJS/setImmediate
 * Copyright (c) 2012 Barnesandnoble.com, llc, Donavon West, and Domenic Denicola
 * MIT license.
 */

type ImmediateCallback = (...args: any[]) => void;

/** Call shape shared by the native and the polyfilled scheduling functions. */
interface ImmediateActions {
    setImmediate(callback: ImmediateCallback, ...args: any[]): any,
    clearImmediate(handle: any): void
}

/**
 * Builds a setImmediate/clearImmediate pair on top of the first available
 * mechanism: process.nextTick, window.postMessage, MessageChannel, a
 * script element's readystatechange event, or setTimeout(0).
 *
 * Handles are positive integers handed out in increasing order.
 */
function createImmediateActions() {
    type Callback = ImmediateCallback;
    type Task = { callback: Callback, args: any[] }

    const tasksByHandle: { [handle: number]: Task } = { };
    const doc = typeof document !== 'undefined' ? document : void 0;

    let currentlyRunningATask = false;
    let nextHandle = 1; // Spec says greater than zero
    let registerImmediate: ((handle: number) => void);

    function setImmediate(callback: Callback, ...args: any[]) {
        // Callback can either be a function or a string
        // NOTE(review): a string callback is compiled with `new Function`; never pass untrusted strings.
        if (typeof callback !== 'function') {
            callback = new Function('' + callback) as Callback;
        }
        // Store and register the task
        const task = { callback: callback, args: args };
        tasksByHandle[nextHandle] = task;
        registerImmediate(nextHandle);
        return nextHandle++;
    }

    function clearImmediate(handle: number) {
        delete tasksByHandle[handle];
    }

    function run(task: Task) {
        task.callback.apply(undefined, task.args);
    }

    function runIfPresent(handle: number) {
        // From the spec: 'Wait until any invocations of this algorithm started before this one have completed.'
        // So if we're currently running a task, we'll need to delay this invocation.
        if (currentlyRunningATask) {
            // Delay by doing a setTimeout. setImmediate was tried instead, but in Firefox 7 it generated a
            // 'too much recursion' error.
            setTimeout(runIfPresent, 0, handle);
        } else {
            const task = tasksByHandle[handle];
            if (task) {
                currentlyRunningATask = true;
                try {
                    run(task);
                } finally {
                    clearImmediate(handle);
                    currentlyRunningATask = false;
                }
            }
        }
    }

    function installNextTickImplementation() {
        registerImmediate = function(handle) {
            process.nextTick(function () { runIfPresent(handle); });
        };
    }

    /** Returns true only when window.postMessage exists and delivers asynchronously. */
    function canUsePostMessage(): boolean {
        // The test against `importScripts` prevents this implementation from being installed inside a web worker,
        // where `global.postMessage` means something completely different and can't be used for this purpose.
        const global = typeof window !== 'undefined' ? window as any : void 0;
        if (global && global.postMessage && !global.importScripts) {
            let postMessageIsAsynchronous = true;
            const oldOnMessage = global.onmessage;
            global.onmessage = function() {
                postMessageIsAsynchronous = false;
            };
            global.postMessage('', '*');
            global.onmessage = oldOnMessage;
            return postMessageIsAsynchronous;
        }
        return false;
    }

    function installPostMessageImplementation() {
        // Installs an event handler on `global` for the `message` event: see
        // * https://developer.mozilla.org/en/DOM/window.postMessage
        // * http://www.whatwg.org/specs/web-apps/current-work/multipage/comms.html#crossDocumentMessages

        const messagePrefix = 'setImmediate$' + Math.random() + '$';
        // Only reached after canUsePostMessage() succeeded, so `window` exists here.
        const global = typeof window !== 'undefined' ? window as any : void 0;
        const onGlobalMessage = function(event: any) {
            if (event.source === global &&
                typeof event.data === 'string' &&
                event.data.indexOf(messagePrefix) === 0) {
                runIfPresent(+event.data.slice(messagePrefix.length));
            }
        };

        if (global.addEventListener) {
            global.addEventListener('message', onGlobalMessage, false);
        } else {
            global.attachEvent('onmessage', onGlobalMessage);
        }

        registerImmediate = function(handle) {
            global.postMessage(messagePrefix + handle, '*');
        };
    }

    function installMessageChannelImplementation() {
        const channel = new MessageChannel();
        channel.port1.onmessage = function(event) {
            const handle = event.data;
            runIfPresent(handle);
        };

        registerImmediate = function(handle) {
            channel.port2.postMessage(handle);
        };
    }

    function installReadyStateChangeImplementation() {
        const html = doc!.documentElement;
        registerImmediate = function(handle) {
            // Create a <script> element; its readystatechange event will be fired asynchronously once it is inserted
            // into the document. Do so, thus queuing up the task. Remember to clean up once it's been called.
            let script = doc!.createElement('script') as any;
            script.onreadystatechange = function () {
                runIfPresent(handle);
                script.onreadystatechange = null;
                html.removeChild(script);
                script = null;
            };
            html.appendChild(script);
        };
    }

    function installSetTimeoutImplementation() {
        registerImmediate = function(handle) {
            setTimeout(runIfPresent, 0, handle);
        };
    }

    // Don't get fooled by e.g. browserify environments.
    if (typeof process !== 'undefined' && {}.toString.call(process) === '[object process]') {
        // For Node.js before 0.9
        installNextTickImplementation();
    } else if (canUsePostMessage()) {
        // For non-IE10 modern browsers
        installPostMessageImplementation();
    } else if (typeof MessageChannel !== 'undefined') {
        // For web workers, where supported
        installMessageChannelImplementation();
    } else if (doc && 'onreadystatechange' in doc.createElement('script')) {
        // For IE 6–8
        installReadyStateChangeImplementation();
    } else {
        // For older browsers
        installSetTimeoutImplementation();
    }

    return {
        setImmediate,
        clearImmediate
    };
}

/** Native setImmediate/clearImmediate when the host provides them, otherwise the polyfill. */
const immediateActions: ImmediateActions = (function () {
    if (typeof setImmediate !== 'undefined') {
        if (typeof window !== 'undefined') {
            // Call through `window` so the native functions keep their expected receiver.
            const host = window as any;
            return {
                setImmediate: (handler: ImmediateCallback, ...args: any[]) => host.setImmediate(handler, ...args),
                clearImmediate: (handle: any) => host.clearImmediate(handle)
            };
        }
        return { setImmediate, clearImmediate };
    }
    return createImmediateActions();
}());

function resolveImmediate(res: () => void) {
    immediateActions.setImmediate(res);
}

export default {
    immediate: immediateActions.setImmediate,
    clearImmediate: immediateActions.clearImmediate,

    /** Promise that resolves on the next "immediate" turn. */
    immediatePromise() { return new Promise<void>(resolveImmediate); }
};
/**
 * Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info.
 *
 * @author David Sehnal <david.sehnal@gmail.com>
 */

declare var process: any;
declare var window: any;

/**
 * Picks a millisecond clock from the given host objects.
 *
 * Preference order: `win.performance.now()`, then `proc.hrtime()` converted
 * to milliseconds, then the wall clock via `Date`.
 *
 * @param win  window-like object or undefined
 * @param proc process-like object or undefined
 */
function selectTimer(win: any, proc: any): () => number {
    if (win && win.performance) {
        const perf = win.performance;
        return () => perf.now();
    }
    // Must be a typeof check: comparing the function itself with the string
    // 'undefined' is always true and would select hrtime even when it is missing.
    if (proc && typeof proc.hrtime === 'function') {
        return () => {
            const t = proc.hrtime(); // [seconds, nanoseconds]
            return t[0] * 1000 + t[1] / 1000000;
        };
    }
    return () => +new Date();
}

/** Returns a timestamp in milliseconds from the best clock available on this host. */
const now: () => number = selectTimer(
    typeof window !== 'undefined' ? window : void 0,
    typeof process !== 'undefined' ? process : void 0
);

export default now;
// Design sketch: every factory in this part of the file is a stub that returns `0 as any`.

/** Runs computations and lets callers subscribe to progress messages. */
interface ExecutionContext {
    run<T>(c: Computation<T>, params?: { updateRateMs: number }): Promise<T>;
    subscribe(o: (p: string, compId: number) => void): void;
}

namespace ExecutionContext {
    // export interface Synchronous extends ExecutionContext {
    //     run<T>(c: Computation<T>, params?: { updateRateMs: number }): Promise<T>,
    // }

    // export interface Observable extends ExecutionContext {
    //     run<T>(c: Computation<T>, params?: { updateRateMs: number }): Promise<T>,
    // }

    /** Placeholder for the synchronous context (currently the value 0). */
    export const Sync: ExecutionContext = 0 as any;
}

/** Execution context handed to a running computation; adds a yield point. */
interface RuntimeContext extends ExecutionContext {
    yield(name: string): Promise<void> | void;
}

// if no context is specified, use the synchronous one.
interface Computation<T> {
    (ctx?: RuntimeContext): Promise<T>;
    _id: number;
}

/** Stub: ignores its argument and returns 0 typed as a Computation. */
function create<T>(c: (ctx: RuntimeContext) => Promise<T>): Computation<T> {
    return 0 as any;
}

/** Computation of a constant value, built through create(). */
function constant<T>(c: T) {
    return create(async () => c);
}

type MultistepFn<P, T> = (params: P, step: (s: number) => Promise<void> | void, ctx: RuntimeContext) => Promise<T>
type ComputationProvider<P, T> = (params: P) => Computation<T>

/**
 * Returns a provider that, per parameter set, creates a computation whose
 * step callback yields the name stored at the given index of `steps`.
 */
function MultistepComputation<P, T>(name: string, steps: string[], f: MultistepFn<P, T>): ComputationProvider<P, T> {
    return function provide(params: P) {
        return create(async runtime => {
            const announce = (index: number) => runtime.yield(steps[index]);
            return f(params, announce, runtime);
        });
    };
}

// if total count is specified, could automatically provide percentage
type UniformlyChunkedFn<S> = (chunkSize: number, state: S, totalCount?: number) => number
type UniformlyChunkedProvider<S> = (ctx: RuntimeContext, state: S) => Promise<S>

/** Stub: ignores its arguments and returns 0 typed as a provider. */
function UniformlyChunked<S>(label: string, initialChunk: number, f: UniformlyChunkedFn<S>): UniformlyChunkedProvider<S> {
    // TODO: track average time required for single element and then determine chunk size based on that.
    return 0 as any;
}
+ return 0 as any; +} + +type LineReaderState = { str: string, position: number, lines: string[] } +const uniformPart = UniformlyChunked('Reading lines', 1000000, (size, state: LineReaderState) => { + state.position += size; + state.lines.push(''); + return 0 /* number of lines read */; +}); + +function readLines(str: string): Computation<string[]> { + return create(async ctx => { + const state = (await uniformPart(ctx, { str, position: 0, lines: [] })); + return state.lines; + }); +} + +const prependHiToLines = MultistepComputation('Hi prepend', ['Parse input', 'Prepend Hi'], async (p: string, step, ctx) => { + await step(0); + const lines = await ctx.run(readLines(p)); + await step(1); + const ret = lines.map(l => 'Hi ' + l); + return ret; +}); + + +(async function() { + const r = await ExecutionContext.Sync.run(prependHiToLines('1\n2'), { updateRateMs: 150 }); + console.log(r) +}()) diff --git a/src/perf-tests/structure.ts b/src/perf-tests/structure.ts index 4cb8ce453da97a58ff2b63ead4de33a5302d4b71..c996b85c9a53c8a8ca89dc35885574b9d967a501 100644 --- a/src/perf-tests/structure.ts +++ b/src/perf-tests/structure.ts @@ -29,15 +29,21 @@ async function readData(path: string) { } } -function *test() { - yield 10; - return 15; +(Symbol as any).asyncIterator = (Symbol as any).asyncIterator || Symbol.for('Symbol.asyncIterator'); + +interface ProgressGenerator<T> extends AsyncIterableIterator<number | T> { + next(cont?: boolean): Promise<IteratorResult<number | T>> +} + +async function *test(): ProgressGenerator<boolean> { + const r = yield await new Promise<number>(res => res(10)); + return r; } -async function runIt<T>(itP: () => IterableIterator<T>) { +async function runIt(itP: () => ProgressGenerator<boolean>) { const it = itP(); while (true) { - const { value, done } = it.next(); + const { value, done } = await it.next(true); if (done) return value; } } diff --git a/tsconfig.json b/tsconfig.json index 
c90e01741681c4b27613733792f8ef82c91bd6c6..2a05ce80fe9c8ec062c73e278f08c50915a92e5e 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -9,10 +9,11 @@ "strictNullChecks": true, "strictFunctionTypes": true, //"downlevelIteration": true, - "lib": [ "es6", "dom" ], + "lib": [ "es6", "dom", "esnext.asynciterable" ], "outDir": "build/node_modules", "baseUrl": "src", "paths": { + "mol-task": ["./mol-task", "./mol-task/index.ts"], "mol-util": ["./mol-util", "./mol-util/index.ts"], "mol-data": ["./mol-data", "./mol-data/index.ts"], "mol-math": ["./mol-math"],