Skip to content
Snippets Groups Projects
Commit 6fad112c authored by David Sehnal's avatar David Sehnal
Browse files

Removed computation from mol-util, use mol-task instead

parent 3ff823a6
No related branches found
No related tags found
No related merge requests found
...@@ -48,7 +48,7 @@ async function runGenerateSchema(name: string, fieldNamesPath?: string, minCount ...@@ -48,7 +48,7 @@ async function runGenerateSchema(name: string, fieldNamesPath?: string, minCount
async function getFieldNamesFilter(fieldNamesPath: string): Promise<Filter> { async function getFieldNamesFilter(fieldNamesPath: string): Promise<Filter> {
const fieldNamesStr = fs.readFileSync(fieldNamesPath, 'utf8') const fieldNamesStr = fs.readFileSync(fieldNamesPath, 'utf8')
const parsed = await Csv(fieldNamesStr, { noColumnNames: true })(); const parsed = await Run(Csv(fieldNamesStr, { noColumnNames: true }));
if (parsed.isError) throw parser.error if (parsed.isError) throw parser.error
const csvFile = parsed.result; const csvFile = parsed.result;
...@@ -69,7 +69,7 @@ async function getFieldNamesFilter(fieldNamesPath: string): Promise<Filter> { ...@@ -69,7 +69,7 @@ async function getFieldNamesFilter(fieldNamesPath: string): Promise<Filter> {
async function getUsageCountsFilter(minCount: number): Promise<Filter> { async function getUsageCountsFilter(minCount: number): Promise<Filter> {
const usageCountsStr = fs.readFileSync(MMCIF_USAGE_COUNTS_PATH, 'utf8') const usageCountsStr = fs.readFileSync(MMCIF_USAGE_COUNTS_PATH, 'utf8')
const parsed = await Csv(usageCountsStr, { delimiter: ' ' })(); const parsed = await Run(Csv(usageCountsStr, { delimiter: ' ' }));
if (parsed.isError) throw parser.error if (parsed.isError) throw parser.error
const csvFile = parsed.result; const csvFile = parsed.result;
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
*/ */
import Csv from '../csv/parser' import Csv from '../csv/parser'
import { Run } from 'mol-task';
const csvStringBasic = `StrCol,IntCol,FloatCol const csvStringBasic = `StrCol,IntCol,FloatCol
# comment # comment
...@@ -23,7 +24,7 @@ string2\t42\t2.44` ...@@ -23,7 +24,7 @@ string2\t42\t2.44`
describe('csv reader', () => { describe('csv reader', () => {
it('basic', async () => { it('basic', async () => {
const parsed = await Csv(csvStringBasic)(); const parsed = await Run(Csv(csvStringBasic));
if (parsed.isError) return; if (parsed.isError) return;
const csvFile = parsed.result; const csvFile = parsed.result;
...@@ -45,7 +46,7 @@ describe('csv reader', () => { ...@@ -45,7 +46,7 @@ describe('csv reader', () => {
}); });
it('advanced', async () => { it('advanced', async () => {
const parsed = await Csv(csvStringAdvanced)(); const parsed = await Run(Csv(csvStringAdvanced));
if (parsed.isError) return; if (parsed.isError) return;
const csvFile = parsed.result; const csvFile = parsed.result;
...@@ -62,7 +63,7 @@ describe('csv reader', () => { ...@@ -62,7 +63,7 @@ describe('csv reader', () => {
}); });
it('tabs', async () => { it('tabs', async () => {
const parsed = await Csv(tabString, { delimiter: '\t' })(); const parsed = await Run(Csv(tabString, { delimiter: '\t' }));
if (parsed.isError) return; if (parsed.isError) return;
const csvFile = parsed.result; const csvFile = parsed.result;
......
...@@ -9,7 +9,7 @@ import { Tokens, TokenBuilder, Tokenizer } from '../common/text/tokenizer' ...@@ -9,7 +9,7 @@ import { Tokens, TokenBuilder, Tokenizer } from '../common/text/tokenizer'
import * as Data from './data-model' import * as Data from './data-model'
import Field from './field' import Field from './field'
import Result from '../result' import Result from '../result'
import Computation from 'mol-util/computation' import { Task, RuntimeContext, chunkedSubtask, } from 'mol-task'
const enum CsvTokenType { const enum CsvTokenType {
Value = 0, Value = 0,
...@@ -22,7 +22,7 @@ interface State { ...@@ -22,7 +22,7 @@ interface State {
tokenizer: Tokenizer, tokenizer: Tokenizer,
tokenType: CsvTokenType; tokenType: CsvTokenType;
chunker: Computation.Chunker, runtimeCtx: RuntimeContext,
tokens: Tokens[], tokens: Tokens[],
fieldCount: number, fieldCount: number,
...@@ -38,7 +38,7 @@ interface State { ...@@ -38,7 +38,7 @@ interface State {
noColumnNamesRecord: boolean noColumnNamesRecord: boolean
} }
function State(data: string, ctx: Computation.Context, opts: CsvOptions): State { function State(data: string, runtimeCtx: RuntimeContext, opts: CsvOptions): State {
const tokenizer = Tokenizer(data) const tokenizer = Tokenizer(data)
return { return {
...@@ -46,7 +46,7 @@ function State(data: string, ctx: Computation.Context, opts: CsvOptions): State ...@@ -46,7 +46,7 @@ function State(data: string, ctx: Computation.Context, opts: CsvOptions): State
tokenizer, tokenizer,
tokenType: CsvTokenType.End, tokenType: CsvTokenType.End,
chunker: Computation.chunker(ctx, 100000), runtimeCtx,
tokens: [], tokens: [],
fieldCount: 0, fieldCount: 0,
...@@ -206,7 +206,7 @@ function moveNext(state: State) { ...@@ -206,7 +206,7 @@ function moveNext(state: State) {
return newRecord return newRecord
} }
function readRecordsChunk(state: State, chunkSize: number) { function readRecordsChunk(chunkSize: number, state: State) {
if (state.tokenType === CsvTokenType.End) return 0 if (state.tokenType === CsvTokenType.End) return 0
let newRecord = moveNext(state); let newRecord = moveNext(state);
...@@ -225,9 +225,8 @@ function readRecordsChunk(state: State, chunkSize: number) { ...@@ -225,9 +225,8 @@ function readRecordsChunk(state: State, chunkSize: number) {
} }
function readRecordsChunks(state: State) { function readRecordsChunks(state: State) {
return state.chunker.process( return chunkedSubtask(state.runtimeCtx, 100000, state, readRecordsChunk,
chunkSize => readRecordsChunk(state, chunkSize), (ctx, state) => ctx.update({ message: 'Parsing...', current: state.tokenizer.position, max: state.data.length }));
update => update({ message: 'Parsing...', current: state.tokenizer.position, max: state.data.length }));
} }
function addColumn (state: State) { function addColumn (state: State) {
...@@ -261,7 +260,7 @@ async function handleRecords(state: State): Promise<Data.Table> { ...@@ -261,7 +260,7 @@ async function handleRecords(state: State): Promise<Data.Table> {
return Data.Table(state.recordCount, state.columnNames, columns) return Data.Table(state.recordCount, state.columnNames, columns)
} }
async function parseInternal(data: string, ctx: Computation.Context, opts: CsvOptions): Promise<Result<Data.File>> { async function parseInternal(data: string, ctx: RuntimeContext, opts: CsvOptions): Promise<Result<Data.File>> {
const state = State(data, ctx, opts); const state = State(data, ctx, opts);
ctx.update({ message: 'Parsing...', current: 0, max: data.length }); ctx.update({ message: 'Parsing...', current: 0, max: data.length });
...@@ -279,7 +278,7 @@ interface CsvOptions { ...@@ -279,7 +278,7 @@ interface CsvOptions {
export function parse(data: string, opts?: Partial<CsvOptions>) { export function parse(data: string, opts?: Partial<CsvOptions>) {
const completeOpts = Object.assign({}, { quote: '"', comment: '#', delimiter: ',', noColumnNames: false }, opts) const completeOpts = Object.assign({}, { quote: '"', comment: '#', delimiter: ',', noColumnNames: false }, opts)
return Computation.create<Result<Data.File>>(async ctx => { return Task.create<Result<Data.File>>('Parse CSV', async ctx => {
return await parseInternal(data, ctx, completeOpts); return await parseInternal(data, ctx, completeOpts);
}); });
} }
......
/**
* Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info.
*
* Adapted from https://github.com/dsehnal/LiteMol
* @author David Sehnal <david.sehnal@gmail.com>
*/
import Scheduler from './scheduler'
import timeNow from './time'
interface Computation<A> {
(ctx?: Computation.Context): Promise<A>
}
namespace Computation {
export let PRINT_ERRORS_TO_CONSOLE = false;
export function create<A>(computation: (ctx: Context) => Promise<A>) {
return ComputationImpl(computation);
}
export function resolve<A>(a: A) {
return create<A>(_ => Promise.resolve(a));
}
export function reject<A>(reason: any) {
return create<A>(_ => Promise.reject(reason));
}
export interface Params {
updateRateMs?: number,
observer?: ProgressObserver
}
export const Aborted = 'Aborted';
export interface Progress {
message: string,
isIndeterminate: boolean,
current: number,
max: number,
elapsedMs: number,
requestAbort?: () => void
}
export interface ProgressUpdate {
message?: string,
abort?: boolean | (() => void),
current?: number,
max?: number
}
export interface Context {
readonly isSynchronous: boolean,
/** Also checks if the computation was aborted. If so, throws. */
readonly requiresUpdate: boolean,
requestAbort(): void,
subscribe(onProgress: ProgressObserver): { dispose: () => void },
/** Also checks if the computation was aborted. If so, throws. */
update(info: ProgressUpdate): Promise<void> | void
}
export type ProgressObserver = (progress: Readonly<Progress>) => void;
const emptyDisposer = { dispose: () => { } }
/** A context without updates. */
export const synchronous: Context = {
isSynchronous: true,
requiresUpdate: false,
requestAbort() { },
subscribe(onProgress) { return emptyDisposer; },
update(info) { }
}
export function observable(params?: Partial<Params>) {
const ret = new ObservableContext(params && params.updateRateMs);
if (params && params.observer) ret.subscribe(params.observer);
return ret;
}
export const now = timeNow;
/** A utility for splitting large computations into smaller parts. */
export interface Chunker {
setNextChunkSize(size: number): void,
/** nextChunk must return the number of actually processed chunks. */
process(nextChunk: (chunkSize: number) => number, update: (updater: Context['update']) => void, nextChunkSize?: number): Promise<void>
}
export function chunker(ctx: Context, nextChunkSize: number): Chunker {
return new ChunkerImpl(ctx, nextChunkSize);
}
}
const DefaulUpdateRateMs = 150;
function ComputationImpl<A>(computation: (ctx: Computation.Context) => Promise<A>): Computation<A> {
return (ctx?: Computation.Context) => {
const context: ObservableContext = ctx ? ctx : Computation.synchronous as any;
return new Promise<A>(async (resolve, reject) => {
try {
if (context.started) context.started();
const result = await computation(context);
resolve(result);
} catch (e) {
if (Computation.PRINT_ERRORS_TO_CONSOLE) console.error(e);
reject(e);
} finally {
if (context.finished) context.finished();
}
});
}
}
class ObservableContext implements Computation.Context {
readonly updateRate: number;
readonly isSynchronous: boolean = false;
private level = 0;
private startedTime = 0;
private abortRequested = false;
private lastUpdated = 0;
private observers: Computation.ProgressObserver[] | undefined = void 0;
private progress: Computation.Progress = { message: 'Working...', current: 0, max: 0, elapsedMs: 0, isIndeterminate: true, requestAbort: void 0 };
private checkAborted() {
if (this.abortRequested) throw Computation.Aborted;
}
private abortRequester = () => { this.abortRequested = true };
subscribe = (obs: Computation.ProgressObserver) => {
if (!this.observers) this.observers = [];
this.observers.push(obs);
return {
dispose: () => {
if (!this.observers) return;
for (let i = 0; i < this.observers.length; i++) {
if (this.observers[i] === obs) {
this.observers[i] = this.observers[this.observers.length - 1];
this.observers.pop();
return;
}
}
}
};
}
requestAbort() {
try {
if (this.abortRequester) {
this.abortRequester.call(null);
}
} catch (e) { }
}
update({ message, abort, current, max }: Computation.ProgressUpdate): Promise<void> | void {
this.checkAborted();
const time = Computation.now();
if (typeof abort === 'boolean') {
this.progress.requestAbort = abort ? this.abortRequester : void 0;
} else {
if (abort) this.abortRequester = abort;
this.progress.requestAbort = abort ? this.abortRequester : void 0;
}
if (typeof message !== 'undefined') this.progress.message = message;
this.progress.elapsedMs = time - this.startedTime;
if (isNaN(current!)) {
this.progress.isIndeterminate = true;
} else {
this.progress.isIndeterminate = false;
this.progress.current = current!;
if (!isNaN(max!)) this.progress.max = max!;
}
if (this.observers) {
const p = { ...this.progress };
for (let i = 0, _i = this.observers.length; i < _i; i++) {
Scheduler.immediate(this.observers[i], p);
}
}
this.lastUpdated = time;
return Scheduler.immediatePromise();
}
get requiresUpdate() {
this.checkAborted();
if (this.isSynchronous) return false;
return Computation.now() - this.lastUpdated > this.updateRate;
}
started() {
if (!this.level) {
this.startedTime = Computation.now();
this.lastUpdated = this.startedTime;
}
this.level++;
}
finished() {
this.level--;
if (this.level < 0) {
throw new Error('Bug in code somewhere, Computation.resolve/reject called too many times.');
}
if (!this.level) this.observers = void 0;
}
constructor(updateRate?: number) {
this.updateRate = updateRate || DefaulUpdateRateMs;
}
}
class ChunkerImpl implements Computation.Chunker {
private processedSinceUpdate = 0;
private updater: Computation.Context['update'];
private computeChunkSize(delta: number) {
if (!delta) {
this.processedSinceUpdate = 0;
return this.nextChunkSize;
}
const rate = (this.context as ObservableContext).updateRate || DefaulUpdateRateMs;
const ret = Math.round(this.processedSinceUpdate * rate / delta + 1);
this.processedSinceUpdate = 0;
return ret;
}
private getNextChunkSize() {
const ctx = this.context as ObservableContext;
// be smart if the computation is synchronous and process the whole chunk at once.
if (ctx.isSynchronous) return Number.MAX_SAFE_INTEGER;
return this.nextChunkSize;
}
setNextChunkSize(size: number) {
this.nextChunkSize = size;
}
async process(nextChunk: (size: number) => number, update: (updater: Computation.Context['update']) => Promise<void> | void, nextChunkSize?: number) {
if (typeof nextChunkSize !== 'undefined') this.setNextChunkSize(nextChunkSize);
this.processedSinceUpdate = 0;
// track time for the actual computation and exclude the "update time"
let chunkStart = Computation.now();
let lastChunkSize: number;
let chunkCount = 0;
let totalSize = 0;
let updateCount = 0;
while ((lastChunkSize = nextChunk(this.getNextChunkSize())) > 0) {
chunkCount++;
this.processedSinceUpdate += lastChunkSize;
totalSize += lastChunkSize;
if (this.context.requiresUpdate) {
let time = Computation.now();
await update(this.updater);
this.nextChunkSize = updateCount > 0
? Math.round((totalSize + this.computeChunkSize(time - chunkStart)) / (chunkCount + 1))
: this.computeChunkSize(time - chunkStart)
updateCount++;
chunkStart = Computation.now();
}
}
if (this.context.requiresUpdate) {
let time = Computation.now();
await update(this.updater);
this.nextChunkSize = updateCount > 0
? Math.round((totalSize + this.computeChunkSize(time - chunkStart)) / (chunkCount + 1))
: this.computeChunkSize(time - chunkStart)
}
}
constructor(public context: Computation.Context, private nextChunkSize: number) {
this.updater = this.context.update.bind(this.context);
}
}
export default Computation;
\ No newline at end of file
...@@ -6,13 +6,11 @@ ...@@ -6,13 +6,11 @@
*/ */
import BitFlags from './bit-flags' import BitFlags from './bit-flags'
import Computation from './computation'
import Scheduler from './scheduler'
import StringBuilder from './string-builder' import StringBuilder from './string-builder'
import Time from './time'
import UUID from './uuid' import UUID from './uuid'
import Mask from './mask'
export { BitFlags, Computation, Scheduler, StringBuilder, Time, UUID } export { BitFlags, StringBuilder, UUID, Mask }
export function arrayEqual<T>(arr1: T[], arr2: T[]) { export function arrayEqual<T>(arr1: T[], arr2: T[]) {
const length = arr1.length const length = arr1.length
......
/**
* Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info.
*
* @author David Sehnal <david.sehnal@gmail.com>
*/
/**
* setImmediate polyfill adapted from https://github.com/YuzuJS/setImmediate
* Copyright (c) 2012 Barnesandnoble.com, llc, Donavon West, and Domenic Denicola
* MIT license.
*/
function createImmediateActions() {
type Callback = (...args: any[]) => void;
type Task = { callback: Callback, args: any[] }
const tasksByHandle: { [handle: number]: Task } = { };
const doc = typeof document !== 'undefined' ? document : void 0;
let currentlyRunningATask = false;
let nextHandle = 1; // Spec says greater than zero
let registerImmediate: ((handle: number) => void);
function setImmediate(callback: Callback, ...args: any[]) {
// Callback can either be a function or a string
if (typeof callback !== 'function') {
callback = new Function('' + callback) as Callback;
}
// Store and register the task
const task = { callback: callback, args: args };
tasksByHandle[nextHandle] = task;
registerImmediate(nextHandle);
return nextHandle++;
}
function clearImmediate(handle: number) {
delete tasksByHandle[handle];
}
function run(task: Task) {
const callback = task.callback;
const args = task.args;
switch (args.length) {
case 0:
callback();
break;
case 1:
callback(args[0]);
break;
case 2:
callback(args[0], args[1]);
break;
case 3:
callback(args[0], args[1], args[2]);
break;
default:
callback.apply(undefined, args);
break;
}
}
function runIfPresent(handle: number) {
// From the spec: 'Wait until any invocations of this algorithm started before this one have completed.'
// So if we're currently running a task, we'll need to delay this invocation.
if (currentlyRunningATask) {
// Delay by doing a setTimeout. setImmediate was tried instead, but in Firefox 7 it generated a
// 'too much recursion' error.
setTimeout(runIfPresent, 0, handle);
} else {
const task = tasksByHandle[handle];
if (task) {
currentlyRunningATask = true;
try {
run(task);
} finally {
clearImmediate(handle);
currentlyRunningATask = false;
}
}
}
}
function installNextTickImplementation() {
registerImmediate = function(handle) {
process.nextTick(function () { runIfPresent(handle); });
};
}
function canUsePostMessage() {
// The test against `importScripts` prevents this implementation from being installed inside a web worker,
// where `global.postMessage` means something completely different and can't be used for this purpose.
const global = typeof window !== 'undefined' ? window as any : void 0;
if (global && global.postMessage && !global.importScripts) {
let postMessageIsAsynchronous = true;
const oldOnMessage = global.onmessage;
global.onmessage = function() {
postMessageIsAsynchronous = false;
};
global.postMessage('', '*');
global.onmessage = oldOnMessage;
return postMessageIsAsynchronous;
}
}
function installPostMessageImplementation() {
// Installs an event handler on `global` for the `message` event: see
// * https://developer.mozilla.org/en/DOM/window.postMessage
// * http://www.whatwg.org/specs/web-apps/current-work/multipage/comms.html#crossDocumentMessages
const messagePrefix = 'setImmediate$' + Math.random() + '$';
const global = typeof window !== 'undefined' ? window as any : void 0;
const onGlobalMessage = function(event: any) {
if (event.source === global &&
typeof event.data === 'string' &&
event.data.indexOf(messagePrefix) === 0) {
runIfPresent(+event.data.slice(messagePrefix.length));
}
};
if (window.addEventListener) {
window.addEventListener('message', onGlobalMessage, false);
} else {
(window as any).attachEvent('onmessage', onGlobalMessage);
}
registerImmediate = function(handle) {
window.postMessage(messagePrefix + handle, '*');
};
}
function installMessageChannelImplementation() {
const channel = new MessageChannel();
channel.port1.onmessage = function(event) {
const handle = event.data;
runIfPresent(handle);
};
registerImmediate = function(handle) {
channel.port2.postMessage(handle);
};
}
function installReadyStateChangeImplementation() {
const html = doc!.documentElement;
registerImmediate = function(handle) {
// Create a <script> element; its readystatechange event will be fired asynchronously once it is inserted
// into the document. Do so, thus queuing up the task. Remember to clean up once it's been called.
let script = doc!.createElement('script') as any;
script.onreadystatechange = function () {
runIfPresent(handle);
script.onreadystatechange = null;
html.removeChild(script);
script = null;
};
html.appendChild(script);
};
}
function installSetTimeoutImplementation() {
registerImmediate = function(handle) {
setTimeout(runIfPresent, 0, handle);
};
}
// Don't get fooled by e.g. browserify environments.
if (typeof process !== 'undefined' && {}.toString.call(process) === '[object process]') {
// For Node.js before 0.9
installNextTickImplementation();
} else if (canUsePostMessage()) {
// For non-IE10 modern browsers
installPostMessageImplementation();
} else if (typeof MessageChannel !== 'undefined') {
// For web workers, where supported
installMessageChannelImplementation();
} else if (doc && 'onreadystatechange' in doc.createElement('script')) {
// For IE 6–8
installReadyStateChangeImplementation();
} else {
// For older browsers
installSetTimeoutImplementation();
}
return {
setImmediate,
clearImmediate
};
}
const immediateActions = (function () {
if (typeof setImmediate !== 'undefined') {
if (typeof window !== 'undefined') {
return { setImmediate: (handler: any, ...args: any[]) => window.setImmediate(handler, ...args as any), clearImmediate: (handle: any) => window.clearImmediate(handle) };
} else return { setImmediate, clearImmediate }
}
return createImmediateActions();
}());
function resolveImmediate(res: () => void) {
immediateActions.setImmediate(res);
}
export default {
immediate: immediateActions.setImmediate,
clearImmediate: immediateActions.clearImmediate,
immediatePromise() { return new Promise<void>(resolveImmediate); }
};
/**
* Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info.
*
* @author David Sehnal <david.sehnal@gmail.com>
*/
declare var process: any;
declare var window: any;
const now: () => number = (function () {
if (typeof window !== 'undefined' && window.performance) {
const perf = window.performance;
return () => perf.now();
} else if (typeof process !== 'undefined' && process.hrtime !== 'undefined') {
return () => {
let t = process.hrtime();
return t[0] * 1000 + t[1] / 1000000;
};
} else {
return () => +new Date();
}
}());
export default now;
\ No newline at end of file
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
* @author David Sehnal <david.sehnal@gmail.com> * @author David Sehnal <david.sehnal@gmail.com>
*/ */
import now from './time' import { now } from 'mol-task'
interface UUID extends String { '@type': 'uuid' } interface UUID extends String { '@type': 'uuid' }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment