Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/attach.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export const attachSockets = async <NS extends Namespaces>({
const emitCfg: EmitterConfig<NSEmissions> = { emission, timeout };
const nsCtx: IndependentContext<NSEmissions, NSMeta> = {
logger: rootLogger,
withRooms: makeRoomService({ subject: io, metadata, ...emitCfg }),
withRooms: makeRoomService({ subject: ns, metadata, ...emitCfg }),
all: {
getClients: async () =>
makeRemoteClients({
Expand All @@ -68,7 +68,7 @@ export const attachSockets = async <NS extends Namespaces>({
...emitCfg,
}),
getRooms: () => Array.from(ns.adapter.rooms.keys()),
broadcast: makeEmitter({ subject: io, ...emitCfg }),
broadcast: makeEmitter({ subject: ns, ...emitCfg }),
},
};
ns.on("connection", async (socket) => {
Expand Down
6 changes: 4 additions & 2 deletions src/emission.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,16 @@ export function makeEmitter<E extends EmissionMap>(
} & EmitterConfig<E>,
): Emitter<E>;
export function makeEmitter<E extends EmissionMap>(
props: { subject: Socket["broadcast"] | Server } & EmitterConfig<E>,
props: {
subject: Socket["broadcast"] | Server | Namespace;
} & EmitterConfig<E>,
): Broadcaster<E>;
export function makeEmitter({
subject,
emission,
timeout,
}: {
subject: Socket | SomeRemoteSocket | Socket["broadcast"] | Server;
subject: Socket | SomeRemoteSocket | Namespace | Socket["broadcast"] | Server;
} & EmitterConfig<EmissionMap>) {
/**
* @throws z.ZodError on validation
Expand Down
119 changes: 118 additions & 1 deletion tests/system/issue590.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import http from "node:http";
import { Server } from "socket.io";
import { io as ioClient } from "socket.io-client";
import { z } from "zod";
import { attachSockets, Config, ActionsFactory } from "../../src";
import { ActionsFactory, attachSockets, Config } from "../../src";
import { promisify } from "node:util";
import assert from "node:assert/strict";

Expand Down Expand Up @@ -86,5 +86,122 @@ describe("Issue #590", () => {
if (httpServer.listening)
await promisify(httpServer.close.bind(httpServer))();
});
test("should broadcast to all from startup hook using all.broadcast", async () => {
const httpServer = http.createServer();
const io = new Server();
let intervalRef: NodeJS.Timeout | undefined;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Ensure interval cleanup in all code paths.

If the test throws an error or assertion fails before line 142, the interval will continue running and may cause test pollution or resource leaks.

Wrap the test body in a try-finally block or store the interval reference and clear it in an afterEach hook:

+    let intervalRef: NodeJS.Timeout | undefined;
+
+    afterEach(() => {
+      if (intervalRef) clearInterval(intervalRef);
+    });
+
     test("should broadcast to all from startup hook using all.broadcast", async () => {
       const httpServer = http.createServer();
       const io = new Server();
-      let intervalRef: NodeJS.Timeout | undefined;
       
       // ... test body ...
       
       clientSocket.disconnect();
       await promisify(io.close.bind(io))();
-      clearInterval(intervalRef);
       if (httpServer.listening)
         await promisify(httpServer.close.bind(httpServer))();
     });

Or use a try-finally within the test itself.

Also applies to: 104-106, 142-142

🤖 Prompt for AI Agents
In tests/system/issue590.spec.ts around lines 92-92 (and also ensure coverage
for 104-106 and 142-142), the interval referenced by `intervalRef` may not be
cleared if the test throws or an assertion fails; wrap the test body in a
try-finally that clears the interval in the finally block (or register an
afterEach hook that checks and clears the intervalRef), ensuring intervalRef is
set before the finally/afterEach check and cleared with clearInterval to prevent
test pollution and resource leaks.


const config = new Config().addNamespace({
path: "/chat",
emission: {
testBroadcast: { schema: z.tuple([z.string()]) },
},
hooks: {
onConnection: async ({ client }) => {
await client.join("testRoom");
},
onStartup: async ({ all }) => {
intervalRef = setInterval(() => {
all.broadcast("testBroadcast", "from onStartup hook");
}, 100);
},
},
metadata: z.object({}),
});

await attachSockets({
io,
config,
actions: [],
target: httpServer,
});

await promisify(httpServer.listen.bind(httpServer, port))();

// connect client:
const clientSocket = ioClient(`http://localhost:${port}/chat`, {
transports: ["websocket"],
});

await vi.waitFor(() => assert(clientSocket.connected));

// listen for broadcast:
const broadcastReceived = new Promise<string>((resolve) => {
clientSocket.on("testBroadcast", resolve);
});

const receivedBroadcast = await vi.waitFor(() => broadcastReceived, {
timeout: 1000,
});

// withRooms().broadcast() should reach client:
expect(receivedBroadcast).toBe("from onStartup hook");
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

clientSocket.disconnect();
await promisify(io.close.bind(io))();
clearInterval(intervalRef);
if (httpServer.listening)
await promisify(httpServer.close.bind(httpServer))();
});
test("should broadcast to rooms from startup hook using withRooms", async () => {
const httpServer = http.createServer();
const io = new Server();
let intervalRef: NodeJS.Timeout | undefined;
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

const config = new Config().addNamespace({
path: "/chat",
emission: {
testBroadcast: { schema: z.tuple([z.string()]) },
},
hooks: {
onConnection: async ({ client }) => {
await client.join("testRoom");
},
onStartup: async ({ withRooms }) => {
intervalRef = setInterval(() => {
withRooms("testRoom").broadcast(
"testBroadcast",
"from onStartup hook",
);
}, 100);
},
},
metadata: z.object({}),
});

await attachSockets({
io,
config,
actions: [],
target: httpServer,
});

await promisify(httpServer.listen.bind(httpServer, port))();

// connect client:
const clientSocket = ioClient(`http://localhost:${port}/chat`, {
transports: ["websocket"],
});

await vi.waitFor(() => assert(clientSocket.connected));

// listen for broadcast:
const broadcastReceived = new Promise<string>((resolve) => {
clientSocket.on("testBroadcast", resolve);
});

const receivedBroadcast = await vi.waitFor(() => broadcastReceived, {
timeout: 1000,
});

// withRooms().broadcast() should reach client:
expect(receivedBroadcast).toBe("from onStartup hook");

clientSocket.disconnect();
await promisify(io.close.bind(io))();
clearInterval(intervalRef);
if (httpServer.listening)
await promisify(httpServer.close.bind(httpServer))();
});
});
});