From fee3416710d21cc750320c0cb37041c7d23fd5ce Mon Sep 17 00:00:00 2001 From: EvgeniiMekhanik Date: Mon, 22 Jun 2026 13:55:19 +0300 Subject: [PATCH 1/6] Implement library for 128-bit arithmetic Introduce helper functions for 128-bit arithmetic that are not provided by the Linux kernel: - 128/32 division using bitwise long division; - integer square root using binary search. The library is required for training mode statistics collection, where aggregating metrics across a large number of clients can overflow 64-bit intermediate values. An evaluation comparing the sum/sumsq and Welford algorithms using both 64-bit and 128-bit arithmetic showed that 64-bit implementations become inaccurate for workloads with approximately 100,000 or more clients due to intermediate overflows, while both 128-bit implementations match the exact results across all tested workloads Accuracy results: client maximum increases +1 on each iteration (same as expected for connection tracking): exact = 8.33e+08 sum/sumsq (128-bit) = 8.33e+08 Welford (128-bit) = 8.33e+08 sum/sumsq (64-bit) = 8.33e+08 Welford (64-bit) = 32.4295 client maximum randomly increases in a range (1 - 10) on each iteration (possible for non-idempotent request tracking): exact = 2.53805e+10 sum/sumsq (128-bit) = 2.53805e+10 Welford (128-bit) = 2.53805e+10 sum/sumsq (64-bit) = -2.95145e+15 Welford (64-bit) = 32.43 client maximum randomly increases in a range (1 - 100) on each iteration: exact = 2.12403e+12 sum/sumsq (128-bit) = 2.12403e+12 Welford (128-bit) = 2.12403e+12 sum/sumsq (64-bit) = -2.52534e+17 Welford (64-bit) = 32.4224 client maximum randomly increases in a range (1 - 1000) on each iteration (possible for memory usage tracking, since we are planning to track memory usage in pages): exact = 2.08852e+14 sum/sumsq (128-bit) = 2.08852e+14 Welford (128-bit) = 2.08852e+14 sum/sumsq (64-bit) = -2.47926e+19 Welford (64-bit) = 32.419 Part-of: training/defence mode implementation Issue: #1346 --- fw/t/unit/test.c | 3 + 
fw/t/unit/test.h | 1 + fw/t/unit/test_128bit.c | 78 ++++++++++++ lib/128bit.h | 274 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 356 insertions(+) create mode 100644 fw/t/unit/test_128bit.c create mode 100644 lib/128bit.h diff --git a/fw/t/unit/test.c b/fw/t/unit/test.c index f4e922e80..a490ee215 100644 --- a/fw/t/unit/test.c +++ b/fw/t/unit/test.c @@ -152,6 +152,9 @@ test_run_all(void) EXPECT_EQ(tfw_client_mem(__tfw_client_mem_from_conn(&conn_req)), 0); __fpu_schedule(); + TEST_SUITE_RUN(128bit); + __fpu_schedule(); + TEST_SUITE_RUN(hash); __fpu_schedule(); diff --git a/fw/t/unit/test.h b/fw/t/unit/test.h index 976f1530b..e43eaec22 100644 --- a/fw/t/unit/test.h +++ b/fw/t/unit/test.h @@ -273,6 +273,7 @@ TEST_SUITE(http_sticky); TEST_SUITE(http_cache); TEST_SUITE(http_match); TEST_SUITE(http_msg); +TEST_SUITE(128bit); TEST_SUITE(hash); TEST_SUITE(addr); TEST_SUITE(wq); diff --git a/fw/t/unit/test_128bit.c b/fw/t/unit/test_128bit.c new file mode 100644 index 000000000..f648bb367 --- /dev/null +++ b/fw/t/unit/test_128bit.c @@ -0,0 +1,78 @@ +/** + * Tempesta FW + * + * Copyright (C) 2026 Tempesta Technologies, Inc. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, + * or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 59 + * Temple Place - Suite 330, Boston, MA 02111-1307, USA. 
+ */
+
+#include "lib/128bit.h"
+#include "test.h"
+
+TEST(128bit, div)
+{
+	u128 v;
+
+	/* small operands first, then values wider than 64 bits */
+	v = 100;
+	EXPECT_EQ(u128_div_u32(v, 10), 10);
+
+	v = (u128)U64_MAX;
+	EXPECT_EQ(u128_div_u32(v, 2), U64_MAX / 2);
+
+	v = ((u128)(1) << 80);
+	EXPECT_EQ(u128_div_u32(v, 16), ((u128)(1) << 76));
+
+	v = 15555555555555532;
+	EXPECT_EQ(u128_div_u32(v, 10), 1555555555555553);
+}
+
+TEST(128bit, sqrt)
+{
+	u128 v;
+
+	/* 0 */
+	v = 0;
+	EXPECT_EQ(u128_sqrt(v), 0);
+
+	/* 1 */
+	v = 1;
+	EXPECT_EQ(u128_sqrt(v), 1);
+
+	/* 2..3 -> 1 */
+	v = 3;
+	EXPECT_EQ(u128_sqrt(v), 1);
+
+	/* 4 -> 2 */
+	v = 4;
+	EXPECT_EQ(u128_sqrt(v), 2);
+
+	/* 111 -> 10 */
+	v = 111;
+	EXPECT_EQ(u128_sqrt(v), 10);
+
+	/* U64_MAX -> U32_MAX */
+	v = U64_MAX;
+	EXPECT_EQ(u128_sqrt(v), U32_MAX);
+
+	v = (u128)(~0);
+	EXPECT_EQ(u128_sqrt(v), U64_MAX);
+}
+
+TEST_SUITE(128bit)
+{
+	TEST_RUN(128bit, div);
+	TEST_RUN(128bit, sqrt);
+}
diff --git a/lib/128bit.h b/lib/128bit.h
new file mode 100644
index 000000000..436f0fb7c
--- /dev/null
+++ b/lib/128bit.h
@@ -0,0 +1,274 @@
+/**
+ * Tempesta kernel library
+ *
+ * Portable 128-bit arithmetic helpers.
+ *
+ * This header provides a minimal set of unsigned 128-bit helpers for
+ * kernel code, built on the u128 type and written so that no runtime
+ * 128-bit division helper is needed.
+ *
+ * Supported operations:
+ * - 128/32 division (u128 dividend, u32 divisor);
+ * - integer square root;
+ *
+ * The implementation is intended for statistics, counters and rate
+ * calculations where intermediate values may exceed 64 bits.
+ *
+ * Copyright (C) 2026 Tempesta Technologies, INC.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License,
+ * or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ * See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 59
+ * Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __LIB_128BIT_H__
+#define __LIB_128BIT_H__
+
+#include
+#include
+#include
+#include
+
+static inline u128
+u128_div_u32(u128 num, u32 divisor)
+{
+	u128 res = 0;
+	u128 cur = 0;
+
+	BUG_ON(!divisor);
+
+	/*
+	 * Iterate over all 128 dividend bits.
+	 *
+	 * Bit 127 is the most significant bit of num.
+	 * Bit 0 is the least significant bit of num.
+	 */
+	for (int i = 127; i >= 0; --i) {
+		/*
+		 * Equivalent to:
+		 *
+		 *	cur = cur * 2
+		 *
+		 * We are making room for the next dividend bit,
+		 * exactly like multiplying the current partial value
+		 * by the radix before bringing down the next digit in
+		 * ordinary long division.
+		 */
+		cur <<= 1;
+		/*
+		 * Extract the next dividend bit and append it to
+		 * the low end of the remainder.
+		 *
+		 *	cur = (cur << 1) | next_dividend_bit
+		 */
+		cur |= (num >> i) & 1;
+
+		/*
+		 * At this point cur contains the partial dividend formed
+		 * from the previous remainder and the bit just appended.
+		 *
+		 * If the divisor fits into the current remainder,
+		 * then the current quotient bit must be 1.
+		 *
+		 * Example:
+		 *
+		 *	cur = 15
+		 *	divisor = 10
+		 *
+		 * Then:
+		 *
+		 *	quotient_bit = 1
+		 *	cur = 15 - 10 = 5
+		 *
+		 * The updated remainder is carried into the next
+		 * iteration.
+		 */
+		if (cur >= divisor) {
+			cur -= divisor;
+			/*
+			 * Set quotient bit corresponding to the
+			 * currently processed dividend position.
+			 *
+			 * Since we are traversing bits from MSB to
+			 * LSB, the bit index in the quotient is the
+			 * same as the current dividend bit index.
+			 */
+			res |= ((u128)(1) << i);
+		}
+	}
+
+	return res;
+}
+
+/*
+ * Compute the integer square root of a 128-bit unsigned value.
+ *
+ * We need to find the largest u64 value x such that:
+ *
+ *	x * x <= v
+ *
+ * Since:
+ *
+ *	sqrt(U128_MAX) < 2^64
+ *
+ * the result always fits into u64 and can be searched in the range:
+ *
+ *	[0, U64_MAX]
+ *
+ * The function uses binary search.
+ *
+ * Search invariant:
+ *
+ * - every value <= result is known to satisfy:
+ *
+ *	x^2 <= v
+ *
+ * - every value > right is known to satisfy:
+ *
+ *	x^2 > v
+ *
+ * On each iteration:
+ *
+ * 1. Pick the middle point.
+ * 2. Compute mid^2 as a full 128-bit value.
+ * 3. Compare it with v.
+ * 4. If mid^2 <= v, remember mid as a valid answer and
+ *    continue searching for a larger one.
+ * 5. Otherwise search the lower half.
+ *
+ * Example:
+ *
+ *	v = 111
+ *
+ *	left = 0
+ *	right = 18446744073709551615
+ *
+ * Eventually the search narrows to:
+ *
+ *	[10, 11]
+ *
+ * because:
+ *
+ *	10^2 = 100 <= 111
+ *	11^2 = 121 > 111
+ *
+ * Therefore:
+ *
+ *	sqrt(111) = 10
+ *
+ * Return:
+ *
+ *	floor(sqrt(v))
+ */
+static inline u64
+u128_sqrt(const u128 v)
+{
+	/*
+	 * Current binary search range.
+	 *
+	 * The true answer is guaranteed to lie somewhere
+	 * inside [left, right].
+	 */
+	u64 left = 0;
+	u64 right = U64_MAX;
+
+	/*
+	 * Best valid answer found so far.
+	 *
+	 * Whenever we discover a value whose square does
+	 * not exceed v, we store it here.
+	 */
+	u64 result = 0;
+
+	while (left <= right) {
+		u64 mid;
+		u128 sq;
+
+		/*
+		 * Midpoint calculation written this way to avoid
+		 * possible overflow of:
+		 *
+		 *	(left + right) / 2
+		 */
+		mid = left + ((right - left) >> 1);
+		sq = (u128)mid * (u128)mid;
+
+		/*
+		 * Compare mid^2 with v directly on u128 values:
+		 *
+		 *	sq <= v : mid is a valid candidate, the answer
+		 *		  is mid or lies above it;
+		 *
+		 *	sq > v  : mid is too large, the answer lies
+		 *		  strictly below it.
+		 *
+		 * The product cannot overflow u128 since mid < 2^64.
+		 */
+
+		if (sq <= v) {
+			/*
+			 * mid^2 <= v
+			 *
+			 * Therefore mid is a valid square root
+			 * candidate.
+			 *
+			 * Remember it and try to find a larger
+			 * valid value in the upper half.
+			 */
+			result = mid;
+
+			/*
+			 * Prevent overflow of:
+			 *
+			 *	left = mid + 1
+			 *
+			 * when mid already equals U64_MAX.
+			 */
+			if (mid == U64_MAX)
+				break;
+
+			/*
+			 * Discard the lower half including mid.
+			 *
+			 * All values <= mid are no better than the
+			 * candidate we already have.
+			 */
+			left = mid + 1;
+		} else {
+			/*
+			 * mid^2 > v
+			 *
+			 * mid is too large to be the answer.
+			 *
+			 * The answer must be strictly smaller than mid;
+			 * the check below keeps mid - 1 from wrapping.
+			 */
+			if (!mid)
+				break;
+
+			/*
+			 * Search the lower half.
+			 */
+			right = mid - 1;
+		}
+	}
+
+	/*
+	 * The largest value discovered such that:
+	 *
+	 *	result^2 <= v
+	 */
+	return result;
+}
+
+#endif /* __LIB_128BIT_H__ */
From 1c2f90304d96904f0ba6b8d7c5c6b0a2490c67fa Mon Sep 17 00:00:00 2001
From: EvgeniiMekhanik
Date: Wed, 29 Apr 2026 06:52:05 +0300
Subject: [PATCH 2/6] Introduce z-score based adaptive_limits library and
 client conn tracking
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Add a generic training/defence subsystem used to detect abnormal
behavior based on z-score statistics. The implementation provides:
- training mode: collect per-event statistics (sum, sumsq, count)
  using percpu counters to minimize contention;
- defence mode: evaluate incoming values against calculated mean/std
  and reject anomalies exceeding configured z-score threshold
  (drop connection with TCP RST);

Use adaptive limits (training/defence) library with per-client
connection tracking. Maintain current and maximum number of
concurrent connections per client and update statistic on each new
maximum of concurrent client connections. In defence mode calculate
z-score for the client on each new established connection and drop
connection if z-score exceeded configured threshold.

