Skip to content
Snippets Groups Projects
computation.ts 8.65 KiB
/*
 * Copyright (c) 2017 molio contributors, licensed under MIT, See LICENSE file for more info.
 *
 * Adapted from https://github.com/dsehnal/LiteMol
 * @author David Sehnal <david.sehnal@gmail.com>
 */

import Scheduler from './scheduler'

/**
 * A unit of work that produces a value of type `A` and can be run inside an
 * optional `Computation.Context`.
 */
interface Computation<A> {
    /** Runs the computation and returns the promise of its result. */
    run(ctx?: Computation.Context): Promise<A>,
    /** Runs the computation and returns the result promise together with a `subscribe` hook for progress observers. */
    runObservable(ctx?: Computation.Context): Computation.Running<A>
}

/**
 * Factory functions, shared types and helpers for creating and running computations.
 */
namespace Computation {
    /** When true, an error thrown by a computation is also printed with `console.error` before the result is rejected. */
    export let PRINT_CONSOLE_ERROR = false;

    /** Wraps an async function taking a `Context` into a runnable computation. */
    export function create<A>(computation: (ctx: Context) => Promise<A>) {
        return new ComputationImpl(computation);
    }

    /** Creates a computation that resolves to the given value. */
    export function resolve<A>(a: A) {
        return create<A>(_ => Promise.resolve(a));
    }

    /** Creates a computation that rejects with the given reason. */
    export function reject<A>(reason: any) {
        return create<A>(_ => Promise.reject(reason));
    }

    /** Options accepted by `observableContext`. */
    export interface Params {
        /** Time in milliseconds that must pass since the last progress update before `requiresUpdate` becomes true. */
        updateRateMs: number
    }

    /** The value thrown by an observable context once an abort has been requested. */
    export const Aborted = 'Aborted';

    /** Snapshot of the state of a running computation, as delivered to progress observers. */
    export interface Progress {
        message: string,
        /** True when `current`/`max` were not supplied with the last update. */
        isIndeterminate: boolean,
        current: number,
        max: number,
        /** Milliseconds between the recorded start time and the last update. */
        elapsedMs: number,
        /** Present only when the last update declared the computation abortable. */
        requestAbort?: () => void
    }

    /** The handle a computation uses to report progress and to find out whether it was aborted. */
    export interface Context {
        readonly isSynchronous: boolean,
        /** Also checks if the computation was aborted. If so, throws. */
        readonly requiresUpdate: boolean,
        requestAbort(): void,
        /**
         * Also checks if the computation was aborted. If so, throws.
         *
         * @param msg message shown to observers
         * @param abort `true` to allow aborting with the default handler, a function to
         *              install a custom abort handler, falsy to disallow aborting
         * @param current current progress value; omit for indeterminate progress
         * @param max maximum progress value; omit for indeterminate progress
         */
        updateProgress(msg: string, abort?: boolean | (() => void), current?: number, max?: number): Promise<void> | void
    }

    /** Callback that receives progress snapshots. */
    export type ProgressObserver = (progress: Readonly<Progress>) => void;

    /** A started computation: its eventual result plus a way to observe progress. */
    export interface Running<A> {
        subscribe(onProgress: ProgressObserver): void;
        result: Promise<A>
    }

    /** A context without updates. */
    export const synchronousContext: Context = {
        isSynchronous: true,
        requiresUpdate: false,
        requestAbort() { },
        updateProgress(msg, abort, current, max) { }
    }

    /** Creates a context that reports progress to subscribed observers. */
    export function observableContext(params?: Partial<Params>) {
        return new ObservableContext(params);
    }

    declare var process: any;
    declare var window: any;

    /**
     * Returns the current time in milliseconds. The clock is selected once, when the
     * namespace is initialised: `window.performance.now()` when available, then
     * `process.hrtime()`, and `Date` as the last resort.
     */
    export const now: () => number = (function () {
        if (typeof window !== 'undefined' && window.performance) {
            const perf = window.performance;
            return () => perf.now();
        } else if (typeof process !== 'undefined' && typeof process.hrtime === 'function') {
            // `typeof` is required here: comparing the function itself to the string
            // 'undefined' is always true and would select this branch even when
            // `process` is a shim without `hrtime`.
            return () => {
                // hrtime() yields [seconds, nanoseconds]; convert both parts to ms.
                const t = process.hrtime();
                return t[0] * 1000 + t[1] / 1000000;
            };
        } else {
            return () => +new Date();
        }
    }());

    /** A utility for splitting large computations into smaller parts. */
    export interface Chunker {
        /** Overrides the size requested for the next chunk. */
        setNextChunkSize(size: number): void,
        /** nextChunk must return the number of actually processed chunks. */
        process(nextChunk: (chunkSize: number) => number, update: (updater: Context['updateProgress']) => void, nextChunkSize?: number): Promise<void>
    }

