From 22c6e604bfc9b7c809d176373796ab52e54ef716 Mon Sep 17 00:00:00 2001 From: Rohit Kumbhar Date: Sat, 1 Nov 2025 18:37:47 -0700 Subject: [PATCH 1/5] Using namespace scope for withRooms and all in onStartup hook --- src/attach.ts | 4 +- src/emission.ts | 6 +- tests/system/issue590.spec.ts | 133 ++++++++++++++++++++++++++++++++-- 3 files changed, 131 insertions(+), 12 deletions(-) diff --git a/src/attach.ts b/src/attach.ts index caae1264..6ed3f549 100644 --- a/src/attach.ts +++ b/src/attach.ts @@ -59,7 +59,7 @@ export const attachSockets = async ({ const emitCfg: EmitterConfig = { emission, timeout }; const nsCtx: IndependentContext = { logger: rootLogger, - withRooms: makeRoomService({ subject: io, metadata, ...emitCfg }), + withRooms: makeRoomService({ subject: ns, metadata, ...emitCfg }), all: { getClients: async () => makeRemoteClients({ @@ -68,7 +68,7 @@ export const attachSockets = async ({ ...emitCfg, }), getRooms: () => Array.from(ns.adapter.rooms.keys()), - broadcast: makeEmitter({ subject: io, ...emitCfg }), + broadcast: makeEmitter({ subject: ns, ...emitCfg }), }, }; ns.on("connection", async (socket) => { diff --git a/src/emission.ts b/src/emission.ts index 24c342c6..d9718327 100644 --- a/src/emission.ts +++ b/src/emission.ts @@ -58,14 +58,16 @@ export function makeEmitter( } & EmitterConfig, ): Emitter; export function makeEmitter( - props: { subject: Socket["broadcast"] | Server } & EmitterConfig, + props: { + subject: Socket["broadcast"] | Server | Namespace; + } & EmitterConfig, ): Broadcaster; export function makeEmitter({ subject, emission, timeout, }: { - subject: Socket | SomeRemoteSocket | Socket["broadcast"] | Server; + subject: Socket | SomeRemoteSocket | Namespace | Socket["broadcast"] | Server; } & EmitterConfig) { /** * @throws z.ZodError on validation diff --git a/tests/system/issue590.spec.ts b/tests/system/issue590.spec.ts index ee5103da..ab9e995a 100644 --- a/tests/system/issue590.spec.ts +++ b/tests/system/issue590.spec.ts @@ -1,9 +1,9 @@ import http from "node:http"; -import { Server } from "socket.io"; -import { io as ioClient } from "socket.io-client"; -import { z } from "zod"; -import { attachSockets, Config, ActionsFactory } from "../../src"; -import { promisify } from "node:util"; +import {Server} from "socket.io"; +import {io as ioClient} from "socket.io-client"; +import {z} from "zod"; +import {ActionsFactory, attachSockets, Config} from "../../src"; +import {promisify} from "node:util"; import assert from "node:assert/strict"; const port = 8999; @@ -26,10 +26,10 @@ describe("Issue #590", () => { const config = new Config().addNamespace({ path: "/chat", emission: { - testBroadcast: { schema: z.tuple([z.string()]) }, + testBroadcast: {schema: z.tuple([z.string()])}, }, hooks: { - onConnection: async ({ client }) => await client.join("testRoom"), + onConnection: async ({client}) => await client.join("testRoom"), }, metadata: z.object({}), }); @@ -40,7 +40,7 @@ describe("Issue #590", () => { ns: "/chat", event: "testQuery", input: z.tuple([]), - async handler({ withRooms }) { + async handler({withRooms}) { const clients = await withRooms("testRoom").getClients(); clientsInRoom = clients.length; await withRooms("testRoom").broadcast("testBroadcast", "hello"); @@ -86,5 +86,122 @@ describe("Issue #590", () => { if (httpServer.listening) await promisify(httpServer.close.bind(httpServer))(); }); + test("should broadcast to all from startup hook using all.broadcast", async () => { + const httpServer = http.createServer(); + const io = new Server(); + let intervalRef; + + const config = new Config().addNamespace({ + path: "/chat", + emission: { + testBroadcast: { schema: z.tuple([z.string()]) }, + }, + hooks: { + onConnection: async ({ client }) => { + await client.join("testRoom"); + }, + onStartup: async ({ all }) => { + intervalRef = setInterval(() => { + all.broadcast("testBroadcast", "from onStartup hook"); + }, 100); + }, + }, + metadata: z.object({}), + }); + + await attachSockets({ + io, + config, + actions: [], + target: httpServer, + }); + + await promisify(httpServer.listen.bind(httpServer, port))(); + + // connect client: + const clientSocket = ioClient(`http://localhost:${port}/chat`, { + transports: ["websocket"], + }); + + await vi.waitFor(() => assert(clientSocket.connected)); + + // listen for broadcast: + const broadcastReceived = new Promise((resolve) => { + clientSocket.on("testBroadcast", resolve); + }); + + const receivedBroadcast = await vi.waitFor(() => broadcastReceived, { + timeout: 1000, + }); + + // withRooms().broadcast() should reach client: + expect(receivedBroadcast).toBe("from onStartup hook"); + + clientSocket.disconnect(); + await promisify(io.close.bind(io))(); + clearInterval(intervalRef); + if (httpServer.listening) + await promisify(httpServer.close.bind(httpServer))(); + }); + test("should broadcast to rooms from startup hook using withRooms", async () => { + const httpServer = http.createServer(); + const io = new Server(); + let intervalRef; + + const config = new Config().addNamespace({ + path: "/chat", + emission: { + testBroadcast: { schema: z.tuple([z.string()]) }, + }, + hooks: { + onConnection: async ({ client }) => { + await client.join("testRoom"); + }, + onStartup: async ({ withRooms }) => { + intervalRef = setInterval(() => { + withRooms("testRoom").broadcast( + "testBroadcast", + "from onStartup hook", + ); + }, 100); + }, + }, + metadata: z.object({}), + }); + + await attachSockets({ + io, + config, + actions: [], + target: httpServer, + }); + + await promisify(httpServer.listen.bind(httpServer, port))(); + + // connect client: + const clientSocket = ioClient(`http://localhost:${port}/chat`, { + transports: ["websocket"], + }); + + await vi.waitFor(() => assert(clientSocket.connected)); + + // listen for broadcast: + const broadcastReceived = new Promise((resolve) => { + clientSocket.on("testBroadcast", resolve); + }); + + const receivedBroadcast = await vi.waitFor(() => broadcastReceived, { + timeout: 1000, + }); + + // withRooms().broadcast() should reach client: + expect(receivedBroadcast).toBe("from onStartup hook"); + + clientSocket.disconnect(); + await promisify(io.close.bind(io))(); + clearInterval(intervalRef); + if (httpServer.listening) + await promisify(httpServer.close.bind(httpServer))(); + }); }); }); From ea1edbe998cabad392d4e86d91635100fdad6db0 Mon Sep 17 00:00:00 2001 From: Rohit Kumbhar Date: Sat, 1 Nov 2025 18:43:28 -0700 Subject: [PATCH 2/5] Reverted the formatting changes --- tests/system/issue590.spec.ts | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/system/issue590.spec.ts b/tests/system/issue590.spec.ts index ab9e995a..096bb427 100644 --- a/tests/system/issue590.spec.ts +++ b/tests/system/issue590.spec.ts @@ -1,9 +1,9 @@ import http from "node:http"; -import {Server} from "socket.io"; -import {io as ioClient} from "socket.io-client"; -import {z} from "zod"; -import {ActionsFactory, attachSockets, Config} from "../../src"; -import {promisify} from "node:util"; +import { Server } from "socket.io"; +import { io as ioClient } from "socket.io-client"; +import { z } from "zod"; +import { ActionsFactory, attachSockets, Config } from "../../src"; +import { promisify } from "node:util"; import assert from "node:assert/strict"; const port = 8999; @@ -26,10 +26,10 @@ describe("Issue #590", () => { const config = new Config().addNamespace({ path: "/chat", emission: { - testBroadcast: {schema: z.tuple([z.string()])}, + testBroadcast: { schema: z.tuple([z.string()]) }, }, hooks: { - onConnection: async ({client}) => await client.join("testRoom"), + onConnection: async ({ client }) => await client.join("testRoom"), }, metadata: z.object({}), }); @@ -40,7 +40,7 @@ describe("Issue #590", () => { ns: "/chat", event: "testQuery", input: z.tuple([]), - async handler({withRooms}) { + async handler({ withRooms }) { const clients = await withRooms("testRoom").getClients(); clientsInRoom = clients.length; await withRooms("testRoom").broadcast("testBroadcast", "hello"); From b43d12ad9109f260df3fda64ea42db6d79dcb942 Mon Sep 17 00:00:00 2001 From: Rohit Kumbhar Date: Sun, 2 Nov 2025 10:40:56 -0800 Subject: [PATCH 3/5] Addreseed code review suggestions --- tests/system/issue590.spec.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/system/issue590.spec.ts b/tests/system/issue590.spec.ts index 096bb427..b6e17659 100644 --- a/tests/system/issue590.spec.ts +++ b/tests/system/issue590.spec.ts @@ -89,7 +89,7 @@ describe("Issue #590", () => { test("should broadcast to all from startup hook using all.broadcast", async () => { const httpServer = http.createServer(); const io = new Server(); - let intervalRef; + let intervalRef: NodeJS.Timeout | undefined; const config = new Config().addNamespace({ path: "/chat", @@ -146,7 +146,7 @@ describe("Issue #590", () => { test("should broadcast to rooms from startup hook using withRooms", async () => { const httpServer = http.createServer(); const io = new Server(); - let intervalRef; + let intervalRef: NodeJS.Timeout | undefined; const config = new Config().addNamespace({ path: "/chat", From e71d08564988aa5c79b5e128ef8fbabf8ff8876f Mon Sep 17 00:00:00 2001 From: Rohit Kumbhar Date: Sun, 2 Nov 2025 10:50:23 -0800 Subject: [PATCH 4/5] Clearing interval in afterEach block to ensure timeout gets cleared --- tests/system/issue590.spec.ts | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/system/issue590.spec.ts b/tests/system/issue590.spec.ts index b6e17659..f7f2bc97 100644 --- a/tests/system/issue590.spec.ts +++ b/tests/system/issue590.spec.ts @@ -17,6 +17,11 @@ const port = 8999; */ describe("Issue #590", () => { describe("attachSockets() with real Socket.IO", () => { + let intervalRef: NodeJS.Timeout | undefined; + afterEach(() => { + if (intervalRef) clearInterval(intervalRef); + }); + test("should query and broadcast to rooms joined in onConnection", async () => { const httpServer = http.createServer(); const io = new Server(); @@ -89,7 +94,6 @@ describe("Issue #590", () => { test("should broadcast to all from startup hook using all.broadcast", async () => { const httpServer = http.createServer(); const io = new Server(); - let intervalRef: NodeJS.Timeout | undefined; const config = new Config().addNamespace({ path: "/chat", @@ -139,14 +143,12 @@ describe("Issue #590", () => { clientSocket.disconnect(); await promisify(io.close.bind(io))(); - clearInterval(intervalRef); if (httpServer.listening) await promisify(httpServer.close.bind(httpServer))(); }); test("should broadcast to rooms from startup hook using withRooms", async () => { const httpServer = http.createServer(); const io = new Server(); - let intervalRef: NodeJS.Timeout | undefined; const config = new Config().addNamespace({ path: "/chat", @@ -199,7 +201,6 @@ describe("Issue #590", () => { clientSocket.disconnect(); await promisify(io.close.bind(io))(); - clearInterval(intervalRef); if (httpServer.listening) await promisify(httpServer.close.bind(httpServer))(); }); From e1e84937ac07969cfe0dde03bbdb7327b7addb25 Mon Sep 17 00:00:00 2001 From: Rohit Kumbhar Date: Sun, 2 Nov 2025 11:44:08 -0800 Subject: [PATCH 5/5] fixed misleading comment --- tests/system/issue590.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system/issue590.spec.ts b/tests/system/issue590.spec.ts index f7f2bc97..7b510b87 100644 --- a/tests/system/issue590.spec.ts +++ b/tests/system/issue590.spec.ts @@ -138,7 +138,7 @@ describe("Issue #590", () => { timeout: 1000, }); - // withRooms().broadcast() should reach client: + // all.broadcast() should reach client: expect(receivedBroadcast).toBe("from onStartup hook"); clientSocket.disconnect();