Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 101 additions & 8 deletions rust/src/mqtt/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::applayer::*;
use crate::applayer::{self, LoggerFlags};
use crate::conf::conf_get;
use crate::core::*;
use crate::frames::*;
use nom7::Err;
use std;
use std::collections::VecDeque;
Expand All @@ -41,6 +42,13 @@ static mut MQTT_MAX_TX: usize = 1024;

static mut ALPROTO_MQTT: AppProto = ALPROTO_UNKNOWN;

#[derive(AppLayerFrameType)]
pub enum MQTTFrameType {
Pdu,
Header,
Data,
}

#[derive(FromPrimitive, Debug, AppLayerEvent)]
pub enum MQTTEvent {
MissingConnect,
Expand Down Expand Up @@ -422,8 +430,10 @@ impl MQTTState {
}
}

fn parse_request(&mut self, input: &[u8]) -> AppLayerResult {
fn parse_request(&mut self, flow: *const Flow, stream_slice: StreamSlice) -> AppLayerResult {
let input = stream_slice.as_slice();
let mut current = input;

if input.is_empty() {
return AppLayerResult::ok();
}
Expand Down Expand Up @@ -455,6 +465,13 @@ impl MQTTState {
SCLogDebug!("request: handling {}", current.len());
match parse_message(current, self.protocol_version, self.max_msg_len) {
Ok((rem, msg)) => {
let _pdu = Frame::new(
flow,
&stream_slice,
input,
current.len() as i64,
MQTTFrameType::Pdu as u8,
);
SCLogDebug!("request msg {:?}", msg);
if let MQTTOperation::TRUNCATED(ref trunc) = msg.op {
SCLogDebug!(
Expand All @@ -463,17 +480,21 @@ impl MQTTState {
current.len()
);
if trunc.skipped_length >= current.len() {
self.mqtt_frames_trunc(flow, &stream_slice, trunc, current, &msg);
self.skip_request = trunc.skipped_length - current.len();
self.handle_msg(msg, true);
return AppLayerResult::ok();
} else {
self.mqtt_frames_trunc(flow, &stream_slice, trunc, current, &msg);
consumed += trunc.skipped_length;
current = &current[trunc.skipped_length..];
self.handle_msg(msg, true);
self.skip_request = 0;
continue;
}
}

self.mqtt_frames(flow, &stream_slice, &msg);
self.handle_msg(msg, false);
consumed += current.len() - rem.len();
current = rem;
Expand All @@ -497,8 +518,10 @@ impl MQTTState {
return AppLayerResult::ok();
}

fn parse_response(&mut self, input: &[u8]) -> AppLayerResult {
fn parse_response(&mut self, flow: *const Flow, stream_slice: StreamSlice) -> AppLayerResult {
let input = stream_slice.as_slice();
let mut current = input;

if input.is_empty() {
return AppLayerResult::ok();
}
Expand Down Expand Up @@ -529,6 +552,14 @@ impl MQTTState {
SCLogDebug!("response: handling {}", current.len());
match parse_message(current, self.protocol_version, self.max_msg_len) {
Ok((rem, msg)) => {
let _pdu = Frame::new(
flow,
&stream_slice,
input,
input.len() as i64,
MQTTFrameType::Pdu as u8,
);

SCLogDebug!("response msg {:?}", msg);
if let MQTTOperation::TRUNCATED(ref trunc) = msg.op {
SCLogDebug!(
Expand All @@ -537,18 +568,22 @@ impl MQTTState {
current.len()
);
if trunc.skipped_length >= current.len() {
self.mqtt_frames_trunc(flow, &stream_slice, trunc, current, &msg);
self.skip_response = trunc.skipped_length - current.len();
self.handle_msg(msg, true);
SCLogDebug!("skip_response now {}", self.skip_response);
return AppLayerResult::ok();
} else {
self.mqtt_frames_trunc(flow, &stream_slice, trunc, current, &msg);
consumed += trunc.skipped_length;
current = &current[trunc.skipped_length..];
self.handle_msg(msg, true);
self.skip_response = 0;
continue;
}
}

self.mqtt_frames(flow, &stream_slice, &msg);
self.handle_msg(msg, true);
consumed += current.len() - rem.len();
current = rem;
Expand Down Expand Up @@ -589,6 +624,64 @@ impl MQTTState {
tx.tx_data.set_event(event as u8);
self.transactions.push_back(tx);
}

fn mqtt_frames(&mut self, flow: *const Flow, stream_slice: &StreamSlice, input: &MQTTMessage) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Part of me isn't happy with this function name, because we register the mqtt pdu frame outside of it, and so the name is a bit misleading to me. Do you have some suggestions on how could we make more clear what's going on here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will try to come up with something for this

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about mqtt_hdr_data_frames ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me, this gives the idea of header data frames, instead of header and data frames 🤔 not sure if this is premature optimization or not.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about mqtt_hdr_and_data_frames ?

let hdr = stream_slice.as_slice();
//MQTT payload has a fixed header of 2 bytes
let _mqtt_hdr = Frame::new(flow, stream_slice, hdr, 2, MQTTFrameType::Header as u8);
SCLogDebug!("mqtt_hdr Frame {:?}", _mqtt_hdr);
let rem_length = input.header.remaining_length as usize;
let data = &hdr[2..rem_length + 2];
let _mqtt_data = Frame::new(
flow,
stream_slice,
data,
rem_length as i64,
MQTTFrameType::Data as u8,
);
SCLogDebug!("mqtt_data Frame {:?}", _mqtt_data);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: new line

fn mqtt_frames_trunc(
&mut self, flow: *const Flow, stream_slice: &StreamSlice, input: &MQTTTruncatedData,
current: &[u8], msg: &MQTTMessage,
) {
let hdr = stream_slice.as_slice();
let hdr_length = input.skipped_length - msg.header.remaining_length as usize;
let _mqtt_hdr = Frame::new(
flow,
stream_slice,
hdr,
hdr_length as i64,
MQTTFrameType::Header as u8,
);
Comment on lines +649 to +656
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious to see what we will see in these frames.

Analizing the code some more, it seems to me that the Truncated message will have all header info, but no payload parsed by Suricata.
So, I'm thinking that maybe this is a special Frame type where folks only have access to info in the header, or, if we really wanted to give some insight into the (partial) truncated data, we could maybe use the remainder stream of the parsed message to register the data frame, and in that case the length would be mqtt max msg length.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now, one of the questions is what is interesting/important for us to expose to mqtt frames...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so basically as far as I have understood when the length of parsed input message > max_mg_len then there are two conditions

  1. Where it is greater than the current buffer meaning data exceeds the boundary of parsed as well as the current buffer. here I am trying to parse header and whatever remaining length is in current buffer I call it data
  2. Where it exceeds the parsed message buffer but still lies within the boundary of the current overall buffer. here I use the skipped length to first extract the header and then remain portion of skipped length is data

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now, one of the questions is what is interesting/important for us to expose to mqtt frames...

All of this brings me back to your last comment ^

Copy link
Copy Markdown
Contributor

@jufajardini jufajardini Feb 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have left a review on the tests PR that adds some input to this convo.

SCLogDebug!("mqtt_hdr Frame {:?}", _mqtt_hdr);

if input.skipped_length >= current.len() {
//taking current.len() as reference as trunc.skipped_length >= current.len()

let rem_length = current.len() - hdr_length;
let data = &hdr[hdr_length..current.len()];
let _mqtt_data = Frame::new(
flow,
stream_slice,
data,
rem_length as i64,
MQTTFrameType::Data as u8,
);
Comment on lines +663 to +670
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following my previous comment, I think that if we really want to offer any insight into this truncated data, we would have to pass rem (https://github.com/OISF/suricata/blob/master/rust/src/mqtt/mqtt.rs#L457) to mqtt_frames_trunc (cf https://github.com/OISF/suricata/blob/master/rust/src/mqtt/parser.rs#L663)... 🤔

Copy link
Copy Markdown
Contributor Author

@hsadia538 hsadia538 Feb 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might work too. but I am passing the current buffer which also represents the full input buffer. So this is another way of exactly using the remainder stream as current = input = msg +rem. I have based my length calculations on this.
Do you still want me to look into this?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From running the tests locally, your approach seems to be working. So I think we can move on ;)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on your comment on the test frames PR , I will make a new PR for frames to add trunc_data frames

SCLogDebug!("mqtt_data Frame {:?}", _mqtt_data);
} else {
let rem_length = input.skipped_length - hdr_length;
let data = &hdr[hdr_length..rem_length + hdr_length];
let _mqtt_data = Frame::new(
flow,
stream_slice,
data,
rem_length as i64,
MQTTFrameType::Data as u8,
);
SCLogDebug!("mqtt_data Frame {:?}", _mqtt_data);
}
}
}

// C exports.
Expand Down Expand Up @@ -637,20 +730,20 @@ pub unsafe extern "C" fn rs_mqtt_state_tx_free(state: *mut std::os::raw::c_void,

#[no_mangle]
pub unsafe extern "C" fn rs_mqtt_parse_request(
_flow: *const Flow, state: *mut std::os::raw::c_void, _pstate: *mut std::os::raw::c_void,
flow: *const Flow, state: *mut std::os::raw::c_void, _pstate: *mut std::os::raw::c_void,
stream_slice: StreamSlice, _data: *const std::os::raw::c_void,
) -> AppLayerResult {
let state = cast_pointer!(state, MQTTState);
return state.parse_request(stream_slice.as_slice());
return state.parse_request(flow, stream_slice);
}

#[no_mangle]
pub unsafe extern "C" fn rs_mqtt_parse_response(
_flow: *const Flow, state: *mut std::os::raw::c_void, _pstate: *mut std::os::raw::c_void,
flow: *const Flow, state: *mut std::os::raw::c_void, _pstate: *mut std::os::raw::c_void,
stream_slice: StreamSlice, _data: *const std::os::raw::c_void,
) -> AppLayerResult {
let state = cast_pointer!(state, MQTTState);
return state.parse_response(stream_slice.as_slice());
return state.parse_response(flow, stream_slice);
}

#[no_mangle]
Expand Down Expand Up @@ -761,8 +854,8 @@ pub unsafe extern "C" fn rs_mqtt_register_parser(cfg_max_msg_len: u32) {
apply_tx_config: None,
flags: APP_LAYER_PARSER_OPT_UNIDIR_TXS,
truncate: None,
get_frame_id_by_name: None,
get_frame_name_by_id: None,
get_frame_id_by_name: Some(MQTTFrameType::ffi_id_from_name),
get_frame_name_by_id: Some(MQTTFrameType::ffi_name_from_id),
};

let ip_proto_str = CString::new("tcp").unwrap();
Expand Down
2 changes: 1 addition & 1 deletion rust/src/mqtt/mqtt_property.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use nom7::IResult;

// TODO: It might be useful to also add detection on property presence and
// content, e.g. mqtt.property: AUTHENTICATION_METHOD.
#[derive(Debug, PartialEq, PartialOrd)]
#[derive(Debug, PartialEq, Eq, PartialOrd)]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this needed by anything?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was added due to cargo clippy when I was having the issue. I will try to remove it and see if it works this time.

#[allow(non_camel_case_types)]
pub enum MQTTProperty {
UNKNOWN,
Expand Down