    /** Creates a chunker bound to `ctx` that starts with chunks of `defaultChunkSize`. */
    export function chunker(ctx: Context, defaultChunkSize: number): Chunker {
        return new ChunkedImpl(ctx, defaultChunkSize);
    }
}

/** Update rate in milliseconds used when the context is constructed without an `updateRateMs`. */
const DefaulUpdateRateMs = 150;
/** Fallback `subscribe` used for contexts that do not provide one; does nothing. */
const NoOpSubscribe = () => { }

/**
 * Default `Computation` implementation: wraps an async function and calls the
 * context's optional `started`/`finished` hooks around it.
 */
class ComputationImpl<A> implements Computation<A> {
    /** Runs the computation and returns the promise of its result. */
    run(ctx?: Computation.Context) {
        return this.runObservable(ctx).result;
    }

    /**
     * Starts the computation in `ctx`, or in a fresh observable context when none is given.
     *
     * @returns the result promise and the context's `subscribe` function (a no-op
     *          when the supplied context does not have one).
     */
    runObservable(ctx?: Computation.Context): Computation.Running<A> {
        // A caller-supplied context is not necessarily an ObservableContext, so the
        // observable-only members are probed for existence before they are used.
        const context = ctx ? ctx as ObservableContext : new ObservableContext();

        return {
            subscribe: context.subscribe || NoOpSubscribe,
            result: this.execute(context)
        };
    }

    /**
     * Runs the wrapped function between the `started` and `finished` hooks.
     *
     * Implemented as an async method rather than an async Promise executor so that
     * an exception thrown by `finished()` rejects the returned promise instead of
     * being lost as an unhandled rejection.
     */
    private async execute(context: ObservableContext): Promise<A> {
        try {
            if (context.started) context.started();
            return await this.computation(context);
        } catch (e) {
            if (Computation.PRINT_CONSOLE_ERROR) console.error(e);
            throw e;
        } finally {
            if (context.finished) context.finished();
        }
    }

    constructor(private computation: (ctx: Computation.Context) => Promise<A>) {

    }
}

/**
 * A `Computation.Context` that tracks timing and abort state and forwards
 * progress snapshots to subscribed observers.
 */
class ObservableContext implements Computation.Context {
    /** Milliseconds that must pass after an update before `requiresUpdate` is true again. */
    readonly updateRate: number;
    readonly isSynchronous: boolean = false;
    /** Nesting depth of `started()` calls not yet matched by `finished()`. */
    private level = 0;
    private startedTime = 0;
    private abortRequested = false;
    private lastUpdated = 0;
    private observers: Computation.ProgressObserver[] | undefined = void 0;
    private progress: Computation.Progress = { message: 'Working...', current: 0, max: 0, elapsedMs: 0, isIndeterminate: true, requestAbort: void 0 };

    /** Milliseconds covered by the most recent progress update interval (0 before it can be measured). */
    lastDelta = 0;

    /** Throws `Computation.Aborted` once the default abort handler has been triggered. */
    private checkAborted() {
        if (this.abortRequested) throw Computation.Aborted;
    }

    /** Current abort handler; the default one only raises the abort flag. */
    private abortRequester = () => { this.abortRequested = true };

    /** Registers a progress observer. Declared as an arrow property so it can be handed out detached from the instance. */
    subscribe = (obs: Computation.ProgressObserver) => {
        if (!this.observers) this.observers = [];
        this.observers.push(obs);
    }

    /** Invokes the current abort handler; errors thrown by the handler are ignored. */
    requestAbort() {
        try {
            if (this.abortRequester) {
                this.abortRequester.call(null);
            }
        } catch (e) { }
    }

    /**
     * Records the new progress state and schedules a notification of every observer.
     *
     * @param msg message for the observers
     * @param abort `true` exposes the current abort handler, a function installs and
     *              exposes a custom handler, anything falsy hides the abort option
     * @param current current progress; progress is indeterminate when this or `max` is not a number
     * @param max maximum progress
     * @returns the promise produced by `Scheduler.immediatePromise()`
     * @throws `Computation.Aborted` when an abort was requested through the default handler
     */
    updateProgress(msg: string, abort?: boolean | (() => void), current?: number, max?: number): Promise<void> | void {
        this.checkAborted();

        const time = Computation.now();

        if (typeof abort === 'function') this.abortRequester = abort;
        this.progress.requestAbort = abort ? this.abortRequester : void 0;

        this.progress.message = msg;
        this.progress.elapsedMs = time - this.startedTime;
        if (isNaN(current!) || isNaN(max!)) {
            this.progress.isIndeterminate = true;
        } else {
            this.progress.isIndeterminate = false;
            this.progress.current = current!;
            this.progress.max = max!;
        }

        if (this.observers) {
            // Observers get a copy so later updates do not mutate what they received.
            const p = { ...this.progress };
            for (const o of this.observers) Scheduler.immediate(o, p);
        }

        // Measure the interval from the previous update, or from the start of the
        // computation for the first update. `Computation.now()` is not zero-based,
        // so subtracting the initial `lastUpdated` of 0 would yield a meaningless delta.
        const previous = Math.max(this.lastUpdated, this.startedTime);
        this.lastDelta = previous ? time - previous : 0;
        this.lastUpdated = time;

        return Scheduler.immediatePromise();
    }

