Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/either.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#[derive(Debug)]
pub(crate) enum Either<L, R> {
Left(L),
Right(R),
Expand Down
5 changes: 4 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ mod throughput;
#[cfg(feature = "tester")]
pub use self::throughput::ThroughputMonitoring;

#[cfg(test)]
pub mod test_utils;

pub use self::config::Config;
pub use self::error::{ErrorKind, Result};
pub use self::net::{LinkConditioner, Socket, SocketEvent};
Comment thread
TimonPost marked this conversation as resolved.
pub use self::net::{Socket, SocketEvent};
pub use self::packet::{DeliveryGuarantee, OrderingGuarantee, Packet};
7 changes: 4 additions & 3 deletions src/net.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
//! This module provides the logic between the low-level abstract types and the types that the user will be interacting with.
//! You can think of the socket, connection management, congestion control.

mod connection;
mod connection_controller;
mod events;
mod link_conditioner;
mod quality;
mod socket;
mod socket_controller;
mod virtual_connection;

pub mod constants;

pub use self::connection_controller::ConnectionController;
pub use self::events::SocketEvent;
pub use self::link_conditioner::LinkConditioner;
pub use self::quality::{NetworkQuality, RttMeasurer};
pub use self::socket::Socket;
pub use self::socket_controller::{SocketController, SocketReceiver, SocketSender};
pub use self::virtual_connection::VirtualConnection;
187 changes: 0 additions & 187 deletions src/net/connection.rs

This file was deleted.

158 changes: 158 additions & 0 deletions src/net/connection_controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
use crate::{
config::Config,
error::Result,
net::{events::SocketEvent, SocketSender, VirtualConnection},
packet::{DeliveryGuarantee, OutgoingPackets, Packet, PacketInfo},
};
use crossbeam_channel::Sender;
use log::error;
use std::{self, net::SocketAddr, time::Instant};

/// Controls all aspects of the connection:
/// * Processes incoming data (from a socket) or events (from a user).
/// * Updates connection state: resends dropped packets, sends heartbeat packet, etc.
/// * Creates new connections.
/// * Checks if connection should be dropped.
Comment thread
fraillt marked this conversation as resolved.
Outdated
/// It doesn't own connections, but only owns necessary components to process them.
#[derive(Debug)]
pub struct ConnectionController<PacketSender> {
config: Config,
packet_sender: PacketSender,
event_sender: Sender<SocketEvent>,
}

/// Defines a connection type.
type Connection = VirtualConnection;
Comment thread
fraillt marked this conversation as resolved.
Outdated
/// Defines a user event type.
type UserEvent = Packet;
/// Defines a connection event type.
type ConnectionEvent = SocketEvent;

impl<PacketSender: SocketSender> ConnectionController<PacketSender> {
Comment thread
fraillt marked this conversation as resolved.
Outdated
/// Creates a new instance of `ConnectionHandler`.
Comment thread
fraillt marked this conversation as resolved.
Outdated
pub fn new(
config: Config,
packet_sender: PacketSender,
event_sender: Sender<ConnectionEvent>,
) -> Self {
ConnectionController {
config,
packet_sender,
event_sender,
}
}

/// Creates new connection. Also will init it and send connection event to a user.
Comment thread
fraillt marked this conversation as resolved.
Outdated
pub fn create_connection(
&self,
address: SocketAddr,
time: Instant,
initial_data: Option<&[u8]>,
) -> Connection {
// emit connect event if this is initiated by remote host.
Comment thread
fraillt marked this conversation as resolved.
Outdated
if initial_data.is_some() {
self.event_sender
.send(ConnectionEvent::Connect(address))
.unwrap();
Comment thread
fraillt marked this conversation as resolved.
Outdated
}
Connection::new(address, &self.config, time)
}

/// Determine if this connection should be dropped due to its state.
Comment thread
fraillt marked this conversation as resolved.
Outdated
pub fn should_drop(&self, connection: &mut Connection, time: Instant) -> bool {
let should_drop = connection.packets_in_flight() > self.config.max_packets_in_flight
|| connection.last_heard(time) >= self.config.idle_connection_timeout;
if should_drop {
self.event_sender
.send(ConnectionEvent::Timeout(connection.remote_address))
.unwrap();
Comment thread
fraillt marked this conversation as resolved.
Outdated
}
should_drop
}

/// Handle a packet received from a socket.
Comment thread
fraillt marked this conversation as resolved.
Outdated
pub fn handle_packet(&mut self, connection: &mut Connection, payload: &[u8], time: Instant) {
match connection.process_incoming(payload, time) {
Ok(packets) => {
for incoming in packets {
self.event_sender
.send(ConnectionEvent::Packet(incoming.0))
.unwrap();
}
}
Err(err) => error!("Error occured processing incomming packet: {:?}", err),
}
}

/// Handle an event received from a user.
Comment thread
fraillt marked this conversation as resolved.
Outdated
pub fn handle_event(&mut self, connection: &mut Connection, event: UserEvent, time: Instant) {
self.send_packets(
&connection.remote_address.clone(),
connection.process_outgoing(
PacketInfo::user_packet(
event.payload(),
event.delivery_guarantee(),
event.order_guarantee(),
),
None,
time,
),
"user packet",
);
}

/// Process various connection related tasks: resend dropped packets, send heartbeat packet, etc...
Comment thread
fraillt marked this conversation as resolved.
Outdated
/// This function gets called very frequently.
pub fn update(&mut self, connection: &mut Connection, time: Instant) {
// resend dropped packets
let dropped_packets = connection.gather_dropped_packets();
for dropped in dropped_packets {
Comment thread
fraillt marked this conversation as resolved.
Outdated
let packets = connection.process_outgoing(
PacketInfo {
packet_type: dropped.packet_type,
payload: &dropped.payload,
// Because a delivery guarantee is only sent with reliable packets
delivery: DeliveryGuarantee::Reliable,
// This is stored with the dropped packet because they could be mixed
ordering: dropped.ordering_guarantee,
},
dropped.item_identifier,
time,
);
self.send_packets(&connection.remote_address, packets, "dropped packets");
}

// send heartbeat packets if required.
Comment thread
fraillt marked this conversation as resolved.
Outdated
if let Some(heartbeat_interval) = self.config.heartbeat_interval {
if connection.last_sent(time) >= heartbeat_interval {
self.send_packets(
&connection.remote_address.clone(),
connection.process_outgoing(PacketInfo::heartbeat_packet(&[]), None, time),
"heatbeat packet",
);
}
}
}

/// Helper function that sends multiple outgoing packets
Comment thread
fraillt marked this conversation as resolved.
Outdated
fn send_packets(
&mut self,
address: &SocketAddr,
packets: Result<OutgoingPackets>,
err_context: &str,
) {
match packets {
Ok(packets) => {
for outgoing in packets {
if let Err(error) = self
.packet_sender
.send_packet(address, &outgoing.contents())
{
error!("Error occured sending {}: {:?}", err_context, error);
}
}
}
Err(error) => error!("Error occured processing {}: {:?}", err_context, error),
}
}
}
2 changes: 1 addition & 1 deletion src/net/quality.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl RttMeasurer {
mod test {
use super::RttMeasurer;
use crate::config::Config;
use crate::net::connection::VirtualConnection;
use crate::net::VirtualConnection;
use std::net::ToSocketAddrs;
use std::time::{Duration, Instant};

Expand Down
Loading