diff --git a/packages/compiler-cli/ngcc/src/execution/cluster/api.ts b/packages/compiler-cli/ngcc/src/execution/cluster/api.ts index 2325d52670..59d28ceabb 100644 --- a/packages/compiler-cli/ngcc/src/execution/cluster/api.ts +++ b/packages/compiler-cli/ngcc/src/execution/cluster/api.ts @@ -36,6 +36,12 @@ export interface TaskCompletedMessage extends JsonObject { message: string|null; } +/** A message listing the paths to transformed files about to be written to disk. */ +export interface TransformedFilesMessage extends JsonObject { + type: 'transformed-files'; + files: AbsoluteFsPath[]; +} + /** A message requesting the update of a `package.json` file. */ export interface UpdatePackageJsonMessage extends JsonObject { type: 'update-package-json'; @@ -44,7 +50,8 @@ export interface UpdatePackageJsonMessage extends JsonObject { } /** The type of messages sent from cluster workers to the cluster master. */ -export type MessageFromWorker = ErrorMessage|TaskCompletedMessage|UpdatePackageJsonMessage; +export type MessageFromWorker = + ErrorMessage|TaskCompletedMessage|TransformedFilesMessage|UpdatePackageJsonMessage; /** The type of messages sent from the cluster master to cluster workers. */ export type MessageToWorker = ProcessTaskMessage; diff --git a/packages/compiler-cli/ngcc/src/execution/cluster/master.ts b/packages/compiler-cli/ngcc/src/execution/cluster/master.ts index c4ad3fcb6a..f32da5a723 100644 --- a/packages/compiler-cli/ngcc/src/execution/cluster/master.ts +++ b/packages/compiler-cli/ngcc/src/execution/cluster/master.ts @@ -17,7 +17,7 @@ import {AnalyzeEntryPointsFn} from '../api'; import {CreateTaskCompletedCallback, Task, TaskCompletedCallback, TaskQueue} from '../tasks/api'; import {stringifyTask} from '../tasks/utils'; -import {MessageFromWorker, TaskCompletedMessage, UpdatePackageJsonMessage} from './api'; +import {MessageFromWorker, TaskCompletedMessage, TransformedFilesMessage, UpdatePackageJsonMessage} from './api'; import {Deferred, sendMessageToWorker} from './utils'; @@ -180,6 +180,8 @@ export class ClusterMaster { throw new Error(`Error on worker #${workerId}: ${msg.error}`); case 'task-completed': return this.onWorkerTaskCompleted(workerId, msg); + case 'transformed-files': + return this.onWorkerTransformedFiles(workerId, msg); case 'update-package-json': return this.onWorkerUpdatePackageJson(workerId, msg); default: @@ -220,6 +222,11 @@ export class ClusterMaster { this.maybeDistributeWork(); } + /** Handle a worker's message regarding the files transformed while processing its task. */ + private onWorkerTransformedFiles(workerId: number, msg: TransformedFilesMessage): void { + // TODO: Do something useful with this info. + } + /** Handle a worker's request to update a `package.json` file. */ private onWorkerUpdatePackageJson(workerId: number, msg: UpdatePackageJsonMessage): void { const task = this.taskAssignments.get(workerId) || null; diff --git a/packages/compiler-cli/ngcc/src/execution/cluster/utils.ts b/packages/compiler-cli/ngcc/src/execution/cluster/utils.ts index b9ca051299..200cdd8d65 100644 --- a/packages/compiler-cli/ngcc/src/execution/cluster/utils.ts +++ b/packages/compiler-cli/ngcc/src/execution/cluster/utils.ts @@ -44,18 +44,21 @@ export class Deferred { * (This function should be invoked from cluster workers only.) * * @param msg The message to send to the cluster master. + * @return A promise that is resolved once the message has been sent. */ -export const sendMessageToMaster = (msg: MessageFromWorker): void => { +export const sendMessageToMaster = (msg: MessageFromWorker): Promise => { if (cluster.isMaster) { throw new Error('Unable to send message to the master process: Already on the master process.'); } - if (process.send === undefined) { - // Theoretically, this should never happen on a worker process. - throw new Error('Unable to send message to the master process: Missing `process.send()`.'); - } + return new Promise((resolve, reject) => { + if (process.send === undefined) { + // Theoretically, this should never happen on a worker process. + throw new Error('Unable to send message to the master process: Missing `process.send()`.'); + } - process.send(msg); + process.send(msg, (err: Error|null) => (err === null) ? resolve() : reject(err)); + }); }; /** @@ -64,8 +67,9 @@ export const sendMessageToMaster = (msg: MessageFromWorker): void => { * * @param workerId The ID of the recipient worker. * @param msg The message to send to the worker. + * @return A promise that is resolved once the message has been sent. */ -export const sendMessageToWorker = (workerId: number, msg: MessageToWorker): void => { +export const sendMessageToWorker = (workerId: number, msg: MessageToWorker): Promise => { if (!cluster.isMaster) { throw new Error('Unable to send message to worker process: Sender is not the master process.'); } @@ -77,5 +81,7 @@ export const sendMessageToWorker = (workerId: number, msg: MessageToWorker): voi 'Unable to send message to worker process: Recipient does not exist or has disconnected.'); } - worker.send(msg); + return new Promise((resolve, reject) => { + worker.send(msg, (err: Error|null) => (err === null) ? resolve() : reject(err)); + }); }; diff --git a/packages/compiler-cli/ngcc/src/execution/cluster/worker.ts b/packages/compiler-cli/ngcc/src/execution/cluster/worker.ts index e825a1b69f..181cd2c9c0 100644 --- a/packages/compiler-cli/ngcc/src/execution/cluster/worker.ts +++ b/packages/compiler-cli/ngcc/src/execution/cluster/worker.ts @@ -62,7 +62,10 @@ export async function startWorker(logger: Logger, createCompileFn: CreateCompile } const compile = createCompileFn( - () => {}, + transformedFiles => sendMessageToMaster({ + type: 'transformed-files', + files: transformedFiles.map(f => f.path), + }), (_task, outcome, message) => sendMessageToMaster({type: 'task-completed', outcome, message})); @@ -79,7 +82,7 @@ export async function startWorker(logger: Logger, createCompileFn: CreateCompile `[Worker #${cluster.worker.id}] Invalid message received: ${JSON.stringify(msg)}`); } } catch (err) { - sendMessageToMaster({ + await sendMessageToMaster({ type: 'error', error: (err instanceof Error) ? (err.stack || err.message) : err, }); @@ -88,4 +91,4 @@ export async function startWorker(logger: Logger, createCompileFn: CreateCompile // Return a promise that is never resolved. return new Promise(() => undefined); -} \ No newline at end of file +} diff --git a/packages/compiler-cli/ngcc/test/execution/cluster/package_json_updater_spec.ts b/packages/compiler-cli/ngcc/test/execution/cluster/package_json_updater_spec.ts index fe68783bc0..902d8d06c1 100644 --- a/packages/compiler-cli/ngcc/test/execution/cluster/package_json_updater_spec.ts +++ b/packages/compiler-cli/ngcc/test/execution/cluster/package_json_updater_spec.ts @@ -87,25 +87,34 @@ runInEachFileSystem(() => { .writeChanges(jsonPath, parsed); writeToProp(['foo']); - expect(processSendSpy).toHaveBeenCalledWith({ - type: 'update-package-json', - packageJsonPath: jsonPath, - changes: [[['foo'], 'updated', 'unimportant']], - }); + expect(processSendSpy) + .toHaveBeenCalledWith( + { + type: 'update-package-json', + packageJsonPath: jsonPath, + changes: [[['foo'], 'updated', 'unimportant']], + }, + jasmine.any(Function)); writeToProp(['bar'], {before: 'foo'}); - expect(processSendSpy).toHaveBeenCalledWith({ - type: 'update-package-json', - packageJsonPath: jsonPath, - changes: [[['bar'], 'updated', {before: 'foo'}]], - }); + expect(processSendSpy) + .toHaveBeenCalledWith( + { + type: 'update-package-json', + packageJsonPath: jsonPath, + changes: [[['bar'], 'updated', {before: 'foo'}]], + }, + jasmine.any(Function)); writeToProp(['bar', 'baz', 'qux'], 'alphabetic', {}); - expect(processSendSpy).toHaveBeenCalledWith({ - type: 'update-package-json', - packageJsonPath: jsonPath, - changes: [[['bar', 'baz', 'qux'], 'updated', 'alphabetic']], - }); + expect(processSendSpy) + .toHaveBeenCalledWith( + { + type: 'update-package-json', + packageJsonPath: jsonPath, + changes: [[['bar', 'baz', 'qux'], 'updated', 'alphabetic']], + }, + jasmine.any(Function)); }); it('should update an in-memory representation (if provided)', () => { diff --git a/packages/compiler-cli/ngcc/test/execution/cluster/worker_spec.ts b/packages/compiler-cli/ngcc/test/execution/cluster/worker_spec.ts index c0887a6fcc..e02af404fd 100644 --- a/packages/compiler-cli/ngcc/test/execution/cluster/worker_spec.ts +++ b/packages/compiler-cli/ngcc/test/execution/cluster/worker_spec.ts @@ -11,8 +11,11 @@ import * as cluster from 'cluster'; import {EventEmitter} from 'events'; +import {AbsoluteFsPath} from '../../../../src/ngtsc/file_system'; +import {CreateCompileFn} from '../../../src/execution/api'; import {startWorker} from '../../../src/execution/cluster/worker'; import {Task, TaskCompletedCallback, TaskProcessingOutcome} from '../../../src/execution/tasks/api'; +import {FileToWrite} from '../../../src/rendering/utils'; import {MockLogger} from '../../helpers/mock_logger'; import {mockProperty} from '../../helpers/spy_utils'; @@ -60,6 +63,24 @@ describe('startWorker()', () => { expect(createCompileFnSpy).toHaveBeenCalledWith(jasmine.any(Function), jasmine.any(Function)); }); + it('should set up `compileFn()` to send `transformed-files` messages to master', () => { + startWorker(mockLogger, createCompileFnSpy); + + const mockTransformedFiles: FileToWrite[] = [ + {path: '/foo' as AbsoluteFsPath, contents: 'FOO'}, + {path: '/bar' as AbsoluteFsPath, contents: 'BAR'}, + ]; + const beforeWritingFiles: Parameters[0] = + createCompileFnSpy.calls.argsFor(0)[0]; + + beforeWritingFiles(mockTransformedFiles); + + expect(processSendSpy).toHaveBeenCalledTimes(1); + expect(processSendSpy) + .toHaveBeenCalledWith( + {type: 'transformed-files', files: ['/foo', '/bar']}, jasmine.any(Function)); + }); + it('should set up `compileFn()` to send `task-completed` messages to master', () => { startWorker(mockLogger, createCompileFnSpy); const onTaskCompleted: TaskCompletedCallback = createCompileFnSpy.calls.argsFor(0)[1]; @@ -68,17 +89,21 @@ describe('startWorker()', () => { expect(processSendSpy).toHaveBeenCalledTimes(1); expect(processSendSpy) .toHaveBeenCalledWith( - {type: 'task-completed', outcome: TaskProcessingOutcome.Processed, message: null}); + {type: 'task-completed', outcome: TaskProcessingOutcome.Processed, message: null}, + jasmine.any(Function)); processSendSpy.calls.reset(); onTaskCompleted(null as any, TaskProcessingOutcome.Failed, 'error message'); expect(processSendSpy).toHaveBeenCalledTimes(1); - expect(processSendSpy).toHaveBeenCalledWith({ - type: 'task-completed', - outcome: TaskProcessingOutcome.Failed, - message: 'error message', - }); + expect(processSendSpy) + .toHaveBeenCalledWith( + { + type: 'task-completed', + outcome: TaskProcessingOutcome.Failed, + message: 'error message', + }, + jasmine.any(Function)); }); it('should return a promise (that is never resolved)', done => { @@ -129,11 +154,13 @@ describe('startWorker()', () => { err = 'Error string.'; cluster.worker.emit('message', {type: 'process-task', task: mockTask}); - expect(processSendSpy).toHaveBeenCalledWith({type: 'error', error: err}); + expect(processSendSpy) + .toHaveBeenCalledWith({type: 'error', error: err}, jasmine.any(Function)); err = new Error('Error object.'); cluster.worker.emit('message', {type: 'process-task', task: mockTask}); - expect(processSendSpy).toHaveBeenCalledWith({type: 'error', error: err.stack}); + expect(processSendSpy) + .toHaveBeenCalledWith({type: 'error', error: err.stack}, jasmine.any(Function)); }); it('should throw, when an unknown message type is received', () => { @@ -141,11 +168,14 @@ describe('startWorker()', () => { cluster.worker.emit('message', {type: 'unknown', foo: 'bar'}); expect(compileFnSpy).not.toHaveBeenCalled(); - expect(processSendSpy).toHaveBeenCalledWith({ - type: 'error', - error: jasmine.stringMatching( - 'Error: \\[Worker #42\\] Invalid message received: {"type":"unknown","foo":"bar"}'), - }); + expect(processSendSpy) + .toHaveBeenCalledWith( + { + type: 'error', + error: jasmine.stringMatching( + 'Error: \\[Worker #42\\] Invalid message received: {"type":"unknown","foo":"bar"}'), + }, + jasmine.any(Function)); }); }); });