refactor(ngcc): implement task selection for parallel task execution (#32427)
This commit adds a new `TaskQueue` implementation that supports executing multiple tasks in parallel (while respecting interdependencies between them). This new implementation is currently not used, thus the behavior of `ngcc` is not affected by this change. The parallel `TaskQueue` will be used in a subsequent commit that will introduce parallel task execution. PR Close #32427
This commit is contained in:
parent
2844dd2972
commit
f4e4bb2085
@ -0,0 +1,176 @@
|
|||||||
|
/**
|
||||||
|
* @license
|
||||||
|
* Copyright Google Inc. All Rights Reserved.
|
||||||
|
*
|
||||||
|
* Use of this source code is governed by an MIT-style license that can be
|
||||||
|
* found in the LICENSE file at https://angular.io/license
|
||||||
|
*/
|
||||||
|
|
||||||
|
import {DepGraph} from 'dependency-graph';
|
||||||
|
|
||||||
|
import {EntryPoint} from '../../packages/entry_point';
|
||||||
|
import {PartiallyOrderedTasks, Task} from '../api';
|
||||||
|
import {stringifyTask} from '../utils';
|
||||||
|
|
||||||
|
import {BaseTaskQueue} from './base_task_queue';
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A `TaskQueue` implementation that assumes tasks are processed in parallel, thus has to ensure a
|
||||||
|
* task's dependencies have been processed before processing the task.
|
||||||
|
*/
|
||||||
|
export class ParallelTaskQueue extends BaseTaskQueue {
|
||||||
|
/**
|
||||||
|
* A mapping from each task to the list of tasks that are blocking it (if any).
|
||||||
|
*
|
||||||
|
* A task can block another task, if the latter's entry-point depends on the former's entry-point
|
||||||
|
* _and_ the former is also generating typings (i.e. has `processDts: true`).
|
||||||
|
*
|
||||||
|
* NOTE: If a task is not generating typings, then it cannot affect anything which depends on its
|
||||||
|
* entry-point, regardless of the dependency graph. To put this another way, only the task
|
||||||
|
* which produces the typings for a dependency needs to have been completed.
|
||||||
|
*/
|
||||||
|
private blockedTasks: Map<Task, Set<Task>>;
|
||||||
|
|
||||||
|
constructor(tasks: PartiallyOrderedTasks, graph: DepGraph<EntryPoint>) {
|
||||||
|
const blockedTasks = computeBlockedTasks(tasks, graph);
|
||||||
|
const sortedTasks = sortTasksByPriority(tasks, blockedTasks);
|
||||||
|
|
||||||
|
super(sortedTasks);
|
||||||
|
this.blockedTasks = blockedTasks;
|
||||||
|
}
|
||||||
|
|
||||||
|
getNextTask(): Task|null {
|
||||||
|
// Look for the first available (i.e. not blocked) task.
|
||||||
|
// (NOTE: Since tasks are sorted by priority, the first available one is the best choice.)
|
||||||
|
const nextTaskIdx = this.tasks.findIndex(task => !this.blockedTasks.has(task));
|
||||||
|
if (nextTaskIdx === -1) return null;
|
||||||
|
|
||||||
|
// Remove the task from the list of available tasks and add it to the list of in-progress tasks.
|
||||||
|
const nextTask = this.tasks[nextTaskIdx];
|
||||||
|
this.tasks.splice(nextTaskIdx, 1);
|
||||||
|
this.inProgressTasks.add(nextTask);
|
||||||
|
|
||||||
|
return nextTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
markTaskCompleted(task: Task): void {
|
||||||
|
super.markTaskCompleted(task);
|
||||||
|
|
||||||
|
const unblockedTasks: Task[] = [];
|
||||||
|
|
||||||
|
// Remove the completed task from the lists of tasks blocking other tasks.
|
||||||
|
for (const [otherTask, blockingTasks] of Array.from(this.blockedTasks)) {
|
||||||
|
if (blockingTasks.has(task)) {
|
||||||
|
blockingTasks.delete(task);
|
||||||
|
|
||||||
|
// If the other task is not blocked any more, mark it for unblocking.
|
||||||
|
if (blockingTasks.size === 0) {
|
||||||
|
unblockedTasks.push(otherTask);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unblock tasks that are no longer blocked.
|
||||||
|
unblockedTasks.forEach(task => this.blockedTasks.delete(task));
|
||||||
|
}
|
||||||
|
|
||||||
|
toString(): string {
|
||||||
|
return `${super.toString()}\n` +
|
||||||
|
` Blocked tasks (${this.blockedTasks.size}): ${this.stringifyBlockedTasks(' ')}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
private stringifyBlockedTasks(indentation: string): string {
|
||||||
|
return Array.from(this.blockedTasks)
|
||||||
|
.map(
|
||||||
|
([task, blockingTasks]) =>
|
||||||
|
`\n${indentation}- ${stringifyTask(task)} (${blockingTasks.size}): ` +
|
||||||
|
this.stringifyTasks(Array.from(blockingTasks), `${indentation} `))
|
||||||
|
.join('');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helpers
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compute a mapping of blocked tasks to the tasks that are blocking them.
|
||||||
|
*
|
||||||
|
* As a performance optimization, we take into account the fact that `tasks` are sorted in such a
|
||||||
|
* way that a task can only be blocked by earlier tasks (i.e. dependencies always come before
|
||||||
|
* dependants in the list of tasks).
|
||||||
|
*
|
||||||
|
* @param tasks A (partially ordered) list of tasks.
|
||||||
|
* @param graph The dependency graph between entry-points.
|
||||||
|
* @return The map of blocked tasks to the tasks that are blocking them.
|
||||||
|
*/
|
||||||
|
function computeBlockedTasks(
|
||||||
|
tasks: PartiallyOrderedTasks, graph: DepGraph<EntryPoint>): Map<Task, Set<Task>> {
|
||||||
|
const blockedTasksMap = new Map<Task, Set<Task>>();
|
||||||
|
const candidateBlockers = new Map<string, Task>();
|
||||||
|
|
||||||
|
tasks.forEach(task => {
|
||||||
|
// Find the earlier tasks (`candidateBlockers`) that are blocking this task.
|
||||||
|
const deps = graph.dependenciesOf(task.entryPoint.path);
|
||||||
|
const blockingTasks =
|
||||||
|
deps.filter(dep => candidateBlockers.has(dep)).map(dep => candidateBlockers.get(dep) !);
|
||||||
|
|
||||||
|
// If this task is blocked, add it to the map of blocked tasks.
|
||||||
|
if (blockingTasks.length > 0) {
|
||||||
|
blockedTasksMap.set(task, new Set(blockingTasks));
|
||||||
|
}
|
||||||
|
|
||||||
|
// If this task can be potentially blocking (i.e. it generates typings), add it to the list
|
||||||
|
// of candidate blockers for subsequent tasks.
|
||||||
|
if (task.processDts) {
|
||||||
|
const entryPointPath = task.entryPoint.path;
|
||||||
|
|
||||||
|
// There should only be one task per entry-point that generates typings (and thus can block
|
||||||
|
// other tasks), so the following should theoretically never happen, but check just in case.
|
||||||
|
if (candidateBlockers.has(entryPointPath)) {
|
||||||
|
const otherTask = candidateBlockers.get(entryPointPath) !;
|
||||||
|
|
||||||
|
throw new Error(
|
||||||
|
'Invariant violated: Multiple tasks are assigned generating typings for ' +
|
||||||
|
`'${entryPointPath}':\n - ${stringifyTask(otherTask)}\n - ${stringifyTask(task)}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
candidateBlockers.set(entryPointPath, task);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return blockedTasksMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sort a list of tasks by priority.
|
||||||
|
*
|
||||||
|
* Priority is determined by the number of other tasks that a task is (transitively) blocking:
|
||||||
|
* The more tasks a task is blocking the higher its priority is, because processing it will
|
||||||
|
* potentially unblock more tasks.
|
||||||
|
*
|
||||||
|
* To keep the behavior predictable, if two tasks block the same number of other tasks, their
|
||||||
|
* relative order in the original `tasks` lists is preserved.
|
||||||
|
*
|
||||||
|
* @param tasks A (partially ordered) list of tasks.
|
||||||
|
* @param blockedTasks A mapping from a task to the list of tasks that are blocking it (if any).
|
||||||
|
* @return The list of tasks sorted by priority.
|
||||||
|
*/
|
||||||
|
function sortTasksByPriority(
|
||||||
|
tasks: PartiallyOrderedTasks, blockedTasks: Map<Task, Set<Task>>): PartiallyOrderedTasks {
|
||||||
|
const priorityPerTask = new Map<Task, [number, number]>();
|
||||||
|
const allBlockingTaskSets = Array.from(blockedTasks.values());
|
||||||
|
const computePriority = (task: Task, idx: number): [number, number] =>
|
||||||
|
[allBlockingTaskSets.reduce(
|
||||||
|
(count, blockingTasks) => count + (blockingTasks.has(task) ? 1 : 0), 0),
|
||||||
|
idx,
|
||||||
|
];
|
||||||
|
|
||||||
|
tasks.forEach((task, i) => priorityPerTask.set(task, computePriority(task, i)));
|
||||||
|
|
||||||
|
return tasks.slice().sort((task1, task2) => {
|
||||||
|
const [p1, idx1] = priorityPerTask.get(task1) !;
|
||||||
|
const [p2, idx2] = priorityPerTask.get(task2) !;
|
||||||
|
|
||||||
|
return (p2 - p1) || (idx1 - idx2);
|
||||||
|
});
|
||||||
|
}
|
@ -24,6 +24,7 @@ ts_library(
|
|||||||
"//packages/compiler-cli/test/helpers",
|
"//packages/compiler-cli/test/helpers",
|
||||||
"@npm//@types/convert-source-map",
|
"@npm//@types/convert-source-map",
|
||||||
"@npm//convert-source-map",
|
"@npm//convert-source-map",
|
||||||
|
"@npm//dependency-graph",
|
||||||
"@npm//magic-string",
|
"@npm//magic-string",
|
||||||
"@npm//typescript",
|
"@npm//typescript",
|
||||||
],
|
],
|
||||||
|
@ -0,0 +1,556 @@
|
|||||||
|
/**
|
||||||
|
* @license
|
||||||
|
* Copyright Google Inc. All Rights Reserved.
|
||||||
|
*
|
||||||
|
* Use of this source code is governed by an MIT-style license that can be
|
||||||
|
* found in the LICENSE file at https://angular.io/license
|
||||||
|
*/
|
||||||
|
|
||||||
|
import {DepGraph} from 'dependency-graph';
|
||||||
|
|
||||||
|
import {PartiallyOrderedTasks, Task, TaskQueue} from '../../../src/execution/api';
|
||||||
|
import {ParallelTaskQueue} from '../../../src/execution/task_selection/parallel_task_queue';
|
||||||
|
import {EntryPoint} from '../../../src/packages/entry_point';
|
||||||
|
|
||||||
|
|
||||||
|
describe('ParallelTaskQueue', () => {
|
||||||
|
// Helpers
|
||||||
|
/**
|
||||||
|
* Create a `TaskQueue` by generating mock tasks (optionally with interdependencies).
|
||||||
|
*
|
||||||
|
* NOTE 1: The first task for each entry-point generates typings (which is similar to what happens
|
||||||
|
* in the actual code).
|
||||||
|
* NOTE 2: The `ParallelTaskQueue` implementation relies on the fact that tasks are sorted in such
|
||||||
|
* a way that a task can only be blocked by earlier tasks (i.e. dependencies always come
|
||||||
|
* before dependants in the list of tasks).
|
||||||
|
* To preserve this attribute, you need to ensure that entry-points will only depend on
|
||||||
|
* entry-points with a lower index. Take this into account when defining `entryPointDeps`.
|
||||||
|
* (Failing to do so, will result in an error.)
|
||||||
|
*
|
||||||
|
* @param entryPointCount The number of different entry-points to mock.
|
||||||
|
* @param tasksPerEntryPointCount The number of tasks to generate per entry-point (i.e. simulating
|
||||||
|
* processing multiple format properties).
|
||||||
|
* @param entryPointDeps An object mapping an entry-point to its dependencies. Keys are
|
||||||
|
* entry-point indices and values are arrays of entry-point indices that the
|
||||||
|
* entry-point corresponding to the key depends on.
|
||||||
|
* For example, if entry-point #2 depends on entry-points #0 and #1,
|
||||||
|
* `entryPointDeps` would be `{2: [0, 1]}`.
|
||||||
|
* @return An object with the following properties:
|
||||||
|
* - `graph`: The dependency graph for the generated mock entry-point.
|
||||||
|
* - `tasks`: The (partially ordered) list of generated mock tasks.
|
||||||
|
* - `queue`: The created `TaskQueue`.
|
||||||
|
*/
|
||||||
|
const createQueue = (entryPointCount: number, tasksPerEntryPointCount = 1, entryPointDeps: {
|
||||||
|
[entryPointIndex: string]: number[]
|
||||||
|
} = {}): {tasks: PartiallyOrderedTasks, graph: DepGraph<EntryPoint>, queue: TaskQueue} => {
|
||||||
|
const entryPoints: EntryPoint[] = [];
|
||||||
|
const tasks: PartiallyOrderedTasks = [] as any;
|
||||||
|
const graph = new DepGraph<EntryPoint>();
|
||||||
|
|
||||||
|
// Create the entry-points and the associated tasks.
|
||||||
|
for (let epIdx = 0; epIdx < entryPointCount; epIdx++) {
|
||||||
|
const entryPoint = {
|
||||||
|
name: `entry-point-${epIdx}`,
|
||||||
|
path: `/path/to/entry/point/${epIdx}`,
|
||||||
|
} as EntryPoint;
|
||||||
|
|
||||||
|
entryPoints.push(entryPoint);
|
||||||
|
graph.addNode(entryPoint.path);
|
||||||
|
|
||||||
|
for (let tIdx = 0; tIdx < tasksPerEntryPointCount; tIdx++) {
|
||||||
|
tasks.push({ entryPoint, formatProperty: `prop-${tIdx}`, processDts: tIdx === 0, } as Task);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Define entry-point interdependencies.
|
||||||
|
for (const epIdx of Object.keys(entryPointDeps).map(strIdx => +strIdx)) {
|
||||||
|
const fromPath = entryPoints[epIdx].path;
|
||||||
|
for (const depIdx of entryPointDeps[epIdx]) {
|
||||||
|
// Ensure that each entry-point only depends on entry-points at a lower index.
|
||||||
|
if (depIdx >= epIdx) {
|
||||||
|
throw Error(
|
||||||
|
'Invalid `entryPointDeps`: Entry-points can only depend on entry-points at a lower ' +
|
||||||
|
`index, but entry-point #${epIdx} depends on #${depIdx} in: ` +
|
||||||
|
JSON.stringify(entryPointDeps, null, 2));
|
||||||
|
}
|
||||||
|
|
||||||
|
const toPath = entryPoints[depIdx].path;
|
||||||
|
graph.addDependency(fromPath, toPath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return {tasks, graph, queue: new ParallelTaskQueue(tasks.slice(), graph)};
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simulate processing the next task:
|
||||||
|
* - Request the next task from the specified queue.
|
||||||
|
* - If a task was returned, mark it as completed.
|
||||||
|
* - Return the task (this allows making assertions against the picked tasks in tests).
|
||||||
|
*
|
||||||
|
* @param queue The `TaskQueue` to get the next task from.
|
||||||
|
* @return The "processed" task (if any).
|
||||||
|
*/
|
||||||
|
const processNextTask = (queue: TaskQueue): ReturnType<TaskQueue['getNextTask']> => {
|
||||||
|
const task = queue.getNextTask();
|
||||||
|
if (task !== null) queue.markTaskCompleted(task);
|
||||||
|
return task;
|
||||||
|
};
|
||||||
|
|
||||||
|
describe('allTaskCompleted', () => {
|
||||||
|
it('should be `false`, when there are unprocessed tasks', () => {
|
||||||
|
const {queue} = createQueue(2);
|
||||||
|
expect(queue.allTasksCompleted).toBe(false);
|
||||||
|
|
||||||
|
processNextTask(queue);
|
||||||
|
expect(queue.allTasksCompleted).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should be `false`, when there are tasks in progress', () => {
|
||||||
|
const {queue} = createQueue(2);
|
||||||
|
|
||||||
|
queue.getNextTask();
|
||||||
|
expect(queue.allTasksCompleted).toBe(false);
|
||||||
|
|
||||||
|
processNextTask(queue);
|
||||||
|
expect(queue.allTasksCompleted).toBe(false); // The first task is still in progress.
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should be `true`, when there are no unprocess or in-progress tasks', () => {
|
||||||
|
const {queue} = createQueue(3);
|
||||||
|
|
||||||
|
const task1 = queue.getNextTask() !;
|
||||||
|
const task2 = queue.getNextTask() !;
|
||||||
|
const task3 = queue.getNextTask() !;
|
||||||
|
expect(queue.allTasksCompleted).toBe(false);
|
||||||
|
|
||||||
|
queue.markTaskCompleted(task1);
|
||||||
|
queue.markTaskCompleted(task3);
|
||||||
|
expect(queue.allTasksCompleted).toBe(false); // The second task is still in progress.
|
||||||
|
|
||||||
|
queue.markTaskCompleted(task2);
|
||||||
|
expect(queue.allTasksCompleted).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should be `true`, if the queue was empty from the beginning', () => {
|
||||||
|
const {queue} = createQueue(0);
|
||||||
|
expect(queue.allTasksCompleted).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should remain `true` once the queue has been emptied', () => {
|
||||||
|
const {queue} = createQueue(1);
|
||||||
|
expect(queue.allTasksCompleted).toBe(false);
|
||||||
|
|
||||||
|
processNextTask(queue);
|
||||||
|
expect(queue.allTasksCompleted).toBe(true);
|
||||||
|
|
||||||
|
queue.getNextTask();
|
||||||
|
expect(queue.allTasksCompleted).toBe(true);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('constructor()', () => {
|
||||||
|
it('should throw, if there are multiple tasks that generate typings for a single entry-point',
|
||||||
|
() => {
|
||||||
|
const {tasks, graph} = createQueue(2, 2, {
|
||||||
|
0: [], // Entry-point #0 does not depend on anything.
|
||||||
|
1: [0], // Entry-point #1 depends on #0.
|
||||||
|
});
|
||||||
|
tasks[1].processDts = true; // Tweak task #1 to also generate typings.
|
||||||
|
|
||||||
|
expect(() => new ParallelTaskQueue(tasks, graph))
|
||||||
|
.toThrowError(
|
||||||
|
'Invariant violated: Multiple tasks are assigned generating typings for ' +
|
||||||
|
'\'/path/to/entry/point/0\':\n' +
|
||||||
|
' - {entryPoint: entry-point-0, formatProperty: prop-0, processDts: true}\n' +
|
||||||
|
' - {entryPoint: entry-point-0, formatProperty: prop-1, processDts: true}');
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('getNextTask()', () => {
|
||||||
|
it('should return the tasks in order (when they are not blocked by other tasks)', () => {
|
||||||
|
const {tasks, queue} = createQueue(3, 2, {}); // 2 tasks per entry-point; no dependencies.
|
||||||
|
|
||||||
|
expect(queue.getNextTask()).toBe(tasks[0]);
|
||||||
|
expect(queue.getNextTask()).toBe(tasks[1]);
|
||||||
|
expect(queue.getNextTask()).toBe(tasks[2]);
|
||||||
|
expect(queue.getNextTask()).toBe(tasks[3]);
|
||||||
|
expect(queue.getNextTask()).toBe(tasks[4]);
|
||||||
|
expect(queue.getNextTask()).toBe(tasks[5]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return `null`, when there are no more tasks', () => {
|
||||||
|
const {tasks, queue} = createQueue(3);
|
||||||
|
tasks.forEach(() => expect(queue.getNextTask()).not.toBe(null));
|
||||||
|
|
||||||
|
expect(queue.getNextTask()).toBe(null);
|
||||||
|
expect(queue.getNextTask()).toBe(null);
|
||||||
|
|
||||||
|
const {tasks: tasks2, queue: queue2} = createQueue(0);
|
||||||
|
|
||||||
|
expect(tasks2).toEqual([]);
|
||||||
|
expect(queue.getNextTask()).toBe(null);
|
||||||
|
expect(queue.getNextTask()).toBe(null);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return `null`, if all unprocessed tasks are blocked', () => {
|
||||||
|
const {tasks, queue} = createQueue(2, 2, {
|
||||||
|
0: [], // Entry-point #0 does not depend on anything.
|
||||||
|
1: [0], // Entry-point #1 depends on #0.
|
||||||
|
});
|
||||||
|
|
||||||
|
// Verify that the first two tasks are for the first entry-point.
|
||||||
|
expect(tasks[0].entryPoint.name).toBe('entry-point-0');
|
||||||
|
expect(tasks[1].entryPoint.name).toBe('entry-point-0');
|
||||||
|
|
||||||
|
// Verify that the last two tasks are for the second entry-point.
|
||||||
|
expect(tasks[2].entryPoint.name).toBe('entry-point-1');
|
||||||
|
expect(tasks[3].entryPoint.name).toBe('entry-point-1');
|
||||||
|
|
||||||
|
// Return the first two tasks first, since they are not blocked.
|
||||||
|
expect(queue.getNextTask()).toBe(tasks[0]);
|
||||||
|
expect(queue.getNextTask()).toBe(tasks[1]);
|
||||||
|
|
||||||
|
// No task available, until task #0 (which geenrates typings for entry-point #0) is completed.
|
||||||
|
expect(tasks[0].processDts).toBe(true);
|
||||||
|
expect(tasks[1].processDts).toBe(false);
|
||||||
|
|
||||||
|
queue.markTaskCompleted(tasks[1]);
|
||||||
|
expect(queue.getNextTask()).toBe(null);
|
||||||
|
|
||||||
|
// Finally, unblock tasks for entry-point #1, once task #0 is completed.
|
||||||
|
queue.markTaskCompleted(tasks[0]);
|
||||||
|
expect(queue.getNextTask()).toBe(tasks[2]);
|
||||||
|
expect(queue.getNextTask()).toBe(tasks[3]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should prefer tasks that are blocking many tasks', () => {
|
||||||
|
// Tasks by priority: #1, #0, #2, #3
|
||||||
|
// - Entry-point #0 transitively blocks 1 entry-point(s): 2
|
||||||
|
// - Entry-point #1 transitively blocks 2 entry-point(s): 2, 3
|
||||||
|
// - Entry-point #2 transitively blocks 0 entry-point(s): -
|
||||||
|
// - Entry-point #3 transitively blocks 0 entry-point(s): -
|
||||||
|
const {tasks, queue} = createQueue(5, 1, {
|
||||||
|
0: [],
|
||||||
|
1: [],
|
||||||
|
2: [0, 1],
|
||||||
|
3: [1],
|
||||||
|
});
|
||||||
|
|
||||||
|
// First return task #1, since it blocks the most other tasks.
|
||||||
|
expect(processNextTask(queue)).toBe(tasks[1]);
|
||||||
|
|
||||||
|
// Then return task #0, since it blocks the most other tasks after #1.
|
||||||
|
expect(processNextTask(queue)).toBe(tasks[0]);
|
||||||
|
|
||||||
|
// Then return task #2, since it comes before #3.
|
||||||
|
expect(processNextTask(queue)).toBe(tasks[2]);
|
||||||
|
|
||||||
|
// Finally return task #3.
|
||||||
|
expect(processNextTask(queue)).toBe(tasks[3]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return a lower priority task, if higher priority tasks are blocked', () => {
|
||||||
|
// Tasks by priority: #0, #1, #3, #2, #4
|
||||||
|
// - Entry-point #0 transitively blocks 3 entry-point(s): 1, 2, 4
|
||||||
|
// - Entry-point #1 transitively blocks 2 entry-point(s): 2, 4
|
||||||
|
// - Entry-point #2 transitively blocks 0 entry-point(s): -
|
||||||
|
// - Entry-point #3 transitively blocks 1 entry-point(s): 4
|
||||||
|
// - Entry-point #4 transitively blocks 0 entry-point(s): -
|
||||||
|
const deps = {
|
||||||
|
0: [],
|
||||||
|
1: [0],
|
||||||
|
2: [0, 1],
|
||||||
|
3: [],
|
||||||
|
4: [0, 1, 3],
|
||||||
|
};
|
||||||
|
|
||||||
|
const {tasks, queue} = createQueue(5, 1, deps);
|
||||||
|
|
||||||
|
// First return task #0, since that blocks the most other tasks.
|
||||||
|
expect(queue.getNextTask()).toBe(tasks[0]);
|
||||||
|
|
||||||
|
// While task #0 is still in progress, return task #3.
|
||||||
|
// Despite task #3's having a lower priority than #1 (blocking 1 vs 2 other tasks), task #1 is
|
||||||
|
// currently blocked on #0 (while #3 is not blocked by anything).
|
||||||
|
expect(queue.getNextTask()).toBe(tasks[3]);
|
||||||
|
|
||||||
|
// Use the same dependencies as above, but complete task #0 before asking for another task to
|
||||||
|
// verify that task #1 would be returned in this case.
|
||||||
|
const {tasks: tasks2, queue: queue2} = createQueue(5, 1, deps);
|
||||||
|
|
||||||
|
expect(processNextTask(queue2)).toBe(tasks2[0]);
|
||||||
|
expect(processNextTask(queue2)).toBe(tasks2[1]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should keep they initial relative order of tasks for tasks with the same priority', () => {
|
||||||
|
// Tasks by priority: #1, #3, #0, #2, #4
|
||||||
|
// - Entry-point #0 transitively blocks 0 entry-point(s): -
|
||||||
|
// - Entry-point #1 transitively blocks 1 entry-point(s): 2
|
||||||
|
// - Entry-point #2 transitively blocks 0 entry-point(s): -
|
||||||
|
// - Entry-point #3 transitively blocks 1 entry-point(s): 4
|
||||||
|
// - Entry-point #4 transitively blocks 0 entry-point(s): -
|
||||||
|
const {tasks, queue} = createQueue(5, 1, {
|
||||||
|
0: [],
|
||||||
|
1: [],
|
||||||
|
2: [1],
|
||||||
|
3: [],
|
||||||
|
4: [3],
|
||||||
|
});
|
||||||
|
|
||||||
|
// First return task #1 (even if it has the same priority as #3), because it comes before #3
|
||||||
|
// in the initial task list.
|
||||||
|
// Note that task #0 is not returned (even if it comes first in the initial task list),
|
||||||
|
// because it has a lower priority (i.e. blocks fewer other tasks).
|
||||||
|
expect(processNextTask(queue)).toBe(tasks[1]);
|
||||||
|
|
||||||
|
// Then return task #3 (skipping over both #0 and #2), becasue it blocks the most other tasks.
|
||||||
|
expect(processNextTask(queue)).toBe(tasks[3]);
|
||||||
|
|
||||||
|
// The rest of the tasks (#0, #2, #4) block no tasks, so their initial relative order is
|
||||||
|
// preserved.
|
||||||
|
expect(queue.getNextTask()).toBe(tasks[0]);
|
||||||
|
expect(queue.getNextTask()).toBe(tasks[2]);
|
||||||
|
expect(queue.getNextTask()).toBe(tasks[4]);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('markTaskCompleted()', () => {
|
||||||
|
it('should mark a task as completed', () => {
|
||||||
|
const {queue} = createQueue(2);
|
||||||
|
|
||||||
|
const task1 = queue.getNextTask() !;
|
||||||
|
const task2 = queue.getNextTask() !;
|
||||||
|
expect(queue.allTasksCompleted).toBe(false);
|
||||||
|
|
||||||
|
queue.markTaskCompleted(task1);
|
||||||
|
queue.markTaskCompleted(task2);
|
||||||
|
expect(queue.allTasksCompleted).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should throw, if the specified task is not in progress', () => {
|
||||||
|
const {tasks, queue} = createQueue(3);
|
||||||
|
queue.getNextTask();
|
||||||
|
|
||||||
|
expect(() => queue.markTaskCompleted(tasks[2]))
|
||||||
|
.toThrowError(
|
||||||
|
`Trying to mark task that was not in progress as completed: ` +
|
||||||
|
`{entryPoint: entry-point-2, formatProperty: prop-0, processDts: true}`);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should remove the completed task from the lists of blocking tasks (so other tasks can be unblocked)',
|
||||||
|
() => {
|
||||||
|
const {tasks, queue} = createQueue(3, 1, {
|
||||||
|
0: [], // Entry-point #0 does not depend on anything.
|
||||||
|
1: [0], // Entry-point #1 depends on #0.
|
||||||
|
2: [0, 1], // Entry-point #2 depends on #0 and #1.
|
||||||
|
});
|
||||||
|
|
||||||
|
// Pick task #0 first, since it is the only one that is not blocked by other tasks.
|
||||||
|
expect(queue.getNextTask()).toBe(tasks[0]);
|
||||||
|
|
||||||
|
// No task available, until task #0 is completed.
|
||||||
|
expect(queue.getNextTask()).toBe(null);
|
||||||
|
|
||||||
|
// Once task #0 is completed, task #1 is unblocked.
|
||||||
|
queue.markTaskCompleted(tasks[0]);
|
||||||
|
expect(queue.getNextTask()).toBe(tasks[1]);
|
||||||
|
|
||||||
|
// Task #2 is still blocked on #1.
|
||||||
|
expect(queue.getNextTask()).toBe(null);
|
||||||
|
|
||||||
|
// Once task #1 is completed, task #2 is unblocked.
|
||||||
|
queue.markTaskCompleted(tasks[1]);
|
||||||
|
expect(queue.getNextTask()).toBe(tasks[2]);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('toString()', () => {
|
||||||
|
it('should include the `TaskQueue` constructor\'s name', () => {
|
||||||
|
const {queue} = createQueue(0);
|
||||||
|
expect(queue.toString()).toMatch(/^ParallelTaskQueue\n/);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should include the value of `allTasksCompleted`', () => {
|
||||||
|
const {queue: queue1} = createQueue(0);
|
||||||
|
expect(queue1.toString()).toContain(' All tasks completed: true\n');
|
||||||
|
|
||||||
|
const {queue: queue2} = createQueue(3);
|
||||||
|
expect(queue2.toString()).toContain(' All tasks completed: false\n');
|
||||||
|
|
||||||
|
processNextTask(queue2);
|
||||||
|
processNextTask(queue2);
|
||||||
|
const task = queue2.getNextTask() !;
|
||||||
|
|
||||||
|
expect(queue2.toString()).toContain(' All tasks completed: false\n');
|
||||||
|
|
||||||
|
queue2.markTaskCompleted(task);
|
||||||
|
expect(queue2.toString()).toContain(' All tasks completed: true\n');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should include the unprocessed tasks', () => {
|
||||||
|
const {queue} = createQueue(3);
|
||||||
|
expect(queue.toString())
|
||||||
|
.toContain(
|
||||||
|
' Unprocessed tasks (3): \n' +
|
||||||
|
' - {entryPoint: entry-point-0, formatProperty: prop-0, processDts: true}\n' +
|
||||||
|
' - {entryPoint: entry-point-1, formatProperty: prop-0, processDts: true}\n' +
|
||||||
|
' - {entryPoint: entry-point-2, formatProperty: prop-0, processDts: true}\n');
|
||||||
|
|
||||||
|
const task1 = queue.getNextTask() !;
|
||||||
|
expect(queue.toString())
|
||||||
|
.toContain(
|
||||||
|
' Unprocessed tasks (2): \n' +
|
||||||
|
' - {entryPoint: entry-point-1, formatProperty: prop-0, processDts: true}\n' +
|
||||||
|
' - {entryPoint: entry-point-2, formatProperty: prop-0, processDts: true}\n');
|
||||||
|
|
||||||
|
queue.markTaskCompleted(task1);
|
||||||
|
const task2 = queue.getNextTask() !;
|
||||||
|
expect(queue.toString())
|
||||||
|
.toContain(
|
||||||
|
' Unprocessed tasks (1): \n' +
|
||||||
|
' - {entryPoint: entry-point-2, formatProperty: prop-0, processDts: true}\n');
|
||||||
|
|
||||||
|
queue.markTaskCompleted(task2);
|
||||||
|
processNextTask(queue);
|
||||||
|
expect(queue.toString()).toContain(' Unprocessed tasks (0): \n');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should include the in-progress tasks', () => {
|
||||||
|
const {queue} = createQueue(3);
|
||||||
|
expect(queue.toString()).toContain(' In-progress tasks (0): \n');
|
||||||
|
|
||||||
|
const task1 = queue.getNextTask() !;
|
||||||
|
expect(queue.toString())
|
||||||
|
.toContain(
|
||||||
|
' In-progress tasks (1): \n' +
|
||||||
|
' - {entryPoint: entry-point-0, formatProperty: prop-0, processDts: true}\n');
|
||||||
|
|
||||||
|
queue.markTaskCompleted(task1);
|
||||||
|
const task2 = queue.getNextTask() !;
|
||||||
|
expect(queue.toString())
|
||||||
|
.toContain(
|
||||||
|
' In-progress tasks (1): \n' +
|
||||||
|
' - {entryPoint: entry-point-1, formatProperty: prop-0, processDts: true}\n');
|
||||||
|
|
||||||
|
queue.markTaskCompleted(task2);
|
||||||
|
processNextTask(queue);
|
||||||
|
expect(queue.toString()).toContain(' In-progress tasks (0): \n');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should include the blocked/blocking tasks', () => {
|
||||||
|
// Entry-point #0 transitively blocks 2 entry-point(s): 1, 3
|
||||||
|
// Entry-point #1 transitively blocks 1 entry-point(s): 3
|
||||||
|
// Entry-point #2 transitively blocks 1 entry-point(s): 3
|
||||||
|
// Entry-point #3 transitively blocks 0 entry-point(s): -
|
||||||
|
const {tasks, queue} = createQueue(4, 2, {
|
||||||
|
1: [0],
|
||||||
|
3: [1, 2],
|
||||||
|
});
|
||||||
|
|
||||||
|
// Since there 4 entry-points and two tasks per entry-point (8 tasks in total), in comments
|
||||||
|
// below, tasks are denoted as `#X.Y` (where `X` is the entry-point index and Y is the task
|
||||||
|
// index).
|
||||||
|
// For example, the second task for the third entry-point would be `#2.1`.
|
||||||
|
|
||||||
|
expect(queue.toString())
|
||||||
|
.toContain(
|
||||||
|
' Blocked tasks (4): \n' +
|
||||||
|
// Tasks #1.0 and #1.1 are blocked by #0.0.
|
||||||
|
' - {entryPoint: entry-point-1, formatProperty: prop-0, processDts: true} (1): \n' +
|
||||||
|
' - {entryPoint: entry-point-0, formatProperty: prop-0, processDts: true}\n' +
|
||||||
|
' - {entryPoint: entry-point-1, formatProperty: prop-1, processDts: false} (1): \n' +
|
||||||
|
' - {entryPoint: entry-point-0, formatProperty: prop-0, processDts: true}\n' +
|
||||||
|
// Tasks #3.0 and #3.1 are blocked by #0.0 (transitively), #1.0 and #2.0.
|
||||||
|
' - {entryPoint: entry-point-3, formatProperty: prop-0, processDts: true} (3): \n' +
|
||||||
|
' - {entryPoint: entry-point-0, formatProperty: prop-0, processDts: true}\n' +
|
||||||
|
' - {entryPoint: entry-point-1, formatProperty: prop-0, processDts: true}\n' +
|
||||||
|
' - {entryPoint: entry-point-2, formatProperty: prop-0, processDts: true}\n' +
|
||||||
|
' - {entryPoint: entry-point-3, formatProperty: prop-1, processDts: false} (3): \n' +
|
||||||
|
' - {entryPoint: entry-point-0, formatProperty: prop-0, processDts: true}\n' +
|
||||||
|
' - {entryPoint: entry-point-1, formatProperty: prop-0, processDts: true}\n' +
|
||||||
|
' - {entryPoint: entry-point-2, formatProperty: prop-0, processDts: true}');
|
||||||
|
|
||||||
|
expect(processNextTask(queue)).toBe(tasks[0]); // Process #0.0.
|
||||||
|
expect(processNextTask(queue)).toBe(tasks[2]); // Process #1.0.
|
||||||
|
expect(queue.toString())
|
||||||
|
.toContain(
|
||||||
|
' Blocked tasks (2): \n' +
|
||||||
|
// Tasks #3.0 and #3.1 are blocked by #2.0.
|
||||||
|
' - {entryPoint: entry-point-3, formatProperty: prop-0, processDts: true} (1): \n' +
|
||||||
|
' - {entryPoint: entry-point-2, formatProperty: prop-0, processDts: true}\n' +
|
||||||
|
' - {entryPoint: entry-point-3, formatProperty: prop-1, processDts: false} (1): \n' +
|
||||||
|
' - {entryPoint: entry-point-2, formatProperty: prop-0, processDts: true}');
|
||||||
|
|
||||||
|
expect(processNextTask(queue)).toBe(tasks[4]); // Process #2.0.
|
||||||
|
expect(queue.toString()).toContain(' Blocked tasks (0): ');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should display all info together', () => {
|
||||||
|
// An initially empty queue.
|
||||||
|
const {queue: queue1} = createQueue(0);
|
||||||
|
expect(queue1.toString())
|
||||||
|
.toBe(
|
||||||
|
'ParallelTaskQueue\n' +
|
||||||
|
' All tasks completed: true\n' +
|
||||||
|
' Unprocessed tasks (0): \n' +
|
||||||
|
' In-progress tasks (0): \n' +
|
||||||
|
' Blocked tasks (0): ');
|
||||||
|
|
||||||
|
// A queue with three tasks (and one interdependency).
|
||||||
|
const {tasks: tasks2, queue: queue2} = createQueue(3, 1, {2: [1]});
|
||||||
|
expect(queue2.toString())
|
||||||
|
.toBe(
|
||||||
|
'ParallelTaskQueue\n' +
|
||||||
|
' All tasks completed: false\n' +
|
||||||
|
' Unprocessed tasks (3): \n' +
|
||||||
|
' - {entryPoint: entry-point-1, formatProperty: prop-0, processDts: true}\n' +
|
||||||
|
' - {entryPoint: entry-point-0, formatProperty: prop-0, processDts: true}\n' +
|
||||||
|
' - {entryPoint: entry-point-2, formatProperty: prop-0, processDts: true}\n' +
|
||||||
|
' In-progress tasks (0): \n' +
|
||||||
|
' Blocked tasks (1): \n' +
|
||||||
|
' - {entryPoint: entry-point-2, formatProperty: prop-0, processDts: true} (1): \n' +
|
||||||
|
' - {entryPoint: entry-point-1, formatProperty: prop-0, processDts: true}');
|
||||||
|
|
||||||
|
// Start processing tasks #1 and #0 (#2 is still blocked on #1).
|
||||||
|
expect(queue2.getNextTask()).toBe(tasks2[1]);
|
||||||
|
expect(queue2.getNextTask()).toBe(tasks2[0]);
|
||||||
|
expect(queue2.toString())
|
||||||
|
.toBe(
|
||||||
|
'ParallelTaskQueue\n' +
|
||||||
|
' All tasks completed: false\n' +
|
||||||
|
' Unprocessed tasks (1): \n' +
|
||||||
|
' - {entryPoint: entry-point-2, formatProperty: prop-0, processDts: true}\n' +
|
||||||
|
' In-progress tasks (2): \n' +
|
||||||
|
' - {entryPoint: entry-point-1, formatProperty: prop-0, processDts: true}\n' +
|
||||||
|
' - {entryPoint: entry-point-0, formatProperty: prop-0, processDts: true}\n' +
|
||||||
|
' Blocked tasks (1): \n' +
|
||||||
|
' - {entryPoint: entry-point-2, formatProperty: prop-0, processDts: true} (1): \n' +
|
||||||
|
' - {entryPoint: entry-point-1, formatProperty: prop-0, processDts: true}');
|
||||||
|
|
||||||
|
// Complete task #1 nd start processing #2 (which is not unblocked).
|
||||||
|
queue2.markTaskCompleted(tasks2[1]);
|
||||||
|
expect(queue2.getNextTask()).toBe(tasks2[2]);
|
||||||
|
expect(queue2.toString())
|
||||||
|
.toBe(
|
||||||
|
'ParallelTaskQueue\n' +
|
||||||
|
' All tasks completed: false\n' +
|
||||||
|
' Unprocessed tasks (0): \n' +
|
||||||
|
' In-progress tasks (2): \n' +
|
||||||
|
' - {entryPoint: entry-point-0, formatProperty: prop-0, processDts: true}\n' +
|
||||||
|
' - {entryPoint: entry-point-2, formatProperty: prop-0, processDts: true}\n' +
|
||||||
|
' Blocked tasks (0): ');
|
||||||
|
|
||||||
|
// Complete tasks #2 and #0. All tasks are now completed.
|
||||||
|
queue2.markTaskCompleted(tasks2[2]);
|
||||||
|
queue2.markTaskCompleted(tasks2[0]);
|
||||||
|
expect(queue2.toString())
|
||||||
|
.toBe(
|
||||||
|
'ParallelTaskQueue\n' +
|
||||||
|
' All tasks completed: true\n' +
|
||||||
|
' Unprocessed tasks (0): \n' +
|
||||||
|
' In-progress tasks (0): \n' +
|
||||||
|
' Blocked tasks (0): ');
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
Loading…
x
Reference in New Issue
Block a user