Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ endef
PARALLEL_CT_SET_1_A = unit_rabbit_ssl unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_filter_prop amqp_filter_sql amqp_filter_sql_unit amqp_dotnet signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange cluster_limit cluster_minority default_queue_type_prop term_to_binary_compat_prop topic_permission unicode unit_access_control
PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit
PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit rabbit_ra_systems

PARALLEL_CT_SET_2_A = cluster confirms_rejects consumer_timeout rabbit_access_control rabbit_confirms rabbit_core_metrics_gc rabbit_cuttlefish rabbit_db_binding rabbit_db_exchange
PARALLEL_CT_SET_2_B = crashing_queues deprecated_features direct_exchange_routing_v2 disconnect_detected_during_alarm exchanges unit_gen_server2
Expand Down
48 changes: 48 additions & 0 deletions deps/rabbit/docs/rabbitmq.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,54 @@
##
# quorum_queue.snapshot_chunk_size = 1000000

##
## Coordination System Ra Parameters
##
## Relevant doc guides:
##
## * https://www.rabbitmq.com/docs/clustering
##

## Coordination system WAL compute checksums flag.
##
# coordination.wal_compute_checksums = true

## Coordination system segment compute checksums flag.
##
# coordination.segment_compute_checksums = true

## Maximum size in bytes of each WAL file segment for the coordination system.
##
# coordination.wal_max_size_bytes = 64000000

## Maximum number of entries allowed in the coordination system WAL.
##
# coordination.wal_max_entries = 500000

## Maximum number of entries allowed in a single WAL write batch for the coordination system.
##
# coordination.wal_max_batch_size = 4096

## Maximum size in bytes of each log segment for the coordination system.
##
# coordination.segment_max_size_bytes = 64000000

## Maximum number of entries in a log segment for the coordination system.
##
# coordination.segment_max_entries = 4096

## Maximum number of append entries RPC batch size for the coordination system.
##
# coordination.max_append_entries_rpc_batch_size = 16

## Whether to compress memory tables in the coordination system.
##
# coordination.compress_mem_tables = true

## Size of snapshot chunks in bytes for the coordination system.
##
# coordination.snapshot_chunk_size = 1000000

## Changes classic queue storage implementation version.
## As of 4.0.0, version 2 is the default and this is a forward compatibility setting,
## that is, it will be useful when a new version is developed.
Expand Down
52 changes: 52 additions & 0 deletions deps/rabbit/priv/schema/rabbit.schema
Original file line number Diff line number Diff line change
Expand Up @@ -2824,6 +2824,58 @@ end}.
]}.


%%
%% Coordination System Ra Parameters
%%

{mapping, "coordination.wal_compute_checksums", "rabbit.coordination_wal_compute_checksums", [
{datatype, {enum, [true, false]}}
]}.

{mapping, "coordination.segment_compute_checksums", "rabbit.coordination_segment_compute_checksums", [
{datatype, {enum, [true, false]}}
]}.

{mapping, "coordination.wal_max_size_bytes", "rabbit.coordination_wal_max_size_bytes", [
{datatype, integer},
{validators, ["non_zero_positive_integer"]}
]}.

{mapping, "coordination.wal_max_entries", "rabbit.coordination_wal_max_entries", [
{datatype, integer},
{validators, ["non_zero_positive_integer"]}
]}.

{mapping, "coordination.wal_max_batch_size", "rabbit.coordination_wal_max_batch_size", [
{datatype, integer},
{validators, ["non_zero_positive_integer"]}
]}.

{mapping, "coordination.segment_max_size_bytes", "rabbit.coordination_segment_max_size_bytes", [
{datatype, integer},
{validators, ["non_zero_positive_integer"]}
]}.

{mapping, "coordination.segment_max_entries", "rabbit.coordination_segment_max_entries", [
{datatype, integer},
{validators, ["non_zero_positive_integer", "positive_16_bit_unsigned_integer"]}
]}.

{mapping, "coordination.max_append_entries_rpc_batch_size", "rabbit.coordination_max_append_entries_rpc_batch_size", [
{datatype, integer},
{validators, ["non_zero_positive_integer"]}
]}.

{mapping, "coordination.compress_mem_tables", "rabbit.coordination_compress_mem_tables", [
{datatype, {enum, [true, false]}}
]}.

{mapping, "coordination.snapshot_chunk_size", "rabbit.coordination_snapshot_chunk_size", [
{datatype, integer},
{validators, ["non_zero_positive_integer"]}
]}.


%%
%% Runtime parameters
%%
Expand Down
36 changes: 35 additions & 1 deletion deps/rabbit/src/rabbit_ra_systems.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
%% the default min bin vheap value in OTP 26
-define(MIN_BIN_VHEAP_SIZE_DEFAULT, 46422).
-define(MIN_BIN_VHEAP_SIZE_MULT, 64).
%% coordination system Ra parameters
-define(COORD_DEFAULT_WAL_COMPUTE_CHECKSUMS, true).
-define(COORD_DEFAULT_SEGMENT_COMPUTE_CHECKSUMS, true).
-define(COORD_DEFAULT_COMPRESS_MEM_TABLES, true).

-spec setup() -> ok | no_return().

Expand Down Expand Up @@ -159,10 +163,40 @@ get_config(coordination = RaSystem) ->
DefaultConfig = ra_system:default_config(),
CoordDataDir = filename:join(
[rabbit:data_dir(), "coordination", node()]),
WalComputeChecksums = application:get_env(rabbit, coordination_wal_compute_checksums,
?COORD_DEFAULT_WAL_COMPUTE_CHECKSUMS),
SegmentComputeChecksums = application:get_env(rabbit, coordination_segment_compute_checksums,
?COORD_DEFAULT_SEGMENT_COMPUTE_CHECKSUMS),
WalMaxSizeBytes = application:get_env(rabbit, coordination_wal_max_size_bytes,
?COORD_WAL_MAX_SIZE_B),
WalMaxEntries = application:get_env(rabbit, coordination_wal_max_entries,
maps:get(wal_max_entries, DefaultConfig)),
WalMaxBatchSize = application:get_env(rabbit, coordination_wal_max_batch_size,
maps:get(wal_max_batch_size, DefaultConfig)),
SegmentMaxSizeBytes = application:get_env(rabbit, coordination_segment_max_size_bytes,
maps:get(segment_max_size_bytes, DefaultConfig)),
SegmentMaxEntries = application:get_env(rabbit, coordination_segment_max_entries,
maps:get(segment_max_entries, DefaultConfig)),
AERBatchSize = application:get_env(rabbit, coordination_max_append_entries_rpc_batch_size,
maps:get(default_max_append_entries_rpc_batch_size, DefaultConfig)),
CompressMemTables = application:get_env(rabbit, coordination_compress_mem_tables,
?COORD_DEFAULT_COMPRESS_MEM_TABLES),
SnapshotChunkSize = application:get_env(rabbit, coordination_snapshot_chunk_size,
maps:get(snapshot_chunk_size, DefaultConfig)),

DefaultConfig#{name => RaSystem,
data_dir => CoordDataDir,
wal_data_dir => CoordDataDir,
wal_max_size_bytes => ?COORD_WAL_MAX_SIZE_B,
wal_compute_checksums => WalComputeChecksums,
segment_compute_checksums => SegmentComputeChecksums,
wal_max_size_bytes => WalMaxSizeBytes,
wal_max_entries => WalMaxEntries,
wal_max_batch_size => WalMaxBatchSize,
segment_max_size_bytes => SegmentMaxSizeBytes,
segment_max_entries => SegmentMaxEntries,
default_max_append_entries_rpc_batch_size => AERBatchSize,
compress_mem_tables => CompressMemTables,
snapshot_chunk_size => SnapshotChunkSize,
names => ra_system:derive_names(RaSystem)}.

-spec ensure_stopped() -> ok | no_return().
Expand Down
74 changes: 74 additions & 0 deletions deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets
Original file line number Diff line number Diff line change
Expand Up @@ -1230,6 +1230,80 @@ credential_validator.regexp = ^abc\\d+",
]}],
[]},

%%
%% Coordination System Ra Parameters
%%

{coordination_wal_compute_checksums,
"coordination.wal_compute_checksums = true",
[{rabbit, [
{coordination_wal_compute_checksums, true}
]}],
[]},

{coordination_segment_compute_checksums,
"coordination.segment_compute_checksums = true",
[{rabbit, [
{coordination_segment_compute_checksums, true}
]}],
[]},

{coordination_wal_max_size_bytes,
"coordination.wal_max_size_bytes = 64000000",
[{rabbit, [
{coordination_wal_max_size_bytes, 64000000}
]}],
[]},

{coordination_wal_max_entries,
"coordination.wal_max_entries = 500000",
[{rabbit, [
{coordination_wal_max_entries, 500000}
]}],
[]},

{coordination_wal_max_batch_size,
"coordination.wal_max_batch_size = 4096",
[{rabbit, [
{coordination_wal_max_batch_size, 4096}
]}],
[]},

{coordination_segment_max_size_bytes,
"coordination.segment_max_size_bytes = 64000000",
[{rabbit, [
{coordination_segment_max_size_bytes, 64000000}
]}],
[]},

{coordination_segment_max_entries,
"coordination.segment_max_entries = 4096",
[{rabbit, [
{coordination_segment_max_entries, 4096}
]}],
[]},

{coordination_max_append_entries_rpc_batch_size,
"coordination.max_append_entries_rpc_batch_size = 16",
[{rabbit, [
{coordination_max_append_entries_rpc_batch_size, 16}
]}],
[]},

{coordination_compress_mem_tables,
"coordination.compress_mem_tables = true",
[{rabbit, [
{coordination_compress_mem_tables, true}
]}],
[]},

{coordination_snapshot_chunk_size,
"coordination.snapshot_chunk_size = 1000000",
[{rabbit, [
{coordination_snapshot_chunk_size, 1000000}
]}],
[]},

%%
%% Runtime parameters
%%
Expand Down
108 changes: 108 additions & 0 deletions deps/rabbit/test/rabbit_ra_systems_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(rabbit_ra_systems_SUITE).

-compile(export_all).

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").

all() ->
[
coordination_defaults,
coordination_wal_max_size_bytes_default,
quorum_queue_defaults
].

init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
Config1 = rabbit_ct_helpers:run_setup_steps(Config),
rabbit_ct_config_schema:init_schemas(rabbit, Config1).

end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).

init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase),
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, Testcase}
]),
rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).

end_per_testcase(Testcase, Config) ->
Config1 = rabbit_ct_helpers:run_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()),
rabbit_ct_helpers:testcase_finished(Config1, Testcase).

coordination_defaults(Config) ->
ok = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, check_coordination_defaults, []).

coordination_wal_max_size_bytes_default(Config) ->
ok = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, check_coordination_wal_max_size_bytes_default, []).

quorum_queue_defaults(Config) ->
ok = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, check_quorum_queue_defaults, []).

check_coordination_defaults() ->
%% Get the coordination Ra system config
Config = rabbit_ra_systems:get_config(coordination),

%% Verify that all expected keys are present
?assert(maps:is_key(wal_compute_checksums, Config)),
?assert(maps:is_key(segment_compute_checksums, Config)),
?assert(maps:is_key(wal_max_size_bytes, Config)),
?assert(maps:is_key(wal_max_entries, Config)),
?assert(maps:is_key(wal_max_batch_size, Config)),
?assert(maps:is_key(segment_max_size_bytes, Config)),
?assert(maps:is_key(segment_max_entries, Config)),
?assert(maps:is_key(default_max_append_entries_rpc_batch_size, Config)),
?assert(maps:is_key(compress_mem_tables, Config)),
?assert(maps:is_key(snapshot_chunk_size, Config)),

%% Verify default boolean values
?assertEqual(true, maps:get(wal_compute_checksums, Config)),
?assertEqual(true, maps:get(segment_compute_checksums, Config)),
?assertEqual(true, maps:get(compress_mem_tables, Config)),

ok.

check_coordination_wal_max_size_bytes_default() ->
%% Get the coordination Ra system config
Config = rabbit_ra_systems:get_config(coordination),

%% Verify that wal_max_size_bytes defaults to 64 MB (64_000_000 bytes)
WalMaxSizeBytes = maps:get(wal_max_size_bytes, Config),
ExpectedWalMaxSizeBytes = 64_000_000,
?assertEqual(ExpectedWalMaxSizeBytes, WalMaxSizeBytes),

ok.

check_quorum_queue_defaults() ->
%% Get the quorum_queues Ra system config
Config = rabbit_ra_systems:get_config(quorum_queues),

%% Verify that all expected keys are present
?assert(maps:is_key(wal_compute_checksums, Config)),
?assert(maps:is_key(segment_compute_checksums, Config)),
?assert(maps:is_key(wal_max_size_bytes, Config)),
?assert(maps:is_key(wal_max_entries, Config)),
?assert(maps:is_key(wal_max_batch_size, Config)),
?assert(maps:is_key(segment_max_size_bytes, Config)),
?assert(maps:is_key(segment_max_entries, Config)),
?assert(maps:is_key(default_max_append_entries_rpc_batch_size, Config)),
?assert(maps:is_key(compress_mem_tables, Config)),
?assert(maps:is_key(snapshot_chunk_size, Config)),
?assert(maps:is_key(server_recovery_strategy, Config)),

ok.
Loading