From f03e0b6736e68dfcc0aa745e4dd3aa4076e97ee1 Mon Sep 17 00:00:00 2001
From: David Sehnal <david.sehnal@gmail.com>
Date: Wed, 4 Oct 2017 16:42:17 +0200
Subject: [PATCH] simplified computation API

---
 src/reader/cif/text/parser.ts       |  2 +-
 src/reader/common/text/tokenizer.ts |  1 -
 src/reader/gro/parser.ts            |  5 +-
 src/reader/spec/gro.spec.ts         |  6 +-
 src/script.ts                       | 28 +++++----
 src/utils/computation.ts            | 98 +++++++++++++++--------------
 6 files changed, 71 insertions(+), 69 deletions(-)

diff --git a/src/reader/cif/text/parser.ts b/src/reader/cif/text/parser.ts
index b7f593f86..d27dcd240 100644
--- a/src/reader/cif/text/parser.ts
+++ b/src/reader/cif/text/parser.ts
@@ -555,7 +555,7 @@ async function parseInternal(data: string, ctx: Computation.Context) {
     //inSaveFrame = false,
     //blockSaveFrames: any;
 
-    ctx.updateProgress({ message: 'Parsing...' });
+    ctx.update({ message: 'Parsing...', current: 0, max: data.length });
 
     moveNext(tokenizer);
     while (tokenizer.tokenType !== CifTokenType.End) {
diff --git a/src/reader/common/text/tokenizer.ts b/src/reader/common/text/tokenizer.ts
index a21f90c21..17991fbdb 100644
--- a/src/reader/common/text/tokenizer.ts
+++ b/src/reader/common/text/tokenizer.ts
@@ -37,7 +37,6 @@ export function Tokenizer(data: string): Tokenizer {
 }
 
 export namespace Tokenizer {
-
     export function getTokenString(state: Tokenizer) {
         return state.data.substring(state.tokenStart, state.tokenEnd);
     }
diff --git a/src/reader/gro/parser.ts b/src/reader/gro/parser.ts
index 60d3dcddb..7ec1c994b 100644
--- a/src/reader/gro/parser.ts
+++ b/src/reader/gro/parser.ts
@@ -34,7 +34,7 @@ function State(tokenizer: Tokenizer, ctx: Computation.Context): State {
         tokenizer,
         header: createEmptyHeader(),
         numberOfAtoms: 0,
-        chunker: Computation.chunker(ctx, 100000)
+        chunker: Computation.chunker(ctx, 100000) // 100000 lines is the default chunk size for this reader
     };
 }
 
@@ -140,8 +140,7 @@ function handleBoxVectors(state: State) {
 async function parseInternal(data: string, ctx: Computation.Context): Promise<Result<Schema.File>> {
     const tokenizer = Tokenizer(data);
 
-    // 100000 lines is the default chunk size for this reader
-
+    ctx.update({ message: 'Parsing...', current: 0, max: data.length });
     const structures: Schema.Structure[] = [];
     while (tokenizer.position < data.length) {
         const state = State(tokenizer, ctx);
diff --git a/src/reader/spec/gro.spec.ts b/src/reader/spec/gro.spec.ts
index ba2e48aef..8f78668d5 100644
--- a/src/reader/spec/gro.spec.ts
+++ b/src/reader/spec/gro.spec.ts
@@ -26,8 +26,7 @@ const groStringHighPrecision = `Generated by trjconv : 2168 system t=  15.00000
 
 describe('gro reader', () => {
     it('basic', async () => {
-        const comp = Gro(groString)
-        const parsed = await comp.run()
+        const parsed = await Gro(groString)();
 
         if (parsed.isError) {
             console.log(parsed)
@@ -59,8 +58,7 @@ describe('gro reader', () => {
     });
 
     it('high precision', async () => {
-        const comp = Gro(groStringHighPrecision)
-        const parsed = await comp.run()
+        const parsed = await Gro(groStringHighPrecision)()
 
         if (parsed.isError) {
             console.log(parsed)
diff --git a/src/script.ts b/src/script.ts
index b6df67554..f4cf5a07a 100644
--- a/src/script.ts
+++ b/src/script.ts
@@ -11,17 +11,21 @@ import * as fs from 'fs'
 import Gro from './reader/gro/parser'
 import CIF from './reader/cif/index'
 
-// const file = '1crn.gro'
+const file = '1crn.gro'
 // const file = 'water.gro'
 // const file = 'test.gro'
-const file = 'md_1u19_trj.gro'
+// const file = 'md_1u19_trj.gro'
+
+function showProgress(tag: string, p: Computation.Progress) {
+    console.log(`[${tag}] ${p.message} ${p.isIndeterminate ? '' : (p.current / p.max * 100).toFixed(2) + '% '}(${p.elapsedMs | 0}ms)`)
+}
 
 async function runGro(input: string) {
     console.time('parseGro');
     const comp = Gro(input);
-    const running = comp.runObservable(Computation.observable({ updateRateMs: 150 }));
-    running.subscribe(p => console.log(`[Gro] ${(p.current / p.max * 100).toFixed(2)} (${p.elapsedMs | 0}ms)`));
-    const parsed = await running.result;
+
+    const ctx = Computation.observable({ updateRateMs: 150, observer: p => showProgress('GRO', p) });
+    const parsed = await comp(ctx);
     console.timeEnd('parseGro');
 
     if (parsed.isError) {
@@ -85,9 +89,8 @@ async function runCIF(input: string | Uint8Array) {
     console.time('parseCIF');
     const comp = typeof input === 'string' ? CIF.parseText(input) : CIF.parseBinary(input);
 
-    const running = comp.runObservable(Computation.observable({ updateRateMs: 250 })); // Computation.synchronous
-    running.subscribe(p => console.log(`[CIF] ${p.message} ${(p.current / p.max * 100).toFixed(2)}% (${p.elapsedMs | 0}ms)`));
-    const parsed = await running.result;
+    const ctx = Computation.observable({ updateRateMs: 250, observer: p => showProgress('CIF', p) });
+    const parsed = await comp(ctx);
     console.timeEnd('parseCIF');
     if (parsed.isError) {
         console.log(parsed);
@@ -118,7 +121,7 @@ export function _cif() {
     });
 
     path = `./examples/1cbs_full.bcif`;
-    //const path = 'c:/test/quick/3j3q.cif';
+    // const path = 'c:/test/quick/3j3q.cif';
     fs.readFile(path, function (err, input) {
         if (err) {
             return console.log(err);
@@ -137,14 +140,13 @@ import Computation from './utils/computation'
 const comp = Computation.create(async ctx => {
     for (let i = 0; i < 0; i++) {
         await new Promise(res => setTimeout(res, 500));
-        if (ctx.requiresUpdate) await ctx.updateProgress({ message: 'working', current: i, max: 2 });
+        if (ctx.requiresUpdate) await ctx.update({ message: 'working', current: i, max: 2 });
     }
     return 42;
 });
 async function testComp() {
-    const running = comp.runObservable();
-    running.subscribe(p => console.log(JSON.stringify(p)));
-    const ret = await running.result;
+    const ctx = Computation.observable({ observer: p => showProgress('test', p) });
+    const ret = await comp(ctx);
     console.log('computation returned', ret);
 }
 testComp();
\ No newline at end of file
diff --git a/src/utils/computation.ts b/src/utils/computation.ts
index fa91d83f7..efaade355 100644
--- a/src/utils/computation.ts
+++ b/src/utils/computation.ts
@@ -8,15 +8,14 @@
 import Scheduler from './scheduler'
 
 interface Computation<A> {
-    run(ctx?: Computation.Context): Promise<A>,
-    runObservable(ctx?: Computation.Context): Computation.Running<A>
+    (ctx?: Computation.Context): Promise<A>
 }
 
 namespace Computation {
     export let PRINT_ERRORS_TO_CONSOLE = false;
 
     export function create<A>(computation: (ctx: Context) => Promise<A>) {
-        return new ComputationImpl(computation);
+        return ComputationImpl(computation);
     }
 
     export function resolve<A>(a: A) {
@@ -28,7 +27,8 @@ namespace Computation {
     }
 
     export interface Params {
-        updateRateMs: number
+        updateRateMs?: number,
+        observer?: ProgressObserver
     }
 
     export const Aborted = 'Aborted';
@@ -54,27 +54,29 @@ namespace Computation {
         /** Also checks if the computation was aborted. If so, throws. */
         readonly requiresUpdate: boolean,
         requestAbort(): void,
+
+        subscribe(onProgress: ProgressObserver): { dispose: () => void },
         /** Also checks if the computation was aborted. If so, throws. */
-        updateProgress(info: ProgressUpdate): Promise<void> | void
+        update(info: ProgressUpdate): Promise<void> | void
     }
 
     export type ProgressObserver = (progress: Readonly<Progress>) => void;
 
-    export interface Running<A> {
-        subscribe(onProgress: ProgressObserver): void,
-        result: Promise<A>
-    }
+    const emptyDisposer = { dispose: () => { } }
 
     /** A context without updates. */
     export const synchronous: Context = {
         isSynchronous: true,
         requiresUpdate: false,
         requestAbort() { },
-        updateProgress(info) { }
+        subscribe(onProgress) { return emptyDisposer; },
+        update(info) { }
     }
 
     export function observable(params?: Partial<Params>) {
-        return new ObservableContext(params);
+        const ret = new ObservableContext(params && params.updateRateMs);
+        if (params && params.observer) ret.subscribe(params.observer);
+        return ret;
     }
 
     declare var process: any;
@@ -98,7 +100,7 @@ namespace Computation {
     export interface Chunker {
         setNextChunkSize(size: number): void,
         /** nextChunk must return the number of actually processed chunks. */
-        process(nextChunk: (chunkSize: number) => number, update: (updater: Context['updateProgress']) => void, nextChunkSize?: number): Promise<void>
+        process(nextChunk: (chunkSize: number) => number, update: (updater: Context['update']) => void, nextChunkSize?: number): Promise<void>
     }
 
     export function chunker(ctx: Context, nextChunkSize: number): Chunker {
@@ -107,35 +109,22 @@ namespace Computation {
 }
 
 const DefaulUpdateRateMs = 150;
-const NoOpSubscribe = () => { }
-
-class ComputationImpl<A> implements Computation<A> {
-    run(ctx?: Computation.Context) {
-        return this.runObservable(ctx).result;
-    }
-
-    runObservable(ctx?: Computation.Context): Computation.Running<A> {
-        const context = ctx ? ctx as ObservableContext : new ObservableContext();
-
-        return {
-            subscribe: (context as ObservableContext).subscribe || NoOpSubscribe,
-            result: new Promise<A>(async (resolve, reject) => {
-                try {
-                    if (context.started) context.started();
-                    const result = await this.computation(context);
-                    resolve(result);
-                } catch (e) {
-                    if (Computation.PRINT_ERRORS_TO_CONSOLE) console.error(e);
-                    reject(e);
-                } finally {
-                    if (context.finished) context.finished();
-                }
-            })
-        };
-    }
-
-    constructor(private computation: (ctx: Computation.Context) => Promise<A>) {
 
+function ComputationImpl<A>(computation: (ctx: Computation.Context) => Promise<A>): Computation<A> {
+    return (ctx?: Computation.Context) => {
+        const context: ObservableContext = ctx ? ctx : Computation.synchronous as any;
+        return new Promise<A>(async (resolve, reject) => {
+            try {
+                if (context.started) context.started();
+                const result = await computation(context);
+                resolve(result);
+            } catch (e) {
+                if (Computation.PRINT_ERRORS_TO_CONSOLE) console.error(e);
+                reject(e);
+            } finally {
+                if (context.finished) context.finished();
+            }
+        });
     }
 }
 
@@ -160,6 +149,18 @@ class ObservableContext implements Computation.Context {
     subscribe = (obs: Computation.ProgressObserver) => {
         if (!this.observers) this.observers = [];
         this.observers.push(obs);
+        return {
+            dispose: () => {
+                if (!this.observers) return;
+                for (let i = 0; i < this.observers.length; i++) {
+                    if (this.observers[i] === obs) {
+                        this.observers[i] = this.observers[this.observers.length - 1];
+                        this.observers.pop();
+                        return;
+                    }
+                }
+            }
+        };
     }
 
     requestAbort() {
@@ -170,7 +171,7 @@ class ObservableContext implements Computation.Context {
         } catch (e) { }
     }
 
-    updateProgress({ message, abort, current, max }: Computation.ProgressUpdate): Promise<void> | void {
+    update({ message, abort, current, max }: Computation.ProgressUpdate): Promise<void> | void {
         this.checkAborted();
 
         const time = Computation.now();
@@ -210,7 +211,10 @@ class ObservableContext implements Computation.Context {
     }
 
     started() {
-        if (!this.level) this.startedTime = Computation.now();
+        if (!this.level) {
+            this.startedTime = Computation.now();
+            this.lastUpdated = this.startedTime;
+        }
         this.level++;
     }
 
@@ -222,14 +226,14 @@ class ObservableContext implements Computation.Context {
         if (!this.level) this.observers = void 0;
     }
 
-    constructor(params?: Partial<Computation.Params>) {
-        this.updateRate = (params && params.updateRateMs) || DefaulUpdateRateMs;
+    constructor(updateRate?: number) {
+        this.updateRate = updateRate || DefaulUpdateRateMs;
     }
 }
 
 class ChunkerImpl implements Computation.Chunker {
     private processedSinceUpdate = 0;
-    private updater: Computation.Context['updateProgress'];
+    private updater: Computation.Context['update'];
 
     private computeChunkSize() {
         const lastDelta = (this.context as ObservableContext).lastDelta || 0;
@@ -251,7 +255,7 @@ class ChunkerImpl implements Computation.Chunker {
         this.nextChunkSize = size;
     }
 
-    async process(nextChunk: (size: number) => number, update: (updater: Computation.Context['updateProgress']) => Promise<void> | void, nextChunkSize?: number) {
+    async process(nextChunk: (size: number) => number, update: (updater: Computation.Context['update']) => Promise<void> | void, nextChunkSize?: number) {
         if (typeof nextChunkSize !== 'undefined') this.setNextChunkSize(nextChunkSize);
 
         let lastChunkSize: number;
@@ -269,7 +273,7 @@ class ChunkerImpl implements Computation.Chunker {
     }
 
     constructor(public context: Computation.Context, private nextChunkSize: number) {
-        this.updater = this.context.updateProgress.bind(this.context);
+        this.updater = this.context.update.bind(this.context);
     }
 }
 
-- 
GitLab