diff --git a/src/reader/cif/binary/parser.ts b/src/reader/cif/binary/parser.ts
index fa62f7812e4ae2c793a4e3a4a5175e41b1dfb4d7..318bd61bf141106adac00e9842de63c24efd7dec 100644
--- a/src/reader/cif/binary/parser.ts
+++ b/src/reader/cif/binary/parser.ts
@@ -31,7 +31,7 @@ function Category(data: Encoding.EncodedCategory): Data.Category {
 }
 
 export default function parse(data: Uint8Array) {
-    return new Computation<Result<Data.File>>(async ctx => {
+    return Computation.create<Result<Data.File>>(async ctx => {
         const minVersion = [0, 3];
 
         try {
diff --git a/src/reader/cif/text/parser.ts b/src/reader/cif/text/parser.ts
index e9a819b5bb1f4a6983d4c846954fefc56573a4e2..22191b0c5b14490e5148053a6f47a5b6dc8da6f1 100644
--- a/src/reader/cif/text/parser.ts
+++ b/src/reader/cif/text/parser.ts
@@ -53,7 +53,7 @@ interface TokenizerState {
     currentTokenStart: number;
     currentTokenEnd: number;
 
-    computation: Computation.Chunked
+    chunker: Computation.Chunker
 }
 
 /**
@@ -397,7 +397,8 @@ function createTokenizer(data: string, ctx: Computation.Context): TokenizerState
         currentTokenType: CifTokenType.End,
         currentLineNumber: 1,
         isEscaped: false,
-        computation: Computation.chunked(ctx, 1000000)
+
+        chunker: Computation.chunker(ctx, 1000000)
     };
 }
 
@@ -455,6 +456,7 @@ interface LoopReadState {
 }
 
 function readLoopChunk(state: LoopReadState, chunkSize: number) {
+    //console.log(chunkSize);
     const { tokenizer, tokens, fieldCount } = state;
     let tokenCount = state.tokenCount;
     let counter = 0;
@@ -464,16 +466,20 @@ function readLoopChunk(state: LoopReadState, chunkSize: number) {
         counter++;
     }
     state.tokenCount = tokenCount;
-    return tokenizer.currentTokenType === CifTokenType.Value;
+    return counter; //tokenizer.currentTokenType === CifTokenType.Value;
 }
 
-async function readLoopChunks(state: LoopReadState) {
-    const { computation } = state.tokenizer;
-    while (readLoopChunk(state, computation.chunkSize)) {
-        if (computation.requiresUpdate) {
-            await computation.updateProgress('Parsing...', void 0, state.tokenizer.position, state.tokenizer.data.length);
-        }
-    }
+function readLoopChunks(state: LoopReadState) {
+    const { chunker } = state.tokenizer;
+    // while (readLoopChunk(state, computation.chunkSize)) {
+    //     if (computation.requiresUpdate) {
+    //         await computation.updateProgress('Parsing...', void 0, state.tokenizer.position, state.tokenizer.data.length);
+    //     }
+    // }
+
+    return chunker.process(
+        chunkSize => readLoopChunk(state, chunkSize),
+        update => update('Parsing...', void 0, state.tokenizer.position, state.tokenizer.data.length));
 }
 
 /**
@@ -634,7 +640,7 @@ async function parseInternal(data: string, ctx: Computation.Context) {
 }
 
 export default function parse(data: string) {
-    return new Computation<Result<Data.File>>(async ctx => {
+    return Computation.create<Result<Data.File>>(async ctx => {
         return await parseInternal(data, ctx);
     });
 }
\ No newline at end of file
diff --git a/src/reader/common/text/tokenizer.ts b/src/reader/common/text/tokenizer.ts
index 1f8354eabf3dca2b3f44e1778c303e8a55702cf4..314b4ff42453066774d791db0a9141e9a4ddfef4 100644
--- a/src/reader/common/text/tokenizer.ts
+++ b/src/reader/common/text/tokenizer.ts
@@ -17,8 +17,6 @@ export interface Tokenizer {
     currentLineNumber: number
     currentTokenStart: number
     currentTokenEnd: number
-
-    computation: Computation.Chunked
 }
 
 export interface Tokens {
@@ -27,15 +25,14 @@
     indices: ArrayLike<number>
 }
 
-export function Tokenizer(data: string, ctx: Computation.Context): Tokenizer {
+export function Tokenizer(data: string): Tokenizer {
     return {
         data,
         position: 0,
         length: data.length,
         currentLineNumber: 1,
         currentTokenStart: 0,
-        currentTokenEnd: 0,
-        computation: Computation.chunked(ctx, 1000000)
+        currentTokenEnd: 0
     };
 }
 
@@ -105,20 +102,17 @@
     }
 
     /** Advance the state by the given number of lines and return line starts/ends as tokens. */
-    export async function readLinesAsync(state: Tokenizer, count: number): Promise<Tokens> {
-        const { computation, length } = state
+    export async function readLinesAsync(state: Tokenizer, count: number, chunker: Computation.Chunker): Promise<Tokens> {
+        const { length } = state;
         const lineTokens = TokenBuilder.create(state, count * 2);
 
-        computation.chunkSize = 100000;
         let linesAlreadyRead = 0;
-        while (linesAlreadyRead < count) {
-            const linesToRead = Math.min(count - linesAlreadyRead, computation.chunkSize);
+        await chunker.process(chunkSize => {
+            const linesToRead = Math.min(count - linesAlreadyRead, chunkSize);
             readLinesChunk(state, linesToRead, lineTokens);
             linesAlreadyRead += linesToRead;
-            if (computation.requiresUpdate) {
-                await computation.updateProgress('Parsing...', void 0, state.position, length);
-            }
-        }
+            return linesToRead;
+        }, update => update('Parsing...', void 0, state.position, length));
 
         return lineTokens;
     }
diff --git a/src/reader/gro/parser.ts b/src/reader/gro/parser.ts
index bf475fc140549682e73c3f510fa9fa0d77bf5aff..55db97ccbe11e76eb8011d508aa487c4b118277f 100644
--- a/src/reader/gro/parser.ts
+++ b/src/reader/gro/parser.ts
@@ -16,6 +16,7 @@ interface State {
     tokenizer: Tokenizer,
     header: Schema.Header,
     numberOfAtoms: number,
+    chunker: Computation.Chunker
 }
 
 function createEmptyHeader(): Schema.Header {
@@ -28,11 +29,12 @@ function createEmptyHeader(): Schema.Header {
     };
 }
 
-function State(tokenizer: Tokenizer): State {
+function State(tokenizer: Tokenizer, ctx: Computation.Context): State {
     return {
         tokenizer,
         header: createEmptyHeader(),
-        numberOfAtoms: 0
+        numberOfAtoms: 0,
+        chunker: Computation.chunker(ctx, 100000)
     };
 }
 
@@ -88,7 +90,7 @@ function handleNumberOfAtoms(state: State) {
  */
 async function handleAtoms(state: State): Promise<Schema.Atoms> {
     const { tokenizer, numberOfAtoms } = state;
-    const lines = await Tokenizer.readLinesAsync(tokenizer, numberOfAtoms);
+    const lines = await Tokenizer.readLinesAsync(tokenizer, numberOfAtoms, state.chunker);
     const positionSample = tokenizer.data.substring(lines.indices[0], lines.indices[1]).substring(20);
     const precisions = positionSample.match(/\.\d+/g)!;
 
@@ -136,11 +138,13 @@ function handleBoxVectors(state: State) {
 }
 
 async function parseInternal(data: string, ctx: Computation.Context): Promise<Result<Schema.File>> {
-    const tokenizer = Tokenizer(data, ctx);
+    const tokenizer = Tokenizer(data);
+
+    // 100000 lines is the default chunk size for this reader
 
     const structures: Schema.Structure[] = [];
     while (tokenizer.position < data.length) {
-        const state = State(tokenizer);
+        const state = State(tokenizer, ctx);
         handleTitleString(state);
         handleNumberOfAtoms(state);
         const atoms = await handleAtoms(state);
@@ -153,7 +157,7 @@ async function parseInternal(data: string, ctx: Computation.Context): Promise<Re
 }
 
 export function parse(data: string) {
-    return new Computation<Result<Schema.File>>(async ctx => {
+    return Computation.create<Result<Schema.File>>(async ctx => {
         return await parseInternal(data, ctx);
     });
 }
diff --git a/src/script.ts b/src/script.ts
index 781b7a5a363470db6ec9af27b40f6d04b02196a7..671f9f374e1a26c4390b4ea3a1856791ec416bbe 100644
--- a/src/script.ts
+++ b/src/script.ts
@@ -19,7 +19,7 @@ const file = 'md_1u19_trj.gro'
 async function runGro(input: string) {
     console.time('parseGro');
     const comp = Gro(input);
-    const running = comp.runObservable(Computation.observableContext({ updateRateMs: 250 }));
+    const running = comp.runObservable(Computation.observableContext({ updateRateMs: 150 }));
     running.subscribe(p => console.log(`[Gro] ${(p.current / p.max * 100).toFixed(2)} (${p.elapsedMs | 0}ms)`));
     const parsed = await running.result;
     console.timeEnd('parseGro');
@@ -131,10 +131,10 @@ export function _cif() {
     });
 }
 
-//_cif();
+_cif();
 
 import Computation from './utils/computation'
-const comp = new Computation(async ctx => {
+const comp = Computation.create(async ctx => {
     for (let i = 0; i < 0; i++) {
         await new Promise(res => setTimeout(res, 500));
         if (ctx.requiresUpdate) await ctx.updateProgress('working', void 0, i, 2);
diff --git a/src/utils/computation.ts b/src/utils/computation.ts
index 896d198334ed9d4f0f07ef72a414f559b7ef52e0..9619abd2feb20203d7779d2f742f059d20622609 100644
--- a/src/utils/computation.ts
+++ b/src/utils/computation.ts
@@ -7,54 +7,27 @@
 
 import Scheduler from './scheduler'
 
-class Computation<A> {
-    run(ctx?: Computation.Context) {
-        return this.runObservable(ctx).result;
-    }
-
-    runObservable(ctx?: Computation.Context): Computation.Running<A> {
-        const context = ctx ? ctx as ObservableContext : new ObservableContext();
-
-        return {
-            subscribe: (context as ObservableContext).subscribe || NoOpSubscribe,
-            result: new Promise<A>(async (resolve, reject) => {
-                try {
-                    if (context.started) context.started();
-                    const result = await this.computation(context);
-                    resolve(result);
-                } catch (e) {
-                    if (Computation.PRINT_CONSOLE_ERROR) console.error(e);
-                    reject(e);
-                } finally {
-                    if (context.finished) context.finished();
-                }
-            })
-        };
-    }
-
-    constructor(private computation: (ctx: Computation.Context) => Promise<A>) {
-
-    }
+interface Computation<A> {
+    run(ctx?: Computation.Context): Promise<A>,
+    runObservable(ctx?: Computation.Context): Computation.Running<A>
 }
-
 namespace Computation {
     export let PRINT_CONSOLE_ERROR = false;
 
     export function create<A>(computation: (ctx: Context) => Promise<A>) {
-        return new Computation(computation);
+        return new ComputationImpl(computation);
     }
 
     export function resolve<A>(a: A) {
-        return new Computation<A>(_ => Promise.resolve(a));
+        return create<A>(_ => Promise.resolve(a));
    }
 
     export function reject<A>(reason: any) {
-        return new Computation<A>(_ => Promise.reject(reason));
+        return create<A>(_ => Promise.reject(reason));
     }
 
     export interface Params {
-        isSynchronous: boolean,
         updateRateMs: number
     }
@@ -70,14 +43,11 @@ namespace Computation {
     }
 
     export interface Context {
+        readonly isSynchronous: boolean,
+        /** Also checks if the computation was aborted. If so, throws. */
         readonly requiresUpdate: boolean,
         requestAbort(): void,
-        /**
-         * Checks if the computation was aborted. If so, throws.
-         * Otherwise, updates the progress.
-         *
-         * Returns the number of ms since the last update.
-         */
+        /** Also checks if the computation was aborted. If so, throws. */
         updateProgress(msg: string, abort?: boolean | (() => void), current?: number, max?: number): Promise<void> | void
     }
 
@@ -88,7 +58,9 @@ namespace Computation {
         result: Promise<A>
     }
 
-    export const contextWithoutUpdates: Context = {
+    /** A context without updates. */
+    export const synchronousContext: Context = {
+        isSynchronous: true,
         requiresUpdate: false,
         requestAbort() { },
         updateProgress(msg, abort, current, max) { }
@@ -98,37 +70,31 @@ namespace Computation {
         return new ObservableContext(params);
     }
 
-
-
    declare var process: any;
    declare var window: any;
 
    export const now: () => number = (function () {
        if (typeof window !== 'undefined' && window.performance) {
            const perf = window.performance;
-            return function () { return perf.now(); }
+            return () => perf.now();
        } else if (typeof process !== 'undefined' && process.hrtime !== 'undefined') {
-            return function () {
+            return () => {
                let t = process.hrtime();
                return t[0] * 1000 + t[1] / 1000000;
            };
        } else {
-            return function () { return +new Date(); }
+            return () => +new Date();
        }
-    })();
-
-    export interface Chunked {
-        /**
-         * Get automatically computed chunk size
-         * Or set it a default value.
-         */
-        chunkSize: number,
-        readonly requiresUpdate: boolean,
-        updateProgress: Context['updateProgress'],
-        context: Context
+    }());
+
+    /** A utility for splitting large computations into smaller parts. */
+    export interface Chunker {
+        setNextChunkSize(size: number): void,
+        /** nextChunk must return the number of actually processed chunks. */
+        process(nextChunk: (chunkSize: number) => number, update: (updater: Context['updateProgress']) => void, nextChunkSize?: number): Promise<void>
    }
 
-    export function chunked(ctx: Context, defaultChunkSize: number): Chunked {
+    export function chunker(ctx: Context, defaultChunkSize: number): Chunker {
        return new ChunkedImpl(ctx, defaultChunkSize);
    }
 }
@@ -136,9 +102,39 @@ namespace Computation {
 const DefaulUpdateRateMs = 150;
 const NoOpSubscribe = () => { }
 
+class ComputationImpl<A> implements Computation<A> {
+    run(ctx?: Computation.Context) {
+        return this.runObservable(ctx).result;
+    }
+
+    runObservable(ctx?: Computation.Context): Computation.Running<A> {
+        const context = ctx ? ctx as ObservableContext : new ObservableContext();
+
+        return {
+            subscribe: (context as ObservableContext).subscribe || NoOpSubscribe,
+            result: new Promise<A>(async (resolve, reject) => {
+                try {
+                    if (context.started) context.started();
+                    const result = await this.computation(context);
+                    resolve(result);
+                } catch (e) {
+                    if (Computation.PRINT_CONSOLE_ERROR) console.error(e);
+                    reject(e);
+                } finally {
+                    if (context.finished) context.finished();
+                }
+            })
+        };
+    }
+
+    constructor(private computation: (ctx: Computation.Context) => Promise<A>) {
+
+    }
+}
+
 class ObservableContext implements Computation.Context {
     readonly updateRate: number;
-    private isSynchronous: boolean;
+    readonly isSynchronous: boolean = false;
     private level = 0;
     private startedTime = 0;
     private abortRequested = false;
@@ -203,7 +199,7 @@ class ObservableContext implements Computation.Context {
     get requiresUpdate() {
         this.checkAborted();
         if (this.isSynchronous) return false;
-        return Computation.now() - this.lastUpdated > this.updateRate / 2;
+        return Computation.now() - this.lastUpdated > this.updateRate;
     }
 
     started() {
@@ -221,43 +217,53 @@ class ObservableContext implements Computation.Context {
 
     constructor(params?: Partial<Computation.Params>) {
         this.updateRate = (params && params.updateRateMs) || DefaulUpdateRateMs;
-        this.isSynchronous = !!(params && params.isSynchronous);
     }
 }
 
-class ChunkedImpl implements Computation.Chunked {
-    private currentChunkSize: number;
+class ChunkedImpl implements Computation.Chunker {
+    private processedSinceUpdate = 0;
+    private updater: Computation.Context['updateProgress'];
 
     private computeChunkSize() {
         const lastDelta = (this.context as ObservableContext).lastDelta || 0;
-        if (!lastDelta) return this.defaultChunkSize;
+        if (!lastDelta) return this.nextChunkSize;
         const rate = (this.context as ObservableContext).updateRate || 0;
-        return Math.round(this.currentChunkSize * rate / lastDelta + 1);
+        const ret = Math.round(this.processedSinceUpdate * rate / lastDelta + 1);
+        this.processedSinceUpdate = 0;
+        return ret;
     }
 
-    get chunkSize() {
-        return this.defaultChunkSize;
+    private getNextChunkSize() {
+        const ctx = this.context as ObservableContext;
+        // be smart if the computation is synchronous and process the whole chunk at once.
+        if (ctx.isSynchronous) return Number.MAX_SAFE_INTEGER;
+        return this.nextChunkSize;
     }
 
-    set chunkSize(value: number) {
-        this.defaultChunkSize = value;
-        this.currentChunkSize = value;
+    setNextChunkSize(size: number) {
+        this.nextChunkSize = size;
     }
 
-    get requiresUpdate() {
-        const ret = this.context.requiresUpdate;
-        if (!ret) this.currentChunkSize += this.chunkSize;
-        return ret;
-    }
+    async process(nextChunk: (size: number) => number, update: (updater: Computation.Context['updateProgress']) => Promise<void> | void, nextChunkSize?: number) {
+        if (typeof nextChunkSize !== 'undefined') this.setNextChunkSize(nextChunkSize);
+        let lastChunk: number;
 
-    async updateProgress(msg: string, abort?: boolean | (() => void), current?: number, max?: number) {
-        await this.context.updateProgress(msg, abort, current, max);
-        this.defaultChunkSize = this.computeChunkSize();
+        while ((lastChunk = nextChunk(this.getNextChunkSize())) > 0) {
+            this.processedSinceUpdate += lastChunk;
+            if (this.context.requiresUpdate) {
+                await update(this.updater);
+                this.nextChunkSize = this.computeChunkSize();
+            }
+        }
+        if (this.context.requiresUpdate) {
+            await update(this.updater);
+            this.nextChunkSize = this.computeChunkSize();
+        }
     }
 
-    constructor(public context: Computation.Context, private defaultChunkSize: number) {
-        this.currentChunkSize = defaultChunkSize;
+    constructor(public context: Computation.Context, private nextChunkSize: number) {
+        this.updater = this.context.updateProgress.bind(this.context);
    }
 }
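
Usage sketch (not part of the patch): the snippet below illustrates how a caller might drive the reworked `Computation`/`Chunker` API from this diff. `Computation.create` wraps the async body, `Computation.chunker(ctx, size)` creates a chunker, and `chunker.process(nextChunk, update)` repeatedly calls `nextChunk` with a suggested chunk size (the callback must return how many items it actually processed; returning 0 stops the loop) and invokes `update` only when the context reports that a progress update is due. Only the imported module and the API calls come from the diff; the `sumInChunks` helper, the item counts, and the progress message are made up for illustration.

```ts
import Computation from './utils/computation'

// Hypothetical example: "process" `total` items in chunks via the new Chunker API.
function sumInChunks(total: number) {
    return Computation.create<number>(async ctx => {
        // 100000 is just an initial chunk size; the chunker re-tunes it between updates.
        const chunker = Computation.chunker(ctx, 100000);
        let processed = 0;
        let sum = 0;
        await chunker.process(
            // nextChunk: do at most `chunkSize` units of work, return how many were actually done.
            chunkSize => {
                const count = Math.min(chunkSize, total - processed);
                for (let i = 0; i < count; i++) sum += processed + i;
                processed += count;
                return count;
            },
            // update: only invoked when the context decides a progress report is due.
            update => update('Summing...', void 0, processed, total));
        return sum;
    });
}

async function runExample() {
    const running = sumInChunks(10000000).runObservable(Computation.observableContext({ updateRateMs: 150 }));
    running.subscribe(p => console.log(`${(p.current / p.max * 100).toFixed(1)}%`));
    console.log('sum:', await running.result);
}

runExample();
```

Running the same computation with `Computation.synchronousContext` (or any context whose `isSynchronous` flag is true) makes `getNextChunkSize` return `Number.MAX_SAFE_INTEGER`, so the whole input is handled as a single chunk with no intermediate progress updates.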