The classical Welford algorithm was evaluated but found unsuitable
for this workload.
In its original form Welford assumes an append-only stream of samples, where each new observation increases the sample count. In our case, "n" represents the number of clients rather than the number of events. For each client we continuously update the current maximum number of connections/requests/memory/cpu usage. When a value changes, the previous sample must be removed from the aggregated statistics before the updated value is inserted. This requires a replace/update operation rather than append-only updates, which implies a reversible variant of Welford’s algorithm and significantly increases implementation complexity. We therefore use a sum/sumsq based approach. Although sum/sumsq is generally considered less numerically stable than Welford’s algorithm due to potential catastrophic cancellation when subtracting large nearly equal values, this is not a concern in our case. For the expected value ranges in production workloads, such pathological distributions (e.g. values clustered around 1e9 with variance ≈ 1) are not realistic, and numerical precision remains sufficient. Part-of: training/defence mode implementation Issue: #1346 --- fw/adaptive_limits.c | 662 +++++++++++++++++++++++++++++++++++++++++++ fw/adaptive_limits.h | 220 ++++++++++++++ fw/client.c | 2 + fw/client.h | 7 +- fw/http_limits.c | 21 +- fw/main.c | 49 ++++ 6 files changed, 958 insertions(+), 3 deletions(-) create mode 100644 fw/adaptive_limits.c create mode 100644 fw/adaptive_limits.h diff --git a/fw/adaptive_limits.c b/fw/adaptive_limits.c new file mode 100644 index 000000000..ba899f931 --- /dev/null +++ b/fw/adaptive_limits.c @@ -0,0 +1,662 @@ +/** + * Tempesta FW + * + * Copyright (C) 2026 Tempesta Technologies, Inc. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, + * or (at your option) any later version. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ +#include "adaptive_limits.h" +#include "client.h" +#include "http_limits.h" +#include "tempesta_fw.h" +#include "lib/fault_injection_alloc.h" +#include "lib/128bit.h" + +/* Training period in seconds. Zero means disabled. */ +static unsigned int tfw_training_mode_period = 0; +unsigned int tfw_adaptive_limits_mode = + TFW_ADAPTIVE_LIMITS_MODE_IS_DISABLED; +unsigned int g_training_epoch = 0; + +/* Z-score thresholds for different metrics. */ +static int tfw_adaptive_limits_z_score_conn_num = 0; +static int tfw_adaptive_limits_z_score_req_num = 0; +static int tfw_adaptive_limits_z_score_mem = 0; +static int tfw_adaptive_limits_z_score_cpu = 0; + +/* Timer and worker used to switch training -> defence asynchronously. */ +static struct timer_list training_timer; +static struct work_struct training_work; + +/* + * Per-metric aggregated statistics. + * + * @sumsq - sum of squares of observed values; + * @sum - sum of observed values; + * @mean - calculated mean (scaled); + * @std - calculated standard deviation (scaled); + * @num - number of samples (e.g. number of clients); + * @scale_shift - scaling factor; + */ +struct stats { + u128 __percpu *sumsq; + u64 __percpu *sum; + u64 mean; + u64 std; + u32 __percpu *num; + unsigned int scale_shift; +}; + +/* Global RCU-protected statistics for each metric. 
*/ +static struct stats __rcu *g_conn_num = NULL; +static struct stats __rcu *g_req_num = NULL; +static struct stats __rcu *g_mem_num = NULL; +static struct stats __rcu *g_cpu_num = NULL; + +/* + * Allocate and initialize stats structure. + * Returns NULL on failure. + */ +static inline struct stats * +__alloc_stats(void) +{ + struct stats *s; + gfp_t flags = GFP_KERNEL | __GFP_ZERO; + + s = tfw_kmalloc(sizeof(struct stats), flags); + if (unlikely(!s)) + return NULL; + + s->sumsq = tfw_alloc_percpu_gfp(u128, flags); + if (unlikely(!s->sumsq)) + goto fail_alloc_sumsq; + + s->sum = tfw_alloc_percpu_gfp(u64, flags); + if (unlikely(!s->sum)) + goto fail_alloc_sum; + + s->num = tfw_alloc_percpu_gfp(unsigned int, flags); + if (unlikely(!s->num)) + goto fail_alloc_num; + + return s; + +fail_alloc_num: + free_percpu(s->sum); +fail_alloc_sum: + free_percpu(s->sumsq); +fail_alloc_sumsq: + kfree(s); + + return NULL; +} + +/* Free stats structure and all embedded percpu counters. */ +static inline void +__free_stats(struct stats *s) +{ + if (likely(s)) { + free_percpu(s->sumsq); + free_percpu(s->sum); + free_percpu(s->num); + kfree(s); + } +} + +/* + * Disable both training and defence modes. + * + * Ensures that no readers are accessing RCU-protected stats, + * so pointers can be safely replaced. + */ +static inline void +tfw_adaptive_limits_disable_training_or_defence(void) +{ + /* + * Set TFW_ADAPTIVE_LIMITS_MODE_IS_DISABLED, now we stop + * calling all new defence and training functions. We don't + * try to make rcu pointer dereference after it. + */ + WRITE_ONCE(tfw_adaptive_limits_mode, + TFW_ADAPTIVE_LIMITS_MODE_IS_DISABLED); + /* + * Wait until all previous rcu calls finished, to be sure + * that we can safely change pointers. + */ + synchronize_rcu(); +} + +/* + * Replace all global stats with new instances. + * + * Safe due to prior call to tfw_adaptive_limits_disable_training_or_defence(). 
+ */
+static inline void
+__upgrade_all_stats(struct stats *new_conn_num,
+		    struct stats *new_req_num,
+		    struct stats *new_mem_num,
+		    struct stats *new_cpu_num)
+{
+	struct stats *old_conn_num, *old_req_num, *old_mem_num, *old_cpu_num;
+
+	tfw_adaptive_limits_disable_training_or_defence();
+	old_conn_num = rcu_replace_pointer(g_conn_num, new_conn_num, true);
+	old_req_num = rcu_replace_pointer(g_req_num, new_req_num, true);
+	old_mem_num = rcu_replace_pointer(g_mem_num, new_mem_num, true);
+	old_cpu_num = rcu_replace_pointer(g_cpu_num, new_cpu_num, true);
+	/*
+	 * A second `synchronize_rcu` is not needed here (the first one is
+	 * called inside `tfw_adaptive_limits_disable_training_or_defence`),
+	 * because we check that adaptive limits mode is not disabled in all
+	 * places where we access `stats`.
+	 * So after `tfw_adaptive_limits_disable_training_or_defence` returns
+	 * we are sure that all concurrent calls that could access the
+	 * `old_*` stats have either finished or will not dereference the
+	 * pointers at all, because the mode is already disabled.
+	 */
+	__free_stats(old_conn_num);
+	__free_stats(old_req_num);
+	__free_stats(old_mem_num);
+	__free_stats(old_cpu_num);
+}
+
+static inline int
+__alloc_all_stats(struct stats **new_conn_num,
+		  struct stats **new_req_num,
+		  struct stats **new_mem_num,
+		  struct stats **new_cpu_num)
+{
+	struct stats *conn_num = NULL;
+	struct stats *req_num = NULL;
+	struct stats *mem_num = NULL;
+	struct stats *cpu_num = NULL;
+
+	conn_num = __alloc_stats();
+	if (unlikely(!conn_num))
+		return -ENOMEM;
+
+	req_num = __alloc_stats();
+	if (unlikely(!req_num))
+		goto fail_alloc_req_num;
+
+	mem_num = __alloc_stats();
+	if (unlikely(!mem_num))
+		goto fail_alloc_mem_num;
+
+	cpu_num = __alloc_stats();
+	if (unlikely(!cpu_num))
+		goto fail_alloc_cpu_num;
+
+	*new_conn_num = conn_num;
+	*new_req_num = req_num;
+	*new_mem_num = mem_num;
+	*new_cpu_num = cpu_num;
+
+	return 0;
+
+fail_alloc_cpu_num:
+	__free_stats(mem_num);
+fail_alloc_mem_num:
+	__free_stats(req_num);
+fail_alloc_req_num:
+	__free_stats(conn_num);
+
+	return -ENOMEM;
+}
+
+static inline int
+__alloc_upgrade_stats(void)
+{
+	struct stats *new_conn_num = NULL;
+	struct stats *new_req_num = NULL;
+	struct stats *new_mem_num = NULL;
+	struct stats *new_cpu_num = NULL;
+	int r;
+
+	r = __alloc_all_stats(&new_conn_num, &new_req_num, &new_mem_num,
+			      &new_cpu_num);
+	if (unlikely(r))
+		return r;
+
+	__upgrade_all_stats(new_conn_num, new_req_num, new_mem_num,
+			    new_cpu_num);
+
+	return 0;
+}
+
+static inline int
+__init_z_score(void)
+{
+	int r;
+
+	r = __alloc_upgrade_stats();
+	if (unlikely(r))
+		return r;
+
+	/*
+	 * After setting TFW_ADAPTIVE_LIMITS_MODE_IS_DISABLED and calling
+	 * `synchronize_rcu` from `__upgrade_all_stats` we are sure that
+	 * all defence and training functions have finished. Start a new
+	 * training epoch.
+	 */
+	g_training_epoch++;
+
+	return 0;
+}
+
+/*
+ * Compute mean and standard deviation from aggregated stats.
+ * Uses integer arithmetic with scaling.
+ */
+static inline void
+__calculate_mean_and_std(struct stats *s)
+{
+	u128 variance, tmp1, tmp2;
+	u128 total_sumsq;
+	u64 total_sum;
+	u32 num_clients;
+
+	total_sumsq = tfw_percpu_u128_counter_sum(s->sumsq);
+	total_sum = tfw_percpu_u64_counter_sum(s->sum);
+	num_clients = tfw_percpu_u32_counter_sum(s->num);
+
+	if (unlikely(!num_clients))
+		return;
+
+	/*
+	 * Use fixed-point scaling only if the accumulated sum is small enough.
+	 * Once the sum approaches the u32 limit, disable scaling to prevent
+	 * arithmetic overflow in subsequent calculations.
+	 */
+	s->scale_shift = total_sum < U32_MAX ? SCALE_SHIFT : 0;
+	/*
+	 * Since total_sum < U32_MAX implies total_sumsq < U64_MAX,
+	 * the fixed-point left shift cannot overflow.
+	 */
+	tmp1 = total_sumsq << s->scale_shift;
+	tmp1 = u128_div_u32(tmp1, num_clients);
+	s->mean = (total_sum << s->scale_shift) / num_clients;
+	tmp2 = (u128)s->mean * (u128)s->mean;
+	tmp2 = tmp2 >> s->scale_shift;
+	variance = tmp1 - tmp2;
+	s->std = u128_sqrt(variance << s->scale_shift);
+}
+
+static inline bool
+__calculate_z_score(u64 val, struct stats *s, s64 *z_score)
+{
+	if (unlikely(!s->std))
+		return false;
+
+	/*
+	 * `mean` and `std` are stored in scaled format, so `val`
+	 * is converted to the same scaled format before use.
+	 */
+	*z_score = ((s64)(val << s->scale_shift) - (s64)s->mean) / (s64)s->std;
+
+	return true;
+}
+
+static inline void
+tfw_adaptive_limits_adjust_new_client(struct stats __rcu *g_stats)
+{
+	struct stats *s;
+
+	/*
+	 * rcu pointer dereference should be done under rcu lock,
+	 * to prevent memory corruption.
+	 */
+	BUG_ON(!rcu_read_lock_held());
+	s = rcu_dereference(g_stats);
+	this_cpu_add(*s->num, 1);
+}
+
+static void
+tfw_adaptive_limits_adjust_conn_new_client(void)
+{
+	return tfw_adaptive_limits_adjust_new_client(g_conn_num);
+}
+
+static inline void
+tfw_adaptive_limits_adjust_new_el(struct stats __rcu *g_stats, u64 delta1,
+				  u128 delta2)
+{
+	struct stats *s;
+
+	/*
+	 * rcu pointer dereference should be done under rcu lock,
+	 * to prevent memory corruption.
+	 */
+	BUG_ON(!rcu_read_lock_held());
+	s = rcu_dereference(g_stats);
+	this_cpu_add(*s->sum, delta1);
+	/* `this_cpu_add` is not implemented for 128-bit values. */
+	*this_cpu_ptr(s->sumsq) += delta2;
+}
+
+static void
+tfw_adaptive_limits_adjust_conn_num(u64 delta1, u128 delta2)
+{
+	return tfw_adaptive_limits_adjust_new_el(g_conn_num, delta1, delta2);
+}
+
+/**
+ * Perform z-score based defence check
+ *
+ * @g_stats - RCU-protected pointer to aggregated statistics
+ * @val - current observed value
+ * @threshold - configured z-score threshold
+ *
+ * In defence mode, this function evaluates @val against previously
+ * learned statistics using z-score:
+ *
+ *	z = (val - mean) / std
+ *
+ * If z-score exceeds @threshold, the value is considered anomalous
+ * and the function returns false: the caller should reject the
+ * event and drop the connection with TCP RST.
+ *
+ * Return:
+ * true - value is acceptable, or std is zero (no z-score available)
+ * false - value exceeds threshold (reject)
+ */
+static inline bool
+tfw_adaptive_limits_defence(struct stats __rcu *g_stats, u64 val, int threshold)
+{
+	struct stats *p;
+	s64 z_score;
+
+	/*
+	 * rcu pointer dereference should be done under rcu lock,
+	 * to prevent memory corruption.
+ */ + BUG_ON(!rcu_read_lock_held()); + + p = rcu_dereference(g_stats); + if (!__calculate_z_score(val, p, &z_score)) + return true; + + return z_score <= threshold; +} + +static inline bool +tfw_adaptive_limits_defence_conn_num(u64 val) +{ + int threshold = tfw_adaptive_limits_z_score_conn_num; + + return tfw_adaptive_limits_defence(g_conn_num, val, threshold); +} + +bool +tfw_adaptive_limits_check_conn_num(TfwAdaptiveLimit *limit, int delta, + u16 *epoch) +{ + u128 delta2; + u64 delta1; + unsigned int old_max; + bool new_client = false; + bool rc = true; + + /* + * Prevent training epoch changes while processing the event. + * + * A new training epoch is started only after: + * + * synchronize_rcu(); + * g_training_epoch++; + * + * Therefore, while we are inside this RCU read-side critical + * section, `g_training_epoch` cannot change and the event is + * guaranteed to be processed against a stable training epoch. + * + * This avoids races where an event is validated against one + * epoch and accounted after statistics have already been reset + * for the next epoch. + */ + rcu_read_lock(); + + if (tfw_adaptive_limits_mode_is_disabled()) + goto out; + + /* + * Ignore connection close events from previous training epochs. + * For new connections, assign current training epoch. + */ + if (delta < 0 && *epoch < g_training_epoch) + goto out; + else if (delta > 0) + *epoch = g_training_epoch; + + if (tfw_adaptive_limits_mode_is_defence()) { + limit->counter += delta; + WARN_ON(limit->counter < 0); + + if (delta > 0) + rc = tfw_adaptive_limits_defence_conn_num(limit->counter); + goto out; + } + + /* + * Training mode. + * + * Reset limit on each new training epoch. + * This is safe without extra synchronization as we are under + * client-private lock. 
+ */ + if (limit->epoch < g_training_epoch) { + limit->epoch = g_training_epoch; + limit->counter = 0; + limit->max = 0; + new_client = true; + } + + if (new_client) + tfw_adaptive_limits_adjust_conn_new_client(); + limit->counter += delta; + WARN_ON(limit->counter < 0); + + old_max = limit->max; + if (limit->counter <= old_max) + goto out; + + limit->max = limit->counter; + delta1 = limit->counter - old_max; + delta2 = (u128)limit->counter * (u128)limit->counter - + (u128)old_max * (u128)old_max; + tfw_adaptive_limits_adjust_conn_num(delta1, delta2); + +out: + rcu_read_unlock(); + + return rc; +} + +static inline void +tfw_adaptive_limits_prepare_for_defence(void) +{ + /* + * Wait until all threads finish using of global pointers. + * After this call we are sure that no one access any g_*_num + * structures, so we can safely calculated mean and std. + */ + tfw_adaptive_limits_disable_training_or_defence(); + __calculate_mean_and_std(g_conn_num); + __calculate_mean_and_std(g_req_num); + __calculate_mean_and_std(g_mem_num); + __calculate_mean_and_std(g_cpu_num); + WRITE_ONCE(tfw_adaptive_limits_mode, + TFW_ADAPTIVE_LIMITS_MODE_IS_DEFENCE); +} + +/* + * Workqueue handler to safely switch modes (sleepable context). + */ +static void +training_work_fn(struct work_struct *work) +{ + tfw_adaptive_limits_prepare_for_defence(); +} + +static void +tfw_training_timer_cb(struct timer_list *t) +{ + /* + * We should use working thread, because we should call + * `synchronize_rcu` before start defence mode. + */ + schedule_work(&training_work); +} + +/* + * Stop training early and switch to defence mode if needed. + */ +static inline void +tfw_training_stop(void) +{ + /* + * If training mode has already completed and the system has switched + * to defence mode, or training was never started, the timer is already + * inactive and no further action is required. 
+ */ + if (del_timer_sync(&training_timer)) + tfw_adaptive_limits_prepare_for_defence(); +} + +static inline int +tfw_training_start(void) +{ + unsigned long training_period_in_jiffies = + msecs_to_jiffies(1000 * tfw_training_mode_period); + int r; + + r = __init_z_score(); + if (unlikely(r)) + return r; + + WRITE_ONCE(tfw_adaptive_limits_mode, + TFW_ADAPTIVE_LIMITS_MODE_IS_TRAINING); + mod_timer(&training_timer, jiffies + training_period_in_jiffies); + + return 0; +} + +int +tfw_ctlfn_adaptive_limits_mode_change(unsigned int mode) +{ + if (mode) + return tfw_training_start(); + + tfw_training_stop(); + return 0; +} + +static int +tfw_training_mode_start(void) +{ + if (tfw_runstate_is_reconfig()) + return 0; + + INIT_WORK(&training_work, training_work_fn); + timer_setup(&training_timer, tfw_training_timer_cb, 0); + + return __alloc_all_stats(&g_conn_num, &g_req_num, &g_mem_num, + &g_cpu_num); +} + +static void +tfw_training_mode_stop(void) +{ + if (tfw_runstate_is_reconfig()) + return; + + timer_shutdown_sync(&training_timer); + flush_work(&training_work); + __upgrade_all_stats(NULL, NULL, NULL, NULL); +} + +static TfwCfgSpec tfw_training_mode_specs[] = { + { + .name = "training_period", + .deflt = "75", + .handler = tfw_cfg_set_int, + .allow_reconfig = true, + .dest = &tfw_training_mode_period, + .spec_ext = &(TfwCfgSpecInt) { + .range = { 0, UINT_MAX }, + }, + }, + { + .name = "adaptive_limits_z_score_connection_num", + .deflt = "0", + .handler = tfw_cfg_set_int, + .allow_reconfig = true, + .dest = &tfw_adaptive_limits_z_score_conn_num, + .spec_ext = &(TfwCfgSpecInt) { + .range = { 0, UINT_MAX }, + } + }, + { + .name = "adaptive_limits_z_score_request_num", + .deflt = "0", + .handler = tfw_cfg_set_int, + .allow_reconfig = true, + .dest = &tfw_adaptive_limits_z_score_req_num, + .spec_ext = &(TfwCfgSpecInt) { + .range = { 0, UINT_MAX }, + } + }, + { + .name = "adaptive_limits_z_score_mem", + .deflt = "0", + .handler = tfw_cfg_set_int, + .allow_reconfig = true, + 
.dest = &tfw_adaptive_limits_z_score_mem, + .spec_ext = &(TfwCfgSpecInt) { + .range = { 0, UINT_MAX }, + } + }, + { + .name = "adaptive_limits_z_score_cpu", + .deflt = "0", + .handler = tfw_cfg_set_int, + .allow_reconfig = true, + .dest = &tfw_adaptive_limits_z_score_cpu, + .spec_ext = &(TfwCfgSpecInt) { + .range = { 0, UINT_MAX }, + } + }, + { 0 } +}; + +TfwMod tfw_training_mod = { + .name = "training", + .start = tfw_training_mode_start, + .stop = tfw_training_mode_stop, + .specs = tfw_training_mode_specs, +}; + +int __init +tfw_adaptive_limits_init(void) +{ + tfw_mod_register(&tfw_training_mod); + + return 0; +} + +void +tfw_adaptive_limits_exit(void) +{ + tfw_mod_unregister(&tfw_training_mod); +} diff --git a/fw/adaptive_limits.h b/fw/adaptive_limits.h new file mode 100644 index 000000000..25fa6f973 --- /dev/null +++ b/fw/adaptive_limits.h @@ -0,0 +1,220 @@ +/** + * Tempesta FW training and defence subsystem. + * + * This file implements a lightweight anomaly detection mechanism used to + * protect against abnormal client behaviour (e.g. excessive number of + * connections, requests, memory usage or CPU consumption). + * + * The subsystem operates in three modes: + * - training: + * Statistics are collected for selected metrics. For each observation, + * the first and second moments (sum and sum of squares) are accumulated + * using per-CPU counters to minimize contention. + * + * - defence: + * Runtime values are compared against the learned distribution using + * z-score (z = (x - mean) / stddev) If the computed z-score exceeds + * a configured threshold, the event is considered anomalous and + * connection will be dropped. + * - disabled: + * Transient state used during mode transitions to safely update shared + * data structures. In this state both training and defence paths are + * bypassed. + * + * Implementation details: + * + * - All arithmetic is performed using fixed-point integers (see + * SCALE_SHIFT) to avoid floating point usage in kernel space. 
+ * + * - Global statistics are maintained in RCU-protected structures and + * updated via percpu_counter to provide scalability on multi-core + * systems. + * + * - Mean and standard deviation are computed at the end of the training + * phase and then used in defence mode without further modification. + * + * Several approaches for online variance calculation were evaluated, including + * Welford’s algorithm and the “sum/squared-sum” method. It was found that the + * “sum/squared-sum” method is better for our purposes: + * + * - Original form Welford assumes an append-only stream of samples, where + * each new observation increases the total sample count. In our case, + * however, "n" represents the number of clients rather than the number + * of events, so we need a modified reversible version of Welford’s + * algorithm, which significantly complicates the implementation and + * slower then it's classic version. + * + * - Kernel-space constraints prohibit floating-point arithmetic, requiring + * the use of fixed-point integer arithmetic instead. While Welford’s + * algorithm is known for its excellent numerical stability with + * floating-point arithmetic, its fixed-point implementation introduces + * truncation errors during repeated division operations. + * + * - “sum/squared-sum” method is generally considered less numerically + * stable than Welford’s algorithm because subtracting two large close + * values may lead to catastrophic cancellation and precision loss. + * However, this issue primarily affects workloads with very large + * numbers and extremely small variance. For the considered workload, + * where client metrics are bounded and remain relatively small, the + * “sum/squared-sum” approach provides sufficient numerical accuracy + * while being substantially simpler and faster. 
+ * + * According to the selected algorithm at the end of the training phase + * the following statistics are derived from the accumulated "sum" and + * "sumsq": + * + * sum + * μ = ─── + * n + * + * sumsq + * σ² = ───── - μ² + * n + * + * σ = √σ² + * + * The resulting mean (μ) and standard deviation (σ) are then used in the + * defence mode to compute z-scores. + * + * Copyright (C) 2026 Tempesta Technologies, Inc. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, + * or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +#ifndef __TFW_ADAPTIVE_LIMITS_H__ +#define __TFW_ADAPTIVE_LIMITS_H__ + +#include +#include "asm-generic/rwonce.h" +#include "lib/fault_injection_alloc.h" + +/* + * Fixed-point scaling factor used for integer arithmetic. + * Kernel code avoids floating point operations, so all fractional + * calculations (e.g. mean, variance, z-score) are performed using + * scaled integers. SCALE_SHIFT defines the number of fractional bits. + * + * A value X is represented internally as: + * + * X_scaled = X << SCALE_SHIFT + * + * For example, with SCALE_SHIFT = 10: + * + * 1.0 -> 1024 + * 2.5 -> 2560 + */ +#define SCALE_SHIFT 10 + +/* + * Adaptive limits mode. + * + * defence - defence mode. For each event Tempesta FW computes z-score + * and compares it against a configured threshold. 
If the + * threshold is exceeded, the connection is dropped (TCP RST). + * + * training - training mode. Statistics are collected and accumulated to + * build mean and standard deviation used later in defence mode. + * + * disabled - internal state used during mode transitions. While in this + * mode, both training and defence paths are disabled to + * provide safe synchronization. + */ +typedef enum { + TFW_ADAPTIVE_LIMITS_MODE_IS_DEFENCE = 0, + TFW_ADAPTIVE_LIMITS_MODE_IS_TRAINING = 1, + TFW_ADAPTIVE_LIMITS_MODE_IS_DISABLED = 2 +} TfwAdaptiveLimitsMode; + +/* Current adaptive limits mode (see TfwAdaptiveLimitsMode). */ +extern unsigned int tfw_adaptive_limits_mode; +/* + * Global training epoch counter. + * Incremented each time a new training cycle starts. Used by per-object + * state to detect epoch changes and reset local statistics. + */ +extern unsigned int g_training_epoch; + +/* + * A simple adaptive limit structure used to track events, + * which is already protected by an external lock. + * + * @counter - current value (e.g. active connections). + * @max - maximum observed value within the current epoch. + * @epoch - training epoch identifier. Compared against the global + * @g_training_epoch to detect epoch change and trigger + * reinitialization of @max and @counter.
+ */ +typedef struct { + int counter; + unsigned int max; + u16 epoch; +} TfwAdaptiveLimit; + +int tfw_adaptive_limits_init(void); +void tfw_adaptive_limits_exit(void); + +bool tfw_adaptive_limits_check_conn_num(TfwAdaptiveLimit *limit, int delta, + u16 *epoch); +int tfw_ctlfn_adaptive_limits_mode_change(unsigned int mode); + +static inline void +tfw_adaptive_limit_init(TfwAdaptiveLimit *limit) +{ + limit->counter = 0; + limit->max = 0; + limit->epoch = 0; +} + +static inline bool +tfw_adaptive_limits_mode_is_disabled(void) +{ + return READ_ONCE(tfw_adaptive_limits_mode) == + TFW_ADAPTIVE_LIMITS_MODE_IS_DISABLED; +} + +static inline bool +tfw_adaptive_limits_mode_is_training(void) +{ + return READ_ONCE(tfw_adaptive_limits_mode) == + TFW_ADAPTIVE_LIMITS_MODE_IS_TRAINING; +} + +static inline bool +tfw_adaptive_limits_mode_is_defence(void) +{ + return READ_ONCE(tfw_adaptive_limits_mode) == + TFW_ADAPTIVE_LIMITS_MODE_IS_DEFENCE; +} + +#define PERCPU_COUNTER_SUMM(type) \ +static inline type \ +tfw_percpu_##type##_counter_sum(type __percpu *counter) \ +{ \ + type total = 0; \ + int cpu; \ + \ + for_each_online_cpu(cpu) \ + total += *(per_cpu_ptr(counter, cpu)); \ + \ + return total; \ +} + +PERCPU_COUNTER_SUMM(u128) +PERCPU_COUNTER_SUMM(u64) +PERCPU_COUNTER_SUMM(u32) + +#undef PERCPU_COUNTER_SUMM + +#endif /* __TFW_ADAPTIVE_LIMITS_H__ */ diff --git a/fw/client.c b/fw/client.c index 56f28e0f5..ffb932b77 100644 --- a/fw/client.c +++ b/fw/client.c @@ -31,6 +31,7 @@ #include "procfs.h" #include "tdb.h" #include "lib/fault_injection_alloc.h" +#include "adaptive_limits.h" #include "lib/str.h" #include "lib/common.h" @@ -473,6 +474,7 @@ tfw_client_ent_init(TdbRec *rec, void *data) if (ctx->init) ctx->init(cli); + tfw_adaptive_limit_init(&cli->conn_lim); tfw_peer_init((TfwPeer *)cli, &ctx->addr); ent->xff_addr = ctx->xff_addr; tfw_str_to_cstr(&ctx->user_agent, ent->user_agent, diff --git a/fw/client.h b/fw/client.h index a280b98dc..a7535d536 100644 --- a/fw/client.h +++ 
b/fw/client.h @@ -23,6 +23,7 @@ #include "http_limits.h" #include "connection.h" +#include "adaptive_limits.h" /* * Client memory accounting structure for Tempesta FW. @@ -50,14 +51,18 @@ typedef struct tfw_client_mem_t { * @class_prvt - private client accounting data for classifier module. * Typically it's large and wastes memory in vain if * no any classification logic is used; - * @list_head - entry in the lru list; + * @list - entry in the lru list; * @cli_mem - memory used by current client; + * @conn_lim - structure to track active connections count in + * for the current client. Used in adaptive_limits + * module to collect statistic and z-score calculation; */ typedef struct { TFW_PEER_COMMON; TfwClassifierPrvt class_prvt; struct list_head list; TfwClientMem *cli_mem; + TfwAdaptiveLimit conn_lim; } TfwClient; int tfw_client_init(void); diff --git a/fw/http_limits.c b/fw/http_limits.c index f7018ecfe..7b5c223d0 100644 --- a/fw/http_limits.c +++ b/fw/http_limits.c @@ -45,6 +45,7 @@ #include "http_match.h" #include "http.h" #include "http_sess.h" +#include "adaptive_limits.h" /* * ------------------------------------------------------------------------ @@ -213,15 +214,24 @@ frang_time_quantum(unsigned short tframe) } static int -frang_conn_limit(FrangAcc *ra, FrangGlobCfg *conf) +frang_conn_limit(struct sock *sk, FrangGlobCfg *conf) { + FrangAcc *ra = frang_acc_from_sk(sk); + u16 *training_epoch = &tempesta_sock(sk)->training_epoch; const unsigned long ts = frang_time_quantum(conf->conn_rate_tf); const int i = ts % FRANG_FREQ; + TfwAdaptiveLimit *conn_lim; spin_lock(&ra->lock); frang_acc_history_init(ra, ts); + conn_lim = &FRANG_ACC2CLI(ra)->conn_lim; + if (!tfw_adaptive_limits_check_conn_num(conn_lim, 1, training_epoch)) { + spin_unlock(&ra->lock); + return T_BLOCK; + } + if (conf->conn_max && unlikely(ra->conn_curr > conf->conn_max)) { frang_limmsg("connections max num.", ra->conn_curr, conf->conn_max, &FRANG_ACC2CLI(ra)->addr); @@ -310,6 +320,7 @@ 
frang_conn_new(struct sock *sk, struct sk_buff *skb) * TfwConn{}. */ tempesta_sock(sk)->class_prvt = ra; + tempesta_sock(sk)->training_epoch = 0; if (tfw_http_mark_is_in_whitlist(skb->mark)) { /* * Netfilter works on TCP/IP level, so once we observe a @@ -339,7 +350,7 @@ frang_conn_new(struct sock *sk, struct sk_buff *skb) * and to configure it, while making some of the limits to be global * for a single client is absolutely straight-forward. */ - r = frang_conn_limit(ra, dflt_vh->frang_gconf); + r = frang_conn_limit(sk, dflt_vh->frang_gconf); if (unlikely(r == T_BLOCK) && dflt_vh->frang_gconf->ip_block) tfw_filter_block_ip(cli, dflt_vh->frang_gconf->ip_block_duration); @@ -357,6 +368,8 @@ void tfw_classify_conn_close(struct sock *sk) { FrangAcc *ra = frang_acc_from_sk(sk); + u16 *training_epoch = &tempesta_sock(sk)->training_epoch; + TfwAdaptiveLimit *conn_lim; if (unlikely(!sock_flag(sk, SOCK_TEMPESTA))) return; @@ -366,8 +379,11 @@ tfw_classify_conn_close(struct sock *sk) spin_lock(&ra->lock); + conn_lim = &FRANG_ACC2CLI(ra)->conn_lim; BUG_ON(ra->conn_curr == 0); ra->conn_curr--; + WARN_ON(!tfw_adaptive_limits_check_conn_num(conn_lim, -1, + training_epoch)); if (!ra->conn_curr) TFW_DEC_STAT_BH(clnt.online); @@ -375,6 +391,7 @@ tfw_classify_conn_close(struct sock *sk) spin_unlock(&ra->lock); tempesta_sock(sk)->class_prvt = NULL; + tempesta_sock(sk)->training_epoch = 0; tfw_client_put(FRANG_ACC2CLI(ra)); } diff --git a/fw/main.c b/fw/main.c index b974109ae..2f7fa2067 100644 --- a/fw/main.c +++ b/fw/main.c @@ -32,6 +32,7 @@ #include "server.h" #include "str.h" #include "sync_socket.h" +#include "adaptive_limits.h" #include "lib/fsm.h" MODULE_AUTHOR(TFW_AUTHOR); @@ -390,6 +391,47 @@ tfw_ctlfn_state_io(const struct ctl_table *ctl, int is_write, return r; } +/* + * Sysctl handler for tempesta.training read/write operations.
+ */ +static int +tfw_ctlfn_training_mode_io(const struct ctl_table *ctl, int is_write, + void *user_buf, size_t *lenp, loff_t *ppos) +{ + struct ctl_table tmp = *ctl; + unsigned int mode; + int r; + + mutex_lock(&tfw_sysctl_mtx); + + if (is_write) { + /* + * There are only two available modes to set: + * 0 - defence mode; + * 1 - training mode; + * There is also one more mode (2 - disabled mode), + * but it is used only for internal purposes. + */ + tmp.data = &mode; + tmp.extra1 = SYSCTL_ZERO; + tmp.extra2 = SYSCTL_ONE; + + r = proc_dointvec_minmax(&tmp, is_write, user_buf, lenp, ppos); + if (unlikely(r)) + goto out; + + r = tfw_ctlfn_adaptive_limits_mode_change(mode); + } else { + mode = READ_ONCE(tfw_adaptive_limits_mode); + tmp.data = &mode; + r = proc_dointvec_minmax(&tmp, is_write, user_buf, lenp, ppos); + } + +out: + mutex_unlock(&tfw_sysctl_mtx); + return r; +} + /** * Wait until all objects of some specific type @obj_name are * destructed. The count of objects is specified in atomic @counter. @@ -437,6 +479,12 @@ static struct ctl_table tfw_sysctl_tbl[] = { .maxlen = T_SYSCTL_STBUF_LEN - 1, .mode = 0644, .proc_handler = tfw_ctlfn_state_io, + }, + { + .procname = "training", + .maxlen = sizeof(unsigned int), + .mode = 0644, + .proc_handler = tfw_ctlfn_training_mode_io, } }; @@ -529,6 +577,7 @@ tfw_init(void) DO_INIT(http_tbl); DO_INIT(sched_hash); DO_INIT(sched_ratio); + DO_INIT(adaptive_limits); return 0; err: From 29171ea896157a9e32e15dfdd222e8352f0ff2b3 Mon Sep 17 00:00:00 2001 From: EvgeniiMekhanik Date: Wed, 3 Jun 2026 15:17:11 +0300 Subject: [PATCH 3/6] Implement non-idempotent requests tracking Use the adaptive limits framework to track per-client in-flight non-idempotent requests, since only such requests occupy upstream connections and therefore are suitable for overload detection.
Introduce `TfwAdaptiveLimitLock`, a generic adaptive limit structure with a per-CPU counter, per-epoch maximum tracking, and synchronization for training epoch transitions. Extend the adaptive limits library with helpers for request accounting and z-score calculation, reusing the existing logic. Tracking of in-flight non-idempotent requests is performed in two stages: - We account non-idempotent requests in the HTTP layer by incrementing the counter when a non-idempotent request is queued and decrementing it once the request completes. At this stage the current request count is updated using per-CPU counters without acquiring any locks. - The second stage occurs in the `on_rcv_finish` callback at the end of `ss_tcp_process_data`. At this point, the current number of in-flight requests is obtained by aggregating all per-CPU counters. If the aggregated value exceeds the previously recorded maximum, the maximum is updated atomically and the corresponding deltas are applied to the global `sum` and `sumsq` statistics. This aggregated value is also used in defence mode for z-score calculation and for deciding whether the client should be blocked. This approach avoids expensive synchronization on every request while still maintaining accurate client maxima for statistical analysis.
Part-of: training/defence mode implementation Issue: #1346 --- fw/adaptive_limits.c | 207 +++++++++++++++++++++++++++++++-- fw/adaptive_limits.h | 24 ++++ fw/client.c | 255 ++++++++++++++++++++++++----------------- fw/client.h | 61 ++++++---- fw/http.c | 31 ++++- fw/http.h | 6 +- fw/http_limits.c | 15 +-- fw/t/unit/helpers.c | 32 +++++- fw/t/unit/test.c | 2 +- fw/t/unit/test_hpack.c | 10 +- 10 files changed, 478 insertions(+), 165 deletions(-) diff --git a/fw/adaptive_limits.c b/fw/adaptive_limits.c index ba899f931..5b79fa53a 100644 --- a/fw/adaptive_limits.c +++ b/fw/adaptive_limits.c @@ -241,6 +241,9 @@ __init_z_score(void) { int r; + if (unlikely(g_training_epoch >= U16_MAX)) + return -EINVAL; + r = __alloc_upgrade_stats(); if (unlikely(r)) return r; @@ -329,6 +332,12 @@ tfw_adaptive_limits_adjust_conn_new_client(void) return tfw_adaptive_limits_adjust_new_client(g_conn_num); } +static void +tfw_adaptive_limits_adjust_req_new_client(void) +{ + return tfw_adaptive_limits_adjust_new_client(g_req_num); +} + static inline void tfw_adaptive_limits_adjust_new_el(struct stats __rcu *g_stats, u64 delta1, u128 delta2) @@ -352,6 +361,12 @@ tfw_adaptive_limits_adjust_conn_num(u64 delta1, u128 delta2) return tfw_adaptive_limits_adjust_new_el(g_conn_num, delta1, delta2); } +static void +tfw_adaptive_limits_adjust_req_num(u64 delta1, u128 delta2) +{ + return tfw_adaptive_limits_adjust_new_el(g_req_num, delta1, delta2); +} + /** * Perform z-score based defence check * @@ -399,6 +414,29 @@ tfw_adaptive_limits_defence_conn_num(u64 val) return tfw_adaptive_limits_defence(g_conn_num, val, threshold); } +static inline bool +tfw_adaptive_limits_defence_req_num(u64 val) +{ + int threshold = tfw_adaptive_limits_z_score_req_num; + + return tfw_adaptive_limits_defence(g_req_num, val, threshold); +} + +static inline bool +tfw_adaptive_limits_check_and_set_epoch(u16 *epoch, bool new_event) +{ + /* + * Ignore events from the previous training epochs. Set epoch for + * the new events. 
+ */ + if (!new_event && *epoch < g_training_epoch) + return false; + else if (new_event) + *epoch = g_training_epoch; + + return true; +} + bool tfw_adaptive_limits_check_conn_num(TfwAdaptiveLimit *limit, int delta, u16 *epoch) @@ -406,7 +444,7 @@ tfw_adaptive_limits_check_conn_num(TfwAdaptiveLimit *limit, int delta, u128 delta2; u64 delta1; unsigned int old_max; - bool new_client = false; + bool new_event, new_client = false; bool rc = true; /* @@ -430,14 +468,9 @@ tfw_adaptive_limits_check_conn_num(TfwAdaptiveLimit *limit, int delta, if (tfw_adaptive_limits_mode_is_disabled()) goto out; - /* - * Ignore connection close events from previous training epochs. - * For new connections, assign current training epoch. - */ - if (delta < 0 && *epoch < g_training_epoch) + new_event = delta > 0 && !(*epoch); + if (!tfw_adaptive_limits_check_and_set_epoch(epoch, new_event)) goto out; - else if (delta > 0) - *epoch = g_training_epoch; if (tfw_adaptive_limits_mode_is_defence()) { limit->counter += delta; @@ -483,6 +516,143 @@ tfw_adaptive_limits_check_conn_num(TfwAdaptiveLimit *limit, int delta, return rc; } +static inline bool +tfw_adaptive_limits_change_epoch(TfwAdaptiveLimitLock *limit) +{ + bool new_client = false; + + /* + * We increment `g_training_epoch` each time when we start new + * training, when we are sure that all threads don't use `max` + * and `counter`. During training all threads call this function + * before use `counter` and `max`, so we are sure that `counter` + * and `max` will be zeroed on the start of the new training. + * We make first check to prevent unnecessary lock on the hot + * path on each call. 
+ */ + if (limit->epoch < g_training_epoch) { + spin_lock_bh(&limit->lock); + if (likely(limit->epoch < g_training_epoch)) { + int cpu; + + for_each_online_cpu(cpu) + *(per_cpu_ptr(limit->counter, cpu)) = 0; + atomic64_set(&limit->max, 0); + limit->epoch = g_training_epoch; + new_client = true; + } + spin_unlock_bh(&limit->lock); + } + + return new_client; +} + +static void +tfw_adaptive_limits_acc(TfwAdaptiveLimitLock *limit, int delta, + void (*adjust_new_client)(void), + u16 *epoch) +{ + bool new_event; + + /* + * Prevent training epoch changes while processing the event. + * (see `tfw_adaptive_limits_check_conn_num` for details). + */ + rcu_read_lock(); + + if (tfw_adaptive_limits_mode_is_disabled()) + goto out; + + new_event = delta > 0 && !(*epoch); + if (!tfw_adaptive_limits_check_and_set_epoch(epoch, new_event)) + goto out; + + if (tfw_adaptive_limits_mode_is_training() + && tfw_adaptive_limits_change_epoch(limit)) + adjust_new_client(); + + this_cpu_add(*limit->counter, delta); + +out: + rcu_read_unlock(); +} + +void +tfw_adaptive_limits_acc_req_num(TfwAdaptiveLimitLock *limit, int delta, + u16 *epoch) +{ + void (*adjust_new_client)(void) = + tfw_adaptive_limits_adjust_req_new_client; + + tfw_adaptive_limits_acc(limit, delta, adjust_new_client, epoch); +} + +static inline bool +tfw_adaptive_limits_change_max(TfwAdaptiveLimitLock *limit, s64 curr, + u64 *delta1, u128 *delta2) +{ + s64 old_max = atomic64_read(&limit->max); + + /* + * Can be called concurrently on other CPUs with different + * curr value, so we need synchronization here.
+ */ + do { + if (curr <= old_max) + return false; + } while (!atomic64_try_cmpxchg(&limit->max, &old_max, curr)); + + *delta1 = ((u64)curr - (u64)old_max); + *delta2 = (u128)curr * (u128) curr - (u128)old_max * (u128)old_max; + + return true; +} + +static bool +tfw_adaptive_limits_check(TfwAdaptiveLimitLock *limit, + void (*adjust_num)(u64, u128), + bool(*defence)(u64)) +{ + u128 delta2; + u64 delta1; + s64 curr; + bool rc = true; + + /* + * Prevent training epoch changes while processing the event. + * (see `tfw_adaptive_limits_check_conn_num`). + */ + rcu_read_lock(); + + if (tfw_adaptive_limits_mode_is_disabled()) + goto out; + + curr = tfw_percpu_s64_counter_sum(limit->counter); + WARN_ON(curr < 0); + if (tfw_adaptive_limits_mode_is_defence()) { + rc = defence(curr); + goto out; + } + + if (tfw_adaptive_limits_change_max(limit, curr, &delta1, &delta2)) + adjust_num(delta1, delta2); + +out: + rcu_read_unlock(); + + return rc; +} + +bool +tfw_adaptive_limits_check_req_num(TfwAdaptiveLimitLock *limit) +{ + void (*adjust_num)(u64, u128) = + tfw_adaptive_limits_adjust_req_num; + bool (*defence)(u64) = tfw_adaptive_limits_defence_req_num; + + return tfw_adaptive_limits_check(limit, adjust_num, defence); +} + static inline void tfw_adaptive_limits_prepare_for_defence(void) { @@ -562,6 +732,27 @@ tfw_ctlfn_adaptive_limits_mode_change(unsigned int mode) return 0; } +int +tfw_adaptive_limit_lock_init(TfwAdaptiveLimitLock *limit, gfp_t flags) +{ + limit->counter = tfw_alloc_percpu_gfp(s64, flags | __GFP_ZERO); + if (unlikely(!limit->counter)) + return -ENOMEM; + + spin_lock_init(&limit->lock); + atomic64_set(&limit->max, 0); + limit->epoch = 0; + + return 0; +} + +void +tfw_adaptive_limit_lock_destroy(TfwAdaptiveLimitLock *limit) +{ + free_percpu(limit->counter); + limit->counter = NULL; +} + static int tfw_training_mode_start(void) { diff --git a/fw/adaptive_limits.h b/fw/adaptive_limits.h index 25fa6f973..0d5d8e017 100644 --- a/fw/adaptive_limits.h +++ 
b/fw/adaptive_limits.h @@ -162,13 +162,36 @@ typedef struct { u16 epoch; } TfwAdaptiveLimit; +/* + * counter - percpu array to track current value of the tracked metric; + * lock - spinlock for serialized reset of @max and @counter when a + * new training epoch starts. + * max - maximum observed value of the tracked metric within the + * current training epoch; + * @epoch - training epoch identifier. Compared against the global + * @g_training_epoch to detect epoch change and trigger + * reinitialization of @max and @counter; + */ +typedef struct { + s64 __percpu *counter; + spinlock_t lock; + atomic64_t max; + u16 epoch; +} TfwAdaptiveLimitLock; + int tfw_adaptive_limits_init(void); void tfw_adaptive_limits_exit(void); bool tfw_adaptive_limits_check_conn_num(TfwAdaptiveLimit *limit, int delta, u16 *epoch); +void tfw_adaptive_limits_acc_req_num(TfwAdaptiveLimitLock *limit, + int delta, u16 *epoch); +bool tfw_adaptive_limits_check_req_num(TfwAdaptiveLimitLock *limit); int tfw_ctlfn_adaptive_limits_mode_change(unsigned int mode); +int tfw_adaptive_limit_lock_init(TfwAdaptiveLimitLock *limit, gfp_t flags); +void tfw_adaptive_limit_lock_destroy(TfwAdaptiveLimitLock *limit); + static inline void tfw_adaptive_limit_init(TfwAdaptiveLimit *limit) { @@ -214,6 +237,7 @@ tfw_percpu_##type##_counter_sum(type __percpu *counter) \ PERCPU_COUNTER_SUMM(u128) PERCPU_COUNTER_SUMM(u64) PERCPU_COUNTER_SUMM(u32) +PERCPU_COUNTER_SUMM(s64) #undef PERCPU_COUNTER_SUMM diff --git a/fw/client.c b/fw/client.c index ffb932b77..d47bd2291 100644 --- a/fw/client.c +++ b/fw/client.c @@ -27,6 +27,7 @@ #include "hash.h" #include "client.h" #include "connection.h" +#include "filter.h" #include "log.h" #include "procfs.h" #include "tdb.h" @@ -72,111 +73,118 @@ static TDB *client_db; static atomic_t shutdown_pending = ATOMIC_INIT(0); static DECLARE_WAIT_QUEUE_HEAD(shutdown_wq); -static struct kmem_cache *tfw_cli_mem_cache; +static struct kmem_cache *tfw_cli_adaptive_limits_cache; static struct { - 
TfwClientMem *mem; - TfwClientMem *free_list; + TfwClientAdaptiveLimits *objs; + TfwClientAdaptiveLimits *free_list; unsigned int size; unsigned int order; -} cli_mem_pool; +} cli_adaptive_limits_pool; static inline bool -tfw_cli_mem_belongs_to_pool(TfwClientMem *cli_mem) +tfw_cli_adaptive_limits_belongs_to_pool(TfwClientAdaptiveLimits *limits) { - return cli_mem >= cli_mem_pool.mem - && cli_mem < cli_mem_pool.mem + cli_mem_pool.size; + return limits >= cli_adaptive_limits_pool.objs + && (limits < cli_adaptive_limits_pool.objs + + cli_adaptive_limits_pool.size); } static void -__cli_mem_release(TfwClientMem *cli_mem) +__cli_adaptive_limits_release(TfwClientAdaptiveLimits *limits) { - percpu_ref_exit(&cli_mem->refcnt); - free_percpu(cli_mem->mem); - if (!tfw_cli_mem_belongs_to_pool(cli_mem)) - kmem_cache_free(tfw_cli_mem_cache, cli_mem); + percpu_ref_exit(&limits->refcnt); + free_percpu(limits->cli_mem.mem); + tfw_adaptive_limit_lock_destroy(&limits->req_lim); + if (!tfw_cli_adaptive_limits_belongs_to_pool(limits)) + kmem_cache_free(tfw_cli_adaptive_limits_cache, limits); } /* - * Reset counters, reinit refcnt and put `cli_mem` back to the pool. - * Sohuld be called under `ga_lock`, to protect `cli_mem_pool.free_list` + * Reset limits, reinit refcnt and put `limits` back to the pool. + * Should be called under `ga_lock`, to protect pool `free_list` pointer. 
*/ static inline void -tfw_cli_mem_pool_free(TfwClientMem *cli_mem) +tfw_cli_adaptive_limits_pool_free(TfwClientAdaptiveLimits *limits) { int cpu; assert_spin_locked(&client_db->ga_lock); - for_each_online_cpu(cpu) - *per_cpu_ptr(cli_mem->mem, cpu) = 0; - percpu_ref_reinit(&cli_mem->refcnt); - cli_mem->next_free = cli_mem_pool.free_list; - cli_mem_pool.free_list = cli_mem; + for_each_online_cpu(cpu) { + *per_cpu_ptr(limits->cli_mem.mem, cpu) = 0; + *per_cpu_ptr(limits->req_lim.counter, cpu) = 0; + } + percpu_ref_reinit(&limits->refcnt); + limits->next_free = cli_adaptive_limits_pool.free_list; + cli_adaptive_limits_pool.free_list = limits; } /* - * Workqueue handler for asynchronous cli_mem destruction. + * Workqueue handler for asynchronous limits destruction. * - * This function initiates final teardown of a TfwClientMem object: + * This function initiates final teardown of a TfwClientAdaptiveLimits + * object: * - percpu_ref_kill() marks the refcount as dead, preventing any new * users from acquiring references. * - percpu_ref_put() drops the caller’s reference, which may trigger - * final release via cli_mem_release() once all outstanding users - * are gone. + * final release via cli_adaptive_limits_release() once all outstanding + * users are gone. */ static void -tfw_cli_mem_kill_work_fn(struct work_struct *work) +tfw_cli_adaptive_limits_kill_work_fn(struct work_struct *work) { - TfwClientMem *cli_mem = container_of(work, TfwClientMem, kill_work); + TfwClientAdaptiveLimits *limits = + container_of(work, TfwClientAdaptiveLimits, kill_work); - percpu_ref_kill(&cli_mem->refcnt); - percpu_ref_put(&cli_mem->refcnt); + percpu_ref_kill(&limits->refcnt); + percpu_ref_put(&limits->refcnt); } /* - * Get `TfwClientMem` object from pool if present. + * Get `TfwClientAdaptiveLimits` object from pool if present. * Object was already initialized during pool creation or * releasing to pool. 
*/ -static inline TfwClientMem * -tfw_cli_mem_pool_alloc(void) +static inline TfwClientAdaptiveLimits * +tfw_cli_adaptive_limits_pool_alloc(void) { - TfwClientMem *cli_mem; + TfwClientAdaptiveLimits *limits; assert_spin_locked(&client_db->ga_lock); - if (!cli_mem_pool.free_list) + if (!cli_adaptive_limits_pool.free_list) return NULL; - cli_mem = cli_mem_pool.free_list; - cli_mem_pool.free_list = cli_mem->next_free; + limits = cli_adaptive_limits_pool.free_list; + cli_adaptive_limits_pool.free_list = limits->next_free; /* * Should be called only after `free_list` initialization * using `next_free` pointer, because `next_free` and * `kill_work` members belong to the same union. */ - INIT_WORK(&cli_mem->kill_work, tfw_cli_mem_kill_work_fn); + INIT_WORK(&limits->kill_work, tfw_cli_adaptive_limits_kill_work_fn); - return cli_mem; + return limits; } /* - * Final release of cli_mem: verify refcnt/memory are zero and either + * Final release of limits: verify refcnt/memory are zero and either * return to pool or free it. Signals shutdown completion if needed. 
*/ static void -cli_mem_release(struct percpu_ref *ref) +cli_adaptive_limits_release(struct percpu_ref *ref) { - TfwClientMem *cli_mem = container_of(ref, TfwClientMem, refcnt); + TfwClientAdaptiveLimits *limits = + container_of(ref, TfwClientAdaptiveLimits, refcnt); spin_lock_bh(&client_db->ga_lock); WARN_ON_ONCE(!percpu_ref_is_zero(ref)); - WARN_ON_ONCE(tfw_client_mem(cli_mem)); - if (tfw_cli_mem_belongs_to_pool(cli_mem)) - tfw_cli_mem_pool_free(cli_mem); + WARN_ON_ONCE(tfw_client_mem(&limits->cli_mem)); + if (tfw_cli_adaptive_limits_belongs_to_pool(limits)) + tfw_cli_adaptive_limits_pool_free(limits); else - __cli_mem_release(cli_mem); + __cli_adaptive_limits_release(limits); spin_unlock_bh(&client_db->ga_lock); @@ -185,76 +193,90 @@ cli_mem_release(struct percpu_ref *ref) } static inline int -tfw_cli_mem_init(TfwClientMem *cli_mem, gfp_t flags) +tfw_cli_adaptive_limits_init(TfwClientAdaptiveLimits *limits, gfp_t flags) { + TfwClientMem *cli_mem = &limits->cli_mem; + TfwAdaptiveLimitLock *req_lim = &limits->req_lim; int r; - cli_mem->mem = tfw_alloc_percpu_gfp(long, flags | __GFP_ZERO); + cli_mem->mem = tfw_alloc_percpu_gfp(s64, flags | __GFP_ZERO); if (unlikely(!cli_mem->mem)) return -ENOMEM; - r = tfw_percpu_ref_init(&cli_mem->refcnt, cli_mem_release, + r = tfw_adaptive_limit_lock_init(req_lim, flags | __GFP_ZERO); + if (unlikely(r)) + goto free_cli_mem; + + r = tfw_percpu_ref_init(&limits->refcnt, cli_adaptive_limits_release, PERCPU_REF_ALLOW_REINIT, flags); if (unlikely(r)) - goto free_per_cpu_mem; + goto destroy_req_lim; return 0; -free_per_cpu_mem: +destroy_req_lim: + tfw_adaptive_limit_lock_destroy(req_lim); +free_cli_mem: free_percpu(cli_mem->mem); - cli_mem->mem = NULL; - + return r; } static inline void -tfw_cli_mem_pool_exit(void) +tfw_cli_adaptive_limits_pool_exit(void) { - TfwClientMem *tmp, *curr = cli_mem_pool.free_list; + TfwClientAdaptiveLimits *tmp, *curr = + cli_adaptive_limits_pool.free_list; while (curr) { tmp = curr; curr = tmp->next_free; 
- __cli_mem_release(tmp); + __cli_adaptive_limits_release(tmp); } - free_pages((unsigned long)cli_mem_pool.mem, cli_mem_pool.order); - bzero_fast(&cli_mem_pool, sizeof(cli_mem_pool)); + free_pages((unsigned long)cli_adaptive_limits_pool.objs, + cli_adaptive_limits_pool.order); + bzero_fast(&cli_adaptive_limits_pool, + sizeof(cli_adaptive_limits_pool)); } /* - * Initialize cli_mem pool. + * Initialize cli_adaptive_limits pool. * - * Allocates a contiguous block of TfwClientMem objects and initializes each - * element, then builds a free list for fast allocation. + * Allocates a contiguous block of TfwClientAdaptiveLimits objects and + * initializes each element, then builds a free list for fast allocation. * * Steps: * - Validate pool size from configuration. * - Compute allocation order and clamp it to MAX_PAGE_ORDER. * - Allocate zeroed pages for the entire pool. - * - Initialize each TfwClientMem (per-cpu counters + refcnt + work). + * - Initialize each TfwClientAdaptiveLimits (per-cpu counters + + * refcnt + work). * - Link all objects into a singly-linked free list. * - * Provide fast allocations of `TfwClientMem` later. + * Provide fast allocations of `TfwClientAdaptiveLimits` later. 
 */ static inline int -tfw_cli_mem_pool_init(void) +tfw_cli_adaptive_limits_pool_init(void) { - TfwClientMem *block, *tail = NULL; + TfwClientAdaptiveLimits *block, *tail = NULL; + unsigned long size; unsigned int i, order; int r; if (WARN_ON_ONCE(!client_cfg.lru_size)) return -EINVAL; - order = get_order(sizeof(TfwClientMem) * client_cfg.lru_size); + size = sizeof(TfwClientAdaptiveLimits) * client_cfg.lru_size; + order = get_order(size); if (order > MAX_PAGE_ORDER) order = MAX_PAGE_ORDER; - cli_mem_pool.order = order; - cli_mem_pool.mem = (TfwClientMem *)tfw__get_free_pages(GFP_KERNEL, + cli_adaptive_limits_pool.order = order; + cli_adaptive_limits_pool.objs = + (TfwClientAdaptiveLimits *)tfw__get_free_pages(GFP_KERNEL, order); - if (unlikely(!cli_mem_pool.mem)) + if (unlikely(!cli_adaptive_limits_pool.objs)) return -ENOMEM; /* @@ -262,27 +284,27 @@ tfw_cli_mem_pool_init(void) * 0 -> 1 -> ... -> N-1. * * This preserves the natural memory layout of the preallocated array, - * which is important because tfw_cli_mem_belongs_to_pool() relies on - * the pool being a contiguous range [mem, mem + size). + * which is important because tfw_cli_adaptive_limits_belongs_to_pool() + * relies on the pool being a contiguous range [objs, objs + size). * * Using tail insertion avoids reversing the order (which would happen * with head insertion) and keeps allocation predictable and * cache-friendly.
*/ - block = cli_mem_pool.mem; + block = cli_adaptive_limits_pool.objs; for (i = 0; i < client_cfg.lru_size; i++) { - r = tfw_cli_mem_init(&block[i], GFP_KERNEL); + r = tfw_cli_adaptive_limits_init(&block[i], GFP_KERNEL); if (unlikely(r)) return r; - if (!cli_mem_pool.free_list) - cli_mem_pool.free_list = &block[i]; + if (!cli_adaptive_limits_pool.free_list) + cli_adaptive_limits_pool.free_list = &block[i]; else tail->next_free = &block[i]; block[i].next_free = NULL; tail = &block[i]; - cli_mem_pool.size++; + cli_adaptive_limits_pool.size++; } return 0; @@ -327,9 +349,9 @@ tfw_client_free(TdbRec *rec) * Tempesta FW shut down from `tfw_client_free_lru` */ WARN_ON(!list_empty(&cli->list)); - if (likely(cli->cli_mem)) { + if (likely(cli->limits)) { atomic_inc(&shutdown_pending); - if (!schedule_work(&cli->cli_mem->kill_work)) + if (!schedule_work(&cli->limits->kill_work)) atomic_dec(&shutdown_pending); } } @@ -408,49 +430,51 @@ tfw_client_addr_eq(TdbRec *rec, void *data) } /* - * Allocate cli_mem from slab cache and fully initialize it. + * Allocate client adaptive limits structure from slab cache and + * fully initialize it. * Used as a fallback when pool allocation is exhausted. 
*/ -static inline TfwClientMem * -tfw_cli_mem_alloc_from_cache(void) +static inline TfwClientAdaptiveLimits * +tfw_cli_adaptive_limits_alloc_from_cache(void) { - TfwClientMem *cli_mem; + TfwClientAdaptiveLimits *limits; - cli_mem = kmem_cache_alloc(tfw_cli_mem_cache, GFP_ATOMIC); - if (unlikely(!cli_mem)) + limits = kmem_cache_alloc(tfw_cli_adaptive_limits_cache, GFP_ATOMIC); + if (unlikely(!limits)) return NULL; - if (unlikely(tfw_cli_mem_init(cli_mem, GFP_ATOMIC))) - goto free_cli_mem; + if (unlikely(tfw_cli_adaptive_limits_init(limits, GFP_ATOMIC))) + goto free_cli_adaptive_limits; - INIT_WORK(&cli_mem->kill_work, tfw_cli_mem_kill_work_fn); - return cli_mem; + INIT_WORK(&limits->kill_work, tfw_cli_adaptive_limits_kill_work_fn); -free_cli_mem: - kmem_cache_free(tfw_cli_mem_cache, cli_mem); + return limits; + +free_cli_adaptive_limits: + kmem_cache_free(tfw_cli_adaptive_limits_cache, limits); return NULL; } /* - * Allocate cli_mem: + * Allocate client adaptive limits structure: * - Try fast pool first, then fallback to slab cache. * - On success, take an extra refcnt reference before returning. 
*/ -static inline TfwClientMem * -tfw_cli_mem_alloc(void) +static inline TfwClientAdaptiveLimits * +tfw_cli_adaptive_limits_alloc(void) { - TfwClientMem *cli_mem; + TfwClientAdaptiveLimits *limits; - cli_mem = tfw_cli_mem_pool_alloc(); - if (!cli_mem) - cli_mem = tfw_cli_mem_alloc_from_cache(); - if (unlikely(!cli_mem)) + limits = tfw_cli_adaptive_limits_pool_alloc(); + if (!limits) + limits = tfw_cli_adaptive_limits_alloc_from_cache(); + if (unlikely(!limits)) return NULL; - percpu_ref_get(&cli_mem->refcnt); + percpu_ref_get(&limits->refcnt); - return cli_mem; + return limits; } static int @@ -462,8 +486,8 @@ tfw_client_ent_init(TdbRec *rec, void *data) INIT_LIST_HEAD(&cli->list); - cli->cli_mem = tfw_cli_mem_alloc(); - if (unlikely(!cli->cli_mem)) + cli->limits = tfw_cli_adaptive_limits_alloc(); + if (unlikely(!cli->limits)) return -ENOMEM; assert_spin_locked(&client_db->ga_lock); @@ -554,6 +578,21 @@ tfw_client_obtain(TfwAddr addr, TfwAddr *xff_addr, TfwStr *user_agent, EXPORT_SYMBOL(tfw_client_obtain); ALLOW_ERROR_INJECTION(tfw_client_obtain, NULL); +void +tfw_client_filter_block_ip(TfwClient *cli) +{ + TfwVhost *dflt_vh = tfw_vhost_lookup_default(); + + if (WARN_ON_ONCE(!dflt_vh)) + return; + + if (dflt_vh->frang_gconf->ip_block) + tfw_filter_block_ip(cli, + dflt_vh->frang_gconf->ip_block_duration); + + tfw_vhost_put(dflt_vh); +} + /** * Beware: @fn is called under client hash bucket spin lock. 
* @@ -591,7 +630,7 @@ tfw_client_start(void) if (!client_db) return -EINVAL; - r = tfw_cli_mem_pool_init(); + r = tfw_cli_adaptive_limits_pool_init(); if (unlikely(r)) return r; @@ -609,7 +648,7 @@ tfw_client_stop(void) if (client_db) { tfw_client_free_lru(); wait_event(shutdown_wq, !atomic_read(&shutdown_pending)); - tfw_cli_mem_pool_exit(); + tfw_cli_adaptive_limits_pool_exit(); tdb_close(client_db); client_db = NULL; } @@ -657,11 +696,13 @@ TfwMod tfw_client_mod = { int __init tfw_client_init(void) { - tfw_cli_mem_cache = kmem_cache_create("tfw_cli_mem_cache", - sizeof(TfwClientMem), - 0, 0, NULL); - if (!tfw_cli_mem_cache) + tfw_cli_adaptive_limits_cache = + kmem_cache_create("tfw_cli_adaptive_limits_cache", + sizeof(TfwClientAdaptiveLimits), + 0, 0, NULL); + if (!tfw_cli_adaptive_limits_cache) return -ENOMEM; + tfw_mod_register(&tfw_client_mod); return 0; @@ -670,6 +711,6 @@ tfw_client_init(void) void tfw_client_exit(void) { - kmem_cache_destroy(tfw_cli_mem_cache); + kmem_cache_destroy(tfw_cli_adaptive_limits_cache); tfw_mod_unregister(&tfw_client_mod); } diff --git a/fw/client.h b/fw/client.h index a7535d536..16791523e 100644 --- a/fw/client.h +++ b/fw/client.h @@ -27,23 +27,36 @@ /* * Client memory accounting structure for Tempesta FW. - * - * @kill_work - Workqueue item used for asynchronous structure + * + * @mem - Per-CPU memory accounting storage. + */ +typedef struct tfw_client_mem_t { + s64 __percpu *mem; +} TfwClientMem; + +/* + * Structure to track different client statistic. + * + * @kill_work - workqueue item used for asynchronous structure * cleanup/destruction; - * @next_free - Pointer to the next free object in the freelist; - * @refcnt - Per-CPU reference counter. Provides scalable and + * @next_free - pointer to the next free object in the freelist; + * @refcnt - percpu reference counter. Provides scalable and * thread-safe reference tracking on SMP systems with * minimal contention; - * @mem - Per-CPU memory accounting storage. 
+ * cli_mem - client memory accounting structure for Tempesta FW; + * req_lim - structure to track non-idempotent requests count in + * fly for the current client. Used in adaptive_limits + * module to collect statistic and z-score calculation; */ -typedef struct tfw_client_mem_t { +typedef struct tfw_adaptive_limits_t { union { - struct work_struct kill_work; - struct tfw_client_mem_t *next_free; + struct work_struct kill_work; + struct tfw_adaptive_limits_t *next_free; }; struct percpu_ref refcnt; - long __percpu *mem; -} TfwClientMem; + TfwClientMem cli_mem; + TfwAdaptiveLimitLock req_lim; +} TfwClientAdaptiveLimits; /** * Client descriptor. @@ -52,17 +65,17 @@ typedef struct tfw_client_mem_t { * Typically it's large and wastes memory in vain if * no any classification logic is used; * @list - entry in the lru list; - * @cli_mem - memory used by current client; * @conn_lim - structure to track active connections count in * for the current client. Used in adaptive_limits * module to collect statistic and z-score calculation; + * @limits - structure to track different client statistic; */ typedef struct { TFW_PEER_COMMON; TfwClassifierPrvt class_prvt; struct list_head list; - TfwClientMem *cli_mem; TfwAdaptiveLimit conn_lim; + TfwClientAdaptiveLimits *limits; } TfwClient; int tfw_client_init(void); @@ -75,11 +88,11 @@ void tfw_cli_conn_release(TfwCliConn *cli_conn); int tfw_cli_conn_send(TfwCliConn *cli_conn, TfwMsg *msg); int tfw_cli_conn_abort_all(void *data); void tfw_cli_abort_all(void); - void tfw_tls_connection_lost(TfwConn *conn); +void tfw_client_filter_block_ip(TfwClient *cli); #define CLIENT_MEM_FROM_CONN(conn) \ - ((TfwClient *)((TfwConn *)conn)->peer)->cli_mem + &((TfwClient *)((TfwConn *)conn)->peer)->limits->cli_mem static inline void tfw_client_adjust_mem(TfwClientMem *cli_mem, int delta) @@ -90,25 +103,25 @@ tfw_client_adjust_mem(TfwClientMem *cli_mem, int delta) static inline bool tfw_client_mem_get(TfwClientMem *cli_mem) { - return 
percpu_ref_tryget(&cli_mem->refcnt); + TfwClientAdaptiveLimits *limits = + container_of(cli_mem, TfwClientAdaptiveLimits, cli_mem); + + return percpu_ref_tryget(&limits->refcnt); } static inline void tfw_client_mem_put(TfwClientMem *cli_mem) { - percpu_ref_put(&cli_mem->refcnt); + TfwClientAdaptiveLimits *limits = + container_of(cli_mem, TfwClientAdaptiveLimits, cli_mem); + + percpu_ref_put(&limits->refcnt); } -static inline long +static inline s64 tfw_client_mem(TfwClientMem *cli_mem) { - long mem = 0; - int cpu; - - for_each_online_cpu(cpu) - mem += *(per_cpu_ptr(cli_mem->mem, cpu)); - - return mem; + return tfw_percpu_s64_counter_sum(cli_mem->mem); } #endif /* __TFW_CLIENT_H__ */ diff --git a/fw/http.c b/fw/http.c index 7cf2f90f0..222d4fce9 100644 --- a/fw/http.c +++ b/fw/http.c @@ -116,6 +116,7 @@ #include "websocket.h" #include "tf_filter.h" #include "tf_conf.h" +#include "adaptive_limits.h" #include "sync_socket.h" #include "lib/common.h" @@ -1357,6 +1358,19 @@ tfw_http_conn_nip_reset(TfwSrvConn *srv_conn) clear_bit(TFW_CONN_B_HASNIP, &srv_conn->flags); } +static inline void +tfw_http_adjust_nip_req(TfwHttpReq *req, int delta) +{ + TfwClient *cli = req->conn ? (TfwClient *)req->conn->peer : NULL; + TfwAdaptiveLimitLock *req_lim; + + if (unlikely(!cli)) + return; + + req_lim = &cli->limits->req_lim; + tfw_adaptive_limits_acc_req_num(req_lim, delta, &req->epoch); +} + /* * Put @req on the list of non-idempotent requests in @srv_conn. * Raise the flag saying that @srv_conn has non-idempotent requests. 
@@ -1364,6 +1378,7 @@ tfw_http_conn_nip_reset(TfwSrvConn *srv_conn) static inline void tfw_http_req_nip_enlist(TfwSrvConn *srv_conn, TfwHttpReq *req) { + tfw_http_adjust_nip_req(req, 1); BUG_ON(!list_empty(&req->nip_list)); list_add_tail(&req->nip_list, &srv_conn->nip_queue); set_bit(TFW_CONN_B_HASNIP, &srv_conn->flags); @@ -1382,6 +1397,7 @@ tfw_http_req_nip_delist(TfwSrvConn *srv_conn, TfwHttpReq *req) if (!list_empty(&req->nip_list)) { list_del_init(&req->nip_list); tfw_http_conn_nip_reset(srv_conn); + tfw_http_adjust_nip_req(req, -1); } } @@ -2902,8 +2918,8 @@ tfw_http_conn_msg_alloc(TfwConn *conn, TfwStream *stream) /* Can be equal to zero for health monitor requests. */ if (likely(hm->req->conn)) { - TfwClient *cli = (TfwClient *)hm->req->conn->peer; - TfwClientMem *cli_mem = cli->cli_mem; + TfwClientMem *cli_mem = + CLIENT_MEM_FROM_CONN(hm->req->conn); int delta = PAGE_SIZE << hm->pool->order; hm->pool->owner = cli_mem; @@ -3199,6 +3215,9 @@ tfw_http_conn_send(TfwConn *conn, TfwMsg *msg) static int tfw_http_conn_recv_finish(TfwConn *conn) { + TfwClient *cli = (TfwClient *)conn->peer; + TfwAdaptiveLimitLock *req_lim = &cli->limits->req_lim; + if (TFW_FSM_TYPE(conn->proto.type) == TFW_FSM_H2) tfw_h2_conn_recv_finish(conn); @@ -3210,6 +3229,14 @@ tfw_http_conn_recv_finish(TfwConn *conn) if (unlikely(frang_client_mem_limit((TfwCliConn *)conn, true))) return T_BLOCK_WITH_RST; + if (unlikely(!tfw_adaptive_limits_check_req_num(req_lim))) { + T_WARN_ADDR("Client connection dropped: non-idempotent" + " request z-score exceeded the configured" + " threshold\n", &cli->addr, TFW_NO_PORT); + tfw_client_filter_block_ip(cli); + return T_BLOCK_WITH_RST; + } + return 0; } diff --git a/fw/http.h b/fw/http.h index c77100f83..835362bf3 100644 --- a/fw/http.h +++ b/fw/http.h @@ -394,6 +394,9 @@ typedef struct { * @method_override - Overridden HTTP request method, passed in request headers; * @header_list_sz - total size of headers in bytes; * @headers_cnt - total headers count; 
+ * @epoch - training epoch identifier. Used to ignore requests that + * belong to previous training epochs when updating statistics + * for the current training epoch; * * TfwStr members must be the first for efficient scanning. */ @@ -431,6 +434,7 @@ struct tfw_http_req_t { unsigned char method_override; unsigned int header_list_sz; unsigned int headers_cnt; + u16 epoch; }; #define TFW_IDX_BITS 24 @@ -604,7 +608,7 @@ tfw_http_msg_client_mem(TfwHttpMsg *msg) TfwCliConn *conn = (TfwCliConn *)(tfw_http_msg_is_req(msg) ? msg->conn : msg->pair->conn); - return ((TfwClient *)conn->peer)->cli_mem; + return &((TfwClient *)conn->peer)->limits->cli_mem; } static inline int diff --git a/fw/http_limits.c b/fw/http_limits.c index 7b5c223d0..d8895a0fb 100644 --- a/fw/http_limits.c +++ b/fw/http_limits.c @@ -1692,25 +1692,16 @@ int frang_client_mem_limit(TfwCliConn *conn, bool block_if_exceeded) { TfwClient *cli = (TfwClient *)conn->peer; - TfwVhost *dflt_vh; + TfwClientMem *cli_mem = &cli->limits->cli_mem; if (likely(!tfw_cli_hard_mem_limit - || tfw_client_mem(cli->cli_mem) <= tfw_cli_hard_mem_limit)) + || tfw_client_mem(cli_mem) <= tfw_cli_hard_mem_limit)) return 0; if (!block_if_exceeded) return T_BLOCK; - dflt_vh = tfw_vhost_lookup_default(); - if (WARN_ON_ONCE(!dflt_vh)) - return T_BLOCK; - - if (dflt_vh->frang_gconf->ip_block) { - unsigned int duration = dflt_vh->frang_gconf->ip_block_duration; - - tfw_filter_block_ip(cli, duration); - } - tfw_vhost_put(dflt_vh); + tfw_client_filter_block_ip(cli); return T_BLOCK; } diff --git a/fw/t/unit/helpers.c b/fw/t/unit/helpers.c index fb75b1cb2..65768b0fd 100644 --- a/fw/t/unit/helpers.c +++ b/fw/t/unit/helpers.c @@ -45,14 +45,16 @@ #include "tf_filter.h" #include "regex/kmod/rex.h" -static DEFINE_PER_CPU(long, mem); +static DEFINE_PER_CPU(s64, mem); unsigned int tfw_cli_max_concurrent_streams; TfwConn conn_req, conn_resp; -TfwClientMem cli_mem = { - .mem = &mem, +TfwClientAdaptiveLimits limits = { + .cli_mem = { + .mem = &mem, + } 
}; TfwClient client = { - .cli_mem = &cli_mem, + .limits = &limits, }; TfwHttpReq * @@ -66,7 +68,7 @@ test_req_alloc(size_t data_len) * tfw_http_msg_alloc(). It is removed because we need to test how it * initializes the message and we would not like to test the copy-paste. */ - hmreq = __tfw_http_msg_alloc(&cli_mem, Conn_HttpClnt, true); + hmreq = __tfw_http_msg_alloc(&limits.cli_mem, Conn_HttpClnt, true); BUG_ON(!hmreq); tfw_connection_init(&conn_req); @@ -113,7 +115,7 @@ test_resp_alloc_no_data(TfwHttpReq *req) { TfwHttpMsg *hmresp; - hmresp = __tfw_http_msg_alloc(&cli_mem, Conn_HttpSrv, true); + hmresp = __tfw_http_msg_alloc(&limits.cli_mem, Conn_HttpSrv, true); BUG_ON(!hmresp); tfw_connection_init(&conn_resp); @@ -526,6 +528,24 @@ tfh_get_records_rate(HttpTfh fingerprint) return 0; } +void tfw_adaptive_limits_acc_req_num(TfwAdaptiveLimitLock *limit, + int delta, u16 *epoch) +{ + +} + +bool +tfw_adaptive_limits_check_req_num(TfwAdaptiveLimitLock *limit) +{ + return true; +} + +void +tfw_client_filter_block_ip(TfwClient *cli) +{ + +} + TfwCfgSpec tf_hash_specs[0]; unsigned int cache_default_ttl = 60; diff --git a/fw/t/unit/test.c b/fw/t/unit/test.c index a490ee215..9a5825b1b 100644 --- a/fw/t/unit/test.c +++ b/fw/t/unit/test.c @@ -94,7 +94,7 @@ extern void tfw_pool_exit(void); static inline TfwClientMem * __tfw_client_mem_from_conn(TfwConn *conn) { - return ((TfwClient *)conn_req.peer)->cli_mem; + return &((TfwClient *)conn_req.peer)->limits->cli_mem; } int diff --git a/fw/t/unit/test_hpack.c b/fw/t/unit/test_hpack.c index c71a8a60a..a1d43d6d4 100644 --- a/fw/t/unit/test_hpack.c +++ b/fw/t/unit/test_hpack.c @@ -75,12 +75,14 @@ do { \ HDR_COMPOUND_STR(hdr_res, name, value); \ } while (0) -static DEFINE_PER_CPU(long, mem); -static TfwClientMem cli_mem = { - .mem = &mem, +static DEFINE_PER_CPU(s64, mem); +static TfwClientAdaptiveLimits limits = { + .cli_mem = { + .mem = &mem, + }, }; static TfwClient client = { - .cli_mem = &cli_mem, + .limits = &limits, }; static 
TfwH2Conn conn; static TfwH2Ctx *ctx; From e5c19ceec8b07d5ce4b3de1be5944729478996ba Mon Sep 17 00:00:00 2001 From: EvgeniiMekhanik Date: Wed, 29 Apr 2026 06:25:50 +0300 Subject: [PATCH 4/6] Update linux kernel patch Add per-socket training_epoch field to track the training generation for connection-related statistics. This allows associating socket events with a specific training period and prevents mixing measurements across training epochs when switching between TRAINING and DEFENCE modes. --- linux-6.12.12.patch | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/linux-6.12.12.patch b/linux-6.12.12.patch index dee42bd37..ae21bf531 100644 --- a/linux-6.12.12.patch +++ b/linux-6.12.12.patch @@ -1076,10 +1076,10 @@ index 0f3c58007..e2576c604 100644 diff --git a/include/linux/tempesta.h b/include/linux/tempesta.h new file mode 100644 -index 000000000..2269a39e8 +index 000000000..667d60f23 --- /dev/null +++ b/include/linux/tempesta.h -@@ -0,0 +1,66 @@ +@@ -0,0 +1,67 @@ +/** + * Linux interface for Tempesta FW. + * @@ -1121,6 +1121,7 @@ index 000000000..2269a39e8 + +struct socket_tempesta { + void *class_prvt; ++ u16 training_epoch; +}; + +extern struct lsm_blob_sizes tempesta_blob_sizes; From 4259769a55fbd58ef37ae9b636cdb3b476d84fdf Mon Sep 17 00:00:00 2001 From: EvgeniiMekhanik Date: Fri, 12 Jun 2026 22:09:12 +0300 Subject: [PATCH 5/6] Implement adaptive client CPU usage tracking Extend the adaptive limits framework to track per-client CPU usage during request/response processing and use it as an additional overload detection metric. Introduce a CPU adaptive limit based on `TfwAdaptiveLimitLock` and integrate it into the existing training and defence infrastructure. Unlike request tracking, CPU usage is accumulated using an exponential moving average (EMA), which provides a stable estimate of client CPU consumption without introducing synchronization overhead.
(A simple counter would grow monotonically throughout the lifetime of a client, making it unsuitable for anomaly detection. The EMA provides a bounded and continuously adapting estimate of recent CPU activity). CPU usage is tracked in two places: - Measure processing time by recording CPU cycles at the beginning of `ss_tcp_process_data()` and calculating the elapsed time in the `conn_recv_finish` callback after all received data has been processed. The measured delta is used to update the client's CPU usage statistics. (This is a primary accounting path). - CPU usage is also accounted during response processing in `tfw_http_msg_process_generic`. In this case, CPU cycles are measured at the function entry and exit. During training, aggregate per-CPU EMA values, update the recorded maximum CPU usage, and adjust the global statistical model. During defence mode, calculate the client's CPU usage z-score and drop the connection when it exceeds the configured threshold. Reuse the existing adaptive limits infrastructure and IP blocking mechanism for enforcement. 
Part-of: training/defence mode implementation Issue: #1346 --- fw/adaptive_limits.c | 102 ++++++++++++++++++++++++++++++++++++++++--- fw/adaptive_limits.h | 2 + fw/client.c | 11 ++++- fw/client.h | 21 +++++++-- fw/connection.c | 4 +- fw/connection.h | 4 +- fw/http.c | 32 +++++++++++++- fw/sock.c | 3 +- fw/sync_socket.h | 2 +- fw/t/unit/helpers.c | 11 +++++ fw/tls.c | 5 ++- fw/websocket.c | 4 +- 12 files changed, 179 insertions(+), 22 deletions(-) diff --git a/fw/adaptive_limits.c b/fw/adaptive_limits.c index 5b79fa53a..1881ba138 100644 --- a/fw/adaptive_limits.c +++ b/fw/adaptive_limits.c @@ -338,6 +338,12 @@ tfw_adaptive_limits_adjust_req_new_client(void) return tfw_adaptive_limits_adjust_new_client(g_req_num); } +static void +tfw_adaptive_limits_adjust_cpu_new_client(void) +{ + return tfw_adaptive_limits_adjust_new_client(g_cpu_num); +} + static inline void tfw_adaptive_limits_adjust_new_el(struct stats __rcu *g_stats, u64 delta1, u128 delta2) @@ -367,6 +373,12 @@ tfw_adaptive_limits_adjust_req_num(u64 delta1, u128 delta2) return tfw_adaptive_limits_adjust_new_el(g_req_num, delta1, delta2); } +static void +tfw_adaptive_limits_adjust_cpu(u64 delta1, u128_acc delta2) +{ + return tfw_adaptive_limits_adjust_new_el(g_cpu_num, delta1, delta2); +} + /** * Perform z-score based defence check * @@ -437,6 +449,14 @@ tfw_adaptive_limits_check_and_set_epoch(u16 *epoch, bool new_event) return true; } +static inline bool +tfw_adaptive_limits_defence_cpu(u64 val) +{ + int threshold = tfw_adaptive_limits_z_score_cpu; + + return tfw_adaptive_limits_defence(g_cpu_num, val, threshold); +} + bool tfw_adaptive_limits_check_conn_num(TfwAdaptiveLimit *limit, int delta, u16 *epoch) @@ -547,9 +567,21 @@ tfw_adaptive_limits_change_epoch(TfwAdaptiveLimitLock *limit) return new_client; } +static void +__tfw_adaptive_limits_acc(TfwAdaptiveLimitLock *limit, int delta, + void (*adjust_new_client)(void), + void (*add)(TfwAdaptiveLimitLock *limit, int delta)) +{ + if 
(tfw_adaptive_limits_mode_is_training() + && tfw_adaptive_limits_change_epoch(limit)) + adjust_new_client(); + add(limit, delta); +} + static void tfw_adaptive_limits_acc(TfwAdaptiveLimitLock *limit, int delta, void (*adjust_new_client)(void), + void (*add)(TfwAdaptiveLimitLock *limit, int delta), u16 *epoch) { bool new_event; @@ -567,24 +599,53 @@ tfw_adaptive_limits_acc(TfwAdaptiveLimitLock *limit, int delta, if (!tfw_adaptive_limits_check_and_set_epoch(epoch, new_event)) goto out; - if (tfw_adaptive_limits_mode_is_training() - && tfw_adaptive_limits_change_epoch(limit)) - adjust_new_client(); - - this_cpu_add(*limit->counter, delta); + __tfw_adaptive_limits_acc(limit, delta, adjust_new_client, add); out: rcu_read_unlock(); } +static inline void +tfw_adaptive_limits_counter_add(TfwAdaptiveLimitLock *limit, int delta) +{ + this_cpu_add(*limit->counter, delta); +} + +static inline void +tfw_adaptive_limits_counter_add_ema(TfwAdaptiveLimitLock *limit, int delta) +{ + s64 *ema = this_cpu_ptr(limit->counter); + static const unsigned int ema_alpha_shift = 4; + + *ema += ((s64)delta - *ema) >> ema_alpha_shift; +} + void tfw_adaptive_limits_acc_req_num(TfwAdaptiveLimitLock *limit, int delta, u16 *epoch) { void (*adjust_new_client)(void) = tfw_adaptive_limits_adjust_req_new_client; + void (*add)(TfwAdaptiveLimitLock *limit, int delta) = + tfw_adaptive_limits_counter_add; + + tfw_adaptive_limits_acc(limit, delta, adjust_new_client, add, epoch); +} + +void +tfw_adaptive_limits_acc_cpu(TfwAdaptiveLimitLock *limit, u64 time_begin) +{ + void (*adjust_new_client)(void) = + tfw_adaptive_limits_adjust_cpu_new_client; + void (*add)(TfwAdaptiveLimitLock *limit, int delta) = + tfw_adaptive_limits_counter_add_ema; + u64 delta; + + if (tfw_adaptive_limits_mode_is_disabled()) + return; - tfw_adaptive_limits_acc(limit, delta, adjust_new_client, epoch); + delta = get_cycles() - time_begin; + __tfw_adaptive_limits_acc(limit, delta, adjust_new_client, add); } static inline bool @@ -628,7 
+689,14 @@ tfw_adaptive_limits_check(TfwAdaptiveLimitLock *limit, goto out; curr = tfw_percpu_s64_counter_sum(limit->counter); - WARN_ON(curr < 0); + /* + * We don't track epochs for CPU usage because there is no + * suitable structure to associate an epoch with CPU samples. + * + * Moreover, CPU usage is tracked using an EMA, which may + * become negative. + */ + WARN_ON(curr < 0 && adjust_num != tfw_adaptive_limits_adjust_cpu); if (tfw_adaptive_limits_mode_is_defence()) { rc = defence(curr); goto out; @@ -653,6 +721,26 @@ tfw_adaptive_limits_check_req_num(TfwAdaptiveLimitLock *limit) return tfw_adaptive_limits_check(limit, adjust_num, defence); } +bool +tfw_adaptive_limits_check_cpu(TfwAdaptiveLimitLock *limit, u64 time_begin) +{ + u64 delta = get_cycles() - time_begin; + void (*adjust_new_client)(void) = + tfw_adaptive_limits_adjust_cpu_new_client; + void (*add)(TfwAdaptiveLimitLock *limit, int delta) = + tfw_adaptive_limits_counter_add_ema; + void (*adjust_num)(u64, u128_acc) = + tfw_adaptive_limits_adjust_cpu; + bool (*defence)(u64) = tfw_adaptive_limits_defence_cpu; + + if (tfw_adaptive_limits_mode_is_disabled()) + return true; + + __tfw_adaptive_limits_acc(limit, delta, adjust_new_client, add); + + return tfw_adaptive_limits_check(limit, adjust_num, defence); +} + static inline void tfw_adaptive_limits_prepare_for_defence(void) { diff --git a/fw/adaptive_limits.h b/fw/adaptive_limits.h index 0d5d8e017..4cde6ebbc 100644 --- a/fw/adaptive_limits.h +++ b/fw/adaptive_limits.h @@ -186,7 +186,9 @@ bool tfw_adaptive_limits_check_conn_num(TfwAdaptiveLimit *limit, int delta, u16 *epoch); void tfw_adaptive_limits_acc_req_num(TfwAdaptiveLimitLock *limit, int delta, u16 *epoch); +void tfw_adaptive_limits_acc_cpu(TfwAdaptiveLimitLock *limit, u64 time_begin); bool tfw_adaptive_limits_check_req_num(TfwAdaptiveLimitLock *limit); +bool tfw_adaptive_limits_check_cpu(TfwAdaptiveLimitLock *limit, u64 time_begin); int tfw_ctlfn_adaptive_limits_mode_change(unsigned int mode); int 
tfw_adaptive_limit_lock_init(TfwAdaptiveLimitLock *limit, gfp_t flags); diff --git a/fw/client.c b/fw/client.c index d47bd2291..1d6da4902 100644 --- a/fw/client.c +++ b/fw/client.c @@ -95,6 +95,7 @@ __cli_adaptive_limits_release(TfwClientAdaptiveLimits *limits) percpu_ref_exit(&limits->refcnt); free_percpu(limits->cli_mem.mem); tfw_adaptive_limit_lock_destroy(&limits->req_lim); + tfw_adaptive_limit_lock_destroy(&limits->cpu_lim); if (!tfw_cli_adaptive_limits_belongs_to_pool(limits)) kmem_cache_free(tfw_cli_adaptive_limits_cache, limits); } @@ -113,6 +114,7 @@ tfw_cli_adaptive_limits_pool_free(TfwClientAdaptiveLimits *limits) for_each_online_cpu(cpu) { *per_cpu_ptr(limits->cli_mem.mem, cpu) = 0; *per_cpu_ptr(limits->req_lim.counter, cpu) = 0; + *per_cpu_ptr(limits->cpu_lim.counter, cpu) = 0; } percpu_ref_reinit(&limits->refcnt); limits->next_free = cli_adaptive_limits_pool.free_list; @@ -197,6 +199,7 @@ tfw_cli_adaptive_limits_init(TfwClientAdaptiveLimits *limits, gfp_t flags) { TfwClientMem *cli_mem = &limits->cli_mem; TfwAdaptiveLimitLock *req_lim = &limits->req_lim; + TfwAdaptiveLimitLock *cpu_lim = &limits->cpu_lim; int r; cli_mem->mem = tfw_alloc_percpu_gfp(s64, flags | __GFP_ZERO); @@ -207,13 +210,19 @@ tfw_cli_adaptive_limits_init(TfwClientAdaptiveLimits *limits, gfp_t flags) if (unlikely(r)) goto free_cli_mem; + r = tfw_adaptive_limit_lock_init(cpu_lim, flags | __GFP_ZERO); + if (unlikely(r)) + goto destroy_req_lim; + r = tfw_percpu_ref_init(&limits->refcnt, cli_adaptive_limits_release, PERCPU_REF_ALLOW_REINIT, flags); if (unlikely(r)) - goto destroy_req_lim; + goto destroy_cpu_lim; return 0; +destroy_cpu_lim: + tfw_adaptive_limit_lock_destroy(cpu_lim); destroy_req_lim: tfw_adaptive_limit_lock_destroy(req_lim); free_cli_mem: diff --git a/fw/client.h b/fw/client.h index 16791523e..a57043fe7 100644 --- a/fw/client.h +++ b/fw/client.h @@ -56,6 +56,7 @@ typedef struct tfw_adaptive_limits_t { struct percpu_ref refcnt; TfwClientMem cli_mem; TfwAdaptiveLimitLock 
req_lim; + TfwAdaptiveLimitLock cpu_lim; } TfwClientAdaptiveLimits; /** @@ -91,8 +92,10 @@ void tfw_cli_abort_all(void); void tfw_tls_connection_lost(TfwConn *conn); void tfw_client_filter_block_ip(TfwClient *cli); +#define CLIENT_LIMITS_FROM_CONN(conn) \ + ((TfwClient *)((TfwConn *)conn)->peer)->limits #define CLIENT_MEM_FROM_CONN(conn) \ - &((TfwClient *)((TfwConn *)conn)->peer)->limits->cli_mem + &CLIENT_LIMITS_FROM_CONN(conn)->cli_mem static inline void tfw_client_adjust_mem(TfwClientMem *cli_mem, int delta) @@ -100,13 +103,25 @@ tfw_client_adjust_mem(TfwClientMem *cli_mem, int delta) this_cpu_add(*cli_mem->mem, delta); } +static inline bool +tfw_client_limits_get(TfwClientAdaptiveLimits *limits) +{ + return percpu_ref_tryget(&limits->refcnt); +} + +static inline void +tfw_client_limits_put(TfwClientAdaptiveLimits *limits) +{ + percpu_ref_put(&limits->refcnt); +} + static inline bool tfw_client_mem_get(TfwClientMem *cli_mem) { TfwClientAdaptiveLimits *limits = container_of(cli_mem, TfwClientAdaptiveLimits, cli_mem); - return percpu_ref_tryget(&limits->refcnt); + return tfw_client_limits_get(limits); } static inline void @@ -115,7 +130,7 @@ tfw_client_mem_put(TfwClientMem *cli_mem) TfwClientAdaptiveLimits *limits = container_of(cli_mem, TfwClientAdaptiveLimits, cli_mem); - percpu_ref_put(&limits->refcnt); + tfw_client_limits_put(limits); } static inline s64 diff --git a/fw/connection.c b/fw/connection.c index d876d4b3a..f01d4ea35 100644 --- a/fw/connection.c +++ b/fw/connection.c @@ -205,9 +205,9 @@ tfw_connection_recv(TfwConn *conn, struct sk_buff *skb) } int -tfw_connection_recv_finish(TfwConn *conn) +tfw_connection_recv_finish(TfwConn *conn, u64 time_begin) { - return TFW_CONN_HOOK_CALL(conn, conn_recv_finish); + return TFW_CONN_HOOK_CALL(conn, conn_recv_finish, time_begin); } void diff --git a/fw/connection.h b/fw/connection.h index fca74a0a5..5813375ea 100644 --- a/fw/connection.h +++ b/fw/connection.h @@ -360,7 +360,7 @@ typedef struct { /* * Called after 
processing all socket received queue. */ - int (*conn_recv_finish)(TfwConn *conn); + int (*conn_recv_finish)(TfwConn *conn, u64 time_begin); } TfwConnHooks; #define TFW_CONN_MAX_PROTOS TFW_GFSM_FSM_N @@ -630,7 +630,7 @@ void tfw_connection_hooks_register(TfwConnHooks *hooks, int type); void tfw_connection_hooks_unregister(int type); int tfw_connection_send(TfwConn *conn, TfwMsg *msg); int tfw_connection_recv(TfwConn *conn, struct sk_buff *skb); -int tfw_connection_recv_finish(TfwConn *conn); +int tfw_connection_recv_finish(TfwConn *conn, u64 time_begin); /* Generic helpers, used for both client and server connections. */ void tfw_connection_init(TfwConn *conn); diff --git a/fw/http.c b/fw/http.c index 222d4fce9..61664a331 100644 --- a/fw/http.c +++ b/fw/http.c @@ -3213,10 +3213,11 @@ tfw_http_conn_send(TfwConn *conn, TfwMsg *msg) } static int -tfw_http_conn_recv_finish(TfwConn *conn) +tfw_http_conn_recv_finish(TfwConn *conn, u64 time_begin) { TfwClient *cli = (TfwClient *)conn->peer; TfwAdaptiveLimitLock *req_lim = &cli->limits->req_lim; + TfwAdaptiveLimitLock *cpu_lim = &cli->limits->cpu_lim; if (TFW_FSM_TYPE(conn->proto.type) == TFW_FSM_H2) tfw_h2_conn_recv_finish(conn); @@ -3237,6 +3238,14 @@ tfw_http_conn_recv_finish(TfwConn *conn) return T_BLOCK_WITH_RST; } + if (unlikely(!tfw_adaptive_limits_check_cpu(cpu_lim, time_begin))) { + T_WARN_ADDR("Client connection dropped: client cpu" + " usage z-score exceeded the configured" + " threshold\n", &cli->addr, TFW_NO_PORT); + tfw_client_filter_block_ip(cli); + return T_BLOCK_WITH_RST; + } + return 0; } @@ -7827,7 +7836,9 @@ int tfw_http_msg_process_generic(TfwConn *conn, TfwStream *stream, struct sk_buff *skb, struct sk_buff **next) { + u64 time_begin = get_cycles(); int r = T_BAD; + TfwClientAdaptiveLimits *limits; TfwHttpMsg *req; bool websocket; @@ -7857,6 +7868,20 @@ tfw_http_msg_process_generic(TfwConn *conn, TfwStream *stream, * so we cannot move it iside `if` clause. 
*/ req = ((TfwHttpMsg *)stream->msg)->pair; websocket = test_bit(TFW_HTTP_B_UPGRADE_WEBSOCKET, req->flags); + limits = req->conn ? CLIENT_LIMITS_FROM_CONN(req->conn) : NULL; + if (likely(limits)) { + /* + * `tfw_client_limits_get` returns false, if we already + * call `percpu_ref_kill` for `limits` after client + * was freed. We hold the client reference counter + * here due to active connections, so this situation + * is impossible, moreover, false result here means + * memory corruption, since we access `limits` for + * already released client. + */ + BUG_ON(!tfw_client_limits_get(limits)); + } + if ((r = tfw_http_resp_process(conn, stream, skb, next))) { TfwSrvConn *srv_conn = (TfwSrvConn *)conn; /* @@ -7868,6 +7893,11 @@ tfw_http_msg_process_generic(TfwConn *conn, TfwStream *stream, clear_bit(TFW_CONN_B_UNSCHED, &srv_conn->flags); } + if (likely(limits)) { + tfw_adaptive_limits_acc_cpu(&limits->cpu_lim, time_begin); + tfw_client_limits_put(limits); + } + return r; err: diff --git a/fw/sock.c b/fw/sock.c index d66ace4ea..4b4c15a80 100644 --- a/fw/sock.c +++ b/fw/sock.c @@ -1063,6 +1063,7 @@ ss_tcp_process_data(struct sock *sk) unsigned int skb_len, skb_seq; struct sk_buff *skb, *tmp; struct tcp_sock *tp = tcp_sk(sk); + u64 time_begin = get_cycles(); skb_queue_walk_safe(&sk->sk_receive_queue, skb, tmp) { if (unlikely(before(tp->copied_seq, TCP_SKB_CB(skb)->seq))) { @@ -1094,7 +1095,7 @@ ss_tcp_process_data(struct sock *sk) skb_len); } out: - tmp_r = SS_CALL(connection_recv_finish, sk->sk_user_data); + tmp_r = SS_CALL(connection_recv_finish, sk->sk_user_data, time_begin); if (unlikely(tfw_error_code_more_crucial(tmp_r, r))) r = tmp_r; diff --git a/fw/sync_socket.h b/fw/sync_socket.h index 8fcbd9ccc..5d285273e 100644 --- a/fw/sync_socket.h +++ b/fw/sync_socket.h @@ -91,7 +91,7 @@ typedef struct ss_hooks { int (*connection_recv)(TfwConn *conn, struct sk_buff *skb); /* Callback to make some job after processing received data. 
*/ - int (*connection_recv_finish)(TfwConn *conn); + int (*connection_recv_finish)(TfwConn *conn, u64 time_begin); /* Callback to make some job on connection shutdown. */ void (*connection_on_shutdown)(TfwConn *conn); diff --git a/fw/t/unit/helpers.c b/fw/t/unit/helpers.c index 65768b0fd..ff2480820 100644 --- a/fw/t/unit/helpers.c +++ b/fw/t/unit/helpers.c @@ -534,12 +534,23 @@ void tfw_adaptive_limits_acc_req_num(TfwAdaptiveLimitLock *limit, } +void tfw_adaptive_limits_acc_cpu(TfwAdaptiveLimitLock *limit, u64 time_begin) +{ + +} + bool tfw_adaptive_limits_check_req_num(TfwAdaptiveLimitLock *limit) { return true; } +bool +tfw_adaptive_limits_check_cpu(TfwAdaptiveLimitLock *limit, u64 time_begin) +{ + return true; +} + void tfw_client_filter_block_ip(TfwClient *cli) { diff --git a/fw/tls.c b/fw/tls.c index aa0ef03a2..9cdc41e1d 100644 --- a/fw/tls.c +++ b/fw/tls.c @@ -804,9 +804,10 @@ tfw_tls_conn_send(TfwConn *c, TfwMsg *msg) } static int -tfw_tls_conn_recv_finish(TfwConn *c) +tfw_tls_conn_recv_finish(TfwConn *c, u64 time_begin) { - return tfw_conn_hook_call(TFW_FSM_HTTP, c, conn_recv_finish); + return tfw_conn_hook_call(TFW_FSM_HTTP, c, conn_recv_finish, + time_begin); } static TfwConnHooks tls_conn_hooks = { diff --git a/fw/websocket.c b/fw/websocket.c index 3c813b3a2..a02955a45 100644 --- a/fw/websocket.c +++ b/fw/websocket.c @@ -358,10 +358,10 @@ tfw_ws_conn_send(TfwConn *conn, TfwMsg *msg) } static int -tfw_ws_conn_recv_finish(TfwConn *conn) +tfw_ws_conn_recv_finish(TfwConn *conn, u64 time_begin) { return tfw_conn_hook_call(TFW_CONN_HTTP_TYPE(conn), conn, - conn_recv_finish); + conn_recv_finish, time_begin); } static TfwConnHooks ws_conn_hooks = { From 6652af4b45fb747fa03ef4e2ee37f1d008d78c63 Mon Sep 17 00:00:00 2001 From: EvgeniiMekhanik Date: Wed, 17 Jun 2026 16:47:57 +0300 Subject: [PATCH 6/6] Implement client memory usage tracking Use training library for client memory usage tracking. Use `TfwAdaptiveLimitLock` structure for client memory usage tracking. 
In defence mode in `tfw_http_conn_recv_finish` callback calculate z-score, compare it with configured `threshold` and drop client connection if necessary (same as we do for non-idempotent requests). Current approach with per-cpu request accounting prevents performance degradation. Pay attention that we also adjust memory usage in per-cpu `mem` storage to check `soft` and `hard` mem limits. We should do it in other storage, because we zero `TfwAdaptiveLimitLock` on the start of the new training and do not account events from previous training in `TfwAdaptiveLimitLock`. Performance measurements for the whole patchset were made and show no measurable regression: Training: finished in 50.03s, 1205382.84 req/s, 933.22MB/s finished in 50.03s, 1206352.90 req/s, 935.01MB/s finished in 50.03s, 1212849.66 req/s, 940.37MB/s Defense: finished in 50.03s, 1202041.02 req/s, 931.99MB/s finished in 50.03s, 1221799.64 req/s, 947.31MB/s finished in 50.02s, 1214020.14 req/s, 941.28MB/s Master: finished in 50.03s, 1204474.98 req/s, 932.55MB/s finished in 50.03s, 1214912.74 req/s, 941.36MB/s finished in 50.03s, 1221197.26 req/s, 946.84MB/s Part-of: training/defence mode implementation Issue: #1346 --- fw/adaptive_limits.c | 79 ++++++++++++++++++++++++++++++++++++++++---- fw/adaptive_limits.h | 3 ++ fw/client.c | 45 +++++++++++++++++++++---- fw/client.h | 25 +++++++++++--- fw/http.c | 21 ++++++++++-- fw/pool.c | 38 +++++++++++++-------- fw/pool.h | 14 +++++--- fw/ss_skb.c | 16 +++++++-- fw/ss_skb.h | 6 ++-- fw/t/unit/helpers.c | 11 ++++++ 10 files changed, 215 insertions(+), 43 deletions(-) diff --git a/fw/adaptive_limits.c b/fw/adaptive_limits.c index 1881ba138..a55eba2b8 100644 --- a/fw/adaptive_limits.c +++ b/fw/adaptive_limits.c @@ -338,6 +338,12 @@ tfw_adaptive_limits_adjust_req_new_client(void) return tfw_adaptive_limits_adjust_new_client(g_req_num); } +static void +tfw_adaptive_limits_adjust_mem_new_client(void) +{ + return tfw_adaptive_limits_adjust_new_client(g_mem_num); +} +
static void tfw_adaptive_limits_adjust_cpu_new_client(void) { @@ -374,7 +380,13 @@ tfw_adaptive_limits_adjust_req_num(u64 delta1, u128 delta2) } static void -tfw_adaptive_limits_adjust_cpu(u64 delta1, u128_acc delta2) +tfw_adaptive_limits_adjust_mem(u64 delta1, u128 delta2) +{ + return tfw_adaptive_limits_adjust_new_el(g_mem_num, delta1, delta2); +} + +static void +tfw_adaptive_limits_adjust_cpu(u64 delta1, u128 delta2) { return tfw_adaptive_limits_adjust_new_el(g_cpu_num, delta1, delta2); } @@ -449,6 +461,14 @@ tfw_adaptive_limits_check_and_set_epoch(u16 *epoch, bool new_event) return true; } +static inline bool +tfw_adaptive_limits_defence_mem(u64 val) +{ + int threshold = tfw_adaptive_limits_z_score_mem; + + return tfw_adaptive_limits_defence(g_mem_num, val, threshold); +} + static inline bool tfw_adaptive_limits_defence_cpu(u64 val) { @@ -632,6 +652,17 @@ tfw_adaptive_limits_acc_req_num(TfwAdaptiveLimitLock *limit, int delta, tfw_adaptive_limits_acc(limit, delta, adjust_new_client, add, epoch); } +void +tfw_adaptive_limits_acc_mem(TfwAdaptiveLimitLock *limit, int delta, u16 *epoch) +{ + void (*adjust_new_client)(void) = + tfw_adaptive_limits_adjust_mem_new_client; + void (*add)(TfwAdaptiveLimitLock *limit, int delta) = + tfw_adaptive_limits_counter_add; + + tfw_adaptive_limits_acc(limit, delta, adjust_new_client, add, epoch); +} + void tfw_adaptive_limits_acc_cpu(TfwAdaptiveLimitLock *limit, u64 time_begin) { @@ -648,8 +679,21 @@ tfw_adaptive_limits_acc_cpu(TfwAdaptiveLimitLock *limit, u64 time_begin) __tfw_adaptive_limits_acc(limit, delta, adjust_new_client, add); } +static inline s64 +tfw_adaptive_limits_no_convert(s64 val) +{ + return val; +} + +static inline s64 +tfw_adaptive_limits_page_convert(s64 val) +{ + return val >> PAGE_SHIFT; +} + static inline bool -tfw_adaptive_limits_change_max(TfwAdaptiveLimitLock *limit, s64 curr, +tfw_adaptive_limits_change_max(TfwAdaptiveLimitLock *limit, + s64 (*convert_val)(s64), s64 curr, u64 *delta1, u128 *delta2) { s64 
old_max = atomic64_read(&limit->max); @@ -663,6 +707,9 @@ tfw_adaptive_limits_change_max(TfwAdaptiveLimitLock *limit, s64 curr, return false; } while (!atomic64_try_cmpxchg(&limit->max, &old_max, curr)); + curr = convert_val(curr); + old_max = convert_val(old_max); + *delta1 = ((u64)curr - (u64)old_max); *delta2 = (u128)curr * (u128) curr - (u128)old_max * (u128)old_max; @@ -671,6 +718,7 @@ tfw_adaptive_limits_change_max(TfwAdaptiveLimitLock *limit, s64 curr, static bool tfw_adaptive_limits_check(TfwAdaptiveLimitLock *limit, + s64 (*convert_val)(s64), void (*adjust_num)(u64, u128), bool(*defence)(u64)) { @@ -698,11 +746,12 @@ tfw_adaptive_limits_check(TfwAdaptiveLimitLock *limit, */ WARN_ON(curr < 0 && adjust_num != tfw_adaptive_limits_adjust_cpu); if (tfw_adaptive_limits_mode_is_defence()) { - rc = defence(curr); + rc = defence(convert_val(curr)); goto out; } - if (tfw_adaptive_limits_change_max(limit, curr, &delta1, &delta2)) + if (tfw_adaptive_limits_change_max(limit, convert_val, curr, &delta1, + &delta2)) adjust_num(delta1, delta2); out: @@ -717,8 +766,22 @@ tfw_adaptive_limits_check_req_num(TfwAdaptiveLimitLock *limit) void (*adjust_num)(u64, u128) = tfw_adaptive_limits_adjust_req_num; bool (*defence)(u64) = tfw_adaptive_limits_defence_req_num; + s64 (*convert_val)(s64) = tfw_adaptive_limits_no_convert; - return tfw_adaptive_limits_check(limit, adjust_num, defence); + return tfw_adaptive_limits_check(limit, convert_val, adjust_num, + defence); +} + +bool +tfw_adaptive_limits_check_mem(TfwAdaptiveLimitLock *limit) +{ + void (*adjust_num)(u64, u128) = + tfw_adaptive_limits_adjust_mem; + bool (*defence)(u64) = tfw_adaptive_limits_defence_mem; + s64 (*convert_val)(s64) = tfw_adaptive_limits_page_convert; + + return tfw_adaptive_limits_check(limit, convert_val, adjust_num, + defence); } bool @@ -729,16 +792,18 @@ tfw_adaptive_limits_check_cpu(TfwAdaptiveLimitLock *limit, u64 time_begin) tfw_adaptive_limits_adjust_cpu_new_client; void (*add)(TfwAdaptiveLimitLock 
*limit, int delta) = tfw_adaptive_limits_counter_add_ema; - void (*adjust_num)(u64, u128_acc) = + void (*adjust_num)(u64, u128) = tfw_adaptive_limits_adjust_cpu; bool (*defence)(u64) = tfw_adaptive_limits_defence_cpu; + s64 (*convert_val)(s64) = tfw_adaptive_limits_no_convert; if (tfw_adaptive_limits_mode_is_disabled()) return true; __tfw_adaptive_limits_acc(limit, delta, adjust_new_client, add); - return tfw_adaptive_limits_check(limit, adjust_num, defence); + return tfw_adaptive_limits_check(limit, convert_val, adjust_num, + defence); } static inline void diff --git a/fw/adaptive_limits.h b/fw/adaptive_limits.h index 4cde6ebbc..7e86f453a 100644 --- a/fw/adaptive_limits.h +++ b/fw/adaptive_limits.h @@ -186,8 +186,11 @@ bool tfw_adaptive_limits_check_conn_num(TfwAdaptiveLimit *limit, int delta, u16 *epoch); void tfw_adaptive_limits_acc_req_num(TfwAdaptiveLimitLock *limit, int delta, u16 *epoch); +void tfw_adaptive_limits_acc_mem(TfwAdaptiveLimitLock *limit, + int delta, u16 *epoch); void tfw_adaptive_limits_acc_cpu(TfwAdaptiveLimitLock *limit, u64 time_begin); bool tfw_adaptive_limits_check_req_num(TfwAdaptiveLimitLock *limit); +bool tfw_adaptive_limits_check_mem(TfwAdaptiveLimitLock *limit); bool tfw_adaptive_limits_check_cpu(TfwAdaptiveLimitLock *limit, u64 time_begin); int tfw_ctlfn_adaptive_limits_mode_change(unsigned int mode); diff --git a/fw/client.c b/fw/client.c index 1d6da4902..12d32331f 100644 --- a/fw/client.c +++ b/fw/client.c @@ -81,6 +81,36 @@ static struct { unsigned int order; } cli_adaptive_limits_pool; +static inline int +tfw_client_mem_init(TfwClientMem *cli_mem, gfp_t flags) +{ + int r; + + flags |= __GFP_ZERO; + + cli_mem->mem = tfw_alloc_percpu_gfp(s64, flags); + if (unlikely(!cli_mem->mem)) + return -ENOMEM; + + r = tfw_adaptive_limit_lock_init(&cli_mem->mem_lim, flags); + if (unlikely(r)) + goto free_cli_mem; + + return 0; + +free_cli_mem: + free_percpu(cli_mem->mem); + + return r; +} + +static inline void 
+tfw_client_mem_destroy(TfwClientMem *cli_mem) +{ + tfw_adaptive_limit_lock_destroy(&cli_mem->mem_lim); + free_percpu(cli_mem->mem); +} + static inline bool tfw_cli_adaptive_limits_belongs_to_pool(TfwClientAdaptiveLimits *limits) { @@ -93,9 +123,9 @@ static void __cli_adaptive_limits_release(TfwClientAdaptiveLimits *limits) { percpu_ref_exit(&limits->refcnt); - free_percpu(limits->cli_mem.mem); tfw_adaptive_limit_lock_destroy(&limits->req_lim); tfw_adaptive_limit_lock_destroy(&limits->cpu_lim); + tfw_client_mem_destroy(&limits->cli_mem); if (!tfw_cli_adaptive_limits_belongs_to_pool(limits)) kmem_cache_free(tfw_cli_adaptive_limits_cache, limits); } @@ -115,6 +145,7 @@ tfw_cli_adaptive_limits_pool_free(TfwClientAdaptiveLimits *limits) *per_cpu_ptr(limits->cli_mem.mem, cpu) = 0; *per_cpu_ptr(limits->req_lim.counter, cpu) = 0; *per_cpu_ptr(limits->cpu_lim.counter, cpu) = 0; + *per_cpu_ptr(limits->cli_mem.mem_lim.counter, cpu) = 0; } percpu_ref_reinit(&limits->refcnt); limits->next_free = cli_adaptive_limits_pool.free_list; @@ -202,13 +233,13 @@ tfw_cli_adaptive_limits_init(TfwClientAdaptiveLimits *limits, gfp_t flags) TfwAdaptiveLimitLock *cpu_lim = &limits->cpu_lim; int r; - cli_mem->mem = tfw_alloc_percpu_gfp(s64, flags | __GFP_ZERO); - if (unlikely(!cli_mem->mem)) - return -ENOMEM; + r = tfw_client_mem_init(cli_mem, flags); + if (unlikely(r)) + return r; r = tfw_adaptive_limit_lock_init(req_lim, flags | __GFP_ZERO); if (unlikely(r)) - goto free_cli_mem; + goto destroy_cli_mem; r = tfw_adaptive_limit_lock_init(cpu_lim, flags | __GFP_ZERO); if (unlikely(r)) @@ -225,8 +256,8 @@ tfw_cli_adaptive_limits_init(TfwClientAdaptiveLimits *limits, gfp_t flags) tfw_adaptive_limit_lock_destroy(cpu_lim); destroy_req_lim: tfw_adaptive_limit_lock_destroy(req_lim); -free_cli_mem: - free_percpu(cli_mem->mem); +destroy_cli_mem: + tfw_client_mem_destroy(cli_mem); return r; } diff --git a/fw/client.h b/fw/client.h index a57043fe7..1ef3c7d46 100644 --- a/fw/client.h +++ b/fw/client.h @@ 
-28,9 +28,14 @@ /* * Client memory accounting structure for Tempesta FW. * - * @mem - Per-CPU memory accounting storage. + * @mem_lim - structure to track memory usage for the current client. + * Used in adaptive_limits module to collect statistic + * and z-score calculation; + * @mem - Per-CPU memory accounting storage (this storage is not + * zeroed on the next training epoch); */ typedef struct tfw_client_mem_t { + TfwAdaptiveLimitLock mem_lim; s64 __percpu *mem; } TfwClientMem; @@ -43,10 +48,13 @@ typedef struct tfw_client_mem_t { * @refcnt - percpu reference counter. Provides scalable and * thread-safe reference tracking on SMP systems with * minimal contention; - * cli_mem - client memory accounting structure for Tempesta FW; - * req_lim - structure to track non-idempotent requests count in + * @req_lim - structure to track non-idempotent requests count in * fly for the current client. Used in adaptive_limits * module to collect statistic and z-score calculation; + * @cpu_lim - structure to track cpu usage for current client. 
Used + * in adaptive_limits module to collect statistic and + * z-score calculation; + * @cli_mem - client memory accounting structure for Tempesta FW; */ typedef struct tfw_adaptive_limits_t { union { @@ -54,9 +62,9 @@ typedef struct tfw_adaptive_limits_t { struct tfw_adaptive_limits_t *next_free; }; struct percpu_ref refcnt; - TfwClientMem cli_mem; TfwAdaptiveLimitLock req_lim; TfwAdaptiveLimitLock cpu_lim; + TfwClientMem cli_mem; } TfwClientAdaptiveLimits; /** @@ -98,11 +106,18 @@ void tfw_client_filter_block_ip(TfwClient *cli); &CLIENT_LIMITS_FROM_CONN(conn)->cli_mem static inline void -tfw_client_adjust_mem(TfwClientMem *cli_mem, int delta) +__tfw_client_adjust_mem(TfwClientMem *cli_mem, int delta) { this_cpu_add(*cli_mem->mem, delta); } +static inline void +tfw_client_adjust_mem(TfwClientMem *cli_mem, int delta, u16 *epoch) +{ + __tfw_client_adjust_mem(cli_mem, delta); + tfw_adaptive_limits_acc_mem(&cli_mem->mem_lim, delta, epoch); +} + static inline bool tfw_client_limits_get(TfwClientAdaptiveLimits *limits) { diff --git a/fw/http.c b/fw/http.c index 61664a331..2f73d9b40 100644 --- a/fw/http.c +++ b/fw/http.c @@ -2920,7 +2920,7 @@ tfw_http_conn_msg_alloc(TfwConn *conn, TfwStream *stream) if (likely(hm->req->conn)) { TfwClientMem *cli_mem = CLIENT_MEM_FROM_CONN(hm->req->conn); - int delta = PAGE_SIZE << hm->pool->order; + TfwPoolChunk *c, *next; hm->pool->owner = cli_mem; /* @@ -2933,7 +2933,15 @@ tfw_http_conn_msg_alloc(TfwConn *conn, TfwStream *stream) * already released client. 
*/ BUG_ON(!tfw_client_mem_get(cli_mem)); - tfw_client_adjust_mem(cli_mem, delta); + c = hm->pool->curr; + TFW_POOL_FOR_EACH_CHUNK_FROM(c, next) { + int delta; + + next = c->next; + delta = PAGE_SIZE << c->order; + tfw_client_adjust_mem(cli_mem, delta, + &c->epoch); + } } if (TFW_MSG_H2(hm->req)) { @@ -3218,6 +3226,7 @@ tfw_http_conn_recv_finish(TfwConn *conn, u64 time_begin) TfwClient *cli = (TfwClient *)conn->peer; TfwAdaptiveLimitLock *req_lim = &cli->limits->req_lim; TfwAdaptiveLimitLock *cpu_lim = &cli->limits->cpu_lim; + TfwAdaptiveLimitLock *mem_lim = &cli->limits->cli_mem.mem_lim; if (TFW_FSM_TYPE(conn->proto.type) == TFW_FSM_H2) tfw_h2_conn_recv_finish(conn); @@ -3246,6 +3255,14 @@ tfw_http_conn_recv_finish(TfwConn *conn, u64 time_begin) return T_BLOCK_WITH_RST; } + if (unlikely(!tfw_adaptive_limits_check_mem(mem_lim))) { + T_WARN_ADDR("Client connection dropped: client memory" + " usage z-score exceeded the configured" + " threshold\n", &cli->addr, TFW_NO_PORT); + tfw_client_filter_block_ip(cli); + return T_BLOCK_WITH_RST; + } + return 0; } diff --git a/fw/pool.c b/fw/pool.c index 5a52b7d69..602270db6 100644 --- a/fw/pool.c +++ b/fw/pool.c @@ -71,7 +71,8 @@ static unsigned long __percpu (*pg_cache)[TFW_POOL_PGCACHE_SZ]; * through buddies coalescing). So we never cache multi-pages. 
*/ static unsigned long -tfw_pool_alloc_pages(TfwClientMem *cli_mem, unsigned int order) +tfw_pool_alloc_pages(TfwClientMem *cli_mem, unsigned int order, + u16 *epoch) { unsigned long pg_res = 0; unsigned int *pgn; @@ -92,7 +93,7 @@ tfw_pool_alloc_pages(TfwClientMem *cli_mem, unsigned int order) pg_res = tfw__get_free_pages(flags, order); } if (likely(pg_res) && cli_mem) - tfw_client_adjust_mem(cli_mem, PAGE_SIZE << order); + tfw_client_adjust_mem(cli_mem, PAGE_SIZE << order, epoch); return pg_res; @@ -101,7 +102,7 @@ ALLOW_ERROR_INJECTION(tfw_pool_alloc_pages, NULL); static void tfw_pool_free_pages(TfwClientMem *cli_mem, unsigned long addr, - unsigned int order) + unsigned int order, u16 *epoch) { unsigned int *pgn; int refcnt; @@ -111,8 +112,9 @@ tfw_pool_free_pages(TfwClientMem *cli_mem, unsigned long addr, pgn = this_cpu_ptr(&pg_next); refcnt = page_count(virt_to_page(addr)); - if (cli_mem) - tfw_client_adjust_mem(cli_mem, -(PAGE_SIZE << order)); + if (cli_mem) { + tfw_client_adjust_mem(cli_mem, -(PAGE_SIZE << order), epoch); + } if (likely(*pgn < TFW_POOL_PGCACHE_SZ && !order && refcnt == 1)) { ((unsigned long *)this_cpu_ptr(pg_cache))[*pgn] = addr; @@ -136,12 +138,15 @@ __tfw_pool_alloc_page(TfwPool *p, size_t n, bool align) : sizeof(TfwPoolChunk); unsigned int off = desc_size + n; unsigned int order = get_order(off); + u16 epoch = 0; - c = (TfwPoolChunk *)tfw_pool_alloc_pages(p->owner, order); + c = (TfwPoolChunk *)tfw_pool_alloc_pages(p->owner, order, + &epoch); if (!c) return NULL; c->next = curr; c->order = order; + c->epoch = epoch; curr->off = p->off; @@ -196,7 +201,7 @@ tfw_pool_free(TfwPool *p, void *ptr, size_t n) TfwPoolChunk *next = p->curr->next; tfw_pool_free_pages(p->owner, TFW_POOL_CHUNK_BASE(p->curr), - p->order); + p->order, &p->curr->epoch); p->curr = next; p->order = next->order; p->off = next->off; @@ -214,7 +219,8 @@ tfw_pool_clean_single(TfwPool *pool, void *ptr) BUG_ON(!pool); BUG_ON(!ptr); - for (c = pool->curr->next; c; c = next) { + 
c = pool->curr->next; + TFW_POOL_FOR_EACH_CHUNK_FROM(c, next) { if (!(next = c->next)) break; @@ -222,7 +228,7 @@ tfw_pool_clean_single(TfwPool *pool, void *ptr) && (char *)ptr < (char *)TFW_POOL_CHUNK_BASE(c) + c->off) { tfw_pool_free_pages(pool->owner, TFW_POOL_CHUNK_BASE(c), - c->order); + c->order, &c->epoch); prev->next = next; return; } @@ -243,12 +249,13 @@ tfw_pool_clean(TfwPool *pool) BUG_ON(!pool); - for (c = pool->curr->next; c; c = next) { + c= pool->curr->next; + TFW_POOL_FOR_EACH_CHUNK_FROM(c, next) { if (!(next = c->next)) break; tfw_pool_free_pages(pool->owner, TFW_POOL_CHUNK_BASE(c), - c->order); + c->order, &c->epoch); pool->curr->next = next; } } @@ -263,10 +270,11 @@ __tfw_pool_new(size_t n, TfwClientMem *owner) TfwPool *p; TfwPoolChunk *c; unsigned int order; + u16 epoch = 0; order = get_order(TFW_POOL_ALIGN_SZ(n) + TFW_POOL_HEAD_OFF); - c = (TfwPoolChunk *)tfw_pool_alloc_pages(cli_mem, order); + c = (TfwPoolChunk *)tfw_pool_alloc_pages(cli_mem, order, &epoch); if (unlikely(!c)) return NULL; @@ -286,6 +294,7 @@ __tfw_pool_new(size_t n, TfwClientMem *owner) p = (TfwPool *)((char *)c + TFW_POOL_ALIGN_SZ(sizeof(*c))); c->next = NULL; + c->epoch = epoch; p->order = c->order = order; p->owner = cli_mem; p->off = c->off = TFW_POOL_HEAD_OFF; @@ -304,10 +313,11 @@ tfw_pool_destroy(TfwPool *p) return; cli_mem = p->owner; - for (c = p->curr; c; c = next) { + c = p->curr; + TFW_POOL_FOR_EACH_CHUNK_FROM(c, next) { next = c->next; tfw_pool_free_pages(p->owner, TFW_POOL_CHUNK_BASE(c), - c->order); + c->order, &c->epoch); } if (cli_mem) tfw_client_mem_put(cli_mem); diff --git a/fw/pool.h b/fw/pool.h index 7fa305e0d..8e77bf4a5 100644 --- a/fw/pool.h +++ b/fw/pool.h @@ -35,6 +35,8 @@ #define TFW_POOL_CHUNK_ROOM(p) (TFW_POOL_CHUNK_SZ((p)) - (p)->off) #define TFW_POOL_ALIGN_SZ(n) (((n) + 7) & ~7UL) #define TFW_POOL_ALIGN_PTR(p) ((void *)TFW_POOL_ALIGN_SZ((unsigned long)p)) +#define TFW_POOL_FOR_EACH_CHUNK_FROM(pos, next) \ + for (; pos; pos = next) typedef struct 
tfw_client_mem_t TfwClientMem; @@ -42,13 +44,17 @@ typedef struct tfw_client_mem_t TfwClientMem; * Memory pool chunk descriptor. * * @next - pointer to next memory chunk; - * @order - order of number of pages in the chunk; * @off - current chunk offset; + * @order - order of number of pages in the chunk; + * @epoch - training epoch identifier. Used to skip accounting of memory + * allocations/deallocations for the current chunk in training + * if the chunk belongs to a previous training epoch; */ typedef struct tfw_pool_chunk_t { struct tfw_pool_chunk_t *next; - unsigned int order; unsigned int off; + u16 order; + u16 epoch; } TfwPoolChunk; /** @@ -56,13 +62,13 @@ typedef struct tfw_pool_chunk_t { * * @curr - current chunk to allocate memory from; * @owner - owner for memory accounting; - * @order,@off - cached members of @curr; + * @off,@order - cached members of @curr; */ typedef struct { TfwPoolChunk *curr; TfwClientMem *owner; - unsigned int order; unsigned int off; + u16 order; } TfwPool; #define tfw_pool_new(struct_name, owner, mask) \ diff --git a/fw/ss_skb.c b/fw/ss_skb.c index d8cba68e3..4fd2e4d3b 100644 --- a/fw/ss_skb.c +++ b/fw/ss_skb.c @@ -1794,7 +1794,8 @@ ss_skb_set_owner(struct sk_buff *skb, void (*destructor)(struct sk_buff *), WARN_ON(TFW_SKB_CB(skb)->mem != 0); WARN_ON(TFW_SKB_CB(skb)->destructor || TFW_SKB_CB(skb)->opaque_data); __ss_skb_set_owner(skb, destructor, owner); - ss_skb_adjust_client_mem(skb, mem); + TFW_SKB_CB(skb)->mem = mem; + tfw_client_adjust_mem(owner, mem, &TFW_SKB_CB(skb)->epoch); } void @@ -1809,6 +1810,17 @@ ss_skb_adjust_client_mem(struct sk_buff *skb, int delta) if (cli_mem) { TFW_SKB_CB(skb)->mem += delta; WARN_ON(TFW_SKB_CB(skb)->mem < 0); - tfw_client_adjust_mem(cli_mem, delta); + /* + * `epoch` is set during skb allocation when we + * set owner for current skb. If `epoch` was not + * set (because training was disabled), we should + * not adjust such skb in the new training. 
+ */ + if (TFW_SKB_CB(skb)->epoch) { + tfw_client_adjust_mem(cli_mem, delta, + &TFW_SKB_CB(skb)->epoch); + } else { + __tfw_client_adjust_mem(cli_mem, delta); + } } } diff --git a/fw/ss_skb.h b/fw/ss_skb.h index 5e78a4511..d5396d689 100644 --- a/fw/ss_skb.h +++ b/fw/ss_skb.h @@ -43,8 +43,9 @@ typedef struct tfw_client_mem_t TfwClientMem; * @mem - memory used for this skb, used to account appropriate * client memory; * @stream_id - id of sender stream; - * @tls_type - tls type of current skb, if it's data should be - * encrypted; + * @epoch - training epoch identifier. Used to skip accounting of memory + * allocations/deallocations for the current skb in training + * if the skb belongs to a previous training epoch; * @is_head - flag indicates that this is a head of skb list; */ struct tfw_skb_cb { @@ -54,6 +55,7 @@ struct tfw_skb_cb { on_tcp_entail_t on_tcp_entail; long int mem; unsigned int stream_id; + u16 epoch; bool is_head; }; diff --git a/fw/t/unit/helpers.c b/fw/t/unit/helpers.c index ff2480820..347bfd2ad 100644 --- a/fw/t/unit/helpers.c +++ b/fw/t/unit/helpers.c @@ -545,6 +545,17 @@ tfw_adaptive_limits_check_req_num(TfwAdaptiveLimitLock *limit) return true; } +void tfw_adaptive_limits_acc_mem(TfwAdaptiveLimitLock *limit, + int delta, u16 *epoch) +{ + +} + +bool tfw_adaptive_limits_check_mem(TfwAdaptiveLimitLock *limit) +{ + return true; +} + bool tfw_adaptive_limits_check_cpu(TfwAdaptiveLimitLock *limit, u64 time_begin) {