Skip to content
Closed
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ use laminar::{Socket, Packet};

// create the socket
let mut socket = Socket::bind("127.0.0.1:12345")?;
let packet_sender = socket.get_packet_sender();
let packet_sender = socket.get_event_sender();
// this will start the socket, which will start a poll mechanism to receive and send messages.
let _thread = thread::spawn(move || socket.start_polling());

Expand Down
102 changes: 62 additions & 40 deletions examples/server_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,48 @@ use std::io::stdin;
use std::thread;
use std::time::Instant;

use laminar::{ErrorKind, Packet, Socket, SocketEvent};
use laminar::{
managers::SimpleSocketManager, ConnectionEvent, ErrorKind, Packet, ReceiveEvent, Socket,
SocketEventSender,
};

const SERVER: &str = "127.0.0.1:12351";

fn server() -> Result<(), ErrorKind> {
let mut socket = Socket::bind(SERVER)?;
let (sender, receiver) = (socket.get_packet_sender(), socket.get_event_receiver());
// create socket manager, that will use SimpleConnectionManager, that actually initiates connection by exchanging methods
let mut socket = Socket::bind(SERVER, Box::new(SimpleSocketManager(false)))?;
let (sender, receiver) = (
SocketEventSender(socket.get_event_sender()),
socket.get_event_receiver(),
);
let _thread = thread::spawn(move || socket.start_polling());

loop {
if let Ok(event) = receiver.recv() {
if let Ok(ConnectionEvent(addr, event)) = receiver.recv() {
match event {
SocketEvent::Packet(packet) => {
ReceiveEvent::Connected(data) => {
println!(
"{:?} -> Connected msg:{}",
addr,
String::from_utf8_lossy(data.as_ref())
);
}
ReceiveEvent::Packet(packet) => {
let msg = packet.payload();

if msg == b"Bye!" {
break;
}

let msg = String::from_utf8_lossy(msg);
let ip = packet.addr().ip();

println!("Received {:?} from {:?}", msg, ip);
println!("{:?} -> Packet msg:{}", addr, msg);

sender
.send(Packet::reliable_unordered(
packet.addr(),
"Copy that!".as_bytes().to_vec(),
["Echo: ".as_bytes(), msg.as_bytes()].concat(),
))
.expect("This should send");
}
SocketEvent::Timeout(address) => {
println!("Client timed out: {}", address);
}
_ => {}
_ => println!("{:?} -> {:?}", addr, event),
}
}
}
Expand All @@ -49,43 +56,58 @@ fn server() -> Result<(), ErrorKind> {

fn client() -> Result<(), ErrorKind> {
let addr = "127.0.0.1:12352";
let mut socket = Socket::bind(addr)?;
let mut socket = Socket::bind(addr, Box::new(SimpleSocketManager(false)))?;
println!("Connected on {}", addr);

let server = SERVER.parse().unwrap();
let sender = SocketEventSender(socket.get_event_sender());
let _thread = thread::spawn(move || loop {
socket.manual_poll(Instant::now());

println!("Type a message and press Enter to send. Send `Bye!` to quit.");
if let Some(ConnectionEvent(addr, event)) = socket.recv() {
match event {
ReceiveEvent::Connected(data) => {
println!(
"{:?} -> Connected msg:{}",
addr,
String::from_utf8_lossy(data.as_ref())
);
}
ReceiveEvent::Packet(packet) => {
let msg = String::from_utf8_lossy(packet.payload());
println!("{:?} -> Packet msg:{}", addr, msg);
}
_ => println!("{:?} -> {:?}", addr, event),
}
}
});

let stdin = stdin();
let mut s_buffer = String::new();
s_buffer.clear();

let server = SERVER.parse().unwrap();
println!("Type a `:c<message>` to connect");
println!("Type a `<message>` to send a packet");
println!("Type a `:d` to disconnect");
println!("Type a `:q` to quit.");

loop {
s_buffer.clear();
stdin.read_line(&mut s_buffer)?;
let line = s_buffer.replace(|x| x == '\n' || x == '\r', "");

socket.send(Packet::reliable_unordered(
server,
line.clone().into_bytes(),
))?;

socket.manual_poll(Instant::now());

if line == "Bye!" {
if line == ":q" {
break;
} else if line.starts_with(":c") {
sender
.connect(server, Box::from(line.split_at(2).1.as_bytes()))
.expect("sending should not fail");
} else if line == ":d" {
sender.disconnect(server).expect("sending should not fail");
} else {
sender
.send(Packet::reliable_unordered(server, line.into_bytes()))
.expect("sending should not fail");
}

match socket.recv() {
Some(SocketEvent::Packet(packet)) => {
if packet.addr() == server {
println!("Server sent: {}", String::from_utf8_lossy(packet.payload()));
} else {
println!("Unknown sender.");
}
}
Some(SocketEvent::Timeout(_)) => {}
_ => println!("Silence.."),
}
s_buffer.clear();
}

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions examples/simple_udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! 2. setting up client to send data.
//! 3. serialize data to send and deserialize when received.
use bincode::{deserialize, serialize};
use laminar::{Packet, Socket, SocketEvent};
use laminar::{ConnectionEventSender, Packet, Socket};
use serde_derive::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::time::Instant;
Expand All @@ -27,7 +27,7 @@ pub fn main() {
let mut server = Socket::bind(server_address()).unwrap();

/* setup or `Client` and send some test data. */
let mut client = Socket::bind(client_address()).unwrap();
let mut client = ConnectionEventSender(Socket::bind(client_address()).unwrap());

client.send(Packet::unreliable(
server_address(),
Expand Down
3 changes: 2 additions & 1 deletion src/either.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub(crate) enum Either<L, R> {
#[derive(Debug)]
pub enum Either<L, R> {
Left(L),
Right(R),
}
37 changes: 32 additions & 5 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! This module contains the laminar error handling logic.

use crate::SocketEvent;
use crate::either::Either;
use crate::net::events::{ConnectionEvent, ReceiveEvent, SendEvent};
use crate::net::managers::ConnectionManagerError;
use crossbeam_channel::SendError;
use std::{
error::Error,
Expand All @@ -27,9 +29,11 @@ pub enum ErrorKind {
/// Protocol versions did not match
ProtocolVersionMismatch,
/// Could not send on `SendChannel`.
SendError(SendError<SocketEvent>),
SendError(SendError<Either<ConnectionEvent<SendEvent>, ConnectionEvent<ReceiveEvent>>>),
Comment thread
fraillt marked this conversation as resolved.
Outdated
/// Expected header but could not be read from buffer.
CouldNotReadHeader(String),
/// Errors that is returned from `ConnectionManager` either preprocessing data or processing packet
ConnectionError(ConnectionManagerError),
}

impl Display for ErrorKind {
Expand Down Expand Up @@ -67,6 +71,11 @@ impl Display for ErrorKind {
"Expected {} header but could not be read from buffer.",
header
),
ErrorKind::ConnectionError(err) => write!(
fmt,
"Something went wrong in ConnectionManager. Reason: {:?}.",
err
),
}
}
}
Expand Down Expand Up @@ -103,6 +112,8 @@ impl Display for DecodingErrorKind {
pub enum PacketErrorKind {
/// The maximal allowed size of the packet was exceeded
ExceededMaxPacketSize,
/// Only user packets (a.k.a PacketType::Packet) can be fragmented
PacketTypeCannotBeFragmented,
}

impl Display for PacketErrorKind {
Expand All @@ -111,6 +122,10 @@ impl Display for PacketErrorKind {
PacketErrorKind::ExceededMaxPacketSize => {
write!(fmt, "The packet size was bigger than the max allowed size.")
}
PacketErrorKind::PacketTypeCannotBeFragmented => write!(
fmt,
"Only user packets (PacketType::Packet) can be fragmented."
),
}
}
}
Expand Down Expand Up @@ -173,9 +188,21 @@ impl From<FragmentErrorKind> for ErrorKind {
}
}

impl From<crossbeam_channel::SendError<SocketEvent>> for ErrorKind {
fn from(inner: SendError<SocketEvent>) -> Self {
ErrorKind::SendError(inner)
impl From<SendError<ConnectionEvent<SendEvent>>> for ErrorKind {
fn from(inner: SendError<ConnectionEvent<SendEvent>>) -> Self {
ErrorKind::SendError(SendError(Either::Left(inner.0)))
}
}

impl From<SendError<ConnectionEvent<ReceiveEvent>>> for ErrorKind {
fn from(inner: SendError<ConnectionEvent<ReceiveEvent>>) -> Self {
ErrorKind::SendError(SendError(Either::Right(inner.0)))
}
}

impl From<ConnectionManagerError> for ErrorKind {
fn from(inner: ConnectionManagerError) -> Self {
ErrorKind::ConnectionError(inner)
}
}

Expand Down
8 changes: 5 additions & 3 deletions src/infrastructure/acknowledgment.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::packet::OrderingGuarantee;
use crate::packet::SequenceNumber;
use crate::packet::{OrderingGuarantee, PacketType, SequenceNumber};
use crate::sequence_buffer::{sequence_greater_than, sequence_less_than, SequenceBuffer};
use std::collections::HashMap;

Expand Down Expand Up @@ -101,13 +100,15 @@ impl AcknowledgmentHandler {
/// Enqueue the outgoing packet for acknowledgment.
pub fn process_outgoing(
&mut self,
packet_type: PacketType,
payload: &[u8],
ordering_guarantee: OrderingGuarantee,
item_identifier: Option<SequenceNumber>,
) {
self.sent_packets.insert(
self.sequence_number,
SentPacket {
packet_type,
payload: Box::from(payload),
ordering_guarantee,
item_identifier,
Expand Down Expand Up @@ -138,8 +139,9 @@ impl AcknowledgmentHandler {
}
}

#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq)]
pub struct SentPacket {
pub packet_type: PacketType,
pub payload: Box<[u8]>,
pub ordering_guarantee: OrderingGuarantee,
pub item_identifier: Option<SequenceNumber>,
Expand Down
8 changes: 4 additions & 4 deletions src/infrastructure/fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl Fragmentation {
Fragmentation::fragments_needed(payload_length, config.fragment_size) as u8;

if num_fragments > config.max_fragments {
Err(FragmentErrorKind::ExceededMaxFragments)?;
return Err(FragmentErrorKind::ExceededMaxFragments.into());
}

for fragment_id in 0..num_fragments {
Expand Down Expand Up @@ -112,16 +112,16 @@ impl Fragmentation {
// get entry of previous received fragments
let reassembly_data = match self.fragments.get_mut(fragment_header.sequence()) {
Some(val) => val,
None => Err(FragmentErrorKind::CouldNotFindFragmentById)?,
None => return Err(FragmentErrorKind::CouldNotFindFragmentById.into()),
};

// Got the data
if reassembly_data.num_fragments_total != fragment_header.fragment_count() {
Err(FragmentErrorKind::FragmentWithUnevenNumberOfFragemts)?
return Err(FragmentErrorKind::FragmentWithUnevenNumberOfFragemts.into());
}

if reassembly_data.fragments_received[usize::from(fragment_header.id())] {
Err(FragmentErrorKind::AlreadyProcessedFragment)?
return Err(FragmentErrorKind::AlreadyProcessedFragment.into());
}

// increase number of received fragments and set the specific fragment to received.
Expand Down
5 changes: 4 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ mod config;
mod either;
mod error;
mod infrastructure;
pub mod managers;
mod net;
mod packet;
mod protocol_version;
Expand All @@ -39,5 +40,7 @@ pub use self::throughput::ThroughputMonitoring;

pub use self::config::Config;
pub use self::error::{ErrorKind, Result};
pub use self::net::{LinkConditioner, Socket, SocketEvent};
pub use self::net::events::*;
Comment thread
fraillt marked this conversation as resolved.
Outdated
pub use self::net::managers::{ConnectionManager, ConnectionManagerError};
pub use self::net::{LinkConditioner, Socket, SocketEventSender};
pub use self::packet::{DeliveryGuarantee, OrderingGuarantee, Packet};
4 changes: 4 additions & 0 deletions src/managers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
//! This module provides socket managers.
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to see a short description of what a socket manager is

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you agree that I split SocketManager into two parts, as I wrote in the comment?
One would be ConnectionManagerFactory two methods accept_remote_connection, accept_local_connection, and the other would be SystemTracker?, SocketTracker?, MetricsCollector? ... you name it :) that would basically have these track_ methods.

mod simple;

pub use self::simple::SimpleSocketManager;
Loading