diff --git a/.gitignore b/.gitignore index 8e0a539be..cec1eed68 100644 --- a/.gitignore +++ b/.gitignore @@ -82,3 +82,5 @@ dependency-reduced-pom.xml # Vibe coding CLAUDE.md +.workbuddy +CMakeUserPresets.json diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt index 75d28f976..db7d15b84 100644 --- a/cpp/benchmarks/CMakeLists.txt +++ b/cpp/benchmarks/CMakeLists.txt @@ -71,3 +71,4 @@ endmacro() add_benchmark(arrow_chunk_reader_benchmark SRCS arrow_chunk_reader_benchmark.cc) add_benchmark(label_filter_benchmark SRCS label_filter_benchmark.cc) add_benchmark(graph_info_benchmark SRCS graph_info_benchmark.cc) +add_benchmark(lru_cache_benchmark SRCS lru_cache_benchmark.cc) diff --git a/cpp/benchmarks/benchmark_util.h b/cpp/benchmarks/benchmark_util.h index 80dfb0087..3d7d5af52 100644 --- a/cpp/benchmarks/benchmark_util.h +++ b/cpp/benchmarks/benchmark_util.h @@ -46,11 +46,20 @@ class BenchmarkFixture : public ::benchmark::Fixture { } path_ = std::string(c_root) + "/ldbc_sample/parquet/ldbc_sample.graph.yml"; auto maybe_graph_info = GraphInfo::Load(path_); + if (!maybe_graph_info.has_value()) { + throw std::runtime_error( + "Failed to load graph info: " + + maybe_graph_info.error().message()); + } graph_info_ = maybe_graph_info.value(); second_path_ = std::string(c_root) + "/ldbc/parquet/ldbc.graph.yml"; auto second_maybe_graph_info = GraphInfo::Load(second_path_); - second_graph_info_ = second_maybe_graph_info.value(); + if (second_maybe_graph_info.has_value()) { + second_graph_info_ = second_maybe_graph_info.value(); + } + // If second_path_ is not available, second_graph_info_ remains nullptr. + // Benchmarks that need it will skip gracefully. } void TearDown(const ::benchmark::State& state) override {} diff --git a/cpp/benchmarks/lru_cache_benchmark.cc b/cpp/benchmarks/lru_cache_benchmark.cc new file mode 100644 index 000000000..bec74140d --- /dev/null +++ b/cpp/benchmarks/lru_cache_benchmark.cc @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include + +#include "benchmark/benchmark.h" + +#include "./benchmark_util.h" +#include "graphar/api/arrow_reader.h" +#include "graphar/fwd.h" +#include "graphar/lru_cache.h" + +namespace graphar { + +// ============================================================================ +// Raw LRU Cache benchmarks +// ============================================================================ + +// Benchmark: LRU Cache Put operation with varying cache sizes +static void BM_LRUCachePut(benchmark::State& state) { + const int64_t capacity = state.range(0); + const int64_t num_ops = state.range(1); + LRUCache cache(capacity); + + int64_t i = 0; + for (auto _ : state) { + cache.Put(i % num_ops, "value_" + std::to_string(i % num_ops)); + ++i; + } + state.SetItemsProcessed(state.iterations()); +} +BENCHMARK(BM_LRUCachePut) + ->Args({4, 1000}) + ->Args({4, 10000}) + ->Args({16, 1000}) + ->Args({16, 10000}) + ->Args({64, 1000}) + ->Args({64, 10000}); + +// Benchmark: LRU Cache Get with 100% hit rate +static void BM_LRUCacheGetHit(benchmark::State& state) { + const int64_t capacity = state.range(0); + LRUCache cache(capacity); + for (int64_t i = 0; i < capacity; ++i) { + cache.Put(i, "value_" + std::to_string(i)); + } + + int64_t i = 0; + for (auto _ : state) { + auto* v = cache.Get(i % capacity); + benchmark::DoNotOptimize(v); + ++i; + } + state.SetItemsProcessed(state.iterations()); +} +BENCHMARK(BM_LRUCacheGetHit)->Arg(4)->Arg(16)->Arg(64)->Arg(256); + +// Benchmark: LRU Cache Get with 100% miss rate +static void BM_LRUCacheGetMiss(benchmark::State& state) { + const int64_t capacity = state.range(0); + LRUCache cache(capacity); + for (int64_t i = 0; i < capacity; ++i) { + cache.Put(i, "value_" + std::to_string(i)); + } + + int64_t i = capacity; + for (auto _ : state) { + auto* v = cache.Get(i); + benchmark::DoNotOptimize(v); + ++i; + } + state.SetItemsProcessed(state.iterations()); +} +BENCHMARK(BM_LRUCacheGetMiss)->Arg(4)->Arg(16)->Arg(64)->Arg(256); + +// Benchmark: LRU Cache Mixed operations (80% reads, 20% writes) +static void BM_LRUCacheMixed(benchmark::State& state) { + const int64_t capacity = state.range(0); + const int64_t key_space = state.range(1); + LRUCache cache(capacity); + + // Pre-fill to 50% capacity + for (int64_t i = 0; i < capacity / 2; ++i) { + cache.Put(i, "value_" + std::to_string(i)); + } + + int64_t i = capacity / 2; + for (auto _ : state) { + int64_t key = i % key_space; + if (key % 5 == 0) { + // 20% writes + cache.Put(key, "updated_" + std::to_string(key)); + } else { + // 80% reads + auto* v = cache.Get(key); + benchmark::DoNotOptimize(v); + } + ++i; + } + state.SetItemsProcessed(state.iterations()); +} +BENCHMARK(BM_LRUCacheMixed) + ->Args({4, 100}) + ->Args({16, 100}) + ->Args({64, 1000}) + ->Args({256, 1000}); + +// ============================================================================ +// Chunk Reader LRU Cache benchmarks +// These benchmarks measure the performance impact of the LRU cache within +// the chunk readers. The cache is always compiled in; whether it is +// effective is controlled by the cache capacity (set via FilterOptions or +// the reader constructor). When capacity is 0 or 1, caching is +// effectively disabled (every access reads from the filesystem). +// ============================================================================ + +// Benchmark: VertexPropertyArrowChunkReader - repeated access to same chunk +BENCHMARK_DEFINE_F(BenchmarkFixture, VertexPropertyChunkReaderCacheHit) +(::benchmark::State& state) { // NOLINT + auto gp = graph_info_->GetVertexInfo("person")->GetPropertyGroup("firstName"); + auto maybe_reader = + VertexPropertyArrowChunkReader::Make(graph_info_, "person", gp); + SKIP_WITH_ERROR_STATUS(state, maybe_reader.status()); + auto reader = maybe_reader.value(); + + for (auto _ : state) { + // Seek back to chunk 0 repeatedly - should hit cache on subsequent calls + auto st = reader->seek(0); + SKIP_WITH_ERROR_STATUS(state, st); + auto chunk_result = reader->GetChunk(); + SKIP_WITH_ERROR_STATUS(state, chunk_result.status()); + } + state.SetItemsProcessed(state.iterations()); +} + +// Benchmark: VertexPropertyArrowChunkReader - sequential scan (interleaved +// access) +BENCHMARK_DEFINE_F(BenchmarkFixture, + VertexPropertyChunkReaderSequentialScanWithRepeat) +(::benchmark::State& state) { // NOLINT + auto gp = graph_info_->GetVertexInfo("person")->GetPropertyGroup("firstName"); + auto maybe_reader = + VertexPropertyArrowChunkReader::Make(graph_info_, "person", gp); + SKIP_WITH_ERROR_STATUS(state, maybe_reader.status()); + auto reader = maybe_reader.value(); + + // Read first 4 chunks, then repeat - exercises LRU with capacity=4 + for (auto _ : state) { + for (int64_t chunk = 0; chunk < 4; ++chunk) { + auto st = reader->seek(static_cast(chunk)); + SKIP_WITH_ERROR_STATUS(state, st); + auto chunk_result = reader->GetChunk(); + SKIP_WITH_ERROR_STATUS(state, chunk_result.status()); + } + // Second pass through same chunks - should be all cache hits + for (int64_t chunk = 0; chunk < 4; ++chunk) { + auto st = reader->seek(static_cast(chunk)); + SKIP_WITH_ERROR_STATUS(state, st); + auto chunk_result = reader->GetChunk(); + SKIP_WITH_ERROR_STATUS(state, chunk_result.status()); + } + } + state.SetItemsProcessed(state.iterations() * 8); +} + +// Benchmark: AdjListArrowChunkReader - repeated seek to same vertex +BENCHMARK_DEFINE_F(BenchmarkFixture, AdjListChunkReaderCacheHit) +(::benchmark::State& state) { // NOLINT + auto maybe_reader = AdjListArrowChunkReader::Make( + graph_info_, "person", "knows", "person", AdjListType::ordered_by_source); + SKIP_WITH_ERROR_STATUS(state, maybe_reader.status()); + auto reader = maybe_reader.value(); + + for (auto _ : state) { + // Seek to the same source vertex repeatedly - should hit cache + auto st = reader->seek_src(0); + SKIP_WITH_ERROR_STATUS(state, st); + auto chunk_result = reader->GetChunk(); + SKIP_WITH_ERROR_STATUS(state, chunk_result.status()); + } + state.SetItemsProcessed(state.iterations()); +} + +// Benchmark: AdjListArrowChunkReader - iterate through vertex chunks and +// re-visit the first one +BENCHMARK_DEFINE_F(BenchmarkFixture, + AdjListChunkReaderIterateThenRepeat) +(::benchmark::State& state) { // NOLINT + auto maybe_reader = AdjListArrowChunkReader::Make( + graph_info_, "person", "knows", "person", AdjListType::ordered_by_source); + SKIP_WITH_ERROR_STATUS(state, maybe_reader.status()); + auto reader = maybe_reader.value(); + + for (auto _ : state) { + // Visit vertex chunks 0-3 sequentially + for (IdType v = 0; v < 4; ++v) { + auto st = reader->seek_src(v); + SKIP_WITH_ERROR_STATUS(state, st); + auto chunk_result = reader->GetChunk(); + SKIP_WITH_ERROR_STATUS(state, chunk_result.status()); + } + // Re-visit chunk 0 - should be evicted and cause cache miss, + // demonstrating LRU eviction behavior with capacity=4 + { + auto st = reader->seek_src(0); + SKIP_WITH_ERROR_STATUS(state, st); + auto chunk_result = reader->GetChunk(); + SKIP_WITH_ERROR_STATUS(state, chunk_result.status()); + } + } + state.SetItemsProcessed(state.iterations() * 5); +} + +// Benchmark: AdjListOffsetArrowChunkReader - repeated offset reads +BENCHMARK_DEFINE_F(BenchmarkFixture, AdjListOffsetChunkReaderCacheHit) +(::benchmark::State& state) { // NOLINT + auto maybe_reader = AdjListOffsetArrowChunkReader::Make( + graph_info_, "person", "knows", "person", + AdjListType::ordered_by_source); + SKIP_WITH_ERROR_STATUS(state, maybe_reader.status()); + auto reader = maybe_reader.value(); + + for (auto _ : state) { + auto st = reader->seek(0); + SKIP_WITH_ERROR_STATUS(state, st); + auto chunk_result = reader->GetChunk(); + SKIP_WITH_ERROR_STATUS(state, chunk_result.status()); + } + state.SetItemsProcessed(state.iterations()); +} + +// Benchmark: AdjListPropertyArrowChunkReader - repeated property reads +BENCHMARK_DEFINE_F(BenchmarkFixture, AdjListPropertyChunkReaderCacheHit) +(::benchmark::State& state) { // NOLINT + auto maybe_reader = AdjListPropertyArrowChunkReader::Make( + graph_info_, "person", "knows", "person", "creationDate", + AdjListType::ordered_by_source); + SKIP_WITH_ERROR_STATUS(state, maybe_reader.status()); + auto reader = maybe_reader.value(); + + for (auto _ : state) { + auto st = reader->seek_src(0); + SKIP_WITH_ERROR_STATUS(state, st); + auto chunk_result = reader->GetChunk(); + SKIP_WITH_ERROR_STATUS(state, chunk_result.status()); + } + state.SetItemsProcessed(state.iterations()); +} + +// ============================================================================ +// Register all benchmarks +// ============================================================================ + +BENCHMARK_REGISTER_F(BenchmarkFixture, VertexPropertyChunkReaderCacheHit); +BENCHMARK_REGISTER_F(BenchmarkFixture, + VertexPropertyChunkReaderSequentialScanWithRepeat); +BENCHMARK_REGISTER_F(BenchmarkFixture, AdjListChunkReaderCacheHit); +BENCHMARK_REGISTER_F(BenchmarkFixture, + AdjListChunkReaderIterateThenRepeat); +BENCHMARK_REGISTER_F(BenchmarkFixture, AdjListOffsetChunkReaderCacheHit); +BENCHMARK_REGISTER_F(BenchmarkFixture, AdjListPropertyChunkReaderCacheHit); + +} // namespace graphar diff --git a/cpp/src/graphar/arrow/chunk_reader.cc b/cpp/src/graphar/arrow/chunk_reader.cc index 71e2ffa97..e1fa39994 100644 --- a/cpp/src/graphar/arrow/chunk_reader.cc +++ b/cpp/src/graphar/arrow/chunk_reader.cc @@ -221,9 +221,8 @@ Status VertexPropertyArrowChunkReader::seek(IdType id) { IdType pre_chunk_index = chunk_index_; chunk_index_ = id / vertex_info_->GetChunkSize(); if (chunk_index_ != pre_chunk_index) { - // TODO(@acezen): use a cache to avoid reloading the same chunk, could use - // a LRU cache. - chunk_table_.reset(); + auto* cached = chunk_cache_.Get(chunk_index_); + chunk_table_ = cached ? *cached : nullptr; } if (chunk_index_ >= chunk_num_) { return Status::IndexError("Internal vertex id ", id, " is out of range [0,", @@ -270,6 +269,7 @@ VertexPropertyArrowChunkReader::GetChunkV2() { GAR_RETURN_NOT_OK( CastTableWithSchema(chunk_table_, schema_, &chunk_table_)); } + chunk_cache_.Put(chunk_index_, chunk_table_); } IdType row_offset = seek_id_ - chunk_index_ * vertex_info_->GetChunkSize(); return chunk_table_->Slice(row_offset); @@ -314,6 +314,7 @@ VertexPropertyArrowChunkReader::GetChunkV1() { GAR_RETURN_NOT_OK( CastTableWithSchema(chunk_table_, schema_, &chunk_table_)); } + chunk_cache_.Put(chunk_index_, chunk_table_); } IdType row_offset = seek_id_ - chunk_index_ * vertex_info_->GetChunkSize(); return chunk_table_->Slice(row_offset); @@ -350,6 +351,7 @@ VertexPropertyArrowChunkReader::GetLabelChunk() { // GAR_RETURN_NOT_OK( // CastTableWithSchema(chunk_table_, schema_, &chunk_table_)); // } + chunk_cache_.Put(chunk_index_, chunk_table_); } IdType row_offset = seek_id_ - chunk_index_ * vertex_info_->GetChunkSize(); return chunk_table_->Slice(row_offset); @@ -362,17 +364,22 @@ Status VertexPropertyArrowChunkReader::next_chunk() { vertex_info_->GetType(), " chunk num ", chunk_num_); } seek_id_ = chunk_index_ * vertex_info_->GetChunkSize(); - chunk_table_.reset(); + auto* cached = chunk_cache_.Get(chunk_index_); + chunk_table_ = cached ? *cached : nullptr; return Status::OK(); } void VertexPropertyArrowChunkReader::Filter(util::Filter filter) { filter_options_.filter = filter; + chunk_table_ = nullptr; + chunk_cache_.Disable(); } void VertexPropertyArrowChunkReader::Select(util::ColumnNames column_names) { filter_options_.columns = column_names; + chunk_table_ = nullptr; + chunk_cache_.Clear(); } Result> @@ -527,7 +534,7 @@ VertexPropertyArrowChunkReader::MakeForLabels( AdjListArrowChunkReader::AdjListArrowChunkReader( const std::shared_ptr& edge_info, AdjListType adj_list_type, - const std::string& prefix) + const std::string& prefix, size_t cache_capacity) : edge_info_(edge_info), adj_list_type_(adj_list_type), prefix_(prefix), @@ -535,6 +542,7 @@ AdjListArrowChunkReader::AdjListArrowChunkReader( chunk_index_(0), seek_offset_(0), chunk_table_(nullptr), + chunk_cache_(cache_capacity), chunk_num_(-1) /* -1 means uninitialized */ { GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_)); GAR_ASSIGN_OR_RAISE_ERROR(auto adj_list_path_prefix, @@ -554,6 +562,7 @@ AdjListArrowChunkReader::AdjListArrowChunkReader( chunk_index_(other.chunk_index_), seek_offset_(other.seek_offset_), chunk_table_(nullptr), + chunk_cache_(other.chunk_cache_.capacity()), vertex_chunk_num_(other.vertex_chunk_num_), chunk_num_(other.chunk_num_), base_dir_(other.base_dir_), @@ -595,7 +604,9 @@ Status AdjListArrowChunkReader::seek_src(IdType id) { // initialize or update chunk_num_ vertex_chunk_index_ = new_vertex_chunk_index; GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum()); - chunk_table_.reset(); + auto key = std::make_pair(vertex_chunk_index_, chunk_index_); + auto* cached = chunk_cache_.Get(key); + chunk_table_ = cached ? *cached : nullptr; } if (adj_list_type_ == AdjListType::unordered_by_source) { @@ -628,7 +639,9 @@ Status AdjListArrowChunkReader::seek_dst(IdType id) { // initialize or update chunk_num_ vertex_chunk_index_ = new_vertex_chunk_index; GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum()); - chunk_table_.reset(); + auto key = std::make_pair(vertex_chunk_index_, chunk_index_); + auto* cached = chunk_cache_.Get(key); + chunk_table_ = cached ? *cached : nullptr; } if (adj_list_type_ == AdjListType::unordered_by_dest) { @@ -646,7 +659,9 @@ Status AdjListArrowChunkReader::seek(IdType offset) { IdType pre_chunk_index = chunk_index_; chunk_index_ = offset / edge_info_->GetChunkSize(); if (chunk_index_ != pre_chunk_index) { - chunk_table_.reset(); + auto key = std::make_pair(vertex_chunk_index_, chunk_index_); + auto* cached = chunk_cache_.Get(key); + chunk_table_ = cached ? *cached : nullptr; } if (chunk_num_ < 0) { // initialize chunk_num_ @@ -676,6 +691,8 @@ Result> AdjListArrowChunkReader::GetChunk() { std::string path = prefix_ + chunk_file_path; auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType(); GAR_ASSIGN_OR_RAISE(chunk_table_, fs_->ReadFileToTable(path, file_type)); + chunk_cache_.Put(std::make_pair(vertex_chunk_index_, chunk_index_), + chunk_table_); } IdType row_offset = seek_offset_ - chunk_index_ * edge_info_->GetChunkSize(); return chunk_table_->Slice(row_offset); @@ -698,21 +715,26 @@ Status AdjListArrowChunkReader::next_chunk() { GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum()); } seek_offset_ = chunk_index_ * edge_info_->GetChunkSize(); - chunk_table_.reset(); + auto key = std::make_pair(vertex_chunk_index_, chunk_index_); + auto* cached = chunk_cache_.Get(key); + chunk_table_ = cached ? *cached : nullptr; return Status::OK(); } Status AdjListArrowChunkReader::seek_chunk_index(IdType vertex_chunk_index, IdType chunk_index) { + bool changed = false; if (chunk_num_ < 0 || vertex_chunk_index_ != vertex_chunk_index) { vertex_chunk_index_ = vertex_chunk_index; GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum()); - chunk_table_.reset(); + changed = true; } - if (chunk_index_ != chunk_index) { + if (chunk_index_ != chunk_index || changed) { chunk_index_ = chunk_index; seek_offset_ = chunk_index * edge_info_->GetChunkSize(); - chunk_table_.reset(); + auto key = std::make_pair(vertex_chunk_index_, chunk_index_); + auto* cached = chunk_cache_.Get(key); + chunk_table_ = cached ? *cached : nullptr; } return Status::OK(); } @@ -725,32 +747,35 @@ Result AdjListArrowChunkReader::GetRowNumOfChunk() { std::string path = prefix_ + chunk_file_path; auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType(); GAR_ASSIGN_OR_RAISE(chunk_table_, fs_->ReadFileToTable(path, file_type)); + chunk_cache_.Put(std::make_pair(vertex_chunk_index_, chunk_index_), + chunk_table_); } return chunk_table_->num_rows(); } Result> AdjListArrowChunkReader::Make( const std::shared_ptr& edge_info, AdjListType adj_list_type, - const std::string& prefix) { + const std::string& prefix, size_t cache_capacity) { if (!edge_info->HasAdjacentListType(adj_list_type)) { return Status::KeyError( "The adjacent list type ", AdjListTypeToString(adj_list_type), " doesn't exist in edge ", edge_info->GetEdgeType(), "."); } return std::make_shared(edge_info, adj_list_type, - prefix); + prefix, cache_capacity); } Result> AdjListArrowChunkReader::Make( const std::shared_ptr& graph_info, const std::string& src_type, const std::string& edge_type, const std::string& dst_type, - AdjListType adj_list_type) { + AdjListType adj_list_type, size_t cache_capacity) { auto edge_info = graph_info->GetEdgeInfo(src_type, edge_type, dst_type); if (!edge_info) { return Status::KeyError("The edge ", src_type, " ", edge_type, " ", dst_type, " doesn't exist."); } - return Make(edge_info, adj_list_type, graph_info->GetPrefix()); + return Make(edge_info, adj_list_type, graph_info->GetPrefix(), + cache_capacity); } Status AdjListArrowChunkReader::initOrUpdateEdgeChunkNum() { @@ -762,13 +787,14 @@ Status AdjListArrowChunkReader::initOrUpdateEdgeChunkNum() { AdjListOffsetArrowChunkReader::AdjListOffsetArrowChunkReader( const std::shared_ptr& edge_info, AdjListType adj_list_type, - const std::string& prefix) + const std::string& prefix, size_t cache_capacity) : edge_info_(std::move(edge_info)), adj_list_type_(adj_list_type), prefix_(prefix), chunk_index_(0), seek_id_(0), - chunk_table_(nullptr) { + chunk_table_(nullptr), + chunk_cache_(cache_capacity) { std::string base_dir; GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_)); GAR_ASSIGN_OR_RAISE_ERROR(auto dir_path, @@ -795,7 +821,8 @@ Status AdjListOffsetArrowChunkReader::seek(IdType id) { IdType pre_chunk_index = chunk_index_; chunk_index_ = id / vertex_chunk_size_; if (chunk_index_ != pre_chunk_index) { - chunk_table_.reset(); + auto* cached = chunk_cache_.Get(chunk_index_); + chunk_table_ = cached ? *cached : nullptr; } if (chunk_index_ >= vertex_chunk_num_) { return Status::IndexError("Internal vertex id ", id, "is out of range [0,", @@ -816,6 +843,7 @@ AdjListOffsetArrowChunkReader::GetChunk() { std::string path = prefix_ + chunk_file_path; auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType(); GAR_ASSIGN_OR_RAISE(chunk_table_, fs_->ReadFileToTable(path, file_type)); + chunk_cache_.Put(chunk_index_, chunk_table_); } IdType row_offset = seek_id_ - chunk_index_ * vertex_chunk_size_; return chunk_table_->Slice(row_offset)->column(0)->chunk(0); @@ -830,7 +858,8 @@ Status AdjListOffsetArrowChunkReader::next_chunk() { AdjListTypeToString(adj_list_type_), "."); } seek_id_ = chunk_index_ * vertex_chunk_size_; - chunk_table_.reset(); + auto* cached = chunk_cache_.Get(chunk_index_); + chunk_table_ = cached ? *cached : nullptr; return Status::OK(); } @@ -838,27 +867,29 @@ Status AdjListOffsetArrowChunkReader::next_chunk() { Result> AdjListOffsetArrowChunkReader::Make(const std::shared_ptr& edge_info, AdjListType adj_list_type, - const std::string& prefix) { + const std::string& prefix, + size_t cache_capacity) { if (!edge_info->HasAdjacentListType(adj_list_type)) { return Status::KeyError( "The adjacent list type ", AdjListTypeToString(adj_list_type), " doesn't exist in edge ", edge_info->GetEdgeType(), "."); } - return std::make_shared(edge_info, - adj_list_type, prefix); + return std::make_shared( + edge_info, adj_list_type, prefix, cache_capacity); } Result> AdjListOffsetArrowChunkReader::Make( const std::shared_ptr& graph_info, const std::string& src_type, const std::string& edge_type, const std::string& dst_type, - AdjListType adj_list_type) { + AdjListType adj_list_type, size_t cache_capacity) { auto edge_info = graph_info->GetEdgeInfo(src_type, edge_type, dst_type); if (!edge_info) { return Status::KeyError("The edge ", src_type, " ", edge_type, " ", dst_type, " doesn't exist."); } - return Make(edge_info, adj_list_type, graph_info->GetPrefix()); + return Make(edge_info, adj_list_type, graph_info->GetPrefix(), + cache_capacity); } AdjListPropertyArrowChunkReader::AdjListPropertyArrowChunkReader( @@ -875,7 +906,7 @@ AdjListPropertyArrowChunkReader::AdjListPropertyArrowChunkReader( seek_offset_(0), schema_(nullptr), chunk_table_(nullptr), - filter_options_(std::move(options)), + filter_options_(options), chunk_num_(-1) /* -1 means uninitialized */ { GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_)); GAR_ASSIGN_OR_RAISE_ERROR( @@ -900,6 +931,7 @@ AdjListPropertyArrowChunkReader::AdjListPropertyArrowChunkReader( seek_offset_(other.seek_offset_), schema_(other.schema_), chunk_table_(nullptr), + chunk_cache_(other.chunk_cache_.capacity()), filter_options_(other.filter_options_), vertex_chunk_num_(other.vertex_chunk_num_), chunk_num_(other.chunk_num_), @@ -945,7 +977,9 @@ Status AdjListPropertyArrowChunkReader::seek_src(IdType id) { if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) { vertex_chunk_index_ = new_vertex_chunk_index; GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum()); - chunk_table_.reset(); + auto key = std::make_pair(vertex_chunk_index_, chunk_index_); + auto* cached = chunk_cache_.Get(key); + chunk_table_ = cached ? *cached : nullptr; } if (adj_list_type_ == AdjListType::unordered_by_source) { @@ -977,7 +1011,9 @@ Status AdjListPropertyArrowChunkReader::seek_dst(IdType id) { if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) { vertex_chunk_index_ = new_vertex_chunk_index; GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum()); - chunk_table_.reset(); + auto key = std::make_pair(vertex_chunk_index_, chunk_index_); + auto* cached = chunk_cache_.Get(key); + chunk_table_ = cached ? *cached : nullptr; } if (adj_list_type_ == AdjListType::unordered_by_dest) { @@ -995,7 +1031,9 @@ Status AdjListPropertyArrowChunkReader::seek(IdType offset) { seek_offset_ = offset; chunk_index_ = offset / edge_info_->GetChunkSize(); if (chunk_index_ != pre_chunk_index) { - chunk_table_.reset(); + auto key = std::make_pair(vertex_chunk_index_, chunk_index_); + auto* cached = chunk_cache_.Get(key); + chunk_table_ = cached ? *cached : nullptr; } if (chunk_num_ < 0) { // initialize chunk_num_ @@ -1034,6 +1072,8 @@ AdjListPropertyArrowChunkReader::GetChunk() { GAR_RETURN_NOT_OK( CastTableWithSchema(chunk_table_, schema_, &chunk_table_)); } + chunk_cache_.Put(std::make_pair(vertex_chunk_index_, chunk_index_), + chunk_table_); } IdType row_offset = seek_offset_ - chunk_index_ * edge_info_->GetChunkSize(); return chunk_table_->Slice(row_offset); @@ -1059,31 +1099,40 @@ Status AdjListPropertyArrowChunkReader::next_chunk() { GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum()); } seek_offset_ = chunk_index_ * edge_info_->GetChunkSize(); - chunk_table_.reset(); + auto key = std::make_pair(vertex_chunk_index_, chunk_index_); + auto* cached = chunk_cache_.Get(key); + chunk_table_ = cached ? *cached : nullptr; return Status::OK(); } Status AdjListPropertyArrowChunkReader::seek_chunk_index( IdType vertex_chunk_index, IdType chunk_index) { + bool changed = false; if (chunk_num_ < 0 || vertex_chunk_index_ != vertex_chunk_index) { vertex_chunk_index_ = vertex_chunk_index; GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum()); - chunk_table_.reset(); + changed = true; } - if (chunk_index_ != chunk_index) { + if (chunk_index_ != chunk_index || changed) { chunk_index_ = chunk_index; seek_offset_ = chunk_index * edge_info_->GetChunkSize(); - chunk_table_.reset(); + auto key = std::make_pair(vertex_chunk_index_, chunk_index_); + auto* cached = chunk_cache_.Get(key); + chunk_table_ = cached ? *cached : nullptr; } return Status::OK(); } void AdjListPropertyArrowChunkReader::Filter(util::Filter filter) { filter_options_.filter = filter; + chunk_table_ = nullptr; + chunk_cache_.Disable(); } void AdjListPropertyArrowChunkReader::Select(util::ColumnNames column_names) { filter_options_.columns = column_names; + chunk_table_ = nullptr; + chunk_cache_.Clear(); } Result> diff --git a/cpp/src/graphar/arrow/chunk_reader.h b/cpp/src/graphar/arrow/chunk_reader.h index 84131eb8e..0ddc70234 100644 --- a/cpp/src/graphar/arrow/chunk_reader.h +++ b/cpp/src/graphar/arrow/chunk_reader.h @@ -25,6 +25,7 @@ #include #include "graphar/fwd.h" +#include "graphar/lru_cache.h" #include "graphar/reader_util.h" #include "graphar/status.h" @@ -67,7 +68,10 @@ class VertexPropertyArrowChunkReader { const std::vector& property_names, const std::string& prefix, util::FilterOptions options = {}); - VertexPropertyArrowChunkReader() : vertex_info_(nullptr), prefix_("") {} + VertexPropertyArrowChunkReader() + : vertex_info_(nullptr), + prefix_(""), + chunk_cache_(util::FilterOptions::kDefaultCacheCapacity) {} /** * @brief Initialize the VertexPropertyArrowChunkReader. @@ -267,6 +271,8 @@ class VertexPropertyArrowChunkReader { IdType vertex_num_; std::shared_ptr schema_; std::shared_ptr chunk_table_; + LRUCache> chunk_cache_{ + util::FilterOptions::kDefaultCacheCapacity}; util::FilterOptions filter_options_; std::shared_ptr fs_; }; @@ -283,9 +289,12 @@ class AdjListArrowChunkReader { * @param edge_info The edge info that describes the edge type. * @param adj_list_type The adj list type for the edge. * @param prefix The absolute prefix. + * @param cache_capacity The capacity of the LRU chunk cache. */ - AdjListArrowChunkReader(const std::shared_ptr& edge_info, - AdjListType adj_list_type, const std::string& prefix); + AdjListArrowChunkReader( + const std::shared_ptr& edge_info, AdjListType adj_list_type, + const std::string& prefix, + size_t cache_capacity = util::FilterOptions::kDefaultCacheCapacity); /** * @brief Copy constructor. @@ -355,10 +364,12 @@ class AdjListArrowChunkReader { * @param edge_info The edge info. * @param adj_list_type The adj list type for the edges. * @param prefix The absolute prefix of the graph. + * @param cache_capacity The capacity of the LRU chunk cache. Default is 4. */ static Result> Make( const std::shared_ptr& edge_info, AdjListType adj_list_type, - const std::string& prefix); + const std::string& prefix, + size_t cache_capacity = util::FilterOptions::kDefaultCacheCapacity); /** * @brief Create an AdjListArrowChunkReader instance from graph info. @@ -368,11 +379,13 @@ class AdjListArrowChunkReader { * @param edge_type The edge type. * @param dst_type The destination vertex type. * @param adj_list_type The adj list type for the edges. + * @param cache_capacity The capacity of the LRU chunk cache. Default is 4. */ static Result> Make( const std::shared_ptr& graph_info, const std::string& src_type, const std::string& edge_type, const std::string& dst_type, - AdjListType adj_list_type); + AdjListType adj_list_type, + size_t cache_capacity = util::FilterOptions::kDefaultCacheCapacity); private: Status initOrUpdateEdgeChunkNum(); @@ -384,6 +397,8 @@ class AdjListArrowChunkReader { IdType vertex_chunk_index_, chunk_index_; IdType seek_offset_; std::shared_ptr chunk_table_; + LRUCache, std::shared_ptr, PairHash> + chunk_cache_{util::FilterOptions::kDefaultCacheCapacity}; IdType vertex_chunk_num_, chunk_num_; std::string base_dir_; std::shared_ptr fs_; @@ -403,10 +418,12 @@ class AdjListOffsetArrowChunkReader { * Note that the adj list type must be AdjListType::ordered_by_source * or AdjListType::ordered_by_dest. * @param prefix The absolute prefix. + * @param cache_capacity The capacity of the LRU chunk cache. */ - AdjListOffsetArrowChunkReader(const std::shared_ptr& edge_info, - AdjListType adj_list_type, - const std::string& prefix); + AdjListOffsetArrowChunkReader( + const std::shared_ptr& edge_info, AdjListType adj_list_type, + const std::string& prefix, + size_t cache_capacity = util::FilterOptions::kDefaultCacheCapacity); /** * @brief Sets chunk position indicator for reader by internal vertex id. @@ -441,10 +458,12 @@ class AdjListOffsetArrowChunkReader { * @param edge_info The edge info. * @param adj_list_type The adj list type for the edges. * @param prefix The absolute prefix of the graph. + * @param cache_capacity The capacity of the LRU chunk cache. */ static Result> Make( const std::shared_ptr& edge_info, AdjListType adj_list_type, - const std::string& prefix); + const std::string& prefix, + size_t cache_capacity = util::FilterOptions::kDefaultCacheCapacity); /** * @brief Create an AdjListOffsetArrowChunkReader instance from graph info. @@ -454,11 +473,13 @@ class AdjListOffsetArrowChunkReader { * @param edge_type The edge type. * @param dst_type The destination vertex type. * @param adj_list_type The adj list type for the edges. + * @param cache_capacity The capacity of the LRU chunk cache. Default is 4. */ static Result> Make( const std::shared_ptr& graph_info, const std::string& src_type, const std::string& edge_type, const std::string& dst_type, - AdjListType adj_list_type); + AdjListType adj_list_type, + size_t cache_capacity = util::FilterOptions::kDefaultCacheCapacity); private: std::shared_ptr edge_info_; @@ -467,6 +488,8 @@ class AdjListOffsetArrowChunkReader { IdType chunk_index_; IdType seek_id_; std::shared_ptr chunk_table_; + LRUCache> chunk_cache_{ + util::FilterOptions::kDefaultCacheCapacity}; IdType vertex_chunk_num_; IdType vertex_chunk_size_; std::string base_dir_; @@ -633,6 +656,8 @@ class AdjListPropertyArrowChunkReader { IdType seek_offset_; std::shared_ptr schema_; std::shared_ptr chunk_table_; + LRUCache, std::shared_ptr, PairHash> + chunk_cache_{util::FilterOptions::kDefaultCacheCapacity}; util::FilterOptions filter_options_; IdType vertex_chunk_num_, chunk_num_; std::string base_dir_; diff --git a/cpp/src/graphar/lru_cache.h b/cpp/src/graphar/lru_cache.h new file mode 100644 index 000000000..32be252bc --- /dev/null +++ b/cpp/src/graphar/lru_cache.h @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +namespace graphar { + +/** + * @brief A simple LRU cache implementation. + * + * @tparam Key The key type. + * @tparam Value The value type (must be movable). + * @tparam Hash The hash functor type for the key. + * + * The cache is always "enabled" at the code level. To effectively + * disable caching, set capacity to 0 or 1 (which means at most one + * entry can be cached — equivalent to no caching for most workloads). + * Use SetCapacity() at runtime to adjust the cache size. + */ +template > +class LRUCache { + public: + explicit LRUCache(size_t capacity) : capacity_(capacity) {} + + Value* Get(const Key& key) { + auto it = map_.find(key); + if (it == map_.end()) { + return nullptr; + } + items_.splice(items_.begin(), items_, it->second); + return &it->second->second; + } + + void Put(const Key& key, Value value) { + if (capacity_ == 0) { + return; + } + auto it = map_.find(key); + if (it != map_.end()) { + it->second->second = std::move(value); + items_.splice(items_.begin(), items_, it->second); + return; + } + + if (map_.size() >= capacity_) { + auto& back = items_.back(); + map_.erase(back.first); + items_.pop_back(); + } + items_.emplace_front(key, std::move(value)); + map_[key] = items_.begin(); + } + + void Clear() { + map_.clear(); + items_.clear(); + } + + size_t Size() const { return map_.size(); } + + size_t capacity() const { return capacity_; } + + void SetCapacity(size_t capacity) { + capacity_ = capacity; + while (map_.size() > capacity_) { + auto& back = items_.back(); + map_.erase(back.first); + items_.pop_back(); + } + } + + void Disable() { SetCapacity(0); } + + private: + size_t capacity_{0}; + std::list> items_; + std::unordered_map>::iterator, + Hash> + map_; +}; + +struct PairHash { + template + size_t operator()(const std::pair& p) const { + size_t h1 = std::hash{}(p.first); + size_t h2 = std::hash{}(p.second); + // inspired by boost::hash_combine + constexpr auto kMul = static_cast(0x9e3779b97f4a7c15ULL); + h1 ^= h2 + kMul + (h1 << 6) + (h1 >> 2); + return h1; + } +}; + +} // namespace graphar diff --git a/cpp/src/graphar/reader_util.h b/cpp/src/graphar/reader_util.h index e80cbfa28..9da96051d 100644 --- a/cpp/src/graphar/reader_util.h +++ b/cpp/src/graphar/reader_util.h @@ -28,14 +28,19 @@ namespace graphar::util { struct FilterOptions { + static constexpr size_t kDefaultCacheCapacity = 4; + // The row filter to apply to the table. Filter filter = nullptr; // The columns to include in the table. Select all columns by default. ColumnNames columns = std::nullopt; + // The number of chunks to cache in the LRU cache. + size_t cache_capacity = kDefaultCacheCapacity; FilterOptions() {} - FilterOptions(Filter filter, ColumnNames columns) - : filter(filter), columns(columns) {} + FilterOptions(Filter filter, ColumnNames columns, + size_t cache_capacity = kDefaultCacheCapacity) + : filter(filter), columns(columns), cache_capacity(cache_capacity) {} }; Status CheckFilterOptions( diff --git a/cpp/test/test_lru_cache.cc b/cpp/test/test_lru_cache.cc new file mode 100644 index 000000000..46bba6adb --- /dev/null +++ b/cpp/test/test_lru_cache.cc @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +#include "graphar/lru_cache.h" + +#include + +namespace graphar { + +TEST_CASE("LRUCache basic operations") { + LRUCache cache(3); + + SECTION("Empty cache returns nullptr") { + REQUIRE(cache.Get(1) == nullptr); + REQUIRE(cache.Size() == 0); + } + + SECTION("Put and Get") { + cache.Put(1, "one"); + auto* value = cache.Get(1); + REQUIRE(value != nullptr); + REQUIRE(*value == "one"); + REQUIRE(cache.Size() == 1); + } + + SECTION("Multiple Put and Get") { + cache.Put(1, "one"); + cache.Put(2, "two"); + cache.Put(3, "three"); + + auto* v1 = cache.Get(1); + auto* v2 = cache.Get(2); + auto* v3 = cache.Get(3); + + REQUIRE(v1 != nullptr); + REQUIRE(v2 != nullptr); + REQUIRE(v3 != nullptr); + REQUIRE(*v1 == "one"); + REQUIRE(*v2 == "two"); + REQUIRE(*v3 == "three"); + REQUIRE(cache.Size() == 3); + } + + SECTION("Update existing key") { + cache.Put(1, "one"); + cache.Put(1, "updated"); + + auto* value = cache.Get(1); + REQUIRE(value != nullptr); + REQUIRE(*value == "updated"); + REQUIRE(cache.Size() == 1); + } + + SECTION("Get updates recency") { + cache.Put(1, "one"); + cache.Put(2, "two"); + cache.Put(3, "three"); + + cache.Get(1); + cache.Put(4, "four"); + + REQUIRE(cache.Get(1) != nullptr); + REQUIRE(cache.Get(2) == nullptr); + REQUIRE(cache.Get(3) != nullptr); + REQUIRE(cache.Get(4) != nullptr); + REQUIRE(cache.Size() == 3); + } +} + +TEST_CASE("LRUCache eviction") { + SECTION("Evict when exceeding capacity") { + LRUCache cache(2); + + cache.Put(1, "one"); + cache.Put(2, "two"); + cache.Put(3, "three"); + + REQUIRE(cache.Get(1) == nullptr); + REQUIRE(cache.Get(2) != nullptr); + REQUIRE(cache.Get(3) != nullptr); + REQUIRE(cache.Size() == 2); + } + + SECTION("Evict least recently used") { + LRUCache cache(3); + + cache.Put(1, "one"); + cache.Put(2, "two"); + cache.Put(3, "three"); + + cache.Get(1); + cache.Get(2); + cache.Put(4, "four"); + + REQUIRE(cache.Get(1) != nullptr); + REQUIRE(cache.Get(2) != nullptr); + REQUIRE(cache.Get(3) == nullptr); + REQUIRE(cache.Get(4) != nullptr); + } +} + +TEST_CASE("LRUCache Clear") { + LRUCache cache(3); + + cache.Put(1, "one"); + cache.Put(2, "two"); + cache.Clear(); + + REQUIRE(cache.Get(1) == nullptr); + REQUIRE(cache.Get(2) == nullptr); + REQUIRE(cache.Size() == 0); +} + +TEST_CASE("LRUCache with string keys") { + LRUCache cache(2); + + cache.Put("one", 1); + cache.Put("two", 2); + + auto* v1 = cache.Get("one"); + auto* v2 = cache.Get("two"); + + REQUIRE(v1 != nullptr); + REQUIRE(v2 != nullptr); + REQUIRE(*v1 == 1); + REQUIRE(*v2 == 2); +} + +TEST_CASE("LRUCache with PairHash") { + LRUCache, std::string, PairHash> cache(2); + + cache.Put({1, 2}, "value1"); + cache.Put({3, 4}, "value2"); + + auto* v1 = cache.Get({1, 2}); + auto* v2 = cache.Get({3, 4}); + + REQUIRE(v1 != nullptr); + REQUIRE(v2 != nullptr); + REQUIRE(*v1 == "value1"); + REQUIRE(*v2 == "value2"); + + REQUIRE(cache.Get({5, 6}) == nullptr); +} + +TEST_CASE("LRUCache move semantics") { + LRUCache cache(2); + + std::string value = "test_value"; + cache.Put(1, std::move(value)); + + auto* cached = cache.Get(1); + REQUIRE(cached != nullptr); + REQUIRE(*cached == "test_value"); +} + +TEST_CASE("LRUCache zero capacity edge case") { + LRUCache cache(0); + + cache.Put(1, "one"); + + REQUIRE(cache.Size() == 0); + REQUIRE(cache.Get(1) == nullptr); +} + +} // namespace graphar