Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion benches/netbench/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub struct Config {
#[arg(
long = "bytes",
short = 'k',
default_value_t = 1,
default_value_t = 10000,
help = "number of bytes to transfer every round"
)]
pub n_bytes: usize,
Expand All @@ -47,6 +47,13 @@ pub struct Config {
help = "id of process to pin thread to, -1 for no pinning"
)]
pub p_id: i8,
#[arg(
long = "warmup",
short = 'w',
default_value_t = 50,
help = "nr. of warmup runs"
)]
pub warmup: usize,
}

impl Config {
Expand Down
9 changes: 8 additions & 1 deletion benches/netbench/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,14 @@ pub fn receive_message(n_bytes: usize, stream: &mut TcpStream, rbuf: &mut [u8])
let mut recv = 0;
while recv < n_bytes {
match stream.read(&mut rbuf[recv..]) {
Ok(n) => recv += n,
Ok(n) => {
if n == 0 {
// TODO: Return err instead and handle gracefully
panic!("Connection closed prematurely")
} else {
recv += n
}
}
Err(err) => match err.kind() {
WouldBlock => {}
_ => panic!("Error occurred while reading: {err:?}"),
Expand Down
122 changes: 122 additions & 0 deletions benches/netbench/src/print_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,125 @@ pub fn print_summary(hist: hdrhist::HDRHist) {
println!("{entry:?}");
}
}

trait StatCalcs: Sized {
fn mean(data: &[Self]) -> Option<Self>;
fn std_deviation(data: &[Self]) -> Option<f64>;
}
impl StatCalcs for f64 {
fn mean(data: &[f64]) -> Option<f64> {
let sum = data.iter().sum::<f64>();
match data.len() {
positive if positive > 0 => Some(sum / positive as f64),
_ => None,
}
}

fn std_deviation(data: &[f64]) -> Option<f64> {
match (Self::mean(data), data.len()) {
(Some(data_mean), count) if count > 0 => {
let variance = data
.iter()
.map(|value| {
let diff = data_mean - *value;

diff * diff
})
.sum::<f64>() / count as f64;

Some(variance.sqrt())
}
_ => None,
}
}
}
impl StatCalcs for u64 {
fn mean(data: &[u64]) -> Option<u64> {
let sum = data.iter().sum::<u64>();
match data.len() {
positive if positive > 0 => Some(sum / positive as u64),
_ => None,
}
}

fn std_deviation(data: &[u64]) -> Option<f64> {
match (Self::mean(data), data.len()) {
(Some(data_mean), count) if count > 0 => {
let variance = data
.iter()
.map(|value| {
let diff = data_mean as f64 - *value as f64;

diff * diff
})
.sum::<f64>() / count as f64;

Some((variance as f64).sqrt())
}
_ => None,
}
}
}

#[derive(Debug)]
#[allow(dead_code)]
pub struct BoxplotValues<T> {
pub whisk_min: T,
pub whisk_max: T,
pub median: T,
pub q1: T,
pub q3: T,
pub nr_outliers: usize,
pub mean: T,
pub std_deviation: f64,
}
impl<
T: Clone
+ PartialOrd<T>
+ std::ops::Mul<T, Output = T>
+ std::ops::Sub<T, Output = T>
+ std::ops::Add<T, Output = T>
+ std::ops::Div<Output = T>
+ std::marker::Copy
+ Default
+ StatCalcs
+ std::convert::From<u8>,
> From<&[T]> for BoxplotValues<T>
{
fn from(value: &[T]) -> Self {
let mut durations = Vec::from(value);
durations.sort_by(|a, b| a.partial_cmp(b).unwrap());
let median = *durations
.get(durations.len() / 2)
.unwrap_or(&&Default::default());

let q1 = *durations.get(durations.len() / 4).unwrap();
let q3 = *durations.get(durations.len() * 3 / 4).unwrap();
let iqr: T = <u8 as Into<T>>::into(3_u8) * (q3 - q1) / <u8 as Into<T>>::into(2_u8);
let outlier_min = q1 - iqr.into();
let outlier_max = q3 + iqr.into();
let outliers = durations
.iter()
.filter(|&x| *x < outlier_min || *x > outlier_max)
.copied()
.collect::<Vec<T>>();

let filtered_durations = durations
.iter()
.filter(|&x| *x >= outlier_min && *x <= outlier_max)
.collect::<Vec<&T>>();
let whisk_min = *filtered_durations[0];
let whisk_max = *filtered_durations[filtered_durations.len() - 1];

Self {
whisk_min,
whisk_max,
median,
q1,
q3,
nr_outliers: outliers.len(),
std_deviation: T::std_deviation(&durations).unwrap(),
mean: T::mean(&durations).unwrap(),
}
}
}
36 changes: 22 additions & 14 deletions benches/netbench/src/rust-tcp-bw/client.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,31 @@
use std::io::{self, Write};
use std::net::TcpStream;

use clap::Parser;
#[cfg(target_os = "hermit")]
use hermit as _;
use rust_tcp_io_perf::config::Config;
use rust_tcp_io_perf::connection;

fn send_rounds(stream: &mut TcpStream, rounds: usize, bytes: usize) {
// Create a buffer of 0s, size n_bytes, to be sent over multiple times
let buf = vec![0; bytes];

for _i in 0..rounds {
let mut pos = 0;

while pos < buf.len() {
let bytes_written = match stream.write(&buf[pos..]) {
Ok(len) => len,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => 0,
Err(e) => panic!("encountered IO error: {e}"),
};
pos += bytes_written;
}
}
stream.flush().expect("Unexpected behaviour");
}

fn main() {
let args = Config::parse();

Expand All @@ -15,21 +35,9 @@ fn main() {
connection::setup(&args, &stream);
println!("Connection established! Ready to send...");

// Create a buffer of 0s, size n_bytes, to be sent over multiple times
let buf = vec![0; args.n_bytes];
send_rounds(&mut stream, args.warmup, args.n_bytes);
send_rounds(&mut stream, args.n_rounds, args.n_bytes);

for _i in 0..args.n_rounds {
let mut pos = 0;

while pos < buf.len() {
let bytes_written = match stream.write(&buf[pos..]) {
Ok(len) => len,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => 0,
Err(e) => panic!("encountered IO error: {e}"),
};
pos += bytes_written;
}
}
stream.flush().expect("Unexpected behaviour");
connection::close_connection(&stream);

Expand Down
88 changes: 60 additions & 28 deletions benches/netbench/src/rust-tcp-bw/server.rs
Original file line number Diff line number Diff line change
@@ -1,47 +1,79 @@
use std::io::Read;
use std::io::{ErrorKind, Read};
use std::net::TcpStream;
use std::time::Instant;

use clap::Parser;
#[cfg(target_os = "hermit")]
use hermit as _;
use hermit_bench_output::log_benchmark_data;
use rust_tcp_io_perf::config::Config;
use rust_tcp_io_perf::connection;
use rust_tcp_io_perf::print_utils::BoxplotValues;
use rust_tcp_io_perf::{connection, threading};

fn main() {
let args = Config::parse();
let tot_bytes = args.n_rounds * args.n_bytes;

let mut buf = vec![0; args.n_bytes];

let mut stream = connection::server_listen_and_get_first_connection(&args.port.to_string());
connection::setup(&args, &stream);
fn receive_rounds(
stream: &mut TcpStream,
rounds: usize,
bytes: usize,
progress_print: bool,
) -> Vec<f64> {
let mut buf = vec![0; bytes];
let mut durations = Vec::with_capacity(rounds);

let start = Instant::now();
for i in 0..args.n_rounds {
print!("round {i}: ");
let progress_prints = [
1,
rounds / 10,
rounds / 10 * 2,
rounds / 10 * 3,
rounds / 10 * 4,
rounds / 10 * 5,
rounds / 10 * 6,
rounds / 10 * 7,
rounds / 10 * 8,
rounds / 10 * 9,
];
for i in 0..rounds {
if progress_print && progress_prints.contains(&i) {
println!("round {i}/{}", rounds)
}
let round_start = Instant::now();
stream.read_exact(&mut buf).unwrap();
if let Err(e) = stream.read_exact(&mut buf) {
if e.kind() == ErrorKind::UnexpectedEof {
println!("Client ended transmission after {i} rounds");
break;
} else {
panic!("Error in reading from stream: {}", e.kind());
}
}
let round_end = Instant::now();
let duration = round_end.duration_since(round_start);
let mbits = buf.len() as f64 * 8.0f64 / (1024.0f64 * 1024.0f64 * duration.as_secs_f64());
println!("{mbits} Mbit/s");
durations.push(mbits);
}
let end = Instant::now();
let duration = end.duration_since(start);

#[cfg(target_os = "hermit")]
log_benchmark_data(
"TCP server",
"Mbit/s",
(tot_bytes as f64 * 8.0f64) / (1024.0f64 * 1024.0f64 * duration.as_secs_f64()),
durations
}

fn main() {
let args = Config::parse();

println!(
"starting server with {} bytes, {} warmup rounds and {} rounds",
args.n_bytes, args.warmup, args.n_rounds
);
let mut stream = connection::server_listen_and_get_first_connection(&args.port.to_string());
connection::setup(&args, &stream);
threading::setup(&args);

let _ = receive_rounds(&mut stream, args.warmup, args.n_bytes, false);
let durations = receive_rounds(&mut stream, args.n_rounds, args.n_bytes, true);

let statistics = BoxplotValues::<f64>::from(durations.as_slice());
log_benchmark_data("TCP server", "Mbit/s", statistics.mean);

#[cfg(not(target_os = "hermit"))]
log_benchmark_data(
"TCP client",
"Mbit/s",
(tot_bytes as f64 * 8.0f64) / (1024.0f64 * 1024.0f64 * duration.as_secs_f64()),
println!("{statistics:#.2?}");
println!(
"{} outliers ({:.1}%)",
statistics.nr_outliers,
100.0 * statistics.nr_outliers as f64 / durations.len() as f64
);

connection::close_connection(&stream);
Expand Down
Loading
Loading