From 9951dce68458978de425eb9fb7b75a2a496f15f6 Mon Sep 17 00:00:00 2001
From: David Sehnal <david.sehnal@gmail.com>
Date: Wed, 4 Oct 2017 12:24:56 +0200
Subject: [PATCH] fixed readLinesAsync

---
 src/reader/cif/text/parser.ts       |   2 +-
 src/reader/common/text/tokenizer.ts |  32 +++-
 src/reader/gro/parser.ts            |   2 +-
 src/script.ts                       |  10 +-
 src/utils/computation.ts            | 262 +++++++++++++++-------------
 5 files changed, 175 insertions(+), 133 deletions(-)

diff --git a/src/reader/cif/text/parser.ts b/src/reader/cif/text/parser.ts
index e90ef8c48..e9a819b5b 100644
--- a/src/reader/cif/text/parser.ts
+++ b/src/reader/cif/text/parser.ts
@@ -397,7 +397,7 @@ function createTokenizer(data: string, ctx: Computation.Context): TokenizerState
         currentTokenType: CifTokenType.End,
         currentLineNumber: 1,
         isEscaped: false,
-        computation: new Computation.Chunked(ctx, 1000000)
+        computation: Computation.chunked(ctx, 1000000)
     };
 }
 
diff --git a/src/reader/common/text/tokenizer.ts b/src/reader/common/text/tokenizer.ts
index e4fac8678..1f8354eab 100644
--- a/src/reader/common/text/tokenizer.ts
+++ b/src/reader/common/text/tokenizer.ts
@@ -35,7 +35,7 @@ export function Tokenizer(data: string, ctx: Computation.Context): Tokenizer {
         currentLineNumber: 1,
         currentTokenStart: 0,
         currentTokenEnd: 0,
-        computation: new Computation.Chunked(ctx, 1000000)
+        computation: Computation.chunked(ctx, 1000000)
     };
 }
 
@@ -90,21 +90,37 @@ export namespace Tokenizer {
         return getTokenString(state);
     }
 
+    function readLinesChunk(state: Tokenizer, count: number, tokens: Tokens) {
+        for (let i = 0; i < count; i++) {
+            markLine(state);
+            TokenBuilder.addUnchecked(tokens, state.currentTokenStart, state.currentTokenEnd);
+        }
+    }
+
     /** Advance the state by the given number of lines and return line starts/ends as tokens. */
