Skip to content
Snippets Groups Projects
Commit f03e0b67 authored by David Sehnal's avatar David Sehnal
Browse files

simplified computation API

parent 5adc085d
Branches
Tags
No related merge requests found
...@@ -555,7 +555,7 @@ async function parseInternal(data: string, ctx: Computation.Context) { ...@@ -555,7 +555,7 @@ async function parseInternal(data: string, ctx: Computation.Context) {
//inSaveFrame = false, //inSaveFrame = false,
//blockSaveFrames: any; //blockSaveFrames: any;
ctx.updateProgress({ message: 'Parsing...' }); ctx.update({ message: 'Parsing...', current: 0, max: data.length });
moveNext(tokenizer); moveNext(tokenizer);
while (tokenizer.tokenType !== CifTokenType.End) { while (tokenizer.tokenType !== CifTokenType.End) {
......
...@@ -37,7 +37,6 @@ export function Tokenizer(data: string): Tokenizer { ...@@ -37,7 +37,6 @@ export function Tokenizer(data: string): Tokenizer {
} }
export namespace Tokenizer { export namespace Tokenizer {
export function getTokenString(state: Tokenizer) { export function getTokenString(state: Tokenizer) {
return state.data.substring(state.tokenStart, state.tokenEnd); return state.data.substring(state.tokenStart, state.tokenEnd);
} }
......
...@@ -34,7 +34,7 @@ function State(tokenizer: Tokenizer, ctx: Computation.Context): State { ...@@ -34,7 +34,7 @@ function State(tokenizer: Tokenizer, ctx: Computation.Context): State {
tokenizer, tokenizer,
header: createEmptyHeader(), header: createEmptyHeader(),
numberOfAtoms: 0, numberOfAtoms: 0,
chunker: Computation.chunker(ctx, 100000) chunker: Computation.chunker(ctx, 100000) // 100000 lines is the default chunk size for this reader
}; };
} }
...@@ -140,8 +140,7 @@ function handleBoxVectors(state: State) { ...@@ -140,8 +140,7 @@ function handleBoxVectors(state: State) {
async function parseInternal(data: string, ctx: Computation.Context): Promise<Result<Schema.File>> { async function parseInternal(data: string, ctx: Computation.Context): Promise<Result<Schema.File>> {
const tokenizer = Tokenizer(data); const tokenizer = Tokenizer(data);
// 100000 lines is the default chunk size for this reader ctx.update({ message: 'Parsing...', current: 0, max: data.length });
const structures: Schema.Structure[] = []; const structures: Schema.Structure[] = [];
while (tokenizer.position < data.length) { while (tokenizer.position < data.length) {
const state = State(tokenizer, ctx); const state = State(tokenizer, ctx);
......
...@@ -26,8 +26,7 @@ const groStringHighPrecision = `Generated by trjconv : 2168 system t= 15.00000 ...@@ -26,8 +26,7 @@ const groStringHighPrecision = `Generated by trjconv : 2168 system t= 15.00000
describe('gro reader', () => { describe('gro reader', () => {
it('basic', async () => { it('basic', async () => {
const comp = Gro(groString) const parsed = await Gro(groString)();
const parsed = await comp.run()
if (parsed.isError) { if (parsed.isError) {
console.log(parsed) console.log(parsed)
...@@ -59,8 +58,7 @@ describe('gro reader', () => { ...@@ -59,8 +58,7 @@ describe('gro reader', () => {
}); });
it('high precision', async () => { it('high precision', async () => {
const comp = Gro(groStringHighPrecision) const parsed = await Gro(groStringHighPrecision)()
const parsed = await comp.run()
if (parsed.isError) { if (parsed.isError) {
console.log(parsed) console.log(parsed)
......
...@@ -11,17 +11,21 @@ import * as fs from 'fs' ...@@ -11,17 +11,21 @@ import * as fs from 'fs'
import Gro from './reader/gro/parser' import Gro from './reader/gro/parser'
import CIF from './reader/cif/index' import CIF from './reader/cif/index'
// const file = '1crn.gro' const file = '1crn.gro'
// const file = 'water.gro' // const file = 'water.gro'
// const file = 'test.gro' // const file = 'test.gro'
const file = 'md_1u19_trj.gro' // const file = 'md_1u19_trj.gro'
function showProgress(tag: string, p: Computation.Progress) {
console.log(`[${tag}] ${p.message} ${p.isIndeterminate ? '' : (p.current / p.max * 100).toFixed(2) + '% '}(${p.elapsedMs | 0}ms)`)
}
async function runGro(input: string) { async function runGro(input: string) {
console.time('parseGro'); console.time('parseGro');
const comp = Gro(input); const comp = Gro(input);
const running = comp.runObservable(Computation.observable({ updateRateMs: 150 }));
running.subscribe(p => console.log(`[Gro] ${(p.current / p.max * 100).toFixed(2)} (${p.elapsedMs | 0}ms)`)); const ctx = Computation.observable({ updateRateMs: 150, observer: p => showProgress('GRO', p) });
const parsed = await running.result; const parsed = await comp(ctx);
console.timeEnd('parseGro'); console.timeEnd('parseGro');
if (parsed.isError) { if (parsed.isError) {
...@@ -85,9 +89,8 @@ async function runCIF(input: string | Uint8Array) { ...@@ -85,9 +89,8 @@ async function runCIF(input: string | Uint8Array) {
console.time('parseCIF'); console.time('parseCIF');
const comp = typeof input === 'string' ? CIF.parseText(input) : CIF.parseBinary(input); const comp = typeof input === 'string' ? CIF.parseText(input) : CIF.parseBinary(input);
const running = comp.runObservable(Computation.observable({ updateRateMs: 250 })); // Computation.synchronous const ctx = Computation.observable({ updateRateMs: 250, observer: p => showProgress('CIF', p) });
running.subscribe(p => console.log(`[CIF] ${p.message} ${(p.current / p.max * 100).toFixed(2)}% (${p.elapsedMs | 0}ms)`)); const parsed = await comp(ctx);
const parsed = await running.result;
console.timeEnd('parseCIF'); console.timeEnd('parseCIF');
if (parsed.isError) { if (parsed.isError) {
console.log(parsed); console.log(parsed);
...@@ -137,14 +140,13 @@ import Computation from './utils/computation' ...@@ -137,14 +140,13 @@ import Computation from './utils/computation'
const comp = Computation.create(async ctx => { const comp = Computation.create(async ctx => {
for (let i = 0; i < 0; i++) { for (let i = 0; i < 0; i++) {
await new Promise(res => setTimeout(res, 500)); await new Promise(res => setTimeout(res, 500));
if (ctx.requiresUpdate) await ctx.updateProgress({ message: 'working', current: i, max: 2 }); if (ctx.requiresUpdate) await ctx.update({ message: 'working', current: i, max: 2 });
} }
return 42; return 42;
}); });
async function testComp() { async function testComp() {
const running = comp.runObservable(); const ctx = Computation.observable({ observer: p => showProgress('test', p) });
running.subscribe(p => console.log(JSON.stringify(p))); const ret = await comp(ctx);
const ret = await running.result;
console.log('computation returned', ret); console.log('computation returned', ret);
} }
testComp(); testComp();
\ No newline at end of file
...@@ -8,15 +8,14 @@ ...@@ -8,15 +8,14 @@
import Scheduler from './scheduler' import Scheduler from './scheduler'
interface Computation<A> { interface Computation<A> {
run(ctx?: Computation.Context): Promise<A>, (ctx?: Computation.Context): Promise<A>
runObservable(ctx?: Computation.Context): Computation.Running<A>
} }
namespace Computation { namespace Computation {
export let PRINT_ERRORS_TO_CONSOLE = false; export let PRINT_ERRORS_TO_CONSOLE = false;
export function create<A>(computation: (ctx: Context) => Promise<A>) { export function create<A>(computation: (ctx: Context) => Promise<A>) {
return new ComputationImpl(computation); return ComputationImpl(computation);
} }
export function resolve<A>(a: A) { export function resolve<A>(a: A) {
...@@ -28,7 +27,8 @@ namespace Computation { ...@@ -28,7 +27,8 @@ namespace Computation {
} }
export interface Params { export interface Params {
updateRateMs: number updateRateMs?: number,
observer?: ProgressObserver
} }
export const Aborted = 'Aborted'; export const Aborted = 'Aborted';
...@@ -54,27 +54,29 @@ namespace Computation { ...@@ -54,27 +54,29 @@ namespace Computation {
/** Also checks if the computation was aborted. If so, throws. */ /** Also checks if the computation was aborted. If so, throws. */
readonly requiresUpdate: boolean, readonly requiresUpdate: boolean,
requestAbort(): void, requestAbort(): void,
subscribe(onProgress: ProgressObserver): { dispose: () => void },
/** Also checks if the computation was aborted. If so, throws. */ /** Also checks if the computation was aborted. If so, throws. */
updateProgress(info: ProgressUpdate): Promise<void> | void update(info: ProgressUpdate): Promise<void> | void
} }
export type ProgressObserver = (progress: Readonly<Progress>) => void; export type ProgressObserver = (progress: Readonly<Progress>) => void;
export interface Running<A> { const emptyDisposer = { dispose: () => { } }
subscribe(onProgress: ProgressObserver): void,
result: Promise<A>
}
/** A context without updates. */ /** A context without updates. */
export const synchronous: Context = { export const synchronous: Context = {
isSynchronous: true, isSynchronous: true,
requiresUpdate: false, requiresUpdate: false,
requestAbort() { }, requestAbort() { },
updateProgress(info) { } subscribe(onProgress) { return emptyDisposer; },
update(info) { }
} }
export function observable(params?: Partial<Params>) { export function observable(params?: Partial<Params>) {
return new ObservableContext(params); const ret = new ObservableContext(params && params.updateRateMs);
if (params && params.observer) ret.subscribe(params.observer);
return ret;
} }
declare var process: any; declare var process: any;
...@@ -98,7 +100,7 @@ namespace Computation { ...@@ -98,7 +100,7 @@ namespace Computation {
export interface Chunker { export interface Chunker {
setNextChunkSize(size: number): void, setNextChunkSize(size: number): void,
/** nextChunk must return the number of actually processed chunks. */ /** nextChunk must return the number of actually processed chunks. */
process(nextChunk: (chunkSize: number) => number, update: (updater: Context['updateProgress']) => void, nextChunkSize?: number): Promise<void> process(nextChunk: (chunkSize: number) => number, update: (updater: Context['update']) => void, nextChunkSize?: number): Promise<void>
} }
export function chunker(ctx: Context, nextChunkSize: number): Chunker { export function chunker(ctx: Context, nextChunkSize: number): Chunker {
...@@ -107,22 +109,14 @@ namespace Computation { ...@@ -107,22 +109,14 @@ namespace Computation {
} }
const DefaulUpdateRateMs = 150; const DefaulUpdateRateMs = 150;
const NoOpSubscribe = () => { }
class ComputationImpl<A> implements Computation<A> {
run(ctx?: Computation.Context) {
return this.runObservable(ctx).result;
}
runObservable(ctx?: Computation.Context): Computation.Running<A> { function ComputationImpl<A>(computation: (ctx: Computation.Context) => Promise<A>): Computation<A> {
const context = ctx ? ctx as ObservableContext : new ObservableContext(); return (ctx?: Computation.Context) => {
const context: ObservableContext = ctx ? ctx : Computation.synchronous as any;
return { return new Promise<A>(async (resolve, reject) => {
subscribe: (context as ObservableContext).subscribe || NoOpSubscribe,
result: new Promise<A>(async (resolve, reject) => {
try { try {
if (context.started) context.started(); if (context.started) context.started();
const result = await this.computation(context); const result = await computation(context);
resolve(result); resolve(result);
} catch (e) { } catch (e) {
if (Computation.PRINT_ERRORS_TO_CONSOLE) console.error(e); if (Computation.PRINT_ERRORS_TO_CONSOLE) console.error(e);
...@@ -130,12 +124,7 @@ class ComputationImpl<A> implements Computation<A> { ...@@ -130,12 +124,7 @@ class ComputationImpl<A> implements Computation<A> {
} finally { } finally {
if (context.finished) context.finished(); if (context.finished) context.finished();
} }
}) });
};
}
constructor(private computation: (ctx: Computation.Context) => Promise<A>) {
} }
} }
...@@ -160,6 +149,18 @@ class ObservableContext implements Computation.Context { ...@@ -160,6 +149,18 @@ class ObservableContext implements Computation.Context {
subscribe = (obs: Computation.ProgressObserver) => { subscribe = (obs: Computation.ProgressObserver) => {
if (!this.observers) this.observers = []; if (!this.observers) this.observers = [];
this.observers.push(obs); this.observers.push(obs);
return {
dispose: () => {
if (!this.observers) return;
for (let i = 0; i < this.observers.length; i++) {
if (this.observers[i] === obs) {
this.observers[i] = this.observers[this.observers.length - 1];
this.observers.pop();
return;
}
}
}
};
} }
requestAbort() { requestAbort() {
...@@ -170,7 +171,7 @@ class ObservableContext implements Computation.Context { ...@@ -170,7 +171,7 @@ class ObservableContext implements Computation.Context {
} catch (e) { } } catch (e) { }
} }
updateProgress({ message, abort, current, max }: Computation.ProgressUpdate): Promise<void> | void { update({ message, abort, current, max }: Computation.ProgressUpdate): Promise<void> | void {
this.checkAborted(); this.checkAborted();
const time = Computation.now(); const time = Computation.now();
...@@ -210,7 +211,10 @@ class ObservableContext implements Computation.Context { ...@@ -210,7 +211,10 @@ class ObservableContext implements Computation.Context {
} }
started() { started() {
if (!this.level) this.startedTime = Computation.now(); if (!this.level) {
this.startedTime = Computation.now();
this.lastUpdated = this.startedTime;
}
this.level++; this.level++;
} }
...@@ -222,14 +226,14 @@ class ObservableContext implements Computation.Context { ...@@ -222,14 +226,14 @@ class ObservableContext implements Computation.Context {
if (!this.level) this.observers = void 0; if (!this.level) this.observers = void 0;
} }
constructor(params?: Partial<Computation.Params>) { constructor(updateRate?: number) {
this.updateRate = (params && params.updateRateMs) || DefaulUpdateRateMs; this.updateRate = updateRate || DefaulUpdateRateMs;
} }
} }
class ChunkerImpl implements Computation.Chunker { class ChunkerImpl implements Computation.Chunker {
private processedSinceUpdate = 0; private processedSinceUpdate = 0;
private updater: Computation.Context['updateProgress']; private updater: Computation.Context['update'];
private computeChunkSize() { private computeChunkSize() {
const lastDelta = (this.context as ObservableContext).lastDelta || 0; const lastDelta = (this.context as ObservableContext).lastDelta || 0;
...@@ -251,7 +255,7 @@ class ChunkerImpl implements Computation.Chunker { ...@@ -251,7 +255,7 @@ class ChunkerImpl implements Computation.Chunker {
this.nextChunkSize = size; this.nextChunkSize = size;
} }
async process(nextChunk: (size: number) => number, update: (updater: Computation.Context['updateProgress']) => Promise<void> | void, nextChunkSize?: number) { async process(nextChunk: (size: number) => number, update: (updater: Computation.Context['update']) => Promise<void> | void, nextChunkSize?: number) {
if (typeof nextChunkSize !== 'undefined') this.setNextChunkSize(nextChunkSize); if (typeof nextChunkSize !== 'undefined') this.setNextChunkSize(nextChunkSize);
let lastChunkSize: number; let lastChunkSize: number;
...@@ -269,7 +273,7 @@ class ChunkerImpl implements Computation.Chunker { ...@@ -269,7 +273,7 @@ class ChunkerImpl implements Computation.Chunker {
} }
constructor(public context: Computation.Context, private nextChunkSize: number) { constructor(public context: Computation.Context, private nextChunkSize: number) {
this.updater = this.context.updateProgress.bind(this.context); this.updater = this.context.update.bind(this.context);
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment