diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 00cf90456229a4..d90be3a0016eee 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -36,6 +36,7 @@ #include #include #include +#include #include #include #include @@ -1855,52 +1856,85 @@ int InstanceRecycler::abort_job_for_related_rowset(const RowsetMetaCloudPB& rows } template -int InstanceRecycler::abort_txn_or_job_for_recycle(T& rowset_meta_pb) { - RowsetMetaCloudPB* rs_meta; - RecycleRowsetPB::Type rowset_type = RecycleRowsetPB::PREPARE; +RowsetMetaCloudPB* mutable_rowset_meta(T& rowset_meta_pb) { + if constexpr (std::is_same_v) { + return rowset_meta_pb.mutable_rowset_meta(); + } else { + return &rowset_meta_pb; + } +} +template +const RowsetMetaCloudPB& rowset_meta(const T& rowset_meta_pb) { if constexpr (std::is_same_v) { - // For keys that are not in the RecycleRowsetPB::PREPARE state - // we do not need to check the job or txn state - // because tmp_rowset_key already exists when this key is generated. - rowset_type = rowset_meta_pb.type(); - rs_meta = rowset_meta_pb.mutable_rowset_meta(); + return rowset_meta_pb.rowset_meta(); } else { - rs_meta = &rowset_meta_pb; + return rowset_meta_pb; } +} - DCHECK(rs_meta != nullptr); +struct DeferredRecycleAbortTask { + enum class Type : uint8_t { + TXN, + JOB, + }; - // compaction/sc will generate recycle_rowset_key for each input rowset with load_id - // we need skip them because the related txn has been finished - // load_rowset1 load_rowset2 => pick for compaction => compact_rowset - // compact_rowset1 compact_rowset2 => pick for compaction/sc job => new_rowset - if (rowset_type == RecycleRowsetPB::PREPARE) { - if (rs_meta->has_load_id()) { - // load - return abort_txn_for_related_rowset(rs_meta->txn_id()); - } else if (rs_meta->has_job_id()) { - // compaction / schema change - return abort_job_for_related_rowset(*rs_meta); + Type type = Type::TXN; + int64_t txn_id = 0; + int64_t tablet_id = 0; + int64_t start_version = 0; + int64_t end_version = 0; + std::string rowset_id; + std::string job_id; +}; + +struct DeferredRecyclePrepareDeleteTask { + std::string key; + std::string resource_id; + std::string rowset_id; + int64_t tablet_id = 0; +}; + +template +std::optional make_deferred_abort_task(const T& rowset_meta_pb) { + if constexpr (std::is_same_v) { + if (rowset_meta_pb.type() != RecycleRowsetPB::PREPARE) { + return std::nullopt; } } - return 0; + const auto& rs_meta = rowset_meta(rowset_meta_pb); + DeferredRecycleAbortTask task; + task.tablet_id = rs_meta.tablet_id(); + task.start_version = rs_meta.start_version(); + task.end_version = rs_meta.end_version(); + if (rs_meta.has_load_id()) { + task.type = DeferredRecycleAbortTask::Type::TXN; + task.txn_id = rs_meta.txn_id(); + return task; + } + if (rs_meta.has_job_id()) { + task.type = DeferredRecycleAbortTask::Type::JOB; + task.rowset_id = rs_meta.rowset_id_v2(); + task.job_id = rs_meta.job_id(); + return task; + } + return std::nullopt; +} + +template +bool need_mark_rowset_as_recycled(const T& rowset_meta_pb) { + const auto& rs_meta = rowset_meta(rowset_meta_pb); + return !rs_meta.has_is_recycled() || !rs_meta.is_recycled(); } template int mark_rowset_as_recycled(TxnKv* txn_kv, const std::string& instance_id, std::string_view key, T& rowset_meta_pb) { - RowsetMetaCloudPB* rs_meta; - - if constexpr (std::is_same_v) { - rs_meta = rowset_meta_pb.mutable_rowset_meta(); - } else { - rs_meta = &rowset_meta_pb; - } + RowsetMetaCloudPB* rs_meta = mutable_rowset_meta(rowset_meta_pb); bool need_write_back = false; - if ((!rs_meta->has_is_recycled() || !rs_meta->is_recycled())) { + if (need_mark_rowset_as_recycled(rowset_meta_pb)) { need_write_back = true; rs_meta->set_is_recycled(true); } @@ -1920,11 +1954,7 @@ int mark_rowset_as_recycled(TxnKv* txn_kv, const std::string& instance_id, std:: LOG(WARNING) << "failed to parse rs_meta, instance_id=" << instance_id; return -1; } - if constexpr (std::is_same_v) { - rs_meta = rowset_meta.mutable_rowset_meta(); - } else { - rs_meta = &rowset_meta; - } + rs_meta = mutable_rowset_meta(rowset_meta); if ((rs_meta->has_is_recycled() && rs_meta->is_recycled())) { return 0; } @@ -1941,6 +1971,50 @@ int mark_rowset_as_recycled(TxnKv* txn_kv, const std::string& instance_id, std:: return need_write_back ? 1 : 0; } +template +int batch_mark_rowsets_as_recycled(TxnKv* txn_kv, const std::string& instance_id, + const std::vector& keys) { + constexpr size_t kMarkBatchSize = 256; + for (size_t offset = 0; offset < keys.size(); offset += kMarkBatchSize) { + size_t limit = std::min(keys.size(), offset + kMarkBatchSize); + std::unique_ptr txn; + TxnErrorCode err = txn_kv->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + LOG(WARNING) << "failed to create txn, instance_id=" << instance_id; + return -1; + } + for (size_t idx = offset; idx < limit; ++idx) { + const std::string& key = keys[idx]; + std::string val; + err = txn->get(key, &val); + if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { + LOG(WARNING) << "failed to get rowset meta, instance_id=" << instance_id + << " key=" << hex(key); + return -1; + } + T rowset_meta_pb; + if (!rowset_meta_pb.ParseFromString(val)) { + LOG(WARNING) << "failed to parse rowset meta, instance_id=" << instance_id + << " key=" << hex(key); + return -1; + } + if (!need_mark_rowset_as_recycled(rowset_meta_pb)) { + continue; + } + mutable_rowset_meta(rowset_meta_pb)->set_is_recycled(true); + val.clear(); + rowset_meta_pb.SerializeToString(&val); + txn->put(key, val); + } + err = txn->commit(); + if (err != TxnErrorCode::TXN_OK) { + LOG(WARNING) << "failed to commit txn, instance_id=" << instance_id; + return -1; + } + } + return 0; +} + int InstanceRecycler::recycle_ref_rowsets(bool* has_unrecycled_rowsets) { const std::string task_name = "recycle_ref_rowsets"; *has_unrecycled_rowsets = false; @@ -4730,6 +4804,9 @@ int InstanceRecycler::recycle_rowsets() { }; std::vector rowset_keys; + std::vector rowset_keys_to_mark_recycled; + std::vector deferred_abort_tasks; + std::vector deferred_prepare_delete_tasks; // rowset_id -> rowset_meta // store rowset id and meta for statistics rs size when delete std::map rowsets; @@ -4856,38 +4933,26 @@ int InstanceRecycler::recycle_rowsets() { auto* rowset_meta = rowset.mutable_rowset_meta(); if (config::enable_mark_delete_rowset_before_recycle) { - int mark_ret = mark_rowset_as_recycled(txn_kv_.get(), instance_id_, k, rowset); - if (mark_ret == -1) { - LOG(WARNING) << "failed to mark rowset as recycled, instance_id=" << instance_id_ - << " tablet_id=" << rowset_meta->tablet_id() << " version=[" - << rowset_meta->start_version() << '-' << rowset_meta->end_version() - << "]"; - return -1; - } else if (mark_ret == 1) { - LOG(INFO) - << "rowset already marked as recycled, recycler will delete data and kv at " - "next turn, instance_id=" - << instance_id_ << " tablet_id=" << rowset_meta->tablet_id() << " version=[" - << rowset_meta->start_version() << '-' << rowset_meta->end_version() << "]"; + if (need_mark_rowset_as_recycled(rowset)) { + rowset_keys_to_mark_recycled.emplace_back(k); + LOG(INFO) << "rowset queued to mark as recycled, recycler will delete data and kv " + "at next turn, instance_id=" + << instance_id_ << " tablet_id=" << rowset_meta->tablet_id() + << " version=[" << rowset_meta->start_version() << '-' + << rowset_meta->end_version() << "]"; return 0; } } - if (config::enable_abort_txn_and_job_for_delete_rowset_before_recycle) { - LOG(INFO) << "begin to abort txn or job for related rowset, instance_id=" - << instance_id_ << " tablet_id=" << rowset_meta->tablet_id() << " version=[" - << rowset_meta->start_version() << '-' << rowset_meta->end_version() << "]"; - - if (rowset_meta->end_version() != 1) { - int ret = abort_txn_or_job_for_recycle(rowset); - - if (ret != 0) { - LOG(WARNING) << "failed to abort txn or job for related rowset, instance_id=" - << instance_id_ << " tablet_id=" << rowset.tablet_id() - << " version=[" << rowset_meta->start_version() << '-' - << rowset_meta->end_version() << "]"; - return ret; - } + if (config::enable_abort_txn_and_job_for_delete_rowset_before_recycle && + rowset_meta->end_version() != 1) { + if (auto abort_task = make_deferred_abort_task(rowset); abort_task.has_value()) { + LOG(INFO) << "rowset queued to abort related txn or job after current scan batch, " + "instance_id=" + << instance_id_ << " tablet_id=" << rowset_meta->tablet_id() + << " version=[" << rowset_meta->start_version() << '-' + << rowset_meta->end_version() << "]"; + deferred_abort_tasks.emplace_back(std::move(*abort_task)); } } @@ -4913,11 +4978,9 @@ int InstanceRecycler::recycle_rowsets() { if (rowset.type() == RecycleRowsetPB::PREPARE) { // unable to calculate file path, can only be deleted by rowset id prefix num_prepare += 1; - if (delete_rowset_data_by_prefix(std::string(k), rowset_meta->resource_id(), - rowset_meta->tablet_id(), - rowset_meta->rowset_id_v2()) != 0) { - return -1; - } + deferred_prepare_delete_tasks.push_back({std::string(k), rowset_meta->resource_id(), + rowset_meta->rowset_id_v2(), + rowset_meta->tablet_id()}); } else { num_compacted += rowset.type() == RecycleRowsetPB::COMPACT; rowset_keys.emplace_back(k); @@ -4931,11 +4994,60 @@ int InstanceRecycler::recycle_rowsets() { auto loop_done = [&]() -> int { std::vector rowset_keys_to_delete; + std::vector abort_tasks_to_process; + std::vector prepare_delete_tasks; // rowset_id -> rowset_meta // store rowset id and meta for statistics rs size when delete std::map rowsets_to_delete; rowset_keys_to_delete.swap(rowset_keys); + abort_tasks_to_process.swap(deferred_abort_tasks); + prepare_delete_tasks.swap(deferred_prepare_delete_tasks); rowsets_to_delete.swap(rowsets); + for (const auto& abort_task : abort_tasks_to_process) { + LOG(INFO) << "begin to abort txn or job for related rowset, instance_id=" + << instance_id_ << " tablet_id=" << abort_task.tablet_id << " version=[" + << abort_task.start_version << '-' << abort_task.end_version << "]"; + int abort_ret = 0; + if (abort_task.type == DeferredRecycleAbortTask::Type::TXN) { + abort_ret = abort_txn_for_related_rowset(abort_task.txn_id); + } else { + RowsetMetaCloudPB rowset_meta; + rowset_meta.set_tablet_id(abort_task.tablet_id); + rowset_meta.set_rowset_id_v2(abort_task.rowset_id); + rowset_meta.set_job_id(abort_task.job_id); + abort_ret = abort_job_for_related_rowset(rowset_meta); + } + if (abort_ret != 0) { + LOG(WARNING) << "failed to abort txn or job for related rowset, instance_id=" + << instance_id_ << " tablet_id=" << abort_task.tablet_id + << " version=[" << abort_task.start_version << '-' + << abort_task.end_version << "]"; + return abort_ret; + } + } + if (!prepare_delete_tasks.empty()) { + worker_pool->submit([&, prepare_delete_tasks = std::move(prepare_delete_tasks)]() { + std::vector prepare_rowset_keys_to_delete; + prepare_rowset_keys_to_delete.reserve(prepare_delete_tasks.size()); + for (const auto& task : prepare_delete_tasks) { + if (delete_rowset_data(task.resource_id, task.tablet_id, task.rowset_id) != 0) { + LOG(WARNING) << "failed to delete rowset data, key=" << hex(task.key); + return; + } + if (delete_versioned_delete_bitmap_kvs(task.tablet_id, task.rowset_id) != 0) { + return; + } + prepare_rowset_keys_to_delete.emplace_back(task.key); + } + if (txn_remove(txn_kv_.get(), prepare_rowset_keys_to_delete) != 0) { + LOG(WARNING) << "failed to delete recycle rowset kv, instance_id=" + << instance_id_; + return; + } + num_recycled.fetch_add(prepare_rowset_keys_to_delete.size(), + std::memory_order_relaxed); + }); + } worker_pool->submit([&, rowset_keys_to_delete = std::move(rowset_keys_to_delete), rowsets_to_delete = std::move(rowsets_to_delete)]() { if (delete_rowset_data(rowsets_to_delete, RowsetRecyclingState::FORMAL_ROWSET, @@ -4966,6 +5078,16 @@ int InstanceRecycler::recycle_rowsets() { worker_pool->stop(); + int mark_ret = 0; + if (!rowset_keys_to_mark_recycled.empty()) { + mark_ret = batch_mark_rowsets_as_recycled(txn_kv_.get(), instance_id_, + rowset_keys_to_mark_recycled); + if (mark_ret != 0) { + LOG(WARNING) << "failed to batch mark recycle rowsets as recycled, instance_id=" + << instance_id_; + } + } + if (!async_recycled_rowset_keys.empty()) { if (txn_remove(txn_kv_.get(), async_recycled_rowset_keys) != 0) { LOG(WARNING) << "failed to delete recycle rowset kv, instance_id=" << instance_id_; @@ -4979,6 +5101,9 @@ int InstanceRecycler::recycle_rowsets() { segment_metrics_context_.report(); metrics_context.report(); + if (ret == 0) { + ret = mark_ret; + } return ret; } @@ -5565,6 +5690,8 @@ int InstanceRecycler::recycle_tmp_rowsets() { std::vector tmp_rowset_keys; std::vector tmp_rowset_ref_count_keys; + std::vector tmp_rowset_keys_to_mark_recycled; + std::vector deferred_abort_tasks; // rowset_id -> rowset_meta // store tmp_rowset id and meta for statistics rs size when delete @@ -5577,7 +5704,8 @@ int InstanceRecycler::recycle_tmp_rowsets() { auto handle_rowset_kv = [&num_scanned, &num_expired, &tmp_rowset_keys, &tmp_rowsets, &expired_rowset_size, &total_rowset_key_size, &total_rowset_value_size, - &earlest_ts, &tmp_rowset_ref_count_keys, this, + &earlest_ts, &tmp_rowset_ref_count_keys, + &tmp_rowset_keys_to_mark_recycled, &deferred_abort_tasks, this, &metrics_context](std::string_view k, std::string_view v) -> int { ++num_scanned; total_rowset_key_size += k.size(); @@ -5598,33 +5726,23 @@ int InstanceRecycler::recycle_tmp_rowsets() { } if (config::enable_mark_delete_rowset_before_recycle) { - int mark_ret = mark_rowset_as_recycled(txn_kv_.get(), instance_id_, k, rowset); - if (mark_ret == -1) { - LOG(WARNING) << "failed to mark rowset as recycled, instance_id=" << instance_id_ - << " tablet_id=" << rowset.tablet_id() << " version=[" - << rowset.start_version() << '-' << rowset.end_version() << "]"; - return -1; - } else if (mark_ret == 1) { - LOG(INFO) - << "rowset already marked as recycled, recycler will delete data and kv at " - "next turn, instance_id=" - << instance_id_ << " tablet_id=" << rowset.tablet_id() << " version=[" - << rowset.start_version() << '-' << rowset.end_version() << "]"; + if (need_mark_rowset_as_recycled(rowset)) { + tmp_rowset_keys_to_mark_recycled.emplace_back(k); + LOG(INFO) << "rowset queued to mark as recycled, recycler will delete data and kv " + "at next turn, instance_id=" + << instance_id_ << " tablet_id=" << rowset.tablet_id() << " version=[" + << rowset.start_version() << '-' << rowset.end_version() << "]"; return 0; } } if (config::enable_abort_txn_and_job_for_delete_rowset_before_recycle) { - LOG(INFO) << "begin to abort txn or job for related rowset, instance_id=" - << instance_id_ << " tablet_id=" << rowset.tablet_id() << " version=[" - << rowset.start_version() << '-' << rowset.end_version() << "]"; - - int ret = abort_txn_or_job_for_recycle(rowset); - if (ret != 0) { - LOG(WARNING) << "failed to abort txn or job for related rowset, instance_id=" - << instance_id_ << " tablet_id=" << rowset.tablet_id() << " version=[" - << rowset.start_version() << '-' << rowset.end_version() << "]"; - return ret; + if (auto abort_task = make_deferred_abort_task(rowset); abort_task.has_value()) { + LOG(INFO) << "rowset queued to abort related txn or job after current scan batch, " + "instance_id=" + << instance_id_ << " tablet_id=" << rowset.tablet_id() << " version=[" + << rowset.start_version() << '-' << rowset.end_version() << "]"; + deferred_abort_tasks.emplace_back(std::move(*abort_task)); } } @@ -5689,14 +5807,40 @@ int InstanceRecycler::recycle_tmp_rowsets() { }; auto loop_done = [&]() -> int { - DORIS_CLOUD_DEFER { - tmp_rowset_keys.clear(); - tmp_rowsets.clear(); - tmp_rowset_ref_count_keys.clear(); - }; - worker_pool->submit([&, tmp_rowset_keys_to_delete = tmp_rowset_keys, - tmp_rowsets_to_delete = tmp_rowsets, - tmp_rowset_ref_count_keys_to_delete = tmp_rowset_ref_count_keys]() { + std::vector tmp_rowset_keys_to_delete; + std::vector tmp_rowset_ref_count_keys_to_delete; + std::vector abort_tasks_to_process; + std::map tmp_rowsets_to_delete; + tmp_rowset_keys_to_delete.swap(tmp_rowset_keys); + tmp_rowsets_to_delete.swap(tmp_rowsets); + tmp_rowset_ref_count_keys_to_delete.swap(tmp_rowset_ref_count_keys); + abort_tasks_to_process.swap(deferred_abort_tasks); + for (const auto& abort_task : abort_tasks_to_process) { + LOG(INFO) << "begin to abort txn or job for related rowset, instance_id=" + << instance_id_ << " tablet_id=" << abort_task.tablet_id << " version=[" + << abort_task.start_version << '-' << abort_task.end_version << "]"; + int abort_ret = 0; + if (abort_task.type == DeferredRecycleAbortTask::Type::TXN) { + abort_ret = abort_txn_for_related_rowset(abort_task.txn_id); + } else { + RowsetMetaCloudPB rowset_meta; + rowset_meta.set_tablet_id(abort_task.tablet_id); + rowset_meta.set_rowset_id_v2(abort_task.rowset_id); + rowset_meta.set_job_id(abort_task.job_id); + abort_ret = abort_job_for_related_rowset(rowset_meta); + } + if (abort_ret != 0) { + LOG(WARNING) << "failed to abort txn or job for related rowset, instance_id=" + << instance_id_ << " tablet_id=" << abort_task.tablet_id + << " version=[" << abort_task.start_version << '-' + << abort_task.end_version << "]"; + return abort_ret; + } + } + worker_pool->submit([&, tmp_rowset_keys_to_delete = std::move(tmp_rowset_keys_to_delete), + tmp_rowsets_to_delete = std::move(tmp_rowsets_to_delete), + tmp_rowset_ref_count_keys_to_delete = + std::move(tmp_rowset_ref_count_keys_to_delete)]() { if (delete_rowset_data(tmp_rowsets_to_delete, RowsetRecyclingState::TMP_ROWSET, metrics_context) != 0) { LOG(WARNING) << "failed to delete tmp rowset data, instance_id=" << instance_id_; @@ -5722,7 +5866,7 @@ int InstanceRecycler::recycle_tmp_rowsets() { LOG(WARNING) << "failed to tmp rowset ref count kv, instance_id=" << instance_id_; return; } - num_recycled += tmp_rowset_keys.size(); + num_recycled += tmp_rowset_keys_to_delete.size(); return; }); return 0; @@ -5737,10 +5881,23 @@ int InstanceRecycler::recycle_tmp_rowsets() { worker_pool->stop(); + int mark_ret = 0; + if (!tmp_rowset_keys_to_mark_recycled.empty()) { + mark_ret = batch_mark_rowsets_as_recycled( + txn_kv_.get(), instance_id_, tmp_rowset_keys_to_mark_recycled); + if (mark_ret != 0) { + LOG(WARNING) << "failed to batch mark tmp rowsets as recycled, instance_id=" + << instance_id_; + } + } + // Report final metrics after all concurrent tasks completed segment_metrics_context_.report(); metrics_context.report(); + if (ret == 0) { + ret = mark_ret; + } return ret; } diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h index ab03460093a837..3cebee92d216d2 100644 --- a/cloud/src/recycler/recycler.h +++ b/cloud/src/recycler/recycler.h @@ -582,9 +582,6 @@ class InstanceRecycler { int abort_txn_for_related_rowset(int64_t txn_id); int abort_job_for_related_rowset(const RowsetMetaCloudPB& rowset_meta); - template - int abort_txn_or_job_for_recycle(T& rowset_meta_pb); - private: std::atomic_bool stopped_ {false}; std::shared_ptr txn_kv_;