Skip to content
Snippets Groups Projects
Commit 3d29b7e4 authored by David Sehnal's avatar David Sehnal
Browse files

working on mol-task

parent 4428feda
Branches
Tags
No related merge requests found
// TODO /**
\ No newline at end of file * Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info.
*
* @author David Sehnal <david.sehnal@gmail.com>
*/
import { Task, ExecuteSynchronous } from 'mol-task'
async function test() {
const t = Task.create('test', async () => 1);
const r = await ExecuteSynchronous(t);
console.log(r);
}
test();
\ No newline at end of file
/**
* Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info.
*
* @author David Sehnal <david.sehnal@gmail.com>
*/
/**
* setImmediate polyfill adapted from https://github.com/YuzuJS/setImmediate
* Copyright (c) 2012 Barnesandnoble.com, llc, Donavon West, and Domenic Denicola
* MIT license.
*/
function createImmediateActions() {
type Callback = (...args: any[]) => void;
type Task = { callback: Callback, args: any[] }
const tasksByHandle: { [handle: number]: Task } = { };
const doc = typeof document !== 'undefined' ? document : void 0;
let currentlyRunningATask = false;
let nextHandle = 1; // Spec says greater than zero
let registerImmediate: ((handle: number) => void);
function setImmediate(callback: Callback, ...args: any[]) {
// Callback can either be a function or a string
if (typeof callback !== 'function') {
callback = new Function('' + callback) as Callback;
}
// Store and register the task
const task = { callback: callback, args: args };
tasksByHandle[nextHandle] = task;
registerImmediate(nextHandle);
return nextHandle++;
}
function clearImmediate(handle: number) {
delete tasksByHandle[handle];
}
function run(task: Task) {
const callback = task.callback;
const args = task.args;
switch (args.length) {
case 0:
callback();
break;
case 1:
callback(args[0]);
break;
case 2:
callback(args[0], args[1]);
break;
case 3:
callback(args[0], args[1], args[2]);
break;
default:
callback.apply(undefined, args);
break;
}
}
function runIfPresent(handle: number) {
// From the spec: 'Wait until any invocations of this algorithm started before this one have completed.'
// So if we're currently running a task, we'll need to delay this invocation.
if (currentlyRunningATask) {
// Delay by doing a setTimeout. setImmediate was tried instead, but in Firefox 7 it generated a
// 'too much recursion' error.
setTimeout(runIfPresent, 0, handle);
} else {
const task = tasksByHandle[handle];
if (task) {
currentlyRunningATask = true;
try {
run(task);
} finally {
clearImmediate(handle);
currentlyRunningATask = false;
}
}
}
}
function installNextTickImplementation() {
registerImmediate = function(handle) {
process.nextTick(function () { runIfPresent(handle); });
};
}
function canUsePostMessage() {
// The test against `importScripts` prevents this implementation from being installed inside a web worker,
// where `global.postMessage` means something completely different and can't be used for this purpose.
const global = typeof window !== 'undefined' ? window as any : void 0;
if (global && global.postMessage && !global.importScripts) {
let postMessageIsAsynchronous = true;
const oldOnMessage = global.onmessage;
global.onmessage = function() {
postMessageIsAsynchronous = false;
};
global.postMessage('', '*');
global.onmessage = oldOnMessage;
return postMessageIsAsynchronous;
}
}
function installPostMessageImplementation() {
// Installs an event handler on `global` for the `message` event: see
// * https://developer.mozilla.org/en/DOM/window.postMessage
// * http://www.whatwg.org/specs/web-apps/current-work/multipage/comms.html#crossDocumentMessages
const messagePrefix = 'setImmediate$' + Math.random() + '$';
const global = typeof window !== 'undefined' ? window as any : void 0;
const onGlobalMessage = function(event: any) {
if (event.source === global &&
typeof event.data === 'string' &&
event.data.indexOf(messagePrefix) === 0) {
runIfPresent(+event.data.slice(messagePrefix.length));
}
};
if (window.addEventListener) {
window.addEventListener('message', onGlobalMessage, false);
} else {
(window as any).attachEvent('onmessage', onGlobalMessage);
}
registerImmediate = function(handle) {
window.postMessage(messagePrefix + handle, '*');
};
}
function installMessageChannelImplementation() {
const channel = new MessageChannel();
channel.port1.onmessage = function(event) {
const handle = event.data;
runIfPresent(handle);
};
registerImmediate = function(handle) {
channel.port2.postMessage(handle);
};
}
function installReadyStateChangeImplementation() {
const html = doc!.documentElement;
registerImmediate = function(handle) {
// Create a <script> element; its readystatechange event will be fired asynchronously once it is inserted
// into the document. Do so, thus queuing up the task. Remember to clean up once it's been called.
let script = doc!.createElement('script') as any;
script.onreadystatechange = function () {
runIfPresent(handle);
script.onreadystatechange = null;
html.removeChild(script);
script = null;
};
html.appendChild(script);
};
}
function installSetTimeoutImplementation() {
registerImmediate = function(handle) {
setTimeout(runIfPresent, 0, handle);
};
}
// Don't get fooled by e.g. browserify environments.
if (typeof process !== 'undefined' && {}.toString.call(process) === '[object process]') {
// For Node.js before 0.9
installNextTickImplementation();
} else if (canUsePostMessage()) {
// For non-IE10 modern browsers
installPostMessageImplementation();
} else if (typeof MessageChannel !== 'undefined') {
// For web workers, where supported
installMessageChannelImplementation();
} else if (doc && 'onreadystatechange' in doc.createElement('script')) {
// For IE 6–8
installReadyStateChangeImplementation();
} else {
// For older browsers
installSetTimeoutImplementation();
}
return {
setImmediate,
clearImmediate
};
}
const immediateActions = (function () {
if (typeof setImmediate !== 'undefined') {
if (typeof window !== 'undefined') {
return { setImmediate: (handler: any, ...args: any[]) => window.setImmediate(handler, ...args as any), clearImmediate: (handle: any) => window.clearImmediate(handle) };
} else return { setImmediate, clearImmediate }
}
return createImmediateActions();
}());
function resolveImmediate(res: () => void) {
immediateActions.setImmediate(res);
}
export default {
immediate: immediateActions.setImmediate,
clearImmediate: immediateActions.clearImmediate,
immediatePromise() { return new Promise<void>(resolveImmediate); }
};
/**
* Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info.
*
* @author David Sehnal <david.sehnal@gmail.com>
*/
import Task from '../task'
import RuntimeContext from './runtime-context'
interface Progress {
taskId: number,
elapsedMs: { real: number, cpu: number },
tree: Progress.Node,
tryAbort?: () => void
}
namespace Progress {
export interface Node {
readonly progress: Task.Progress,
readonly children: ReadonlyArray<Node>
}
export interface Observer { (progress: Progress): void }
}
class ObservableExecutor {
async run<T>(task: Task<T>): Promise<T> {
const ctx = new ObservableRuntimeContext();
if (!task.__onAbort) return task.__f(ctx);
try {
return await task.__f(ctx);
} catch (e) {
if (e === Task.Aborted) task.__onAbort();
throw e;
}
}
constructor(observer: Progress.Observer, updateRateMs: number) {
}
}
class ObservableRuntimeContext implements RuntimeContext {
id: number = 0;
requiresUpdate: boolean = false;
update(progress: Partial<RuntimeContext.ProgressUpdate>): Promise<void> {
return 0 as any;
}
runChild<T>(progress: Partial<RuntimeContext.ProgressUpdate>, task: Task<T>): Promise<T> {
return 0 as any;
}
}
function ExecuteObservable<T>(task: Task<T>, observer: Progress.Observer, updateRateMs = 250) {
return new ObservableExecutor(observer, updateRateMs).run(task);
}
namespace ExecuteObservable {
export let PRINT_ERRORS_TO_CONSOLE = false;
}
export { ExecuteObservable, Progress }
\ No newline at end of file
/**
* Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info.
*
* @author David Sehnal <david.sehnal@gmail.com>
*/
import Task from '../task'
interface RuntimeContext {
readonly requiresUpdate: boolean,
// Idiomatic usage:
// if (ctx.requiresUpdate) await ctx.update({ ... });
update(progress: Partial<RuntimeContext.ProgressUpdate>): Promise<void>,
// Force the user to pass the progress so that the progress tree can be kept in a "good state".
runChild<T>(progress: Partial<RuntimeContext.ProgressUpdate>, task: Task<T>): Promise<T>
}
namespace RuntimeContext {
export interface AbortToken { isAborted: boolean }
export interface ProgressUpdate {
message: string,
isIndeterminate: boolean,
current: number,
max: number,
canAbort: boolean
}
}
export default RuntimeContext
\ No newline at end of file
/**
* Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info.
*
* @author David Sehnal <david.sehnal@gmail.com>
*/
import Task from '../task'
import RuntimeContext from './runtime-context'
const voidPromise = Promise.resolve(void 0);
class SynchronousRuntimeContext implements RuntimeContext {
id: number = 0;
requiresUpdate: boolean = false;
update(progress: Partial<RuntimeContext.ProgressUpdate>): Promise<void> { return voidPromise; }
runChild<T>(progress: Partial<RuntimeContext.ProgressUpdate>, task: Task<T>): Promise<T> { return ExecuteSynchronous(task); }
}
const SyncRuntimeInstance = new SynchronousRuntimeContext();
function ExecuteSynchronous<T>(task: Task<T>) {
return task.__f(SyncRuntimeInstance);
}
export default ExecuteSynchronous
\ No newline at end of file
/**
* Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info.
*
* @author David Sehnal <david.sehnal@gmail.com>
*/
import Task from './task'
import RuntimeContext from './execution/runtime-context'
import ExecuteSynchronous from './execution/synchronous'
export { Task, RuntimeContext, ExecuteSynchronous }
\ No newline at end of file
/**
* Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info.
*
* @author David Sehnal <david.sehnal@gmail.com>
*/
import RuntimeContext from './execution/runtime-context'
interface Task<T> {
readonly id: number,
readonly name: string,
readonly __f: (ctx: RuntimeContext) => Promise<T>,
readonly __onAbort: (() => void) | undefined
}
namespace Task {
export const Aborted = 'Aborted.';
export function create<T>(name: string, f: (ctx: RuntimeContext) => Promise<T>, onAbort?: () => void): Task<T> {
return { id: nextId(), name, __f: f, __onAbort: onAbort };
}
export function constant<T>(name: string, value: T): Task<T> { return create(name, async ctx => value); }
export function fail(name: string, reason: string): Task<any> { return create(name, async ctx => { throw new Error(reason); }); }
let _id = 0;
function nextId() {
const ret = _id;
_id = (_id + 1) % 0x3fffffff;
return ret;
}
export type Progress = IndeterminateProgress | DeterminateProgress
interface ProgressBase {
runtimeId: number,
taskId: number,
message: string,
elapsedMs: { real: number, cpu: number },
canAbort: boolean,
children?: Progress[]
}
export interface IndeterminateProgress extends ProgressBase { isIndeterminate: true }
export interface DeterminateProgress extends ProgressBase { isIndeterminate: false, current: number, max: number }
export interface State {
runtimeId: number,
taskId: number,
message: string,
elapsedMs: number,
canAbort: boolean,
isIndeterminate: boolean,
current: number,
max: number,
children?: State[]
}
}
export default Task
\ No newline at end of file
// enum TaskState {
// Pending,
// Running,
// Aborted,
// Completed,
// Failed
// }
interface TaskState {
}
namespace TaskState {
export interface Pending { kind: 'Pending' }
export interface Running { kind: 'Running', }
export interface Progress {
message: string,
isIndeterminate: boolean,
current: number,
max: number,
elapsedMs: number
}
}
type ExecutionContext = { type ExecutionContext = {
run<T>(c: Computation<T>, params?: { updateRateMs: number }): Promise<T>, run<T>(c: Computation<T>, params?: { updateRateMs: number }): Promise<T>,
subscribe(o: (p: string, compId: number) => void): void subscribe(o: (p: string, compId: number) => void): void,
requestAbort(compId: number): void
} }
namespace ExecutionContext { namespace ExecutionContext {
...@@ -17,12 +42,13 @@ namespace ExecutionContext { ...@@ -17,12 +42,13 @@ namespace ExecutionContext {
export const Sync: ExecutionContext = 0 as any; export const Sync: ExecutionContext = 0 as any;
} }
interface RuntimeContext extends ExecutionContext { interface RuntimeContext {
run<T>(c: Computation<T>, params?: { updateRateMs: number }): Promise<T>,
yield(name: string): Promise<void> | void yield(name: string): Promise<void> | void
} }
// if no context is specified, use the synchronous one. // if no context is specified, use the synchronous one.
type Computation<T> = { (ctx?: RuntimeContext): Promise<T>, _id: number } interface Computation<T> { (ctx: RuntimeContext): Promise<T>, _id: number }
function create<T>(c: (ctx: RuntimeContext) => Promise<T>): Computation<T> { return 0 as any; } function create<T>(c: (ctx: RuntimeContext) => Promise<T>): Computation<T> { return 0 as any; }
function constant<T>(c: T) { return create(async ctx => c); } function constant<T>(c: T) { return create(async ctx => c); }
...@@ -58,7 +84,7 @@ function readLines(str: string): Computation<string[]> { ...@@ -58,7 +84,7 @@ function readLines(str: string): Computation<string[]> {
const prependHiToLines = MultistepComputation('Hi prepend', ['Parse input', 'Prepend Hi'], async (p: string, step, ctx) => { const prependHiToLines = MultistepComputation('Hi prepend', ['Parse input', 'Prepend Hi'], async (p: string, step, ctx) => {
await step(0); await step(0);
const lines = await ctx.run(readLines(p)); const lines = await readLines(p)(ctx);
await step(1); await step(1);
const ret = lines.map(l => 'Hi ' + l); const ret = lines.map(l => 'Hi ' + l);
return ret; return ret;
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
"outDir": "build/node_modules", "outDir": "build/node_modules",
"baseUrl": "src", "baseUrl": "src",
"paths": { "paths": {
"mol-task": ["./mol-task", "./mol-task/index.ts"],
"mol-comp": ["./mol-comp", "./mol-comp/index.ts"], "mol-comp": ["./mol-comp", "./mol-comp/index.ts"],
"mol-util": ["./mol-util", "./mol-util/index.ts"], "mol-util": ["./mol-util", "./mol-util/index.ts"],
"mol-data": ["./mol-data", "./mol-data/index.ts"], "mol-data": ["./mol-data", "./mol-data/index.ts"],
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment