Skip to content
Snippets Groups Projects
Commit 28c575bb authored by David Sehnal's avatar David Sehnal
Browse files

Chunked task tweaks

parent a8070e41
No related branches found
No related tags found
No related merge requests found
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
* @author David Sehnal <david.sehnal@gmail.com> * @author David Sehnal <david.sehnal@gmail.com>
*/ */
import { Task, Run, Progress, Scheduler, now, MultistepTask, ChunkedSubtask } from 'mol-task' import { Task, Run, Progress, Scheduler, now, MultistepTask, chunkedSubtask } from 'mol-task'
export async function test1() { export async function test1() {
const t = Task.create('test', async () => 1); const t = Task.create('test', async () => 1);
...@@ -65,7 +65,9 @@ export function testTree() { ...@@ -65,7 +65,9 @@ export function testTree() {
}); });
} }
export const chunk = ChunkedSubtask(25, (n, state: { i: number, current: number, total: number }) => { export type ChunkedState = { i: number, current: number, total: number }
function processChunk(n: number, state: ChunkedState): number {
const toProcess = Math.min(state.current + n, state.total); const toProcess = Math.min(state.current + n, state.total);
const start = state.current; const start = state.current;
for (let i = start; i < toProcess; i++) { for (let i = start; i < toProcess; i++) {
...@@ -76,20 +78,20 @@ export const chunk = ChunkedSubtask(25, (n, state: { i: number, current: number, ...@@ -76,20 +78,20 @@ export const chunk = ChunkedSubtask(25, (n, state: { i: number, current: number,
} }
state.current = toProcess; state.current = toProcess;
return toProcess - start; return toProcess - start;
}, (ctx, s, p) => ctx.update('chunk test ' + p)); }
export const ms = MultistepTask('ms-task', ['step 1', 'step 2', 'step 3'], async (p: { i: number }, step, ctx) => { export const ms = MultistepTask('ms-task', ['step 1', 'step 2', 'step 3'], async (p: { i: number }, step, ctx) => {
await step(0); await step(0);
const child = Task.create('chunked', async ctx => { const child = Task.create('chunked', async ctx => {
const s = await chunk(ctx, { i: 0, current: 0, total: 125 }); const s = await chunkedSubtask(ctx, 25, { i: 0, current: 0, total: 125 }, processChunk, (ctx, s, p) => ctx.update('chunk test ' + p))
return s.i; return s.i;
}); });
await ctx.runChild(child); await ctx.runChild(child);
await Scheduler.delay(250); await Scheduler.delay(250);
await step(1); await step(1);
await chunk(ctx, { i: 0, current: 0, total: 80 }); await chunkedSubtask(ctx, 25, { i: 0, current: 0, total: 80 }, processChunk, (ctx, s, p) => ctx.update('chunk test ' + p))
await Scheduler.delay(250); await Scheduler.delay(250);
await step(2); await step(2);
await Scheduler.delay(250); await Scheduler.delay(250);
......
...@@ -26,7 +26,7 @@ import * as Data from '../data-model' ...@@ -26,7 +26,7 @@ import * as Data from '../data-model'
import Field from './field' import Field from './field'
import { Tokens, TokenBuilder } from '../../common/text/tokenizer' import { Tokens, TokenBuilder } from '../../common/text/tokenizer'
import Result from '../../result' import Result from '../../result'
import { Task, RuntimeContext, ChunkedSubtask } from 'mol-task' import { Task, RuntimeContext, chunkedSubtask } from 'mol-task'
/** /**
* Types of supported mmCIF tokens. * Types of supported mmCIF tokens.
...@@ -468,7 +468,7 @@ interface LoopReadState { ...@@ -468,7 +468,7 @@ interface LoopReadState {
tokenCount: number tokenCount: number
} }
function readLoopChunk(state: LoopReadState, chunkSize: number) { function readLoopChunk(chunkSize: number, state: LoopReadState) {
const { tokenizer, tokens, fieldCount } = state; const { tokenizer, tokens, fieldCount } = state;
let tokenCount = state.tokenCount; let tokenCount = state.tokenCount;
let counter = 0; let counter = 0;
...@@ -481,9 +481,13 @@ function readLoopChunk(state: LoopReadState, chunkSize: number) { ...@@ -481,9 +481,13 @@ function readLoopChunk(state: LoopReadState, chunkSize: number) {
return counter; return counter;
} }
const readLoopChunks = ChunkedSubtask(1000000, function updateLoopChunk(ctx: RuntimeContext, state: LoopReadState) {
(size, state: LoopReadState) => readLoopChunk(state, size), return ctx.update({ message: 'Parsing...', current: state.tokenizer.position, max: state.tokenizer.data.length });
(ctx, state) => ctx.update({ message: 'Parsing...', current: state.tokenizer.position, max: state.tokenizer.data.length })); }
// const readLoopChunks = ChunkedSubtask(1000000,
// (size, state: LoopReadState) => readLoopChunk(state, size),
// (ctx, state) => ctx.update({ message: 'Parsing...', current: state.tokenizer.position, max: state.tokenizer.data.length }));
/** /**
* Reads a loop. * Reads a loop.
...@@ -512,7 +516,7 @@ async function handleLoop(tokenizer: TokenizerState, ctx: FrameContext): Promise ...@@ -512,7 +516,7 @@ async function handleLoop(tokenizer: TokenizerState, ctx: FrameContext): Promise
tokens tokens
}; };
await readLoopChunks(tokenizer.runtimeCtx, state); await chunkedSubtask(tokenizer.runtimeCtx, 1000000, state, readLoopChunk, updateLoopChunk);
if (state.tokenCount % fieldCount !== 0) { if (state.tokenCount % fieldCount !== 0) {
return { return {
......
...@@ -12,7 +12,7 @@ import { Progress } from './execution/progress' ...@@ -12,7 +12,7 @@ import { Progress } from './execution/progress'
import { now } from './util/now' import { now } from './util/now'
import { Scheduler } from './util/scheduler' import { Scheduler } from './util/scheduler'
import { MultistepTask } from './util/multistep' import { MultistepTask } from './util/multistep'
import { ChunkedSubtask } from './util/chunked' import { chunkedSubtask } from './util/chunked'
// Run the task without the ability to observe its progress. // Run the task without the ability to observe its progress.
function Run<T>(task: Task<T>): Promise<T>; function Run<T>(task: Task<T>): Promise<T>;
...@@ -23,4 +23,4 @@ function Run<T>(task: Task<T>, observer?: Progress.Observer, updateRateMs?: numb ...@@ -23,4 +23,4 @@ function Run<T>(task: Task<T>, observer?: Progress.Observer, updateRateMs?: numb
return ExecuteSynchronous(task); return ExecuteSynchronous(task);
} }
export { Task, RuntimeContext, Progress, Run, now, Scheduler, MultistepTask, ChunkedSubtask } export { Task, RuntimeContext, Progress, Run, now, Scheduler, MultistepTask, chunkedSubtask }
\ No newline at end of file \ No newline at end of file
...@@ -8,42 +8,39 @@ import { now } from './now' ...@@ -8,42 +8,39 @@ import { now } from './now'
import { RuntimeContext } from '../execution/runtime-context' import { RuntimeContext } from '../execution/runtime-context'
type UniformlyChunkedFn<S> = (chunkSize: number, state: S) => number type UniformlyChunkedFn<S> = (chunkSize: number, state: S) => number
type ChunkedSubtaskProvider<S> = (ctx: RuntimeContext, state: S) => Promise<S>
function ChunkedSubtask<S>(initialChunk: number, f: UniformlyChunkedFn<S>, async function chunkedSubtask<S>(ctx: RuntimeContext, initialChunk: number, state: S,
update: (ctx: RuntimeContext, state: S, processed: number) => Promise<void> | void): ChunkedSubtaskProvider<S> { f: UniformlyChunkedFn<S>, update: (ctx: RuntimeContext, state: S, processed: number) => Promise<void> | void): Promise<S> {
return async (ctx: RuntimeContext, state: S) => { let chunkSize = Math.max(initialChunk, 0);
let chunkSize = Math.max(initialChunk, 0); let globalProcessed = 0, globalTime = 0;
let globalProcessed = 0, globalTime = 0;
if (ctx.isSynchronous) { if (ctx.isSynchronous) {
f(Number.MAX_SAFE_INTEGER, state); f(Number.MAX_SAFE_INTEGER, state);
return state; return state;
} }
let start = now();
let lastSize = 0, currentTime = 0;
while ((lastSize = f(chunkSize, state)) > 0) { let start = now();
globalProcessed += lastSize; let lastSize = 0, currentTime = 0;
const delta = now() - start; while ((lastSize = f(chunkSize, state)) > 0) {
currentTime += delta; globalProcessed += lastSize;
globalTime += delta;
if (ctx.shouldUpdate) { const delta = now() - start;
await update(ctx, state, globalProcessed); currentTime += delta;
globalTime += delta;
chunkSize = Math.round(currentTime * globalProcessed / globalTime) + 1;
start = now();
currentTime = 0;
}
}
if (ctx.shouldUpdate) { if (ctx.shouldUpdate) {
await update(ctx, state, globalProcessed); await update(ctx, state, globalProcessed);
chunkSize = Math.round(currentTime * globalProcessed / globalTime) + 1;
start = now();
currentTime = 0;
} }
return state;
} }
if (ctx.shouldUpdate) {
await update(ctx, state, globalProcessed);
}
return state;
} }
export { ChunkedSubtask } export { chunkedSubtask }
\ No newline at end of file \ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment