diff --git a/README.md b/README.md index d1f4d8027..2d59259a6 100644 --- a/README.md +++ b/README.md @@ -341,6 +341,7 @@ Also user can manually register topic-alias pair using PUBLISH topic:'some', ta: - [`mqtt.Client#endAsync()`](#end-async) - [`mqtt.Client#removeOutgoingMessage()`](#removeOutgoingMessage) - [`mqtt.Client#reconnect()`](#reconnect) +- [`mqtt.Client#reauthenticate()`](#client-reauthenticate) - [`mqtt.Client#handleMessage()`](#handleMessage) - [`mqtt.Client#connected`](#connected) - [`mqtt.Client#reconnecting`](#reconnecting) @@ -599,6 +600,16 @@ and connections - `packet` received packet, as defined in [mqtt-packet](https://github.com/mcollina/mqtt-packet) +#### Event `'reauth'` + +`function (packet) {}` + +Emitted when an MQTT 5 re-authentication completes successfully. + +- `packet` the AUTH packet received from the broker. + +Triggered after calling [`client.reauthenticate()`](#client-reauthenticate). + --- @@ -742,6 +753,30 @@ Connect again using the same options as connect() --- + + +### mqtt.Client#reauthenticate(reauthOptions, [callback]) + +Start an MQTT 5 re-authentication exchange. + +- `reauthOptions`: + - `authenticationData` (`Buffer`) + - `reasonString` (`string`, optional) + - `userProperties` (`object`, optional) + +- `callback` - `function (err, packet)` + - called when the AUTH exchange completes or fails + +Errors: +- client is not connected +- MQTT version is not 5 +- `authenticationData` is missing +- `authenticationMethod` is missing from the initial CONNECT + +Emits `'reauth'` on successful completion. + +--- + ### mqtt.Client#handleMessage(packet, callback) diff --git a/src/lib/client.ts b/src/lib/client.ts index ddcfcda3c..89525c87e 100644 --- a/src/lib/client.ts +++ b/src/lib/client.ts @@ -27,8 +27,8 @@ import DefaultMessageIdProvider, { } from './default-message-id-provider' import TopicAliasRecv from './topic-alias-recv' import { + ErrorWithReasonCode, type DoneCallback, - type ErrorWithReasonCode, ErrorWithSubackPacket, type GenericCallback, type IStream, @@ -433,6 +433,7 @@ export interface MqttClientEventCallbacks { reconnect: VoidCallback offline: VoidCallback outgoingEmpty: VoidCallback + reauth: OnPacketCallback } /** @@ -521,6 +522,9 @@ export default class MqttClient extends TypedEventEmitter} - Resolves with the AUTH packet from the broker + */ + public reauthenticateAsync( + reauthOptions: Pick< + NonNullable, + 'authenticationData' | 'reasonString' | 'userProperties' + >, + ): Promise { + return new Promise((resolve, reject) => { + this.reauthenticate(reauthOptions, (err, packet) => { + if (err) { + reject(err) + } else { + resolve(packet as IAuthPacket) + } + }) + }) + } + + /** + * reauthenticate - MQTT 5.0 Re-authentication + * @param {Object} reauthOptions - Re-authentication properties + * @param {Buffer} [reauthOptions.authenticationData] - Binary data for auth exchange + * @param {string} [reauthOptions.reasonString] - Human-readable reason for re-auth + * @param {Object} [reauthOptions.userProperties] - Custom user properties + * @param {PacketCallback} [callback] - Fired when the AUTH exchange completes or fails + * @returns {MqttClient} - Returns the client instance + */ + public reauthenticate( + reauthOptions: Pick< + NonNullable, + 'authenticationData' | 'reasonString' | 'userProperties' + >, + callback?: PacketCallback, + ): MqttClient { + const fail = (msg: string) => { + const err = new Error(`reauthenticate: ${msg}`) + if (callback) callback(err) + else this.emit('error', err) + return this + } + + if (this._reauthCallback) { + this._handleReauth( + new Error( + 'reauthenticate: interrupted by new reauthentication request', + ), + ) + } + + if (this.options.protocolVersion !== 5) + return fail('this feature works only with mqtt-v5') + else if (!this.connected) return fail('client is not connected') + else if (!reauthOptions.authenticationData) + return fail('authenticationData is required') + + const method = this.options.properties?.authenticationMethod + + if (!method) + return fail('authenticationMethod is required from initial CONNECT') + + const authPacket: IAuthPacket = { + cmd: 'auth', + reasonCode: 0x19, // Re-authentication (MQTT 5.0 Spec) + properties: { + ...reauthOptions, + authenticationMethod: method, + }, + } + + this._reauthCallback = callback + this._sendPacket(authPacket) + return this + } + + /** + * _handleReauth + * Internal method to finalize the re-authentication process. + * Clears the pending reauth callback and signals completion via callback or event. + * @param {Error | null} err - The error if the re-authentication failed + * @param {IAuthPacket} [packet] - The AUTH packet received from the broker + * @api private + */ + private _handleReauth(err: Error | null, packet?: IAuthPacket) { + if (!err && packet && packet.reasonCode !== 0) { + err = new ErrorWithReasonCode( + `Re-auth failed: ${packet.reasonCode}`, + packet.reasonCode, + ) + } + + if (this._reauthCallback) { + const cb = this._reauthCallback + this._reauthCallback = null + cb(err, packet) + } else if (err) { + this.emit('error', err) + } + + if (!err && packet) { + this.emit('reauth', packet) + } + } + /** * PRIVATE METHODS * ===================== @@ -1864,6 +1978,10 @@ export default class MqttClient extends TypedEventEmitter { diff --git a/test/node/client_mqtt5.ts b/test/node/client_mqtt5.ts index 72f60aa55..dcfc649f2 100644 --- a/test/node/client_mqtt5.ts +++ b/test/node/client_mqtt5.ts @@ -5,6 +5,7 @@ import { MqttServer } from './server' import serverBuilder from './server_helpers_for_client_tests' import getPorts from './helpers/port_list' import mqtt, { type ErrorWithReasonCode } from '../../src' +import { type IAuthPacket } from 'mqtt-packet' const ports = getPorts(1) @@ -1438,4 +1439,293 @@ describe('MQTT 5.0', () => { }) }, ) + + describe('reauthenticate', () => { + it( + 'should successfully reauthenticate with a new token', + { timeout: 15000 }, + function (t, done) { + const port = ports.PORTAND327 + 20 + const authMethod = 'GS-AUTH' + const initialToken = Buffer.from('initial-token') + const newToken = Buffer.from('new-refreshed-token') + + const testServer = serverBuilder('mqtt', (serverClient) => { + serverClient.on('connect', (packet) => { + assert.ok( + packet.properties.authenticationData.equals( + initialToken, + ), + ) + serverClient.connack({ reasonCode: 0 }) + }) + + serverClient.on('auth', (packet) => { + assert.strictEqual(packet.reasonCode, 0x19) + assert.ok( + packet.properties.authenticationData.equals( + newToken, + ), + ) + serverClient.auth({ reasonCode: 0 }) + }) + }).listen(port) + + const client = mqtt.connect({ + port, + protocolVersion: 5, + properties: { + authenticationMethod: authMethod, + authenticationData: initialToken, + }, + }) + + client.on('connect', () => { + client.reauthenticate( + { authenticationData: newToken }, + (err, packet: IAuthPacket) => { + assert.ifError(err) + assert.strictEqual(packet.reasonCode, 0) + client.end(true, () => testServer.close(done)) + }, + ) + }) + }, + ) + + it( + 'should error if reauthenticate is called while disconnected', + { timeout: 15000 }, + function (t, done) { + const port = ports.PORTAND327 + 21 + const testServer = serverBuilder('mqtt', (serverClient) => { + serverClient.on('connect', () => + serverClient.connack({ reasonCode: 0 }), + ) + }).listen(port) + + const client = mqtt.connect({ port, protocolVersion: 5 }) + + client.reauthenticate( + { authenticationData: Buffer.from('test') }, + (err) => { + assert.ok(err) + assert.strictEqual( + err.message, + 'reauthenticate: client is not connected', + ) + client.end(true, () => testServer.close(done)) + }, + ) + }, + ) + + it( + 'should return an error if reauthenticate is called on a non-v5 connection', + { timeout: 15000 }, + function (t, done) { + const port = ports.PORTAND327 + 22 + const testServer = serverBuilder('mqtt', (serverClient) => { + serverClient.on('connect', () => + serverClient.connack({ returnCode: 0 }), + ) + }).listen(port) + + const client = mqtt.connect({ port, protocolVersion: 4 }) + + client.on('connect', () => { + client.reauthenticate( + { authenticationData: Buffer.from('test') }, + (err) => { + assert.ok(err) + assert.ok( + err.message.includes('works only with mqtt-v5'), + ) + client.end(true, () => testServer.close(done)) + }, + ) + }) + }, + ) + + it( + 'should not crash if reauthenticate is called without a callback', + { timeout: 15000 }, + function (t, done) { + const port = ports.PORTAND327 + 23 + + const testServer = serverBuilder('mqtt', (serverClient) => { + serverClient.on('connect', () => + serverClient.connack({ reasonCode: 0 }), + ) + + serverClient.on('auth', (packet) => { + assert.strictEqual(packet.reasonCode, 0x19) + assert.ok( + packet.properties.authenticationData.equals( + Buffer.from('test'), + ), + ) + serverClient.auth({ reasonCode: 0 }) + }) + }).listen(port) + + const client = mqtt.connect({ + port, + protocolVersion: 5, + properties: { authenticationMethod: 'test' }, + }) + + client.once('reauth', () => { + client.end(true, () => testServer.close(done)) + }) + + client.once('connect', () => { + client.reauthenticate({ + authenticationData: Buffer.from('test'), + }) + }) + }, + ) + + it( + 'should error if reauthenticate is called without an initial authenticationMethod', + { timeout: 15000 }, + function (t, done) { + const port = ports.PORTAND327 + 24 + + const testServer = serverBuilder('mqtt', (serverClient) => { + serverClient.on('connect', () => + serverClient.connack({ reasonCode: 0 }), + ) + }).listen(port, () => { + const client = mqtt.connect({ + port, + protocolVersion: 5, + }) + + client.on('connect', () => { + client.reauthenticate( + { authenticationData: Buffer.from('test') }, + (err) => { + assert.ok(err) + assert.strictEqual( + err.message, + 'reauthenticate: authenticationMethod is required from initial CONNECT', + ) + + client.end(true, () => { + client.stream?.destroy?.() + + testServer.close(() => { + done() + }) + }) + }, + ) + }) + + client.on('error', (err) => { + client.removeAllListeners() + + try { + client.end(true) + client.stream?.destroy?.() + } catch {} + + testServer.close(() => done(err)) + }) + }) + }, + ) + + it( + 'should error if broker returns a non-zero reason code', + { timeout: 15000 }, + function (t, done) { + const port = ports.PORTAND327 + 82 + + const testServer = serverBuilder('mqtt', (serverClient) => { + serverClient.on('connect', () => { + serverClient.connack({ reasonCode: 0 }) + }) + + serverClient.on('auth', (packet) => { + assert.strictEqual(packet.reasonCode, 0x19) + assert.ok( + packet.properties.authenticationData.equals( + Buffer.from('test'), + ), + ) + + setImmediate(() => { + if (serverClient.writable) { + serverClient.auth({ reasonCode: 0x18 }) + } + }) + }) + }).listen(port) + + const client = mqtt.connect({ + port, + protocolVersion: 5, + keepalive: 0, + properties: { authenticationMethod: 'test' }, + }) + + client.once('connect', () => { + client.reauthenticate( + { authenticationData: Buffer.from('test') }, + (err, packet: IAuthPacket) => { + assert.ok(err) + assert.ok(packet) + assert.strictEqual(packet.reasonCode, 0x18) + assert.ok( + String(err.message) + .toLowerCase() + .includes('re-auth failed') || + String(err.message) + .toLowerCase() + .includes('24'), + ) + + client.end(true, () => testServer.close(done)) + }, + ) + }) + }, + ) + + it('should interrupt a pending reauthentication with a new request', function (t, done) { + const port = ports.PORTAND327 + 26 + const testServer = serverBuilder('mqtt', (serverClient) => { + serverClient.on('connect', () => + serverClient.connack({ reasonCode: 0 }), + ) + }).listen(port) + + const client = mqtt.connect({ + port, + protocolVersion: 5, + properties: { authenticationMethod: 'test' }, + }) + + client.on('connect', () => { + client.reauthenticate( + { authenticationData: Buffer.from('first') }, + (err) => { + assert.ok(err) + assert.strictEqual( + err.message, + 'reauthenticate: interrupted by new reauthentication request', + ) + client.end(true, () => testServer.close(done)) + }, + ) + client.reauthenticate({ + authenticationData: Buffer.from('second'), + }) + }) + }) + }) })