Skip to content
Snippets Groups Projects
Commit 99e829a1 authored by David Sehnal's avatar David Sehnal
Browse files

Cleaned up computation API

parent 9951dce6
No related branches found
No related tags found
No related merge requests found
......@@ -31,7 +31,7 @@ function Category(data: Encoding.EncodedCategory): Data.Category {
export default function parse(data: Uint8Array) {
return new Computation<Result<Data.File>>(async ctx => {
return Computation.create<Result<Data.File>>(async ctx => {
const minVersion = [0, 3];
try {
......@@ -53,7 +53,7 @@ interface TokenizerState {
currentTokenStart: number;
currentTokenEnd: number;
computation: Computation.Chunked
chunker: Computation.Chunker
......@@ -397,7 +397,8 @@ function createTokenizer(data: string, ctx: Computation.Context): TokenizerState
currentTokenType: CifTokenType.End,
currentLineNumber: 1,
isEscaped: false,
computation: Computation.chunked(ctx, 1000000)
chunker: Computation.chunker(ctx, 1000000)
......@@ -455,6 +456,7 @@ interface LoopReadState {
function readLoopChunk(state: LoopReadState, chunkSize: number) {
const { tokenizer, tokens, fieldCount } = state;
let tokenCount = state.tokenCount;
let counter = 0;
......@@ -464,16 +466,20 @@ function readLoopChunk(state: LoopReadState, chunkSize: number) {
state.tokenCount = tokenCount;
return tokenizer.currentTokenType === CifTokenType.Value;
return counter; //tokenizer.currentTokenType === CifTokenType.Value;
async function readLoopChunks(state: LoopReadState) {
const { computation } = state.tokenizer;
while (readLoopChunk(state, computation.chunkSize)) {
if (computation.requiresUpdate) {
await computation.updateProgress('Parsing...', void 0, state.tokenizer.position,;
function readLoopChunks(state: LoopReadState) {
const { chunker } = state.tokenizer;
// while (readLoopChunk(state, computation.chunkSize)) {
// if (computation.requiresUpdate) {
// await computation.updateProgress('Parsing...', void 0, state.tokenizer.position,;
// }
// }
return chunker.process(
chunkSize => readLoopChunk(state, chunkSize),
update => update('Parsing...', void 0, state.tokenizer.position,;
......@@ -634,7 +640,7 @@ async function parseInternal(data: string, ctx: Computation.Context) {
export default function parse(data: string) {
return new Computation<Result<Data.File>>(async ctx => {
return Computation.create<Result<Data.File>>(async ctx => {
return await parseInternal(data, ctx);
\ No newline at end of file
......@@ -17,8 +17,6 @@ export interface Tokenizer {
currentLineNumber: number
currentTokenStart: number
currentTokenEnd: number
computation: Computation.Chunked
export interface Tokens {
......@@ -27,15 +25,14 @@ export interface Tokens {
indices: ArrayLike<number>
export function Tokenizer(data: string, ctx: Computation.Context): Tokenizer {
export function Tokenizer(data: string): Tokenizer {
return {
position: 0,
length: data.length,
currentLineNumber: 1,
currentTokenStart: 0,
currentTokenEnd: 0,
computation: Computation.chunked(ctx, 1000000)
currentTokenEnd: 0
......@@ -105,20 +102,17 @@ export namespace Tokenizer {
/** Advance the state by the given number of lines and return line starts/ends as tokens. */
export async function readLinesAsync(state: Tokenizer, count: number): Promise<Tokens> {
const { computation, length } = state
export async function readLinesAsync(state: Tokenizer, count: number, chunker: Computation.Chunker): Promise<Tokens> {
const { length } = state;
const lineTokens = TokenBuilder.create(state, count * 2);
computation.chunkSize = 100000;
let linesAlreadyRead = 0;
while (linesAlreadyRead < count) {
const linesToRead = Math.min(count - linesAlreadyRead, computation.chunkSize);
await chunker.process(chunkSize => {
const linesToRead = Math.min(count - linesAlreadyRead, chunkSize);
readLinesChunk(state, linesToRead, lineTokens);
linesAlreadyRead += linesToRead;
if (computation.requiresUpdate) {
await computation.updateProgress('Parsing...', void 0, state.position, length);
return linesToRead;
}, update => update('Parsing...', void 0, state.position, length));
return lineTokens;
......@@ -16,6 +16,7 @@ interface State {
tokenizer: Tokenizer,
header: Schema.Header,
numberOfAtoms: number,
chunker: Computation.Chunker
function createEmptyHeader(): Schema.Header {
......@@ -28,11 +29,12 @@ function createEmptyHeader(): Schema.Header {
function State(tokenizer: Tokenizer): State {
function State(tokenizer: Tokenizer, ctx: Computation.Context): State {
return {
header: createEmptyHeader(),
numberOfAtoms: 0
numberOfAtoms: 0,
chunker: Computation.chunker(ctx, 100000)
......@@ -88,7 +90,7 @@ function handleNumberOfAtoms(state: State) {
async function handleAtoms(state: State): Promise<Schema.Atoms> {
const { tokenizer, numberOfAtoms } = state;
const lines = await Tokenizer.readLinesAsync(tokenizer, numberOfAtoms);
const lines = await Tokenizer.readLinesAsync(tokenizer, numberOfAtoms, state.chunker);
const positionSample =[0], lines.indices[1]).substring(20);
const precisions = positionSample.match(/\.\d+/g)!;
......@@ -136,11 +138,13 @@ function handleBoxVectors(state: State) {
async function parseInternal(data: string, ctx: Computation.Context): Promise<Result<Schema.File>> {
const tokenizer = Tokenizer(data, ctx);
const tokenizer = Tokenizer(data);
// 100000 lines is the default chunk size for this reader
const structures: Schema.Structure[] = [];
while (tokenizer.position < data.length) {
const state = State(tokenizer);
const state = State(tokenizer, ctx);
const atoms = await handleAtoms(state);
......@@ -153,7 +157,7 @@ async function parseInternal(data: string, ctx: Computation.Context): Promise<Re
export function parse(data: string) {
return new Computation<Result<Schema.File>>(async ctx => {
return Computation.create<Result<Schema.File>>(async ctx => {
return await parseInternal(data, ctx);
......@@ -19,7 +19,7 @@ const file = 'md_1u19_trj.gro'
async function runGro(input: string) {
const comp = Gro(input);
const running = comp.runObservable(Computation.observableContext({ updateRateMs: 250 }));
const running = comp.runObservable(Computation.observableContext({ updateRateMs: 150 }));
running.subscribe(p => console.log(`[Gro] ${(p.current / p.max * 100).toFixed(2)} (${p.elapsedMs | 0}ms)`));
const parsed = await running.result;
......@@ -131,10 +131,10 @@ export function _cif() {
import Computation from './utils/computation'
const comp = new Computation(async ctx => {
const comp = Computation.create(async ctx => {
for (let i = 0; i < 0; i++) {
await new Promise(res => setTimeout(res, 500));
if (ctx.requiresUpdate) await ctx.updateProgress('working', void 0, i, 2);
......@@ -7,54 +7,27 @@
import Scheduler from './scheduler'
class Computation<A> {
run(ctx?: Computation.Context) {
return this.runObservable(ctx).result;
runObservable(ctx?: Computation.Context): Computation.Running<A> {
const context = ctx ? ctx as ObservableContext : new ObservableContext();
return {
subscribe: (context as ObservableContext).subscribe || NoOpSubscribe,
result: new Promise<A>(async (resolve, reject) => {
try {
if (context.started) context.started();
const result = await this.computation(context);
} catch (e) {
if (Computation.PRINT_CONSOLE_ERROR) console.error(e);
} finally {
if (context.finished) context.finished();
constructor(private computation: (ctx: Computation.Context) => Promise<A>) {
interface Computation<A> {
run(ctx?: Computation.Context): Promise<A>,
runObservable(ctx?: Computation.Context): Computation.Running<A>
namespace Computation {
export let PRINT_CONSOLE_ERROR = false;
export function create<A>(computation: (ctx: Context) => Promise<A>) {
return new Computation(computation);
return new ComputationImpl(computation);
export function resolve<A>(a: A) {
return new Computation<A>(_ => Promise.resolve(a));
return create<A>(_ => Promise.resolve(a));
export function reject<A>(reason: any) {
return new Computation<A>(_ => Promise.reject(reason));
return create<A>(_ => Promise.reject(reason));
export interface Params {
isSynchronous: boolean,
updateRateMs: number
......@@ -70,14 +43,11 @@ namespace Computation {
export interface Context {
readonly isSynchronous: boolean,
/** Also checks if the computation was aborted. If so, throws. */
readonly requiresUpdate: boolean,
requestAbort(): void,
* Checks if the computation was aborted. If so, throws.
* Otherwise, updates the progress.
* Returns the number of ms since the last update.
/** Also checks if the computation was aborted. If so, throws. */
updateProgress(msg: string, abort?: boolean | (() => void), current?: number, max?: number): Promise<void> | void
......@@ -88,7 +58,9 @@ namespace Computation {
result: Promise<A>
export const contextWithoutUpdates: Context = {
/** A context without updates. */
export const synchronousContext: Context = {
isSynchronous: true,
requiresUpdate: false,
requestAbort() { },
updateProgress(msg, abort, current, max) { }
......@@ -98,37 +70,31 @@ namespace Computation {
return new ObservableContext(params);
declare var process: any;
declare var window: any;
export const now: () => number = (function () {
if (typeof window !== 'undefined' && window.performance) {
const perf = window.performance;
return function () { return; }
return () =>;
} else if (typeof process !== 'undefined' && process.hrtime !== 'undefined') {
return function () {
return () => {
let t = process.hrtime();
return t[0] * 1000 + t[1] / 1000000;
} else {
return function () { return +new Date(); }
return () => +new Date();
export interface Chunked {
* Get automatically computed chunk size
* Or set it a default value.
chunkSize: number,
readonly requiresUpdate: boolean,
updateProgress: Context['updateProgress'],
context: Context
/** A utility for splitting large computations into smaller parts. */
export interface Chunker {
setNextChunkSize(size: number): void,
/** nextChunk must return the number of actually processed chunks. */
process(nextChunk: (chunkSize: number) => number, update: (updater: Context['updateProgress']) => void, nextChunkSize?: number): Promise<void>
export function chunked(ctx: Context, defaultChunkSize: number): Chunked {
export function chunker(ctx: Context, defaultChunkSize: number): Chunker {
return new ChunkedImpl(ctx, defaultChunkSize);
......@@ -136,9 +102,39 @@ namespace Computation {
const DefaulUpdateRateMs = 150;
const NoOpSubscribe = () => { }
class ComputationImpl<A> implements Computation<A> {
run(ctx?: Computation.Context) {
return this.runObservable(ctx).result;
runObservable(ctx?: Computation.Context): Computation.Running<A> {
const context = ctx ? ctx as ObservableContext : new ObservableContext();
return {
subscribe: (context as ObservableContext).subscribe || NoOpSubscribe,
result: new Promise<A>(async (resolve, reject) => {
try {
if (context.started) context.started();
const result = await this.computation(context);
} catch (e) {
if (Computation.PRINT_CONSOLE_ERROR) console.error(e);
} finally {
if (context.finished) context.finished();
constructor(private computation: (ctx: Computation.Context) => Promise<A>) {
class ObservableContext implements Computation.Context {
readonly updateRate: number;
private isSynchronous: boolean;
readonly isSynchronous: boolean = false;
private level = 0;
private startedTime = 0;
private abortRequested = false;
......@@ -203,7 +199,7 @@ class ObservableContext implements Computation.Context {
get requiresUpdate() {
if (this.isSynchronous) return false;
return - this.lastUpdated > this.updateRate / 2;
return - this.lastUpdated > this.updateRate;
started() {
......@@ -221,43 +217,53 @@ class ObservableContext implements Computation.Context {
constructor(params?: Partial<Computation.Params>) {
this.updateRate = (params && params.updateRateMs) || DefaulUpdateRateMs;
this.isSynchronous = !!(params && params.isSynchronous);
class ChunkedImpl implements Computation.Chunked {
private currentChunkSize: number;
class ChunkedImpl implements Computation.Chunker {
private processedSinceUpdate = 0;
private updater: Computation.Context['updateProgress'];
private computeChunkSize() {
const lastDelta = (this.context as ObservableContext).lastDelta || 0;
if (!lastDelta) return this.defaultChunkSize;
if (!lastDelta) return this.nextChunkSize;
const rate = (this.context as ObservableContext).updateRate || 0;
return Math.round(this.currentChunkSize * rate / lastDelta + 1);
const ret = Math.round(this.processedSinceUpdate * rate / lastDelta + 1);
this.processedSinceUpdate = 0;
return ret;
get chunkSize() {
return this.defaultChunkSize;
private getNextChunkSize() {
const ctx = this.context as ObservableContext;
// be smart if the computation is synchronous and process the whole chunk at once.
if (ctx.isSynchronous) return Number.MAX_SAFE_INTEGER;
return this.nextChunkSize;
set chunkSize(value: number) {
this.defaultChunkSize = value;
this.currentChunkSize = value;
setNextChunkSize(size: number) {
this.nextChunkSize = size;
get requiresUpdate() {
const ret = this.context.requiresUpdate;
if (!ret) this.currentChunkSize += this.chunkSize;
return ret;
async process(nextChunk: (size: number) => number, update: (updater: Computation.Context['updateProgress']) => Promise<void> | void, nextChunkSize?: number) {
if (typeof nextChunkSize !== 'undefined') this.setNextChunkSize(nextChunkSize);
let lastChunk: number;
async updateProgress(msg: string, abort?: boolean | (() => void), current?: number, max?: number) {
await this.context.updateProgress(msg, abort, current, max);
this.defaultChunkSize = this.computeChunkSize();
while (( lastChunk = nextChunk(this.getNextChunkSize())) > 0) {
this.processedSinceUpdate += lastChunk;
if (this.context.requiresUpdate) {
await update(this.updater);
this.nextChunkSize = this.computeChunkSize();
if (this.context.requiresUpdate) {
await update(this.updater);
this.nextChunkSize = this.computeChunkSize();
constructor(public context: Computation.Context, private defaultChunkSize: number) {
this.currentChunkSize = defaultChunkSize;
constructor(public context: Computation.Context, private nextChunkSize: number) {
this.updater = this.context.updateProgress.bind(this.context);
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment