Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/attach.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export const attachSockets = async <NS extends Namespaces>({
const emitCfg: EmitterConfig<NSEmissions> = { emission, timeout };
const nsCtx: IndependentContext<NSEmissions, NSMeta> = {
logger: rootLogger,
withRooms: makeRoomService({ subject: io, metadata, ...emitCfg }),
withRooms: makeRoomService({ subject: ns, metadata, ...emitCfg }),
all: {
getClients: async () =>
makeRemoteClients({
Expand All @@ -68,7 +68,7 @@ export const attachSockets = async <NS extends Namespaces>({
...emitCfg,
}),
getRooms: () => Array.from(ns.adapter.rooms.keys()),
broadcast: makeEmitter({ subject: io, ...emitCfg }),
broadcast: makeEmitter({ subject: ns, ...emitCfg }),
},
};
ns.on("connection", async (socket) => {
Expand Down
6 changes: 4 additions & 2 deletions src/emission.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,16 @@ export function makeEmitter<E extends EmissionMap>(
} & EmitterConfig<E>,
): Emitter<E>;
export function makeEmitter<E extends EmissionMap>(
props: { subject: Socket["broadcast"] | Server } & EmitterConfig<E>,
props: {
subject: Socket["broadcast"] | Server | Namespace;
} & EmitterConfig<E>,
): Broadcaster<E>;
export function makeEmitter({
subject,
emission,
timeout,
}: {
subject: Socket | SomeRemoteSocket | Socket["broadcast"] | Server;
subject: Socket | SomeRemoteSocket | Namespace | Socket["broadcast"] | Server;
} & EmitterConfig<EmissionMap>) {
/**
* @throws z.ZodError on validation
Expand Down
120 changes: 119 additions & 1 deletion tests/system/issue590.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import http from "node:http";
import { Server } from "socket.io";
import { io as ioClient } from "socket.io-client";
import { z } from "zod";
import { attachSockets, Config, ActionsFactory } from "../../src";
import { ActionsFactory, attachSockets, Config } from "../../src";
import { promisify } from "node:util";
import assert from "node:assert/strict";

Expand All @@ -17,6 +17,11 @@ const port = 8999;
*/
describe("Issue #590", () => {
describe("attachSockets() with real Socket.IO", () => {
let intervalRef: NodeJS.Timeout | undefined;
afterEach(() => {
if (intervalRef) clearInterval(intervalRef);
});

test("should query and broadcast to rooms joined in onConnection", async () => {
const httpServer = http.createServer();
const io = new Server();
Expand Down Expand Up @@ -81,6 +86,119 @@ describe("Issue #590", () => {
// withRooms().broadcast() should reach client:
expect(receivedBroadcast).toBe("hello");

clientSocket.disconnect();
await promisify(io.close.bind(io))();
if (httpServer.listening)
await promisify(httpServer.close.bind(httpServer))();
});
test("should broadcast to all from startup hook using all.broadcast", async () => {
const httpServer = http.createServer();
const io = new Server();

const config = new Config().addNamespace({
path: "/chat",
emission: {
testBroadcast: { schema: z.tuple([z.string()]) },
},
hooks: {
onConnection: async ({ client }) => {
await client.join("testRoom");
},
onStartup: async ({ all }) => {
intervalRef = setInterval(() => {
all.broadcast("testBroadcast", "from onStartup hook");
}, 100);
},
},
metadata: z.object({}),
});

await attachSockets({
io,
config,
actions: [],
target: httpServer,
});

await promisify(httpServer.listen.bind(httpServer, port))();

// connect client:
const clientSocket = ioClient(`http://localhost:${port}/chat`, {
transports: ["websocket"],
});

await vi.waitFor(() => assert(clientSocket.connected));

// listen for broadcast:
const broadcastReceived = new Promise<string>((resolve) => {
clientSocket.on("testBroadcast", resolve);
});

const receivedBroadcast = await vi.waitFor(() => broadcastReceived, {
timeout: 1000,
});

// withRooms().broadcast() should reach client:
expect(receivedBroadcast).toBe("from onStartup hook");
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

clientSocket.disconnect();
await promisify(io.close.bind(io))();
if (httpServer.listening)
await promisify(httpServer.close.bind(httpServer))();
});
test("should broadcast to rooms from startup hook using withRooms", async () => {
const httpServer = http.createServer();
const io = new Server();

const config = new Config().addNamespace({
path: "/chat",
emission: {
testBroadcast: { schema: z.tuple([z.string()]) },
},
hooks: {
onConnection: async ({ client }) => {
await client.join("testRoom");
},
onStartup: async ({ withRooms }) => {
intervalRef = setInterval(() => {
withRooms("testRoom").broadcast(
"testBroadcast",
"from onStartup hook",
);
}, 100);
},
},
metadata: z.object({}),
});

await attachSockets({
io,
config,
actions: [],
target: httpServer,
});

await promisify(httpServer.listen.bind(httpServer, port))();

// connect client:
const clientSocket = ioClient(`http://localhost:${port}/chat`, {
transports: ["websocket"],
});

await vi.waitFor(() => assert(clientSocket.connected));

// listen for broadcast:
const broadcastReceived = new Promise<string>((resolve) => {
clientSocket.on("testBroadcast", resolve);
});

const receivedBroadcast = await vi.waitFor(() => broadcastReceived, {
timeout: 1000,
});

// withRooms().broadcast() should reach client:
expect(receivedBroadcast).toBe("from onStartup hook");

clientSocket.disconnect();
await promisify(io.close.bind(io))();
if (httpServer.listening)
Expand Down