-    export async function readLines(state: Tokenizer, count: number): Promise<Tokens> {
-        const { computation, position, length } = state
+    export function readLines(state: Tokenizer, count: number): Tokens {
         const lineTokens = TokenBuilder.create(state, count * 2);
+        readLinesChunk(state, count, lineTokens);
+        return lineTokens;
+    }
 
-        for (let i = 0; i < count; i++) {
-            markLine(state);
-            TokenBuilder.addUnchecked(lineTokens, state.currentTokenStart, state.currentTokenEnd);
+    /** Advance the state by the given number of lines and return line starts/ends as tokens. */
+    export async function readLinesAsync(state: Tokenizer, count: number): Promise<Tokens> {
+        const { computation, length } = state
+        const lineTokens = TokenBuilder.create(state, count * 2);
 
+        computation.chunkSize = 100000;
+        let linesAlreadyRead = 0;
+        while (linesAlreadyRead < count) {
+            const linesToRead = Math.min(count - linesAlreadyRead, computation.chunkSize);
+            readLinesChunk(state, linesToRead, lineTokens);
+            linesAlreadyRead += linesToRead;
             if (computation.requiresUpdate) {
-                await computation.updateProgress('Parsing...', void 0, position, length);
+                await computation.updateProgress('Parsing...', void 0, state.position, length);
             }
         }
 
-        return { data: state.data, count, indices: lineTokens.indices };
+        return lineTokens;
     }
 
     /**
diff --git a/src/reader/gro/parser.ts b/src/reader/gro/parser.ts
index 7573b6d4c..bf475fc14 100644
--- a/src/reader/gro/parser.ts
+++ b/src/reader/gro/parser.ts
@@ -88,7 +88,7 @@ function handleNumberOfAtoms(state: State) {
  */
 async function handleAtoms(state: State): Promise<Schema.Atoms> {
     const { tokenizer, numberOfAtoms } = state;
-    const lines = await Tokenizer.readLines(tokenizer, numberOfAtoms);
+    const lines = await Tokenizer.readLinesAsync(tokenizer, numberOfAtoms);
 
     const positionSample = tokenizer.data.substring(lines.indices[0], lines.indices[1]).substring(20);
     const precisions = positionSample.match(/\.\d+/g)!;
diff --git a/src/script.ts b/src/script.ts
index 5d2d1ce54..781b7a5a3 100644
--- a/src/script.ts
+++ b/src/script.ts
@@ -19,7 +19,7 @@ const file = 'md_1u19_trj.gro'
 async function runGro(input: string) {
     console.time('parseGro');
     const comp = Gro(input);
-    const running = comp.runWithContext(new Computation.ObservableContext({ updateRateMs: 250 }));
+    const running = comp.runObservable(Computation.observableContext({ updateRateMs: 250 }));
     running.subscribe(p => console.log(`[Gro] ${(p.current / p.max * 100).toFixed(2)} (${p.elapsedMs | 0}ms)`));
     const parsed = await running.result;
     console.timeEnd('parseGro');
@@ -70,7 +70,7 @@ async function runGro(input: string) {
     console.log(residueNumber.length, residueNumber[0], residueNumber[residueNumber.length - 1])
 }
 
-async function _gro() {
+function _gro() {
     fs.readFile(`./examples/${file}`, 'utf8', function (err, input) {
         if (err) {
             return console.log(err);
@@ -85,7 +85,7 @@ async function runCIF(input: string | Uint8Array) {
     console.time('parseCIF');
     const comp = typeof input === 'string' ? CIF.parseText(input) : CIF.parseBinary(input);
 
-    const running = comp.runWithContext(new Computation.ObservableContext({ updateRateMs: 250 }));
+    const running = comp.runObservable(Computation.observableContext({ updateRateMs: 250 }));
     running.subscribe(p => console.log(`[CIF] ${(p.current / p.max * 100).toFixed(2)} (${p.elapsedMs | 0}ms)`));
     const parsed = await running.result;
     console.timeEnd('parseCIF');
@@ -131,7 +131,7 @@ export function _cif() {
     });
 }
 
-_cif();
+//_cif();
 
 import Computation from './utils/computation'
 const comp = new Computation(async ctx => {
@@ -142,7 +142,7 @@ const comp = new Computation(async ctx => {
     return 42;
 });
 async function testComp() {
-    const running = comp.runWithContext();
+    const running = comp.runObservable();
     running.subscribe(p => console.log(JSON.stringify(p)));
     const ret = await running.result;
     console.log('computation returned', ret);
diff --git a/src/utils/computation.ts b/src/utils/computation.ts
index 02c82f901..896d19833 100644
--- a/src/utils/computation.ts
+++ b/src/utils/computation.ts
@@ -9,14 +9,14 @@ import Scheduler from './scheduler'
 
 class Computation<A> {
     run(ctx?: Computation.Context) {
-        return this.runWithContext(ctx).result;
+        return this.runObservable(ctx).result;
     }
 
-    runWithContext(ctx?: Computation.Context): Computation.Running<A> {
-        const context = ctx ? ctx as Computation.ObservableContext : new Computation.ObservableContext();
+    runObservable(ctx?: Computation.Context): Computation.Running<A> {
+        const context = ctx ? ctx as ObservableContext : new ObservableContext();
 
         return {
-            subscribe: (context as Computation.ObservableContext).subscribe || NoOpSubscribe,
+            subscribe: (context as ObservableContext).subscribe || NoOpSubscribe,
             result: new Promise<A>(async (resolve, reject) => {
                 try {
                     if (context.started) context.started();
@@ -37,19 +37,20 @@ class Computation<A> {
     }
 }
 
-const NoOpSubscribe = () => {}
 
 namespace Computation {
-    const DefaulUpdateRateMs = 100;
-
     export let PRINT_CONSOLE_ERROR = false;
 
+    export function create<A>(computation: (ctx: Context) => Promise<A>) {
+        return new Computation(computation);
+    }
+
     export function resolve<A>(a: A) {
-        return new Computation<A>(() => Promise.resolve(a));
+        return new Computation<A>(_ => Promise.resolve(a));
     }
 
     export function reject<A>(reason: any) {
-        return new Computation<A>(() => Promise.reject(reason));
+        return new Computation<A>(_ => Promise.reject(reason));
     }
 
     export interface Params {
@@ -80,159 +81,184 @@ namespace Computation {
         updateProgress(msg: string, abort?: boolean | (() => void), current?: number, max?: number): Promise<void> | void
     }
 
-    type ProgressObserver = (progress: Readonly<Progress>) => void;
+    export type ProgressObserver = (progress: Readonly<Progress>) => void;
 
     export interface Running<A> {
         subscribe(onProgress: ProgressObserver): void;
         result: Promise<A>
     }
 
-    export const ContextWithoutUpdates: Context = {
+    export const contextWithoutUpdates: Context = {
         requiresUpdate: false,
         requestAbort() { },
         updateProgress(msg, abort, current, max) { }
     }
 
-    export class ObservableContext implements Context {
-        readonly updateRate: number;
-        private isSynchronous: boolean;
-        private level = 0;
-        private startedTime = 0;
-        private abortRequested = false;
-        private lastUpdated = 0;
-        private observers: ProgressObserver[] | undefined = void 0;
-        private progress: Progress = { message: 'Working...', current: 0, max: 0, elapsedMs: 0, isIndeterminate: true, requestAbort: void 0 };
+    export function observableContext(params?: Partial<Params>) {
+        return new ObservableContext(params);
+    }
 
-        lastDelta = 0;
 
-        private checkAborted() {
-            if (this.abortRequested) throw Aborted;
-        }
 
-        private abortRequester = () => { this.abortRequested = true };
+    declare var process: any;
+    declare var window: any;
 
-        subscribe = (obs: ProgressObserver) => {
-            if (!this.observers) this.observers = [];
-            this.observers.push(obs);
+    export const now: () => number = (function () {
+        if (typeof window !== 'undefined' && window.performance) {
+            const perf = window.performance;
+            return function () { return perf.now(); }
+        } else if (typeof process !== 'undefined' && process.hrtime !== 'undefined') {
+            return function () {
+                let t = process.hrtime();
+                return t[0] * 1000 + t[1] / 1000000;
+            };
+        } else {
+            return function () { return +new Date(); }
         }
+    })();
 
-        requestAbort() {
-            try {
-                if (this.abortRequester) {
-                    this.abortRequester.call(null);
-                }
-            } catch (e) { }
-        }
+    export interface Chunked {
+        /**
+         * Get automatically computed chunk size
+         * Or set it a default value.
+         */
+        chunkSize: number,
+        readonly requiresUpdate: boolean,
+        updateProgress: Context['updateProgress'],
+        context: Context
+    }
 
-        updateProgress(msg: string, abort?: boolean | (() => void), current?: number, max?: number): Promise<void> | void {
-            this.checkAborted();
+    export function chunked(ctx: Context, defaultChunkSize: number): Chunked {
+        return new ChunkedImpl(ctx, defaultChunkSize);
+    }
+}
 
-            const time = now();
+const DefaulUpdateRateMs = 150;
+const NoOpSubscribe = () => { }
 
-            if (typeof abort === 'boolean') {
-                this.progress.requestAbort = abort ? this.abortRequester : void 0;
-            } else {
-                if (abort) this.abortRequester = abort;
-                this.progress.requestAbort = abort ? this.abortRequester : void 0;
-            }
+class ObservableContext implements Computation.Context {
+    readonly updateRate: number;
+    private isSynchronous: boolean;
+    private level = 0;
+    private startedTime = 0;
+    private abortRequested = false;
+    private lastUpdated = 0;
+    private observers: Computation.ProgressObserver[] | undefined = void 0;
+    private progress: Computation.Progress = { message: 'Working...', current: 0, max: 0, elapsedMs: 0, isIndeterminate: true, requestAbort: void 0 };
 
-            this.progress.message = msg;
-            this.progress.elapsedMs = time - this.startedTime;
-            if (isNaN(current!) || isNaN(max!)) {
-                this.progress.isIndeterminate = true;
-            } else {
-                this.progress.isIndeterminate = false;
-                this.progress.current = current!;
-                this.progress.max = max!;
-            }
+    lastDelta = 0;
+
+    private checkAborted() {
+        if (this.abortRequested) throw Computation.Aborted;
+    }
 
-            if (this.observers) {
-                const p = { ...this.progress };
-                for (const o of this.observers) Scheduler.immediate(o, p);
+    private abortRequester = () => { this.abortRequested = true };
+
+    subscribe = (obs: Computation.ProgressObserver) => {
+        if (!this.observers) this.observers = [];
+        this.observers.push(obs);
+    }
+
+    requestAbort() {
+        try {
+            if (this.abortRequester) {
+                this.abortRequester.call(null);
             }
+        } catch (e) { }
+    }
 
-            this.lastDelta = time - this.lastUpdated;
-            this.lastUpdated = time;
+    updateProgress(msg: string, abort?: boolean | (() => void), current?: number, max?: number): Promise<void> | void {
+        this.checkAborted();
 
-            return Scheduler.immediatePromise();
-        }
+        const time = Computation.now();
 
-        get requiresUpdate() {
-            this.checkAborted();
-            if (this.isSynchronous) return false;
-            return now() - this.lastUpdated > this.updateRate / 2;
+        if (typeof abort === 'boolean') {
+            this.progress.requestAbort = abort ? this.abortRequester : void 0;
+        } else {
+            if (abort) this.abortRequester = abort;
+            this.progress.requestAbort = abort ? this.abortRequester : void 0;
         }
 
-        started() {
-            if (!this.level) this.startedTime = now();
-            this.level++;
+        this.progress.message = msg;
+        this.progress.elapsedMs = time - this.startedTime;
+        if (isNaN(current!) || isNaN(max!)) {
+            this.progress.isIndeterminate = true;
+        } else {
+            this.progress.isIndeterminate = false;
+            this.progress.current = current!;
+            this.progress.max = max!;
         }
 
-        finished() {
-            this.level--;
-            if (this.level < 0) {
-                throw new Error('Bug in code somewhere, Computation.resolve/reject called too many times.');
-            }
-            if (!this.level) this.observers = void 0;
+        if (this.observers) {
+            const p = { ...this.progress };
+            for (const o of this.observers) Scheduler.immediate(o, p);
         }
 
-        constructor(params?: Partial<Params>) {
-            this.updateRate = (params && params.updateRateMs) || DefaulUpdateRateMs;
-            this.isSynchronous = !!(params && params.isSynchronous);
-        }
+        this.lastDelta = time - this.lastUpdated;
+        this.lastUpdated = time;
+
+        return Scheduler.immediatePromise();
     }
 
-    export class Chunked {
-        private currentChunkSize: number;
+    get requiresUpdate() {
+        this.checkAborted();
+        if (this.isSynchronous) return false;
+        return Computation.now() - this.lastUpdated > this.updateRate / 2;
+    }
 
-        private computeChunkSize() {
-            const lastDelta = (this.context as ObservableContext).lastDelta || 0;
-            if (!lastDelta) return this.defaultChunkSize;
-            const rate = (this.context as ObservableContext).updateRate || 0;
-            return Math.round(this.currentChunkSize * rate / lastDelta + 1);
-        }
+    started() {
+        if (!this.level) this.startedTime = Computation.now();
+        this.level++;
+    }
 
-        get chunkSize() {
-            return this.defaultChunkSize;
+    finished() {
+        this.level--;
+        if (this.level < 0) {
+            throw new Error('Bug in code somewhere, Computation.resolve/reject called too many times.');
         }
+        if (!this.level) this.observers = void 0;
+    }
 
-        set chunkSize(value: number) {
-            this.defaultChunkSize = value;
-            this.currentChunkSize = value;
-        }
+    constructor(params?: Partial<Computation.Params>) {
+        this.updateRate = (params && params.updateRateMs) || DefaulUpdateRateMs;
+        this.isSynchronous = !!(params && params.isSynchronous);
+    }
+}
 
-        get requiresUpdate() {
-            const ret = this.context.requiresUpdate;
-            if (!ret) this.currentChunkSize += this.chunkSize;
-            return ret;
-        }
 
-        async updateProgress(msg: string, abort?: boolean | (() => void), current?: number, max?: number) {
-            await this.context.updateProgress(msg, abort, current, max);
-            this.defaultChunkSize = this.computeChunkSize();
-        }
+class ChunkedImpl implements Computation.Chunked {
+    private currentChunkSize: number;
 
-        constructor(public context: Context, private defaultChunkSize: number) {
-            this.currentChunkSize = defaultChunkSize;
-        }
+    private computeChunkSize() {
+        const lastDelta = (this.context as ObservableContext).lastDelta || 0;
+        if (!lastDelta) return this.defaultChunkSize;
+        const rate = (this.context as ObservableContext).updateRate || 0;
+        return Math.round(this.currentChunkSize * rate / lastDelta + 1);
     }
 
-    declare var process: any;
-    declare var window: any;
+    get chunkSize() {
+        return this.defaultChunkSize;
+    }
 
-    export const now: () => number = (function () {
-        if (typeof window !== 'undefined' && window.performance) {
-            const perf = window.performance;
-            return function () { return perf.now(); }
-        } else if (typeof process !== 'undefined' && process.hrtime !== 'undefined') {
-            return function () {
-                let t = process.hrtime();
-                return t[0] * 1000 + t[1] / 1000000;
-            };
-        } else {
-            return function () { return +new Date(); }
-        }
-    })();
+    set chunkSize(value: number) {
+        this.defaultChunkSize = value;
+        this.currentChunkSize = value;
+    }
+
+    get requiresUpdate() {
+        const ret = this.context.requiresUpdate;
+        if (!ret) this.currentChunkSize += this.chunkSize;
+        return ret;
+    }
+
+    async updateProgress(msg: string, abort?: boolean | (() => void), current?: number, max?: number) {
+        await this.context.updateProgress(msg, abort, current, max);
+        this.defaultChunkSize = this.computeChunkSize();
+    }
+
+    constructor(public context: Computation.Context, private defaultChunkSize: number) {
+        this.currentChunkSize = defaultChunkSize;
+    }
 }
 
 export default Computation;
\ No newline at end of file
-- 
GitLab