diff --git a/parquet-variant-compute/benches/variant_kernels.rs b/parquet-variant-compute/benches/variant_kernels.rs index 4526713570be..f12887152c70 100644 --- a/parquet-variant-compute/benches/variant_kernels.rs +++ b/parquet-variant-compute/benches/variant_kernels.rs @@ -17,10 +17,12 @@ use arrow::array::{Array, ArrayRef, StringArray}; use arrow::util::test_util::seedable_rng; +use arrow_schema::{DataType, Field, Fields}; +use chrono::{DateTime, Utc}; use criterion::{Criterion, criterion_group, criterion_main}; -use parquet_variant::{Variant, VariantBuilder}; +use parquet_variant::{Uuid, Variant, VariantBuilder, VariantBuilderExt}; use parquet_variant_compute::{ - GetOptions, VariantArray, VariantArrayBuilder, json_to_variant, variant_get, + GetOptions, VariantArray, VariantArrayBuilder, json_to_variant, shred_variant, variant_get, }; use rand::Rng; use rand::SeedableRng; @@ -28,7 +30,8 @@ use rand::distr::Alphanumeric; use rand::rngs::StdRng; use std::fmt::Write; use std::sync::Arc; -fn benchmark_batch_json_string_to_variant(c: &mut Criterion) { + +fn json_to_variant_bench(c: &mut Criterion) { let input_array = StringArray::from_iter_values(json_repeated_struct(8000)); let array_ref: ArrayRef = Arc::new(input_array); c.bench_function( @@ -94,14 +97,58 @@ pub fn variant_get_bench(c: &mut Criterion) { }; c.bench_function("variant_get_primitive", |b| { - b.iter(|| variant_get(&input.clone(), options.clone())) + b.iter(|| variant_get(&input, options.clone())) + }); +} + +pub fn shred_variant_bench(c: &mut Criterion) { + // This benchmark models shredding semi-structured log entries + // where each entry has a common set of fields, some optional fields, + // and some random extra fields. + // + let mut generator = VariantLogGenerator { + rows_per_batch: 8192, + optional_field_prob: 0.0, + extra_field_prob: 0.0, + rng: StdRng::seed_from_u64(42), + }; + + // shred out the common and optional fields + // leaving the rest in the value field + let shredding_schema = DataType::Struct(Fields::from(vec![ + Field::new("timestamp", DataType::Utf8, true), + Field::new("level", DataType::Utf8, true), + Field::new("message", DataType::Utf8, true), + Field::new("user_id", DataType::Int64, true), + Field::new("session_id", DataType::Utf8, true), + ])); + + // Variants have only required fields + let variant_array = generator.next().unwrap(); + c.bench_function("shred_variant common fields", |b| { + b.iter(|| shred_variant(&variant_array, &shredding_schema)) + }); + + // Variants with some optional fields + generator.optional_field_prob = 0.5; + let variant_array = generator.next().unwrap(); + c.bench_function("shred_variant optional fields", |b| { + b.iter(|| shred_variant(&variant_array, &shredding_schema)) + }); + + // Variants with optional fields and random extra fields + generator.extra_field_prob = 0.7; + let variant_array = generator.next().unwrap(); + c.bench_function("shred_variant extra fields", |b| { + b.iter(|| shred_variant(&variant_array, &shredding_schema)) }); } criterion_group!( benches, variant_get_bench, - benchmark_batch_json_string_to_variant + shred_variant_bench, + json_to_variant_bench ); criterion_main!(benches); @@ -362,3 +409,116 @@ impl RandomJsonGenerator { panic!("Random value did not match any type"); } } + +/// Data generator for VariantArrays that simulate structured log entries. +/// +/// Each entry is an object with +/// 1. fields like "timestamp", "level", "message", that always appear +/// 2. Fields like "user_id", "session_id", that appear in some entries +/// 3. Arbitrary extra fields that should be preserved in the value field +/// +/// +/// Example entries: +/// ```json +/// { +/// "timestamp": "2024-10-01T12:00:00", -- always present +/// "level": "INFO", -- always present +/// "message": "User logged in", -- always present +/// "user_id": 12345, -- optional +/// "session_id": "abcde", -- optional +/// "extra_field_123": "extra_value_456" -- arbitrary extra field +/// } +/// ``` +struct VariantLogGenerator { + /// How many rows per batch + rows_per_batch: usize, + /// Probability of including optional fields (0 to 1) + optional_field_prob: f64, + /// Probability of including extra arbitrary fields (0 to 1) + extra_field_prob: f64, + /// Random number generator + rng: StdRng, +} + +impl Iterator for VariantLogGenerator { + type Item = VariantArray; + + fn next(&mut self) -> Option { + Some(self.next_array()) + } +} + +impl VariantLogGenerator { + fn next_array(&mut self) -> VariantArray { + let mut builder = VariantArrayBuilder::new(1000); + for _ in 0..self.rows_per_batch { + let mut obj_builder = builder.new_object(); + + obj_builder = obj_builder + .with_field("timestamp", self.random_timestamp()) + .with_field("level", self.random_level()) + .with_field("message", self.random_message()); + + // Optional fields + if self.rng.random::() < self.optional_field_prob { + obj_builder = obj_builder.with_field("user_id", self.rng.random_range(1000..9999)); + } + if self.rng.random::() < self.optional_field_prob { + obj_builder = obj_builder.with_field("session_id", Uuid::new_v4()); + } + + // Random extra fields + if self.rng.random::() < self.extra_field_prob { + let num_extra_fields = self.rng.random_range(1..4); + for _ in 0..num_extra_fields { + // totally random field name and value (modeling random logging payloads) + let field_name = format!("extra_field_{}", self.rng.random_range(1..100000000)); + let field_value = + format!("extra_value_{}", self.rng.random_range(1..100000000)); + obj_builder = obj_builder.with_field(field_name.as_str(), field_value.as_str()); + } + } + + obj_builder.finish(); + } + builder.build() + } + + fn random_timestamp(&mut self) -> DateTime { + // random timestamp + let hour = self.rng.random_range(0..24); + let minute = self.rng.random_range(0..60); + let second = self.rng.random_range(0..60); + let day = self.rng.random_range(1..28); + let month = self.rng.random_range(1..=12); + let year = self.rng.random_range(2020..=2024); + let naive = chrono::NaiveDate::from_ymd_opt(year, month, day) + .unwrap() + .and_hms_opt(hour, minute, second) + .unwrap(); + DateTime::from_naive_utc_and_offset(naive, Utc) + } + + /// Random level from ["DEBUG", "INFO", "WARN", "ERROR"] + fn random_level(&mut self) -> &'static str { + let levels = ["DEBUG", "INFO", "WARN", "ERROR"]; + levels[self.rng.random_range(0..levels.len())] + } + + /// Generate a random log message + fn random_message(&mut self) -> &str { + let messages = [ + "User logged in", + "User logged out", + "File not found", + "Connection established", + "Error processing request", + // a few longer messages + "Database connection timed out after multiple attempts", + "User attempted to access restricted resource without proper authorization", + "Scheduled maintenance will occur at midnight UTC", + ]; + + messages[self.rng.random_range(0..messages.len())] + } +}