Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions mysql-test/suite/rocksdb/r/alter_table_add_secondary_key.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#
# Test that checks if a bulk_load_fail_if_not_bottommost_level=true doesn't affect secondary index creation badly.
#
#
# Create table t1 for which secondary index will be added later with ALTER
#
CREATE TABLE t1 (
id BIGINT PRIMARY KEY,
idx_col INT
) ENGINE=ROCKSDB;
#
# Create table t2. It will have index_id smaller than index_id of the secondary index for t1 (that we are going to create).
# Note: we can't use t1 instead of t2 to insert row while ALTER is running, because there will be MDL SHARED LOCK for t1.
#
CREATE TABLE t2 (
id BIGINT PRIMARY KEY,
idx_col INT
) ENGINE=ROCKSDB;
# Insert row to t1, so the secondary index is not going to be empty.
INSERT INTO t1 VALUES (1, 100);
#
# Start ALTER TABLE ADD INDEX and pause with DEBUG_SYNC after allocating new index_id
#
SET DEBUG_SYNC = 'rocksdb_create_key_def_after_index_id_allocated SIGNAL index_id_allocated';
SET GLOBAL DEBUG = '+d,rocksdb_create_key_def_after_index_id_allocated';
SET SESSION rocksdb_bulk_load_fail_if_not_bottommost_level = true;
ALTER TABLE t1 ADD INDEX idx (idx_col);;
#
# Waiting for index_id allocated...
#
SET DEBUG_SYNC = 'now WAIT_FOR index_id_allocated';
#
# Secondary index has received index_id. Creating table t3. It will have index_id greater than index_id of the secondary index for t1.
#
SET GLOBAL DEBUG = '-d,rocksdb_create_key_def_after_index_id_allocated';
CREATE TABLE t3 (
id BIGINT PRIMARY KEY,
idx_col INT
) ENGINE=ROCKSDB;
#
# Insert row with corresponding RocksDB key smaller than keys of the secondary index for t1.
INSERT INTO t2 VALUES (1, 100);
# Insert row with corresponding RocksDB key greater than keys of the secondary index for t1.
INSERT INTO t3 VALUES (1, 100);
#
#
# Flush memtable and compact SST from L0 to L6. Note: this SST will have range of keys overlapping with the range of keys that will be generated for the secondary index.
#
SET GLOBAL rocksdb_force_flush_memtable_and_lzero_now = ON;
# Now signal the waiting ALTER thread to continue and let it add the index (ingest happens now!)
SHOW INDEX FROM t1;
Table Non_unique Key_name Seq_in_index Column_name Collation Cardinality Sub_part Packed Null Index_type Comment Index_comment Visible Expression
t1 0 PRIMARY 1 id A 1 NULL NULL SE_SPECIFIC YES NULL
t1 1 idx 1 idx_col A 1 NULL NULL YES SE_SPECIFIC YES NULL
SELECT SUM(id) FROM t1 USE INDEX (idx);
SUM(id)
1
# Cleanup
DROP TABLE t1;
DROP TABLE t2;
DROP TABLE t3;
116 changes: 116 additions & 0 deletions mysql-test/suite/rocksdb/t/alter_table_add_secondary_key.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
--source include/not_valgrind.inc
--source include/have_rocksdb.inc
--source include/have_debug.inc
--source include/have_debug_sync.inc

# Set this to a non-zero value to print the SST file metadata when you need to debug this MTR test.
--let $debug_test = 0

--echo #
--echo # Test that checks if a bulk_load_fail_if_not_bottommost_level=true doesn't affect secondary index creation badly.
--echo #

--echo #
--echo # Create table t1 for which secondary index will be added later with ALTER
--echo #
CREATE TABLE t1 (
id BIGINT PRIMARY KEY,
idx_col INT
) ENGINE=ROCKSDB;

--echo #
--echo # Create table t2. It will have index_id smaller than index_id of the secondary index for t1 (that we are going to create).
--echo # Note: we can't use t1 instead of t2 to insert row while ALTER is running, because there will be MDL SHARED LOCK for t1.
--echo #
CREATE TABLE t2 (
id BIGINT PRIMARY KEY,
idx_col INT
) ENGINE=ROCKSDB;

--echo # Insert row to t1, so the secondary index is not going to be empty.
INSERT INTO t1 VALUES (1, 100);

--echo #
--echo # Start ALTER TABLE ADD INDEX and pause with DEBUG_SYNC after allocating new index_id
--echo #
--connect (conn_alter, localhost, root,,test)
--connection conn_alter

