Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 42 additions & 10 deletions packages/nx/src/daemon/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ import { preventRecursionInGraphConstruction } from '../../project-graph/project
import { ConfigurationSourceMaps } from '../../project-graph/utils/project-configuration/source-maps';
import { parseMessage } from '../../utils/consume-messages-from-socket';
import { DelayedSpinner } from '../../utils/delayed-spinner';
import {
isEmitLogMessage,
isUpdateProgressMessage,
} from '../message-types/streaming-messages';
import { handleImport } from '../../utils/handle-import';
import { isCI } from '../../utils/is-ci';
import { isSandbox } from '../../utils/is-sandbox';
Expand Down Expand Up @@ -159,6 +163,11 @@ export class DaemonClient {
private currentMessage;
private currentResolve;
private currentReject;
// Tracks the spinner owned by the in-flight request so streamed
// progress updates are routed to the caller's spinner instead of
// mutating the process-wide globalSpinner (which may belong to an
// unrelated command).
private currentSpinner: DelayedSpinner | null = null;

private _enabled: boolean | undefined;
private _daemonStatus: DaemonStatus = DaemonStatus.DISCONNECTED;
Expand Down Expand Up @@ -308,9 +317,10 @@ export class DaemonClient {
{ ciDelay: 60_000, delay: 30_000 }
);
try {
const response = await this.sendToDaemonViaQueue({
type: 'REQUEST_PROJECT_GRAPH',
});
const response = await this.sendToDaemonViaQueue(
{ type: 'REQUEST_PROJECT_GRAPH' },
{ spinner }
);
return {
projectGraph: response.projectGraph,
sourceMaps: response.sourceMaps,
Expand Down Expand Up @@ -760,10 +770,10 @@ export class DaemonClient {
type: 'PROCESS_IN_BACKGROUND',
requirePath,
data,
// This method is sometimes passed data that cannot be serialized with v8
// so we force JSON serialization here
},
'json'
// This method is sometimes passed data that cannot be serialized with v8
// so we force JSON serialization here
{ force: 'json' }
);
}

Expand Down Expand Up @@ -1032,11 +1042,20 @@ export class DaemonClient {

private async sendToDaemonViaQueue<T extends DaemonMessage>(
messageToDaemon: T,
force?: 'v8' | 'json'
options?: { force?: 'v8' | 'json'; spinner?: DelayedSpinner }
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
options?: { force?: 'v8' | 'json'; spinner?: DelayedSpinner }
options?: { parser?: 'v8' | 'json'; spinner?: DelayedSpinner }

): Promise<any> {
return this.queue.sendToQueue(() =>
this.sendMessageToDaemon(messageToDaemon, force)
);
return this.queue.sendToQueue(async () => {
// Set currentSpinner inside the queued function so it's only
// active while this specific message is in flight — preventing
// concurrent callers from overwriting each other's spinner
// reference before their turn arrives.
if (options?.spinner) this.currentSpinner = options.spinner;
try {
return await this.sendMessageToDaemon(messageToDaemon, options?.force);
} finally {
if (options?.spinner) this.currentSpinner = null;
}
});
}

private setUpConnection() {
Expand Down Expand Up @@ -1259,6 +1278,19 @@ export class DaemonClient {
'result-parse-start-' + this.currentMessage.type,
'result-parse-end-' + this.currentMessage.type
);
// Streaming messages fire side-effects on the client but do not
// resolve the pending request promise — the daemon can push several
// of these before finally sending the real response. Progress
// updates route through the in-flight request's own spinner so
// we don't stomp on unrelated commands' spinner text.
if (isUpdateProgressMessage(parsedResult)) {
this.currentSpinner?.setMessage(parsedResult.message);
return;
}
if (isEmitLogMessage(parsedResult)) {
console[parsedResult.level](parsedResult.message);
return;
}
if (parsedResult.error) {
this.currentReject(parsedResult.error);
} else {
Expand Down
31 changes: 31 additions & 0 deletions packages/nx/src/daemon/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ import {
DAEMON_OUTPUT_LOG_FILE,
} from './tmp-dir';
import { nxVersion } from '../utils/versions';
import { ProgressTopic } from '../utils/progress-topics';
import { EMIT_LOG, EmitLogLevel } from './message-types/streaming-messages';
import {
assertOnDaemon,
getTopicSubscribers,
writeStreamingMessage,
} from './server/client-socket-context';

type LogSource = 'Server' | 'Client';

Expand Down Expand Up @@ -51,6 +58,30 @@ class DaemonLogger {
this.log(`[WATCHER]: ${s.join(' ')}`);
}

/**
* Broadcasts a log line to every client currently subscribed to the
* given topic. Useful for warnings raised inside daemon-executed code
* that we want the user to see in their terminal rather than lose to
* the daemon log file.
*
* Falls back to writing into the daemon log when no clients are
* subscribed to the topic.
*
* Must only be invoked from inside the Nx daemon process.
*/
logToClient(topic: ProgressTopic, level: EmitLogLevel, message: string) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should import a function from the client socket context... shouldn't have the implementation here

assertOnDaemon('DaemonLogger#logToClient');
const subscribers = getTopicSubscribers(topic);
if (!subscribers?.size) {
this.log(`[emit-log:${level}] ${message}`);
return;
}
const payload = { type: EMIT_LOG, level, message };
for (const socket of subscribers) {
writeStreamingMessage(socket, payload);
}
}

private writeToFile(message: string) {
try {
if (!existsSync(DAEMON_DIR_FOR_CURRENT_WORKSPACE)) {
Expand Down
36 changes: 36 additions & 0 deletions packages/nx/src/daemon/message-types/streaming-messages.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
export const UPDATE_PROGRESS_MESSAGE = 'UPDATE_PROGRESS_MESSAGE' as const;

export type UpdateProgressMessage = {
type: typeof UPDATE_PROGRESS_MESSAGE;
message: string;
};

export function isUpdateProgressMessage(
message: unknown
): message is UpdateProgressMessage {
return (
typeof message === 'object' &&
message !== null &&
'type' in message &&
message['type'] === UPDATE_PROGRESS_MESSAGE
);
}

export const EMIT_LOG = 'EMIT_LOG' as const;

export type EmitLogLevel = 'log' | 'warn' | 'error';

export type EmitLogMessage = {
type: typeof EMIT_LOG;
level: EmitLogLevel;
message: string;
};

export function isEmitLogMessage(message: unknown): message is EmitLogMessage {
return (
typeof message === 'object' &&
message !== null &&
'type' in message &&
message['type'] === EMIT_LOG
);
}
88 changes: 88 additions & 0 deletions packages/nx/src/daemon/server/client-socket-context.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import type { Socket } from 'net';
import { MESSAGE_END_SEQ } from '../../utils/consume-messages-from-socket';
import { ProgressTopic } from '../../utils/progress-topics';
import { UPDATE_PROGRESS_MESSAGE } from '../message-types/streaming-messages';
import { isOnDaemon } from '../is-on-daemon';
import { serialize } from '../socket-utils';

const topicSubscribers = new Map<ProgressTopic, Set<Socket>>();

export function subscribeClientToTopic(
socket: Socket,
topic: ProgressTopic
): void {
let subscribers = topicSubscribers.get(topic);
if (!subscribers) {
subscribers = new Set();
topicSubscribers.set(topic, subscribers);
}
subscribers.add(socket);
}

export function unsubscribeClientFromTopic(
socket: Socket,
topic: ProgressTopic
): void {
const subscribers = topicSubscribers.get(topic);
if (!subscribers) return;
subscribers.delete(socket);
if (subscribers.size === 0) topicSubscribers.delete(topic);
}

export function getTopicSubscribers(
topic: ProgressTopic
): ReadonlySet<Socket> | undefined {
return topicSubscribers.get(topic);
}

export function assertOnDaemon(helperName: string) {
if (!isOnDaemon()) {
throw new Error(
`${helperName} can only be called from the Nx daemon process.`
);
}
}

/**
* Writes a streaming message over the given socket using the daemon's
* configured serialization format and terminated with MESSAGE_END_SEQ.
* Errors are logged to the daemon's stdout (redirected to the daemon
* log) rather than propagated — a disconnected client shouldn't tear
* down the current request handler or other subscribers.
*/
export function writeStreamingMessage(socket: Socket, payload: unknown) {
try {
socket.write(serialize(payload) + MESSAGE_END_SEQ, (err) => {
if (err) {
console.log(
`Streaming message write error (client likely disconnected): ${err.message}`
);
}
});
} catch (e) {
console.log(
`Failed to send streaming message to client: ${
e instanceof Error ? e.message : String(e)
}`
);
}
Comment on lines +67 to +82
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Look at respondToClient from shutdown-utils and borrow some ideas like logging this into the server logs etc.

}

/**
* Broadcasts a progress message to every client currently subscribed to
* the given topic. No-op when there are no subscribers.
*
* Must only be invoked from inside the Nx daemon process.
*/
export function sendProgressMessageToTopic(
topic: ProgressTopic,
message: string
): void {
assertOnDaemon('sendProgressMessageToTopic');
const subscribers = topicSubscribers.get(topic);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use getTopicSubscribers

if (!subscribers?.size) return;
const payload = { type: UPDATE_PROGRESS_MESSAGE, message };
for (const socket of subscribers) {
writeStreamingMessage(socket, payload);
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import { Socket } from 'net';
import { performance } from 'perf_hooks';
import { serializeResult } from '../socket-utils';
import { serverLogger } from '../logger';
import { getCachedSerializedProjectGraphPromise } from './project-graph-incremental-recomputation';
import { HandlerResult } from './server';

export async function handleRequestProjectGraph(): Promise<HandlerResult> {
export async function handleRequestProjectGraph(
socket: Socket
): Promise<HandlerResult> {
try {
performance.mark('server-connection');
serverLogger.requestLog('Client Request for Project Graph Received');

const result = await getCachedSerializedProjectGraphPromise();
const result = await getCachedSerializedProjectGraphPromise(socket);
if (result.error) {
return {
description: `Error when preparing serialized project graph.`,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Socket } from 'net';
import { performance } from 'perf_hooks';
import { readNxJson } from '../../config/nx-json';
import {
Expand Down Expand Up @@ -38,6 +39,11 @@ import {
} from '../../utils/workspace-context';
import { workspaceRoot } from '../../utils/workspace-root';
import { serverLogger } from '../logger';
import { ProgressTopics } from '../../utils/progress-topics';
import {
subscribeClientToTopic,
unsubscribeClientFromTopic,
} from './client-socket-context';
import { notifyFileChangeListeners } from './file-watching/file-change-events';
import { notifyFileWatcherSockets } from './file-watching/file-watcher-sockets';
import { notifyProjectGraphListenerSockets } from './project-graph-listener-sockets';
Expand Down Expand Up @@ -84,7 +90,16 @@ let knownExternalNodes: Record<string, ProjectGraphExternalNode> = {};
let fileChangeCounter = 0;
let recomputationGeneration = 0;

export async function getCachedSerializedProjectGraphPromise(): Promise<SerializedProjectGraph> {
export async function getCachedSerializedProjectGraphPromise(
socket?: Socket
): Promise<SerializedProjectGraph> {
// Subscribe the requesting client to the graph-construction topic
// for the duration of the await, so in-flight progress/log messages
// — including those produced by a recomputation that was already
// started before this caller arrived — are broadcast to them.
if (socket) {
subscribeClientToTopic(socket, ProgressTopics.GraphConstruction);
}
try {
let wasScheduled = false;
// recomputing it now on demand. we can ignore the scheduled timeout
Expand Down Expand Up @@ -180,6 +195,10 @@ export async function getCachedSerializedProjectGraphPromise(): Promise<Serializ
allWorkspaceFiles: null,
rustReferences: null,
};
} finally {
if (socket) {
unsubscribeClientFromTopic(socket, ProgressTopics.GraphConstruction);
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/nx/src/daemon/server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ async function handleMessage(socket: Socket, data: string) {
await handleResult(
socket,
'REQUEST_PROJECT_GRAPH',
() => handleRequestProjectGraph(),
() => handleRequestProjectGraph(socket),
mode
);
} else if (payload.type === 'HASH_TASKS') {
Expand Down
7 changes: 5 additions & 2 deletions packages/nx/src/project-graph/build-project-graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import {
import { mergeMetadata } from './utils/project-configuration/target-merging';
import type { ConfigurationSourceMaps } from './utils/project-configuration/source-maps';
import { DelayedSpinner } from '../utils/delayed-spinner';
import { ProgressTopics } from '../utils/progress-topics';
import { hashObject } from '../hasher/file-hasher';

let storedFileMap: FileMap | null = null;
Expand Down Expand Up @@ -346,7 +347,8 @@ async function updateProjectGraphWithPlugins(
}

spinner = new DelayedSpinner(
`Creating project graph dependencies with ${createDependencyPlugins.length} plugins`
`Creating project graph dependencies with ${createDependencyPlugins.length} plugins`,
{ progressTopic: ProgressTopics.GraphConstruction }
);

await Promise.all(
Expand Down Expand Up @@ -470,7 +472,8 @@ export async function applyProjectMetadata(
(plugin) => plugin.createMetadata
);
spinner = new DelayedSpinner(
`Creating project metadata with ${createMetadataPlugins.length} plugins`
`Creating project metadata with ${createMetadataPlugins.length} plugins`,
{ progressTopic: ProgressTopics.GraphConstruction }
);

const promises = createMetadataPlugins.map(async (plugin) => {
Expand Down
Loading
Loading