-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Mqtt frames v7 #8513
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Mqtt frames v7 #8513
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,6 +23,7 @@ use crate::applayer::*; | |
| use crate::applayer::{self, LoggerFlags}; | ||
| use crate::conf::conf_get; | ||
| use crate::core::*; | ||
| use crate::frames::*; | ||
| use nom7::Err; | ||
| use std; | ||
| use std::collections::VecDeque; | ||
|
|
@@ -41,6 +42,13 @@ static mut MQTT_MAX_TX: usize = 1024; | |
|
|
||
| static mut ALPROTO_MQTT: AppProto = ALPROTO_UNKNOWN; | ||
|
|
||
| #[derive(AppLayerFrameType)] | ||
| pub enum MQTTFrameType { | ||
| Pdu, | ||
| Header, | ||
| Data, | ||
| } | ||
|
|
||
| #[derive(FromPrimitive, Debug, AppLayerEvent)] | ||
| pub enum MQTTEvent { | ||
| MissingConnect, | ||
|
|
@@ -422,8 +430,10 @@ impl MQTTState { | |
| } | ||
| } | ||
|
|
||
| fn parse_request(&mut self, input: &[u8]) -> AppLayerResult { | ||
| fn parse_request(&mut self, flow: *const Flow, stream_slice: StreamSlice) -> AppLayerResult { | ||
| let input = stream_slice.as_slice(); | ||
| let mut current = input; | ||
|
|
||
| if input.is_empty() { | ||
| return AppLayerResult::ok(); | ||
| } | ||
|
|
@@ -455,6 +465,13 @@ impl MQTTState { | |
| SCLogDebug!("request: handling {}", current.len()); | ||
| match parse_message(current, self.protocol_version, self.max_msg_len) { | ||
| Ok((rem, msg)) => { | ||
| let _pdu = Frame::new( | ||
| flow, | ||
| &stream_slice, | ||
| input, | ||
| current.len() as i64, | ||
| MQTTFrameType::Pdu as u8, | ||
| ); | ||
| SCLogDebug!("request msg {:?}", msg); | ||
| if let MQTTOperation::TRUNCATED(ref trunc) = msg.op { | ||
| SCLogDebug!( | ||
|
|
@@ -463,17 +480,21 @@ impl MQTTState { | |
| current.len() | ||
| ); | ||
| if trunc.skipped_length >= current.len() { | ||
| self.mqtt_frames_trunc(flow, &stream_slice, trunc, current, &msg); | ||
| self.skip_request = trunc.skipped_length - current.len(); | ||
| self.handle_msg(msg, true); | ||
| return AppLayerResult::ok(); | ||
| } else { | ||
| self.mqtt_frames_trunc(flow, &stream_slice, trunc, current, &msg); | ||
| consumed += trunc.skipped_length; | ||
| current = ¤t[trunc.skipped_length..]; | ||
| self.handle_msg(msg, true); | ||
| self.skip_request = 0; | ||
| continue; | ||
| } | ||
| } | ||
|
|
||
| self.mqtt_frames(flow, &stream_slice, &msg); | ||
| self.handle_msg(msg, false); | ||
| consumed += current.len() - rem.len(); | ||
| current = rem; | ||
|
|
@@ -497,8 +518,10 @@ impl MQTTState { | |
| return AppLayerResult::ok(); | ||
| } | ||
|
|
||
| fn parse_response(&mut self, input: &[u8]) -> AppLayerResult { | ||
| fn parse_response(&mut self, flow: *const Flow, stream_slice: StreamSlice) -> AppLayerResult { | ||
| let input = stream_slice.as_slice(); | ||
| let mut current = input; | ||
|
|
||
| if input.is_empty() { | ||
| return AppLayerResult::ok(); | ||
| } | ||
|
|
@@ -529,6 +552,14 @@ impl MQTTState { | |
| SCLogDebug!("response: handling {}", current.len()); | ||
| match parse_message(current, self.protocol_version, self.max_msg_len) { | ||
| Ok((rem, msg)) => { | ||
| let _pdu = Frame::new( | ||
| flow, | ||
| &stream_slice, | ||
| input, | ||
| input.len() as i64, | ||
| MQTTFrameType::Pdu as u8, | ||
| ); | ||
|
|
||
| SCLogDebug!("response msg {:?}", msg); | ||
| if let MQTTOperation::TRUNCATED(ref trunc) = msg.op { | ||
| SCLogDebug!( | ||
|
|
@@ -537,18 +568,22 @@ impl MQTTState { | |
| current.len() | ||
| ); | ||
| if trunc.skipped_length >= current.len() { | ||
| self.mqtt_frames_trunc(flow, &stream_slice, trunc, current, &msg); | ||
| self.skip_response = trunc.skipped_length - current.len(); | ||
| self.handle_msg(msg, true); | ||
| SCLogDebug!("skip_response now {}", self.skip_response); | ||
| return AppLayerResult::ok(); | ||
| } else { | ||
| self.mqtt_frames_trunc(flow, &stream_slice, trunc, current, &msg); | ||
| consumed += trunc.skipped_length; | ||
| current = ¤t[trunc.skipped_length..]; | ||
| self.handle_msg(msg, true); | ||
| self.skip_response = 0; | ||
| continue; | ||
| } | ||
| } | ||
|
|
||
| self.mqtt_frames(flow, &stream_slice, &msg); | ||
| self.handle_msg(msg, true); | ||
| consumed += current.len() - rem.len(); | ||
| current = rem; | ||
|
|
@@ -589,6 +624,64 @@ impl MQTTState { | |
| tx.tx_data.set_event(event as u8); | ||
| self.transactions.push_back(tx); | ||
| } | ||
|
|
||
| fn mqtt_frames(&mut self, flow: *const Flow, stream_slice: &StreamSlice, input: &MQTTMessage) { | ||
| let hdr = stream_slice.as_slice(); | ||
| //MQTT payload has a fixed header of 2 bytes | ||
| let _mqtt_hdr = Frame::new(flow, stream_slice, hdr, 2, MQTTFrameType::Header as u8); | ||
| SCLogDebug!("mqtt_hdr Frame {:?}", _mqtt_hdr); | ||
| let rem_length = input.header.remaining_length as usize; | ||
| let data = &hdr[2..rem_length + 2]; | ||
| let _mqtt_data = Frame::new( | ||
| flow, | ||
| stream_slice, | ||
| data, | ||
| rem_length as i64, | ||
| MQTTFrameType::Data as u8, | ||
| ); | ||
| SCLogDebug!("mqtt_data Frame {:?}", _mqtt_data); | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: new line |
||
| fn mqtt_frames_trunc( | ||
| &mut self, flow: *const Flow, stream_slice: &StreamSlice, input: &MQTTTruncatedData, | ||
| current: &[u8], msg: &MQTTMessage, | ||
| ) { | ||
| let hdr = stream_slice.as_slice(); | ||
| let hdr_length = input.skipped_length - msg.header.remaining_length as usize; | ||
| let _mqtt_hdr = Frame::new( | ||
| flow, | ||
| stream_slice, | ||
| hdr, | ||
| hdr_length as i64, | ||
| MQTTFrameType::Header as u8, | ||
| ); | ||
|
Comment on lines
+649
to
+656
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm curious to see what we will see in these frames. Analizing the code some more, it seems to me that the Truncated message will have all header info, but no payload parsed by Suricata.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now, one of the questions is what is interesting/important for us to expose to
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so basically as far as I have understood when the length of parsed input message > max_mg_len then there are two conditions
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
All of this brings me back to your last comment ^
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have left a review on the tests PR that adds some input to this convo. |
||
| SCLogDebug!("mqtt_hdr Frame {:?}", _mqtt_hdr); | ||
|
|
||
| if input.skipped_length >= current.len() { | ||
| //taking current.len() as reference as trunc.skipped_length >= current.len() | ||
|
|
||
| let rem_length = current.len() - hdr_length; | ||
| let data = &hdr[hdr_length..current.len()]; | ||
| let _mqtt_data = Frame::new( | ||
| flow, | ||
| stream_slice, | ||
| data, | ||
| rem_length as i64, | ||
| MQTTFrameType::Data as u8, | ||
| ); | ||
|
Comment on lines
+663
to
+670
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Following my previous comment, I think that if we really want to offer any insight into this truncated data, we would have to pass
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This might work too. but I am passing the current buffer which also represents the full input buffer. So this is another way of exactly using the remainder stream as current = input = msg +rem. I have based my length calculations on this.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From running the tests locally, your approach seems to be working. So I think we can move on ;)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Based on your comment on the test frames PR , I will make a new PR for frames to add trunc_data frames |
||
| SCLogDebug!("mqtt_data Frame {:?}", _mqtt_data); | ||
| } else { | ||
| let rem_length = input.skipped_length - hdr_length; | ||
| let data = &hdr[hdr_length..rem_length + hdr_length]; | ||
| let _mqtt_data = Frame::new( | ||
| flow, | ||
| stream_slice, | ||
| data, | ||
| rem_length as i64, | ||
| MQTTFrameType::Data as u8, | ||
| ); | ||
| SCLogDebug!("mqtt_data Frame {:?}", _mqtt_data); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // C exports. | ||
|
|
@@ -637,20 +730,20 @@ pub unsafe extern "C" fn rs_mqtt_state_tx_free(state: *mut std::os::raw::c_void, | |
|
|
||
| #[no_mangle] | ||
| pub unsafe extern "C" fn rs_mqtt_parse_request( | ||
| _flow: *const Flow, state: *mut std::os::raw::c_void, _pstate: *mut std::os::raw::c_void, | ||
| flow: *const Flow, state: *mut std::os::raw::c_void, _pstate: *mut std::os::raw::c_void, | ||
| stream_slice: StreamSlice, _data: *const std::os::raw::c_void, | ||
| ) -> AppLayerResult { | ||
| let state = cast_pointer!(state, MQTTState); | ||
| return state.parse_request(stream_slice.as_slice()); | ||
| return state.parse_request(flow, stream_slice); | ||
| } | ||
|
|
||
| #[no_mangle] | ||
| pub unsafe extern "C" fn rs_mqtt_parse_response( | ||
| _flow: *const Flow, state: *mut std::os::raw::c_void, _pstate: *mut std::os::raw::c_void, | ||
| flow: *const Flow, state: *mut std::os::raw::c_void, _pstate: *mut std::os::raw::c_void, | ||
| stream_slice: StreamSlice, _data: *const std::os::raw::c_void, | ||
| ) -> AppLayerResult { | ||
| let state = cast_pointer!(state, MQTTState); | ||
| return state.parse_response(stream_slice.as_slice()); | ||
| return state.parse_response(flow, stream_slice); | ||
| } | ||
|
|
||
| #[no_mangle] | ||
|
|
@@ -761,8 +854,8 @@ pub unsafe extern "C" fn rs_mqtt_register_parser(cfg_max_msg_len: u32) { | |
| apply_tx_config: None, | ||
| flags: APP_LAYER_PARSER_OPT_UNIDIR_TXS, | ||
| truncate: None, | ||
| get_frame_id_by_name: None, | ||
| get_frame_name_by_id: None, | ||
| get_frame_id_by_name: Some(MQTTFrameType::ffi_id_from_name), | ||
| get_frame_name_by_id: Some(MQTTFrameType::ffi_name_from_id), | ||
| }; | ||
|
|
||
| let ip_proto_str = CString::new("tcp").unwrap(); | ||
|
|
@@ -783,4 +876,4 @@ pub unsafe extern "C" fn rs_mqtt_register_parser(cfg_max_msg_len: u32) { | |
| } else { | ||
| SCLogDebug!("Protocol detector and parser disabled for MQTT."); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,7 +24,7 @@ use nom7::IResult; | |
|
|
||
| // TODO: It might be useful to also add detection on property presence and | ||
| // content, e.g. mqtt.property: AUTHENTICATION_METHOD. | ||
| #[derive(Debug, PartialEq, PartialOrd)] | ||
| #[derive(Debug, PartialEq, Eq, PartialOrd)] | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Was this needed by anything?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this was added due to cargo clippy when I was having the issue. I will try to remove it and see if it works this time. |
||
| #[allow(non_camel_case_types)] | ||
| pub enum MQTTProperty { | ||
| UNKNOWN, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Part of me isn't happy with this function name, because we register the mqtt pdu frame outside of it, and so the name is a bit misleading to me. Do you have some suggestions on how could we make more clear what's going on here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will try to come up with something for this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about
mqtt_hdr_data_frames?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To me, this gives the idea of
header dataframes, instead of header and data frames 🤔 not sure if this is premature optimization or not.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about
mqtt_hdr_and_data_frames?