diff --git a/Cargo.lock b/Cargo.lock index ee35dcc41b841..2c6b772c014be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5094,9 +5094,9 @@ dependencies = [ [[package]] name = "futures-async-stream" -version = "0.2.9" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "379790776b0d953337df4ab7ecc51936c66ea112484cad7912907b1d34253ebf" +checksum = "6cce57e88ba9fe4953f476112b2c8e315a2da07725a14dc091ac3e5b6e4cca72" dependencies = [ "futures-async-stream-macro", "futures-core", @@ -5105,9 +5105,9 @@ dependencies = [ [[package]] name = "futures-async-stream-macro" -version = "0.2.9" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5df2c13d48c8cb8a3ec093ede6f0f4482f327d7bb781120c5fb483ef0f17e758" +checksum = "5ac45ed0bddbd110eb68862768a194f88700f5b91c39931d2f432fab67a16d08" dependencies = [ "proc-macro2", "quote", @@ -7648,6 +7648,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-format" version = "0.4.4" @@ -13877,13 +13883,14 @@ dependencies = [ [[package]] name = "time" -version = "0.3.30" +version = "0.3.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4a34ab300f2dee6e562c10a046fc05e358b29f9bf92277f30c3c8d82275f6f5" +checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885" dependencies = [ "deranged", "itoa", "libc", + "num-conv", "num_threads", "powerfmt", "serde", @@ -13899,10 +13906,11 @@ checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.15" +version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ad70d68dba9e1f8aceda7aa6711965dfec1cac869f311a51bd08b3a2ccbce20" +checksum = "3f252a68540fde3a3877aeea552b832b40ab9a69e318efd078774a01ddee1ccf" dependencies = [ + "num-conv", "time-core", ] diff --git a/Cargo.toml b/Cargo.toml index 0c7b8a8999e60..1785398917b57 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -247,11 +247,16 @@ rw_iter_util = { path = "src/utils/iter_util" } [workspace.lints.rust] # `forbid` will also prevent the misuse of `#[allow(unused)]` unused_must_use = "forbid" -future_incompatible = "warn" -nonstandard_style = "warn" -rust_2018_idioms = "warn" +future_incompatible = { level = "warn", priority = -1 } +nonstandard_style = { level = "warn", priority = -1 } +rust_2018_idioms = { level = "warn", priority = -1 } # Backward compatibility is not important for an application. async_fn_in_trait = "allow" +unexpected_cfgs = { level = "warn", check-cfg = [ + 'cfg(madsim)', + 'cfg(coverage)', + 'cfg(dashboard_built)', +] } [workspace.lints.clippy] uninlined_format_args = "allow" diff --git a/ci/build-ci-image.sh b/ci/build-ci-image.sh index 329bf01b49c5b..6602509824e05 100755 --- a/ci/build-ci-image.sh +++ b/ci/build-ci-image.sh @@ -10,7 +10,7 @@ cat ../rust-toolchain # shellcheck disable=SC2155 # REMEMBER TO ALSO UPDATE ci/docker-compose.yml -export BUILD_ENV_VERSION=v20240729 +export BUILD_ENV_VERSION=v20240731 export BUILD_TAG="public.ecr.aws/w1p7b4n3/rw-build-env:${BUILD_ENV_VERSION}" diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 89287926edc7f..83cb000566d4a 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -71,7 +71,7 @@ services: retries: 5 source-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240729 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240731 depends_on: - mysql - db @@ -84,7 +84,7 @@ services: - ..:/risingwave sink-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240729 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240731 depends_on: - mysql - db @@ -107,12 +107,12 @@ services: rw-build-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240729 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240731 volumes: - ..:/risingwave ci-flamegraph-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240729 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240731 # NOTE(kwannoel): This is used in order to permit # syscalls for `nperf` (perf_event_open), # so it can do CPU profiling. @@ -123,7 +123,7 @@ services: - ..:/risingwave regress-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240729 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240731 depends_on: db: condition: service_healthy diff --git a/ci/rust-toolchain b/ci/rust-toolchain index 92ef4a5ff27be..6bc57a2a65d8f 100644 --- a/ci/rust-toolchain +++ b/ci/rust-toolchain @@ -4,4 +4,4 @@ # 3. (optional) **follow the instructions in lints/README.md** to update the toolchain and dependencies for lints [toolchain] -channel = "nightly-2024-03-12" +channel = "nightly-2024-06-06" diff --git a/ci/scripts/build-other.sh b/ci/scripts/build-other.sh index 65c50462f97a0..d59e8ea876ccf 100755 --- a/ci/scripts/build-other.sh +++ b/ci/scripts/build-other.sh @@ -5,9 +5,9 @@ set -euo pipefail source ci/scripts/common.sh - echo "--- Build Rust UDF" cd e2e_test/udf/wasm +rustup target add wasm32-wasi cargo build --release cd ../../.. diff --git a/clippy.toml b/clippy.toml index 551de0eb6c479..21b972376b0ed 100644 --- a/clippy.toml +++ b/clippy.toml @@ -39,3 +39,6 @@ doc-valid-idents = [ avoid-breaking-exported-api = false upper-case-acronyms-aggressive = true too-many-arguments-threshold = 10 +ignore-interior-mutability = [ + "risingwave_frontend::expr::ExprImpl" # XXX: Where does ExprImpl have interior mutability? +] diff --git a/e2e_test/source_inline/pubsub/prepare-data.rs b/e2e_test/source_inline/pubsub/prepare-data.rs index a792c6cbb2618..e084b46919421 100755 --- a/e2e_test/source_inline/pubsub/prepare-data.rs +++ b/e2e_test/source_inline/pubsub/prepare-data.rs @@ -1,5 +1,5 @@ #!/usr/bin/env -S cargo -Zscript -```cargo +---cargo [dependencies] anyhow = "1" google-cloud-googleapis = { version = "0.13", features = ["pubsub"] } @@ -13,7 +13,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ "signal", "fs", ] } -``` +--- use google_cloud_googleapis::pubsub::v1::PubsubMessage; use google_cloud_pubsub::client::{Client, ClientConfig}; diff --git a/lints/Cargo.lock b/lints/Cargo.lock index 3a9bd8384c0d9..e3b748e6da670 100644 --- a/lints/Cargo.lock +++ b/lints/Cargo.lock @@ -162,8 +162,7 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "clippy_config" -version = "0.1.79" -source = "git+https://github.com/rust-lang/rust-clippy?rev=fca4e16ffb8c07186ee23becd44cd5c9fb51896c#fca4e16ffb8c07186ee23becd44cd5c9fb51896c" +version = "0.1.80" dependencies = [ "rustc-semver", "serde", @@ -172,8 +171,7 @@ dependencies = [ [[package]] name = "clippy_utils" -version = "0.1.79" -source = "git+https://github.com/rust-lang/rust-clippy?rev=fca4e16ffb8c07186ee23becd44cd5c9fb51896c#fca4e16ffb8c07186ee23becd44cd5c9fb51896c" +version = "0.1.80" dependencies = [ "arrayvec", "clippy_config", diff --git a/lints/Cargo.toml b/lints/Cargo.toml index 74fc49c3fd080..43ece1f6fc5b7 100644 --- a/lints/Cargo.toml +++ b/lints/Cargo.toml @@ -14,7 +14,7 @@ path = "ui/format_error.rs" # See `README.md` before bumping the version. # Remember to update the version in `ci/Dockerfile` as well. [dependencies] -clippy_utils = { git = "https://github.com/rust-lang/rust-clippy", rev = "fca4e16ffb8c07186ee23becd44cd5c9fb51896c" } +clippy_utils = { git = "https://github.com/risingwavelabs/clippy", rev = "5e2a7c6adebdb0478ee6d5b67ab4ee94153b2997" } dylint_linting = "3.1.0" itertools = "0.12" diff --git a/lints/README.md b/lints/README.md index 5007474227ab3..3ab55f0bbfe7a 100644 --- a/lints/README.md +++ b/lints/README.md @@ -30,8 +30,13 @@ Duplicate `.vscode/settings.json.example` to `.vscode/settings.json` to enable r ## Bump toolchain -The version of the toolchain is specified in `rust-toolchain` file under current directory. -It does not have to be exactly the same as the one used to build RisingWave, but it should be close enough to avoid compile errors. +The version of the toolchain is specified in `rust-toolchain` file under current directory. It will be used to build the lints, and also be used by `dylint` to compile RisingWave, instead of the root-level `rust-toolchain`. + +So the chosen toolchain needs to +1. be close enough to the root-level `rust-toolchain` to make RisingWave compile. It does not have to be exactly the same version though. +2. be close enough to the dependency `clippy_utils`'s corresponding `rust-toolchain` in the Clippy's repo. + +(Note: `clippy_utils` depends on rustc's internal unstable API. When rustc has breaking changes, the `rust` repo's Clippy will be updated. And then it's [synced back to the Clippy repo bi-weekly](https://doc.rust-lang.org/clippy/development/infrastructure/sync.html#syncing-changes-between-clippy-and-rust-langrust). So ideally we can use `clippy_utils` in the rust repo corresponding to our root-level nightly version, but that repo is too large. Perhaps we can also consider copy the code out to workaround this problem.) The information below can be helpful in finding the appropriate version to bump to. diff --git a/lints/rust-toolchain b/lints/rust-toolchain index 3bbdf2b2d53fd..a146af66cd637 100644 --- a/lints/rust-toolchain +++ b/lints/rust-toolchain @@ -1,5 +1,5 @@ # See `README.md` before bumping the version. [toolchain] -channel = "nightly-2024-04-18" +channel = "nightly-2024-06-06" components = ["llvm-tools-preview", "rustc-dev"] diff --git a/lints/src/format_error.rs b/lints/src/format_error.rs index 9b6d9be5b8c4e..402adc4aa5af0 100644 --- a/lints/src/format_error.rs +++ b/lints/src/format_error.rs @@ -14,7 +14,7 @@ use clippy_utils::diagnostics::span_lint_and_help; use clippy_utils::macros::{ - find_format_arg_expr, find_format_args, is_format_macro, macro_backtrace, + find_format_arg_expr, is_format_macro, macro_backtrace, FormatArgsStorage, }; use clippy_utils::ty::{implements_trait, match_type}; use clippy_utils::{ @@ -56,7 +56,15 @@ declare_tool_lint! { } #[derive(Default)] -pub struct FormatError; +pub struct FormatError { + format_args: FormatArgsStorage, +} + +impl FormatError { + pub fn new(format_args: FormatArgsStorage) -> Self { + Self { format_args } + } +} impl_lint_pass!(FormatError => [FORMAT_ERROR]); @@ -90,7 +98,7 @@ impl<'tcx> LateLintPass<'tcx> for FormatError { for macro_call in macro_backtrace(expr.span) { if is_format_macro(cx, macro_call.def_id) - && let Some(format_args) = find_format_args(cx, expr, macro_call.expn) + && let Some(format_args) = self.format_args.get(cx, expr, macro_call.expn) { for piece in &format_args.template { if let FormatArgsPiece::Placeholder(placeholder) = piece diff --git a/lints/src/lib.rs b/lints/src/lib.rs index df77538d3cf17..6928bcd028a8c 100644 --- a/lints/src/lib.rs +++ b/lints/src/lib.rs @@ -14,7 +14,6 @@ #![feature(rustc_private)] #![feature(let_chains)] -#![feature(lazy_cell)] #![warn(unused_extern_crates)] extern crate rustc_ast; @@ -36,13 +35,19 @@ pub fn register_lints(_sess: &rustc_session::Session, lint_store: &mut rustc_lin // -- Begin lint registration -- // Preparation steps. - lint_store.register_early_pass(|| { - Box::::default() + let format_args_storage = clippy_utils::macros::FormatArgsStorage::default(); + let format_args = format_args_storage.clone(); + lint_store.register_early_pass(move || { + Box::new(utils::format_args_collector::FormatArgsCollector::new( + format_args.clone(), + )) }); // Actual lints. lint_store.register_lints(&[format_error::FORMAT_ERROR]); - lint_store.register_late_pass(|_| Box::::default()); + let format_args = format_args_storage.clone(); + lint_store + .register_late_pass(move |_| Box::new(format_error::FormatError::new(format_args.clone()))); // -- End lint registration -- diff --git a/lints/src/utils/format_args_collector.rs b/lints/src/utils/format_args_collector.rs index 7524169666d97..a3b144a6a8aeb 100644 --- a/lints/src/utils/format_args_collector.rs +++ b/lints/src/utils/format_args_collector.rs @@ -12,15 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Copied from `https://github.com/rust-lang/rust-clippy/blob/8b0bf6423dfaf5545014db85fcba7bc745beed4c/clippy_lints/src/utils/format_args_collector.rs` -//! -//! Init `AST_FORMAT_ARGS` before running the late pass, so that we can call `find_format_args`. +//! Copied from `https://github.com/rust-lang/rust-clippy/blob/993d8ae2a7b26ac779fde923b2ce9ce35d7143a8/clippy_lints/src/utils/format_args_collector.rs` use std::iter::once; use std::mem; -use std::rc::Rc; -use clippy_utils::macros::AST_FORMAT_ARGS; +use clippy_utils::macros::FormatArgsStorage; use clippy_utils::source::snippet_opt; use itertools::Itertools; use rustc_ast::{Crate, Expr, ExprKind, FormatArgs}; @@ -30,11 +27,19 @@ use rustc_lint::{EarlyContext, EarlyLintPass}; use rustc_session::impl_lint_pass; use rustc_span::{hygiene, Span}; -/// Collects [`rustc_ast::FormatArgs`] so that future late passes can call -/// [`clippy_utils::macros::find_format_args`] -#[derive(Default)] +/// Populates [`FormatArgsStorage`] with AST [`FormatArgs`] nodes pub struct FormatArgsCollector { - format_args: FxHashMap>, + format_args: FxHashMap, + storage: FormatArgsStorage, +} + +impl FormatArgsCollector { + pub fn new(storage: FormatArgsStorage) -> Self { + Self { + format_args: FxHashMap::default(), + storage, + } + } } impl_lint_pass!(FormatArgsCollector => []); @@ -47,15 +52,12 @@ impl EarlyLintPass for FormatArgsCollector { } self.format_args - .insert(expr.span.with_parent(None), Rc::new((**args).clone())); + .insert(expr.span.with_parent(None), (**args).clone()); } } fn check_crate_post(&mut self, _: &EarlyContext<'_>, _: &Crate) { - AST_FORMAT_ARGS.with(|ast_format_args| { - let result = ast_format_args.set(mem::take(&mut self.format_args)); - debug_assert!(result.is_ok()); - }); + self.storage.set(mem::take(&mut self.format_args)); } } diff --git a/src/batch/src/executor/log_row_seq_scan.rs b/src/batch/src/executor/log_row_seq_scan.rs index 5783c788bb5e4..3614673ee941e 100644 --- a/src/batch/src/executor/log_row_seq_scan.rs +++ b/src/batch/src/executor/log_row_seq_scan.rs @@ -206,21 +206,24 @@ impl LogRowSeqScanExecutor { .batch_iter_log_with_pk_bounds(old_epoch, new_epoch) .await? .flat_map(|r| { - futures::stream::iter(std::iter::from_coroutine(move || { - match r { - Ok(change_log_row) => { - fn with_op(op: Op, row: impl Row) -> impl Row { - row.chain([Some(ScalarImpl::Int16(op.to_i16()))]) + futures::stream::iter(std::iter::from_coroutine( + #[coroutine] + move || { + match r { + Ok(change_log_row) => { + fn with_op(op: Op, row: impl Row) -> impl Row { + row.chain([Some(ScalarImpl::Int16(op.to_i16()))]) + } + for (op, row) in change_log_row.into_op_value_iter() { + yield Ok(with_op(op, row)); + } } - for (op, row) in change_log_row.into_op_value_iter() { - yield Ok(with_op(op, row)); + Err(e) => { + yield Err(e); } - } - Err(e) => { - yield Err(e); - } - }; - })) + }; + }, + )) }); pin_mut!(iter); diff --git a/src/batch/src/lib.rs b/src/batch/src/lib.rs index ce479daa2afc5..414f27b33b4a7 100644 --- a/src/batch/src/lib.rs +++ b/src/batch/src/lib.rs @@ -28,7 +28,6 @@ #![feature(allocator_api)] #![feature(impl_trait_in_assoc_type)] #![feature(assert_matches)] -#![feature(lazy_cell)] #![feature(error_generic_member_access)] #![feature(map_try_insert)] #![feature(iter_from_coroutine)] diff --git a/src/cmd_all/src/lib.rs b/src/cmd_all/src/lib.rs index 73180c75032ca..9b95af8dce4d3 100644 --- a/src/cmd_all/src/lib.rs +++ b/src/cmd_all/src/lib.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(lazy_cell)] #![feature(let_chains)] mod common; diff --git a/src/common/metrics/src/lib.rs b/src/common/metrics/src/lib.rs index fa0250f4f0c7b..35e8b11265843 100644 --- a/src/common/metrics/src/lib.rs +++ b/src/common/metrics/src/lib.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(lazy_cell)] #![feature(type_alias_impl_trait)] #![feature(impl_trait_in_assoc_type)] use std::ops::Deref; diff --git a/src/common/secret/src/lib.rs b/src/common/secret/src/lib.rs index 8ac065e5ea183..17319a296734d 100644 --- a/src/common/secret/src/lib.rs +++ b/src/common/secret/src/lib.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(lazy_cell)] - type SecretId = u32; mod secret_manager; diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs index 467b313720f29..8d47d0c621646 100644 --- a/src/common/src/lib.rs +++ b/src/common/src/lib.rs @@ -26,7 +26,6 @@ #![feature(lint_reasons)] #![feature(coroutines)] #![feature(map_try_insert)] -#![feature(lazy_cell)] #![feature(error_generic_member_access)] #![feature(let_chains)] #![feature(portable_simd)] @@ -35,7 +34,6 @@ #![allow(incomplete_features)] #![feature(iterator_try_collect)] #![feature(iter_order_by)] -#![feature(exclusive_range_pattern)] #![feature(binary_heap_into_iter_sorted)] #![feature(impl_trait_in_assoc_type)] #![feature(map_entry_replace)] diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs index cee364ceb26de..d91fb56b1cb88 100644 --- a/src/compute/src/lib.rs +++ b/src/compute/src/lib.rs @@ -18,7 +18,6 @@ #![feature(let_chains)] #![feature(lint_reasons)] #![feature(impl_trait_in_assoc_type)] -#![feature(lazy_cell)] #![cfg_attr(coverage, feature(coverage_attribute))] #[macro_use] diff --git a/src/connector/benches/nexmark_integration.rs b/src/connector/benches/nexmark_integration.rs index 172931562efef..f59ae08a9ec14 100644 --- a/src/connector/benches/nexmark_integration.rs +++ b/src/connector/benches/nexmark_integration.rs @@ -18,8 +18,6 @@ //! `ByteStreamSourceParserImpl::create` based on the given configuration, rather //! than depending on a specific internal implementation. -#![feature(lazy_cell)] - use std::sync::LazyLock; use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; diff --git a/src/connector/codec/src/lib.rs b/src/connector/codec/src/lib.rs index 651fa84e109fb..cbf0ad14046f7 100644 --- a/src/connector/codec/src/lib.rs +++ b/src/connector/codec/src/lib.rs @@ -22,7 +22,6 @@ #![feature(box_patterns)] #![feature(trait_alias)] #![feature(lint_reasons)] -#![feature(lazy_cell)] #![feature(let_chains)] #![feature(box_into_inner)] #![feature(type_alias_impl_trait)] diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index 85bb7740ae9f9..02a3b8c84b50f 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -20,7 +20,6 @@ #![feature(box_patterns)] #![feature(trait_alias)] #![feature(lint_reasons)] -#![feature(lazy_cell)] #![feature(let_chains)] #![feature(box_into_inner)] #![feature(type_alias_impl_trait)] diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs index dde74a999ac9f..ac93ab3e69807 100644 --- a/src/connector/src/parser/avro/parser.rs +++ b/src/connector/src/parser/avro/parser.rs @@ -87,7 +87,7 @@ impl AvroAccessBuilder { /// ## Confluent schema registry /// /// - In Kafka ([Confluent schema registry wire format](https://docs.confluent.io/platform/7.6/schema-registry/fundamentals/serdes-develop/index.html#wire-format)): - /// starts with 5 bytes`0x00{schema_id:08x}` followed by Avro binary encoding. + /// starts with 5 bytes`0x00{schema_id:08x}` followed by Avro binary encoding. async fn parse_avro_value(&self, payload: &[u8]) -> ConnectorResult> { // parse payload to avro value // if use confluent schema, get writer schema from confluent schema registry diff --git a/src/connector/src/sink/formatter/append_only.rs b/src/connector/src/sink/formatter/append_only.rs index 8a4b7feda6a42..c0c47fa37e171 100644 --- a/src/connector/src/sink/formatter/append_only.rs +++ b/src/connector/src/sink/formatter/append_only.rs @@ -40,19 +40,22 @@ impl SinkFormatter for AppendOnlyFormatter impl Iterator, Option)>> { - std::iter::from_coroutine(|| { - for (op, row) in chunk.rows() { - if op != Op::Insert { - continue; - } - let event_key_object = match &self.key_encoder { - Some(key_encoder) => Some(tri!(key_encoder.encode(row))), - None => None, - }; - let event_object = Some(tri!(self.val_encoder.encode(row))); + std::iter::from_coroutine( + #[coroutine] + || { + for (op, row) in chunk.rows() { + if op != Op::Insert { + continue; + } + let event_key_object = match &self.key_encoder { + Some(key_encoder) => Some(tri!(key_encoder.encode(row))), + None => None, + }; + let event_object = Some(tri!(self.val_encoder.encode(row))); - yield Ok((event_key_object, event_object)) - } - }) + yield Ok((event_key_object, event_object)) + } + }, + ) } } diff --git a/src/connector/src/sink/formatter/debezium_json.rs b/src/connector/src/sink/formatter/debezium_json.rs index fd4813e78541a..a9bf0404f473e 100644 --- a/src/connector/src/sink/formatter/debezium_json.rs +++ b/src/connector/src/sink/formatter/debezium_json.rs @@ -100,100 +100,103 @@ impl SinkFormatter for DebeziumJsonFormatter { &self, chunk: &StreamChunk, ) -> impl Iterator, Option)>> { - std::iter::from_coroutine(|| { - let DebeziumJsonFormatter { - schema, - pk_indices, - db_name, - sink_from_name, - opts, - key_encoder, - val_encoder, - } = self; - let ts_ms = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_millis() as u64; - let source_field = json!({ - // todo: still some missing fields in source field - // ref https://debezium.io/documentation/reference/2.4/connectors/postgresql.html#postgresql-create-events - "db": db_name, - "table": sink_from_name, - "ts_ms": ts_ms, - }); - - let mut update_cache: Option> = None; - - for (op, row) in chunk.rows() { - let event_key_object: Option = Some(json!({ - "schema": json!({ - "type": "struct", - "fields": fields_pk_to_json(&schema.fields, pk_indices), - "optional": false, - "name": concat_debezium_name_field(db_name, sink_from_name, "Key"), - }), - "payload": tri!(key_encoder.encode(row)), - })); - let event_object: Option = match op { - Op::Insert => Some(json!({ - "schema": schema_to_json(schema, db_name, sink_from_name), - "payload": { - "before": null, - "after": tri!(val_encoder.encode(row)), - "op": "c", - "ts_ms": ts_ms, - "source": source_field, - } - })), - Op::Delete => { - let value_obj = Some(json!({ + std::iter::from_coroutine( + #[coroutine] + || { + let DebeziumJsonFormatter { + schema, + pk_indices, + db_name, + sink_from_name, + opts, + key_encoder, + val_encoder, + } = self; + let ts_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + let source_field = json!({ + // todo: still some missing fields in source field + // ref https://debezium.io/documentation/reference/2.4/connectors/postgresql.html#postgresql-create-events + "db": db_name, + "table": sink_from_name, + "ts_ms": ts_ms, + }); + + let mut update_cache: Option> = None; + + for (op, row) in chunk.rows() { + let event_key_object: Option = Some(json!({ + "schema": json!({ + "type": "struct", + "fields": fields_pk_to_json(&schema.fields, pk_indices), + "optional": false, + "name": concat_debezium_name_field(db_name, sink_from_name, "Key"), + }), + "payload": tri!(key_encoder.encode(row)), + })); + let event_object: Option = match op { + Op::Insert => Some(json!({ "schema": schema_to_json(schema, db_name, sink_from_name), "payload": { - "before": tri!(val_encoder.encode(row)), - "after": null, - "op": "d", + "before": null, + "after": tri!(val_encoder.encode(row)), + "op": "c", "ts_ms": ts_ms, "source": source_field, } - })); - yield Ok((event_key_object.clone(), value_obj)); - - if opts.gen_tombstone { - // Tomestone event - // https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-delete-events - yield Ok((event_key_object, None)); - } - - continue; - } - Op::UpdateDelete => { - update_cache = Some(tri!(val_encoder.encode(row))); - continue; - } - Op::UpdateInsert => { - if let Some(before) = update_cache.take() { - Some(json!({ + })), + Op::Delete => { + let value_obj = Some(json!({ "schema": schema_to_json(schema, db_name, sink_from_name), "payload": { - "before": before, - "after": tri!(val_encoder.encode(row)), - "op": "u", + "before": tri!(val_encoder.encode(row)), + "after": null, + "op": "d", "ts_ms": ts_ms, "source": source_field, } - })) - } else { - warn!( - "not found UpdateDelete in prev row, skipping, row index {:?}", - row.index() - ); + })); + yield Ok((event_key_object.clone(), value_obj)); + + if opts.gen_tombstone { + // Tomestone event + // https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-delete-events + yield Ok((event_key_object, None)); + } + continue; } - } - }; - yield Ok((event_key_object, event_object)); - } - }) + Op::UpdateDelete => { + update_cache = Some(tri!(val_encoder.encode(row))); + continue; + } + Op::UpdateInsert => { + if let Some(before) = update_cache.take() { + Some(json!({ + "schema": schema_to_json(schema, db_name, sink_from_name), + "payload": { + "before": before, + "after": tri!(val_encoder.encode(row)), + "op": "u", + "ts_ms": ts_ms, + "source": source_field, + } + })) + } else { + warn!( + "not found UpdateDelete in prev row, skipping, row index {:?}", + row.index() + ); + continue; + } + } + }; + yield Ok((event_key_object, event_object)); + } + }, + ) } } diff --git a/src/connector/src/sink/formatter/upsert.rs b/src/connector/src/sink/formatter/upsert.rs index 612c22aaaf86c..7e586a7917ea1 100644 --- a/src/connector/src/sink/formatter/upsert.rs +++ b/src/connector/src/sink/formatter/upsert.rs @@ -40,22 +40,25 @@ impl SinkFormatter for UpsertFormatter { &self, chunk: &StreamChunk, ) -> impl Iterator, Option)>> { - std::iter::from_coroutine(|| { - for (op, row) in chunk.rows() { - let event_key_object = Some(tri!(self.key_encoder.encode(row))); - - let event_object = match op { - Op::Insert | Op::UpdateInsert => Some(tri!(self.val_encoder.encode(row))), - // Empty value with a key - Op::Delete => None, - Op::UpdateDelete => { - // upsert semantic does not require update delete event - continue; - } - }; - - yield Ok((event_key_object, event_object)) - } - }) + std::iter::from_coroutine( + #[coroutine] + || { + for (op, row) in chunk.rows() { + let event_key_object = Some(tri!(self.key_encoder.encode(row))); + + let event_object = match op { + Op::Insert | Op::UpdateInsert => Some(tri!(self.val_encoder.encode(row))), + // Empty value with a key + Op::Delete => None, + Op::UpdateDelete => { + // upsert semantic does not require update delete event + continue; + } + }; + + yield Ok((event_key_object, event_object)) + } + }, + ) } } diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index c6e65fb00c399..3a6914d2249c5 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -403,7 +403,16 @@ struct KafkaPayloadWriter<'a> { config: &'a KafkaConfig, } -pub type KafkaSinkDeliveryFuture = impl TryFuture + Unpin + 'static; +mod opaque_type { + use super::*; + pub type KafkaSinkDeliveryFuture = impl TryFuture + Unpin + 'static; + + pub(super) fn map_delivery_future(future: DeliveryFuture) -> KafkaSinkDeliveryFuture { + future.map(KafkaPayloadWriter::<'static>::map_future_result) + } +} +use opaque_type::map_delivery_future; +pub use opaque_type::KafkaSinkDeliveryFuture; pub struct KafkaSinkWriter { formatter: SinkFormatterImpl, @@ -482,7 +491,7 @@ impl<'w> KafkaPayloadWriter<'w> { Ok(delivery_future) => { if self .add_future - .add_future_may_await(Self::map_delivery_future(delivery_future)) + .add_future_may_await(map_delivery_future(delivery_future)) .await? { tracing::warn!( @@ -567,10 +576,6 @@ impl<'w> KafkaPayloadWriter<'w> { Err(_) => Err(KafkaError::Canceled.into()), } } - - fn map_delivery_future(future: DeliveryFuture) -> KafkaSinkDeliveryFuture { - future.map(KafkaPayloadWriter::<'static>::map_future_result) - } } impl<'a> FormattedSink for KafkaPayloadWriter<'a> { diff --git a/src/connector/src/sink/kinesis.rs b/src/connector/src/sink/kinesis.rs index 771d3c8a6f91d..3b5d49da46037 100644 --- a/src/connector/src/sink/kinesis.rs +++ b/src/connector/src/sink/kinesis.rs @@ -201,8 +201,36 @@ impl KinesisSinkWriter { } } -pub type KinesisSinkPayloadWriterDeliveryFuture = - impl TryFuture + Unpin + Send + 'static; +mod opaque_type { + use super::*; + pub type KinesisSinkPayloadWriterDeliveryFuture = + impl TryFuture + Unpin + Send + 'static; + + impl KinesisSinkPayloadWriter { + pub(super) fn finish(self) -> KinesisSinkPayloadWriterDeliveryFuture { + async move { + let builder = self.builder.expect("should not be None"); + let context_fmt = format!( + "failed to put record to {}", + builder + .get_stream_name() + .as_ref() + .expect("should have set stream name") + ); + Retry::spawn( + ExponentialBackoff::from_millis(100).map(jitter).take(3), + || builder.clone().send(), + ) + .await + .with_context(|| context_fmt.clone()) + .map_err(SinkError::Kinesis)?; + Ok(()) + } + .boxed() + } + } +} +pub use opaque_type::KinesisSinkPayloadWriterDeliveryFuture; impl KinesisSinkPayloadWriter { fn put_record(&mut self, key: String, payload: Vec) { @@ -216,28 +244,6 @@ impl KinesisSinkPayloadWriter { ), ); } - - fn finish(self) -> KinesisSinkPayloadWriterDeliveryFuture { - async move { - let builder = self.builder.expect("should not be None"); - let context_fmt = format!( - "failed to put record to {}", - builder - .get_stream_name() - .as_ref() - .expect("should have set stream name") - ); - Retry::spawn( - ExponentialBackoff::from_millis(100).map(jitter).take(3), - || builder.clone().send(), - ) - .await - .with_context(|| context_fmt.clone()) - .map_err(SinkError::Kinesis)?; - Ok(()) - } - .boxed() - } } impl FormattedSink for KinesisSinkPayloadWriter { diff --git a/src/connector/src/sink/pulsar.rs b/src/connector/src/sink/pulsar.rs index 3f016ad94946d..a92d5b16f85e3 100644 --- a/src/connector/src/sink/pulsar.rs +++ b/src/connector/src/sink/pulsar.rs @@ -235,15 +235,20 @@ struct PulsarPayloadWriter<'w> { add_future: DeliveryFutureManagerAddFuture<'w, PulsarDeliveryFuture>, } -pub type PulsarDeliveryFuture = impl TryFuture + Unpin + 'static; - -fn may_delivery_future(future: SendFuture) -> PulsarDeliveryFuture { - future.map(|result| { - result - .map(|_| ()) - .map_err(|e: pulsar::Error| SinkError::Pulsar(anyhow!(e))) - }) +mod opaque_type { + use super::*; + pub type PulsarDeliveryFuture = impl TryFuture + Unpin + 'static; + + pub(super) fn may_delivery_future(future: SendFuture) -> PulsarDeliveryFuture { + future.map(|result| { + result + .map(|_| ()) + .map_err(|e: pulsar::Error| SinkError::Pulsar(anyhow!(e))) + }) + } } +use opaque_type::may_delivery_future; +pub use opaque_type::PulsarDeliveryFuture; impl PulsarSinkWriter { pub async fn new( diff --git a/src/expr/core/src/lib.rs b/src/expr/core/src/lib.rs index b250b8ce901f5..d45d4ca11f80a 100644 --- a/src/expr/core/src/lib.rs +++ b/src/expr/core/src/lib.rs @@ -15,7 +15,6 @@ #![feature(let_chains)] #![feature(lint_reasons)] #![feature(iterator_try_collect)] -#![feature(lazy_cell)] #![feature(coroutines)] #![feature(never_type)] #![feature(error_generic_member_access)] diff --git a/src/expr/impl/src/lib.rs b/src/expr/impl/src/lib.rs index 56bdbe3b81100..e5c69c2660eeb 100644 --- a/src/expr/impl/src/lib.rs +++ b/src/expr/impl/src/lib.rs @@ -25,8 +25,6 @@ #![feature(assert_matches)] #![feature(lint_reasons)] #![feature(iterator_try_collect)] -#![feature(exclusive_range_pattern)] -#![feature(lazy_cell)] #![feature(coroutines)] #![feature(test)] #![feature(iter_array_chunks)] diff --git a/src/expr/impl/src/window_function/buffer.rs b/src/expr/impl/src/window_function/buffer.rs index bd1c10d162b23..57217dda6fd4b 100644 --- a/src/expr/impl/src/window_function/buffer.rs +++ b/src/expr/impl/src/window_function/buffer.rs @@ -970,7 +970,7 @@ mod tests { let key = |key: i64| -> StateKey { StateKey { - order_key: memcmp_encoding::encode_value(&Some(ScalarImpl::from(key)), order_type) + order_key: memcmp_encoding::encode_value(Some(ScalarImpl::from(key)), order_type) .unwrap(), pk: OwnedRow::empty().into(), } diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 185d65cb567a0..4bdf9fa398f77 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -25,7 +25,6 @@ #![feature(assert_matches)] #![feature(lint_reasons)] #![feature(box_patterns)] -#![feature(lazy_cell)] #![feature(macro_metavar_expr)] #![feature(min_specialization)] #![feature(extend_one)] diff --git a/src/jni_core/src/lib.rs b/src/jni_core/src/lib.rs index 18d1807948d21..419f4ffd21cb5 100644 --- a/src/jni_core/src/lib.rs +++ b/src/jni_core/src/lib.rs @@ -13,7 +13,6 @@ // limitations under the License. #![feature(error_generic_member_access)] -#![feature(lazy_cell)] #![feature(once_cell_try)] #![feature(type_alias_impl_trait)] #![feature(try_blocks)] @@ -243,25 +242,28 @@ struct JavaClassMethodCache { utc: OnceLock, } -// TODO: may only return a RowRef -pub type StreamChunkRowIterator<'a> = impl Iterator + 'a; +mod opaque_type { + use super::*; + // TODO: may only return a RowRef + pub type StreamChunkRowIterator<'a> = impl Iterator + 'a; + + impl<'a> JavaBindingIteratorInner<'a> { + pub(super) fn from_chunk(chunk: &'a StreamChunk) -> JavaBindingIteratorInner<'a> { + JavaBindingIteratorInner::StreamChunk( + chunk + .rows() + .map(|(op, row)| (op.to_protobuf(), row.to_owned_row())), + ) + } + } +} +pub use opaque_type::StreamChunkRowIterator; pub type HummockJavaBindingIterator = BoxStream<'static, anyhow::Result<(Bytes, OwnedRow)>>; - pub enum JavaBindingIteratorInner<'a> { Hummock(HummockJavaBindingIterator), StreamChunk(StreamChunkRowIterator<'a>), } -impl<'a> JavaBindingIteratorInner<'a> { - fn from_chunk(chunk: &'a StreamChunk) -> JavaBindingIteratorInner<'a> { - JavaBindingIteratorInner::StreamChunk( - chunk - .rows() - .map(|(op, row)| (op.to_protobuf(), row.to_owned_row())), - ) - } -} - enum RowExtra { Op(Op), Key(Bytes), diff --git a/src/jni_core/src/macros.rs b/src/jni_core/src/macros.rs index 982ccda06ecf0..2e7d095e0bd45 100644 --- a/src/jni_core/src/macros.rs +++ b/src/jni_core/src/macros.rs @@ -375,7 +375,6 @@ macro_rules! to_jvalue { /// Generate the jni signature of a given function /// ``` -/// #![feature(lazy_cell)] /// use risingwave_jni_core::gen_jni_sig; /// assert_eq!(gen_jni_sig!(boolean f(int, short, byte[])), "(IS[B)Z"); /// assert_eq!( diff --git a/src/license/src/lib.rs b/src/license/src/lib.rs index 0e641be9789b1..7f2b25d8f3fb4 100644 --- a/src/license/src/lib.rs +++ b/src/license/src/lib.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(lazy_cell)] - mod feature; mod manager; diff --git a/src/meta/service/src/lib.rs b/src/meta/service/src/lib.rs index 80a83349f2cc7..9ab248802772e 100644 --- a/src/meta/service/src/lib.rs +++ b/src/meta/service/src/lib.rs @@ -14,7 +14,6 @@ #![feature(lint_reasons)] #![feature(let_chains)] -#![feature(lazy_cell)] #![feature(impl_trait_in_assoc_type)] #![cfg_attr(coverage, feature(coverage_attribute))] diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index 71c99a7e065b4..811b3b152d061 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -20,7 +20,6 @@ #![feature(extract_if)] #![feature(hash_extract_if)] #![feature(btree_extract_if)] -#![feature(lazy_cell)] #![feature(let_chains)] #![feature(error_generic_member_access)] #![feature(assert_matches)] diff --git a/src/object_store/src/lib.rs b/src/object_store/src/lib.rs index 811d482587acb..d9e768b7f0290 100644 --- a/src/object_store/src/lib.rs +++ b/src/object_store/src/lib.rs @@ -14,7 +14,6 @@ #![feature(trait_alias)] #![feature(type_alias_impl_trait)] -#![feature(lazy_cell)] #![feature(lint_reasons)] #![feature(error_generic_member_access)] #![feature(let_chains)] diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index 72ec08c46537b..8ee8dc078fe17 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -12,6 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![expect( + unexpected_cfgs, + reason = "feature(hdfs-backend) is banned https://github.com/risingwavelabs/risingwave/pull/7875" +)] + pub mod sim; use std::ops::{Range, RangeBounds}; use std::sync::Arc; diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs index 0ef12f3da3a3f..001eb8128a5b2 100644 --- a/src/object_store/src/object/s3.rs +++ b/src/object_store/src/object/s3.rs @@ -116,10 +116,7 @@ impl S3StreamingUploader { /// Reference: const MIN_PART_SIZE: usize = 5 * 1024 * 1024; const MAX_PART_SIZE: usize = 5 * 1024 * 1024 * 1024; - let part_size = config - .upload_part_size - .min(MAX_PART_SIZE) - .max(MIN_PART_SIZE); + let part_size = config.upload_part_size.clamp(MIN_PART_SIZE, MAX_PART_SIZE); Self { client, diff --git a/src/storage/backup/src/lib.rs b/src/storage/backup/src/lib.rs index 6b569277c54dd..dbb150030194e 100644 --- a/src/storage/backup/src/lib.rs +++ b/src/storage/backup/src/lib.rs @@ -21,7 +21,6 @@ #![feature(map_try_insert)] #![feature(hash_extract_if)] #![feature(btree_extract_if)] -#![feature(lazy_cell)] #![feature(let_chains)] #![feature(error_generic_member_access)] #![cfg_attr(coverage, feature(coverage_attribute))] diff --git a/src/storage/benches/bench_block_iter.rs b/src/storage/benches/bench_block_iter.rs index f58499e07282f..a0ea7cfd844d1 100644 --- a/src/storage/benches/bench_block_iter.rs +++ b/src/storage/benches/bench_block_iter.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(lazy_cell)] use std::sync::LazyLock; use bytes::{BufMut, Bytes, BytesMut}; diff --git a/src/storage/benches/bench_table_watermarks.rs b/src/storage/benches/bench_table_watermarks.rs index aeeee6927d36a..531652065f014 100644 --- a/src/storage/benches/bench_table_watermarks.rs +++ b/src/storage/benches/bench_table_watermarks.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(lazy_cell)] - use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::{Arc, LazyLock}; diff --git a/src/storage/hummock_sdk/src/lib.rs b/src/storage/hummock_sdk/src/lib.rs index 2ad827d8cae5f..c894ecf184128 100644 --- a/src/storage/hummock_sdk/src/lib.rs +++ b/src/storage/hummock_sdk/src/lib.rs @@ -22,7 +22,6 @@ #![feature(is_sorted)] #![feature(let_chains)] #![feature(btree_cursors)] -#![feature(lazy_cell)] mod key_cmp; use std::cmp::Ordering; diff --git a/src/storage/hummock_trace/src/lib.rs b/src/storage/hummock_trace/src/lib.rs index 64417832206e0..48b0a71010a74 100644 --- a/src/storage/hummock_trace/src/lib.rs +++ b/src/storage/hummock_trace/src/lib.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(lazy_cell)] #![feature(cursor_remaining)] #![feature(trait_alias)] #![feature(coroutines)] diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index ab69c45093b46..e11d3e1cee1ca 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -31,12 +31,10 @@ #![feature(is_sorted)] #![feature(btree_extract_if)] #![feature(exact_size_is_empty)] -#![feature(lazy_cell)] #![cfg_attr(coverage, feature(coverage_attribute))] #![recursion_limit = "256"] #![feature(error_generic_member_access)] #![feature(let_chains)] -#![feature(exclusive_range_pattern)] #![feature(impl_trait_in_assoc_type)] #![feature(maybe_uninit_uninit_array)] #![feature(maybe_uninit_array_assume_init)] diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index fbff016589445..53643c47b5411 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -193,21 +193,24 @@ impl ChangeLogValue { } pub fn into_op_value_iter(self) -> impl Iterator { - std::iter::from_coroutine(move || match self { - Self::Insert(row) => { - yield (Op::Insert, row); - } - Self::Delete(row) => { - yield (Op::Delete, row); - } - Self::Update { - old_value, - new_value, - } => { - yield (Op::UpdateDelete, old_value); - yield (Op::UpdateInsert, new_value); - } - }) + std::iter::from_coroutine( + #[coroutine] + move || match self { + Self::Insert(row) => { + yield (Op::Insert, row); + } + Self::Delete(row) => { + yield (Op::Delete, row); + } + Self::Update { + old_value, + new_value, + } => { + yield (Op::UpdateDelete, old_value); + yield (Op::UpdateInsert, new_value); + } + }, + ) } } diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index 7e1c247586241..3ee9e849dda4c 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -42,9 +42,27 @@ use crate::monitor::{ use crate::opts::StorageOpts; use crate::StateStore; -pub type HummockStorageType = impl StateStore + AsHummock; -pub type MemoryStateStoreType = impl StateStore + AsHummock; -pub type SledStateStoreType = impl StateStore + AsHummock; +mod opaque_type { + use super::*; + + pub type HummockStorageType = impl StateStore + AsHummock; + pub type MemoryStateStoreType = impl StateStore + AsHummock; + pub type SledStateStoreType = impl StateStore + AsHummock; + + pub fn in_memory(state_store: MemoryStateStore) -> MemoryStateStoreType { + may_dynamic_dispatch(state_store) + } + + pub fn hummock(state_store: HummockStorage) -> HummockStorageType { + may_dynamic_dispatch(may_verify(state_store)) + } + + pub fn sled(state_store: SledStateStore) -> SledStateStoreType { + may_dynamic_dispatch(state_store) + } +} +use opaque_type::{hummock, in_memory, sled}; +pub use opaque_type::{HummockStorageType, MemoryStateStoreType, SledStateStoreType}; /// The type erased [`StateStore`]. #[derive(Clone, EnumAsInner)] @@ -114,7 +132,7 @@ impl StateStoreImpl { storage_metrics: Arc, ) -> Self { // The specific type of MemoryStateStoreType in deducted here. - Self::MemoryStateStore(may_dynamic_dispatch(state_store).monitored(storage_metrics)) + Self::MemoryStateStore(in_memory(state_store).monitored(storage_metrics)) } pub fn hummock( @@ -122,16 +140,14 @@ impl StateStoreImpl { storage_metrics: Arc, ) -> Self { // The specific type of HummockStateStoreType in deducted here. - Self::HummockStateStore( - may_dynamic_dispatch(may_verify(state_store)).monitored(storage_metrics), - ) + Self::HummockStateStore(hummock(state_store).monitored(storage_metrics)) } pub fn sled( state_store: SledStateStore, storage_metrics: Arc, ) -> Self { - Self::SledStateStore(may_dynamic_dispatch(state_store).monitored(storage_metrics)) + Self::SledStateStore(sled(state_store).monitored(storage_metrics)) } pub fn shared_in_memory_store(storage_metrics: Arc) -> Self { diff --git a/src/stream/src/lib.rs b/src/stream/src/lib.rs index a3fb15eef544a..876deabc80f98 100644 --- a/src/stream/src/lib.rs +++ b/src/stream/src/lib.rs @@ -29,7 +29,6 @@ #![feature(map_try_insert)] #![feature(never_type)] #![feature(btreemap_alloc)] -#![feature(lazy_cell)] #![feature(error_generic_member_access)] #![feature(btree_extract_if)] #![feature(iter_order_by)] diff --git a/src/tests/simulation/src/lib.rs b/src/tests/simulation/src/lib.rs index ae2614e094a6d..aa6303b8e2f65 100644 --- a/src/tests/simulation/src/lib.rs +++ b/src/tests/simulation/src/lib.rs @@ -14,7 +14,6 @@ #![feature(trait_alias)] #![feature(lint_reasons)] -#![feature(lazy_cell)] #![feature(let_chains)] #![feature(try_blocks)] #![feature(register_tool)] diff --git a/src/tests/simulation/src/main.rs b/src/tests/simulation/src/main.rs index 31cc488044648..e45c4a3285ac3 100644 --- a/src/tests/simulation/src/main.rs +++ b/src/tests/simulation/src/main.rs @@ -13,7 +13,6 @@ // limitations under the License. #![cfg_attr(not(madsim), allow(dead_code))] -#![feature(lazy_cell)] use std::path::PathBuf; diff --git a/src/tests/simulation/tests/integration_tests/main.rs b/src/tests/simulation/tests/integration_tests/main.rs index 0ad0725dde405..fe57a2c896f18 100644 --- a/src/tests/simulation/tests/integration_tests/main.rs +++ b/src/tests/simulation/tests/integration_tests/main.rs @@ -18,7 +18,6 @@ //! for the rationale behind this approach. #![feature(stmt_expr_attributes)] -#![feature(lazy_cell)] #![feature(extract_if)] mod backfill_tests; diff --git a/src/tests/simulation/tests/integration_tests/scale/no_shuffle.rs b/src/tests/simulation/tests/integration_tests/scale/no_shuffle.rs index 30d9e0c73fecd..e87659d4f54db 100644 --- a/src/tests/simulation/tests/integration_tests/scale/no_shuffle.rs +++ b/src/tests/simulation/tests/integration_tests/scale/no_shuffle.rs @@ -81,7 +81,9 @@ async fn test_delta_join() -> Result<()> { .assert_result_eq(result); #[allow(unused_assignments)] - test_times += 1; + { + test_times += 1; + } }; } diff --git a/src/tests/sqlsmith/src/lib.rs b/src/tests/sqlsmith/src/lib.rs index 0c215c9146a45..2eb9c59d1bf38 100644 --- a/src/tests/sqlsmith/src/lib.rs +++ b/src/tests/sqlsmith/src/lib.rs @@ -14,7 +14,6 @@ #![feature(let_chains)] #![feature(if_let_guard)] -#![feature(lazy_cell)] #![feature(box_patterns)] #![feature(register_tool)] #![register_tool(rw)] diff --git a/src/utils/pgwire/src/lib.rs b/src/utils/pgwire/src/lib.rs index 2279156d89b37..8d1c00541bb95 100644 --- a/src/utils/pgwire/src/lib.rs +++ b/src/utils/pgwire/src/lib.rs @@ -16,7 +16,6 @@ #![feature(trait_alias)] #![feature(iterator_try_collect)] #![feature(trusted_len)] -#![feature(lazy_cell)] #![feature(buf_read_has_data_left)] #![feature(round_char_boundary)] #![feature(never_type)]