Skip to content
Snippets Groups Projects
Commit 9951dce6 authored by David Sehnal's avatar David Sehnal
Browse files

fixed readLinesAsync

parent d46610f4
No related branches found
No related tags found
No related merge requests found
......@@ -397,7 +397,7 @@ function createTokenizer(data: string, ctx: Computation.Context): TokenizerState
currentTokenType: CifTokenType.End,
currentLineNumber: 1,
isEscaped: false,
computation: new Computation.Chunked(ctx, 1000000)
computation: Computation.chunked(ctx, 1000000)
};
}
......
......@@ -35,7 +35,7 @@ export function Tokenizer(data: string, ctx: Computation.Context): Tokenizer {
currentLineNumber: 1,
currentTokenStart: 0,
currentTokenEnd: 0,
computation: new Computation.Chunked(ctx, 1000000)
computation: Computation.chunked(ctx, 1000000)
};
}
......@@ -90,21 +90,37 @@ export namespace Tokenizer {
return getTokenString(state);
}
function readLinesChunk(state: Tokenizer, count: number, tokens: Tokens) {
for (let i = 0; i < count; i++) {
markLine(state);
TokenBuilder.addUnchecked(tokens, state.currentTokenStart, state.currentTokenEnd);
}
}
/** Advance the state by the given number of lines and return line starts/ends as tokens. */
export async function readLines(state: Tokenizer, count: number): Promise<Tokens> {
const { computation, position, length } = state
export function readLines(state: Tokenizer, count: number): Tokens {
const lineTokens = TokenBuilder.create(state, count * 2);
readLinesChunk(state, count, lineTokens);
return lineTokens;
}
for (let i = 0; i < count; i++) {
markLine(state);
TokenBuilder.addUnchecked(lineTokens, state.currentTokenStart, state.currentTokenEnd);
/** Advance the state by the given number of lines and return line starts/ends as tokens. */
export async function readLinesAsync(state: Tokenizer, count: number): Promise<Tokens> {
const { computation, length } = state
const lineTokens = TokenBuilder.create(state, count * 2);
computation.chunkSize = 100000;
let linesAlreadyRead = 0;
while (linesAlreadyRead < count) {
const linesToRead = Math.min(count - linesAlreadyRead, computation.chunkSize);
readLinesChunk(state, linesToRead, lineTokens);
linesAlreadyRead += linesToRead;
if (computation.requiresUpdate) {
await computation.updateProgress('Parsing...', void 0, position, length);
await computation.updateProgress('Parsing...', void 0, state.position, length);
}
}
return { data: state.data, count, indices: lineTokens.indices };
return lineTokens;
}
/**
......
......@@ -88,7 +88,7 @@ function handleNumberOfAtoms(state: State) {
*/
async function handleAtoms(state: State): Promise<Schema.Atoms> {
const { tokenizer, numberOfAtoms } = state;
const lines = await Tokenizer.readLines(tokenizer, numberOfAtoms);
const lines = await Tokenizer.readLinesAsync(tokenizer, numberOfAtoms);
const positionSample = tokenizer.data.substring(lines.indices[0], lines.indices[1]).substring(20);
const precisions = positionSample.match(/\.\d+/g)!;
......
......@@ -19,7 +19,7 @@ const file = 'md_1u19_trj.gro'
async function runGro(input: string) {
console.time('parseGro');
const comp = Gro(input);
const running = comp.runWithContext(new Computation.ObservableContext({ updateRateMs: 250 }));
const running = comp.runObservable(Computation.observableContext({ updateRateMs: 250 }));
running.subscribe(p => console.log(`[Gro] ${(p.current / p.max * 100).toFixed(2)} (${p.elapsedMs | 0}ms)`));
const parsed = await running.result;
console.timeEnd('parseGro');
......@@ -70,7 +70,7 @@ async function runGro(input: string) {
console.log(residueNumber.length, residueNumber[0], residueNumber[residueNumber.length - 1])
}
async function _gro() {
function _gro() {
fs.readFile(`./examples/${file}`, 'utf8', function (err, input) {
if (err) {
return console.log(err);
......@@ -85,7 +85,7 @@ async function runCIF(input: string | Uint8Array) {
console.time('parseCIF');
const comp = typeof input === 'string' ? CIF.parseText(input) : CIF.parseBinary(input);
const running = comp.runWithContext(new Computation.ObservableContext({ updateRateMs: 250 }));
const running = comp.runObservable(Computation.observableContext({ updateRateMs: 250 }));
running.subscribe(p => console.log(`[CIF] ${(p.current / p.max * 100).toFixed(2)} (${p.elapsedMs | 0}ms)`));
const parsed = await running.result;
console.timeEnd('parseCIF');
......@@ -131,7 +131,7 @@ export function _cif() {
});
}
_cif();
//_cif();
import Computation from './utils/computation'
const comp = new Computation(async ctx => {
......@@ -142,7 +142,7 @@ const comp = new Computation(async ctx => {
return 42;
});
async function testComp() {
const running = comp.runWithContext();
const running = comp.runObservable();
running.subscribe(p => console.log(JSON.stringify(p)));
const ret = await running.result;
console.log('computation returned', ret);
......
......@@ -9,14 +9,14 @@ import Scheduler from './scheduler'
class Computation<A> {
run(ctx?: Computation.Context) {
return this.runWithContext(ctx).result;
return this.runObservable(ctx).result;
}
runWithContext(ctx?: Computation.Context): Computation.Running<A> {
const context = ctx ? ctx as Computation.ObservableContext : new Computation.ObservableContext();
runObservable(ctx?: Computation.Context): Computation.Running<A> {
const context = ctx ? ctx as ObservableContext : new ObservableContext();
return {
subscribe: (context as Computation.ObservableContext).subscribe || NoOpSubscribe,
subscribe: (context as ObservableContext).subscribe || NoOpSubscribe,
result: new Promise<A>(async (resolve, reject) => {
try {
if (context.started) context.started();
......@@ -37,19 +37,20 @@ class Computation<A> {
}
}
const NoOpSubscribe = () => {}
namespace Computation {
const DefaulUpdateRateMs = 100;
export let PRINT_CONSOLE_ERROR = false;
export function create<A>(computation: (ctx: Context) => Promise<A>) {
return new Computation(computation);
}
export function resolve<A>(a: A) {
return new Computation<A>(() => Promise.resolve(a));
return new Computation<A>(_ => Promise.resolve(a));
}
export function reject<A>(reason: any) {
return new Computation<A>(() => Promise.reject(reason));
return new Computation<A>(_ => Promise.reject(reason));
}
export interface Params {
......@@ -80,159 +81,184 @@ namespace Computation {
updateProgress(msg: string, abort?: boolean | (() => void), current?: number, max?: number): Promise<void> | void
}
type ProgressObserver = (progress: Readonly<Progress>) => void;
export type ProgressObserver = (progress: Readonly<Progress>) => void;
export interface Running<A> {
subscribe(onProgress: ProgressObserver): void;
result: Promise<A>
}
export const ContextWithoutUpdates: Context = {
export const contextWithoutUpdates: Context = {
requiresUpdate: false,
requestAbort() { },
updateProgress(msg, abort, current, max) { }
}
export class ObservableContext implements Context {
readonly updateRate: number;
private isSynchronous: boolean;
private level = 0;
private startedTime = 0;
private abortRequested = false;
private lastUpdated = 0;
private observers: ProgressObserver[] | undefined = void 0;
private progress: Progress = { message: 'Working...', current: 0, max: 0, elapsedMs: 0, isIndeterminate: true, requestAbort: void 0 };
export function observableContext(params?: Partial<Params>) {
return new ObservableContext(params);
}
lastDelta = 0;
private checkAborted() {
if (this.abortRequested) throw Aborted;
}
private abortRequester = () => { this.abortRequested = true };
declare var process: any;
declare var window: any;
subscribe = (obs: ProgressObserver) => {
if (!this.observers) this.observers = [];
this.observers.push(obs);
export const now: () => number = (function () {
if (typeof window !== 'undefined' && window.performance) {
const perf = window.performance;
return function () { return perf.now(); }
} else if (typeof process !== 'undefined' && process.hrtime !== 'undefined') {
return function () {
let t = process.hrtime();
return t[0] * 1000 + t[1] / 1000000;
};
} else {
return function () { return +new Date(); }
}
})();
requestAbort() {
try {
if (this.abortRequester) {
this.abortRequester.call(null);
}
} catch (e) { }
}
export interface Chunked {
/**
* Get automatically computed chunk size
* Or set it a default value.
*/
chunkSize: number,
readonly requiresUpdate: boolean,
updateProgress: Context['updateProgress'],
context: Context
}
updateProgress(msg: string, abort?: boolean | (() => void), current?: number, max?: number): Promise<void> | void {
this.checkAborted();
export function chunked(ctx: Context, defaultChunkSize: number): Chunked {
return new ChunkedImpl(ctx, defaultChunkSize);
}
}
const time = now();
const DefaulUpdateRateMs = 150;
const NoOpSubscribe = () => { }
if (typeof abort === 'boolean') {
this.progress.requestAbort = abort ? this.abortRequester : void 0;
} else {
if (abort) this.abortRequester = abort;
this.progress.requestAbort = abort ? this.abortRequester : void 0;
}
class ObservableContext implements Computation.Context {
readonly updateRate: number;
private isSynchronous: boolean;
private level = 0;
private startedTime = 0;
private abortRequested = false;
private lastUpdated = 0;
private observers: Computation.ProgressObserver[] | undefined = void 0;
private progress: Computation.Progress = { message: 'Working...', current: 0, max: 0, elapsedMs: 0, isIndeterminate: true, requestAbort: void 0 };
this.progress.message = msg;
this.progress.elapsedMs = time - this.startedTime;
if (isNaN(current!) || isNaN(max!)) {
this.progress.isIndeterminate = true;
} else {
this.progress.isIndeterminate = false;
this.progress.current = current!;
this.progress.max = max!;
}
lastDelta = 0;
private checkAborted() {
if (this.abortRequested) throw Computation.Aborted;
}
if (this.observers) {
const p = { ...this.progress };
for (const o of this.observers) Scheduler.immediate(o, p);
private abortRequester = () => { this.abortRequested = true };
subscribe = (obs: Computation.ProgressObserver) => {
if (!this.observers) this.observers = [];
this.observers.push(obs);
}
requestAbort() {
try {
if (this.abortRequester) {
this.abortRequester.call(null);
}
} catch (e) { }
}
this.lastDelta = time - this.lastUpdated;
this.lastUpdated = time;
updateProgress(msg: string, abort?: boolean | (() => void), current?: number, max?: number): Promise<void> | void {
this.checkAborted();
return Scheduler.immediatePromise();
}
const time = Computation.now();
get requiresUpdate() {
this.checkAborted();
if (this.isSynchronous) return false;
return now() - this.lastUpdated > this.updateRate / 2;
if (typeof abort === 'boolean') {
this.progress.requestAbort = abort ? this.abortRequester : void 0;
} else {
if (abort) this.abortRequester = abort;
this.progress.requestAbort = abort ? this.abortRequester : void 0;
}
started() {
if (!this.level) this.startedTime = now();
this.level++;
this.progress.message = msg;
this.progress.elapsedMs = time - this.startedTime;
if (isNaN(current!) || isNaN(max!)) {
this.progress.isIndeterminate = true;
} else {
this.progress.isIndeterminate = false;
this.progress.current = current!;
this.progress.max = max!;
}
finished() {
this.level--;
if (this.level < 0) {
throw new Error('Bug in code somewhere, Computation.resolve/reject called too many times.');
}
if (!this.level) this.observers = void 0;
if (this.observers) {
const p = { ...this.progress };
for (const o of this.observers) Scheduler.immediate(o, p);
}
constructor(params?: Partial<Params>) {
this.updateRate = (params && params.updateRateMs) || DefaulUpdateRateMs;
this.isSynchronous = !!(params && params.isSynchronous);
}
this.lastDelta = time - this.lastUpdated;
this.lastUpdated = time;
return Scheduler.immediatePromise();
}
export class Chunked {
private currentChunkSize: number;
get requiresUpdate() {
this.checkAborted();
if (this.isSynchronous) return false;
return Computation.now() - this.lastUpdated > this.updateRate / 2;
}
private computeChunkSize() {
const lastDelta = (this.context as ObservableContext).lastDelta || 0;
if (!lastDelta) return this.defaultChunkSize;
const rate = (this.context as ObservableContext).updateRate || 0;
return Math.round(this.currentChunkSize * rate / lastDelta + 1);
}
started() {
if (!this.level) this.startedTime = Computation.now();
this.level++;
}
get chunkSize() {
return this.defaultChunkSize;
finished() {
this.level--;
if (this.level < 0) {
throw new Error('Bug in code somewhere, Computation.resolve/reject called too many times.');
}
if (!this.level) this.observers = void 0;
}
set chunkSize(value: number) {
this.defaultChunkSize = value;
this.currentChunkSize = value;
}
constructor(params?: Partial<Computation.Params>) {
this.updateRate = (params && params.updateRateMs) || DefaulUpdateRateMs;
this.isSynchronous = !!(params && params.isSynchronous);
}
}
get requiresUpdate() {
const ret = this.context.requiresUpdate;
if (!ret) this.currentChunkSize += this.chunkSize;
return ret;
}
async updateProgress(msg: string, abort?: boolean | (() => void), current?: number, max?: number) {
await this.context.updateProgress(msg, abort, current, max);
this.defaultChunkSize = this.computeChunkSize();
}
class ChunkedImpl implements Computation.Chunked {
private currentChunkSize: number;
constructor(public context: Context, private defaultChunkSize: number) {
this.currentChunkSize = defaultChunkSize;
}
private computeChunkSize() {
const lastDelta = (this.context as ObservableContext).lastDelta || 0;
if (!lastDelta) return this.defaultChunkSize;
const rate = (this.context as ObservableContext).updateRate || 0;
return Math.round(this.currentChunkSize * rate / lastDelta + 1);
}
declare var process: any;
declare var window: any;
get chunkSize() {
return this.defaultChunkSize;
}
export const now: () => number = (function () {
if (typeof window !== 'undefined' && window.performance) {
const perf = window.performance;
return function () { return perf.now(); }
} else if (typeof process !== 'undefined' && process.hrtime !== 'undefined') {
return function () {
let t = process.hrtime();
return t[0] * 1000 + t[1] / 1000000;
};
} else {
return function () { return +new Date(); }
}
})();
set chunkSize(value: number) {
this.defaultChunkSize = value;
this.currentChunkSize = value;
}
get requiresUpdate() {
const ret = this.context.requiresUpdate;
if (!ret) this.currentChunkSize += this.chunkSize;
return ret;
}
async updateProgress(msg: string, abort?: boolean | (() => void), current?: number, max?: number) {
await this.context.updateProgress(msg, abort, current, max);
this.defaultChunkSize = this.computeChunkSize();
}
constructor(public context: Computation.Context, private defaultChunkSize: number) {
this.currentChunkSize = defaultChunkSize;
}
}
export default Computation;
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment