diff --git a/deps/rabbit/Makefile b/deps/rabbit/Makefile index 7b97ba58126..0fe7dcc2f9c 100644 --- a/deps/rabbit/Makefile +++ b/deps/rabbit/Makefile @@ -251,7 +251,7 @@ endef PARALLEL_CT_SET_1_A = unit_rabbit_ssl unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_filter_prop amqp_filter_sql amqp_filter_sql_unit amqp_dotnet signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange cluster_limit cluster_minority default_queue_type_prop term_to_binary_compat_prop topic_permission unicode unit_access_control -PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit +PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit rabbit_ra_systems PARALLEL_CT_SET_2_A = cluster confirms_rejects consumer_timeout rabbit_access_control rabbit_confirms rabbit_core_metrics_gc rabbit_cuttlefish rabbit_db_binding rabbit_db_exchange PARALLEL_CT_SET_2_B = crashing_queues deprecated_features direct_exchange_routing_v2 disconnect_detected_during_alarm exchanges unit_gen_server2 diff --git a/deps/rabbit/docs/rabbitmq.conf.example b/deps/rabbit/docs/rabbitmq.conf.example index 47e2c41d0fa..cdd9a52c76d 100644 --- a/deps/rabbit/docs/rabbitmq.conf.example +++ b/deps/rabbit/docs/rabbitmq.conf.example @@ -527,6 +527,54 @@ ## # quorum_queue.snapshot_chunk_size = 1000000 +## +## Coordination System Ra Parameters +## +## Relevant doc guides: +## +## * https://www.rabbitmq.com/docs/clustering +## + +## Coordination system WAL compute checksums flag. +## +# coordination.wal_compute_checksums = true + +## Coordination system segment compute checksums flag. +## +# coordination.segment_compute_checksums = true + +## Maximum size in bytes of each WAL file segment for the coordination system. +## +# coordination.wal_max_size_bytes = 64000000 + +## Maximum number of entries allowed in the coordination system WAL. +## +# coordination.wal_max_entries = 500000 + +## Maximum number of entries allowed in a single WAL write batch for the coordination system. +## +# coordination.wal_max_batch_size = 4096 + +## Maximum size in bytes of each log segment for the coordination system. +## +# coordination.segment_max_size_bytes = 64000000 + +## Maximum number of entries in a log segment for the coordination system. +## +# coordination.segment_max_entries = 4096 + +## Maximum number of append entries RPC batch size for the coordination system. +## +# coordination.max_append_entries_rpc_batch_size = 16 + +## Whether to compress memory tables in the coordination system. +## +# coordination.compress_mem_tables = true + +## Size of snapshot chunks in bytes for the coordination system. +## +# coordination.snapshot_chunk_size = 1000000 + ## Changes classic queue storage implementation version. ## As of 4.0.0, version 2 is the default and this is a forward compatibility setting, ## that is, it will be useful when a new version is developed. diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index d87c8ef9368..8cefb25b62e 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -2824,6 +2824,58 @@ end}. ]}. +%% +%% Coordination System Ra Parameters +%% + +{mapping, "coordination.wal_compute_checksums", "rabbit.coordination_wal_compute_checksums", [ + {datatype, {enum, [true, false]}} +]}. + +{mapping, "coordination.segment_compute_checksums", "rabbit.coordination_segment_compute_checksums", [ + {datatype, {enum, [true, false]}} +]}. + +{mapping, "coordination.wal_max_size_bytes", "rabbit.coordination_wal_max_size_bytes", [ + {datatype, integer}, + {validators, ["non_zero_positive_integer"]} +]}. + +{mapping, "coordination.wal_max_entries", "rabbit.coordination_wal_max_entries", [ + {datatype, integer}, + {validators, ["non_zero_positive_integer"]} +]}. + +{mapping, "coordination.wal_max_batch_size", "rabbit.coordination_wal_max_batch_size", [ + {datatype, integer}, + {validators, ["non_zero_positive_integer"]} +]}. + +{mapping, "coordination.segment_max_size_bytes", "rabbit.coordination_segment_max_size_bytes", [ + {datatype, integer}, + {validators, ["non_zero_positive_integer"]} +]}. + +{mapping, "coordination.segment_max_entries", "rabbit.coordination_segment_max_entries", [ + {datatype, integer}, + {validators, ["non_zero_positive_integer", "positive_16_bit_unsigned_integer"]} +]}. + +{mapping, "coordination.max_append_entries_rpc_batch_size", "rabbit.coordination_max_append_entries_rpc_batch_size", [ + {datatype, integer}, + {validators, ["non_zero_positive_integer"]} +]}. + +{mapping, "coordination.compress_mem_tables", "rabbit.coordination_compress_mem_tables", [ + {datatype, {enum, [true, false]}} +]}. + +{mapping, "coordination.snapshot_chunk_size", "rabbit.coordination_snapshot_chunk_size", [ + {datatype, integer}, + {validators, ["non_zero_positive_integer"]} +]}. + + %% %% Runtime parameters %% diff --git a/deps/rabbit/src/rabbit_ra_systems.erl b/deps/rabbit/src/rabbit_ra_systems.erl index f76602d4e9e..f1411d2a050 100644 --- a/deps/rabbit/src/rabbit_ra_systems.erl +++ b/deps/rabbit/src/rabbit_ra_systems.erl @@ -29,6 +29,10 @@ %% the default min bin vheap value in OTP 26 -define(MIN_BIN_VHEAP_SIZE_DEFAULT, 46422). -define(MIN_BIN_VHEAP_SIZE_MULT, 64). +%% coordination system Ra parameters +-define(COORD_DEFAULT_WAL_COMPUTE_CHECKSUMS, true). +-define(COORD_DEFAULT_SEGMENT_COMPUTE_CHECKSUMS, true). +-define(COORD_DEFAULT_COMPRESS_MEM_TABLES, true). -spec setup() -> ok | no_return(). @@ -159,10 +163,40 @@ get_config(coordination = RaSystem) -> DefaultConfig = ra_system:default_config(), CoordDataDir = filename:join( [rabbit:data_dir(), "coordination", node()]), + WalComputeChecksums = application:get_env(rabbit, coordination_wal_compute_checksums, + ?COORD_DEFAULT_WAL_COMPUTE_CHECKSUMS), + SegmentComputeChecksums = application:get_env(rabbit, coordination_segment_compute_checksums, + ?COORD_DEFAULT_SEGMENT_COMPUTE_CHECKSUMS), + WalMaxSizeBytes = application:get_env(rabbit, coordination_wal_max_size_bytes, + ?COORD_WAL_MAX_SIZE_B), + WalMaxEntries = application:get_env(rabbit, coordination_wal_max_entries, + maps:get(wal_max_entries, DefaultConfig)), + WalMaxBatchSize = application:get_env(rabbit, coordination_wal_max_batch_size, + maps:get(wal_max_batch_size, DefaultConfig)), + SegmentMaxSizeBytes = application:get_env(rabbit, coordination_segment_max_size_bytes, + maps:get(segment_max_size_bytes, DefaultConfig)), + SegmentMaxEntries = application:get_env(rabbit, coordination_segment_max_entries, + maps:get(segment_max_entries, DefaultConfig)), + AERBatchSize = application:get_env(rabbit, coordination_max_append_entries_rpc_batch_size, + maps:get(default_max_append_entries_rpc_batch_size, DefaultConfig)), + CompressMemTables = application:get_env(rabbit, coordination_compress_mem_tables, + ?COORD_DEFAULT_COMPRESS_MEM_TABLES), + SnapshotChunkSize = application:get_env(rabbit, coordination_snapshot_chunk_size, + maps:get(snapshot_chunk_size, DefaultConfig)), + DefaultConfig#{name => RaSystem, data_dir => CoordDataDir, wal_data_dir => CoordDataDir, - wal_max_size_bytes => ?COORD_WAL_MAX_SIZE_B, + wal_compute_checksums => WalComputeChecksums, + segment_compute_checksums => SegmentComputeChecksums, + wal_max_size_bytes => WalMaxSizeBytes, + wal_max_entries => WalMaxEntries, + wal_max_batch_size => WalMaxBatchSize, + segment_max_size_bytes => SegmentMaxSizeBytes, + segment_max_entries => SegmentMaxEntries, + default_max_append_entries_rpc_batch_size => AERBatchSize, + compress_mem_tables => CompressMemTables, + snapshot_chunk_size => SnapshotChunkSize, names => ra_system:derive_names(RaSystem)}. -spec ensure_stopped() -> ok | no_return(). diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets index 9beb4f529ce..11e009e6f4a 100644 --- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets +++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets @@ -1230,6 +1230,80 @@ credential_validator.regexp = ^abc\\d+", ]}], []}, + %% + %% Coordination System Ra Parameters + %% + + {coordination_wal_compute_checksums, + "coordination.wal_compute_checksums = true", + [{rabbit, [ + {coordination_wal_compute_checksums, true} + ]}], + []}, + + {coordination_segment_compute_checksums, + "coordination.segment_compute_checksums = true", + [{rabbit, [ + {coordination_segment_compute_checksums, true} + ]}], + []}, + + {coordination_wal_max_size_bytes, + "coordination.wal_max_size_bytes = 64000000", + [{rabbit, [ + {coordination_wal_max_size_bytes, 64000000} + ]}], + []}, + + {coordination_wal_max_entries, + "coordination.wal_max_entries = 500000", + [{rabbit, [ + {coordination_wal_max_entries, 500000} + ]}], + []}, + + {coordination_wal_max_batch_size, + "coordination.wal_max_batch_size = 4096", + [{rabbit, [ + {coordination_wal_max_batch_size, 4096} + ]}], + []}, + + {coordination_segment_max_size_bytes, + "coordination.segment_max_size_bytes = 64000000", + [{rabbit, [ + {coordination_segment_max_size_bytes, 64000000} + ]}], + []}, + + {coordination_segment_max_entries, + "coordination.segment_max_entries = 4096", + [{rabbit, [ + {coordination_segment_max_entries, 4096} + ]}], + []}, + + {coordination_max_append_entries_rpc_batch_size, + "coordination.max_append_entries_rpc_batch_size = 16", + [{rabbit, [ + {coordination_max_append_entries_rpc_batch_size, 16} + ]}], + []}, + + {coordination_compress_mem_tables, + "coordination.compress_mem_tables = true", + [{rabbit, [ + {coordination_compress_mem_tables, true} + ]}], + []}, + + {coordination_snapshot_chunk_size, + "coordination.snapshot_chunk_size = 1000000", + [{rabbit, [ + {coordination_snapshot_chunk_size, 1000000} + ]}], + []}, + %% %% Runtime parameters %% diff --git a/deps/rabbit/test/rabbit_ra_systems_SUITE.erl b/deps/rabbit/test/rabbit_ra_systems_SUITE.erl new file mode 100644 index 00000000000..2c9cee5fa92 --- /dev/null +++ b/deps/rabbit/test/rabbit_ra_systems_SUITE.erl @@ -0,0 +1,108 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(rabbit_ra_systems_SUITE). + +-compile(export_all). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +all() -> + [ + coordination_defaults, + coordination_wal_max_size_bytes_default, + quorum_queue_defaults + ]. + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:run_setup_steps(Config), + rabbit_ct_config_schema:init_schemas(rabbit, Config1). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, Testcase} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_testcase(Testcase, Config) -> + Config1 = rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase). + +coordination_defaults(Config) -> + ok = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, check_coordination_defaults, []). + +coordination_wal_max_size_bytes_default(Config) -> + ok = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, check_coordination_wal_max_size_bytes_default, []). + +quorum_queue_defaults(Config) -> + ok = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, check_quorum_queue_defaults, []). + +check_coordination_defaults() -> + %% Get the coordination Ra system config + Config = rabbit_ra_systems:get_config(coordination), + + %% Verify that all expected keys are present + ?assert(maps:is_key(wal_compute_checksums, Config)), + ?assert(maps:is_key(segment_compute_checksums, Config)), + ?assert(maps:is_key(wal_max_size_bytes, Config)), + ?assert(maps:is_key(wal_max_entries, Config)), + ?assert(maps:is_key(wal_max_batch_size, Config)), + ?assert(maps:is_key(segment_max_size_bytes, Config)), + ?assert(maps:is_key(segment_max_entries, Config)), + ?assert(maps:is_key(default_max_append_entries_rpc_batch_size, Config)), + ?assert(maps:is_key(compress_mem_tables, Config)), + ?assert(maps:is_key(snapshot_chunk_size, Config)), + + %% Verify default boolean values + ?assertEqual(true, maps:get(wal_compute_checksums, Config)), + ?assertEqual(true, maps:get(segment_compute_checksums, Config)), + ?assertEqual(true, maps:get(compress_mem_tables, Config)), + + ok. + +check_coordination_wal_max_size_bytes_default() -> + %% Get the coordination Ra system config + Config = rabbit_ra_systems:get_config(coordination), + + %% Verify that wal_max_size_bytes defaults to 64 MB (64_000_000 bytes) + WalMaxSizeBytes = maps:get(wal_max_size_bytes, Config), + ExpectedWalMaxSizeBytes = 64_000_000, + ?assertEqual(ExpectedWalMaxSizeBytes, WalMaxSizeBytes), + + ok. + +check_quorum_queue_defaults() -> + %% Get the quorum_queues Ra system config + Config = rabbit_ra_systems:get_config(quorum_queues), + + %% Verify that all expected keys are present + ?assert(maps:is_key(wal_compute_checksums, Config)), + ?assert(maps:is_key(segment_compute_checksums, Config)), + ?assert(maps:is_key(wal_max_size_bytes, Config)), + ?assert(maps:is_key(wal_max_entries, Config)), + ?assert(maps:is_key(wal_max_batch_size, Config)), + ?assert(maps:is_key(segment_max_size_bytes, Config)), + ?assert(maps:is_key(segment_max_entries, Config)), + ?assert(maps:is_key(default_max_append_entries_rpc_batch_size, Config)), + ?assert(maps:is_key(compress_mem_tables, Config)), + ?assert(maps:is_key(snapshot_chunk_size, Config)), + ?assert(maps:is_key(server_recovery_strategy, Config)), + + ok.