Skip to content
Snippets Groups Projects
Commit 0cffd325 authored by David Sehnal's avatar David Sehnal
Browse files

Chunked task

parent 7f3b2316
No related branches found
No related tags found
No related merge requests found
......@@ -4,7 +4,7 @@
* @author David Sehnal <david.sehnal@gmail.com>
*/
import { Task, Run, Progress, Scheduler, now, MultistepTask } from 'mol-task'
import { Task, Run, Progress, Scheduler, now, MultistepTask, ChunkedSubtask } from 'mol-task'
export async function test1() {
const t = Task.create('test', async () => 1);
......@@ -66,17 +66,38 @@ export function testTree() {
});
}
const ms = MultistepTask('ms-task', ['step 1', 'step 2', 'step 3'], async (p: { i: number }, step) => {
await Scheduler.delay(250);
export const chunk = ChunkedSubtask(25, (n, state: { i: number, current: number, total: number }) => {
const toProcess = Math.min(state.current + n, state.total);
const start = state.current;
for (let i = start; i < toProcess; i++) {
for (let j = 0; j < 1000000; j++) {
state.i += (i * j + 1 + state.i) % 1023;
state.i = state.i % 1000;
}
}
state.current = toProcess;
return toProcess - start;
}, (ctx, s, p) => ctx.update('chunk test ' + p));
export const ms = MultistepTask('ms-task', ['step 1', 'step 2', 'step 3'], async (p: { i: number }, step, ctx) => {
await step(0);
const child = Task.create('chunked', async ctx => {
const s = await chunk(ctx, { i: 0, current: 0, total: 125 });
return s.i;
});
await ctx.runChild(child);
await Scheduler.delay(250);
await step(1);
await chunk(ctx, { i: 0, current: 0, total: 80 });
await Scheduler.delay(250);
await step(2);
await Scheduler.delay(250);
return p.i + 3;
})
export function abortingObserver(p: Progress) {
console.log(messageTree(p.root));
if (now() - p.root.progress.startedTime > 1000) {
......
......@@ -127,6 +127,8 @@ function notifyObserver(info: ProgressInfo, time: number) {
}
class ObservableRuntimeContext implements RuntimeContext {
isSynchronous = false;
isExecuting = true;
lastUpdatedTime = 0;
......
......@@ -8,6 +8,7 @@ import { Task } from '../task'
interface RuntimeContext {
readonly shouldUpdate: boolean,
readonly isSynchronous: boolean,
// Idiomatic usage:
// if (ctx.needsYield) await ctx.yield({ ... });
......
......@@ -8,7 +8,9 @@ import { Task } from '../task'
import { RuntimeContext } from './runtime-context'
class SynchronousRuntimeContext implements RuntimeContext {
shouldUpdate: boolean = false;
shouldUpdate = false;
isSynchronous = true;
update(progress: string | Partial<RuntimeContext.ProgressUpdate>, dontNotify?: boolean): Promise<void> | void { }
runChild<T>(task: Task<T>, progress?: string | Partial<RuntimeContext.ProgressUpdate>): Promise<T> { return ExecuteSynchronous(task); }
}
......
......@@ -12,6 +12,7 @@ import { Progress } from './execution/progress'
import { now } from './util/now'
import { Scheduler } from './util/scheduler'
import { MultistepTask } from './util/multistep'
import { ChunkedSubtask } from './util/chunked'
// Run the task without the ability to observe its progress.
function Run<T>(task: Task<T>): Promise<T>;
......@@ -22,4 +23,4 @@ function Run<T>(task: Task<T>, observer?: Progress.Observer, updateRateMs?: numb
return ExecuteSynchronous(task);
}
export { Task, RuntimeContext, Progress, Run, now, Scheduler, MultistepTask }
\ No newline at end of file
export { Task, RuntimeContext, Progress, Run, now, Scheduler, MultistepTask, ChunkedSubtask }
\ No newline at end of file
......@@ -40,8 +40,6 @@ namespace Task {
max: number
}
export type Provider<P, T> = (params: P) => Task<T>
let _id = 0;
function nextId() {
const ret = _id;
......
// enum TaskState {
// Pending,
// Running,
// Aborted,
// Completed,
// Failed
// }
interface TaskState {
}
namespace TaskState {
export interface Pending { kind: 'Pending' }
export interface Running { kind: 'Running', }
export interface Progress {
message: string,
isIndeterminate: boolean,
current: number,
max: number,
elapsedMs: number
}
}
type ExecutionContext = {
run<T>(c: Computation<T>, params?: { updateRateMs: number }): Promise<T>,
subscribe(o: (p: string, compId: number) => void): void,
requestAbort(compId: number): void
}
namespace ExecutionContext {
// export interface Synchronous extends ExecutionContext {
// run<T>(c: Computation<T>, params?: { updateRateMs: number }): Promise<T>,
// }
// export interface Observable extends ExecutionContext {
// run<T>(c: Computation<T>, params?: { updateRateMs: number }): Promise<T>,
// }
export const Sync: ExecutionContext = 0 as any;
}
interface RuntimeContext {
run<T>(c: Computation<T>, params?: { updateRateMs: number }): Promise<T>,
yield(name: string): Promise<void> | void
}
// if no context is specified, use the synchronous one.
interface Computation<T> { (ctx: RuntimeContext): Promise<T>, _id: number }
function create<T>(c: (ctx: RuntimeContext) => Promise<T>): Computation<T> { return 0 as any; }
function constant<T>(c: T) { return create(async ctx => c); }
type MultistepFn<P, T> = (params: P, step: (s: number) => Promise<void> | void, ctx: RuntimeContext) => Promise<T>
type ComputationProvider<P, T> = (params: P) => Computation<T>
function MultistepComputation<P, T>(name: string, steps: string[], f: MultistepFn<P, T>): ComputationProvider<P, T> {
return params => create(async ctx => f(params, n => ctx.yield(steps[n]), ctx));
}
// if total count is specified, could automatically provide percentage
type UniformlyChunkedFn<S> = (chunkSize: number, state: S, totalCount?: number) => number
type UniformlyChunkedProvider<S> = (ctx: RuntimeContext, state: S) => Promise<S>
function UniformlyChunked<S>(label: string, initialChunk: number, f: UniformlyChunkedFn<S>): UniformlyChunkedProvider<S> {
// TODO: track average time required for single element and then determine chunk size based on that.
return 0 as any;
}
type LineReaderState = { str: string, position: number, lines: string[] }
const uniformPart = UniformlyChunked('Reading lines', 1000000, (size, state: LineReaderState) => {
state.position += size;
state.lines.push('');
return 0 /* number of lines read */;
});
function readLines(str: string): Computation<string[]> {
return create(async ctx => {
const state = (await uniformPart(ctx, { str, position: 0, lines: [] }));
return state.lines;
});
}
const prependHiToLines = MultistepComputation('Hi prepend', ['Parse input', 'Prepend Hi'], async (p: string, step, ctx) => {
await step(0);
const lines = await readLines(p)(ctx);
await step(1);
const ret = lines.map(l => 'Hi ' + l);
return ret;
});
(async function() {
const r = await ExecutionContext.Sync.run(prependHiToLines('1\n2'), { updateRateMs: 150 });
console.log(r)
}())
/**
* Copyright (c) 2018 mol* contributors, licensed under MIT, See LICENSE file for more info.
*
* @author David Sehnal <david.sehnal@gmail.com>
*/
import { now } from './now'
import { RuntimeContext } from '../execution/runtime-context'
type UniformlyChunkedFn<S> = (chunkSize: number, state: S) => number
type ChunkedSubtaskProvider<S> = (ctx: RuntimeContext, state: S) => Promise<S>
function ChunkedSubtask<S>(initialChunk: number, f: UniformlyChunkedFn<S>,
update: (ctx: RuntimeContext, state: S, processed: number) => Promise<void> | void): ChunkedSubtaskProvider<S> {
return async (ctx: RuntimeContext, state: S) => {
let chunkSize = Math.max(initialChunk, 0);
let globalProcessed = 0, globalTime = 0;
if (ctx.isSynchronous) {
f(Number.MAX_SAFE_INTEGER, state);
return state;
}
let start = now();
let lastSize = 0, currentTime = 0;
while ((lastSize = f(chunkSize, state)) > 0) {
globalProcessed += lastSize;
const delta = now() - start;
currentTime += delta;
globalTime += delta;
if (ctx.shouldUpdate) {
await update(ctx, state, globalProcessed);
chunkSize = Math.round(currentTime * globalProcessed / globalTime) + 1;
start = now();
currentTime = 0;
}
}
if (ctx.shouldUpdate) {
await update(ctx, state, globalProcessed);
}
return state;
}
}
export { ChunkedSubtask }
\ No newline at end of file
......@@ -10,8 +10,8 @@ import { RuntimeContext } from '../execution/runtime-context'
export type MultistepFn<P, T> =
(params: P, step: (s: number) => Promise<void> | void, ctx: RuntimeContext) => Promise<T>
function MultistepTask<P, T>(name: string, steps: string[], f: MultistepFn<P, T>, onAbort?: () => void): Task.Provider<P, T> {
return params => Task.create(name, async ctx => f(params, n => ctx.update({ message: `${steps[n]}`, current: n + 1, max: steps.length }), ctx), onAbort);
function MultistepTask<P, T>(name: string, steps: string[], f: MultistepFn<P, T>, onAbort?: () => void) {
return (params: P) => Task.create(name, async ctx => f(params, n => ctx.update({ message: `${steps[n]}`, current: n + 1, max: steps.length }), ctx), onAbort);
}
export { MultistepTask }
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment