Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions node/PacketMultiplexer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,15 @@
#include "Node.hpp"
#include "RuntimeEnvironment.hpp"

#include <algorithm>
#include <stdio.h>
#include <stdlib.h>
#if defined(__linux__)
#include <errno.h>
#include <pthread.h>
#include <sched.h>
#include <string.h>
#endif

namespace ZeroTier {

Expand Down Expand Up @@ -67,7 +74,6 @@ void PacketMultiplexer::setUpPostDecodeReceiveThreads(unsigned int concurrency,
#endif
_enabled = true;
_concurrency = concurrency;
bool _enablePinning = cpuPinningEnabled;

for (unsigned int i = 0; i < _concurrency; ++i) {
fprintf(stderr, "Reserved queue for thread %d\n", i);
Expand All @@ -76,8 +82,23 @@ void PacketMultiplexer::setUpPostDecodeReceiveThreads(unsigned int concurrency,

// Each thread picks from its own queue to feed into the core
for (unsigned int i = 0; i < _concurrency; ++i) {
_rxThreads.push_back(std::thread([this, i, _enablePinning]() {
_rxThreads.push_back(std::thread([this, i, cpuPinningEnabled]() {
fprintf(stderr, "Created post-decode packet ingestion thread %d\n", i);
#if defined(__linux__)
if (cpuPinningEnabled) {
const unsigned int cpuCount = std::max(1u, std::thread::hardware_concurrency());
const int pinCore = static_cast<int>(i % cpuCount);
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(pinCore, &cpuset);
int rc = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
if (rc != 0) {
fprintf(stderr, "Failed to pin packet thread %d to core %d: %s\n", i, pinCore, strerror(errno));
}
}
#else
(void)cpuPinningEnabled;
#endif

PacketRecord* packet = nullptr;
for (;;) {
Expand Down