Select Git revision
zs1-n-option-impl
computation.ts 9.26 KiB
/**
* Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info.
*
* Adapted from https://github.com/dsehnal/LiteMol
* @author David Sehnal <david.sehnal@gmail.com>
*/
import Scheduler from './scheduler'
import timeNow from './time'
interface Computation<A> {
(ctx?: Computation.Context): Promise<A>
}
namespace Computation {
export let PRINT_ERRORS_TO_CONSOLE = false;
export function create<A>(computation: (ctx: Context) => Promise<A>) {
return ComputationImpl(computation);
}
export function resolve<A>(a: A) {
return create<A>(_ => Promise.resolve(a));
}
export function reject<A>(reason: any) {
return create<A>(_ => Promise.reject(reason));
}
export interface Params {
updateRateMs?: number,
observer?: ProgressObserver
}
export const Aborted = 'Aborted';
export interface Progress {
message: string,
isIndeterminate: boolean,
current: number,
max: number,
elapsedMs: number,
requestAbort?: () => void
}
export interface ProgressUpdate {
message?: string,
abort?: boolean | (() => void),
current?: number,
max?: number
}
export interface Context {
readonly isSynchronous: boolean,
/** Also checks if the computation was aborted. If so, throws. */
readonly requiresUpdate: boolean,
requestAbort(): void,
subscribe(onProgress: ProgressObserver): { dispose: () => void },
/** Also checks if the computation was aborted. If so, throws. */
update(info: ProgressUpdate): Promise<void> | void
}
export type ProgressObserver = (progress: Readonly<Progress>) => void;
const emptyDisposer = { dispose: () => { } }
/** A context without updates. */
export const synchronous: Context = {
isSynchronous: true,
requiresUpdate: false,
requestAbort() { },
subscribe(onProgress) { return emptyDisposer; },
update(info) { }
}
export function observable(params?: Partial<Params>) {
const ret = new ObservableContext(params && params.updateRateMs);
if (params && params.observer) ret.subscribe(params.observer);
return ret;
}
export const now = timeNow;
/** A utility for splitting large computations into smaller parts. */
export interface Chunker {
setNextChunkSize(size: number): void,
/** nextChunk must return the number of actually processed chunks. */
process(nextChunk: (chunkSize: number) => number, update: (updater: Context['update']) => void, nextChunkSize?: number): Promise<void>
}
export function chunker(ctx: Context, nextChunkSize: number): Chunker {
return new ChunkerImpl(ctx, nextChunkSize);
}
}
const DefaulUpdateRateMs = 150;
function ComputationImpl<A>(computation: (ctx: Computation.Context) => Promise<A>): Computation<A> {
return (ctx?: Computation.Context) => {
const context: ObservableContext = ctx ? ctx : Computation.synchronous as any;
return new Promise<A>(async (resolve, reject) => {
try {
if (context.started) context.started();
const result = await computation(context);
resolve(result);
} catch (e) {
if (Computation.PRINT_ERRORS_TO_CONSOLE) console.error(e);
reject(e);
} finally {
if (context.finished) context.finished();
}
});
}
}
class ObservableContext implements Computation.Context {
readonly updateRate: number;
readonly isSynchronous: boolean = false;
private level = 0;
private startedTime = 0;
private abortRequested = false;
private lastUpdated = 0;
private observers: Computation.ProgressObserver[] | undefined = void 0;
private progress: Computation.Progress = { message: 'Working...', current: 0, max: 0, elapsedMs: 0, isIndeterminate: true, requestAbort: void 0 };
private checkAborted() {
if (this.abortRequested) throw Computation.Aborted;
}
private abortRequester = () => { this.abortRequested = true };
subscribe = (obs: Computation.ProgressObserver) => {
if (!this.observers) this.observers = [];
this.observers.push(obs);
return {
dispose: () => {
if (!this.observers) return;
for (let i = 0; i < this.observers.length; i++) {
if (this.observers[i] === obs) {
this.observers[i] = this.observers[this.observers.length - 1];
this.observers.pop();
return;
}
}
}
};
}
requestAbort() {
try {
if (this.abortRequester) {
this.abortRequester.call(null);
}
} catch (e) { }
}
update({ message, abort, current, max }: Computation.ProgressUpdate): Promise<void> | void {
this.checkAborted();
const time = Computation.now();
if (typeof abort === 'boolean') {
this.progress.requestAbort = abort ? this.abortRequester : void 0;
} else {
if (abort) this.abortRequester = abort;
this.progress.requestAbort = abort ? this.abortRequester : void 0;
}
if (typeof message !== 'undefined') this.progress.message = message;
this.progress.elapsedMs = time - this.startedTime;
if (isNaN(current!)) {
this.progress.isIndeterminate = true;
} else {
this.progress.isIndeterminate = false;
this.progress.current = current!;
if (!isNaN(max!)) this.progress.max = max!;
}
if (this.observers) {
const p = { ...this.progress };
for (let i = 0, _i = this.observers.length; i < _i; i++) {
Scheduler.immediate(this.observers[i], p);
}
}
this.lastUpdated = time;
return Scheduler.immediatePromise();
}
get requiresUpdate() {
this.checkAborted();
if (this.isSynchronous) return false;
return Computation.now() - this.lastUpdated > this.updateRate;
}
started() {
if (!this.level) {
this.startedTime = Computation.now();
this.lastUpdated = this.startedTime;
}
this.level++;
}
finished() {
this.level--;
if (this.level < 0) {
throw new Error('Bug in code somewhere, Computation.resolve/reject called too many times.');
}
if (!this.level) this.observers = void 0;
}
constructor(updateRate?: number) {
this.updateRate = updateRate || DefaulUpdateRateMs;
}
}
class ChunkerImpl implements Computation.Chunker {
private processedSinceUpdate = 0;
private updater: Computation.Context['update'];
private computeChunkSize(delta: number) {
if (!delta) {
this.processedSinceUpdate = 0;
return this.nextChunkSize;
}
const rate = (this.context as ObservableContext).updateRate || DefaulUpdateRateMs;
const ret = Math.round(this.processedSinceUpdate * rate / delta + 1);
this.processedSinceUpdate = 0;
return ret;
}
private getNextChunkSize() {
const ctx = this.context as ObservableContext;
// be smart if the computation is synchronous and process the whole chunk at once.
if (ctx.isSynchronous) return Number.MAX_SAFE_INTEGER;
return this.nextChunkSize;
}
setNextChunkSize(size: number) {
this.nextChunkSize = size;
}
async process(nextChunk: (size: number) => number, update: (updater: Computation.Context['update']) => Promise<void> | void, nextChunkSize?: number) {
if (typeof nextChunkSize !== 'undefined') this.setNextChunkSize(nextChunkSize);
this.processedSinceUpdate = 0;
// track time for the actual computation and exclude the "update time"
let chunkStart = Computation.now();
let lastChunkSize: number;
let chunkCount = 0;
let totalSize = 0;
let updateCount = 0;
while ((lastChunkSize = nextChunk(this.getNextChunkSize())) > 0) {
chunkCount++;
this.processedSinceUpdate += lastChunkSize;
totalSize += lastChunkSize;
if (this.context.requiresUpdate) {
let time = Computation.now();
await update(this.updater);
this.nextChunkSize = updateCount > 0
? Math.round((totalSize + this.computeChunkSize(time - chunkStart)) / (chunkCount + 1))
: this.computeChunkSize(time - chunkStart)
updateCount++;
chunkStart = Computation.now();
}
}
if (this.context.requiresUpdate) {
let time = Computation.now();
await update(this.updater);
this.nextChunkSize = updateCount > 0
? Math.round((totalSize + this.computeChunkSize(time - chunkStart)) / (chunkCount + 1))
: this.computeChunkSize(time - chunkStart)
}
}
constructor(public context: Computation.Context, private nextChunkSize: number) {
this.updater = this.context.update.bind(this.context);
}
}
export default Computation;