diff --git a/deps/rabbitmq_management/priv/www/js/global.js b/deps/rabbitmq_management/priv/www/js/global.js index a4005e5d312..e6efedfae6d 100644 --- a/deps/rabbitmq_management/priv/www/js/global.js +++ b/deps/rabbitmq_management/priv/www/js/global.js @@ -934,7 +934,8 @@ QUEUE_TYPE["default"] = { "operator_policy_arguments": "classic-queue-operator-policy-arguments", "list" : "classic-queue-list", "stats" : "classic-queue-stats", - "node_details" : "classic-queue-node-details" + "node_details" : "classic-queue-node-details", + "get_message" : "classic-queue-get-message" } }; @@ -952,7 +953,8 @@ QUEUE_TYPE["classic"] = { "operator_policy_arguments": "classic-queue-operator-policy-arguments", "list" : "classic-queue-list", "stats" : "classic-queue-stats", - "node_details" : "classic-queue-node-details" + "node_details" : "classic-queue-node-details", + "get_message" : "classic-queue-get-message" } }; @@ -973,7 +975,8 @@ QUEUE_TYPE["quorum"] = { "operator_policy_arguments": "quorum-queue-operator-policy-arguments", "list" : "quorum-queue-list", "stats": "quorum-queue-stats", - "node_details" : "quorum-queue-node-details" + "node_details" : "quorum-queue-node-details", + "get_message" : "classic-queue-get-message" } }; @@ -1110,3 +1113,63 @@ var BINARY_STATISTICS = { {name: 'System', colour: 'system', keys: [['other', 'other']]}]] }; + + +// Which postprocesor functions we need to call from postprocess() function call +var current_postprocessors = new Map(); +function registerPostProcessor(name, postProcessorFun) { + if (current_postprocessors.has(name)) { + return false; + } + current_postprocessors.set(name, postProcessorFun); +} +function clear_postprocessors() { + current_postprocessors.clear(); +} +function is_postprocessor_registered(name) { + return current_postprocessors.has(name); +} +function unregisterPostProcessor(name) { + current_postprocessors.delete(name); +} +function invokeRegisteredPostProcessors() { + for (const [name, processorFun] of current_postprocessors) { + console.log(`Calling postprocessor ${name}`); + processorFun(); + } +} + +class ApplicationListener { + onRefresh() { + } + onVhostChange(newVhost) { + } + onTabDeactivated(tab) { + + } +} +var applicationListeners = new Map(); +function registerApplicationListener(name, listener) { + if (applicationListeners.has(name)) { + return false; + } + applicationListeners.set(name, listener); +} +function unregisterApplicationListener(name) { + applicationListeners.delete(name); +} +function notifyOnRefresh() { + for (const [name, listener] of applicationListeners) { + listener.onRefresh(); + } +} +function notifyOnVhostChange(newVhost) { + for (const [_name, listener] of applicationListeners) { + listener.onVhostChange(newVhost); + } +} +function notifyActivatedTab(tab) { + for (const [_name, listener] of applicationListeners) { + listener.onTabActivated(tab); + } +} diff --git a/deps/rabbitmq_management/priv/www/js/main.js b/deps/rabbitmq_management/priv/www/js/main.js index 6604555e057..5dcd9ac0cec 100644 --- a/deps/rabbitmq_management/priv/www/js/main.js +++ b/deps/rabbitmq_management/priv/www/js/main.js @@ -204,7 +204,10 @@ function setup_constant_events() { $('#show-vhost').on('change', function() { current_vhost = $(this).val(); store_pref('vhost', current_vhost); - update(); + if (current_reqs && Object.keys(current_reqs).length > 0) { + update(); + } + notifyOnVhostChange(current_vhost); }); if (!vhosts_interesting) { $('#vhost-form').hide(); @@ -235,19 +238,83 @@ function update_vhosts() { function setup_extensions() { var extensions = JSON.parse(sync_get('/extensions')); extension_count = 0; + var javascript_files = []; + for (var i in extensions) { var extension = extensions[i]; - if ($.isPlainObject(extension) && extension.hasOwnProperty('javascript')) { - dynamic_load(extension.javascript); + if ($.isPlainObject(extension)) { + if (extension.hasOwnProperty('javascript')) { + // Collect JavaScript files for sequential loading + if (Array.isArray(extension.javascript)) { + for (var j = 0; j < extension.javascript.length; j++) { + javascript_files.push(extension.javascript[j]); + } + } else { + javascript_files.push(extension.javascript); + } + } + if (extension.hasOwnProperty('css')) { + dynamic_css_load(extension.css); + } extension_count++; } } + // Load JavaScript files sequentially to ensure dependencies are available + load_javascript_files_sequentially(javascript_files, 0); } -function dynamic_load(filename) { +function load_javascript_files_sequentially(files, index) { + if (index >= files.length) { + return; // All files loaded + } + console.debug(`Loading extension ${files[index]} ...`); + dynamic_javascript_file_load(files[index], function() { + // Load next file after current one has finished loading + console.debug(`Loaded extension ${files[index]} !`); + load_javascript_files_sequentially(files, index + 1); + }); +} + +function dynamic_javascript_load(arrayOrString) { + if (Array.isArray(arrayOrString)) { + for (const file of arrayOrString) { + dynamic_javascript_file_load(file); + } + }else { + dynamic_javascript_file_load(arrayOrString); + } +} +function dynamic_javascript_file_load(filename, callback) { var element = document.createElement('script'); element.setAttribute('type', 'text/javascript'); element.setAttribute('src', 'js/' + filename); + + // Set up callback to fire when script has loaded + if (callback) { + element.onload = callback; + element.onerror = function() { + console.error('Failed to load script: ' + filename); + callback(); // Continue loading other scripts even if one fails + }; + } + + document.getElementsByTagName('head')[0].appendChild(element); + return element; +} +function dynamic_css_load(arrayOrString) { + if (Array.isArray(arrayOrString)) { + for (const file of arrayOrString) { + dynamic_css_file_load(file); + } + }else { + dynamic_css_file_load(arrayOrString); + } +} +function dynamic_css_file_load(filename) { + var element = document.createElement('link'); + element.setAttribute('rel', 'stylesheet'); + element.setAttribute('type', 'text/css'); + element.setAttribute('href', 'css/' + filename); document.getElementsByTagName('head')[0].appendChild(element); return element; } @@ -331,6 +398,7 @@ function render(reqs, template, highlight) { var old_template = current_template; current_template = template; current_reqs = reqs; + clear_postprocessors(); for (var i in outstanding_reqs) { outstanding_reqs[i].abort(); } @@ -340,6 +408,11 @@ function render(reqs, template, highlight) { window.scrollTo(0, 0); } update(); + notifyActivatedTab(current_highlight); +} + +function reset_current_reqs() { + current_reqs = {}; } function update() { @@ -839,6 +912,8 @@ function postprocess() { $('.administrator-only').remove(); } + invokeRegisteredPostProcessors(); + update_multifields(); } @@ -1319,7 +1394,7 @@ function update_status(status) { -function with_req(method, path, body, fun) { +function with_req(method, path, body, fun, on404fun) { if(!has_auth_credentials()) { // Clear any lingering auth settings in local storage and navigate to the login form. clear_auth(); @@ -1327,7 +1402,7 @@ function with_req(method, path, body, fun) { return; } - var json; + const full_page_404 = !on404fun var req = xmlHttpRequest(); req.open(method, 'api' + path, true ); var header = authorization_header(); @@ -1341,9 +1416,11 @@ function with_req(method, path, body, fun) { if (ix != -1) { outstanding_reqs.splice(ix, 1); } - if (check_bad_response(req, true)) { + if (check_bad_response(req, full_page_404)) { last_successful_connect = new Date(); fun(req); + } else if (req.status == 404) { + on404fun(JSON.parse(req.responseText)) } } }; diff --git a/deps/rabbitmq_management/priv/www/js/prefs.js b/deps/rabbitmq_management/priv/www/js/prefs.js index 4a9f107f9e9..1d45db46681 100644 --- a/deps/rabbitmq_management/priv/www/js/prefs.js +++ b/deps/rabbitmq_management/priv/www/js/prefs.js @@ -168,7 +168,7 @@ function get_local_pref(k) { } } -function get_pref(k) { +function get_pref(k, defaultValue = undefined) { var val; if (local_storage_available()) { val = window.localStorage['rabbitmq.' + k]; @@ -177,7 +177,8 @@ function get_pref(k) { val = parse_cookie()[short_key(k)]; } - var res = (val == undefined) ? default_pref(k) : val; + var res = (val == undefined) ? + (defaultValue != undefined ? defaultValue : default_pref(k)) : val; return res; } diff --git a/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs b/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs index c64a2197eab..5d24447481d 100644 --- a/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs +++ b/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs @@ -101,49 +101,7 @@ <%= format('publish', {'mode': 'queue', 'queue': queue}) %> <% if (QUEUE_TYPE(queue).actions.get_message) { %> -
-