# Before the bug fix, the ALTER could stay paused at the sync point until finish_index_creation was signaled after table t3 was created.
# After the bug fix the test would never reach that signal, because CREATE TABLE t3 would block waiting for this index creation to finish.
# Therefore, instead of waiting for a signal to continue, we inject a sleep.
# SET DEBUG_SYNC = 'rocksdb_create_key_def_after_index_id_allocated SIGNAL index_id_allocated WAIT_FOR finish_index_creation';
SET DEBUG_SYNC = 'rocksdb_create_key_def_after_index_id_allocated SIGNAL index_id_allocated';
SET GLOBAL DEBUG = '+d,rocksdb_create_key_def_after_index_id_allocated';

SET SESSION rocksdb_bulk_load_fail_if_not_bottommost_level = true;

# This will hit the "rocksdb_create_key_def_after_index_id_allocated" sync point, signal index_id_allocated, and then pause in the debug-injected sleep:
--send ALTER TABLE t1 ADD INDEX idx (idx_col);

--connection default

--echo #
--echo # Waiting for index_id allocated...
--echo #

SET DEBUG_SYNC = 'now WAIT_FOR index_id_allocated';

--echo #
--echo # Secondary index has received index_id. Creating table t3. It will have index_id greater than index_id of the secondary index for t1.
--echo #
SET GLOBAL DEBUG = '-d,rocksdb_create_key_def_after_index_id_allocated'; # no need to wait after t3 index id allocated
CREATE TABLE t3 (
id BIGINT PRIMARY KEY,
idx_col INT
) ENGINE=ROCKSDB;

--echo #
--echo # Insert row with corresponding RocksDB key smaller than keys of the secondary index for t1.
INSERT INTO t2 VALUES (1, 100);
--echo # Insert row with corresponding RocksDB key greater than keys of the secondary index for t1.
INSERT INTO t3 VALUES (1, 100);
--echo #

--echo #
--echo # Flush memtable and compact SST from L0 to L6. Note: this SST will have range of keys overlapping with the range of keys that will be generated for the secondary index.
--echo #
SET GLOBAL rocksdb_force_flush_memtable_and_lzero_now = ON;

if ($debug_test)
{
--echo # Current state of the LSM tree (just before the secondary index is created):
SELECT CF_NAME, LEVEL, NAME, SMALLEST_KEY, LARGEST_KEY, BEING_COMPACTED, SIZE
FROM information_schema.rocksdb_live_files_metadata
WHERE CF_NAME = 'default'
ORDER BY LEVEL, NAME;
}

--echo # Now signal the waiting ALTER thread to continue and let it add the index (ingest happens now!)
# See comment about why we cannot wait on finish_index_creation after the bug fix (earlier in the test).
# SET DEBUG_SYNC = 'now SIGNAL finish_index_creation';
--sleep 5

if ($debug_test)
{
--echo # Current state of the LSM tree (after the secondary index has been created):
SELECT CF_NAME, LEVEL, NAME, SMALLEST_KEY, LARGEST_KEY, BEING_COMPACTED, SIZE
FROM information_schema.rocksdb_live_files_metadata
WHERE CF_NAME = 'default'
ORDER BY LEVEL, NAME;
}

--connection conn_alter
--reap # Reap the ALTER statement (it should now continue from the sync point)

--connection default

SHOW INDEX FROM t1;
SELECT SUM(id) FROM t1 USE INDEX (idx);

--echo # Cleanup
DROP TABLE t1;
DROP TABLE t2;
DROP TABLE t3;

--disconnect conn_alter
104 changes: 91 additions & 13 deletions storage/rocksdb/ha_rocksdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */

#include <cassert>
#include "my_dbug.h"
#include "mysqld_error.h"
#ifdef USE_PRAGMA_IMPLEMENTATION
#pragma implementation // gcc: Class implementation
Expand Down Expand Up @@ -2944,6 +2945,21 @@ static bool is_tmp_table(const std::string &tablename) {
}
}

/*
  Single-index variant.

  Looks up the dictionary manager selected by the column family of
  `index_id`, takes its lock, and delegates to
  wait_until_no_conflicting_index_ids_reserved_for_create() on it.

  @param index_id  global index id (column family id + index number)
*/
static void wait_until_no_conflicting_index_ids_reserved(GL_INDEX_ID index_id) {
  auto cf_dict_manager =
      dict_manager.get_dict_manager_selector_non_const(index_id.cf_id);

  // The dictionary manager lock is held for the whole delegated call.
  std::lock_guard guard(*cf_dict_manager);
  cf_dict_manager->wait_until_no_conflicting_index_ids_reserved_for_create(
      index_id);
}

static void wait_until_no_conflicting_index_ids_reserved(
const std::vector<GL_INDEX_ID> &index_ids_to_reserve) {
for (const auto &index_id : index_ids_to_reserve) {
wait_until_no_conflicting_index_ids_reserved(index_id);
}
}

static int rocksdb_compact_column_family(THD *const thd,
struct SYS_VAR *const var,
void *const var_ptr,
Expand Down Expand Up @@ -3575,11 +3591,38 @@ class Rdb_transaction {
int finish_bulk_load(bool *is_critical_error = nullptr,
bool print_client_error = true,
TABLE *table_arg = nullptr,
char *table_name_arg = nullptr) {
char *table_name_arg = nullptr,
std::shared_ptr<Rdb_key_def> index = {}) {
/*
  Scope guard for the index id that was reserved for index creation.

  When the guard goes out of scope (on any return path of the enclosing
  function) and `index` is set, the dictionary manager selected by the
  index's column family is locked and remove_index_id_reserved_for_create()
  is called for the index's global id. A null `index` makes the guard a
  no-op.
*/
struct Release_index_id_reserved_for_create {
  std::shared_ptr<Rdb_key_def> index;

  // Taken by const reference so that the shared_ptr is copied exactly once
  // (into the member) instead of once per parameter and once per member.
  explicit Release_index_id_reserved_for_create(
      const std::shared_ptr<Rdb_key_def> &index_arg)
      : index(index_arg) {}

  // The destructor has a side effect, so a copied guard would release the
  // same reservation twice. Forbid copying (this also suppresses moves).
  Release_index_id_reserved_for_create(
      const Release_index_id_reserved_for_create &) = delete;
  Release_index_id_reserved_for_create &operator=(
      const Release_index_id_reserved_for_create &) = delete;

  ~Release_index_id_reserved_for_create() {
    if (index == nullptr) {
      return;
    }
    const auto index_id = index->get_gl_index_id();
    auto local_dict_manager =
        dict_manager.get_dict_manager_selector_non_const(index_id.cf_id);
    std::lock_guard dm_lock(*local_dict_manager);
    local_dict_manager->remove_index_id_reserved_for_create(index_id);
  }
} release_index_id_reserved_for_create(index);

if (m_curr_bulk_load.size() == 0) {
if (is_critical_error) {
*is_critical_error = false;
}
// The Ensure_cleanup step below is skipped on this early return, so make
// sure none of the resources it would otherwise clean up are left
// pending. Ensure_cleanup could be moved earlier instead, but that must
// first be shown to be safe - hence the assertions.
assert(m_curr_bulk_load_tablename.empty());
assert(m_key_merge.empty());
assert(m_bulk_load_index_registry.empty());
return HA_EXIT_SUCCESS;
}

Expand Down Expand Up @@ -3980,7 +4023,12 @@ class Rdb_transaction {
file_count);
}

if (index != nullptr) {
wait_until_no_conflicting_index_ids_reserved(index->get_gl_index_id());
}

const rocksdb::Status s = ingest_bulk_load_files(args);

if (THDVAR(m_thd, trace_sst_api)) {
LogPluginErrMsg(INFORMATION_LEVEL, ER_LOG_PRINTF_MSG,
"SST Tracing: IngestExternalFile '%zu' files returned %s",
Expand Down Expand Up @@ -6430,8 +6478,8 @@ static int rocksdb_init_internal(void *const p) {

if (rdb_has_rocksdb_corruption()) {
LogPluginErrMsg(ERROR_LEVEL, 0,
"There was corruption detected in the RocksDB data files. "
"Check error log emitted earlier for more details.");
"There was corruption detected in the RocksDB data files. "
"Check error log emitted earlier for more details.");
if (rocksdb_allow_to_start_after_corruption) {
LogPluginErrMsg(INFORMATION_LEVEL, 0,
"Set rocksdb_allow_to_start_after_corruption=0 to "
Expand Down Expand Up @@ -8385,7 +8433,8 @@ int ha_rocksdb::create_key_defs(
const TABLE &table_arg, Rdb_tbl_def &tbl_def_arg,
const std::string &actual_user_table_name, bool is_dd_tbl,
const TABLE *const old_table_arg /* = nullptr */,
const Rdb_tbl_def *const old_tbl_def_arg /* = nullptr */) const {
const Rdb_tbl_def *const old_tbl_def_arg /* = nullptr */,
bool is_secondary_index) const {
DBUG_ENTER_FUNC();

assert(table_arg.s != nullptr);
Expand Down Expand Up @@ -8454,7 +8503,8 @@ int ha_rocksdb::create_key_defs(
*/
for (uint i = 0; i < tbl_def_arg.m_key_count; i++) {
if (create_key_def(table_arg, i, tbl_def_arg, m_key_descr_arr[i], cfs[i],
ttl_duration, ttl_column, is_dd_tbl)) {
ttl_duration, ttl_column, is_dd_tbl,
is_secondary_index)) {
DBUG_RETURN(HA_EXIT_FAILURE);
}
}
Expand Down Expand Up @@ -8702,7 +8752,7 @@ bool ha_rocksdb::create_inplace_key_defs(
->get_stats(gl_index_id),
index_info.m_index_flags, ttl_rec_offset, ttl_duration);
} else if (create_key_def(table_arg, i, tbl_def_arg, new_key_descr[i],
cfs[i], ttl_duration, ttl_column)) {
cfs[i], ttl_duration, ttl_column, false, true)) {
return true;
}

Expand Down Expand Up @@ -8866,13 +8916,32 @@ int ha_rocksdb::create_key_def(const TABLE &table_arg, uint i,
const struct key_def_cf_info &cf_info,
uint64 ttl_duration,
const std::string &ttl_column,
bool is_dd_tbl /* = false */) const {
bool is_dd_tbl /* = false */,
bool is_secondary_index) const {
DBUG_ENTER_FUNC();

assert(new_key_def == nullptr);

const uint index_id = ddl_manager.get_and_update_next_number(
cf_info.cf_handle->GetID(), is_dd_tbl);
uint index_id;

{
auto local_dict_manager = dict_manager.get_dict_manager_selector_non_const(
cf_info.cf_handle->GetID());
std::lock_guard dm_lock(*local_dict_manager);
index_id = ddl_manager.get_and_update_next_number(
cf_info.cf_handle->GetID(), is_dd_tbl); // FIXME
if (is_secondary_index) {
local_dict_manager->add_index_id_reserved_for_create(
GL_INDEX_ID{cf_info.cf_handle->GetID(), index_id});
}
}

DEBUG_SYNC(ha_thd(), "rocksdb_create_key_def_after_index_id_allocated");
DBUG_EXECUTE_IF("rocksdb_create_key_def_after_index_id_allocated", {
using namespace std::chrono_literals;
std::this_thread::sleep_for(5s);
});

const uint16_t index_dict_version = Rdb_key_def::INDEX_INFO_VERSION_LATEST;
uchar index_type;
uint16_t kv_version;
Expand Down Expand Up @@ -9142,7 +9211,8 @@ int ha_rocksdb::create_table(const std::string &table_name,
ulonglong auto_increment_value,
const dd::Table *table_def [[maybe_unused]]) {
DBUG_ENTER_FUNC();

std::vector<GL_INDEX_ID>
index_ids_to_reserve; // defined here because of gotos in code below
int err;
bool is_dd_tbl = dd::get_dictionary()->is_dd_table_name(
table_arg.s->db.str, table_arg.s->table_name.str);
Expand Down Expand Up @@ -9199,6 +9269,14 @@ int ha_rocksdb::create_table(const std::string &table_name,
assert(!debug_sync_set_action(thd, STRING_WITH_LEN(act)));
});

index_ids_to_reserve.clear();
index_ids_to_reserve.reserve(n_keys);
for (size_t i = 0; i < n_keys; i++) {
index_ids_to_reserve.push_back(m_key_descr_arr[i]->get_gl_index_id());
}

wait_until_no_conflicting_index_ids_reserved(index_ids_to_reserve);

{
std::lock_guard<Rdb_dict_manager> dm_lock(*local_dict_manager);
err = ddl_manager.put_and_write(m_tbl_def, batch);
Expand Down Expand Up @@ -14384,7 +14462,7 @@ bool ha_rocksdb::prepare_inplace_alter_table(

if (create_key_defs(*altered_table, *new_tdef,
"" /*actual_user_table_name*/, false /*is_dd_tbl*/,
table, m_tbl_def)) {
table, m_tbl_def, true)) {
/* Delete the new key descriptors */
delete[] new_key_descr;

Expand Down Expand Up @@ -14505,7 +14583,7 @@ bool ha_rocksdb::rebuild_table_def_from_table(TABLE *altered_table) {

if (create_key_defs(*altered_table, *new_tdef,
"" /* actual_user_table_name */, false /* is_dd_tbl */,
table, m_tbl_def)) {
table, m_tbl_def, false)) {
goto error;
}

Expand Down Expand Up @@ -14852,7 +14930,7 @@ int ha_rocksdb::inplace_populate_sk(

bool is_critical_error;
res = tx->finish_bulk_load(&is_critical_error, true, new_table_arg,
m_table_handler->m_table_name);
m_table_handler->m_table_name, index);

if (res == ER_DUP_ENTRY) {
assert(new_table_arg->key_info[index->get_keyno()].flags & HA_NOSAME);
Expand Down
Loading