Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions etc/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -5044,6 +5044,12 @@
},
"spare_sync_incomplete": {
"type": "integer"
},
"flows_work_queue": {
"type": "integer"
},
"flows_work_queue_max": {
"type": "integer"
}
},
"additionalProperties": false
Expand Down
29 changes: 21 additions & 8 deletions src/flow-manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,6 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
uint32_t rows_sec = 0;
uint32_t rows_per_wu = 0;
uint64_t sleep_per_wu = 0;
bool emerg = false;
bool prev_emerg = false;
uint32_t other_last_sec = 0; /**< last sec stamp when defrag etc ran */
SCTime_t ts;
Expand All @@ -784,7 +783,7 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
}
GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
GetWorkUnitSizing(rows, mp, false, &sleep_per_wu, &rows_per_wu, &rows_sec);
StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);

TmThreadsSetFlag(th_v, THV_RUNNING);
Expand All @@ -797,9 +796,8 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
TmThreadsUnsetFlag(th_v, THV_PAUSED);
}

if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
emerg = true;
}
bool emerg = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0);

/* Get the time */
ts = TimeGet();
SCLogDebug("ts %" PRIdMAX "", (intmax_t)SCTIME_SECS(ts));
Expand Down Expand Up @@ -1038,9 +1036,10 @@ static void Recycler(ThreadVars *tv, FlowRecyclerThreadData *ftd, Flow *f)

FlowClearMemory(f, f->protomap);
FLOWLOCK_UNLOCK(f);
FlowSparePoolReturnFlow(f);
}

extern uint32_t flow_spare_pool_block_size;

/** \brief Thread that manages timed out flows.
*
* \param td ThreadVars casted to void ptr
Expand All @@ -1051,6 +1050,7 @@ static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
BUG_ON(ftd == NULL);
const bool time_is_live = TimeModeIsLive();
uint64_t recycled_cnt = 0;
FlowQueuePrivate ret_queue = { NULL, NULL, 0 };

TmThreadsSetFlag(th_v, THV_RUNNING);

Expand All @@ -1072,11 +1072,24 @@ static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
/* Get the time */
SCLogDebug("ts %" PRIdMAX "", (intmax_t)SCTIME_SECS(TimeGet()));

uint64_t cnt = 0;
Flow *f;
while ((f = FlowQueuePrivateGetFromTop(&list)) != NULL) {
Recycler(th_v, ftd, f);
recycled_cnt++;
StatsIncr(th_v, ftd->counter_flows);
cnt++;

/* for every full sized block, add it to the spare pool */
FlowQueuePrivateAppendFlow(&ret_queue, f);
if (ret_queue.len == flow_spare_pool_block_size) {
FlowSparePoolReturnFlows(&ret_queue);
}
}
if (ret_queue.len > 0) {
FlowSparePoolReturnFlows(&ret_queue);
}
if (cnt > 0) {
recycled_cnt += cnt;
StatsAddUI64(th_v, ftd->counter_flows, cnt);
}
SC_ATOMIC_SUB(flowrec_busy,1);

Expand Down
52 changes: 50 additions & 2 deletions src/flow-spare-pool.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright (C) 2007-2020 Open Information Security Foundation
/* Copyright (C) 2007-2023 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
Expand Down Expand Up @@ -40,7 +40,7 @@ typedef struct FlowSparePool {
} FlowSparePool;

static uint32_t flow_spare_pool_flow_cnt = 0;
static uint32_t flow_spare_pool_block_size = 100;
uint32_t flow_spare_pool_block_size = 100;
static FlowSparePool *flow_spare_pool = NULL;
static SCMutex flow_spare_pool_m = SCMUTEX_INITIALIZER;

Expand Down Expand Up @@ -121,7 +121,55 @@ void FlowSparePoolReturnFlow(Flow *f)

void FlowSparePoolReturnFlows(FlowQueuePrivate *fqp)
{
FlowSparePool *p = FlowSpareGetPool();
DEBUG_VALIDATE_BUG_ON(p == NULL);
p->queue = *fqp;

SCMutexLock(&flow_spare_pool_m);
flow_spare_pool_flow_cnt += fqp->len;
if (flow_spare_pool != NULL) {
if (p->queue.len == flow_spare_pool_block_size) {
/* full block insert */

if (flow_spare_pool->queue.len < flow_spare_pool_block_size) {
p->next = flow_spare_pool->next;
flow_spare_pool->next = p;
p = NULL;
} else {
p->next = flow_spare_pool;
flow_spare_pool = p;
p = NULL;
}
} else {
/* incomplete block insert */

if (p->queue.len + flow_spare_pool->queue.len <= flow_spare_pool_block_size) {
FlowQueuePrivateAppendPrivate(&flow_spare_pool->queue, &p->queue);
/* free 'p' outside of lock below */
} else {
// put smallest first
if (p->queue.len < flow_spare_pool->queue.len) {
p->next = flow_spare_pool;
flow_spare_pool = p;
} else {
p->next = flow_spare_pool->next;
flow_spare_pool->next = p;
}
p = NULL;
}
}
} else {
p->next = flow_spare_pool;
flow_spare_pool = p;
p = NULL;
}
SCMutexUnlock(&flow_spare_pool_m);

FlowQueuePrivate empty = { NULL, NULL, 0 };
*fqp = empty;

if (p != NULL)
SCFree(p);
}

FlowQueuePrivate FlowSpareGetFromPool(void)
Expand Down
25 changes: 21 additions & 4 deletions src/flow-worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ typedef struct FlowWorkerThreadData_ {
uint16_t flows_removed;
uint16_t flows_aside_needs_work;
uint16_t flows_aside_pkt_inject;
uint16_t flows_wq; /**< local work queue length */
uint16_t flows_wq_max; /**< local work queue length (max value) */
} cnt;
FlowEndCounters fec;

Expand Down Expand Up @@ -169,10 +171,13 @@ static int FlowFinish(ThreadVars *tv, Flow *f, FlowWorkerThreadData *fw, void *d
return 1;
}

extern uint32_t flow_spare_pool_block_size;

/** \param[in] max_work Max flows to process. 0 if unlimited. */
static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw, FlowTimeoutCounters *counters,
FlowQueuePrivate *fq, const uint32_t max_work)
{
FlowQueuePrivate ret_queue = { NULL, NULL, 0 };
uint32_t i = 0;
Flow *f;
while ((f = FlowQueuePrivateGetFromTop(fq)) != NULL) {
Expand Down Expand Up @@ -205,15 +210,23 @@ static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw, FlowTimeout
FlowClearMemory (f, f->protomap);
FLOWLOCK_UNLOCK(f);

if (fw->fls.spare_queue.len >= 200) { // TODO match to API? 200 = 2 * block size
FlowSparePoolReturnFlow(f);
if (fw->fls.spare_queue.len >= (flow_spare_pool_block_size * 2)) {
FlowQueuePrivatePrependFlow(&ret_queue, f);
if (ret_queue.len == flow_spare_pool_block_size) {
FlowSparePoolReturnFlows(&ret_queue);
}
} else {
FlowQueuePrivatePrependFlow(&fw->fls.spare_queue, f);
}

if (max_work != 0 && ++i == max_work)
break;
}
if (ret_queue.len > 0) {
FlowSparePoolReturnFlows(&ret_queue);
}

StatsAddUI64(tv, fw->cnt.flows_removed, (uint64_t)i);
}

/** \brief handle flow for packet
Expand Down Expand Up @@ -271,6 +284,9 @@ static TmEcode FlowWorkerThreadInit(ThreadVars *tv, const void *initdata, void *
fw->cnt.flows_injected = StatsRegisterCounter("flow.wrk.flows_injected", tv);
fw->cnt.flows_injected_max = StatsRegisterMaxCounter("flow.wrk.flows_injected_max", tv);

fw->cnt.flows_wq = StatsRegisterCounter("flow.wrk.flows_work_queue", tv);
fw->cnt.flows_wq_max = StatsRegisterMaxCounter("flow.wrk.flows_work_queue_max", tv);

fw->fls.dtv = fw->dtv = DecodeThreadVarsAlloc(tv);
if (fw->dtv == NULL) {
FlowWorkerThreadDeinit(tv, fw);
Expand Down Expand Up @@ -489,10 +505,11 @@ static inline void FlowWorkerProcessLocalFlows(ThreadVars *tv, FlowWorkerThreadD
if (p->pkt_src == PKT_SRC_SHUTDOWN_FLUSH || p->pkt_src == PKT_SRC_CAPTURE_TIMEOUT)
max_work = 0;

StatsSetUI64(tv, fw->cnt.flows_wq, fw->fls.work_queue.len);
StatsSetUI64(tv, fw->cnt.flows_wq_max, fw->fls.work_queue.len);

FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW_EVICTED);
if (fw->fls.work_queue.len) {
StatsAddUI64(tv, fw->cnt.flows_removed, (uint64_t)fw->fls.work_queue.len);

FlowTimeoutCounters counters = { 0, 0, };
CheckWorkQueue(tv, fw, &counters, &fw->fls.work_queue, max_work);
UpdateCounters(tv, fw, &counters);
Expand Down
4 changes: 4 additions & 0 deletions src/source-windivert.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,12 @@ void *WinDivertGetQueue(int n)
}

// not defined in MinGW winerror.h
#ifndef ERROR_INVALID_IMAGE_HASH
#define ERROR_INVALID_IMAGE_HASH 577L
#endif
#ifndef ERROR_DATA_NOT_ACCEPTED
#define ERROR_DATA_NOT_ACCEPTED 592L
#endif

/**
* \brief return an error description for Win32 error values commonly returned
Expand Down