Get messages

-
-

- Warning: getting messages from a queue is a destructive action. - -

-
- - - - - - - - - - - - - - - - -
- -
- - -
- -
-
-
-
+ <%= format(QUEUE_TYPE(queue).tmpl.get_message, {queue: queue}) %> <% } %> <% if (is_user_policymaker) { %> diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_util.erl b/deps/rabbitmq_management/src/rabbit_mgmt_util.erl index 70fdcd80b5f..2f397a92caa 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_util.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_util.erl @@ -9,7 +9,7 @@ %% TODO sort all this out; maybe there's scope for rabbit_mgmt_request? --export([is_authorized/2, is_authorized_admin/2, is_authorized_admin/4, +-export([is_authorized/2, is_authorized/4, is_authorized_admin/2, is_authorized_admin/4, is_authorized_admin/3, vhost/1, vhost_from_headers/1]). -export([is_authorized_vhost/2, is_authorized_user/3, is_authorized_user/4, is_authorized_user/5, @@ -89,6 +89,10 @@ is_authorized_admin(ReqData, Context) -> Context, auth_config()). +is_authorized(ReqData, Context, ErrorMsg, Fun) -> + rabbit_web_dispatch_access_control:is_authorized(ReqData, + Context, ErrorMsg, Fun, auth_config()). + is_authorized_admin(ReqData, Context, Token) -> AuthConfig = auth_config(), rabbit_web_dispatch_access_control:is_authorized( diff --git a/deps/rabbitmq_stream_management/priv/www/js/stream.js b/deps/rabbitmq_stream_management/priv/www/js/stream.js index 9f615459276..b378c0a649c 100644 --- a/deps/rabbitmq_stream_management/priv/www/js/stream.js +++ b/deps/rabbitmq_stream_management/priv/www/js/stream.js @@ -12,12 +12,19 @@ dispatcher_add(function(sammy) { 'streamConnection', '#/stream/connections'); }); sammy.get('#/stream/super-streams', function() { - render({'vhosts': '/vhosts'}, 'superStreams', '#/stream/super-streams') + renderSuperStreams(); + }); + sammy.get('#/stream/super-streams/:vhost/:name', function() { + var vhost = esc(this.params['vhost']); + var name = esc(this.params['name']); + render({'superstream': {path: '/stream/super-streams/' + vhost + '/' + name, + options: {ranges: ['data-rates-conn']}} + }, "superStream", "#/stream/super-streams"); }); sammy.put('#/stream/super-streams', function() { put_cast_params(this, '/stream/super-streams/:vhost/:name', ['name', 'pattern', 'policy'], ['priority'], []); - location.href = "/#/queues"; + location.href = "/#/super-streams"; }); // not exactly dispatcher stuff, but we have to make sure this is called before // HTTP requests are made in case of refresh of the queue page @@ -90,3 +97,34 @@ CONSUMER_OWNER_FORMATTERS.push({ CONSUMER_OWNER_FORMATTERS.sort(CONSUMER_OWNER_FORMATTERS_COMPARATOR); +function renderSuperStreams() { + render({'queues': { + path: url_pagination_template('stream/super-streams', 1, 100), + options: { + sort: true, + vhost: true, + pagination: true + } + }, 'vhosts': '/vhosts'}, 'superStreams', '#/super-streams'); +} +function link_superstream(vhost, name) { + return _link_to(highlight_extra_whitespace(name), '#/stream/super-streams/' + esc(vhost) + '/' + esc(name), true, []); +} + +function format_superstream_state(SuperStream) { + const states = SuperStream.partitions.reduce((countMap, partition) => { + const state = partition.state; + countMap.set(state, (countMap.get(state) || 0) + 1); + return countMap; + }, new Map()); + if (states.size == 1) { + return fmt_object_state(SuperStream.partitions[0]) + } + var html = '' + for (const [state, count] of states) { + html += fmt_object_state({state: state, count: count}, function(text, s) { + return `${text} (${s.count})` + }) + } + return html +} \ No newline at end of file diff --git a/deps/rabbitmq_stream_management/priv/www/js/tmpl/superStream.ejs b/deps/rabbitmq_stream_management/priv/www/js/tmpl/superStream.ejs new file mode 100644 index 00000000000..4ae1a3dc5ca --- /dev/null +++ b/deps/rabbitmq_stream_management/priv/www/js/tmpl/superStream.ejs @@ -0,0 +1,231 @@ +

Super Stream <%= fmt_string(highlight_extra_whitespace(superstream.name)) %><%= fmt_maybe_vhost(superstream.vhost) %>

+ +
+

Overview

+ +

Details

+ + + + + +
Features<%= fmt_features(superstream) %>
+ +
+ + +
+

Partitions (<%=(superstream.partition_count)%>)

+
+
+ <% if (superstream.partitions.length > 0) { %> + + + + <%= group_heading('queues', 'Overview', [display.vhosts, display.nodes, true]) %> +<% if(disable_stats && enable_queue_totals) { %> + <%= group_heading('queues', 'Messages', []) %> +<% } else { %> +<% if(!disable_stats) { %> + <%= group_heading('queues', 'Messages', []) %> + <%= group_heading('queues', 'Message bytes', []) %> +<% if (rates_mode != 'none') { %> + <%= group_heading('queues', 'Message rates', []) %> +<% } %> +<% } %> +<% } %> + + + +<% if (display.vhosts) { %> + +<% } %> + +<% if (display.nodes) { %> + +<% } %> +<% if (show_column('queues', 'features')) { %> + +<% } %> +<% if (show_column('queues', 'features_no_policy')) { %> + +<% } %> +<% if (show_column('queues', 'policy')) { %> + +<% } %> +<% if (show_column('queues', 'consumers')) { %> + +<% } %> +<% if (show_column('queues', 'consumer_capacity')) { %> + +<% } %> +<% if (show_column('queues', 'state')) { %> + +<% } %> +<% if(disable_stats && enable_queue_totals) { %> +<% if (show_column('queues', 'msgs-ready')) { %> + +<% } %> +<% if (show_column('queues', 'msgs-unacked')) { %> + +<% } %> +<% if (show_column('queues', 'msgs-total')) { %> + +<% } %> +<% } %> +<% if(!disable_stats) { %> +<% if (show_column('queues', 'msgs-ready')) { %> + +<% } %> +<% if (show_column('queues', 'msgs-unacked')) { %> + +<% } %> +<% if (show_column('queues', 'msgs-ram')) { %> + +<% } %> +<% if (show_column('queues', 'msgs-persistent')) { %> + +<% } %> +<% if (show_column('queues', 'msgs-total')) { %> + +<% } %> +<% if (show_column('queues', 'msg-bytes-ready')) { %> + +<% } %> +<% if (show_column('queues', 'msg-bytes-unacked')) { %> + +<% } %> +<% if (show_column('queues', 'msg-bytes-ram')) { %> + +<% } %> +<% if (show_column('queues', 'msg-bytes-persistent')) { %> + +<% } %> +<% if (show_column('queues', 'msg-bytes-total')) { %> + +<% } %> +<% if (rates_mode != 'none') { %> + <% if (show_column('queues', 'rate-incoming')) { %> + + <% } %> + <% if (show_column('queues', 'rate-deliver')) { %> + + <% } %> + <% if (show_column('queues', 'rate-redeliver')) { %> + + <% } %> + <% if (show_column('queues', 'rate-ack')) { %> + + <% } %> +<% } %> +<% } %> + + + +<% + for (var i = 0; i < superstream.partitions.length; i++) { + var queue = superstream.partitions[i]; +%> + > +<% if (display.vhosts) { %> + +<% } %> + +<% if (display.nodes) { %> + +<% } %> +<% if (show_column('queues', 'features')) { %> + +<% } %> +<% if (show_column('queues', 'features_no_policy')) { %> + +<% } %> +<% if (show_column('queues', 'policy')) { %> + +<% } %> +<% if (show_column('queues', 'consumers')) { %> + +<% } %> +<% if (show_column('queues', 'consumer_capacity')) { %> + +<% } %> +<% if (show_column('queues', 'state')) { %> + +<% } %> +<% if(!disable_stats || (disable_stats && enable_queue_totals)) { %> +<% if (show_column('queues', 'msgs-ready')) { %> + +<% } %> +<% if (show_column('queues', 'msgs-unacked')) { %> + +<% } %> +<% } %> +<% if(!disable_stats) { %> +<% if (show_column('queues', 'msgs-ram')) { %> + +<% } %> +<% if (show_column('queues', 'msgs-persistent')) { %> + +<% } %> +<% } %> +<% if(!disable_stats || (disable_stats && enable_queue_totals)) { %> +<% if (show_column('queues', 'msgs-total')) { %> + +<% } %> +<% } %> +<% if(!disable_stats) { %> +<% if (show_column('queues', 'msg-bytes-ready')) { %> + +<% } %> +<% if (show_column('queues', 'msg-bytes-unacked')) { %> + +<% } %> +<% if (show_column('queues', 'msg-bytes-ram')) { %> + +<% } %> +<% if (show_column('queues', 'msg-bytes-persistent')) { %> + +<% } %> +<% if (show_column('queues', 'msg-bytes-total')) { %> + +<% } %> +<% if (rates_mode != 'none') { %> + <% if (show_column('queues', 'rate-incoming')) { %> + + <% } %> + <% if (show_column('queues', 'rate-deliver')) { %> + + <% } %> + <% if (show_column('queues', 'rate-redeliver')) { %> + + <% } %> + <% if (show_column('queues', 'rate-ack')) { %> + + <% } %> +<% } %> +<% } %> + + <% } %> + +
+/-
<%= fmt_sort('Virtual host', 'vhost') %><%= fmt_sort('Name', 'name') %><%= fmt_sort('Node', 'node') %>FeaturesFeatures<%= fmt_sort('Policy','policy') %><%= fmt_sort('Consumers', 'consumers') %><%= fmt_sort('Consumer capacity', 'consumer_capacity') %><%= fmt_sort('State', 'state') %><%= fmt_sort('Ready', 'messages_ready') %><%= fmt_sort('Unacked', 'messages_unacknowledged') %><%= fmt_sort('Total', 'messages') %><%= fmt_sort('Ready', 'messages_ready') %><%= fmt_sort('Unacked', 'messages_unacknowledged') %><%= fmt_sort('In Memory', 'messages_ram') %><%= fmt_sort('Persistent', 'messages_persistent') %><%= fmt_sort('Total', 'messages') %><%= fmt_sort('Ready', 'message_bytes_ready') %><%= fmt_sort('Unacked', 'message_bytes_unacknowledged') %><%= fmt_sort('In Memory', 'message_bytes_ram') %><%= fmt_sort('Persistent', 'message_bytes_persistent') %><%= fmt_sort('Total', 'message_bytes') %><%= fmt_sort('incoming', 'message_stats.publish_details.rate') %><%= fmt_sort('deliver / get', 'message_stats.deliver_get_details.rate') %><%= fmt_sort('redelivered', 'message_stats.redeliver_details.rate') %><%= fmt_sort('ack', 'message_stats.ack_details.rate') %>
<%= fmt_string(queue.vhost) %><%= link_queue(queue.vhost, queue.name, queue.arguments) %> + <% if (queue.node) { %> + <%= fmt_node(queue.node) %> + <% } else { %> + <%= fmt_node(queue.leader) %> + <% } %> + <% if (queue.hasOwnProperty('members')) { %> + <%= fmt_members(queue) %> + <% } %> + + <%= fmt_features_short(queue) %> + <%= fmt_policy_short(queue) %> + <%= fmt_op_policy_short(queue) %> + <%= fmt_features_short(queue) %><%= link_policy(queue.vhost, queue.policy) %> + <%= fmt_string(queue.operator_policy) %><%= fmt_string(queue.consumers) %><%= fmt_percent(queue.consumer_capacity) %><%= fmt_object_state(queue) %><%= fmt_num_thousands(queue.messages_ready) %><%= fmt_num_thousands(queue.messages_unacknowledged) %><%= fmt_num_thousands(queue.messages_ram) %><%= fmt_num_thousands(queue.messages_persistent) %><%= fmt_num_thousands(queue.messages) %><%= fmt_bytes(queue.message_bytes_ready) %><%= fmt_bytes(queue.message_bytes_unacknowledged) %><%= fmt_bytes(queue.message_bytes_ram) %><%= fmt_bytes(queue.message_bytes_persistent) %><%= fmt_bytes(queue.message_bytes) %><%= fmt_detail_rate(queue.message_stats, 'publish') %><%= fmt_detail_rate(queue.message_stats, 'deliver_get') %><%= fmt_detail_rate(queue.message_stats, 'redeliver') %><%= fmt_detail_rate(queue.message_stats, 'ack') %>
+<% } else { %> +

... no queues ...

+<% } %> +
+
+
+ diff --git a/deps/rabbitmq_stream_management/priv/www/js/tmpl/superStreams.ejs b/deps/rabbitmq_stream_management/priv/www/js/tmpl/superStreams.ejs index 5934c8d7919..1e08edfcc7e 100644 --- a/deps/rabbitmq_stream_management/priv/www/js/tmpl/superStreams.ejs +++ b/deps/rabbitmq_stream_management/priv/www/js/tmpl/superStreams.ejs @@ -1,4 +1,43 @@ -

Super Streams

+

Super Streams

+ +
+ <%= paginate_ui(queues, 'super-streams') %> +
+
+<% if (queues.items.length > 0) { %> + + + +<% if (display.vhosts) { %> + +<% } %> + + + + + + +<% + for (var i = 0; i < queues.items.length; i++) { + var queue = queues.items[i]; +%> + +<% if (display.vhosts) { %> + +<% } %> + + + + + <% } %> + +
<%= fmt_sort('Virtual host', 'vhost') %><%= fmt_sort('Name', 'name') %><%= fmt_sort('Partitions', 'partition_count') %>State
<%= fmt_string(queue.vhost) %><%= link_superstream(queue.vhost, queue.name) %><%= fmt_string(queue.partition_count, '') %><%= format_superstream_state(queue) %>
+<% } else { %> +

... no queues ...

+<% } %> +
+ + <% if (ac.canAccessVhosts()) { %>
diff --git a/deps/rabbitmq_stream_management/src/rabbit_stream_mgmt_util.erl b/deps/rabbitmq_stream_management/src/rabbit_stream_mgmt_util.erl new file mode 100644 index 00000000000..eb6766c7a18 --- /dev/null +++ b/deps/rabbitmq_stream_management/src/rabbit_stream_mgmt_util.erl @@ -0,0 +1,115 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(rabbit_stream_mgmt_util). + +%% TODO sort all this out; maybe there's scope for rabbit_mgmt_request? + +-export([find_super_stream/3, + find_super_stream/1, + find_super_stream_from_exchange/1, find_super_stream_from_exchange/3]). +-include_lib("kernel/include/logger.hrl"). +-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). + +%%-------------------------------------------------------------------- +find_super_stream(ReqData) -> + case rabbit_mgmt_util:vhost(ReqData) of + not_found -> {error, missing_vhost}; + VHost -> find_super_stream(VHost, id(queue, ReqData), rabbit_mgmt_util:range_ceil(ReqData)) + + end. +find_super_stream(Vhost, Name, Range) -> + case exchange(Vhost, Name) of + not_found -> {error, not_found}; + E -> find_super_stream_from_exchange(E, detailed, Range) + end. + +find_super_stream_from_exchange(Exchange) -> + find_super_stream_from_exchange(Exchange, basic, undefined). + +find_super_stream_from_exchange(Exchange, Mode, Range) -> + case maps:get(<<"x-super-stream">>, proplists:get_value(arguments, Exchange, []), false) of + true -> + Vhost = proplists:get_value(vhost, Exchange), + Name = proplists:get_value(name, Exchange), + Partitions = super_stream_partitions(list_bindings(Vhost, Name), Mode, Range), + [{name, Name}, + {vhost, Vhost}, + {arguments, extract_arguments_from_partitions(Partitions)}, + {partition_count, erlang:length(Partitions)}, + {partitions, drop_partition_arguments(Partitions)}]; + false -> + {error, not_super_stream} + end. + +%%-------------------------------------------------------------------- + +super_stream_partitions(Bindings, Mode, Range) -> + [ format_partition(S, Mode, Range) || S <- Bindings ]. + +drop_partition_arguments(Partitions) -> + lists:map(fun(P) -> proplists:delete(arguments, P) end, Partitions). + +extract_arguments_from_partitions(Partitions) -> + case proplists:get_value(arguments, lists:nth(1, Partitions)) of + List when is_list(List) -> rabbit_mgmt_format:amqp_table(List); + Map when is_map(Map) -> Map + end. + +format_partition(#binding{source = _S, + key = Key, + destination = #resource{name = PartitionName} = D, + args = Args}, Mode, Range) -> + + Format = [ + {routing_key, Key}, + {name, PartitionName}, + {order, get_partition_order(Args)}], + + Format ++ case Mode of + basic -> + case rabbit_amqqueue:lookup(D) of + {ok, Q} -> rabbit_amqqueue:info(Q, [state, arguments]); + {error, not_found} -> [{state, not_found}] + end; + detailed -> + Q0 = [queue(D#resource.virtual_host, D#resource.name)], + [Q] = rabbit_mgmt_db:augment_queues(Q0, Range, full), + rabbit_mgmt_format:clean_consumer_details( + rabbit_mgmt_format:strip_pids(Q)) + end. + +queue(VHost, QName) -> + Name = rabbit_misc:r(VHost, queue, QName), + case rabbit_amqqueue:lookup(Name) of + {ok, Q} -> rabbit_mgmt_format:queue(Q); + {error, not_found} -> not_found + end. + +get_partition_order(Args) -> + case lists:search(fun({<<"x-stream-partition-order">>, long, _}) -> true; + (_) -> false + end, Args) of + {value, {_, _, Order}} -> Order; + false -> not_found + end. + +id(Type, ReqData) -> + rabbit_mgmt_util:id(Type, ReqData). + +exchange(VHost, XName) -> + Name = rabbit_misc:r(VHost, exchange, XName), + case rabbit_exchange:lookup(Name) of + {ok, X} -> rabbit_mgmt_format:exchange( + rabbit_exchange:info(X)); + {error, not_found} -> not_found + end. + +list_bindings(Vhost, Exchange) -> + rabbit_binding:list_for_source(rabbit_misc:r(Vhost, exchange, Exchange)). + diff --git a/deps/rabbitmq_stream_management/src/rabbit_stream_super_stream_mgmt.erl b/deps/rabbitmq_stream_management/src/rabbit_stream_super_stream_mgmt.erl index ec992cb65ae..ae9e6214284 100644 --- a/deps/rabbitmq_stream_management/src/rabbit_stream_super_stream_mgmt.erl +++ b/deps/rabbitmq_stream_management/src/rabbit_stream_super_stream_mgmt.erl @@ -13,7 +13,10 @@ web_ui/0]). -export([init/2, content_types_accepted/2, + content_types_provided/2, is_authorized/2, +% delete_resource/2, + to_json/2, resource_exists/2, allowed_methods/2, accept_content/2]). @@ -25,7 +28,7 @@ -define(DEFAULT_RPC_TIMEOUT, 30_000). dispatcher() -> - [{"/stream/super-streams/:vhost/:name", ?MODULE, []}]. + [{"/stream/super-streams/:vhost/:queue", ?MODULE, []}]. web_ui() -> []. @@ -40,11 +43,15 @@ init(Req, _State) -> variances(Req, Context) -> {[<<"accept-encoding">>, <<"origin">>], Req, Context}. +content_types_provided(ReqData, Context) -> + {rabbit_mgmt_util:responder_map(to_json), ReqData, Context}. + content_types_accepted(ReqData, Context) -> {[{{<<"application">>, <<"json">>, '*'}, accept_content}], ReqData, Context}. allowed_methods(ReqData, Context) -> - {[<<"PUT">>, <<"OPTIONS">>], ReqData, Context}. + {[<<"HEAD">>, <<"GET">>, <<"PUT">>, <<"OPTIONS">>], ReqData, Context}. +% {[<<"HEAD">>, <<"GET">>, <<"PUT">>, <<"DELETE">>, <<"OPTIONS">>], ReqData, Context}. resource_exists(ReqData, Context) -> %% just checking that the vhost requested exists @@ -59,7 +66,7 @@ is_authorized(ReqData, Context) -> accept_content(ReqData0, #context{user = #user{username = ActingUser}} = Context) -> %% TODO validate arguments? VHost = rabbit_mgmt_util:id(vhost, ReqData0), - Name = rabbit_mgmt_util:id(name, ReqData0), + Name = rabbit_mgmt_util:id(queue, ReqData0), rabbit_mgmt_util:with_decode( [], ReqData0, Context, fun([], BodyMap, ReqData) -> @@ -93,6 +100,18 @@ accept_content(ReqData0, #context{user = #user{username = ActingUser}} = Context end end). +%delete_resource(ReqData, Context) -> + %% We need to retrieve manually if-unused and if-empty, as the HTTP API uses '-' + %% while the record uses '_' + + %% TODO use IfUnused and IfEmpty in rabbit_stream_mgmt_util:delete_super_stream + %% IfUnused = <<"true">> =:= rabbit_mgmt_util:qs_val(<<"if-unused">>, ReqData), + %% IfEmpty = <<"true">> =:= rabbit_mgmt_util:qs_val(<<"if-empty">>, ReqData), +% rabbit_stream_mgmt_util:delete_super_stream(ReqData, Context). + +to_json(ReqData, Context) -> + rabbit_mgmt_util:reply(rabbit_stream_mgmt_util:find_super_stream(ReqData), ReqData, Context). + %%------------------------------------------------------------------- get_node(Props) -> case maps:get(<<"node">>, Props, undefined) of diff --git a/deps/rabbitmq_stream_management/src/rabbit_stream_super_streams_mgmt.erl b/deps/rabbitmq_stream_management/src/rabbit_stream_super_streams_mgmt.erl new file mode 100644 index 00000000000..be9cd8dbd97 --- /dev/null +++ b/deps/rabbitmq_stream_management/src/rabbit_stream_super_streams_mgmt.erl @@ -0,0 +1,88 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(rabbit_stream_super_streams_mgmt). + +-behaviour(rabbit_mgmt_extension). + +-export([dispatcher/0, + web_ui/0]). +-export([init/2, + to_json/2, + content_types_provided/2, + resource_exists/2, + is_authorized/2]). + +-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("kernel/include/logger.hrl"). + +-define(DEFAULT_RPC_TIMEOUT, 30_000). + +-define(BASIC_COLUMNS, + ["vhost", + "name"]). + +-define(DEFAULT_SORT, ["vhost", "name"]). + +dispatcher() -> + [{"/stream/super-streams", ?MODULE, []}, + {"/stream/super-streams/:vhost", ?MODULE, []}]. + +web_ui() -> + []. + +%%-------------------------------------------------------------------- + +init(Req, _State) -> + {cowboy_rest, + rabbit_mgmt_headers:set_common_permission_headers(Req, ?MODULE), + #context{}}. + +content_types_provided(ReqData, Context) -> + {[{<<"application/json">>, to_json}], ReqData, Context}. + +resource_exists(ReqData, Context) -> + %% just checking that the vhost requested exists + {case rabbit_mgmt_util:all_or_one_vhost(ReqData, fun (_) -> [] end) of + vhost_not_found -> false; + _ -> true + end, ReqData, Context}. + +to_json(ReqData, Context) -> + try + Basic = do_super_streams_query(ReqData), + Data = rabbit_mgmt_util:augment_resources(Basic, ?DEFAULT_SORT, + ?BASIC_COLUMNS, ReqData, + Context, fun augment/2), + rabbit_mgmt_util:reply(Data, ReqData, Context) + catch + {error, invalid_range_parameters, Reason} -> + rabbit_mgmt_util:bad_request(iolist_to_binary(Reason), ReqData, + Context) + end. + +augment(Basic, _ReqData) -> + Basic. + +is_authorized(ReqData, Context) -> + rabbit_mgmt_util:is_authorized(ReqData, Context). + +do_super_streams_query(ReqData) -> + lists:foldl(fun(E, Acc) -> + case rabbit_stream_mgmt_util:find_super_stream_from_exchange(E) of + {error, not_super_stream} -> Acc; + S -> Acc ++ [S] + end end, [], exchanges(ReqData)). + +exchanges(ReqData) -> + [rabbit_mgmt_format:exchange(X) || X <- exchanges0(ReqData)]. + +exchanges0(ReqData) -> + rabbit_mgmt_util:all_or_one_vhost(ReqData, fun rabbit_exchange:info_all/1). + + diff --git a/deps/rabbitmq_web_dispatch/src/rabbit_web_dispatch_access_control.erl b/deps/rabbitmq_web_dispatch/src/rabbit_web_dispatch_access_control.erl index 50e9ec9716f..be72922a801 100644 --- a/deps/rabbitmq_web_dispatch/src/rabbit_web_dispatch_access_control.erl +++ b/deps/rabbitmq_web_dispatch/src/rabbit_web_dispatch_access_control.erl @@ -11,7 +11,7 @@ -include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("kernel/include/logger.hrl"). --export([is_authorized/3, is_authorized/7, is_authorized_admin/3, +-export([is_authorized/3, is_authorized/5, is_authorized/7, is_authorized_admin/3, is_authorized_admin/5, vhost/1, vhost_from_headers/1]). -export([is_authorized_vhost/3, is_authorized_user/4, is_authorized_user/5, is_authorized_user/6,