diff --git a/MODULE.bazel.lock b/MODULE.bazel.lock index d80cf3214074a..075a6608002c0 100644 --- a/MODULE.bazel.lock +++ b/MODULE.bazel.lock @@ -307,7 +307,7 @@ "moduleExtensions": { "//bazel:extensions.bzl%non_module_dependencies": { "general": { - "bzlTransitiveDigest": "ZtSnFu4+NXfuxuGvvbUJp7UjeyCebjpwwaLk/CVuao8=", + "bzlTransitiveDigest": "04fZ+dMvk5JQXMhYcrOVUgWVmMSII9PePEgWcYqNf80=", "usagesDigest": "FEiDyZe9eAU6yEqnarZf0XMEUk+prUyYClvq1RU1J98=", "recordedFileInputs": {}, "recordedDirentsInputs": {}, @@ -479,9 +479,9 @@ "repoRuleId": "@@bazel_tools//tools/build_defs/repo:http.bzl%http_archive", "attributes": { "build_file": "@@//bazel/thirdparty:seastar.BUILD", - "sha256": "9c572c5111ef6b4f0afc6ba358696c0e2a78e118bc3c3617cf6454a9d98970de", - "strip_prefix": "seastar-617d3c421a949f449942d7ce0d275af0997efb48", - "url": "https://github.com/redpanda-data/seastar/archive/617d3c421a949f449942d7ce0d275af0997efb48.tar.gz" + "sha256": "77feafdc928dc12b6a261ece933c19e070b901061fa56ff2114c1ded850afb1e", + "strip_prefix": "seastar-8948822a046413306b8da155ab85674545ab7f04", + "url": "https://github.com/WillemKauf/seastar/archive/8948822a046413306b8da155ab85674545ab7f04.tar.gz" } }, "unordered_dense": { diff --git a/bazel/repositories.bzl b/bazel/repositories.bzl index cfa85f2eee669..83955886ecfde 100644 --- a/bazel/repositories.bzl +++ b/bazel/repositories.bzl @@ -162,13 +162,15 @@ def data_dependency(): url = "https://github.com/redpanda-data/CRoaring/archive/c433d1c70c10fb2e40f049e019e2abbcafa6e69d.tar.gz", ) - # branch: v26.2.x-pre + # branch: rp_log_filter (adds force_tag overloads for the per-callsite + # log filter). Flip back to redpanda-data/seastar once the upstream PR + # merges. http_archive( name = "seastar", build_file = "//bazel/thirdparty:seastar.BUILD", - sha256 = "9c572c5111ef6b4f0afc6ba358696c0e2a78e118bc3c3617cf6454a9d98970de", - strip_prefix = "seastar-617d3c421a949f449942d7ce0d275af0997efb48", - url = "https://github.com/redpanda-data/seastar/archive/617d3c421a949f449942d7ce0d275af0997efb48.tar.gz", + sha256 = "77feafdc928dc12b6a261ece933c19e070b901061fa56ff2114c1ded850afb1e", + strip_prefix = "seastar-8948822a046413306b8da155ab85674545ab7f04", + url = "https://github.com/WillemKauf/seastar/archive/8948822a046413306b8da155ab85674545ab7f04.tar.gz", ) http_archive( diff --git a/proto/redpanda/core/admin/internal/v1/BUILD b/proto/redpanda/core/admin/internal/v1/BUILD index 08c9f9fdff76b..5322f3eb569b4 100644 --- a/proto/redpanda/core/admin/internal/v1/BUILD +++ b/proto/redpanda/core/admin/internal/v1/BUILD @@ -32,3 +32,19 @@ redpanda_proto_library( protos = [":breakglass_proto"], visibility = ["//visibility:public"], ) + +proto_library( + name = "log_filter_proto", + srcs = ["log_filter.proto"], + visibility = ["//visibility:public"], + deps = [ + "//proto/redpanda/core/pbgen:options_proto", + "//proto/redpanda/core/pbgen:rpc_proto", + ], +) + +redpanda_proto_library( + name = "log_filter_redpanda_proto", + protos = [":log_filter_proto"], + visibility = ["//visibility:public"], +) diff --git a/proto/redpanda/core/admin/internal/v1/log_filter.proto b/proto/redpanda/core/admin/internal/v1/log_filter.proto new file mode 100644 index 0000000000000..96e689ab22fa4 --- /dev/null +++ b/proto/redpanda/core/admin/internal/v1/log_filter.proto @@ -0,0 +1,128 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package redpanda.core.admin.v2.internal; + +import "proto/redpanda/core/pbgen/options.proto"; +import "proto/redpanda/core/pbgen/rpc.proto"; + +option (pbgen.cpp_namespace) = "proto::admin"; + +// State value written onto every callsite a LogFilterRule matches, or +// reported by ListLogCallsites as the resolved state for a callsite. +enum LogFilterState { + // Proto3 default — rejected by SetLogFilter as INVALID_ARGUMENT, + // never returned by ListLogCallsites. + LOG_FILTER_STATE_UNSPECIFIED = 0; + // Callsite falls through to the logger's configured level gate. + LOG_FILTER_STATE_INHERITED = 1; + // Callsite emits regardless of the configured level. + LOG_FILTER_STATE_FORCE_ON = 2; + // Callsite is suppressed regardless of the configured level. + LOG_FILTER_STATE_FORCE_OFF = 3; +} + +// A filter rule applied to every registered vlog callsite. Every populated +// predicate must match for the rule to apply: file is an fnmatch(3) glob +// against the callsite's __FILE__, line matches __LINE__ as described +// below, and contains is a substring search against the format-string +// literal. An empty rule (no predicates set) matches every callsite. +// Rules in a list are applied in order; the last matching rule's state +// value wins. A callsite matching no rule resolves to +// LOG_FILTER_STATE_INHERITED. +message LogFilterRule { + // fnmatch(3) glob pattern matched against the callsite's __FILE__. + optional string file = 1; + // Predicate on __LINE__. Accepts exactly one or two entries: a + // single value matches that line, two values [lo, hi] match the + // inclusive range lo <= __LINE__ <= hi. Any other size is rejected + // with INVALID_ARGUMENT. + repeated uint32 line = 2; + // Substring that must appear in the format-string literal. + optional string contains = 3; + // State written to every site the rule matches. + LogFilterState state = 4; +} + +// A registered vlog callsite and its current enabled state. +message LogCallsiteInfo { + // Source-file basename (path stripped at compile time). + string file = 1; + // Source line. + uint32 line = 2; + // Format-string literal at the call site. + string fmt = 3; + // Resolved state for this callsite. + LogFilterState state = 4; +} + +message SetLogFilterRequest { + // Rule list that replaces the currently active rules. Every registered + // callsite is re-evaluated against the new set. To add a rule to the + // active set, fetch the current rules with GetLogFilter, append to + // that list client-side, and resubmit here. + repeated LogFilterRule rules = 1; +} +message SetLogFilterResponse {} + +message ResetLogFilterRequest {} +message ResetLogFilterResponse {} + +message GetLogFilterRequest {} +message GetLogFilterResponse { + // Current active rules, in application order (last match wins). + repeated LogFilterRule rules = 1; +} + +message ListLogCallsitesRequest { + // If set, only return callsites whose file matches this fnmatch(3) glob. + optional string file_filter = 1; +} +message ListLogCallsitesResponse { + repeated LogCallsiteInfo callsites = 1; +} + +// LogFilterService controls the per-callsite vlog filter on the node that +// handles the RPC. Filter state is a per-node property — to apply the same +// filter cluster-wide, call each broker directly. +service LogFilterService { + // Replace the active rule set with the supplied rules and re-evaluate + // every registered callsite against them. + rpc SetLogFilter(SetLogFilterRequest) returns (SetLogFilterResponse) { + option (pbgen.rpc) = { + authz: SUPERUSER, + }; + } + // Clear every rule and re-enable every callsite. + rpc ResetLogFilter(ResetLogFilterRequest) returns (ResetLogFilterResponse) { + option (pbgen.rpc) = { + authz: SUPERUSER, + }; + } + // Return the currently active filter rules in application order. + rpc GetLogFilter(GetLogFilterRequest) returns (GetLogFilterResponse) { + option (pbgen.rpc) = { + authz: SUPERUSER, + }; + } + // Enumerate all registered callsites and their current enabled state. + rpc ListLogCallsites(ListLogCallsitesRequest) + returns (ListLogCallsitesResponse) { + option (pbgen.rpc) = { + authz: SUPERUSER, + }; + } +} diff --git a/src/v/base/BUILD b/src/v/base/BUILD index bff2c57ef7917..14ab434d05648 100644 --- a/src/v/base/BUILD +++ b/src/v/base/BUILD @@ -4,6 +4,7 @@ redpanda_cc_library( name = "base", srcs = [ "vassert.cc", + "vlog_callsite.cc", ], hdrs = [ "compiler_utils.h", @@ -21,6 +22,8 @@ redpanda_cc_library( "vassert.h", "vassert-register.h", "vlog.h", + "vlog_callsite.h", + "vlog_filter.h", ], visibility = ["//visibility:public"], deps = [ diff --git a/src/v/base/tests/BUILD b/src/v/base/tests/BUILD index 43abcb0abd41f..0d60899548e90 100644 --- a/src/v/base/tests/BUILD +++ b/src/v/base/tests/BUILD @@ -51,3 +51,17 @@ redpanda_cc_gtest( "@seastar", ], ) + +redpanda_cc_gtest( + name = "vlog_filter_test", + timeout = "short", + srcs = [ + "vlog_filter_test.cc", + ], + deps = [ + "//src/v/base", + "//src/v/test_utils:gtest", + "@googletest//:gtest", + "@seastar", + ], +) diff --git a/src/v/base/tests/vlog_filter_test.cc b/src/v/base/tests/vlog_filter_test.cc new file mode 100644 index 0000000000000..d7b159a6773fc --- /dev/null +++ b/src/v/base/tests/vlog_filter_test.cc @@ -0,0 +1,307 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ +#include "base/vlog.h" +#include "base/vlog_callsite.h" +#include "base/vlog_filter.h" + +#include + +#include + +#include +#include +#include + +namespace { + +using cs_state = vlog::detail::callsite_base::state; + +// Each TEST body that references a callsite by line intentionally keeps the +// callsite literal inline so the reported __LINE__ is deterministic under +// formatting. The helper below looks up a registered callsite by filename +// basename + line + format literal. +const vlog::detail::callsite_base* +find_site(const char* file, unsigned line, const char* fmt) { + const vlog::detail::callsite_base* found = nullptr; + vlog::for_each_callsite([&](vlog::detail::callsite_base& cs) { + if (found != nullptr) { + return; + } + if (std::string_view(cs.file()) != file) { + return; + } + if (cs.line() != line) { + return; + } + if (std::string_view(cs.fmt()) != fmt) { + return; + } + found = &cs; + }); + return found; +} + +} // namespace + +TEST(VlogFilter, CallsiteSelfRegisters) { + static vlog::detail::callsite + cs = {}; + EXPECT_EQ(cs.resolved_state(), cs_state::default_); + const auto* found = find_site( + "src/v/base/tests/vlog_filter_test.cc", 100, "hello {}"); + ASSERT_NE(found, nullptr); + EXPECT_EQ(found, &cs); +} + +TEST(VlogFilter, FileGlobRule) { + static vlog::detail::callsite + cs = {}; + ASSERT_EQ(cs.resolved_state(), cs_state::default_); + + vlog::apply_rules({vlog::rule{ + .file = std::string{"src/v/raft/*"}, .state = cs_state::force_off}}); + EXPECT_EQ(cs.resolved_state(), cs_state::force_off); + + vlog::reset_rules(); + EXPECT_EQ(cs.resolved_state(), cs_state::default_); +} + +TEST(VlogFilter, FileLineRule) { + static vlog::detail::callsite + cs_a = {}; + static vlog::detail::callsite + cs_b = {}; + + vlog::apply_rules({vlog::rule{ + .file = std::string{"src/v/raft/consensus.cc"}, + .line = std::pair{42u, 42u}, + .state = cs_state::force_off}}); + + EXPECT_EQ(cs_a.resolved_state(), cs_state::force_off); + EXPECT_EQ(cs_b.resolved_state(), cs_state::default_); + + vlog::reset_rules(); +} + +TEST(VlogFilter, LineRangeRule) { + // Inclusive [lo, hi] — endpoints match, out-of-band doesn't. + static vlog::detail::callsite + cs_below = {}; + static vlog::detail::callsite + cs_lo = {}; + static vlog::detail::callsite + cs_mid = {}; + static vlog::detail::callsite + cs_hi = {}; + static vlog::detail::callsite + cs_above = {}; + + vlog::apply_rules({vlog::rule{ + .file = std::string{"src/v/raft/heartbeat.cc"}, + .line = std::pair{100u, 200u}, + .state = cs_state::force_off}}); + + EXPECT_EQ(cs_below.resolved_state(), cs_state::default_); + EXPECT_EQ(cs_lo.resolved_state(), cs_state::force_off); + EXPECT_EQ(cs_mid.resolved_state(), cs_state::force_off); + EXPECT_EQ(cs_hi.resolved_state(), cs_state::force_off); + EXPECT_EQ(cs_above.resolved_state(), cs_state::default_); + + vlog::reset_rules(); +} + +TEST(VlogFilter, ContainsRule) { + static vlog::detail::callsite + cs_hit = {}; + static vlog::detail::callsite + cs_miss = {}; + + vlog::apply_rules({vlog::rule{ + .contains = std::string{"slow path"}, .state = cs_state::force_off}}); + + EXPECT_EQ(cs_hit.resolved_state(), cs_state::force_off); + EXPECT_EQ(cs_miss.resolved_state(), cs_state::default_); + + vlog::reset_rules(); +} + +TEST(VlogFilter, LaterRuleOverridesEarlier) { + static vlog::detail::callsite + cs = {}; + + vlog::apply_rules({ + vlog::rule{ + .file = std::string{"src/v/storage/*"}, .state = cs_state::force_off}, + vlog::rule{ + .file = std::string{"src/v/storage/disk_log.cc"}, + .line = std::pair{100u, 100u}, + .state = cs_state::default_}, + }); + EXPECT_EQ(cs.resolved_state(), cs_state::default_); + + vlog::apply_rules({ + vlog::rule{ + .file = std::string{"src/v/storage/disk_log.cc"}, + .line = std::pair{100u, 100u}, + .state = cs_state::default_}, + vlog::rule{ + .file = std::string{"src/v/storage/*"}, .state = cs_state::force_off}, + }); + EXPECT_EQ(cs.resolved_state(), cs_state::force_off); + + vlog::reset_rules(); +} + +TEST(VlogFilter, EmptyRuleMatchesEverything) { + static vlog::detail::callsite + cs = {}; + + vlog::apply_rules({vlog::rule{.state = cs_state::force_off}}); + EXPECT_EQ(cs.resolved_state(), cs_state::force_off); + + vlog::reset_rules(); + EXPECT_EQ(cs.resolved_state(), cs_state::default_); +} + +TEST(VlogFilter, RulesAppliedBeforeRegistrationAffectNewSites) { + // Apply a rule that disables a pattern, then register a new callsite + // matching that pattern. The new site must start disabled — this is the + // cold-start case: a log line is filtered before it has ever fired. + vlog::apply_rules({vlog::rule{ + .file = std::string{"src/v/kafka/cold/*"}, + .state = cs_state::force_off}}); + + static vlog::detail::callsite + cs = {}; + + EXPECT_EQ(cs.resolved_state(), cs_state::force_off); + + vlog::reset_rules(); + EXPECT_EQ(cs.resolved_state(), cs_state::default_); +} + +TEST(VlogFilter, SitesNotMatchedKeepDefault) { + static vlog::detail::callsite + cs = {}; + + vlog::apply_rules({vlog::rule{ + .file = std::string{"src/v/raft/*"}, .state = cs_state::force_off}}); + EXPECT_EQ(cs.resolved_state(), cs_state::default_); + + vlog::reset_rules(); +} + +namespace { + +// Captures a seastar logger's output to a stringstream for duration of +// the scope. The destructor restores the ostream to std::cerr so the +// logger does not hold a dangling pointer after the captured buffer +// goes out of scope. +class logger_capture { +public: + logger_capture() { + seastar::logger::set_ostream(_captured); + seastar::logger::set_ostream_enabled(true); + } + ~logger_capture() { seastar::logger::set_ostream(std::cerr); } + std::string str() const { return _captured.str(); } + void clear() { _captured.str(""); } + +private: + std::ostringstream _captured; +}; + +// A dedicated logger for this test file's force_on/force_off tests. Each +// test sets its level explicitly; other tests in this binary don't share +// it. +seastar::logger vlog_filter_test_log("vlog_filter_test"); + +} // namespace + +TEST(VlogFilter, ForceOnBypassesLevelGate) { + logger_capture cap; + vlog_filter_test_log.set_level(seastar::log_level::warn); + + vlog::reset_rules(); + + // The vlog() macro declares a static callsite at the call line. We + // compute the macro's __LINE__ ahead of time so the rule pinpoints + // it. `__LINE__ + N` below points at the vlog line; adjust N if you + // re-indent the macro expansion. + constexpr auto* this_file = "src/v/base/tests/vlog_filter_test.cc"; + const auto site_line = static_cast(__LINE__) + 5; + vlog::apply_rules({vlog::rule{ + .file = std::string{this_file}, + .line = std::pair{site_line, site_line}, + .state = cs_state::force_on}}); + vlog(vlog_filter_test_log.trace, "trace_forced_on_payload"); // target line + + EXPECT_NE(cap.str().find("trace_forced_on_payload"), std::string::npos); + vlog::reset_rules(); +} + +TEST(VlogFilter, ForceOffSuppressesRegardless) { + logger_capture cap; + vlog_filter_test_log.set_level(seastar::log_level::trace); + + vlog::reset_rules(); + + const auto site_line = static_cast(__LINE__) + 5; + vlog::apply_rules({vlog::rule{ + .file = std::string{"src/v/base/tests/vlog_filter_test.cc"}, + .line = std::pair{site_line, site_line}, + .state = cs_state::force_off}}); + vlog(vlog_filter_test_log.trace, "trace_forced_off_payload"); // target line + + EXPECT_EQ(cap.str().find("trace_forced_off_payload"), std::string::npos); + vlog::reset_rules(); +} + +TEST(VlogFilter, LastRuleWinsAcrossAllThreeStates) { + static vlog::detail::callsite + cs = {}; + + vlog::apply_rules({ + vlog::rule{ + .file = std::string{"src/v/cluster/*"}, .state = cs_state::force_off}, + vlog::rule{ + .file = std::string{"src/v/cluster/ordering.cc"}, + .state = cs_state::force_on}, + }); + EXPECT_EQ(cs.resolved_state(), cs_state::force_on); + + vlog::apply_rules({ + vlog::rule{ + .file = std::string{"src/v/cluster/*"}, .state = cs_state::force_on}, + vlog::rule{ + .file = std::string{"src/v/cluster/ordering.cc"}, + .state = cs_state::default_}, + }); + EXPECT_EQ(cs.resolved_state(), cs_state::default_); + + vlog::reset_rules(); +} diff --git a/src/v/base/vlog.h b/src/v/base/vlog.h index 5a1ac9e794dc2..894b05a78b60d 100644 --- a/src/v/base/vlog.h +++ b/src/v/base/vlog.h @@ -10,20 +10,95 @@ */ #pragma once #include "base/source_location.h" +#include "base/vlog_callsite.h" #define fmt_with_ctx(method, fmt, args...) \ method("{} - " fmt, vlog::file_line::current(), ##args) -#define vlog(method, fmt, args...) fmt_with_ctx(method, fmt, ##args) +#define fmt_with_ctx_force(method, fmt, args...) \ + method( \ + ::seastar::logger::force, \ + "{} - " fmt, \ + vlog::file_line::current(), \ + ##args) -#define fmt_with_ctx_level(logger, level, fmt, args...) \ - logger.log(level, "{} - " fmt, vlog::file_line::current(), ##args) +#define fmt_with_ctx_level(logger_, level, fmt, args...) \ + logger_.log(level, "{} - " fmt, vlog::file_line::current(), ##args) -#define vlogl(logger, level, fmt, args...) \ - fmt_with_ctx_level(logger, level, fmt, ##args) +#define fmt_with_ctx_level_force(logger_, level, fmt, args...) \ + logger_.log( \ + level, \ + ::seastar::logger::force, \ + "{} - " fmt, \ + vlog::file_line::current(), \ + ##args) -#define fmt_with_ctx_level_and_rate(logger, level, rate, fmt, args...) \ - logger.log(level, rate, "{} - " fmt, vlog::file_line::current(), ##args) +// Gate a vlog invocation on the static per-callsite state. default_ goes +// through the logger's configured level gate, force_on bypasses the gate +// via the seastar force_tag overloads, force_off drops the call entirely +// without evaluating any format argument. +// +// The callsite is a class-template instantiation parameterized on an NTTP +// carrying __FILE__, __LINE__, and the format literal. That lets the +// static instance be constant-initialized at program start, avoiding the +// __cxa_guard once-check that would otherwise run on every macro entry. +#define vlog(method, fmt, args...) \ + do { \ + static ::vlog::detail::callsite<::vlog::detail::make_site_nttp( \ + __FILE__, __LINE__, fmt)> \ + _vlog_cs = {}; \ + switch (_vlog_cs.resolved_state()) { \ + case ::vlog::detail::callsite_base::state::default_: \ + fmt_with_ctx(method, fmt, ##args); \ + break; \ + case ::vlog::detail::callsite_base::state::force_on: \ + fmt_with_ctx_force(method, fmt, ##args); \ + break; \ + case ::vlog::detail::callsite_base::state::force_off: \ + case ::vlog::detail::callsite_base::state::uninit: \ + break; \ + } \ + } while (0) -#define vloglr(logger, level, rate, fmt, args...) \ - fmt_with_ctx_level_and_rate(logger, level, rate, fmt, ##args) +#define vlogl(logger_, level, fmt, args...) \ + do { \ + static ::vlog::detail::callsite<::vlog::detail::make_site_nttp( \ + __FILE__, __LINE__, fmt)> \ + _vlog_cs = {}; \ + switch (_vlog_cs.resolved_state()) { \ + case ::vlog::detail::callsite_base::state::default_: \ + fmt_with_ctx_level(logger_, level, fmt, ##args); \ + break; \ + case ::vlog::detail::callsite_base::state::force_on: \ + fmt_with_ctx_level_force(logger_, level, fmt, ##args); \ + break; \ + case ::vlog::detail::callsite_base::state::force_off: \ + case ::vlog::detail::callsite_base::state::uninit: \ + break; \ + } \ + } while (0) + +#define vloglr(logger_, level, rate, fmt, args...) \ + do { \ + static ::vlog::detail::callsite<::vlog::detail::make_site_nttp( \ + __FILE__, __LINE__, fmt)> \ + _vlog_cs = {}; \ + switch (_vlog_cs.resolved_state()) { \ + case ::vlog::detail::callsite_base::state::default_: \ + logger_.log( \ + level, rate, "{} - " fmt, vlog::file_line::current(), ##args); \ + break; \ + case ::vlog::detail::callsite_base::state::force_on: \ + logger_.log( \ + level, \ + ::seastar::logger::force, \ + rate, \ + "{} - " fmt, \ + vlog::file_line::current(), \ + ##args); \ + break; \ + case ::vlog::detail::callsite_base::state::force_off: \ + case ::vlog::detail::callsite_base::state::uninit: \ + break; \ + } \ + } while (0) diff --git a/src/v/base/vlog_callsite.cc b/src/v/base/vlog_callsite.cc new file mode 100644 index 0000000000000..bf926d9f69ab0 --- /dev/null +++ b/src/v/base/vlog_callsite.cc @@ -0,0 +1,172 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ +#include "base/vlog_callsite.h" + +#include "base/vlog_filter.h" + +#include +#include +#include +#include +#include + +namespace vlog { + +namespace { + +bool rule_matches(const rule& r, const detail::callsite_base& cs) { + if (r.file) { + if (fnmatch(r.file->c_str(), cs.file(), 0) != 0) { + return false; + } + } + if (r.line) { + const auto [lo, hi] = *r.line; + if (cs.line() < lo || cs.line() > hi) { + return false; + } + } + if (r.contains) { + const char* f = cs.fmt(); + if (f == nullptr || std::strstr(f, r.contains->c_str()) == nullptr) { + return false; + } + } + return true; +} + +detail::callsite_base::state +evaluate(const std::vector& rules, const detail::callsite_base& cs) { + auto result = detail::callsite_base::state::default_; + for (const auto& r : rules) { + if (rule_matches(r, cs)) { + result = r.state; + } + } + return result; +} + +// Current rule set. Stored as an immutable shared_ptr; readers (first-call +// slow_init) snapshot it atomically and hold a ref until they finish +// evaluating, the writer (apply_rules) swaps in a new shared_ptr, and the +// old one is freed when the last outstanding reader releases its ref. +// +// We use the C++11 free functions std::atomic_load/store on shared_ptr +// rather than std::atomic>: the latter is a C++20 +// specialization that libc++ still doesn't ship. The free functions are +// deprecated in C++20 but implemented by both libstdc++ and libc++, and +// there is no planned removal. +std::shared_ptr> g_rules + = std::make_shared>(); + +std::shared_ptr> load_rules() { + return std::atomic_load_explicit(&g_rules, std::memory_order_acquire); +} + +void store_rules(std::shared_ptr> r) { + std::atomic_store_explicit( + &g_rules, std::move(r), std::memory_order_release); +} + +} // namespace + +namespace detail { + +// Lock-free singly-linked list of callsites. Nodes are push-only — once +// registered, a callsite lives for the program's lifetime — so traversal +// needs no synchronization beyond an acquire load of the head. This keeps +// the admin path out of the logger thread's way: apply_rules is a pure +// traversal plus relaxed stores, never blocking. +class registry { +public: + static registry& instance() { + static registry r; + return r; + } + + void add(callsite_base* cs) noexcept { + callsite_base* prev = _head.load(std::memory_order_relaxed); + do { + cs->_next = prev; + } while (!_head.compare_exchange_weak( + prev, cs, std::memory_order_release, std::memory_order_relaxed)); + } + + template + void visit(const Fn& fn) { + for (callsite_base* cs = _head.load(std::memory_order_acquire); + cs != nullptr; + cs = cs->_next) { + fn(*cs); + } + } + +private: + std::atomic _head{nullptr}; +}; + +// First-call slow path. Concurrent first-callers race on a CAS of _state +// from uninit; exactly one thread wins and performs the single registry +// insertion plus the initial rule evaluation. Losers observe the +// tentative "enabled" value the winner published and return it — a +// racing reader may observe a transiently-enabled site that the winner +// will subsequently flip to disabled, but the next call converges on +// the published final state. +// +// Ordering: push onto the registry *before* loading the rules pointer. +// This preserves the invariant from the prior ctor-based design — a +// concurrent apply_rules() either sees this site during its walk (and +// writes the correct enabled state) or publishes the new rules pointer +// before we load it (so we evaluate against the new rules ourselves). +callsite_base::state callsite_base::slow_init() noexcept { + state expected = state::uninit; + if (!_state.compare_exchange_strong( + expected, + state::default_, + std::memory_order_acq_rel, + std::memory_order_acquire)) { + return expected; + } + registry::instance().add(this); + auto rules = load_rules(); + auto final_state = evaluate(*rules, *this); + _state.store(final_state, std::memory_order_relaxed); + return final_state; +} + +} // namespace detail + +void apply_rules(std::vector rules) { + auto snapshot = std::make_shared>(std::move(rules)); + // Publish the new rules *before* walking. Any callsite registering + // concurrently loads this pointer (or a newer one) after its own push, + // so it cannot end up evaluating against an earlier rule set. + store_rules(snapshot); + detail::registry::instance().visit([&snapshot](detail::callsite_base& cs) { + cs.set_state(evaluate(*snapshot, cs)); + }); +} + +std::vector get_rules() { return *load_rules(); } + +void reset_rules() { + store_rules(std::make_shared>()); + detail::registry::instance().visit([](detail::callsite_base& cs) { + cs.set_state(detail::callsite_base::state::default_); + }); +} + +void for_each_callsite(std::function fn) { + detail::registry::instance().visit( + [&fn](detail::callsite_base& cs) { fn(cs); }); +} + +} // namespace vlog diff --git a/src/v/base/vlog_callsite.h b/src/v/base/vlog_callsite.h new file mode 100644 index 0000000000000..b381bb8138938 --- /dev/null +++ b/src/v/base/vlog_callsite.h @@ -0,0 +1,140 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ +#pragma once + +#include +#include +#include + +namespace vlog::detail { + +/// Structural wrapper around a string literal so it can live inside a +/// non-type template parameter. C++20 forbids taking the address of a +/// string-literal subobject as an NTTP value, but a value-type containing +/// an array of char is structural and eligible — so we copy the literal's +/// bytes into this object and pass it by value. +template +struct fixed_string { + char data[N]{}; + + constexpr fixed_string(const char (&s)[N]) noexcept { + for (std::size_t i = 0; i < N; ++i) { + data[i] = s[i]; + } + } + + constexpr const char* c_str() const noexcept { return data; } +}; + +template +fixed_string(const char (&)[N]) -> fixed_string; + +/// Per-callsite metadata carried as a non-type template parameter. Sizes +/// are deduced from the string-literal array bounds at the macro call so +/// each site's file and format bytes live inside the NTTP value itself. +template +struct site_info { + fixed_string file; + unsigned line; + fixed_string fmt; +}; + +/// Factory for the NTTP value. consteval because the only valid use is in +/// a template argument position — calling it at runtime would be a bug +/// that the constraint catches at compile time. +template +consteval site_info make_site_nttp( + const char (&file)[FileN], unsigned line, const char (&fmt)[FmtN]) { + return site_info{ + .file = fixed_string{file}, + .line = line, + .fmt = fixed_string{fmt}, + }; +} + +/// Non-template base. Holds the mutable per-site state and the registry +/// link so that the process-wide registry and the admin walk +/// (`apply_rules`, `for_each_callsite`) can work against a single concrete +/// type regardless of which class-template instantiation produced the +/// site. +class callsite_base { +public: + callsite_base(const callsite_base&) = delete; + callsite_base& operator=(const callsite_base&) = delete; + callsite_base(callsite_base&&) = delete; + callsite_base& operator=(callsite_base&&) = delete; + + /// The resolved state for this callsite. The vlog* macros switch on + /// this value: default_ goes through the logger's level gate, force_on + /// calls the logger's force-tag overload, force_off drops the call + /// entirely. + enum class state : std::uint8_t { + uninit = 0, // zero-init ground state, must remain 0 + default_ = 1, + force_on = 2, + force_off = 3, + }; + + /// Cheap gate consulted on every vlog(...) invocation. A single relaxed + /// load of one byte plus a well-predicted branch; the uninit slow path + /// registers the site and evaluates the active rule set. + state resolved_state() noexcept { + auto s = _state.load(std::memory_order_relaxed); + if (s == state::uninit) [[unlikely]] { + s = slow_init(); + } + return s; + } + + void set_state(state s) noexcept { + _state.store(s, std::memory_order_relaxed); + } + + const char* file() const noexcept { return _file; } + unsigned line() const noexcept { return _line; } + const char* fmt() const noexcept { return _fmt; } + +protected: + constexpr callsite_base(const char* f, unsigned l, const char* fmt) noexcept + : _file(f) + , _line(l) + , _fmt(fmt) {} + ~callsite_base() = default; + +private: + // First-call slow path: claims init via CAS on _state, registers with + // the process-wide registry, evaluates the current rule set, and + // publishes the final state. Subsequent calls see a non-uninit state + // and take only the fast path above. + state slow_init() noexcept; + + const char* _file; + unsigned _line; + const char* _fmt; + std::atomic _state{state::uninit}; + callsite_base* _next{nullptr}; + + friend class registry; +}; + +/// One class-template instantiation per vlog macro expansion. Because the +/// default constructor forwards constexpr NTTP fields to a constexpr base +/// constructor and no members require dynamic initialization, the static +/// instance is constant-initialized at program start: no __cxa_guard, no +/// per-entry once-check on the hot path. +template +class callsite final : public callsite_base { +public: + constexpr callsite() noexcept + : callsite_base(Info.file.c_str(), Info.line, Info.fmt.c_str()) {} +}; + +} // namespace vlog::detail diff --git a/src/v/base/vlog_filter.h b/src/v/base/vlog_filter.h new file mode 100644 index 0000000000000..856d3d22c515e --- /dev/null +++ b/src/v/base/vlog_filter.h @@ -0,0 +1,55 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ +#pragma once + +#include "base/vlog_callsite.h" + +#include +#include +#include +#include +#include + +namespace vlog { + +/// A rule for gating vlog callsites. Every populated predicate must match +/// for the rule to apply: the file field is a fnmatch(3) glob tested +/// against the basename-preserved __FILE__ path, line is an inclusive +/// [lo, hi] range the callsite's __LINE__ must fall within (lo == hi for +/// an exact-match rule), and contains must appear as a substring of the +/// format-string literal. An empty rule (no predicates set) matches every +/// callsite. The rule's state field is the value written onto every site +/// it matches; a callsite matching no rule resolves to default_. +struct rule { + std::optional file; + std::optional> line; + std::optional contains; + detail::callsite_base::state state = detail::callsite_base::state::default_; +}; + +/// Replace the active rule set and re-evaluate every registered callsite +/// against it. Rules are applied in order; the last matching rule's +/// state value wins. A site that matches no rule resolves to default_. +void apply_rules(std::vector rules); + +/// Snapshot of the currently active rule set, in application order. +std::vector get_rules(); + +/// Shortcut for apply_rules({}): resets every callsite to default_. +void reset_rules(); + +/// Visit every registered callsite. The registry is lock-free: this is a +/// plain traversal of a singly-linked list, safe to call concurrently with +/// new callsites being registered by logger threads (though newly-added +/// sites may or may not appear in this walk). +void for_each_callsite(std::function fn); + +} // namespace vlog diff --git a/src/v/cloud_storage_clients/util.cc b/src/v/cloud_storage_clients/util.cc index c47996bc2cb25..854366dfcb0b8 100644 --- a/src/v/cloud_storage_clients/util.cc +++ b/src/v/cloud_storage_clients/util.cc @@ -179,16 +179,12 @@ void log_buffer_with_rate_limiting( static constexpr int buffer_size = 0x100; static constexpr auto rate_limit = std::chrono::seconds(1); thread_local static ss::logger::rate_limit rate(rate_limit); - auto log_with_rate_limit = [&logger]( - ss::logger::format_info fmt, auto... args) { - logger.log(ss::log_level::warn, rate, fmt, args...); - }; iobuf_istreambuf strbuf(buf); std::istream stream(&strbuf); std::array str{}; auto sz = stream.readsome(str.data(), buffer_size); auto sview = std::string_view(str.data(), sz); - vlog(log_with_rate_limit, "{}: {}", msg, sview); + vloglr(logger, ss::log_level::warn, rate, "{}: {}", msg, sview); } std::vector all_paths_to_file(const object_key& path) { diff --git a/src/v/kafka/server/connection_context.h b/src/v/kafka/server/connection_context.h index 56b8e02b098f1..b2b24990b96ca 100644 --- a/src/v/kafka/server/connection_context.h +++ b/src/v/kafka/server/connection_context.h @@ -399,24 +399,64 @@ class connection_context final log(ss::log_level::error, format, std::forward(args)...); } template + void error(ss::logger::force_tag, const char* format, Args&&... args) { + log( + ss::log_level::error, + ss::logger::force, + format, + std::forward(args)...); + } + template void warn(const char* format, Args&&... args) { log(ss::log_level::warn, format, std::forward(args)...); } + template + void warn(ss::logger::force_tag, const char* format, Args&&... args) { + log( + ss::log_level::warn, + ss::logger::force, + format, + std::forward(args)...); + } template void info(const char* format, Args&&... args) { log(ss::log_level::info, format, std::forward(args)...); } + template + void info(ss::logger::force_tag, const char* format, Args&&... args) { + log( + ss::log_level::info, + ss::logger::force, + format, + std::forward(args)...); + } template void debug(const char* format, Args&&... args) { log(ss::log_level::debug, format, std::forward(args)...); } + template + void debug(ss::logger::force_tag, const char* format, Args&&... args) { + log( + ss::log_level::debug, + ss::logger::force, + format, + std::forward(args)...); + } template void trace(const char* format, Args&&... args) { log(ss::log_level::trace, format, std::forward(args)...); } + template + void trace(ss::logger::force_tag, const char* format, Args&&... args) { + log( + ss::log_level::trace, + ss::logger::force, + format, + std::forward(args)...); + } template void log(ss::log_level lvl, const char* format, Args&&... args) { @@ -431,6 +471,22 @@ class connection_context final std::forward(args)...); } } + template + void log( + ss::log_level lvl, + ss::logger::force_tag, + const char* format, + Args&&... args) { + auto line_fmt = ss::sstring("{}:{} failed authorization - ") + + format; + kauthzlog.log( + lvl, + ss::logger::force, + line_fmt.c_str(), + _client_addr, + _client_port, + std::forward(args)...); + } private: // connection_context owns the original client_addr diff --git a/src/v/kafka/server/group.h b/src/v/kafka/server/group.h index 40adceff6df5d..017f380f4d0c6 100644 --- a/src/v/kafka/server/group.h +++ b/src/v/kafka/server/group.h @@ -765,26 +765,71 @@ class group final : public ss::enable_lw_shared_from_this { void info(const char* format, Args&&... args) const { log(ss::log_level::info, format, std::forward(args)...); } + template + void + info(ss::logger::force_tag, const char* format, Args&&... args) const { + log( + ss::log_level::info, + ss::logger::force, + format, + std::forward(args)...); + } template void error(const char* format, Args&&... args) const { log(ss::log_level::error, format, std::forward(args)...); } + template + void + error(ss::logger::force_tag, const char* format, Args&&... args) const { + log( + ss::log_level::error, + ss::logger::force, + format, + std::forward(args)...); + } template void warn(const char* format, Args&&... args) const { log(ss::log_level::warn, format, std::forward(args)...); } + template + void + warn(ss::logger::force_tag, const char* format, Args&&... args) const { + log( + ss::log_level::warn, + ss::logger::force, + format, + std::forward(args)...); + } template void debug(const char* format, Args&&... args) const { log(ss::log_level::debug, format, std::forward(args)...); } + template + void + debug(ss::logger::force_tag, const char* format, Args&&... args) const { + log( + ss::log_level::debug, + ss::logger::force, + format, + std::forward(args)...); + } template void trace(const char* format, Args&&... args) const { log(ss::log_level::trace, format, std::forward(args)...); } + template + void + trace(ss::logger::force_tag, const char* format, Args&&... args) const { + log( + ss::log_level::trace, + ss::logger::force, + format, + std::forward(args)...); + } template void log(ss::log_level lvl, const char* format, Args&&... args) const { @@ -799,6 +844,22 @@ class group final : public ss::enable_lw_shared_from_this { std::forward(args)...); } } + template + void log( + ss::log_level lvl, + ss::logger::force_tag, + const char* format, + Args&&... args) const { + auto line_fmt = ss::sstring("[N:{} S:{} G:{}] ") + format; + _logger.log( + lvl, + ss::logger::force, + line_fmt.c_str(), + _group.id()(), + _group.state(), + _group.generation(), + std::forward(args)...); + } private: ss::logger& _logger; diff --git a/src/v/raft/logger.h b/src/v/raft/logger.h index 9ee39d9fd99b0..a51a00993f5cf 100644 --- a/src/v/raft/logger.h +++ b/src/v/raft/logger.h @@ -32,24 +32,64 @@ class ctx_log { log(ss::log_level::error, format, std::forward(args)...); } template + void error(ss::logger::force_tag, const char* format, Args&&... args) { + log( + ss::log_level::error, + ss::logger::force, + format, + std::forward(args)...); + } + template void warn(const char* format, Args&&... args) { log(ss::log_level::warn, format, std::forward(args)...); } + template + void warn(ss::logger::force_tag, const char* format, Args&&... args) { + log( + ss::log_level::warn, + ss::logger::force, + format, + std::forward(args)...); + } template void info(const char* format, Args&&... args) { log(ss::log_level::info, format, std::forward(args)...); } + template + void info(ss::logger::force_tag, const char* format, Args&&... args) { + log( + ss::log_level::info, + ss::logger::force, + format, + std::forward(args)...); + } template void debug(const char* format, Args&&... args) { log(ss::log_level::debug, format, std::forward(args)...); } + template + void debug(ss::logger::force_tag, const char* format, Args&&... args) { + log( + ss::log_level::debug, + ss::logger::force, + format, + std::forward(args)...); + } template void trace(const char* format, Args&&... args) { log(ss::log_level::trace, format, std::forward(args)...); } + template + void trace(ss::logger::force_tag, const char* format, Args&&... args) { + log( + ss::log_level::trace, + ss::logger::force, + format, + std::forward(args)...); + } template void log(ss::log_level lvl, const char* format, Args&&... args) { @@ -63,6 +103,21 @@ class ctx_log { std::forward(args)...); } } + template + void log( + ss::log_level lvl, + ss::logger::force_tag, + const char* format, + Args&&... args) { + auto line_fmt = ss::sstring("[group_id:{}, {}] ") + format; + raftlog.log( + lvl, + ss::logger::force, + line_fmt.c_str(), + _group_id, + _ntp, + std::forward(args)...); + } private: raft::group_id _group_id; diff --git a/src/v/redpanda/BUILD b/src/v/redpanda/BUILD index 814e5850d1191..2542ea42806ce 100644 --- a/src/v/redpanda/BUILD +++ b/src/v/redpanda/BUILD @@ -111,6 +111,7 @@ redpanda_cc_library( "//src/v/redpanda/admin/services/internal:breakglass", "//src/v/redpanda/admin/services/internal:debug", "//src/v/redpanda/admin/services/internal:level_zero", + "//src/v/redpanda/admin/services/internal:log_filter", "//src/v/redpanda/admin/services/internal:metastore", "//src/v/redpanda/admin/services/internal:shadow_link_internal", "//src/v/redpanda/admin/services/shadow_link", diff --git a/src/v/redpanda/admin/services/internal/BUILD b/src/v/redpanda/admin/services/internal/BUILD index 1f62b3998f103..d0d2d77d50de9 100644 --- a/src/v/redpanda/admin/services/internal/BUILD +++ b/src/v/redpanda/admin/services/internal/BUILD @@ -114,6 +114,18 @@ redpanda_cc_library( ], ) +redpanda_cc_library( + name = "log_filter", + srcs = ["log_filter.cc"], + hdrs = ["log_filter.h"], + deps = [ + "//proto/redpanda/core/admin/internal/v1:log_filter_redpanda_proto", + "//src/v/base", + "//src/v/serde/protobuf:rpc", + "@seastar", + ], +) + redpanda_cc_library( name = "shadow_link_internal", srcs = ["shadow_link_internal.cc"], diff --git a/src/v/redpanda/admin/services/internal/log_filter.cc b/src/v/redpanda/admin/services/internal/log_filter.cc new file mode 100644 index 0000000000000..5a1efe7baec7c --- /dev/null +++ b/src/v/redpanda/admin/services/internal/log_filter.cc @@ -0,0 +1,187 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "redpanda/admin/services/internal/log_filter.h" + +#include "base/vlog.h" +#include "base/vlog_callsite.h" +#include "base/vlog_filter.h" + +#include + +#include + +namespace admin { + +namespace { +// NOLINTNEXTLINE(*-non-const-global-variables,cert-err58-*) +ss::logger log{"admin_api_server/log_filter_service"}; + +proto::admin::log_filter_state +to_wire_state(vlog::detail::callsite_base::state s) { + using cs_state = vlog::detail::callsite_base::state; + switch (s) { + case cs_state::default_: + return proto::admin::log_filter_state::inherited; + case cs_state::force_on: + return proto::admin::log_filter_state::force_on; + case cs_state::force_off: + return proto::admin::log_filter_state::force_off; + case cs_state::uninit: + // resolved_state() never returns uninit post-slow_init. + return proto::admin::log_filter_state::inherited; + } + return proto::admin::log_filter_state::inherited; +} + +vlog::detail::callsite_base::state +to_internal_state(proto::admin::log_filter_state s) { + using cs_state = vlog::detail::callsite_base::state; + switch (s) { + case proto::admin::log_filter_state::inherited: + return cs_state::default_; + case proto::admin::log_filter_state::force_on: + return cs_state::force_on; + case proto::admin::log_filter_state::force_off: + return cs_state::force_off; + case proto::admin::log_filter_state::unspecified: + throw serde::pb::rpc::invalid_argument_exception( + "log_filter_rule.state must not be LOG_FILTER_STATE_UNSPECIFIED"); + } + throw serde::pb::rpc::invalid_argument_exception( + fmt::format( + "log_filter_rule.state carries unknown enum value {}", + static_cast(s))); +} + +proto::admin::log_filter_rule to_wire_rule(const vlog::rule& r) { + proto::admin::log_filter_rule wire; + if (r.file) { + wire.set_file(ss::sstring{*r.file}); + } + if (r.line) { + const auto [lo, hi] = *r.line; + chunked_vector line_vec; + line_vec.push_back(lo); + if (lo != hi) { + line_vec.push_back(hi); + } + wire.set_line(std::move(line_vec)); + } + if (r.contains) { + wire.set_contains(ss::sstring{*r.contains}); + } + wire.set_state(to_wire_state(r.state)); + return wire; +} + +vlog::rule to_internal_rule(proto::admin::log_filter_rule& wire) { + vlog::rule r; + if (wire.has_file()) { + r.file = std::move(wire.get_file()); + } + const auto& line = wire.get_line(); + switch (line.size()) { + case 0: + break; + case 1: + r.line = std::pair{line[0], line[0]}; + break; + case 2: + if (line[0] > line[1]) { + throw serde::pb::rpc::invalid_argument_exception( + fmt::format( + "log_filter_rule.line range [{}, {}] is inverted", + line[0], + line[1])); + } + r.line = std::pair{line[0], line[1]}; + break; + default: + throw serde::pb::rpc::invalid_argument_exception( + fmt::format( + "log_filter_rule.line accepts 0, 1, or 2 entries; got {}", + line.size())); + } + if (wire.has_contains()) { + r.contains = std::move(wire.get_contains()); + } + r.state = to_internal_state(wire.get_state()); + return r; +} + +} // namespace + +seastar::future +log_filter_service_impl::set_log_filter( + serde::pb::rpc::context, proto::admin::set_log_filter_request req) { + std::vector rules; + auto& wire_rules = req.get_rules(); + rules.reserve(wire_rules.size()); + for (auto& r : wire_rules) { + rules.push_back(to_internal_rule(r)); + } + vlog(log.info, "Applying {} vlog callsite filter rule(s)", rules.size()); + vlog::apply_rules(std::move(rules)); + co_return proto::admin::set_log_filter_response{}; +} + +seastar::future +log_filter_service_impl::reset_log_filter( + serde::pb::rpc::context, proto::admin::reset_log_filter_request) { + vlog(log.info, "Reset all vlog callsite filters"); + vlog::reset_rules(); + co_return proto::admin::reset_log_filter_response{}; +} + +seastar::future +log_filter_service_impl::get_log_filter( + serde::pb::rpc::context, proto::admin::get_log_filter_request) { + auto rules = vlog::get_rules(); + chunked_vector out; + out.reserve(rules.size()); + for (const auto& r : rules) { + out.push_back(to_wire_rule(r)); + } + proto::admin::get_log_filter_response resp; + resp.set_rules(std::move(out)); + co_return std::move(resp); +} + +seastar::future +log_filter_service_impl::list_log_callsites( + serde::pb::rpc::context, proto::admin::list_log_callsites_request req) { + std::optional file_filter; + if (req.has_file_filter()) { + file_filter = std::move(req.get_file_filter()); + } + + chunked_vector out; + vlog::for_each_callsite( + [&out, &file_filter](vlog::detail::callsite_base& cs) { + if (file_filter && fnmatch(file_filter->c_str(), cs.file(), 0) != 0) { + return; + } + proto::admin::log_callsite_info info; + info.set_file(ss::sstring{cs.file()}); + info.set_line(cs.line()); + info.set_fmt( + cs.fmt() != nullptr ? ss::sstring{cs.fmt()} : ss::sstring{}); + info.set_state(to_wire_state(cs.resolved_state())); + out.push_back(std::move(info)); + }); + + proto::admin::list_log_callsites_response resp; + resp.set_callsites(std::move(out)); + co_return std::move(resp); +} + +} // namespace admin diff --git a/src/v/redpanda/admin/services/internal/log_filter.h b/src/v/redpanda/admin/services/internal/log_filter.h new file mode 100644 index 0000000000000..3b09d7e228224 --- /dev/null +++ b/src/v/redpanda/admin/services/internal/log_filter.h @@ -0,0 +1,45 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include "proto/redpanda/core/admin/internal/v1/log_filter.proto.h" + +#include + +namespace admin { + +// Admin service that drives the per-callsite vlog filter on this node. +// State (the rule set and the per-callsite enabled flags) is process- +// global, so no sharded dependencies are needed — the service just +// translates between the wire types and the vlog::apply_rules / +// vlog::reset_rules / vlog::get_rules / vlog::for_each_callsite API in +// src/v/base. +class log_filter_service_impl : public proto::admin::log_filter_service { +public: + log_filter_service_impl() = default; + + seastar::future set_log_filter( + serde::pb::rpc::context, proto::admin::set_log_filter_request) override; + + seastar::future reset_log_filter( + serde::pb::rpc::context, proto::admin::reset_log_filter_request) override; + + seastar::future get_log_filter( + serde::pb::rpc::context, proto::admin::get_log_filter_request) override; + + seastar::future + list_log_callsites( + serde::pb::rpc::context, + proto::admin::list_log_callsites_request) override; +}; + +} // namespace admin diff --git a/src/v/redpanda/application_admin.cc b/src/v/redpanda/application_admin.cc index 288802eac2f36..90185fbd6b7a5 100644 --- a/src/v/redpanda/application_admin.cc +++ b/src/v/redpanda/application_admin.cc @@ -17,6 +17,7 @@ #include "redpanda/admin/services/internal/breakglass.h" #include "redpanda/admin/services/internal/debug.h" #include "redpanda/admin/services/internal/level_zero.h" +#include "redpanda/admin/services/internal/log_filter.h" #include "redpanda/admin/services/internal/metastore.h" #include "redpanda/admin/services/internal/shadow_link_internal.h" #include "redpanda/admin/services/security.h" @@ -86,6 +87,7 @@ void application::configure_admin_server(model::node_id node_id) { s.add_service( std::make_unique( create_client(), stress_fiber_manager)); + s.add_service(std::make_unique()); s.add_service( std::make_unique( create_client(), &_datalake_coordinator_fe)); diff --git a/src/v/utils/prefix_logger.h b/src/v/utils/prefix_logger.h index 6387748f0a056..200dd388ace89 100644 --- a/src/v/utils/prefix_logger.h +++ b/src/v/utils/prefix_logger.h @@ -33,30 +33,88 @@ class prefix_logger { } } + template + void log( + ss::log_level lvl, + ss::logger::force_tag, + const char* format, + Args&&... args) const { + auto line_fmt = ss::sstring("{} - ") + format; + _logger->log( + lvl, + ss::logger::force, + line_fmt.c_str(), + _prefix, + std::forward(args)...); + } + template void error(const char* format, Args&&... args) const { log(ss::log_level::error, format, std::forward(args)...); } + template + void + error(ss::logger::force_tag, const char* format, Args&&... args) const { + log( + ss::log_level::error, + ss::logger::force, + format, + std::forward(args)...); + } template void warn(const char* format, Args&&... args) const { log(ss::log_level::warn, format, std::forward(args)...); } + template + void warn(ss::logger::force_tag, const char* format, Args&&... args) const { + log( + ss::log_level::warn, + ss::logger::force, + format, + std::forward(args)...); + } template void info(const char* format, Args&&... args) const { log(ss::log_level::info, format, std::forward(args)...); } + template + void info(ss::logger::force_tag, const char* format, Args&&... args) const { + log( + ss::log_level::info, + ss::logger::force, + format, + std::forward(args)...); + } template void debug(const char* format, Args&&... args) const { log(ss::log_level::debug, format, std::forward(args)...); } + template + void + debug(ss::logger::force_tag, const char* format, Args&&... args) const { + log( + ss::log_level::debug, + ss::logger::force, + format, + std::forward(args)...); + } template void trace(const char* format, Args&&... args) const { log(ss::log_level::trace, format, std::forward(args)...); } + template + void + trace(ss::logger::force_tag, const char* format, Args&&... args) const { + log( + ss::log_level::trace, + ss::logger::force, + format, + std::forward(args)...); + } template ss::sstring format(const char* format, Args&&... args) const { diff --git a/src/v/utils/retry_chain_node.h b/src/v/utils/retry_chain_node.h index f21a662682540..4526676f6d687 100644 --- a/src/v/utils/retry_chain_node.h +++ b/src/v/utils/retry_chain_node.h @@ -652,25 +652,104 @@ class basic_retry_chain_logger final { } } template + void log( + ss::log_level lvl, + ss::logger::force_tag, + fmt::format_string format, + Args&&... args) const { + auto msg = ssx::sformat(format, std::forward(args)...); + if (_has_tracing) { + _node.maybe_add_trace(msg); + } + auto lambda = [&](ss::logger& lgr, ss::log_level l) { + if (_ctx) { + lgr.log( + l, + ss::logger::force, + "{} - {}", + _node("{}", _ctx.value()), + msg); + } else { + lgr.log(l, ss::logger::force, "{} - {}", _node(), msg); + } + }; + do_log(lvl, std::move(lambda)); + } + template void error(fmt::format_string format, Args&&... args) const { log(ss::log_level::error, format, std::forward(args)...); } template + void error( + ss::logger::force_tag, + fmt::format_string format, + Args&&... args) const { + log( + ss::log_level::error, + ss::logger::force, + format, + std::forward(args)...); + } + template void warn(fmt::format_string format, Args&&... args) const { log(ss::log_level::warn, format, std::forward(args)...); } template + void warn( + ss::logger::force_tag, + fmt::format_string format, + Args&&... args) const { + log( + ss::log_level::warn, + ss::logger::force, + format, + std::forward(args)...); + } + template void info(fmt::format_string format, Args&&... args) const { log(ss::log_level::info, format, std::forward(args)...); } template + void info( + ss::logger::force_tag, + fmt::format_string format, + Args&&... args) const { + log( + ss::log_level::info, + ss::logger::force, + format, + std::forward(args)...); + } + template void debug(fmt::format_string format, Args&&... args) const { log(ss::log_level::debug, format, std::forward(args)...); } template + void debug( + ss::logger::force_tag, + fmt::format_string format, + Args&&... args) const { + log( + ss::log_level::debug, + ss::logger::force, + format, + std::forward(args)...); + } + template void trace(fmt::format_string format, Args&&... args) const { log(ss::log_level::trace, format, std::forward(args)...); } + template + void trace( + ss::logger::force_tag, + fmt::format_string format, + Args&&... args) const { + log( + ss::log_level::trace, + ss::logger::force, + format, + std::forward(args)...); + } /// Invoke the lambda function but disable tracing while the function is /// running. template diff --git a/src/v/utils/truncating_logger.h b/src/v/utils/truncating_logger.h index 3a5295a4d28f8..419adc2251a7a 100644 --- a/src/v/utils/truncating_logger.h +++ b/src/v/utils/truncating_logger.h @@ -64,30 +64,111 @@ class truncating_logger { } } + template + void log( + ss::log_level lvl, + ss::logger::force_tag, + fmt::format_string format, + Args&&... args) const { + fmt::memory_buffer buf; + auto res = fmt::format_to_n( + std::back_inserter(buf), + _max_line_bytes, + format, + std::forward(args)...); + if (res.size > _max_line_bytes) { + fmt::format_to( + std::back_inserter(buf), + trunc_msg_fmt, + res.size - _max_line_bytes); + } + std::string_view sv{buf.data(), buf.size()}; +#ifdef SEASTAR_LOGGER_COMPILE_TIME_FMT + _logger.log(lvl, ss::logger::force, fmt::runtime(sv)); +#else + _logger.log(lvl, ss::logger::force, sv); +#endif + } + template void error(fmt::format_string format, Args&&... args) const { log(ss::log_level::error, format, std::forward(args)...); } + template + void error( + ss::logger::force_tag, + fmt::format_string format, + Args&&... args) const { + log( + ss::log_level::error, + ss::logger::force, + format, + std::forward(args)...); + } template void warn(fmt::format_string format, Args&&... args) const { log(ss::log_level::warn, format, std::forward(args)...); } + template + void warn( + ss::logger::force_tag, + fmt::format_string format, + Args&&... args) const { + log( + ss::log_level::warn, + ss::logger::force, + format, + std::forward(args)...); + } template void info(fmt::format_string format, Args&&... args) const { log(ss::log_level::info, format, std::forward(args)...); } + template + void info( + ss::logger::force_tag, + fmt::format_string format, + Args&&... args) const { + log( + ss::log_level::info, + ss::logger::force, + format, + std::forward(args)...); + } template void debug(fmt::format_string format, Args&&... args) const { log(ss::log_level::debug, format, std::forward(args)...); } + template + void debug( + ss::logger::force_tag, + fmt::format_string format, + Args&&... args) const { + log( + ss::log_level::debug, + ss::logger::force, + format, + std::forward(args)...); + } template void trace(fmt::format_string format, Args&&... args) const { log(ss::log_level::trace, format, std::forward(args)...); } + template + void trace( + ss::logger::force_tag, + fmt::format_string format, + Args&&... args) const { + log( + ss::log_level::trace, + ss::logger::force, + format, + std::forward(args)...); + } bool is_enabled(ss::log_level level) const noexcept { return _logger.is_enabled(level); diff --git a/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/v1/__init__.pyi b/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/v1/__init__.pyi index ed993d0ac2c04..015bcb1b03d4a 100644 --- a/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/v1/__init__.pyi +++ b/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/v1/__init__.pyi @@ -1,2 +1,3 @@ from . import breakglass_pb2 from . import debug_pb2 +from . import log_filter_pb2 diff --git a/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/v1/log_filter_pb2.py b/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/v1/log_filter_pb2.py new file mode 100644 index 0000000000000..33029a978908e --- /dev/null +++ b/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/v1/log_filter_pb2.py @@ -0,0 +1,49 @@ +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion(_runtime_version.Domain.PUBLIC, 5, 29, 0, '', 'proto/redpanda/core/admin/internal/v1/log_filter.proto') +_sym_db = _symbol_database.Default() +from .......proto.redpanda.core.pbgen import options_pb2 as proto_dot_redpanda_dot_core_dot_pbgen_dot_options__pb2 +from .......proto.redpanda.core.pbgen import rpc_pb2 as proto_dot_redpanda_dot_core_dot_pbgen_dot_rpc__pb2 +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n6proto/redpanda/core/admin/internal/v1/log_filter.proto\x12\x1fredpanda.core.admin.v2.internal\x1a\'proto/redpanda/core/pbgen/options.proto\x1a#proto/redpanda/core/pbgen/rpc.proto"\x9d\x01\n\rLogFilterRule\x12\x11\n\x04file\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x0c\n\x04line\x18\x02 \x03(\r\x12\x15\n\x08contains\x18\x03 \x01(\tH\x01\x88\x01\x01\x12>\n\x05state\x18\x04 \x01(\x0e2/.redpanda.core.admin.v2.internal.LogFilterStateB\x07\n\x05_fileB\x0b\n\t_contains"z\n\x0fLogCallsiteInfo\x12\x0c\n\x04file\x18\x01 \x01(\t\x12\x0c\n\x04line\x18\x02 \x01(\r\x12\x0b\n\x03fmt\x18\x03 \x01(\t\x12>\n\x05state\x18\x04 \x01(\x0e2/.redpanda.core.admin.v2.internal.LogFilterState"T\n\x13SetLogFilterRequest\x12=\n\x05rules\x18\x01 \x03(\x0b2..redpanda.core.admin.v2.internal.LogFilterRule"\x16\n\x14SetLogFilterResponse"\x17\n\x15ResetLogFilterRequest"\x18\n\x16ResetLogFilterResponse"\x15\n\x13GetLogFilterRequest"U\n\x14GetLogFilterResponse\x12=\n\x05rules\x18\x01 \x03(\x0b2..redpanda.core.admin.v2.internal.LogFilterRule"C\n\x17ListLogCallsitesRequest\x12\x18\n\x0bfile_filter\x18\x01 \x01(\tH\x00\x88\x01\x01B\x0e\n\x0c_file_filter"_\n\x18ListLogCallsitesResponse\x12C\n\tcallsites\x18\x01 \x03(\x0b20.redpanda.core.admin.v2.internal.LogCallsiteInfo*\x91\x01\n\x0eLogFilterState\x12 \n\x1cLOG_FILTER_STATE_UNSPECIFIED\x10\x00\x12\x1e\n\x1aLOG_FILTER_STATE_INHERITED\x10\x01\x12\x1d\n\x19LOG_FILTER_STATE_FORCE_ON\x10\x02\x12\x1e\n\x1aLOG_FILTER_STATE_FORCE_OFF\x10\x032\xbc\x04\n\x10LogFilterService\x12\x83\x01\n\x0cSetLogFilter\x124.redpanda.core.admin.v2.internal.SetLogFilterRequest\x1a5.redpanda.core.admin.v2.internal.SetLogFilterResponse"\x06\xea\x92\x19\x02\x10\x03\x12\x89\x01\n\x0eResetLogFilter\x126.redpanda.core.admin.v2.internal.ResetLogFilterRequest\x1a7.redpanda.core.admin.v2.internal.ResetLogFilterResponse"\x06\xea\x92\x19\x02\x10\x03\x12\x83\x01\n\x0cGetLogFilter\x124.redpanda.core.admin.v2.internal.GetLogFilterRequest\x1a5.redpanda.core.admin.v2.internal.GetLogFilterResponse"\x06\xea\x92\x19\x02\x10\x03\x12\x8f\x01\n\x10ListLogCallsites\x128.redpanda.core.admin.v2.internal.ListLogCallsitesRequest\x1a9.redpanda.core.admin.v2.internal.ListLogCallsitesResponse"\x06\xea\x92\x19\x02\x10\x03B\x10\xea\x92\x19\x0cproto::adminb\x06proto3') +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'proto.redpanda.core.admin.internal.v1.log_filter_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + _globals['DESCRIPTOR']._loaded_options = None + _globals['DESCRIPTOR']._serialized_options = b'\xea\x92\x19\x0cproto::admin' + _globals['_LOGFILTERSERVICE'].methods_by_name['SetLogFilter']._loaded_options = None + _globals['_LOGFILTERSERVICE'].methods_by_name['SetLogFilter']._serialized_options = b'\xea\x92\x19\x02\x10\x03' + _globals['_LOGFILTERSERVICE'].methods_by_name['ResetLogFilter']._loaded_options = None + _globals['_LOGFILTERSERVICE'].methods_by_name['ResetLogFilter']._serialized_options = b'\xea\x92\x19\x02\x10\x03' + _globals['_LOGFILTERSERVICE'].methods_by_name['GetLogFilter']._loaded_options = None + _globals['_LOGFILTERSERVICE'].methods_by_name['GetLogFilter']._serialized_options = b'\xea\x92\x19\x02\x10\x03' + _globals['_LOGFILTERSERVICE'].methods_by_name['ListLogCallsites']._loaded_options = None + _globals['_LOGFILTERSERVICE'].methods_by_name['ListLogCallsites']._serialized_options = b'\xea\x92\x19\x02\x10\x03' + _globals['_LOGFILTERSTATE']._serialized_start = 891 + _globals['_LOGFILTERSTATE']._serialized_end = 1036 + _globals['_LOGFILTERRULE']._serialized_start = 170 + _globals['_LOGFILTERRULE']._serialized_end = 327 + _globals['_LOGCALLSITEINFO']._serialized_start = 329 + _globals['_LOGCALLSITEINFO']._serialized_end = 451 + _globals['_SETLOGFILTERREQUEST']._serialized_start = 453 + _globals['_SETLOGFILTERREQUEST']._serialized_end = 537 + _globals['_SETLOGFILTERRESPONSE']._serialized_start = 539 + _globals['_SETLOGFILTERRESPONSE']._serialized_end = 561 + _globals['_RESETLOGFILTERREQUEST']._serialized_start = 563 + _globals['_RESETLOGFILTERREQUEST']._serialized_end = 586 + _globals['_RESETLOGFILTERRESPONSE']._serialized_start = 588 + _globals['_RESETLOGFILTERRESPONSE']._serialized_end = 612 + _globals['_GETLOGFILTERREQUEST']._serialized_start = 614 + _globals['_GETLOGFILTERREQUEST']._serialized_end = 635 + _globals['_GETLOGFILTERRESPONSE']._serialized_start = 637 + _globals['_GETLOGFILTERRESPONSE']._serialized_end = 722 + _globals['_LISTLOGCALLSITESREQUEST']._serialized_start = 724 + _globals['_LISTLOGCALLSITESREQUEST']._serialized_end = 791 + _globals['_LISTLOGCALLSITESRESPONSE']._serialized_start = 793 + _globals['_LISTLOGCALLSITESRESPONSE']._serialized_end = 888 + _globals['_LOGFILTERSERVICE']._serialized_start = 1039 + _globals['_LOGFILTERSERVICE']._serialized_end = 1611 \ No newline at end of file diff --git a/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/v1/log_filter_pb2.pyi b/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/v1/log_filter_pb2.pyi new file mode 100644 index 0000000000000..3da3eac58efe2 --- /dev/null +++ b/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/v1/log_filter_pb2.pyi @@ -0,0 +1,236 @@ +""" +@generated by mypy-protobuf. Do not edit manually! +isort:skip_file +Copyright 2026 Redpanda Data, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import builtins +import collections.abc +import google.protobuf.descriptor +import google.protobuf.internal.containers +import google.protobuf.internal.enum_type_wrapper +import google.protobuf.message +import sys +import typing +if sys.version_info >= (3, 10): + import typing as typing_extensions +else: + import typing_extensions +DESCRIPTOR: google.protobuf.descriptor.FileDescriptor + +class _LogFilterState: + ValueType = typing.NewType('ValueType', builtins.int) + V: typing_extensions.TypeAlias = ValueType + +class _LogFilterStateEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[_LogFilterState.ValueType], builtins.type): + DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor + LOG_FILTER_STATE_UNSPECIFIED: _LogFilterState.ValueType + 'Proto3 default — rejected by SetLogFilter as INVALID_ARGUMENT,\n never returned by ListLogCallsites.\n ' + LOG_FILTER_STATE_INHERITED: _LogFilterState.ValueType + "Callsite falls through to the logger's configured level gate." + LOG_FILTER_STATE_FORCE_ON: _LogFilterState.ValueType + 'Callsite emits regardless of the configured level.' + LOG_FILTER_STATE_FORCE_OFF: _LogFilterState.ValueType + 'Callsite is suppressed regardless of the configured level.' + +class LogFilterState(_LogFilterState, metaclass=_LogFilterStateEnumTypeWrapper): + """State value written onto every callsite a LogFilterRule matches, or + reported by ListLogCallsites as the resolved state for a callsite. + """ +LOG_FILTER_STATE_UNSPECIFIED: LogFilterState.ValueType +'Proto3 default — rejected by SetLogFilter as INVALID_ARGUMENT,\nnever returned by ListLogCallsites.\n' +LOG_FILTER_STATE_INHERITED: LogFilterState.ValueType +"Callsite falls through to the logger's configured level gate." +LOG_FILTER_STATE_FORCE_ON: LogFilterState.ValueType +'Callsite emits regardless of the configured level.' +LOG_FILTER_STATE_FORCE_OFF: LogFilterState.ValueType +'Callsite is suppressed regardless of the configured level.' +Global___LogFilterState: typing_extensions.TypeAlias = LogFilterState + +@typing.final +class LogFilterRule(google.protobuf.message.Message): + """A filter rule applied to every registered vlog callsite. Every populated + predicate must match for the rule to apply: file is an fnmatch(3) glob + against the callsite's __FILE__, line matches __LINE__ as described + below, and contains is a substring search against the format-string + literal. An empty rule (no predicates set) matches every callsite. + Rules in a list are applied in order; the last matching rule's state + value wins. A callsite matching no rule resolves to + LOG_FILTER_STATE_INHERITED. + """ + DESCRIPTOR: google.protobuf.descriptor.Descriptor + FILE_FIELD_NUMBER: builtins.int + LINE_FIELD_NUMBER: builtins.int + CONTAINS_FIELD_NUMBER: builtins.int + STATE_FIELD_NUMBER: builtins.int + file: builtins.str + "fnmatch(3) glob pattern matched against the callsite's __FILE__." + contains: builtins.str + 'Substring that must appear in the format-string literal.' + state: Global___LogFilterState.ValueType + 'State written to every site the rule matches.' + + @property + def line(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.int]: + """Predicate on __LINE__. Accepts exactly one or two entries: a + single value matches that line, two values [lo, hi] match the + inclusive range lo <= __LINE__ <= hi. Any other size is rejected + with INVALID_ARGUMENT. + """ + + def __init__(self, *, file: builtins.str | None=..., line: collections.abc.Iterable[builtins.int] | None=..., contains: builtins.str | None=..., state: Global___LogFilterState.ValueType=...) -> None: + ... + + def HasField(self, field_name: typing.Literal['_contains', b'_contains', '_file', b'_file', 'contains', b'contains', 'file', b'file']) -> builtins.bool: + ... + + def ClearField(self, field_name: typing.Literal['_contains', b'_contains', '_file', b'_file', 'contains', b'contains', 'file', b'file', 'line', b'line', 'state', b'state']) -> None: + ... + + @typing.overload + def WhichOneof(self, oneof_group: typing.Literal['_contains', b'_contains']) -> typing.Literal['contains'] | None: + ... + + @typing.overload + def WhichOneof(self, oneof_group: typing.Literal['_file', b'_file']) -> typing.Literal['file'] | None: + ... +Global___LogFilterRule: typing_extensions.TypeAlias = LogFilterRule + +@typing.final +class LogCallsiteInfo(google.protobuf.message.Message): + """A registered vlog callsite and its current enabled state.""" + DESCRIPTOR: google.protobuf.descriptor.Descriptor + FILE_FIELD_NUMBER: builtins.int + LINE_FIELD_NUMBER: builtins.int + FMT_FIELD_NUMBER: builtins.int + STATE_FIELD_NUMBER: builtins.int + file: builtins.str + 'Source-file basename (path stripped at compile time).' + line: builtins.int + 'Source line.' + fmt: builtins.str + 'Format-string literal at the call site.' + state: Global___LogFilterState.ValueType + 'Resolved state for this callsite.' + + def __init__(self, *, file: builtins.str=..., line: builtins.int=..., fmt: builtins.str=..., state: Global___LogFilterState.ValueType=...) -> None: + ... + + def ClearField(self, field_name: typing.Literal['file', b'file', 'fmt', b'fmt', 'line', b'line', 'state', b'state']) -> None: + ... +Global___LogCallsiteInfo: typing_extensions.TypeAlias = LogCallsiteInfo + +@typing.final +class SetLogFilterRequest(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + RULES_FIELD_NUMBER: builtins.int + + @property + def rules(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[Global___LogFilterRule]: + """Rule list that replaces the currently active rules. Every registered + callsite is re-evaluated against the new set. To add a rule to the + active set, fetch the current rules with GetLogFilter, append to + that list client-side, and resubmit here. + """ + + def __init__(self, *, rules: collections.abc.Iterable[Global___LogFilterRule] | None=...) -> None: + ... + + def ClearField(self, field_name: typing.Literal['rules', b'rules']) -> None: + ... +Global___SetLogFilterRequest: typing_extensions.TypeAlias = SetLogFilterRequest + +@typing.final +class SetLogFilterResponse(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + def __init__(self) -> None: + ... +Global___SetLogFilterResponse: typing_extensions.TypeAlias = SetLogFilterResponse + +@typing.final +class ResetLogFilterRequest(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + def __init__(self) -> None: + ... +Global___ResetLogFilterRequest: typing_extensions.TypeAlias = ResetLogFilterRequest + +@typing.final +class ResetLogFilterResponse(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + def __init__(self) -> None: + ... +Global___ResetLogFilterResponse: typing_extensions.TypeAlias = ResetLogFilterResponse + +@typing.final +class GetLogFilterRequest(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + def __init__(self) -> None: + ... +Global___GetLogFilterRequest: typing_extensions.TypeAlias = GetLogFilterRequest + +@typing.final +class GetLogFilterResponse(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + RULES_FIELD_NUMBER: builtins.int + + @property + def rules(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[Global___LogFilterRule]: + """Current active rules, in application order (last match wins).""" + + def __init__(self, *, rules: collections.abc.Iterable[Global___LogFilterRule] | None=...) -> None: + ... + + def ClearField(self, field_name: typing.Literal['rules', b'rules']) -> None: + ... +Global___GetLogFilterResponse: typing_extensions.TypeAlias = GetLogFilterResponse + +@typing.final +class ListLogCallsitesRequest(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + FILE_FILTER_FIELD_NUMBER: builtins.int + file_filter: builtins.str + 'If set, only return callsites whose file matches this fnmatch(3) glob.' + + def __init__(self, *, file_filter: builtins.str | None=...) -> None: + ... + + def HasField(self, field_name: typing.Literal['_file_filter', b'_file_filter', 'file_filter', b'file_filter']) -> builtins.bool: + ... + + def ClearField(self, field_name: typing.Literal['_file_filter', b'_file_filter', 'file_filter', b'file_filter']) -> None: + ... + + def WhichOneof(self, oneof_group: typing.Literal['_file_filter', b'_file_filter']) -> typing.Literal['file_filter'] | None: + ... +Global___ListLogCallsitesRequest: typing_extensions.TypeAlias = ListLogCallsitesRequest + +@typing.final +class ListLogCallsitesResponse(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + CALLSITES_FIELD_NUMBER: builtins.int + + @property + def callsites(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[Global___LogCallsiteInfo]: + ... + + def __init__(self, *, callsites: collections.abc.Iterable[Global___LogCallsiteInfo] | None=...) -> None: + ... + + def ClearField(self, field_name: typing.Literal['callsites', b'callsites']) -> None: + ... +Global___ListLogCallsitesResponse: typing_extensions.TypeAlias = ListLogCallsitesResponse \ No newline at end of file diff --git a/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/v1/log_filter_pb2_connect.py b/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/v1/log_filter_pb2_connect.py new file mode 100644 index 0000000000000..5d3707681a605 --- /dev/null +++ b/tests/rptest/clients/admin/proto/redpanda/core/admin/internal/v1/log_filter_pb2_connect.py @@ -0,0 +1,185 @@ +from __future__ import annotations +from collections.abc import AsyncIterator +from collections.abc import Iterator +from collections.abc import Iterable +import aiohttp +import urllib3 +import typing +import sys +from connectrpc.client_async import AsyncConnectClient +from connectrpc.client_sync import ConnectClient +from connectrpc.client_protocol import ConnectProtocol +from connectrpc.client_connect import ConnectProtocolError +from connectrpc.headers import HeaderInput +from connectrpc.server import ClientRequest +from connectrpc.server import ClientStream +from connectrpc.server import ServerResponse +from connectrpc.server import ServerStream +from connectrpc.server_sync import ConnectWSGI +from connectrpc.streams import StreamInput +from connectrpc.streams import AsyncStreamOutput +from connectrpc.streams import StreamOutput +from connectrpc.unary import UnaryOutput +from connectrpc.unary import ClientStreamingOutput +if typing.TYPE_CHECKING: + if sys.version_info >= (3, 11): + from wsgiref.types import WSGIApplication + else: + from _typeshed.wsgi import WSGIApplication +from ....... import proto + +class LogFilterServiceClient: + + def __init__(self, base_url: str, http_client: urllib3.PoolManager | None=None, protocol: ConnectProtocol=ConnectProtocol.CONNECT_PROTOBUF): + self.base_url = base_url + self._connect_client = ConnectClient(http_client, protocol) + + def call_set_log_filter(self, req: proto.redpanda.core.admin.internal.v1.log_filter_pb2.SetLogFilterRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> UnaryOutput[proto.redpanda.core.admin.internal.v1.log_filter_pb2.SetLogFilterResponse]: + """Low-level method to call SetLogFilter, granting access to errors and metadata""" + url = self.base_url + '/redpanda.core.admin.v2.internal.LogFilterService/SetLogFilter' + return self._connect_client.call_unary(url, req, proto.redpanda.core.admin.internal.v1.log_filter_pb2.SetLogFilterResponse, extra_headers, timeout_seconds) + + def set_log_filter(self, req: proto.redpanda.core.admin.internal.v1.log_filter_pb2.SetLogFilterRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> proto.redpanda.core.admin.internal.v1.log_filter_pb2.SetLogFilterResponse: + response = self.call_set_log_filter(req, extra_headers, timeout_seconds) + err = response.error() + if err is not None: + raise err + msg = response.message() + if msg is None: + raise ConnectProtocolError('missing response message') + return msg + + def call_reset_log_filter(self, req: proto.redpanda.core.admin.internal.v1.log_filter_pb2.ResetLogFilterRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> UnaryOutput[proto.redpanda.core.admin.internal.v1.log_filter_pb2.ResetLogFilterResponse]: + """Low-level method to call ResetLogFilter, granting access to errors and metadata""" + url = self.base_url + '/redpanda.core.admin.v2.internal.LogFilterService/ResetLogFilter' + return self._connect_client.call_unary(url, req, proto.redpanda.core.admin.internal.v1.log_filter_pb2.ResetLogFilterResponse, extra_headers, timeout_seconds) + + def reset_log_filter(self, req: proto.redpanda.core.admin.internal.v1.log_filter_pb2.ResetLogFilterRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> proto.redpanda.core.admin.internal.v1.log_filter_pb2.ResetLogFilterResponse: + response = self.call_reset_log_filter(req, extra_headers, timeout_seconds) + err = response.error() + if err is not None: + raise err + msg = response.message() + if msg is None: + raise ConnectProtocolError('missing response message') + return msg + + def call_get_log_filter(self, req: proto.redpanda.core.admin.internal.v1.log_filter_pb2.GetLogFilterRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> UnaryOutput[proto.redpanda.core.admin.internal.v1.log_filter_pb2.GetLogFilterResponse]: + """Low-level method to call GetLogFilter, granting access to errors and metadata""" + url = self.base_url + '/redpanda.core.admin.v2.internal.LogFilterService/GetLogFilter' + return self._connect_client.call_unary(url, req, proto.redpanda.core.admin.internal.v1.log_filter_pb2.GetLogFilterResponse, extra_headers, timeout_seconds) + + def get_log_filter(self, req: proto.redpanda.core.admin.internal.v1.log_filter_pb2.GetLogFilterRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> proto.redpanda.core.admin.internal.v1.log_filter_pb2.GetLogFilterResponse: + response = self.call_get_log_filter(req, extra_headers, timeout_seconds) + err = response.error() + if err is not None: + raise err + msg = response.message() + if msg is None: + raise ConnectProtocolError('missing response message') + return msg + + def call_list_log_callsites(self, req: proto.redpanda.core.admin.internal.v1.log_filter_pb2.ListLogCallsitesRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> UnaryOutput[proto.redpanda.core.admin.internal.v1.log_filter_pb2.ListLogCallsitesResponse]: + """Low-level method to call ListLogCallsites, granting access to errors and metadata""" + url = self.base_url + '/redpanda.core.admin.v2.internal.LogFilterService/ListLogCallsites' + return self._connect_client.call_unary(url, req, proto.redpanda.core.admin.internal.v1.log_filter_pb2.ListLogCallsitesResponse, extra_headers, timeout_seconds) + + def list_log_callsites(self, req: proto.redpanda.core.admin.internal.v1.log_filter_pb2.ListLogCallsitesRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> proto.redpanda.core.admin.internal.v1.log_filter_pb2.ListLogCallsitesResponse: + response = self.call_list_log_callsites(req, extra_headers, timeout_seconds) + err = response.error() + if err is not None: + raise err + msg = response.message() + if msg is None: + raise ConnectProtocolError('missing response message') + return msg + +class AsyncLogFilterServiceClient: + + def __init__(self, base_url: str, http_client: aiohttp.ClientSession, protocol: ConnectProtocol=ConnectProtocol.CONNECT_PROTOBUF): + self.base_url = base_url + self._connect_client = AsyncConnectClient(http_client, protocol) + + async def call_set_log_filter(self, req: proto.redpanda.core.admin.internal.v1.log_filter_pb2.SetLogFilterRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> UnaryOutput[proto.redpanda.core.admin.internal.v1.log_filter_pb2.SetLogFilterResponse]: + """Low-level method to call SetLogFilter, granting access to errors and metadata""" + url = self.base_url + '/redpanda.core.admin.v2.internal.LogFilterService/SetLogFilter' + return await self._connect_client.call_unary(url, req, proto.redpanda.core.admin.internal.v1.log_filter_pb2.SetLogFilterResponse, extra_headers, timeout_seconds) + + async def set_log_filter(self, req: proto.redpanda.core.admin.internal.v1.log_filter_pb2.SetLogFilterRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> proto.redpanda.core.admin.internal.v1.log_filter_pb2.SetLogFilterResponse: + response = await self.call_set_log_filter(req, extra_headers, timeout_seconds) + err = response.error() + if err is not None: + raise err + msg = response.message() + if msg is None: + raise ConnectProtocolError('missing response message') + return msg + + async def call_reset_log_filter(self, req: proto.redpanda.core.admin.internal.v1.log_filter_pb2.ResetLogFilterRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> UnaryOutput[proto.redpanda.core.admin.internal.v1.log_filter_pb2.ResetLogFilterResponse]: + """Low-level method to call ResetLogFilter, granting access to errors and metadata""" + url = self.base_url + '/redpanda.core.admin.v2.internal.LogFilterService/ResetLogFilter' + return await self._connect_client.call_unary(url, req, proto.redpanda.core.admin.internal.v1.log_filter_pb2.ResetLogFilterResponse, extra_headers, timeout_seconds) + + async def reset_log_filter(self, req: proto.redpanda.core.admin.internal.v1.log_filter_pb2.ResetLogFilterRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> proto.redpanda.core.admin.internal.v1.log_filter_pb2.ResetLogFilterResponse: + response = await self.call_reset_log_filter(req, extra_headers, timeout_seconds) + err = response.error() + if err is not None: + raise err + msg = response.message() + if msg is None: + raise ConnectProtocolError('missing response message') + return msg + + async def call_get_log_filter(self, req: proto.redpanda.core.admin.internal.v1.log_filter_pb2.GetLogFilterRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> UnaryOutput[proto.redpanda.core.admin.internal.v1.log_filter_pb2.GetLogFilterResponse]: + """Low-level method to call GetLogFilter, granting access to errors and metadata""" + url = self.base_url + '/redpanda.core.admin.v2.internal.LogFilterService/GetLogFilter' + return await self._connect_client.call_unary(url, req, proto.redpanda.core.admin.internal.v1.log_filter_pb2.GetLogFilterResponse, extra_headers, timeout_seconds) + + async def get_log_filter(self, req: proto.redpanda.core.admin.internal.v1.log_filter_pb2.GetLogFilterRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> proto.redpanda.core.admin.internal.v1.log_filter_pb2.GetLogFilterResponse: + response = await self.call_get_log_filter(req, extra_headers, timeout_seconds) + err = response.error() + if err is not None: + raise err + msg = response.message() + if msg is None: + raise ConnectProtocolError('missing response message') + return msg + + async def call_list_log_callsites(self, req: proto.redpanda.core.admin.internal.v1.log_filter_pb2.ListLogCallsitesRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> UnaryOutput[proto.redpanda.core.admin.internal.v1.log_filter_pb2.ListLogCallsitesResponse]: + """Low-level method to call ListLogCallsites, granting access to errors and metadata""" + url = self.base_url + '/redpanda.core.admin.v2.internal.LogFilterService/ListLogCallsites' + return await self._connect_client.call_unary(url, req, proto.redpanda.core.admin.internal.v1.log_filter_pb2.ListLogCallsitesResponse, extra_headers, timeout_seconds) + + async def list_log_callsites(self, req: proto.redpanda.core.admin.internal.v1.log_filter_pb2.ListLogCallsitesRequest, extra_headers: HeaderInput | None=None, timeout_seconds: float | None=None) -> proto.redpanda.core.admin.internal.v1.log_filter_pb2.ListLogCallsitesResponse: + response = await self.call_list_log_callsites(req, extra_headers, timeout_seconds) + err = response.error() + if err is not None: + raise err + msg = response.message() + if msg is None: + raise ConnectProtocolError('missing response message') + return msg + +@typing.runtime_checkable +class LogFilterServiceProtocol(typing.Protocol): + + def set_log_filter(self, req: ClientRequest[proto.redpanda.core.admin.internal.v1.log_filter_pb2.SetLogFilterRequest]) -> ServerResponse[proto.redpanda.core.admin.internal.v1.log_filter_pb2.SetLogFilterResponse]: + ... + + def reset_log_filter(self, req: ClientRequest[proto.redpanda.core.admin.internal.v1.log_filter_pb2.ResetLogFilterRequest]) -> ServerResponse[proto.redpanda.core.admin.internal.v1.log_filter_pb2.ResetLogFilterResponse]: + ... + + def get_log_filter(self, req: ClientRequest[proto.redpanda.core.admin.internal.v1.log_filter_pb2.GetLogFilterRequest]) -> ServerResponse[proto.redpanda.core.admin.internal.v1.log_filter_pb2.GetLogFilterResponse]: + ... + + def list_log_callsites(self, req: ClientRequest[proto.redpanda.core.admin.internal.v1.log_filter_pb2.ListLogCallsitesRequest]) -> ServerResponse[proto.redpanda.core.admin.internal.v1.log_filter_pb2.ListLogCallsitesResponse]: + ... +LOG_FILTER_SERVICE_PATH_PREFIX = '/redpanda.core.admin.v2.internal.LogFilterService' + +def wsgi_log_filter_service(implementation: LogFilterServiceProtocol) -> WSGIApplication: + app = ConnectWSGI() + app.register_unary_rpc('/redpanda.core.admin.v2.internal.LogFilterService/SetLogFilter', implementation.set_log_filter, proto.redpanda.core.admin.internal.v1.log_filter_pb2.SetLogFilterRequest) + app.register_unary_rpc('/redpanda.core.admin.v2.internal.LogFilterService/ResetLogFilter', implementation.reset_log_filter, proto.redpanda.core.admin.internal.v1.log_filter_pb2.ResetLogFilterRequest) + app.register_unary_rpc('/redpanda.core.admin.v2.internal.LogFilterService/GetLogFilter', implementation.get_log_filter, proto.redpanda.core.admin.internal.v1.log_filter_pb2.GetLogFilterRequest) + app.register_unary_rpc('/redpanda.core.admin.v2.internal.LogFilterService/ListLogCallsites', implementation.list_log_callsites, proto.redpanda.core.admin.internal.v1.log_filter_pb2.ListLogCallsitesRequest) + return app \ No newline at end of file