refactor(ngcc): create new entry-point for cluster workers (#36637)
PR Close #36637
This commit is contained in:

committed by
Matias Niemelä

parent
0075078179
commit
506beeddc1
@ -29,6 +29,7 @@ ts_library(
|
||||
"@npm//magic-string",
|
||||
"@npm//sourcemap-codec",
|
||||
"@npm//typescript",
|
||||
"@npm//yargs",
|
||||
],
|
||||
)
|
||||
|
||||
|
@ -8,47 +8,44 @@
|
||||
|
||||
/// <reference types="node" />
|
||||
|
||||
import {getFileSystem} from '@angular/compiler-cli/src/ngtsc/file_system';
|
||||
import * as cluster from 'cluster';
|
||||
|
||||
import {MockFileSystemNative} from '../../../../src/ngtsc/file_system/testing';
|
||||
import {MockFileSystemNative, runInEachFileSystem} from '../../../../src/ngtsc/file_system/testing';
|
||||
import {ClusterExecutor} from '../../../src/execution/cluster/executor';
|
||||
import {ClusterMaster} from '../../../src/execution/cluster/master';
|
||||
import {ClusterWorker} from '../../../src/execution/cluster/worker';
|
||||
import {AsyncLocker} from '../../../src/locking/async_locker';
|
||||
import {PackageJsonUpdater} from '../../../src/writing/package_json_updater';
|
||||
import {MockLockFile} from '../../helpers/mock_lock_file';
|
||||
import {MockLogger} from '../../helpers/mock_logger';
|
||||
import {mockProperty} from '../../helpers/spy_utils';
|
||||
|
||||
runInEachFileSystem(() => {
|
||||
describe('ClusterExecutor', () => {
|
||||
const runAsClusterMaster = mockProperty(cluster, 'isMaster');
|
||||
let masterRunSpy: jasmine.Spy;
|
||||
let mockLogger: MockLogger;
|
||||
let lockFileLog: string[];
|
||||
let mockLockFile: MockLockFile;
|
||||
let locker: AsyncLocker;
|
||||
let executor: ClusterExecutor;
|
||||
let createTaskCompletedCallback: jasmine.Spy;
|
||||
|
||||
describe('ClusterExecutor', () => {
|
||||
const runAsClusterMaster = mockProperty(cluster, 'isMaster');
|
||||
let masterRunSpy: jasmine.Spy;
|
||||
let workerRunSpy: jasmine.Spy;
|
||||
let mockLogger: MockLogger;
|
||||
let lockFileLog: string[];
|
||||
let mockLockFile: MockLockFile;
|
||||
let locker: AsyncLocker;
|
||||
let executor: ClusterExecutor;
|
||||
let createTaskCompletedCallback: jasmine.Spy;
|
||||
beforeEach(() => {
|
||||
masterRunSpy = spyOn(ClusterMaster.prototype, 'run')
|
||||
.and.returnValue(Promise.resolve('CusterMaster#run()' as any));
|
||||
createTaskCompletedCallback = jasmine.createSpy('createTaskCompletedCallback');
|
||||
|
||||
beforeEach(() => {
|
||||
masterRunSpy = spyOn(ClusterMaster.prototype, 'run')
|
||||
.and.returnValue(Promise.resolve('CusterMaster#run()' as any));
|
||||
workerRunSpy = spyOn(ClusterWorker.prototype, 'run')
|
||||
.and.returnValue(Promise.resolve('CusterWorker#run()' as any));
|
||||
createTaskCompletedCallback = jasmine.createSpy('createTaskCompletedCallback');
|
||||
mockLogger = new MockLogger();
|
||||
lockFileLog = [];
|
||||
mockLockFile = new MockLockFile(new MockFileSystemNative(), lockFileLog);
|
||||
locker = new AsyncLocker(mockLockFile, mockLogger, 200, 2);
|
||||
executor = new ClusterExecutor(
|
||||
42, getFileSystem(), mockLogger, null as unknown as PackageJsonUpdater, locker,
|
||||
createTaskCompletedCallback);
|
||||
});
|
||||
|
||||
mockLogger = new MockLogger();
|
||||
lockFileLog = [];
|
||||
mockLockFile = new MockLockFile(new MockFileSystemNative(), lockFileLog);
|
||||
locker = new AsyncLocker(mockLockFile, mockLogger, 200, 2);
|
||||
executor = new ClusterExecutor(
|
||||
42, mockLogger, null as unknown as PackageJsonUpdater, locker, createTaskCompletedCallback);
|
||||
});
|
||||
|
||||
describe('execute()', () => {
|
||||
describe('(on cluster master)', () => {
|
||||
describe('execute()', () => {
|
||||
beforeEach(() => runAsClusterMaster(true));
|
||||
|
||||
it('should log debug info about the executor', async () => {
|
||||
@ -68,7 +65,6 @@ describe('ClusterExecutor', () => {
|
||||
.toBe('CusterMaster#run()' as any);
|
||||
|
||||
expect(masterRunSpy).toHaveBeenCalledWith();
|
||||
expect(workerRunSpy).not.toHaveBeenCalled();
|
||||
|
||||
expect(analyzeEntryPointsSpy).toHaveBeenCalledWith();
|
||||
expect(createCompilerFnSpy).not.toHaveBeenCalled();
|
||||
@ -102,7 +98,7 @@ describe('ClusterExecutor', () => {
|
||||
});
|
||||
|
||||
executor = new ClusterExecutor(
|
||||
42, mockLogger, null as unknown as PackageJsonUpdater, locker,
|
||||
42, getFileSystem(), mockLogger, null as unknown as PackageJsonUpdater, locker,
|
||||
createTaskCompletedCallback);
|
||||
let error = '';
|
||||
try {
|
||||
@ -122,7 +118,7 @@ describe('ClusterExecutor', () => {
|
||||
});
|
||||
|
||||
executor = new ClusterExecutor(
|
||||
42, mockLogger, null as unknown as PackageJsonUpdater, locker,
|
||||
42, getFileSystem(), mockLogger, null as unknown as PackageJsonUpdater, locker,
|
||||
createTaskCompletedCallback);
|
||||
let error = '';
|
||||
try {
|
||||
@ -135,36 +131,5 @@ describe('ClusterExecutor', () => {
|
||||
expect(masterRunSpy).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('(on cluster worker)', () => {
|
||||
beforeEach(() => runAsClusterMaster(false));
|
||||
|
||||
it('should not log debug info about the executor', async () => {
|
||||
const anyFn: () => any = () => undefined;
|
||||
await executor.execute(anyFn, anyFn);
|
||||
|
||||
expect(mockLogger.logs.debug).toEqual([]);
|
||||
});
|
||||
|
||||
it('should delegate to `ClusterWorker#run()`', async () => {
|
||||
const analyzeEntryPointsSpy = jasmine.createSpy('analyzeEntryPoints');
|
||||
const createCompilerFnSpy = jasmine.createSpy('createCompilerFn');
|
||||
|
||||
expect(await executor.execute(analyzeEntryPointsSpy, createCompilerFnSpy))
|
||||
.toBe('CusterWorker#run()' as any);
|
||||
|
||||
expect(masterRunSpy).not.toHaveBeenCalledWith();
|
||||
expect(workerRunSpy).toHaveBeenCalled();
|
||||
|
||||
expect(analyzeEntryPointsSpy).not.toHaveBeenCalled();
|
||||
expect(createCompilerFnSpy).toHaveBeenCalledWith(jasmine.any(Function));
|
||||
});
|
||||
|
||||
it('should not call LockFile.write() or LockFile.remove()', async () => {
|
||||
const anyFn: () => any = () => undefined;
|
||||
await executor.execute(anyFn, anyFn);
|
||||
expect(lockFileLog).toEqual([]);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
@ -11,13 +11,13 @@
|
||||
import * as cluster from 'cluster';
|
||||
import {EventEmitter} from 'events';
|
||||
|
||||
import {ClusterWorker} from '../../../src/execution/cluster/worker';
|
||||
import {startWorker} from '../../../src/execution/cluster/worker';
|
||||
import {Task, TaskCompletedCallback, TaskProcessingOutcome} from '../../../src/execution/tasks/api';
|
||||
import {MockLogger} from '../../helpers/mock_logger';
|
||||
import {mockProperty} from '../../helpers/spy_utils';
|
||||
|
||||
|
||||
describe('ClusterWorker', () => {
|
||||
describe('startWorker()', () => {
|
||||
const runAsClusterMaster = mockProperty(cluster, 'isMaster');
|
||||
const mockProcessSend = mockProperty(process, 'send');
|
||||
let processSendSpy: jasmine.Spy;
|
||||
@ -35,131 +35,116 @@ describe('ClusterWorker', () => {
|
||||
mockLogger = new MockLogger();
|
||||
});
|
||||
|
||||
describe('constructor()', () => {
|
||||
describe('(on cluster master)', () => {
|
||||
beforeEach(() => runAsClusterMaster(true));
|
||||
describe('(on cluster master)', () => {
|
||||
beforeEach(() => runAsClusterMaster(true));
|
||||
|
||||
it('should throw an error', () => {
|
||||
expect(() => new ClusterWorker(mockLogger, createCompileFnSpy))
|
||||
.toThrowError('Tried to instantiate `ClusterWorker` on the master process.');
|
||||
expect(createCompileFnSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('(on cluster worker)', () => {
|
||||
beforeEach(() => runAsClusterMaster(false));
|
||||
|
||||
it('should create the `compileFn()`', () => {
|
||||
new ClusterWorker(mockLogger, createCompileFnSpy);
|
||||
expect(createCompileFnSpy).toHaveBeenCalledWith(jasmine.any(Function));
|
||||
});
|
||||
|
||||
it('should set up `compileFn()` to send `task-completed` messages to master', () => {
|
||||
new ClusterWorker(mockLogger, createCompileFnSpy);
|
||||
const onTaskCompleted: TaskCompletedCallback = createCompileFnSpy.calls.argsFor(0)[0];
|
||||
|
||||
onTaskCompleted(null as any, TaskProcessingOutcome.Processed, null);
|
||||
expect(processSendSpy).toHaveBeenCalledTimes(1);
|
||||
expect(processSendSpy)
|
||||
.toHaveBeenCalledWith(
|
||||
{type: 'task-completed', outcome: TaskProcessingOutcome.Processed, message: null});
|
||||
|
||||
processSendSpy.calls.reset();
|
||||
|
||||
onTaskCompleted(null as any, TaskProcessingOutcome.Failed, 'error message');
|
||||
expect(processSendSpy).toHaveBeenCalledTimes(1);
|
||||
expect(processSendSpy).toHaveBeenCalledWith({
|
||||
type: 'task-completed',
|
||||
outcome: TaskProcessingOutcome.Failed,
|
||||
message: 'error message',
|
||||
});
|
||||
});
|
||||
it('should throw an error', async () => {
|
||||
await expectAsync(startWorker(mockLogger, createCompileFnSpy))
|
||||
.toBeRejectedWithError('Tried to run cluster worker on the master process.');
|
||||
expect(createCompileFnSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('run()', () => {
|
||||
describe(
|
||||
'(on cluster master)',
|
||||
() => {/* No tests needed, becasue the constructor would have thrown. */});
|
||||
describe('(on cluster worker)', () => {
|
||||
// The `cluster.worker` property is normally `undefined` on the master process and set to the
|
||||
// current `cluster.worker` on worker processes.
|
||||
const mockClusterWorker = mockProperty(cluster, 'worker');
|
||||
|
||||
describe('(on cluster worker)', () => {
|
||||
// The `cluster.worker` property is normally `undefined` on the master process and set to the
|
||||
// current `cluster.Worker` on worker processes.
|
||||
const mockClusterWorker = mockProperty(cluster, 'worker');
|
||||
let worker: ClusterWorker;
|
||||
beforeEach(() => {
|
||||
runAsClusterMaster(false);
|
||||
mockClusterWorker(Object.assign(new EventEmitter(), {id: 42}) as cluster.Worker);
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
runAsClusterMaster(false);
|
||||
mockClusterWorker(Object.assign(new EventEmitter(), {id: 42}) as cluster.Worker);
|
||||
it('should create the `compileFn()`', () => {
|
||||
startWorker(mockLogger, createCompileFnSpy);
|
||||
expect(createCompileFnSpy).toHaveBeenCalledWith(jasmine.any(Function));
|
||||
});
|
||||
|
||||
worker = new ClusterWorker(mockLogger, createCompileFnSpy);
|
||||
it('should set up `compileFn()` to send `task-completed` messages to master', () => {
|
||||
startWorker(mockLogger, createCompileFnSpy);
|
||||
const onTaskCompleted: TaskCompletedCallback = createCompileFnSpy.calls.argsFor(0)[0];
|
||||
|
||||
onTaskCompleted(null as any, TaskProcessingOutcome.Processed, null);
|
||||
expect(processSendSpy).toHaveBeenCalledTimes(1);
|
||||
expect(processSendSpy)
|
||||
.toHaveBeenCalledWith(
|
||||
{type: 'task-completed', outcome: TaskProcessingOutcome.Processed, message: null});
|
||||
|
||||
processSendSpy.calls.reset();
|
||||
|
||||
onTaskCompleted(null as any, TaskProcessingOutcome.Failed, 'error message');
|
||||
expect(processSendSpy).toHaveBeenCalledTimes(1);
|
||||
expect(processSendSpy).toHaveBeenCalledWith({
|
||||
type: 'task-completed',
|
||||
outcome: TaskProcessingOutcome.Failed,
|
||||
message: 'error message',
|
||||
});
|
||||
});
|
||||
|
||||
it('should return a promise (that is never resolved)', done => {
|
||||
const promise = startWorker(mockLogger, createCompileFnSpy);
|
||||
|
||||
expect(promise).toEqual(jasmine.any(Promise));
|
||||
|
||||
promise.then(
|
||||
() => done.fail('Expected promise not to resolve'),
|
||||
() => done.fail('Expected promise not to reject'));
|
||||
|
||||
// We can't wait forever to verify that the promise is not resolved, but at least verify
|
||||
// that it is not resolved immediately.
|
||||
setTimeout(done, 100);
|
||||
});
|
||||
|
||||
it('should handle `process-task` messages', () => {
|
||||
const mockTask = {
|
||||
entryPoint: {name: 'foo'},
|
||||
formatProperty: 'es2015',
|
||||
processDts: true,
|
||||
} as unknown as Task;
|
||||
|
||||
startWorker(mockLogger, createCompileFnSpy);
|
||||
cluster.worker.emit('message', {type: 'process-task', task: mockTask});
|
||||
|
||||
expect(compileFnSpy).toHaveBeenCalledWith(mockTask);
|
||||
expect(processSendSpy).not.toHaveBeenCalled();
|
||||
|
||||
expect(mockLogger.logs.debug[0]).toEqual([
|
||||
'[Worker #42] Processing task: {entryPoint: foo, formatProperty: es2015, processDts: true}',
|
||||
]);
|
||||
});
|
||||
|
||||
it('should send errors during task processing back to the master process', () => {
|
||||
const mockTask = {
|
||||
entryPoint: {name: 'foo'},
|
||||
formatProperty: 'es2015',
|
||||
processDts: true,
|
||||
} as unknown as Task;
|
||||
|
||||
let err: string|Error;
|
||||
compileFnSpy.and.callFake(() => {
|
||||
throw err;
|
||||
});
|
||||
|
||||
it('should return a promise (that is never resolved)', done => {
|
||||
const promise = worker.run();
|
||||
startWorker(mockLogger, createCompileFnSpy);
|
||||
|
||||
expect(promise).toEqual(jasmine.any(Promise));
|
||||
err = 'Error string.';
|
||||
cluster.worker.emit('message', {type: 'process-task', task: mockTask});
|
||||
expect(processSendSpy).toHaveBeenCalledWith({type: 'error', error: err});
|
||||
|
||||
promise.then(
|
||||
() => done.fail('Expected promise not to resolve'),
|
||||
() => done.fail('Expected promise not to reject'));
|
||||
err = new Error('Error object.');
|
||||
cluster.worker.emit('message', {type: 'process-task', task: mockTask});
|
||||
expect(processSendSpy).toHaveBeenCalledWith({type: 'error', error: err.stack});
|
||||
});
|
||||
|
||||
// We can't wait forever to verify that the promise is not resolved, but at least verify
|
||||
// that it is not resolved immediately.
|
||||
setTimeout(done, 100);
|
||||
});
|
||||
it('should throw, when an unknown message type is received', () => {
|
||||
startWorker(mockLogger, createCompileFnSpy);
|
||||
cluster.worker.emit('message', {type: 'unknown', foo: 'bar'});
|
||||
|
||||
it('should handle `process-task` messages', () => {
|
||||
const mockTask = {
|
||||
entryPoint: {name: 'foo'},
|
||||
formatProperty: 'es2015',
|
||||
processDts: true,
|
||||
} as unknown as Task;
|
||||
|
||||
worker.run();
|
||||
cluster.worker.emit('message', {type: 'process-task', task: mockTask});
|
||||
|
||||
expect(compileFnSpy).toHaveBeenCalledWith(mockTask);
|
||||
expect(processSendSpy).not.toHaveBeenCalled();
|
||||
|
||||
expect(mockLogger.logs.debug[0]).toEqual([
|
||||
'[Worker #42] Processing task: {entryPoint: foo, formatProperty: es2015, processDts: true}',
|
||||
]);
|
||||
});
|
||||
|
||||
it('should send errors during task processing back to the master process', () => {
|
||||
const mockTask = {
|
||||
entryPoint: {name: 'foo'},
|
||||
formatProperty: 'es2015',
|
||||
processDts: true,
|
||||
} as unknown as Task;
|
||||
|
||||
let err: string|Error;
|
||||
compileFnSpy.and.callFake(() => {
|
||||
throw err;
|
||||
});
|
||||
|
||||
worker.run();
|
||||
|
||||
err = 'Error string.';
|
||||
cluster.worker.emit('message', {type: 'process-task', task: mockTask});
|
||||
expect(processSendSpy).toHaveBeenCalledWith({type: 'error', error: err});
|
||||
|
||||
err = new Error('Error object.');
|
||||
cluster.worker.emit('message', {type: 'process-task', task: mockTask});
|
||||
expect(processSendSpy).toHaveBeenCalledWith({type: 'error', error: err.stack});
|
||||
});
|
||||
|
||||
it('should throw, when an unknown message type is received', () => {
|
||||
worker.run();
|
||||
cluster.worker.emit('message', {type: 'unknown', foo: 'bar'});
|
||||
|
||||
expect(compileFnSpy).not.toHaveBeenCalled();
|
||||
expect(processSendSpy).toHaveBeenCalledWith({
|
||||
type: 'error',
|
||||
error: jasmine.stringMatching(
|
||||
'Error: \\[Worker #42\\] Invalid message received: {"type":"unknown","foo":"bar"}'),
|
||||
});
|
||||
expect(compileFnSpy).not.toHaveBeenCalled();
|
||||
expect(processSendSpy).toHaveBeenCalledWith({
|
||||
type: 'error',
|
||||
error: jasmine.stringMatching(
|
||||
'Error: \\[Worker #42\\] Invalid message received: {"type":"unknown","foo":"bar"}'),
|
||||
});
|
||||
});
|
||||
});
|
||||
|
Reference in New Issue
Block a user