diff --git a/src/examples/computation.ts b/src/examples/computation.ts index 467d9b62430ae3607965517fafbfdb985c9010b2..9ba132953ca2da43eb7c6a5230c112c55d7bc629 100644 --- a/src/examples/computation.ts +++ b/src/examples/computation.ts @@ -4,7 +4,7 @@ * @author David Sehnal <david.sehnal@gmail.com> */ -import { Task, Run, Progress, Scheduler, now, MultistepTask } from 'mol-task' +import { Task, Run, Progress, Scheduler, now, MultistepTask, ChunkedSubtask } from 'mol-task' export async function test1() { const t = Task.create('test', async () => 1); @@ -66,17 +66,38 @@ export function testTree() { }); } -const ms = MultistepTask('ms-task', ['step 1', 'step 2', 'step 3'], async (p: { i: number }, step) => { - await Scheduler.delay(250); +export const chunk = ChunkedSubtask(25, (n, state: { i: number, current: number, total: number }) => { + const toProcess = Math.min(state.current + n, state.total); + const start = state.current; + for (let i = start; i < toProcess; i++) { + for (let j = 0; j < 1000000; j++) { + state.i += (i * j + 1 + state.i) % 1023; + state.i = state.i % 1000; + } + } + state.current = toProcess; + return toProcess - start; +}, (ctx, s, p) => ctx.update('chunk test ' + p)); + +export const ms = MultistepTask('ms-task', ['step 1', 'step 2', 'step 3'], async (p: { i: number }, step, ctx) => { await step(0); + + const child = Task.create('chunked', async ctx => { + const s = await chunk(ctx, { i: 0, current: 0, total: 125 }); + return s.i; + }); + + await ctx.runChild(child); await Scheduler.delay(250); await step(1); + await chunk(ctx, { i: 0, current: 0, total: 80 }); await Scheduler.delay(250); await step(2); await Scheduler.delay(250); return p.i + 3; }) + export function abortingObserver(p: Progress) { console.log(messageTree(p.root)); if (now() - p.root.progress.startedTime > 1000) { diff --git a/src/mol-task/execution/observable.ts b/src/mol-task/execution/observable.ts index 196a434c98e51c15ec6e10057b02e55a387020aa..60d6c4e291d4e3c6fd5d37de1312604fb6652c1b 100644 --- a/src/mol-task/execution/observable.ts +++ b/src/mol-task/execution/observable.ts @@ -127,6 +127,8 @@ function notifyObserver(info: ProgressInfo, time: number) { } class ObservableRuntimeContext implements RuntimeContext { + isSynchronous = false; + isExecuting = true; lastUpdatedTime = 0; diff --git a/src/mol-task/execution/runtime-context.ts b/src/mol-task/execution/runtime-context.ts index e8812c55f733fbbd70779ec98bc5664f656024f8..d2529a3eb3599d9ece65c7183b7a36cdc0117b33 100644 --- a/src/mol-task/execution/runtime-context.ts +++ b/src/mol-task/execution/runtime-context.ts @@ -8,6 +8,7 @@ import { Task } from '../task' interface RuntimeContext { readonly shouldUpdate: boolean, + readonly isSynchronous: boolean, // Idiomatic usage: // if (ctx.needsYield) await ctx.yield({ ... }); diff --git a/src/mol-task/execution/synchronous.ts b/src/mol-task/execution/synchronous.ts index 74b1180b51a7274afa5887aef5d9e3e6eeba243a..4ca40a04a5ccc91024b08ca3bcf6093ac8643e34 100644 --- a/src/mol-task/execution/synchronous.ts +++ b/src/mol-task/execution/synchronous.ts @@ -8,7 +8,9 @@ import { Task } from '../task' import { RuntimeContext } from './runtime-context' class SynchronousRuntimeContext implements RuntimeContext { - shouldUpdate: boolean = false; + shouldUpdate = false; + isSynchronous = true; + update(progress: string | Partial<RuntimeContext.ProgressUpdate>, dontNotify?: boolean): Promise<void> | void { } runChild<T>(task: Task<T>, progress?: string | Partial<RuntimeContext.ProgressUpdate>): Promise<T> { return ExecuteSynchronous(task); } } diff --git a/src/mol-task/index.ts b/src/mol-task/index.ts index b46d873c2110ddc5d75902e1729f5ea87d2fa3da..9e4c2e6474a63b537301b30e1a7c5f3c7db1bbb6 100644 --- a/src/mol-task/index.ts +++ b/src/mol-task/index.ts @@ -12,6 +12,7 @@ import { Progress } from './execution/progress' import { now } from './util/now' import { Scheduler } from './util/scheduler' import { MultistepTask } from './util/multistep' +import { ChunkedSubtask } from './util/chunked' // Run the task without the ability to observe its progress. function Run<T>(task: Task<T>): Promise<T>; @@ -22,4 +23,4 @@ function Run<T>(task: Task<T>, observer?: Progress.Observer, updateRateMs?: numb return ExecuteSynchronous(task); } -export { Task, RuntimeContext, Progress, Run, now, Scheduler, MultistepTask } \ No newline at end of file +export { Task, RuntimeContext, Progress, Run, now, Scheduler, MultistepTask, ChunkedSubtask } \ No newline at end of file diff --git a/src/mol-task/task.ts b/src/mol-task/task.ts index 5daab3d3437b4f0dab84e758285de109864f91f9..0ce5ca40bf65ef59de655b3ecdf39d4a2b12abec 100644 --- a/src/mol-task/task.ts +++ b/src/mol-task/task.ts @@ -40,8 +40,6 @@ namespace Task { max: number } - export type Provider<P, T> = (params: P) => Task<T> - let _id = 0; function nextId() { const ret = _id; diff --git a/src/mol-task/util.ts b/src/mol-task/util.ts deleted file mode 100644 index a3ce20ff33975a7f205ce180c2f377ab65e4715b..0000000000000000000000000000000000000000 --- a/src/mol-task/util.ts +++ /dev/null @@ -1,96 +0,0 @@ - -// enum TaskState { -// Pending, -// Running, -// Aborted, -// Completed, -// Failed -// } - -interface TaskState { - -} - -namespace TaskState { - export interface Pending { kind: 'Pending' } - export interface Running { kind: 'Running', } - - export interface Progress { - message: string, - isIndeterminate: boolean, - current: number, - max: number, - elapsedMs: number - } -} - -type ExecutionContext = { - run<T>(c: Computation<T>, params?: { updateRateMs: number }): Promise<T>, - subscribe(o: (p: string, compId: number) => void): void, - - requestAbort(compId: number): void -} - -namespace ExecutionContext { - // export interface Synchronous extends ExecutionContext { - // run<T>(c: Computation<T>, params?: { updateRateMs: number }): Promise<T>, - // } - - // export interface Observable extends ExecutionContext { - // run<T>(c: Computation<T>, params?: { updateRateMs: number }): Promise<T>, - // } - export const Sync: ExecutionContext = 0 as any; -} - -interface RuntimeContext { - run<T>(c: Computation<T>, params?: { updateRateMs: number }): Promise<T>, - yield(name: string): Promise<void> | void -} - -// if no context is specified, use the synchronous one. -interface Computation<T> { (ctx: RuntimeContext): Promise<T>, _id: number } - -function create<T>(c: (ctx: RuntimeContext) => Promise<T>): Computation<T> { return 0 as any; } -function constant<T>(c: T) { return create(async ctx => c); } - -type MultistepFn<P, T> = (params: P, step: (s: number) => Promise<void> | void, ctx: RuntimeContext) => Promise<T> -type ComputationProvider<P, T> = (params: P) => Computation<T> -function MultistepComputation<P, T>(name: string, steps: string[], f: MultistepFn<P, T>): ComputationProvider<P, T> { - return params => create(async ctx => f(params, n => ctx.yield(steps[n]), ctx)); -} - -// if total count is specified, could automatically provide percentage -type UniformlyChunkedFn<S> = (chunkSize: number, state: S, totalCount?: number) => number -type UniformlyChunkedProvider<S> = (ctx: RuntimeContext, state: S) => Promise<S> -function UniformlyChunked<S>(label: string, initialChunk: number, f: UniformlyChunkedFn<S>): UniformlyChunkedProvider<S> { - // TODO: track average time required for single element and then determine chunk size based on that. - return 0 as any; -} - -type LineReaderState = { str: string, position: number, lines: string[] } -const uniformPart = UniformlyChunked('Reading lines', 1000000, (size, state: LineReaderState) => { - state.position += size; - state.lines.push(''); - return 0 /* number of lines read */; -}); - -function readLines(str: string): Computation<string[]> { - return create(async ctx => { - const state = (await uniformPart(ctx, { str, position: 0, lines: [] })); - return state.lines; - }); -} - -const prependHiToLines = MultistepComputation('Hi prepend', ['Parse input', 'Prepend Hi'], async (p: string, step, ctx) => { - await step(0); - const lines = await readLines(p)(ctx); - await step(1); - const ret = lines.map(l => 'Hi ' + l); - return ret; -}); - - -(async function() { - const r = await ExecutionContext.Sync.run(prependHiToLines('1\n2'), { updateRateMs: 150 }); - console.log(r) -}()) diff --git a/src/mol-task/util/chunked.ts b/src/mol-task/util/chunked.ts index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..8056f184bb57f82733a6c2b2158223fd166abf6e 100644 --- a/src/mol-task/util/chunked.ts +++ b/src/mol-task/util/chunked.ts @@ -0,0 +1,49 @@ +/** + * Copyright (c) 2018 mol* contributors, licensed under MIT, See LICENSE file for more info. + * + * @author David Sehnal <david.sehnal@gmail.com> + */ + +import { now } from './now' +import { RuntimeContext } from '../execution/runtime-context' + +type UniformlyChunkedFn<S> = (chunkSize: number, state: S) => number +type ChunkedSubtaskProvider<S> = (ctx: RuntimeContext, state: S) => Promise<S> + +function ChunkedSubtask<S>(initialChunk: number, f: UniformlyChunkedFn<S>, + update: (ctx: RuntimeContext, state: S, processed: number) => Promise<void> | void): ChunkedSubtaskProvider<S> { + return async (ctx: RuntimeContext, state: S) => { + let chunkSize = Math.max(initialChunk, 0); + let globalProcessed = 0, globalTime = 0; + + if (ctx.isSynchronous) { + f(Number.MAX_SAFE_INTEGER, state); + return state; + } + + let start = now(); + let lastSize = 0, currentTime = 0; + + while ((lastSize = f(chunkSize, state)) > 0) { + globalProcessed += lastSize; + + const delta = now() - start; + currentTime += delta; + globalTime += delta; + + if (ctx.shouldUpdate) { + await update(ctx, state, globalProcessed); + + chunkSize = Math.round(currentTime * globalProcessed / globalTime) + 1; + start = now(); + currentTime = 0; + } + } + if (ctx.shouldUpdate) { + await update(ctx, state, globalProcessed); + } + return state; + } +} + +export { ChunkedSubtask } \ No newline at end of file diff --git a/src/mol-task/util/multistep.ts b/src/mol-task/util/multistep.ts index d56ce2e9009990b9a2884af7aa62bec7f96ca722..dba09e8485627d5e3c08bbf703ca6af7994df750 100644 --- a/src/mol-task/util/multistep.ts +++ b/src/mol-task/util/multistep.ts @@ -10,8 +10,8 @@ import { RuntimeContext } from '../execution/runtime-context' export type MultistepFn<P, T> = (params: P, step: (s: number) => Promise<void> | void, ctx: RuntimeContext) => Promise<T> -function MultistepTask<P, T>(name: string, steps: string[], f: MultistepFn<P, T>, onAbort?: () => void): Task.Provider<P, T> { - return params => Task.create(name, async ctx => f(params, n => ctx.update({ message: `${steps[n]}`, current: n + 1, max: steps.length }), ctx), onAbort); +function MultistepTask<P, T>(name: string, steps: string[], f: MultistepFn<P, T>, onAbort?: () => void) { + return (params: P) => Task.create(name, async ctx => f(params, n => ctx.update({ message: `${steps[n]}`, current: n + 1, max: steps.length }), ctx), onAbort); } export { MultistepTask } \ No newline at end of file