Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

### Fixed
* Support standard `router -> storage` upgrades from CRUD versions older than
1.6.0 by falling back to direct storage calls when old storages do not have
`_crud.call_on_storage` yet.

## [1.7.4] - 12-02-26

### Fixed
Expand Down
35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ It also provides the `crud-storage` and `crud-router` roles for
- [Application dependency](#application-dependency)
- [Repository clone](#repository-clone)
- [Usage](#usage)
- [Upgrade from CRUD < 1.6.0](#upgrade-from-crud--160)
- [Sandbox](#sandbox)
- [API](#api)
- [Package info](#package-info)
Expand Down Expand Up @@ -123,6 +124,40 @@ For Tarantool 1.10, 2.x and 3.x you can also manually call
the [crud initialization code](#api) on [VShard](https://github.com/tarantool/vshard)
router and storage instances.

### Upgrade from CRUD < 1.6.0

CRUD 1.6.0 and CRUD 1.7.0 introduced compatibility boundaries that require
different upgrade orders.

When upgrading from CRUD < 1.6.0 to CRUD >= 1.6.0 and < 1.7.0 (1.6.0 or
1.6.1), update storages first:

1. Update all storages to the target CRUD version.
2. Make sure all storages are healthy and serve requests.
3. Update routers to the same target CRUD version.

CRUD 1.6.0 introduced router-to-storage calls through `_crud.call_on_storage`.
Storages running CRUD older than 1.6.0 do not have this function. Updating
storages first avoids the incompatible state where a new router calls
`_crud.call_on_storage` on an old storage. Old routers can still call upgraded
storages directly.

When upgrading from CRUD >= 1.6.0 and < 1.7.0 to CRUD >= 1.7.0, use the
[standard Tarantool replication cluster upgrade order](https://www.tarantool.io/en/doc/latest/admin/upgrades/upgrade_cluster/):
update routers first, then storage replica sets.

Direct upgrades from CRUD < 1.6.0 to CRUD >= 1.7.0 and <= 1.7.4 are not covered
by either of the procedures above. To upgrade from CRUD < 1.6.0 to a version in
that range (for example, CRUD 1.7.4), do it in two steps:

1. Upgrade to CRUD 1.6.0 or 1.6.1 using the storage-first order described above.
2. Upgrade from CRUD 1.6.0 or 1.6.1 to CRUD 1.7.4 using the standard
Tarantool/vshard order: routers first, then storages.

When upgrading from CRUD < 1.6.0 to CRUD > 1.7.4, router-side compatibility
mode is available, so use the standard Tarantool/vshard order: update routers
first, then storage replica sets.

> [!NOTE]
>
> After changing the cluster configuration (for example, adding a new replica set or changing their weights)
Expand Down
113 changes: 96 additions & 17 deletions crud/common/call.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ local errors = require('errors')

local call_cache = require('crud.common.call_cache')
local dev_checks = require('crud.common.dev_checks')
local storage_call = require('crud.common.storage_call')
local yield_checks = require('crud.common.yield_checks')
local utils = require('crud.common.utils')
local sharding_utils = require('crud.common.sharding.utils')
Expand All @@ -15,16 +16,13 @@ local BasePostprocessor = require('crud.common.map_call_cases.base_postprocessor

local CallError = errors.new_class('CallError')

local CALL_FUNC_NAME = 'call_on_storage'
local CRUD_CALL_FUNC_NAME = utils.get_storage_call(CALL_FUNC_NAME)

local call = {}

--- Storage-side handler registered in `call.storage_api`.
---
--- Looks up the function for `func_name` via `call_cache.func_name_to_func`
--- and hands `box.session.su`, `run_as_user`, that function and the remaining
--- arguments to `yield_checks.guard`, returning whatever the guard returns.
local function call_on_storage(run_as_user, func_name, ...)
    local target_func = call_cache.func_name_to_func(func_name)

    return yield_checks.guard(box.session.su, run_as_user, target_func, ...)
end

call.storage_api = {[CALL_FUNC_NAME] = call_on_storage}
call.storage_api = {[storage_call.CALL_ON_STORAGE_FUNC_NAME] = call_on_storage}

function call.get_vshard_call_name(mode, prefer_replica, balance)
dev_checks('string', '?boolean', '?boolean')
Expand Down Expand Up @@ -84,18 +82,45 @@ local function wrap_vshard_err(vshard_router, err, func_name, replicaset_id, buc
))
end

--- Performs a single storage call on `replicaset` through `replicaset[method]`.
---
--- `storage_call.prepare` chooses the remote function name and its arguments
--- and reports whether the legacy call form is used. A successful non-legacy,
--- non-async call is recorded via `storage_call.mark_call_on_storage_supported`.
---
--- Returns `resp, err` exactly as produced by `replicaset[method]`, followed by
--- the `used_legacy_call` flag from `storage_call.prepare`.
local function perform_storage_call(replicaset, method, replicaset_id, func_name, func_args, call_opts, force_legacy)
    local remote_name, remote_args, used_legacy_call = storage_call.prepare(
        replicaset_id, func_name, func_args, force_legacy
    )

    local invoke = replicaset[method]
    local resp, err = invoke(replicaset, remote_name, remote_args, call_opts)

    -- Nothing to record for a failed call or for one made in the legacy form.
    if err ~= nil or used_legacy_call then
        return resp, err, used_legacy_call
    end

    -- An async call only returns a future here, so its outcome is not known yet.
    if not (call_opts and call_opts.is_async) then
        storage_call.mark_call_on_storage_supported(replicaset_id)
    end

    return resp, err, used_legacy_call
end

--- Repeats a storage call in the forced legacy form when
--- `storage_call.should_fallback_to_legacy` asks for it.
---
--- `resp`, `err` and `used_legacy_call` are the outcome of the previous attempt;
--- they are returned untouched when no fallback is needed, otherwise the
--- outcome of the forced-legacy `perform_storage_call` is returned instead.
local function fallback_to_legacy_if_needed(replicaset, method, replicaset_id,
        func_name, func_args, call_opts, resp, err, used_legacy_call)
    local need_fallback = storage_call.should_fallback_to_legacy(
        replicaset_id, err, used_legacy_call
    )
    if not need_fallback then
        return resp, err, used_legacy_call
    end

    local legacy_resp, legacy_err, legacy_used = perform_storage_call(
        replicaset, method, replicaset_id, func_name, func_args, call_opts, true
    )

    return legacy_resp, legacy_err, legacy_used
end

--- Executes a vshard call and retries once after performing recovery actions
--- like bucket cache reset, destination redirect (for single calls), or master discovery.
local function call_with_retry_and_recovery(vshard_router,
replicaset, method, func_name, func_args, call_opts, is_single_call)
local func_args_ext = utils.append_array({ box.session.effective_user(), func_name }, func_args)

replicaset, replicaset_id, method, func_name, func_args, call_opts, is_single_call)
-- In case cluster was just bootstrapped with auto master discovery,
-- replicaset may miss master.
local resp, err = replicaset[method](replicaset, CRUD_CALL_FUNC_NAME, func_args_ext, call_opts)
local resp, err, used_legacy_call = perform_storage_call(
replicaset, method, replicaset_id, func_name, func_args, call_opts
)
resp, err, used_legacy_call = fallback_to_legacy_if_needed(
replicaset, method, replicaset_id, func_name, func_args, call_opts, resp, err, used_legacy_call
)

if err == nil then
return resp, err
return resp, err, used_legacy_call
end

-- This is a partial copy of error handling from vshard.router.router_call_impl()
Expand All @@ -106,6 +131,7 @@ local function call_with_retry_and_recovery(vshard_router,
local destination = single_err.vshard_err.destination
if destination and vshard_router.replicasets[destination] then
replicaset = vshard_router.replicasets[destination]
replicaset_id = destination
end
end

Expand All @@ -124,7 +150,50 @@ local function call_with_retry_and_recovery(vshard_router,

-- Retry only once: this should be enough for initial discovery;
-- otherwise, force the user to fix up the cluster bootstrap.
return replicaset[method](replicaset, CRUD_CALL_FUNC_NAME, func_args_ext, call_opts)
resp, err, used_legacy_call = perform_storage_call(
replicaset, method, replicaset_id, func_name, func_args, call_opts
)
resp, err, used_legacy_call = fallback_to_legacy_if_needed(
replicaset, method, replicaset_id, func_name, func_args, call_opts, resp, err, used_legacy_call
)

return resp, err, used_legacy_call
end

--- Public wrapper around the module-local `perform_storage_call`.
---
--- Forwards all seven arguments unchanged and returns its results
--- (`resp, err, used_legacy_call`).
function call.storage_call(replicaset, method, replicaset_id, func_name, func_args, call_opts, force_legacy)
    return perform_storage_call(
        replicaset,
        method,
        replicaset_id,
        func_name,
        func_args,
        call_opts,
        force_legacy
    )
end

--- Waits for the result of an async storage call described by `future_info`,
--- re-issuing the call in the forced legacy form when
--- `storage_call.should_fallback_to_legacy` asks for it.
---
--- `future_info` is the table built by `call.map`: `future`, `replicaset`,
--- `replicaset_id`, `method`, `func_name`, `func_args`, `call_opts` and
--- `used_legacy_call`. On fallback its `future` and `used_legacy_call` fields
--- are updated in place.
---
--- `wait_timeout` is a budget for the whole function: when a legacy retry is
--- needed, the second wait only gets the time left after the first one, so the
--- caller's deadline is not exceeded by up to a full extra `wait_timeout`.
---
--- Returns `result, err` from `future:wait_result`, or `nil, call_err` when the
--- legacy call itself could not be issued.
local function wait_result_with_compat(future_info, wait_timeout)
    local fiber = require('fiber')

    -- Remember when the budget runs out before any time is spent waiting.
    local deadline
    if wait_timeout ~= nil then
        deadline = fiber.clock() + wait_timeout
    end

    local result, err = future_info.future:wait_result(wait_timeout)
    local result_err = storage_call.result_error(result)

    if err == nil and result_err == nil and not future_info.used_legacy_call then
        storage_call.mark_call_on_storage_supported(future_info.replicaset_id)
    end

    if storage_call.should_fallback_to_legacy(
        future_info.replicaset_id, err or result_err, future_info.used_legacy_call
    ) then
        local future, call_err, used_legacy_call = perform_storage_call(
            future_info.replicaset,
            future_info.method,
            future_info.replicaset_id,
            future_info.func_name,
            future_info.func_args,
            future_info.call_opts,
            true
        )
        if call_err ~= nil then
            return nil, call_err
        end

        future_info.future = future
        future_info.used_legacy_call = used_legacy_call

        -- Spend only what is left of the original budget on the retry.
        local retry_timeout = wait_timeout
        if deadline ~= nil then
            retry_timeout = math.max(deadline - fiber.clock(), 0)
        end
        result, err = future:wait_result(retry_timeout)
    end

    return result, err
end

function call.map(vshard_router, func_name, func_args, opts)
Expand Down Expand Up @@ -172,8 +241,8 @@ function call.map(vshard_router, func_name, func_args, opts)
while iter:has_next() do
local args, replicaset, replicaset_id = iter:get()

local future, err = call_with_retry_and_recovery(vshard_router, replicaset, vshard_call_name,
func_name, args, call_opts, false)
local future, err, used_legacy_call = call_with_retry_and_recovery(vshard_router, replicaset, replicaset_id,
vshard_call_name, func_name, args, call_opts, false)

if err ~= nil then
local result_info = {
Expand All @@ -192,17 +261,26 @@ function call.map(vshard_router, func_name, func_args, opts)
return postprocessor:get()
end

futures_by_replicasets[replicaset_id] = future
futures_by_replicasets[replicaset_id] = {
future = future,
replicaset = replicaset,
replicaset_id = replicaset_id,
method = vshard_call_name,
func_name = func_name,
func_args = args,
call_opts = call_opts,
used_legacy_call = used_legacy_call,
}
end

local deadline = fiber_clock() + timeout
for replicaset_id, future in pairs(futures_by_replicasets) do
for replicaset_id, future_info in pairs(futures_by_replicasets) do
local wait_timeout = deadline - fiber_clock()
if wait_timeout < 0 then
wait_timeout = 0
end

local result, err = future:wait_result(wait_timeout)
local result, err = wait_result_with_compat(future_info, wait_timeout)

local result_info = {
key = replicaset_id,
Expand Down Expand Up @@ -246,7 +324,8 @@ function call.single(vshard_router, bucket_id, func_name, func_args, opts)
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
local request_timeout = opts.mode == 'read' and opts.request_timeout or nil

local res, err = call_with_retry_and_recovery(vshard_router, replicaset, vshard_call_name,
local replicaset_id = utils.get_replicaset_id(vshard_router, replicaset)
local res, err = call_with_retry_and_recovery(vshard_router, replicaset, replicaset_id, vshard_call_name,
func_name, func_args, {timeout = timeout, request_timeout = request_timeout}, true)
if err ~= nil then
return nil, wrap_vshard_err(vshard_router, err, func_name, nil, bucket_id)
Expand All @@ -272,7 +351,7 @@ function call.any(vshard_router, func_name, func_args, opts)
end
local replicaset_id, replicaset = next(replicasets)

local res, err = call_with_retry_and_recovery(vshard_router, replicaset, 'call',
local res, err = call_with_retry_and_recovery(vshard_router, replicaset, replicaset_id, 'call',
func_name, func_args, {timeout = timeout}, false)
if err ~= nil then
return nil, wrap_vshard_err(vshard_router, err, func_name, replicaset_id)
Expand Down
Loading
Loading