Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 34 additions & 6 deletions packages/nx/src/daemon/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ import {
POST_TASKS_EXECUTION,
PRE_TASKS_EXECUTION,
} from '../message-types/run-tasks-execution-hooks';
import {
isEmitLogMessage,
isUpdateProgressMessage,
} from '../message-types/streaming-messages';
import {
GET_ESTIMATED_TASK_TIMINGS,
GET_FLAKY_TASKS,
Expand Down Expand Up @@ -159,6 +163,11 @@ export class DaemonClient {
private currentMessage;
private currentResolve;
private currentReject;
// Tracks the spinner owned by the in-flight request so streamed
// progress updates are routed to the caller's spinner instead of
// mutating the process-wide globalSpinner (which may belong to an
// unrelated command).
private currentSpinner: DelayedSpinner | null = null;

private _enabled: boolean | undefined;
private _daemonStatus: DaemonStatus = DaemonStatus.DISCONNECTED;
Expand Down Expand Up @@ -307,6 +316,7 @@ export class DaemonClient {
'Calculating the project graph on the Nx Daemon is taking longer than expected. Re-run with NX_DAEMON=false to see more details.',
{ ciDelay: 60_000, delay: 30_000 }
);
this.currentSpinner = spinner;
try {
const response = await this.sendToDaemonViaQueue({
type: 'REQUEST_PROJECT_GRAPH',
Expand All @@ -323,6 +333,7 @@ export class DaemonClient {
}
} finally {
spinner?.cleanup();
this.currentSpinner = null;
}
}

Expand Down Expand Up @@ -760,9 +771,9 @@ export class DaemonClient {
type: 'PROCESS_IN_BACKGROUND',
requirePath,
data,
// This method is sometimes passed data that cannot be serialized with v8
// so we force JSON serialization here
},
// This method is sometimes passed data that cannot be serialized with v8
// so we force JSON serialization here
'json'
);
}
Expand Down Expand Up @@ -1032,11 +1043,15 @@ export class DaemonClient {

private async sendToDaemonViaQueue<T extends DaemonMessage>(
messageToDaemon: T,
force?: 'v8' | 'json'
parser?: 'v8' | 'json'
): Promise<any> {
return this.queue.sendToQueue(() =>
this.sendMessageToDaemon(messageToDaemon, force)
);
return this.queue.sendToQueue(async () => {
// Set currentSpinner inside the queued function so it's only
// active while this specific message is in flight — preventing
// concurrent callers from overwriting each other's spinner
// reference before their turn arrives.
return await this.sendMessageToDaemon(messageToDaemon, parser);
});
}

private setUpConnection() {
Expand Down Expand Up @@ -1259,6 +1274,19 @@ export class DaemonClient {
'result-parse-start-' + this.currentMessage.type,
'result-parse-end-' + this.currentMessage.type
);
// Streaming messages fire side-effects on the client but do not
// resolve the pending request promise — the daemon can push several
// of these before finally sending the real response. Progress
// updates route through the in-flight request's own spinner so
// we don't stomp on unrelated commands' spinner text.
if (isUpdateProgressMessage(parsedResult)) {
this.currentSpinner?.setMessage(parsedResult.message);
return;
}
if (isEmitLogMessage(parsedResult)) {
console[parsedResult.level](parsedResult.message);
return;
}
if (parsedResult.error) {
this.currentReject(parsedResult.error);
} else {
Expand Down
24 changes: 23 additions & 1 deletion packages/nx/src/daemon/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@
*/

import { appendFileSync, existsSync, mkdirSync } from 'fs';
import { ProgressTopic } from '../utils/progress-topics';
import { nxVersion } from '../utils/versions';
import { EmitLogLevel } from './message-types/streaming-messages';
import { sendEmitLogMessageToTopic } from './server/client-socket-context';
import {
DAEMON_DIR_FOR_CURRENT_WORKSPACE,
DAEMON_OUTPUT_LOG_FILE,
} from './tmp-dir';
import { nxVersion } from '../utils/versions';

type LogSource = 'Server' | 'Client';

Expand Down Expand Up @@ -51,6 +54,25 @@ class DaemonLogger {
this.log(`[WATCHER]: ${s.join(' ')}`);
}

/**
* Broadcasts a log line to every client currently subscribed to the
* given topic. Useful for warnings raised inside daemon-executed code
* that we want the user to see in their terminal rather than lose to
* the daemon log file.
*
* Falls back to writing into the daemon log when no clients are
* subscribed to the topic.
*
* Must only be invoked from inside the Nx daemon process.
*/
logToClient(
topic: ProgressTopic,
message: string,
level: EmitLogLevel = 'log'
) {
sendEmitLogMessageToTopic(topic, message, level);
}

private writeToFile(message: string) {
try {
if (!existsSync(DAEMON_DIR_FOR_CURRENT_WORKSPACE)) {
Expand Down
36 changes: 36 additions & 0 deletions packages/nx/src/daemon/message-types/streaming-messages.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
export const UPDATE_PROGRESS_MESSAGE = 'UPDATE_PROGRESS_MESSAGE' as const;

export type UpdateProgressMessage = {
type: typeof UPDATE_PROGRESS_MESSAGE;
message: string;
};

export function isUpdateProgressMessage(
message: unknown
): message is UpdateProgressMessage {
return (
typeof message === 'object' &&
message !== null &&
'type' in message &&
message['type'] === UPDATE_PROGRESS_MESSAGE
);
}

export const EMIT_LOG = 'EMIT_LOG' as const;

export type EmitLogLevel = 'log' | 'warn' | 'error';

export type EmitLogMessage = {
type: typeof EMIT_LOG;
level: EmitLogLevel;
message: string;
};

export function isEmitLogMessage(message: unknown): message is EmitLogMessage {
return (
typeof message === 'object' &&
message !== null &&
'type' in message &&
message['type'] === EMIT_LOG
);
}
120 changes: 120 additions & 0 deletions packages/nx/src/daemon/server/client-socket-context.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import type { Socket } from 'net';
import { MESSAGE_END_SEQ } from '../../utils/consume-messages-from-socket';
import { ProgressTopic } from '../../utils/progress-topics';
import { isOnDaemon } from '../is-on-daemon';
import { serverLogger } from '../logger';
import {
EMIT_LOG,
EmitLogLevel,
EmitLogMessage,
UPDATE_PROGRESS_MESSAGE,
} from '../message-types/streaming-messages';
import { serialize } from '../socket-utils';

const topicSubscribers = new Map<ProgressTopic, Set<Socket>>();

export function subscribeClientToTopic(
socket: Socket,
topic: ProgressTopic
): void {
let subscribers = getTopicSubscribers(topic);
if (!subscribers) {
subscribers = new Set();
topicSubscribers.set(topic, subscribers);
}
subscribers.add(socket);
}

export function unsubscribeClientFromTopic(
socket: Socket,
topic: ProgressTopic
): void {
const subscribers = getTopicSubscribers(topic);
if (!subscribers) return;
subscribers.delete(socket);
}

export function getTopicSubscribers(topic: ProgressTopic): Set<Socket> {
const subscribers = topicSubscribers.get(topic);
if (!subscribers) {
const set = new Set<Socket>();
topicSubscribers.set(topic, set);
return set;
}
return subscribers;
}

export function assertOnDaemon(helperName: string) {
if (!isOnDaemon()) {
throw new Error(
`${helperName} can only be called from the Nx daemon process.`
);
}
}

/**
* Writes a streaming message over the given socket using the daemon's
* configured serialization format and terminated with MESSAGE_END_SEQ.
* Errors are logged to the daemon's stdout (redirected to the daemon
* log) rather than propagated — a disconnected client shouldn't tear
* down the current request handler or other subscribers.
*/
export function writeStreamingMessage(
socket: Socket,
payload: unknown,
description: string
) {
try {
serverLogger.log('Streaming message to client:', description);
socket.write(serialize(payload) + MESSAGE_END_SEQ, (err) => {
if (err) {
console.log(
`Streaming message write error (client likely disconnected): ${err.message}`
);
}
});
} catch (e) {
console.log(
`Failed to send streaming message to client: ${
e instanceof Error ? e.message : String(e)
}`
);
}
Comment on lines +67 to +82
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Look at respondToClient from shutdown-utils and borrow some ideas like logging this into the server logs etc.

}

/**
* Broadcasts a progress message to every client currently subscribed to
* the given topic. No-op when there are no subscribers.
*
* Must only be invoked from inside the Nx daemon process.
*/
export function sendProgressMessageToTopic(
topic: ProgressTopic,
message: string
): void {
assertOnDaemon('sendProgressMessageToTopic');
const subscribers = getTopicSubscribers(topic);
if (!subscribers?.size) return;
const payload = { type: UPDATE_PROGRESS_MESSAGE, message };
for (const socket of subscribers) {
writeStreamingMessage(
socket,
payload,
'progress update for topic ' + topic
);
}
}

export function sendEmitLogMessageToTopic(
topic: ProgressTopic,
message: string,
level: EmitLogLevel
): void {
assertOnDaemon('sendEmitLogMessageToTopic');
const subscribers = getTopicSubscribers(topic);
if (!subscribers?.size) return;
const payload: EmitLogMessage = { type: EMIT_LOG, message, level };
for (const socket of subscribers) {
writeStreamingMessage(socket, payload, 'emit log message to ' + topic);
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import { Socket } from 'net';
import { performance } from 'perf_hooks';
import { serializeResult } from '../socket-utils';
import { serverLogger } from '../logger';
import { getCachedSerializedProjectGraphPromise } from './project-graph-incremental-recomputation';
import { HandlerResult } from './server';

export async function handleRequestProjectGraph(): Promise<HandlerResult> {
export async function handleRequestProjectGraph(
socket: Socket
): Promise<HandlerResult> {
try {
performance.mark('server-connection');
serverLogger.requestLog('Client Request for Project Graph Received');

const result = await getCachedSerializedProjectGraphPromise();
const result = await getCachedSerializedProjectGraphPromise(socket);
if (result.error) {
return {
description: `Error when preparing serialized project graph.`,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Socket } from 'net';
import { performance } from 'perf_hooks';
import { readNxJson } from '../../config/nx-json';
import {
Expand Down Expand Up @@ -38,6 +39,11 @@ import {
} from '../../utils/workspace-context';
import { workspaceRoot } from '../../utils/workspace-root';
import { serverLogger } from '../logger';
import { ProgressTopics } from '../../utils/progress-topics';
import {
subscribeClientToTopic,
unsubscribeClientFromTopic,
} from './client-socket-context';
import { notifyFileChangeListeners } from './file-watching/file-change-events';
import { notifyFileWatcherSockets } from './file-watching/file-watcher-sockets';
import { notifyProjectGraphListenerSockets } from './project-graph-listener-sockets';
Expand Down Expand Up @@ -84,7 +90,16 @@ let knownExternalNodes: Record<string, ProjectGraphExternalNode> = {};
let fileChangeCounter = 0;
let recomputationGeneration = 0;

export async function getCachedSerializedProjectGraphPromise(): Promise<SerializedProjectGraph> {
export async function getCachedSerializedProjectGraphPromise(
socket?: Socket
): Promise<SerializedProjectGraph> {
// Subscribe the requesting client to the graph-construction topic
// for the duration of the await, so in-flight progress/log messages
// — including those produced by a recomputation that was already
// started before this caller arrived — are broadcast to them.
if (socket) {
subscribeClientToTopic(socket, ProgressTopics.GraphConstruction);
}
try {
let wasScheduled = false;
// recomputing it now on demand. we can ignore the scheduled timeout
Expand Down Expand Up @@ -180,6 +195,10 @@ export async function getCachedSerializedProjectGraphPromise(): Promise<Serializ
allWorkspaceFiles: null,
rustReferences: null,
};
} finally {
if (socket) {
unsubscribeClientFromTopic(socket, ProgressTopics.GraphConstruction);
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/nx/src/daemon/server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ async function handleMessage(socket: Socket, data: string) {
await handleResult(
socket,
'REQUEST_PROJECT_GRAPH',
() => handleRequestProjectGraph(),
() => handleRequestProjectGraph(socket),
mode
);
} else if (payload.type === 'HASH_TASKS') {
Expand Down
Loading
Loading