Skip to content
Snippets Groups Projects
Commit 0442d953 authored by David Sehnal's avatar David Sehnal
Browse files

observable tasks

parent f16c699d
No related branches found
No related tags found
No related merge requests found
......@@ -4,7 +4,7 @@
* @author David Sehnal <david.sehnal@gmail.com>
*/
import { Task, Run } from 'mol-task'
import { Task, Run, Progress } from 'mol-task'
async function test() {
const t = Task.create('test', async () => 1);
......@@ -12,4 +12,38 @@ async function test() {
console.log(r);
}
test();
\ No newline at end of file
function messageTree(root: Progress.Node, prefix = ''): string {
if (!root.children.length) return `${prefix}${root.progress.message}`;
const newPrefix = prefix + ' |_ ';
const subTree = root.children.map(c => messageTree(c, newPrefix));
return `${prefix}${root.progress.message}\n${subTree.join('\n')}`;
}
function createTask<T>(delay: number, r: T): Task<T> {
return Task.create('delayed', async ctx => {
await new Promise(r => setTimeout(r, delay));
if (ctx.requiresUpdate) await ctx.update({ message: 'hello from delayed...' });
return r;
});
}
async function testObs() {
const t = Task.create('test o', async ctx => {
await new Promise(r => setTimeout(r, 250));
if (ctx.requiresUpdate) await ctx.update({ message: 'hi! 1' });
await new Promise(r => setTimeout(r, 125));
if (ctx.requiresUpdate) await ctx.update({ message: 'hi! 2' });
await new Promise(r => setTimeout(r, 250));
if (ctx.requiresUpdate) await ctx.update({ message: 'hi! 3' });
const r = await ctx.runChild({ message: 'Running child!' }, createTask(250, 100));
if (ctx.requiresUpdate) await ctx.update({ message: 'Almost done...' });
return r + 1;
});
const r = await Run(t, p => console.log(messageTree(p.root)), 250);
console.log(r);
}
test();
testObs();
\ No newline at end of file
......@@ -8,6 +8,7 @@ import Task from '../task'
import RuntimeContext from './runtime-context'
import Progress from './progress'
import now from '../util/now'
import ImmediateScheduler from '../scheduler/immediate'
function defaultProgress(rootTaskId: number, task: Task<any>): Task.Progress {
return {
......@@ -23,75 +24,161 @@ function defaultProgress(rootTaskId: number, task: Task<any>): Task.Progress {
};
}
class ProgressInfo {
interface ProgressInfo {
updateRateMs: number,
lastUpdated: number,
observer: Progress.Observer,
abortToken: { abortRequested: boolean, reason: string },
taskId: number;
elapsedMs: { real: number, cpu: number };
tree: Progress.Node;
tryAbort?: (reason?: string) => void;
root: Progress.Node;
tryAbort: (reason?: string) => void;
}
snapshot(): Progress {
return 0 as any;
}
function ProgressInfo(task: Task<any>, observer: Progress.Observer, updateRateMs: number): ProgressInfo {
const abortToken: ProgressInfo['abortToken'] = { abortRequested: false, reason: '' };
return {
updateRateMs,
lastUpdated: now(),
observer,
abortToken,
taskId: task.id,
root: { progress: defaultProgress(task.id, task), children: [] },
tryAbort: abortFn(abortToken)
};
}
class ObservableExecutor {
progressInfo: ProgressInfo;
function abortFn(token: ProgressInfo['abortToken']) {
return (reason?: string) => {
token.abortRequested = true;
token.reason = reason || token.reason;
};
}
async run<T>(task: Task<T>): Promise<T> {
const ctx = new ObservableRuntimeContext(task.id, task, 0);
if (!task.__onAbort) return task.__f(ctx);
function cloneTree(root: Progress.Node): Progress.Node {
return { progress: { ...root.progress, elapsedMs: { ...root.progress.elapsedMs } }, children: root.children.map(cloneTree) };
}
function canAbort(root: Progress.Node): boolean {
return root.progress.canAbort && root.children.every(canAbort);
}
function snapshotProgress(info: ProgressInfo): Progress {
return { root: cloneTree(info.root), canAbort: canAbort(info.root), tryAbort: info.tryAbort };
}
try {
return await task.__f(ctx);
} catch (e) {
if (Task.isAborted(e)) task.__onAbort();
throw e;
}
}
constructor(observer: Progress.Observer, updateRateMs: number) {
async function runRoot<T>(task: Task<T>, ctx: ObservableRuntimeContext) {
ctx.started = now();
if (!task.__onAbort) return task.__f(ctx);
try {
const ret = await task.__f(ctx);
// if (ctx.info.abortToken.abortRequested) {
// task.__onAbort();
// }
return ret;
} catch (e) {
if (Task.isAborted(e)) task.__onAbort();
if (ExecuteObservable.PRINT_ERRORS_TO_STD_ERR) console.error(e);
throw e;
}
}
async function run<T>(task: Task<T>, ctx: ObservableRuntimeContext) {
ctx.started = now();
if (!task.__onAbort) return task.__f(ctx);
try {
const ret = await task.__f(ctx);
// if (ctx.info.abortToken.abortRequested) {
// task.__onAbort();
// }
return ret;
} catch (e) {
if (Task.isAborted(e)) task.__onAbort();
if (ExecuteObservable.PRINT_ERRORS_TO_STD_ERR) console.error(e);
throw e;
}
}
class ObservableRuntimeContext implements RuntimeContext {
isExecuting = true;
elapsedCpuMs: number;
lastScheduledTime: number;
started: number;
taskId: number;
taskName: string;
progress: Task.Progress;
updateRateMs: number;
started: number = 0;
node: Progress.Node;
info: ProgressInfo;
private checkAborted() {
if (this.info.abortToken.abortRequested) {
throw Task.Aborted(this.info.abortToken.reason);
}
}
get requiresUpdate(): boolean {
return now() - this.started > this.updateRateMs;
this.checkAborted();
return now() - this.info.lastUpdated > this.info.updateRateMs;
}
private setProgress(update: Partial<RuntimeContext.ProgressUpdate>) {
this.checkAborted();
const progress = this.node.progress;
if (typeof update.canAbort !== 'undefined') progress.canAbort = update.canAbort;
if (typeof update.current !== 'undefined') progress.current = update.current;
if (typeof update.isIndeterminate !== 'undefined') progress.isIndeterminate = update.isIndeterminate;
if (typeof update.max !== 'undefined') progress.max = update.max;
if (typeof update.message !== 'undefined') progress.message = update.message;
}
private resume = () => {
this.isExecuting = true;
this.lastScheduledTime = now();
}
update(progress: Partial<RuntimeContext.ProgressUpdate>): Promise<void> {
return 0 as any;
this.isExecuting = false;
this.setProgress(progress);
this.info.lastUpdated = now();
const snapshot = snapshotProgress(this.info);
this.info.observer(snapshot);
return ImmediateScheduler.last(this.resume);
}
runChild<T>(progress: Partial<RuntimeContext.ProgressUpdate>, task: Task<T>): Promise<T> {
return 0 as any;
async runChild<T>(progress: Partial<RuntimeContext.ProgressUpdate>, task: Task<T>): Promise<T> {
this.setProgress(progress);
const node: Progress.Node = { progress: defaultProgress(this.info.taskId, task), children: [] };
const children = this.node.children as Progress.Node[];
children.push(node);
const ctx = new ObservableRuntimeContext(task, this.info, node);
try {
return await run(task, ctx);
} finally {
// remove the progress node after the computation has finished.
const idx = children.indexOf(node);
if (idx >= 0) {
children[idx] = children[children.length - 1];
children.pop();
}
}
}
constructor(parentId: number, task: Task<any>, updateRateMs: number) {
this.started = now();
constructor(task: Task<any>, info: ProgressInfo, node: Progress.Node) {
this.lastScheduledTime = this.started;
this.taskId = task.id;
this.taskName = task.name;
this.progress = defaultProgress(parentId, task);
this.updateRateMs = updateRateMs;
this.node = node;
this.info = info;
}
}
function ExecuteObservable<T>(task: Task<T>, observer: Progress.Observer, updateRateMs = 250) {
return new ObservableExecutor(observer, updateRateMs).run(task);
const info = ProgressInfo(task, observer, updateRateMs);
const ctx = new ObservableRuntimeContext(task, info, info.root);
return runRoot(task, ctx);
}
namespace ExecuteObservable {
export let PRINT_ERRORS_TO_CONSOLE = false;
export let PRINT_ERRORS_TO_STD_ERR = false;
}
export default ExecuteObservable
\ No newline at end of file
......@@ -7,10 +7,9 @@
import Task from '../task'
interface Progress {
taskId: number,
elapsedMs: { real: number, cpu: number },
tree: Progress.Node,
tryAbort?: (reason?: string) => void
root: Progress.Node,
canAbort: boolean,
tryAbort: (reason?: string) => void
}
namespace Progress {
......
/**
* Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info.
*
* @author David Sehnal <david.sehnal@gmail.com>
*/
namespace ImmediateScheduler {
// Adds the function to the start of the "immediate queue"
export async function first<T>(f: () => T): Promise<T> {
return f();
}
// Adds the function to the end of the "immediate queue"
export async function last<T>(f: () => T): Promise<T> {
return f();
}
}
export default ImmediateScheduler
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment