-
Notifications
You must be signed in to change notification settings - Fork 0
feat: Create datawriter class #22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
2be929c
291d970
c89cd8e
d9d35de
7ec3125
fa5daac
510683e
08cdd71
61dbc73
789bbaf
a3f7e9e
9ca2a86
2ebaff7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| #ifndef EEGDATA_HPP | ||
| #define EEGDATA_HPP | ||
|
|
||
| #include <utility> | ||
| #include <vector> | ||
|
|
||
| struct EEGData { | ||
| std::vector<float> channels; | ||
| double timestamp = 0.0; | ||
|
|
||
| EEGData() = default; | ||
|
|
||
| EEGData(const std::vector<float>& channelValues, double sampleTimestamp) | ||
| : channels(channelValues), timestamp(sampleTimestamp) {} | ||
|
|
||
| EEGData(std::vector<float>&& channelValues, double sampleTimestamp) | ||
| : channels(std::move(channelValues)), timestamp(sampleTimestamp) {} | ||
| }; | ||
|
|
||
| #endif // EEGDATA_HPP | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,16 @@ | ||
| #ifndef CSVFORMATSTRATEGY_HPP | ||
| #define CSVFORMATSTRATEGY_HPP | ||
|
|
||
| #include <datawriter/IDataFormatStrategy.hpp> | ||
|
|
||
| class CSVFormatStrategy : public IDataFormatStrategy { | ||
| public: | ||
| CSVFormatStrategy() = default; | ||
| ~CSVFormatStrategy() override = default; | ||
|
|
||
| void writeHeader(std::ofstream& outputFile) const override; | ||
| void writeEEGData(std::ofstream& outputFile, const EEGData& data) const override; | ||
| void writeMarker(std::ofstream& outputFile, const Marker& marker) const override; | ||
| }; | ||
|
|
||
| #endif // CSVFORMATSTRATEGY_HPP |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,41 @@ | ||
| #ifndef DATAWRITER_HPP | ||
| #define DATAWRITER_HPP | ||
|
|
||
| #include <concurrentqueue.h> | ||
|
|
||
| #include <datawriter/IDataFormatStrategy.hpp> | ||
|
|
||
| class EEGData; | ||
| class Marker; | ||
|
|
||
| #include <fstream> | ||
| #include <memory> | ||
| #include <string> | ||
| #include <thread> | ||
|
|
||
| class DataWriter { | ||
| public: | ||
| DataWriter(); | ||
| ~DataWriter(); | ||
|
|
||
| DataWriter(const DataWriter&) = delete; | ||
| DataWriter& operator=(const DataWriter&) = delete; | ||
| DataWriter(DataWriter&&) = delete; | ||
| DataWriter& operator=(DataWriter&&) = delete; | ||
|
|
||
| void start(const std::string& filePath, | ||
| std::shared_ptr<moodycamel::ConcurrentQueue<EEGData>> eegQueue, | ||
| std::shared_ptr<moodycamel::ConcurrentQueue<Marker>> markerQueue); | ||
| void stop(); | ||
|
|
||
| private: | ||
| void writeLoop(const std::stop_token& stopToken); | ||
|
|
||
| std::ofstream outputFile; | ||
| std::unique_ptr<IDataFormatStrategy> formatStrategy; | ||
| std::shared_ptr<moodycamel::ConcurrentQueue<EEGData>> eegQueue; | ||
| std::shared_ptr<moodycamel::ConcurrentQueue<Marker>> markerQueue; | ||
| std::jthread writerThread; | ||
| }; | ||
|
|
||
| #endif // DATAWRITER_HPP |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| #ifndef IDATAFORMATSTRATEGY_HPP | ||
| #define IDATAFORMATSTRATEGY_HPP | ||
|
|
||
| #include <fstream> | ||
|
|
||
| class EEGData; | ||
| struct Marker; | ||
|
|
||
| class IDataFormatStrategy { | ||
| public: | ||
| IDataFormatStrategy() = default; | ||
| virtual ~IDataFormatStrategy() = default; | ||
|
|
||
| IDataFormatStrategy(const IDataFormatStrategy&) = delete; | ||
| IDataFormatStrategy& operator=(const IDataFormatStrategy&) = delete; | ||
| IDataFormatStrategy(IDataFormatStrategy&&) = delete; | ||
| IDataFormatStrategy& operator=(IDataFormatStrategy&&) = delete; | ||
|
|
||
| virtual void writeHeader(std::ofstream& outputFile) const = 0; | ||
| virtual void writeEEGData(std::ofstream& outputFile, const EEGData& data) const = 0; | ||
| virtual void writeMarker(std::ofstream& outputFile, const Marker& marker) const = 0; | ||
| }; | ||
|
Comment on lines
+19
to
+22
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The issue is that you force every strategy to use std::ofstream as a way to save data which is not good, because in other strategies like FifFormatStrategy we're (most likely) going to use Fiffstream to save our data. I think it would be better to add open(std::string filepath) function as I suggested in the UML diagram. |
||
|
|
||
| #endif // IDATAFORMATSTRATEGY_HPP | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| #ifndef MARKER_HPP | ||
| #define MARKER_HPP | ||
|
|
||
| #include <string> | ||
| #include <utility> | ||
|
|
||
| struct Marker { | ||
| std::string eventName; | ||
| double timestamp = 0.0; | ||
|
|
||
| Marker() = default; | ||
|
|
||
| Marker(const std::string& name, double markerTimestamp) | ||
| : eventName(name), timestamp(markerTimestamp) {} | ||
|
|
||
| Marker(std::string&& name, double markerTimestamp) | ||
| : eventName(std::move(name)), timestamp(markerTimestamp) {} | ||
| }; | ||
|
|
||
| #endif // MARKER_HPP |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| #include <EEGData.hpp> | ||
| #include <datawriter/CSVFormatStrategy.hpp> | ||
| #include <datawriter/Marker.hpp> | ||
|
|
||
| void CSVFormatStrategy::writeHeader(std::ofstream& outputFile) const { | ||
| outputFile << "type,timestamp,payload\n"; | ||
| } | ||
|
|
||
| void CSVFormatStrategy::writeEEGData(std::ofstream& outputFile, const EEGData& data) const { | ||
| outputFile << "eeg," << data.timestamp << ",\""; | ||
| for (std::size_t index = 0; index < data.channels.size(); ++index) { | ||
| if (index > 0) { | ||
| outputFile << ','; | ||
| } | ||
| outputFile << data.channels[index]; | ||
| } | ||
| outputFile << '"' << '\n'; | ||
| } | ||
|
|
||
| void CSVFormatStrategy::writeMarker(std::ofstream& outputFile, const Marker& marker) const { | ||
| outputFile << "marker," << marker.timestamp << ",\"" << marker.eventName << '"' << '\n'; | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,89 @@ | ||
| #include <EEGData.hpp> | ||
| #include <chrono> | ||
| #include <datawriter/CSVFormatStrategy.hpp> | ||
| #include <datawriter/DataWriter.hpp> | ||
| #include <datawriter/IDataFormatStrategy.hpp> | ||
| #include <datawriter/Marker.hpp> | ||
| #include <stdexcept> | ||
| #include <thread> | ||
| #include <utility> | ||
|
|
||
| constexpr auto kWriteLoopSleep = std::chrono::milliseconds(10); | ||
|
|
||
| template <typename QueueT, typename ItemT, typename WriteFn> | ||
| bool drainQueue(const std::shared_ptr<QueueT>& queue, WriteFn&& writeFn) { | ||
| bool wroteData = false; | ||
|
|
||
| if (queue) { | ||
| ItemT item; | ||
| while (queue->try_dequeue(item)) { | ||
| writeFn(item); | ||
| wroteData = true; | ||
| } | ||
| } | ||
|
|
||
| return wroteData; | ||
| } | ||
|
|
||
| DataWriter::DataWriter() : formatStrategy(std::make_unique<CSVFormatStrategy>()) {} | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. pass IDataFormatStrategy as argument for DataWriter constructor, delete default constructor. |
||
|
|
||
| DataWriter::~DataWriter() { stop(); } | ||
|
|
||
| void DataWriter::start(const std::string& filePath, | ||
| std::shared_ptr<moodycamel::ConcurrentQueue<EEGData>> eegQueue, | ||
| std::shared_ptr<moodycamel::ConcurrentQueue<Marker>> markerQueue) { | ||
| stop(); | ||
|
|
||
| if (!formatStrategy) { | ||
| throw std::runtime_error("No data format strategy set for DataWriter."); | ||
| } | ||
|
|
||
| outputFile.open(filePath, std::ios::out | std::ios::trunc); | ||
| if (!outputFile.is_open()) { | ||
| throw std::runtime_error("Failed to open data writer output file: " + filePath); | ||
| } | ||
|
|
||
| this->eegQueue = std::move(eegQueue); | ||
| this->markerQueue = std::move(markerQueue); | ||
|
|
||
| formatStrategy->writeHeader(outputFile); | ||
| writerThread = std::jthread([this](const std::stop_token& stopToken) { writeLoop(stopToken); }); | ||
| } | ||
|
|
||
| void DataWriter::stop() { | ||
| if (writerThread.joinable()) { | ||
| writerThread.request_stop(); | ||
| } | ||
|
|
||
| if (writerThread.joinable()) { | ||
| writerThread.join(); | ||
| } | ||
|
|
||
| if (outputFile.is_open()) { | ||
| outputFile.flush(); | ||
| outputFile.close(); | ||
| } | ||
| } | ||
|
|
||
| void DataWriter::writeLoop(const std::stop_token& stopToken) { | ||
| while (!stopToken.stop_requested()) { | ||
| bool wroteData = drainQueue<moodycamel::ConcurrentQueue<EEGData>, EEGData>( | ||
| eegQueue, | ||
| [this](const EEGData& eegData) { formatStrategy->writeEEGData(outputFile, eegData); }); | ||
|
|
||
| wroteData |= drainQueue<moodycamel::ConcurrentQueue<Marker>, Marker>( | ||
| markerQueue, | ||
| [this](const Marker& marker) { formatStrategy->writeMarker(outputFile, marker); }); | ||
|
|
||
| if (!wroteData) { | ||
| std::this_thread::sleep_for(kWriteLoopSleep); | ||
| } | ||
| } | ||
|
|
||
| drainQueue<moodycamel::ConcurrentQueue<EEGData>, EEGData>( | ||
| eegQueue, | ||
| [this](const EEGData& eegData) { formatStrategy->writeEEGData(outputFile, eegData); }); | ||
| drainQueue<moodycamel::ConcurrentQueue<Marker>, Marker>( | ||
| markerQueue, | ||
| [this](const Marker& marker) { formatStrategy->writeMarker(outputFile, marker); }); | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,85 @@ | ||
| #include <gtest/gtest.h> | ||
|
|
||
| #include <EEGData.hpp> | ||
| #include <chrono> | ||
| #include <datawriter/DataWriter.hpp> | ||
| #include <datawriter/Marker.hpp> | ||
| #include <filesystem> | ||
| #include <fstream> | ||
| #include <memory> | ||
| #include <string> | ||
| #include <thread> | ||
| #include <vector> | ||
|
|
||
| namespace { | ||
| namespace fs = std::filesystem; | ||
|
|
||
| constexpr float kChannelOne = 1.25F; | ||
| constexpr float kChannelTwo = 2.5F; | ||
| constexpr float kChannelThree = 3.75F; | ||
| constexpr double kEegTimestamp = 12.5; | ||
| constexpr double kMarkerTimestamp = 13.25; | ||
| constexpr auto kWriteWait = std::chrono::milliseconds(50); | ||
|
|
||
| fs::path makeTempFilePath(const std::string& nameStem) { | ||
| const auto uniqueSuffix = | ||
| std::to_string(std::chrono::steady_clock::now().time_since_epoch().count()); | ||
| return fs::temp_directory_path() / (nameStem + "_" + uniqueSuffix + ".csv"); | ||
| } | ||
|
|
||
| std::vector<std::string> readAllLines(const fs::path& filePath) { | ||
| std::ifstream input(filePath); | ||
| std::vector<std::string> lines; | ||
| std::string line; | ||
|
|
||
| while (std::getline(input, line)) { | ||
| lines.push_back(line); | ||
| } | ||
|
|
||
| return lines; | ||
| } | ||
| } // namespace | ||
|
|
||
| TEST(DataWriterTest, WritesCsvHeaderOnStart) { | ||
| const auto filePath = makeTempFilePath("datawriter_header"); | ||
|
|
||
| auto eegQueue = std::make_shared<moodycamel::ConcurrentQueue<EEGData>>(); | ||
| auto markerQueue = std::make_shared<moodycamel::ConcurrentQueue<Marker>>(); | ||
|
|
||
| { | ||
| DataWriter writer; | ||
| writer.start(filePath.string(), eegQueue, markerQueue); | ||
| writer.stop(); | ||
| } | ||
|
|
||
| const auto lines = readAllLines(filePath); | ||
| ASSERT_FALSE(lines.empty()); | ||
| EXPECT_EQ(lines.front(), "type,timestamp,payload"); | ||
|
|
||
| fs::remove(filePath); | ||
| } | ||
|
|
||
| TEST(DataWriterTest, FlushesEegAndMarkerRecords) { | ||
| const auto filePath = makeTempFilePath("datawriter_records"); | ||
|
|
||
| auto eegQueue = std::make_shared<moodycamel::ConcurrentQueue<EEGData>>(); | ||
| auto markerQueue = std::make_shared<moodycamel::ConcurrentQueue<Marker>>(); | ||
|
|
||
| eegQueue->enqueue(EEGData{{kChannelOne, kChannelTwo, kChannelThree}, kEegTimestamp}); | ||
| markerQueue->enqueue(Marker{"stimulus_on", kMarkerTimestamp}); | ||
|
|
||
| { | ||
| DataWriter writer; | ||
| writer.start(filePath.string(), eegQueue, markerQueue); | ||
| std::this_thread::sleep_for(kWriteWait); | ||
| writer.stop(); | ||
| } | ||
|
|
||
| const auto lines = readAllLines(filePath); | ||
| ASSERT_GE(lines.size(), 3U); | ||
| EXPECT_EQ(lines[0], "type,timestamp,payload"); | ||
| EXPECT_EQ(lines[1], "eeg,12.5,\"1.25,2.5,3.75\""); | ||
| EXPECT_EQ(lines[2], "marker,13.25,\"stimulus_on\""); | ||
|
|
||
| fs::remove(filePath); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would consider moving EEGData and Marker classes to seperate directory like include/data_structures or something like that