diff --git a/cmake/Dependencies.cmake b/cmake/Dependencies.cmake index bce5c33..6ef202c 100644 --- a/cmake/Dependencies.cmake +++ b/cmake/Dependencies.cmake @@ -14,7 +14,7 @@ FetchContent_MakeAvailable(googletest) FetchContent_Declare( liblsl GIT_REPOSITORY https://github.com/sccn/liblsl.git - GIT_TAG v1.16.2 + GIT_TAG v1.17.7 ) set(LSL_BUILD_EXAMPLES OFF CACHE BOOL "" FORCE) FetchContent_MakeAvailable(liblsl) diff --git a/include/EEGData.hpp b/include/EEGData.hpp new file mode 100644 index 0000000..12524cb --- /dev/null +++ b/include/EEGData.hpp @@ -0,0 +1,20 @@ +#ifndef EEGDATA_HPP +#define EEGDATA_HPP + +#include +#include + +struct EEGData { + std::vector channels; + double timestamp = 0.0; + + EEGData() = default; + + EEGData(const std::vector& channelValues, double sampleTimestamp) + : channels(channelValues), timestamp(sampleTimestamp) {} + + EEGData(std::vector&& channelValues, double sampleTimestamp) + : channels(std::move(channelValues)), timestamp(sampleTimestamp) {} +}; + +#endif // EEGDATA_HPP \ No newline at end of file diff --git a/include/datawriter/CSVFormatStrategy.hpp b/include/datawriter/CSVFormatStrategy.hpp new file mode 100644 index 0000000..b36cb9c --- /dev/null +++ b/include/datawriter/CSVFormatStrategy.hpp @@ -0,0 +1,16 @@ +#ifndef CSVFORMATSTRATEGY_HPP +#define CSVFORMATSTRATEGY_HPP + +#include + +class CSVFormatStrategy : public IDataFormatStrategy { + public: + CSVFormatStrategy() = default; + ~CSVFormatStrategy() override = default; + + void writeHeader(std::ofstream& outputFile) const override; + void writeEEGData(std::ofstream& outputFile, const EEGData& data) const override; + void writeMarker(std::ofstream& outputFile, const Marker& marker) const override; +}; + +#endif // CSVFORMATSTRATEGY_HPP \ No newline at end of file diff --git a/include/datawriter/DataWriter.hpp b/include/datawriter/DataWriter.hpp new file mode 100644 index 0000000..ed86dd1 --- /dev/null +++ b/include/datawriter/DataWriter.hpp @@ -0,0 +1,41 @@ +#ifndef DATAWRITER_HPP +#define DATAWRITER_HPP + +#include + +#include + +class EEGData; +class Marker; + +#include +#include +#include +#include + +class DataWriter { + public: + DataWriter(); + ~DataWriter(); + + DataWriter(const DataWriter&) = delete; + DataWriter& operator=(const DataWriter&) = delete; + DataWriter(DataWriter&&) = delete; + DataWriter& operator=(DataWriter&&) = delete; + + void start(const std::string& filePath, + std::shared_ptr> eegQueue, + std::shared_ptr> markerQueue); + void stop(); + + private: + void writeLoop(const std::stop_token& stopToken); + + std::ofstream outputFile; + std::unique_ptr formatStrategy; + std::shared_ptr> eegQueue; + std::shared_ptr> markerQueue; + std::jthread writerThread; +}; + +#endif // DATAWRITER_HPP \ No newline at end of file diff --git a/include/datawriter/IDataFormatStrategy.hpp b/include/datawriter/IDataFormatStrategy.hpp new file mode 100644 index 0000000..54e865b --- /dev/null +++ b/include/datawriter/IDataFormatStrategy.hpp @@ -0,0 +1,24 @@ +#ifndef IDATAFORMATSTRATEGY_HPP +#define IDATAFORMATSTRATEGY_HPP + +#include + +class EEGData; +struct Marker; + +class IDataFormatStrategy { + public: + IDataFormatStrategy() = default; + virtual ~IDataFormatStrategy() = default; + + IDataFormatStrategy(const IDataFormatStrategy&) = delete; + IDataFormatStrategy& operator=(const IDataFormatStrategy&) = delete; + IDataFormatStrategy(IDataFormatStrategy&&) = delete; + IDataFormatStrategy& operator=(IDataFormatStrategy&&) = delete; + + virtual void writeHeader(std::ofstream& outputFile) const = 0; + virtual void writeEEGData(std::ofstream& outputFile, const EEGData& data) const = 0; + virtual void writeMarker(std::ofstream& outputFile, const Marker& marker) const = 0; +}; + +#endif // IDATAFORMATSTRATEGY_HPP \ No newline at end of file diff --git a/include/datawriter/Marker.hpp b/include/datawriter/Marker.hpp new file mode 100644 index 0000000..acd4ade --- /dev/null +++ b/include/datawriter/Marker.hpp @@ -0,0 +1,20 @@ +#ifndef MARKER_HPP +#define MARKER_HPP + +#include +#include + +struct Marker { + std::string eventName; + double timestamp = 0.0; + + Marker() = default; + + Marker(const std::string& name, double markerTimestamp) + : eventName(name), timestamp(markerTimestamp) {} + + Marker(std::string&& name, double markerTimestamp) + : eventName(std::move(name)), timestamp(markerTimestamp) {} +}; + +#endif // MARKER_HPP \ No newline at end of file diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 62cd34d..61bd2b7 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -2,6 +2,8 @@ protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS ${CMAKE_CURRENT_SOURCE_DIR}/../proto add_library(runtime_core OBJECT Runtime.cpp + datawriter/CSVFormatStrategy.cpp + datawriter/DataWriter.cpp parser/Parser.cpp scene/components/ComponentRegistry.cpp scene/components/BlinkComponent.cpp diff --git a/src/datawriter/CSVFormatStrategy.cpp b/src/datawriter/CSVFormatStrategy.cpp new file mode 100644 index 0000000..25d722f --- /dev/null +++ b/src/datawriter/CSVFormatStrategy.cpp @@ -0,0 +1,22 @@ +#include +#include +#include + +void CSVFormatStrategy::writeHeader(std::ofstream& outputFile) const { + outputFile << "type,timestamp,payload\n"; +} + +void CSVFormatStrategy::writeEEGData(std::ofstream& outputFile, const EEGData& data) const { + outputFile << "eeg," << data.timestamp << ",\""; + for (std::size_t index = 0; index < data.channels.size(); ++index) { + if (index > 0) { + outputFile << ','; + } + outputFile << data.channels[index]; + } + outputFile << '"' << '\n'; +} + +void CSVFormatStrategy::writeMarker(std::ofstream& outputFile, const Marker& marker) const { + outputFile << "marker," << marker.timestamp << ",\"" << marker.eventName << '"' << '\n'; +} \ No newline at end of file diff --git a/src/datawriter/DataWriter.cpp b/src/datawriter/DataWriter.cpp new file mode 100644 index 0000000..821719e --- /dev/null +++ b/src/datawriter/DataWriter.cpp @@ -0,0 +1,89 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +constexpr auto kWriteLoopSleep = std::chrono::milliseconds(10); + +template +bool drainQueue(const std::shared_ptr& queue, WriteFn&& writeFn) { + bool wroteData = false; + + if (queue) { + ItemT item; + while (queue->try_dequeue(item)) { + writeFn(item); + wroteData = true; + } + } + + return wroteData; +} + +DataWriter::DataWriter() : formatStrategy(std::make_unique()) {} + +DataWriter::~DataWriter() { stop(); } + +void DataWriter::start(const std::string& filePath, + std::shared_ptr> eegQueue, + std::shared_ptr> markerQueue) { + stop(); + + if (!formatStrategy) { + throw std::runtime_error("No data format strategy set for DataWriter."); + } + + outputFile.open(filePath, std::ios::out | std::ios::trunc); + if (!outputFile.is_open()) { + throw std::runtime_error("Failed to open data writer output file: " + filePath); + } + + this->eegQueue = std::move(eegQueue); + this->markerQueue = std::move(markerQueue); + + formatStrategy->writeHeader(outputFile); + writerThread = std::jthread([this](const std::stop_token& stopToken) { writeLoop(stopToken); }); +} + +void DataWriter::stop() { + if (writerThread.joinable()) { + writerThread.request_stop(); + } + + if (writerThread.joinable()) { + writerThread.join(); + } + + if (outputFile.is_open()) { + outputFile.flush(); + outputFile.close(); + } +} + +void DataWriter::writeLoop(const std::stop_token& stopToken) { + while (!stopToken.stop_requested()) { + bool wroteData = drainQueue, EEGData>( + eegQueue, + [this](const EEGData& eegData) { formatStrategy->writeEEGData(outputFile, eegData); }); + + wroteData |= drainQueue, Marker>( + markerQueue, + [this](const Marker& marker) { formatStrategy->writeMarker(outputFile, marker); }); + + if (!wroteData) { + std::this_thread::sleep_for(kWriteLoopSleep); + } + } + + drainQueue, EEGData>( + eegQueue, + [this](const EEGData& eegData) { formatStrategy->writeEEGData(outputFile, eegData); }); + drainQueue, Marker>( + markerQueue, + [this](const Marker& marker) { formatStrategy->writeMarker(outputFile, marker); }); +} \ No newline at end of file diff --git a/tests/unit_tests/data_writer_test.cpp b/tests/unit_tests/data_writer_test.cpp new file mode 100644 index 0000000..8c070b1 --- /dev/null +++ b/tests/unit_tests/data_writer_test.cpp @@ -0,0 +1,85 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace { +namespace fs = std::filesystem; + +constexpr float kChannelOne = 1.25F; +constexpr float kChannelTwo = 2.5F; +constexpr float kChannelThree = 3.75F; +constexpr double kEegTimestamp = 12.5; +constexpr double kMarkerTimestamp = 13.25; +constexpr auto kWriteWait = std::chrono::milliseconds(50); + +fs::path makeTempFilePath(const std::string& nameStem) { + const auto uniqueSuffix = + std::to_string(std::chrono::steady_clock::now().time_since_epoch().count()); + return fs::temp_directory_path() / (nameStem + "_" + uniqueSuffix + ".csv"); +} + +std::vector readAllLines(const fs::path& filePath) { + std::ifstream input(filePath); + std::vector lines; + std::string line; + + while (std::getline(input, line)) { + lines.push_back(line); + } + + return lines; +} +} // namespace + +TEST(DataWriterTest, WritesCsvHeaderOnStart) { + const auto filePath = makeTempFilePath("datawriter_header"); + + auto eegQueue = std::make_shared>(); + auto markerQueue = std::make_shared>(); + + { + DataWriter writer; + writer.start(filePath.string(), eegQueue, markerQueue); + writer.stop(); + } + + const auto lines = readAllLines(filePath); + ASSERT_FALSE(lines.empty()); + EXPECT_EQ(lines.front(), "type,timestamp,payload"); + + fs::remove(filePath); +} + +TEST(DataWriterTest, FlushesEegAndMarkerRecords) { + const auto filePath = makeTempFilePath("datawriter_records"); + + auto eegQueue = std::make_shared>(); + auto markerQueue = std::make_shared>(); + + eegQueue->enqueue(EEGData{{kChannelOne, kChannelTwo, kChannelThree}, kEegTimestamp}); + markerQueue->enqueue(Marker{"stimulus_on", kMarkerTimestamp}); + + { + DataWriter writer; + writer.start(filePath.string(), eegQueue, markerQueue); + std::this_thread::sleep_for(kWriteWait); + writer.stop(); + } + + const auto lines = readAllLines(filePath); + ASSERT_GE(lines.size(), 3U); + EXPECT_EQ(lines[0], "type,timestamp,payload"); + EXPECT_EQ(lines[1], "eeg,12.5,\"1.25,2.5,3.75\""); + EXPECT_EQ(lines[2], "marker,13.25,\"stimulus_on\""); + + fs::remove(filePath); +}