From 686e933d8292ee576a71f3c4ff55660e1089a4e5 Mon Sep 17 00:00:00 2001 From: csicar Date: Sun, 3 Apr 2022 22:59:53 +0200 Subject: [PATCH 1/3] Implement a Tokio Codec for SIP --- Cargo.lock | 51 ++++++++++++++++++++ Cargo.toml | 2 + src/codec.rs | 130 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 2 + 4 files changed, 185 insertions(+) create mode 100644 src/codec.rs diff --git a/Cargo.lock b/Cargo.lock index 8070ff9..362e5dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -93,6 +93,18 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "futures-core" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" + +[[package]] +name = "futures-sink" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868" + [[package]] name = "generic-array" version = "0.14.4" @@ -132,6 +144,15 @@ version = "0.2.97" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "12b8adadd720df158f4d70dfe7ccc6adb0472d7c55ca83445f6a5ab3e36f8fb6" +[[package]] +name = "log" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" +dependencies = [ + "cfg-if", +] + [[package]] name = "md-5" version = "0.9.1" @@ -172,6 +193,12 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "pin-project-lite" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e280fbe77cc62c91527259e9442153f4688736748d24660126286329742b4c6c" + [[package]] name = "ppv-lite86" version = "0.2.10" @@ -255,6 +282,7 @@ dependencies = [ "rsip-derives", "sha2", "testing-utils", + "tokio-util", "uuid", ] @@ -307,6 +335,29 @@ dependencies = [ "rand", ] +[[package]] +name = "tokio" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee" +dependencies = [ + "pin-project-lite", +] + +[[package]] +name = "tokio-util" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64910e1b9c1901aaf5375561e35b9c057d95ff41a44ede043a03e09279eabaf1" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "log", + "pin-project-lite", + "tokio", +] + [[package]] name = "typenum" version = "1.13.0" diff --git a/Cargo.toml b/Cargo.toml index bb04bb9..5068908 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,9 +25,11 @@ md-5 = "0.9.1" sha2 = "0.9.5" testing-utils = { version = "0.1.1", optional = true } bstr = "0.2.17" +tokio-util = { version = "0.7.0", features = ["codec"], optional = true } [features] test-utils = ["testing-utils"] +tokio-codec =["tokio-util"] [dev-dependencies] quote = "1.0.9" diff --git a/src/codec.rs b/src/codec.rs new file mode 100644 index 0000000..9838b5a --- /dev/null +++ b/src/codec.rs @@ -0,0 +1,130 @@ +use std::convert::TryInto; +use std::{convert::TryFrom, io::Write}; + +use bytes::{BytesMut, Bytes, BufMut}; +use tokio_util::codec; + +use crate::{SipMessage, error, Request}; +use crate::message::request::Tokenizer; + + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub struct SipServer(); + +impl SipServer { + pub fn new() -> Self { + SipServer() + } +} + +#[derive(Debug)] +pub enum MessageError { + SipDecodeError(crate::error::Error), + + IoError(std::io::Error), +} +impl From for MessageError { + fn from(err: std::io::Error) -> Self { + MessageError::IoError(err) + } +} + +impl From for MessageError + where E : Into { + fn from(err: E) -> Self { + MessageError::SipDecodeError(err.into()) + } +} + +impl codec::Decoder for SipServer { + type Item = Request; + + type Error = MessageError; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + let bytes = src.clone().freeze(); + { // debugging + let s = String::from_utf8_lossy(&bytes[..]); + println!("recv bytes:"); + println!(" -----"); + for line in s.split("\r\n") { + println!(" | {}", line); + } + println!(" ----- empty: {}", bytes.is_empty()); + } + if bytes.is_empty() { + // println!("msg is empty"); + src.clear(); + return Ok(None) + } + if bytes == "\r\n\r\n".as_bytes() { + // something weird has happened: + // somehow the end sequence was received even though no message preceded it. + // This happens with some clients that send this sequence to keep a (tcp) connection alive + // => clear the end sequence and tell tokio that we did not receive a full message + src.clear(); + return Ok(None) + } + // println!("msg is not empty"); + let res = match Tokenizer::tokenize(&bytes) { + Ok((empty_rem_tokens, tok)) => { + assert_eq!(empty_rem_tokens, []); + src.clear(); + let req : Request = tok.try_into()?; + Ok(Some(req)) + }, + Err(nom::Err::Incomplete(_)) => { + println!("nom says 'incomplete!'"); + Ok(None) + }, + Err(err)=> { + Err(err.into()) + }, + }; + // println!("parsed into: {:?}", res); + res + } +} + +#[cfg(test)] +mod tests { + use std::io::Write; + + use bytes::{BytesMut, BufMut}; + use tokio_util::codec::Decoder; + + use super::SipServer; + + #[test] + fn test_decode() { + let msg = "REGISTER asd.com SIP/2.0\r\nMY-HEADER: my-value\r\n\r\n"; + let mut buffer: BytesMut = msg.into(); + let res = SipServer::new().decode(&mut buffer); + println!("decode single"); + println!("{:?}", res); + } + + #[test] + fn test_decode_multiple() { + let msg = "REGISTER asd.com SIP/2.0\r\nMY-HEADER: my-value\r\n\r\n"; + let mut buffer: BytesMut = msg.into(); + let mut codec = SipServer::new(); + let res = codec.decode(&mut buffer); + println!("{:?}", res); + + let msg2 = "REGISTER second.com SIP/2.0\r\nSND-MY-HEADER: my-snd-value\r\n\r\n"; + let mut buffer2 = msg2.into(); + let res = codec.decode(&mut buffer2); + println!("decode second {:?}", res); + } +} + +impl codec::Encoder for SipServer { + type Error = std::io::Error; + + fn encode(&mut self, item: SipMessage, dst: &mut BytesMut) -> Result<(), Self::Error> { + let bytes: Bytes = item.into(); + dst.writer().write(&bytes[..])?; + Ok(()) + } +} diff --git a/src/lib.rs b/src/lib.rs index 0059010..7a4b938 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -264,6 +264,8 @@ mod error; pub mod headers; pub mod message; pub mod services; +#[cfg(feature = "tokio-codec")] +pub mod codec; pub use error::{Error, TokenizerError}; From 0a6647d83655617a2007f94c9345cbc6c2055ccb Mon Sep 17 00:00:00 2001 From: csicar Date: Sun, 3 Apr 2022 23:07:10 +0200 Subject: [PATCH 2/3] Fix formatting --- src/codec.rs | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/src/codec.rs b/src/codec.rs index 9838b5a..ebcf07f 100644 --- a/src/codec.rs +++ b/src/codec.rs @@ -1,12 +1,11 @@ use std::convert::TryInto; use std::{convert::TryFrom, io::Write}; -use bytes::{BytesMut, Bytes, BufMut}; +use bytes::{BufMut, Bytes, BytesMut}; use tokio_util::codec; -use crate::{SipMessage, error, Request}; use crate::message::request::Tokenizer; - +use crate::{error, Request, SipMessage}; #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct SipServer(); @@ -29,8 +28,10 @@ impl From for MessageError { } } -impl From for MessageError - where E : Into { +impl From for MessageError +where + E: Into, +{ fn from(err: E) -> Self { MessageError::SipDecodeError(err.into()) } @@ -43,7 +44,8 @@ impl codec::Decoder for SipServer { fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { let bytes = src.clone().freeze(); - { // debugging + { + // debugging let s = String::from_utf8_lossy(&bytes[..]); println!("recv bytes:"); println!(" -----"); @@ -55,31 +57,29 @@ impl codec::Decoder for SipServer { if bytes.is_empty() { // println!("msg is empty"); src.clear(); - return Ok(None) + return Ok(None); } if bytes == "\r\n\r\n".as_bytes() { - // something weird has happened: + // something weird has happened: // somehow the end sequence was received even though no message preceded it. // This happens with some clients that send this sequence to keep a (tcp) connection alive // => clear the end sequence and tell tokio that we did not receive a full message src.clear(); - return Ok(None) + return Ok(None); } // println!("msg is not empty"); let res = match Tokenizer::tokenize(&bytes) { Ok((empty_rem_tokens, tok)) => { assert_eq!(empty_rem_tokens, []); src.clear(); - let req : Request = tok.try_into()?; + let req: Request = tok.try_into()?; Ok(Some(req)) - }, + } Err(nom::Err::Incomplete(_)) => { println!("nom says 'incomplete!'"); Ok(None) - }, - Err(err)=> { - Err(err.into()) - }, + } + Err(err) => Err(err.into()), }; // println!("parsed into: {:?}", res); res @@ -90,7 +90,7 @@ impl codec::Decoder for SipServer { mod tests { use std::io::Write; - use bytes::{BytesMut, BufMut}; + use bytes::{BufMut, BytesMut}; use tokio_util::codec::Decoder; use super::SipServer; @@ -111,7 +111,7 @@ mod tests { let mut codec = SipServer::new(); let res = codec.decode(&mut buffer); println!("{:?}", res); - + let msg2 = "REGISTER second.com SIP/2.0\r\nSND-MY-HEADER: my-snd-value\r\n\r\n"; let mut buffer2 = msg2.into(); let res = codec.decode(&mut buffer2); From 3b88680de09541e1fa1c4f0c3b984fdfe896c978 Mon Sep 17 00:00:00 2001 From: csicar Date: Sun, 3 Apr 2022 23:08:36 +0200 Subject: [PATCH 3/3] Fix formatting in lib.rs --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 7a4b938..dd583c9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -264,7 +264,7 @@ mod error; pub mod headers; pub mod message; pub mod services; -#[cfg(feature = "tokio-codec")] +// #[cfg(feature = "tokio-codec")] pub mod codec; pub use error::{Error, TokenizerError};