    /**
     * True when more than `updateRate` milliseconds passed since the last update.
     * Throws `Computation.Aborted` when an abort was requested through the default handler.
     */
    get requiresUpdate() {
        this.checkAborted();
        if (this.isSynchronous) return false;
        return Computation.now() - this.lastUpdated > this.updateRate;
    }

    /** Marks the start of a (possibly nested) computation; the outermost call records the start time. */
    started() {
        if (!this.level) this.startedTime = Computation.now();
        this.level++;
    }

    /** Marks the end of a computation; drops all observers once the outermost one has finished. */
    finished() {
        this.level--;
        if (this.level < 0) {
            throw new Error('Bug in code somewhere, Computation.resolve/reject called too many times.');
        }
        if (!this.level) this.observers = void 0;
    }

    /** @param params optional settings; a missing or zero `updateRateMs` falls back to the default rate */
    constructor(params?: Partial<Computation.Params>) {
        this.updateRate = (params && params.updateRateMs) || DefaulUpdateRateMs;
    }
}


/**
 * `Computation.Chunker` implementation that adapts the chunk size so that
 * roughly one progress update interval of work is done per chunk.
 */
class ChunkedImpl implements Computation.Chunker {
    /** Number of items processed since the chunk size was last recomputed. */
    private processedSinceUpdate = 0;
    /** `updateProgress` of the context, pre-bound so it can be handed to the `update` callback. */
    private updater: Computation.Context['updateProgress'];

    /**
     * Estimates how many items fit into one update interval from the throughput of
     * the interval that just ended. Keeps the current size when the context reports
     * no `lastDelta` (e.g. it is not an observable context).
     */
    private computeChunkSize() {
        const ctx = this.context as ObservableContext;
        const lastDelta = ctx.lastDelta || 0;
        const processed = this.processedSinceUpdate;
        // The counter belongs to the interval that just ended, so it is cleared on
        // every path; otherwise the next estimate would count these items twice.
        this.processedSinceUpdate = 0;
        if (!lastDelta) return this.nextChunkSize;
        const rate = ctx.updateRate || 0;
        return Math.round(processed * rate / lastDelta + 1);
    }

    /** Size to request for the next chunk. */
    private getNextChunkSize() {
        const ctx = this.context as ObservableContext;
        // be smart if the computation is synchronous and process the whole chunk at once.
        if (ctx.isSynchronous) return Number.MAX_SAFE_INTEGER;
        return this.nextChunkSize;
    }

    /** Lets the caller report progress, then re-estimates the chunk size. */
    private async updateAndResize(update: (updater: Computation.Context['updateProgress']) => Promise<void> | void) {
        await update(this.updater);
        this.nextChunkSize = this.computeChunkSize();
    }

    /** Overrides the size requested for the next chunk. */
    setNextChunkSize(size: number) {
        this.nextChunkSize = size;
    }

    /**
     * Calls `nextChunk` repeatedly until it reports that nothing was processed.
     *
     * @param nextChunk processes up to `size` items and returns how many it actually processed
     * @param update called with the context's `updateProgress` whenever the context requires an update
     * @param nextChunkSize optional size for the first chunk
     */
    async process(nextChunk: (size: number) => number, update: (updater: Computation.Context['updateProgress']) => Promise<void> | void, nextChunkSize?: number) {
        if (typeof nextChunkSize !== 'undefined') this.setNextChunkSize(nextChunkSize);
        let lastChunk: number;

        while ((lastChunk = nextChunk(this.getNextChunkSize())) > 0) {
            this.processedSinceUpdate += lastChunk;
            if (this.context.requiresUpdate) await this.updateAndResize(update);
        }
        // Give the caller a chance to report the final state.
        if (this.context.requiresUpdate) await this.updateAndResize(update);
    }

    constructor(public context: Computation.Context, private nextChunkSize: number) {
        this.updater = this.context.updateProgress.bind(this.context);
    }
}

export default Computation;