From 4da02213bff4d9d47309df10a0d98f6a68fe59aa Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Mon, 29 Jun 2026 14:19:48 -0700 Subject: [PATCH 1/3] Expose data track pipeline options --- include/livekit/remote_data_track.h | 25 +++++++++++++++++++++++++ src/remote_data_track.cpp | 16 ++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/include/livekit/remote_data_track.h b/include/livekit/remote_data_track.h index e74fd936..faa650d1 100644 --- a/include/livekit/remote_data_track.h +++ b/include/livekit/remote_data_track.h @@ -16,7 +16,9 @@ #pragma once +#include #include +#include #include #include "livekit/data_track_error.h" @@ -65,6 +67,29 @@ class RemoteDataTrack { /// Whether the track is still published by the remote participant. LIVEKIT_API bool isPublished() const; + /// Track-level options that configure how the incoming-frame pipeline + /// reassembles packets for this remote data track. + /// + /// Applied via setPipelineOptions(). + struct PipelineOptions { + /// Maximum number of partial frames the depacketizer will track + /// concurrently for this track. + /// + /// Defaults to 1. Higher values give more out-of-order tolerance for + /// high-frequency senders at the cost of additional buffering. Zero is + /// not a valid value; if a value of zero is provided, it will be + /// clamped to one. Leave unset to keep the current value. + std::optional max_partial_frames{std::nullopt}; + }; + + /// Configures options for the pipeline handling incoming packets for this + /// track. + /// + /// These options apply to all current and future subscriptions of this + /// track, and may be set at any time. New options take effect with the + /// next received packet. + LIVEKIT_API void setPipelineOptions(const PipelineOptions& options); + #ifdef LIVEKIT_TEST_ACCESS /// Test-only accessor for exercising lower-level FFI subscription paths. uintptr_t testFfiHandleId() const noexcept { return ffiHandleId(); } diff --git a/src/remote_data_track.cpp b/src/remote_data_track.cpp index 7c48c560..dcaadb76 100644 --- a/src/remote_data_track.cpp +++ b/src/remote_data_track.cpp @@ -46,6 +46,22 @@ bool RemoteDataTrack::isPublished() const { return resp.remote_data_track_is_published().is_published(); } +void RemoteDataTrack::setPipelineOptions(const PipelineOptions& options) { + if (!handle_.valid()) { + return; + } + + proto::FfiRequest req; + auto* msg = req.mutable_remote_data_track_set_pipeline_options(); + msg->set_track_handle(static_cast(handle_.get())); + auto* opts = msg->mutable_options(); + if (options.max_partial_frames) { + opts->set_max_partial_frames(*options.max_partial_frames); + } + + FfiClient::instance().sendRequest(req); +} + Result, SubscribeDataTrackError> RemoteDataTrack::subscribe( const DataTrackStream::Options& options) { if (!handle_.valid()) { From 4435dcfa222524b3162591254103d7688c253944 Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Tue, 30 Jun 2026 12:55:49 -0700 Subject: [PATCH 2/3] Don't use nested struct --- include/livekit/remote_data_track.h | 32 ++++++++++++++--------------- src/remote_data_track.cpp | 2 +- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/include/livekit/remote_data_track.h b/include/livekit/remote_data_track.h index faa650d1..85483084 100644 --- a/include/livekit/remote_data_track.h +++ b/include/livekit/remote_data_track.h @@ -34,6 +34,21 @@ namespace proto { class OwnedRemoteDataTrack; } +/// Track-level options that configure how the incoming-frame pipeline +/// reassembles packets for a remote data track. +/// +/// Applied via RemoteDataTrack::setPipelineOptions(). +struct DataTrackPipelineOptions { + /// Maximum number of partial frames the depacketizer will track + /// concurrently for this track. + /// + /// Defaults to 1. Higher values give more out-of-order tolerance for + /// high-frequency senders at the cost of additional buffering. Zero is + /// not a valid value; if a value of zero is provided, it will be + /// clamped to one. Leave unset to keep the current value. + std::optional max_partial_frames{std::nullopt}; +}; + /// Represents a data track published by a remote participant. /// /// Discovered via the DataTrackPublishedEvent room event. Unlike @@ -67,28 +82,13 @@ class RemoteDataTrack { /// Whether the track is still published by the remote participant. LIVEKIT_API bool isPublished() const; - /// Track-level options that configure how the incoming-frame pipeline - /// reassembles packets for this remote data track. - /// - /// Applied via setPipelineOptions(). - struct PipelineOptions { - /// Maximum number of partial frames the depacketizer will track - /// concurrently for this track. - /// - /// Defaults to 1. Higher values give more out-of-order tolerance for - /// high-frequency senders at the cost of additional buffering. Zero is - /// not a valid value; if a value of zero is provided, it will be - /// clamped to one. Leave unset to keep the current value. - std::optional max_partial_frames{std::nullopt}; - }; - /// Configures options for the pipeline handling incoming packets for this /// track. /// /// These options apply to all current and future subscriptions of this /// track, and may be set at any time. New options take effect with the /// next received packet. - LIVEKIT_API void setPipelineOptions(const PipelineOptions& options); + LIVEKIT_API void setPipelineOptions(const DataTrackPipelineOptions& options); #ifdef LIVEKIT_TEST_ACCESS /// Test-only accessor for exercising lower-level FFI subscription paths. diff --git a/src/remote_data_track.cpp b/src/remote_data_track.cpp index dcaadb76..2d225396 100644 --- a/src/remote_data_track.cpp +++ b/src/remote_data_track.cpp @@ -46,7 +46,7 @@ bool RemoteDataTrack::isPublished() const { return resp.remote_data_track_is_published().is_published(); } -void RemoteDataTrack::setPipelineOptions(const PipelineOptions& options) { +void RemoteDataTrack::setPipelineOptions(const DataTrackPipelineOptions& options) { if (!handle_.valid()) { return; } From 829189efd640fac81aa70cce8930c6ad18def88f Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Tue, 30 Jun 2026 16:26:25 -0700 Subject: [PATCH 3/3] Add E2E test --- src/tests/integration/test_data_track.cpp | 70 +++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/src/tests/integration/test_data_track.cpp b/src/tests/integration/test_data_track.cpp index b575c410..4a881028 100644 --- a/src/tests/integration/test_data_track.cpp +++ b/src/tests/integration/test_data_track.cpp @@ -367,6 +367,76 @@ TEST_P(DataTrackTransportTest, PublishesAndReceivesFramesEndToEnd) { EXPECT_TRUE(remote_track_published_after_read); } +TEST_F(DataTrackE2ETest, ReceivesFramesWithCustomPipelineOptionsEndToEnd) { + constexpr std::size_t payload_len = 1024; + const auto track_name = makeTrackName("pipeline_options"); + + std::vector room_configs(2); + room_configs[0].room_options.single_peer_connection = false; + room_configs[1].room_options.single_peer_connection = false; + + DataTrackPublishedDelegate subscriber_delegate; + room_configs[1].delegate = &subscriber_delegate; + + auto rooms = testRooms(room_configs); + auto& publisher_room = rooms[0]; + const auto publisher_identity = lockLocalParticipant(*publisher_room)->identity(); + + auto local_track = requirePublishedTrack(publisher_room->localParticipant(), track_name); + ASSERT_TRUE(local_track->isPublished()); + EXPECT_FALSE(local_track->info().uses_e2ee); + EXPECT_EQ(local_track->info().name, track_name); + + auto remote_track = subscriber_delegate.waitForTrack(kTrackWaitTimeout); + ASSERT_NE(remote_track, nullptr) << "Timed out waiting for remote data track"; + EXPECT_TRUE(remote_track->isPublished()); + EXPECT_FALSE(remote_track->info().uses_e2ee); + EXPECT_EQ(remote_track->info().name, track_name); + EXPECT_EQ(remote_track->publisherIdentity(), publisher_identity); + + remote_track->setPipelineOptions({.max_partial_frames = 4}); + + auto subscribe_result = remote_track->subscribe(); + if (!subscribe_result) { + FAIL() << describeDataTrackError(subscribe_result.error()); + } + auto subscription = subscribe_result.value(); + + std::atomic keep_publishing{true}; + auto publisher = std::async(std::launch::async, [&]() { + DataTrackFrame frame; + frame.payload.assign(payload_len, kTransportPayloadValue); + while (keep_publishing.load()) { + requirePushSuccess(local_track->tryPush(frame), "Failed to push data frame"); + std::this_thread::sleep_for(50ms); + } + }); + + DataTrackFrame frame; + std::exception_ptr read_error; + try { + frame = readFrameWithTimeout(subscription, kTransportFrameTimeout); + } catch (...) { + read_error = std::current_exception(); + } + + const bool remote_track_published_after_read = remote_track->isPublished(); + keep_publishing.store(false); + subscription->close(); + local_track->unpublishDataTrack(); + + publisher.get(); + if (read_error) { + std::rethrow_exception(read_error); + } + + ASSERT_EQ(frame.payload.size(), payload_len); + EXPECT_TRUE(std::all_of(frame.payload.begin(), frame.payload.end(), + [](std::uint8_t byte) { return byte == kTransportPayloadValue; })); + EXPECT_FALSE(frame.user_timestamp.has_value()); + EXPECT_TRUE(remote_track_published_after_read); +} + TEST_F(DataTrackE2ETest, UnpublishUpdatesPublishedStateEndToEnd) { const auto track_name = makeTrackName("published_state");